In [1]:
import numpy as np
import pandas as pd 
import scipy.stats

from statsmodels.formula.api import ols
from statsmodels.stats.anova import anova_lm
import warnings
warnings.filterwarnings('ignore')

In [2]:
import findspark

# findspark.init() has to run before the first pyspark import: it locates the
# Spark installation and makes its Python libraries importable.
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using every available core; the session time zone is
# pinned so timestamp handling does not depend on the host machine.
spark = (
    SparkSession.builder
    .appName("df_pre")
    .config('spark.sql.session.timeZone', 'Asia/Shanghai')
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Read the one-way ANOVA sample data into pandas and display it.
# NOTE(review): absolute local path; adjust it to wherever the CSV lives.
csv_path = "D:/AllData/onewayanova_test.csv"
df = pd.read_csv(csv_path)
df

Unnamed: 0,blood,value
0,patient,0.84
1,patient,1.05
2,patient,1.2
3,patient,1.2
4,patient,1.39
5,patient,1.53
6,patient,1.67
7,patient,1.8
8,patient,1.87
9,patient,2.07


#### pandas_df转成pyspark.sql.dataframe

In [4]:
# Build a Spark DataFrame from the pandas DataFrame via the active SparkSession,
# then show the resulting type to confirm the conversion.
df_spark = spark.createDataFrame(df)
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [5]:
# Print the rows of the Spark DataFrame as a text table.
df_spark.show()

+-------+-----+
|  blood|value|
+-------+-----+
|patient| 0.84|
|patient| 1.05|
|patient|  1.2|
|patient|  1.2|
|patient| 1.39|
|patient| 1.53|
|patient| 1.67|
|patient|  1.8|
|patient| 1.87|
|patient| 2.07|
|patient| 2.11|
|healthy| 0.54|
|healthy| 0.64|
|healthy| 0.64|
|healthy| 0.75|
|healthy| 0.76|
|healthy| 0.81|
|healthy| 1.16|
|healthy|  1.2|
+-------+-----+



#### pyspark.sql.dataframe转成pandas_df

In [6]:
# Convert the Spark DataFrame back into a pandas DataFrame and confirm the type.
df_pandas = df_spark.toPandas()
type(df_pandas)

pandas.core.frame.DataFrame

In [7]:
# Display the pandas DataFrame obtained from the Spark DataFrame.
df_pandas

Unnamed: 0,blood,value
0,patient,0.84
1,patient,1.05
2,patient,1.2
3,patient,1.2
4,patient,1.39
5,patient,1.53
6,patient,1.67
7,patient,1.8
8,patient,1.87
9,patient,2.07


In [14]:
# Confirm df_spark is a pyspark.sql DataFrame before converting it further.
type(df_spark)

pyspark.sql.dataframe.DataFrame

pyspark.sql.dataframe 转成 pyspark.pandas：两种方法

In [11]:
# Method 1: convert the Spark DataFrame to a pandas-on-Spark DataFrame.
# NOTE(review): recent Spark releases document to_pandas_on_spark() as
# deprecated in favour of pandas_api() -- confirm against the Spark version in use.
df1 = df_spark.to_pandas_on_spark()
type(df1)

pyspark.pandas.frame.DataFrame

In [15]:
# Method 2: convert the Spark DataFrame to a pandas-on-Spark DataFrame via pandas_api().
df11 = df_spark.pandas_api()
type(df11)

pyspark.pandas.frame.DataFrame

pyspark.pandas 转成 pyspark.sql.dataframe

In [12]:
# Convert the pandas-on-Spark DataFrame back into a regular Spark DataFrame.
df2 = df1.to_spark()
type(df2)

pyspark.sql.dataframe.DataFrame

#### 方差分析

In [22]:
# One-way ANOVA: fit an OLS model of `value` on the categorical factor `blood`,
# then derive the ANOVA table from the fitted model.
model = ols(formula='value ~ C(blood)', data=df_pandas).fit()
anovaResults = anova_lm(model)
anovaResults

Unnamed: 0,df,sum_sq,mean_sq,F,PR(>F)
C(blood),1.0,2.324328,2.324328,18.039093,0.000543
Residual,17.0,2.190441,0.128849,,


#### pandas_df转成pyspark.sql.dataframe

In [26]:
# The ANOVA table is a plain pandas DataFrame.
type(anovaResults)

pandas.core.frame.DataFrame

In [27]:
import pyspark.pandas as ps

# Route the pandas ANOVA table through pandas-on-Spark to get a Spark DataFrame.
# NOTE(review): to_spark() is called without index_col, so the row labels
# (C(blood) / Residual) do not appear in the Spark result.
anova_on_spark = ps.from_pandas(anovaResults)
spark_results = anova_on_spark.to_spark()
type(spark_results)

pyspark.sql.dataframe.DataFrame

In [28]:
# Print the ANOVA table held in the Spark DataFrame.
spark_results.show()

+----+------------------+------------------+------------------+--------------------+
|  df|            sum_sq|           mean_sq|                 F|              PR(>F)|
+----+------------------+------------------+------------------+--------------------+
| 1.0|2.3243275119617217|2.3243275119617217|18.039093197792972|5.433378373621748E-4|
|17.0| 2.190440909090909|0.1288494652406417|              null|                null|
+----+------------------+------------------+------------------+--------------------+



#### pyspark.df基本操作

In [5]:
import pyspark.pandas as ps

# Reload the sample data and convert it pandas -> pandas-on-Spark -> Spark,
# then print the resulting Spark DataFrame.
df = pd.read_csv("D:/AllData/onewayanova_test.csv")
df_on_spark = ps.from_pandas(df)
df_spark = df_on_spark.to_spark()
df_spark.show()



+-------+-----+
|  blood|value|
+-------+-----+
|patient| 0.84|
|patient| 1.05|
|patient|  1.2|
|patient|  1.2|
|patient| 1.39|
|patient| 1.53|
|patient| 1.67|
|patient|  1.8|
|patient| 1.87|
|patient| 2.07|
|patient| 2.11|
|healthy| 0.54|
|healthy| 0.64|
|healthy| 0.64|
|healthy| 0.75|
|healthy| 0.76|
|healthy| 0.81|
|healthy| 1.16|
|healthy|  1.2|
+-------+-----+



In [6]:
# Project only the `value` column and preview the first three rows.
df_spark.select("value").show(3)

+-----+
|value|
+-----+
| 0.84|
| 1.05|
|  1.2|
+-----+
only showing top 3 rows



In [10]:
from pyspark.sql.functions import col

# Derive value_more = value + 1 as an extra column and preview three rows.
# withColumn returns a new DataFrame; df_spark itself is left unchanged.
value_plus_one = col('value') + 1
df_spark.withColumn('value_more', value_plus_one).show(3)

+-------+-----+------------------+
|  blood|value|        value_more|
+-------+-----+------------------+
|patient| 0.84|1.8399999999999999|
|patient| 1.05|              2.05|
|patient|  1.2|               2.2|
+-------+-----+------------------+
only showing top 3 rows



In [18]:
# Keep only the rows whose blood column equals 'healthy', using a SQL expression string.
df_spark.filter("blood == 'healthy'").show()

+-------+-----+
|  blood|value|
+-------+-----+
|healthy| 0.54|
|healthy| 0.64|
|healthy| 0.64|
|healthy| 0.75|
|healthy| 0.76|
|healthy| 0.81|
|healthy| 1.16|
|healthy|  1.2|
+-------+-----+



In [20]:
# Same row selection expressed with where(); the result is kept as
# df_spark_healthy for the UDF examples that follow.
df_spark_healthy = df_spark.where("blood == 'healthy'")
df_spark_healthy.show()

+-------+-----+
|  blood|value|
+-------+-----+
|healthy| 0.54|
|healthy| 0.64|
|healthy| 0.64|
|healthy| 0.75|
|healthy| 0.76|
|healthy| 0.81|
|healthy| 1.16|
|healthy|  1.2|
+-------+-----+



Scalar Pandas UDF用于向量化标量操作。常常与select和withColumn等函数一起使用。

其中调用的Python函数需要使用pandas.Series作为输入并返回一个具有相同长度的pandas.Series。

Scalar UDF定义了一个转换，函数输入一个或多个pd.Series,输出一个pd.Series，函数的输出和输入有相同的长度

In [70]:
from pyspark.sql.functions import pandas_udf


# The UDF kind (Series -> Series) is inferred from the type hints; the explicit
# PandasUDFType.SCALAR argument is deprecated since Spark 3.0.
# Return type: 'double' is a floating-point column, 'long' would be a long integer column.
@pandas_udf('double')
def pandas_plus_value(series: pd.Series) -> pd.Series:
    """Add 1 to every element of the input column.

    Args:
        series: One batch of column values as a pandas Series.

    Returns:
        A Series of the same length with 1 added to each value.
    """
    return series + 1


df_spark_healthy.withColumn("value+1", pandas_plus_value(df_spark_healthy.value)).show()

+-------+-----+------------------+
|  blood|value|           value+1|
+-------+-----+------------------+
|healthy| 0.54|              1.54|
|healthy| 0.64|1.6400000000000001|
|healthy| 0.64|1.6400000000000001|
|healthy| 0.75|              1.75|
|healthy| 0.76|              1.76|
|healthy| 0.81|              1.81|
|healthy| 1.16|              2.16|
|healthy|  1.2|               2.2|
+-------+-----+------------------+



In [71]:
from pyspark.sql.functions import pandas_udf


# Series, Series -> Series is inferred from the type hints; the explicit
# PandasUDFType.SCALAR argument is deprecated since Spark 3.0.
@pandas_udf('double')
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    """Multiply two columns element-wise.

    Args:
        a: One batch of the first column as a pandas Series.
        b: The matching batch of the second column as a pandas Series.

    Returns:
        A Series of the same length holding a * b.
    """
    return a * b


# Passing `value` for both arguments yields the square of each value.
df_spark_healthy.withColumn(
    "value_multiply",
    multiply_func(df_spark_healthy.value, df_spark_healthy.value),
).show()

+-------+-----+------------------+
|  blood|value|    value_multiply|
+-------+-----+------------------+
|healthy| 0.54|            0.2916|
|healthy| 0.64|            0.4096|
|healthy| 0.64|            0.4096|
|healthy| 0.75|            0.5625|
|healthy| 0.76|            0.5776|
|healthy| 0.81|0.6561000000000001|
|healthy| 1.16|            1.3456|
|healthy|  1.2|              1.44|
+-------+-----+------------------+



In [72]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType


# Same UDF as before, but the return type is given as a DataType object;
# DoubleType() is equivalent to the string 'double'. The UDF kind is inferred
# from the type hints (PandasUDFType.SCALAR is deprecated since Spark 3.0).
@pandas_udf(DoubleType())
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    """Multiply two columns element-wise.

    Args:
        a: One batch of the first column as a pandas Series.
        b: The matching batch of the second column as a pandas Series.

    Returns:
        A Series of the same length holding a * b.
    """
    return a * b


df_spark_healthy.withColumn(
    "value_multiply",
    multiply_func(df_spark_healthy.value, df_spark_healthy.value),
).show()

+-------+-----+------------------+
|  blood|value|    value_multiply|
+-------+-----+------------------+
|healthy| 0.54|            0.2916|
|healthy| 0.64|            0.4096|
|healthy| 0.64|            0.4096|
|healthy| 0.75|            0.5625|
|healthy| 0.76|            0.5776|
|healthy| 0.81|0.6561000000000001|
|healthy| 1.16|            1.3456|
|healthy|  1.2|              1.44|
+-------+-----+------------------+



In [45]:
def pandas_filter_func(iterator):
    """Yield, for each incoming pandas batch, only the rows whose value is 0.54.

    Args:
        iterator: Iterator of pandas DataFrames handed over by mapInPandas.

    Yields:
        The filtered pandas DataFrame for each batch.
    """
    for batch in iterator:
        is_match = batch["value"] == 0.54
        yield batch.loc[is_match]


# The filter does not change the columns, so the input schema is reused for the output.
df_spark_healthy.mapInPandas(pandas_filter_func, schema=df_spark_healthy.schema).show()

+-------+-----+
|  blood|value|
+-------+-----+
|healthy| 0.54|
+-------+-----+



Grouped map（分组映射）pandas_udf与groupBy().apply()一起使用，后者实现了“split-apply-combine”模式（Spark 3.0起官方推荐改用groupBy().applyInPandas()）。

使用时需要提供两部分内容：第一，定义每个分组的Python计算函数，这里可以使用pandas包或者Python自带方法。

第二，提供一个StructType对象或字符串，它定义输出DataFrame的格式，包括输出特征以及特征类型。

需要注意的是，StructType对象中的Dataframe特征顺序需要与分组中的Python计算函数返回特征顺序保持一致。

GROUPED_MAP UDF定义了转换：pd.DataFrame -> pd.DataFrame

In [67]:
# Not used by this cell any more; kept so that any later cell relying on these
# names having been imported here keeps working.
from pyspark.sql.functions import pandas_udf, PandasUDFType


def subtract_mean(df: pd.DataFrame) -> pd.DataFrame:
    """Center `value` within one group by subtracting the group's mean.

    Args:
        df: All rows of a single `blood` group as a pandas DataFrame.

    Returns:
        The same rows with `value` replaced by value - mean(value).
    """
    value = df.value
    return df.assign(value=value - value.mean())


# applyInPandas is the recommended replacement for the deprecated
# pandas_udf(..., PandasUDFType.GROUPED_MAP) + groupBy().apply() combination.
# The output columns match the input, so df_spark.schema
# ('blood string, value double') is reused as the output schema.
df_spark.groupBy('blood').applyInPandas(subtract_mean, schema=df_spark.schema).show()

+-------+--------------------+
|  blood|               value|
+-------+--------------------+
|healthy|-0.27249999999999996|
|healthy|             -0.1725|
|healthy|             -0.1725|
|healthy|             -0.0625|
|healthy|-0.05249999999999999|
|healthy|-0.00249999999999...|
|healthy|  0.3474999999999999|
|healthy| 0.38749999999999996|
|patient|  -0.680909090909091|
|patient|-0.47090909090909094|
|patient|-0.32090909090909103|
|patient|-0.32090909090909103|
|patient|-0.13090909090909109|
|patient|0.009090909090909038|
|patient| 0.14909090909090894|
|patient| 0.27909090909090906|
|patient|  0.3490909090909091|
|patient|  0.5490909090909089|
|patient|  0.5890909090909089|
+-------+--------------------+



In [69]:
def subtract_mean(df: pd.DataFrame) -> pd.DataFrame:
    """Add `col1`, the group-centered value, to one group's rows.

    Args:
        df: All rows of a single `blood` group as a pandas DataFrame.

    Returns:
        The same rows with an extra column col1 = value - mean(value).
    """
    value = df.value
    return df.assign(col1=value - value.mean())


# applyInPandas is the recommended replacement for the deprecated
# pandas_udf(..., PandasUDFType.GROUPED_MAP) + groupBy().apply() combination.
# df_spark.schema cannot be reused here because the result gains the new column col1.
df_spark.groupBy('blood').applyInPandas(
    subtract_mean,
    schema='blood string, value double, col1 double',
).show()

+-------+-----+--------------------+
|  blood|value|                col1|
+-------+-----+--------------------+
|healthy| 0.54|-0.27249999999999996|
|healthy| 0.64|             -0.1725|
|healthy| 0.64|             -0.1725|
|healthy| 0.75|             -0.0625|
|healthy| 0.76|-0.05249999999999999|
|healthy| 0.81|-0.00249999999999...|
|healthy| 1.16|  0.3474999999999999|
|healthy|  1.2| 0.38749999999999996|
|patient| 0.84|  -0.680909090909091|
|patient| 1.05|-0.47090909090909094|
|patient|  1.2|-0.32090909090909103|
|patient|  1.2|-0.32090909090909103|
|patient| 1.39|-0.13090909090909109|
|patient| 1.53|0.009090909090909038|
|patient| 1.67| 0.14909090909090894|
|patient|  1.8| 0.27909090909090906|
|patient| 1.87|  0.3490909090909091|
|patient| 2.07|  0.5490909090909089|
|patient| 2.11|  0.5890909090909089|
+-------+-----+--------------------+



GROUPED_AGG定义了一个或多个pandas.Series -> 一个scalar，scalar的返回值类型（returnType）应该是原始数据类型

Grouped aggregate Pandas UDF常常与groupBy().agg()和pyspark.sql.Window一起使用。

需要注意的是，这种类型的UDF不支持部分聚合，组或窗口的所有数据都将加载到内存中。

In [79]:
from pyspark.sql.functions import pandas_udf


# Series -> scalar (grouped aggregate) is inferred from the type hints; the
# explicit PandasUDFType.GROUPED_AGG argument is deprecated since Spark 3.0.
@pandas_udf('double')
def count_num(v: pd.Series) -> float:
    """Return the mean of one group's values (despite the name, not a count).

    Args:
        v: All values of the aggregated column for one group.

    Returns:
        The arithmetic mean of v.
    """
    return v.mean()


@pandas_udf('double')
def count_std(v: pd.Series) -> float:
    """Return the standard deviation of one group's values.

    Args:
        v: All values of the aggregated column for one group.

    Returns:
        v.std(), i.e. pandas' default (sample, ddof=1) standard deviation.
    """
    return v.std()


# NOTE(review): the alias 'avg_std' holds the per-group standard deviation,
# not an average of standard deviations.
df_spark.groupBy("blood").agg(
    count_num(df_spark['value']).alias('avg_value'),
    count_std(df_spark['value']).alias('avg_std'),
).show()

+-------+-----------------+-------------------+
|  blood|        avg_value|            avg_std|
+-------+-----------------+-------------------+
|healthy|           0.8125|0.24241346025805932|
|patient|1.520909090909091| 0.4217927108297284|
+-------+-----------------+-------------------+

