In [1]:

import pandas as pd
from pyspark.sql.functions import struct, col
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, DoubleType

from sparkstudy.deploy.demo_sessions import DemoSQLSessionFactory
from sparkstudy.libs.tools import create_random_data
%load_ext autoreload
%autoreload 2
%matplotlib inline

# Column names, in order, applied to the random rows when a Spark DataFrame is built.
COLUMNS = [
    "name",
    "age",
    "salary",
]

比较下开不开启arrow的区别

测试下来，感觉性能提升有点奇怪。有时候会快，有时候会慢。

In [2]:
def test_performance(session_factory:DemoSQLSessionFactory, n:int = 100000):
    """Round-trip ``n`` random rows through Spark and back into pandas.

    Meant to be wrapped in ``%time``: the measured work is generating the data,
    building the session, creating the Spark DataFrame and converting it with
    ``toPandas()``. Nothing is returned; the pandas result is discarded.

    Args:
        session_factory: Factory whose ``build_session()`` provides the Spark
            session (with whatever configuration was added to the factory).
        n: Number of rows requested from ``create_random_data``.
    """
    data = create_random_data(n)
    spark_session = session_factory.build_session()
    df = spark_session.createDataFrame(data, COLUMNS).cache()
    try:
        # head(5) only touches the converted frame; toPandas() is the step being timed.
        df.toPandas().head(5)
    finally:
        # Release the cached data so repeated timing runs do not accumulate
        # cached DataFrames in the long-lived notebook session.
        df.unpersist()

In [3]:
session_factory_arrow = DemoSQLSessionFactory(name="with arraw")
session_factory_arrow.add_config("spark.sql.execution.arrow.pyspark.enabled","true")
%time test_performance(session_factory_arrow)

CPU times: user 2.64 s, sys: 36.7 ms, total: 2.68 s
Wall time: 11.3 s


常规的HelloWorld的example。
页面上面的第一个例子。本质就是生成一个新的dataframe
1. 在 annotation 里列出的是新 dataframe 的列（col）和类型
2. 它会自动地把 pandas 的类型转换成 Spark 的类型
3. 函数应该会分批在各个 node 上执行，然后再汇总，因为我看到 hello world 的函数被执行了好几次

In [4]:
# Cap Arrow batches at 10 records before building the session used by the
# pandas UDF experiments that follow.
MAX_RECORDS_KEY = 'spark.sql.execution.arrow.maxRecordsPerBatch'
session_factory_arrow.add_config(MAX_RECORDS_KEY, 10)
spark = session_factory_arrow.build_session()

# 1000 random rows shared by the examples below.
test_data = create_random_data(row_num=1000)
basic_df = spark.createDataFrame(test_data, COLUMNS)
basic_df.show()

+----+---+--------------------+
|name|age|              salary|
+----+---+--------------------+
|   W|205| 0.04327623602354236|
|   Y|958|  0.9260153652760735|
|   H| 45| 0.06502276504831062|
|   C|988|  0.3057879003138224|
|   Y|530| 0.12418639405654952|
|   H|689|  0.9823893447660684|
|   E|902|  0.9772530265771143|
|   J|659|  0.5908572731268977|
|   W| 36|  0.7185792840437409|
|   S|279|  0.8563242774434922|
|   W|671|   0.827294413531321|
|   V|698| 0.44847719327825175|
|   X|829|  0.9563856623025289|
|   O|422|  0.7659027237465561|
|   R|556| 0.30253081988566743|
|   W|710|0.005506032879962874|
|   D|217| 0.13622538180912724|
|   D|685| 0.32675390373979074|
|   R|107| 0.15811854785577317|
|   B|556|  0.9787700561733383|
+----+---+--------------------+
only showing top 20 rows



In [5]:
@pandas_udf("total double")
def func(s1: pd.Series, s2: pd.Series) -> pd.DataFrame:
    """Add two columns element-wise and return the sums as a one-column frame.

    The single output column is named ``total``, matching the declared schema.
    """
    # Printed on every invocation so the number of calls made by Spark is visible.
    print("execute")
    return pd.DataFrame({"total": s1 + s2})


basic_df.select(
    func("age", "salary").alias("result")
).show()

+--------------------+
|              result|
+--------------------+
|[205.04327623602353]|
|  [958.926015365276]|
| [45.06502276504831]|
| [988.3057879003138]|
| [530.1241863940566]|
|  [689.982389344766]|
| [902.9772530265772]|
| [659.5908572731269]|
| [36.71857928404374]|
| [279.8563242774435]|
| [671.8272944135314]|
| [698.4484771932782]|
| [829.9563856623025]|
|[422.76590272374654]|
| [556.3025308198856]|
| [710.0055060328799]|
|[217.13622538180914]|
| [685.3267539037398]|
|[107.15811854785578]|
| [556.9787700561733]|
+--------------------+
only showing top 20 rows



主要是想看看 select 方法能不能接受一个 List。

In [6]:
def to_str_func(s1: pd.Series) -> pd.Series:
    """Return a copy of ``s1`` with every element converted to ``str``."""
    as_text = s1.astype(str)
    return as_text
# Wrap the plain function as a pandas UDF whose result type is string.
to_str = pandas_udf(to_str_func, returnType=StringType())

# Build one aliased column per input column, then hand select() a single
# Python list instead of separate arguments.
age_c, salary_c = (to_str(name).alias(name) for name in ("age", "salary"))
selects = [age_c, salary_c]
basic_df.select(selects).show()

+---+--------------------+
|age|              salary|
+---+--------------------+
|205| 0.04327623602354236|
|958|  0.9260153652760735|
| 45| 0.06502276504831062|
|988|  0.3057879003138224|
|530| 0.12418639405654952|
|689|  0.9823893447660684|
|902|  0.9772530265771143|
|659|  0.5908572731268977|
| 36|  0.7185792840437409|
|279|  0.8563242774434922|
|671|   0.827294413531321|
|698| 0.44847719327825175|
|829|  0.9563856623025289|
|422|  0.7659027237465561|
|556| 0.30253081988566743|
|710|0.005506032879962874|
|217| 0.13622538180912724|
|685| 0.32675390373979074|
|107| 0.15811854785577317|
|556|  0.9787700561733383|
+---+--------------------+
only showing top 20 rows



测试一下：如果参数的个数是不定的，行不行。

简单地来说：
- 确定的column个数，用Series
- 不确定用dataframe
- iterator是类似用流

In [7]:
@pandas_udf("double")
def to_sum_func(data: pd.DataFrame) -> pd.Series:
    """Return ``age * salary`` for every row of ``data``.

    NOTE(review): despite the name, this computes a product, not a sum.
    """
    return data["age"] * data["salary"]


# Pack the input columns into one struct column so the UDF receives them
# together as a single pandas DataFrame argument.
cols = [col("age"), col("salary")]
headers = struct(cols)
basic_df.select(
    to_sum_func(headers).alias("result")
).show()


+------------------+
|            result|
+------------------+
| 8.871628384826183|
| 887.1227199344785|
| 2.926024427173978|
| 302.1184455100565|
| 65.81878884997124|
| 676.8662585438211|
| 881.4822299725571|
| 389.3749429906256|
|25.868854225574673|
|238.91447340673432|
| 555.1145514795164|
| 313.0370809082197|
| 792.8437140487964|
| 323.2109494210467|
|168.20713585643108|
| 3.909283344773641|
|29.560907852580613|
|223.82642406175665|
| 16.91868462056773|
| 544.1961512323761|
+------------------+
only showing top 20 rows



能不能用于SQL

In [8]:
# Expose the DataFrame as a temp view and the pandas UDF as a SQL function,
# then call the function from a SQL query.
basic_df.createOrReplaceTempView("pandas_udf")
spark.udf.register("pandas_to_str", to_str)

sql_result = spark.sql("select pandas_to_str(age) from pandas_udf")
sql_result.show()

+------------------+
|pandas_to_str(age)|
+------------------+
|               205|
|               958|
|                45|
|               988|
|               530|
|               689|
|               902|
|               659|
|                36|
|               279|
|               671|
|               698|
|               829|
|               422|
|               556|
|               710|
|               217|
|               685|
|               107|
|               556|
+------------------+
only showing top 20 rows



```python
basic_df.createOrReplaceTempView("pandas_udf")
spark.udf.register("pandas_to_str", to_str)
spark.sql("select pandas_to_str(age) from pandas_udf").show()
```

`__call__`这个方法能不能这么用

In [9]:
class PandasFunc:
    """Callable object used to check that ``pandas_udf`` accepts an instance with ``__call__``."""

    def __call__(self, data: pd.DataFrame)-> pd.Series:
        """Return the row-wise product of the ``age`` and ``salary`` columns."""
        product = data["age"] * data["salary"]
        return product

# Same struct-of-columns input as before, but the UDF wraps a PandasFunc
# instance rather than a plain function.
cols = [col("age"), col("salary")]
headers = struct(cols)
class_my_sum = pandas_udf(PandasFunc(), returnType=DoubleType())
basic_df.select(
    class_my_sum(headers).alias("result")
).show()

+------------------+
|            result|
+------------------+
| 8.871628384826183|
| 887.1227199344785|
| 2.926024427173978|
| 302.1184455100565|
| 65.81878884997124|
| 676.8662585438211|
| 881.4822299725571|
| 389.3749429906256|
|25.868854225574673|
|238.91447340673432|
| 555.1145514795164|
| 313.0370809082197|
| 792.8437140487964|
| 323.2109494210467|
|168.20713585643108|
| 3.909283344773641|
|29.560907852580613|
|223.82642406175665|
| 16.91868462056773|
| 544.1961512323761|
+------------------+
only showing top 20 rows



返回多列的处理方法。

In [10]:
@pandas_udf("col1 double, col2 double")
def to_multi_return_func(data: pd.DataFrame) -> pd.DataFrame:
    """Return ``age`` and ``salary`` as a two-column frame (``col1``, ``col2``)."""
    # Printed on every invocation so the number of calls made by Spark is visible.
    print("execute")
    return pd.DataFrame({"col1": data["age"], "col2": data["salary"]})


cols = [col("age"), col("salary")]
headers = struct(cols)

# The UDF result lands in a single struct column "abc"; its fields are then
# pulled out with dotted column paths.
multi_return_df = basic_df.withColumn("abc", to_multi_return_func(headers))
multi_return_df.select(
    col("age"),
    col("salary"),
    col("abc.col1"),
    col("abc.col2"),
).show()

+---+--------------------+-----+--------------------+
|age|              salary| col1|                col2|
+---+--------------------+-----+--------------------+
|205| 0.04327623602354236|205.0| 0.04327623602354236|
|958|  0.9260153652760735|958.0|  0.9260153652760735|
| 45| 0.06502276504831062| 45.0| 0.06502276504831062|
|988|  0.3057879003138224|988.0|  0.3057879003138224|
|530| 0.12418639405654952|530.0| 0.12418639405654952|
|689|  0.9823893447660684|689.0|  0.9823893447660684|
|902|  0.9772530265771143|902.0|  0.9772530265771143|
|659|  0.5908572731268977|659.0|  0.5908572731268977|
| 36|  0.7185792840437409| 36.0|  0.7185792840437409|
|279|  0.8563242774434922|279.0|  0.8563242774434922|
|671|   0.827294413531321|671.0|   0.827294413531321|
|698| 0.44847719327825175|698.0| 0.44847719327825175|
|829|  0.9563856623025289|829.0|  0.9563856623025289|
|422|  0.7659027237465561|422.0|  0.7659027237465561|
|556| 0.30253081988566743|556.0| 0.30253081988566743|
|710|0.005506032879962874|71

这里的目的，主要还是为了验证一下partitionBy的用法

[normalize pyspark dataframe by group](https://stackoverflow.com/questions/54112439/normalize-pyspark-data-frame-by-group)

In [11]:
# Values 0-9 belong to group "A", values 10-19 to group "B".
partition_key_data = [("A" if x < 10 else "B", x) for x in range(20)]
partition_df = spark.createDataFrame(partition_key_data, ["name", "value"]).cache()
partition_df.show()

+----+-----+
|name|value|
+----+-----+
|   A|    0|
|   A|    1|
|   A|    2|
|   A|    3|
|   A|    4|
|   A|    5|
|   A|    6|
|   A|    7|
|   A|    8|
|   A|    9|
|   B|   10|
|   B|   11|
|   B|   12|
|   B|   13|
|   B|   14|
|   B|   15|
|   B|   16|
|   B|   17|
|   B|   18|
|   B|   19|
+----+-----+



In [12]:
# NOTE(review): PandasUDFType.GROUPED_MAP with groupby().apply() is the older
# spelling; newer Spark releases recommend groupby().applyInPandas() — confirm
# against the Spark version in use before migrating.
@pandas_udf("new double,name string,value double",functionType=PandasUDFType.GROUPED_MAP)
def group_by_normalize(data) -> pd.DataFrame:
    """Standardise ``value`` within one group and store the result in ``new``.

    ``data`` is the pandas frame for a single group; a ``new`` column holding
    ``(value - mean) / std`` is added to it in place and the same frame is
    returned.
    """
    values = data["value"]
    centered = values - values.mean()
    data["new"] = centered / values.std()
    return data


partition_df.groupby("name").apply(
    group_by_normalize
).show()



+--------------------+----+-----+
|                 new|name|value|
+--------------------+----+-----+
| -1.4863010829205867|   B| 10.0|
| -1.1560119533826787|   B| 11.0|
| -0.8257228238447705|   B| 12.0|
|-0.49543369430686224|   B| 13.0|
| -0.1651445647689541|   B| 14.0|
|  0.1651445647689541|   B| 15.0|
| 0.49543369430686224|   B| 16.0|
|  0.8257228238447705|   B| 17.0|
|  1.1560119533826787|   B| 18.0|
|  1.4863010829205867|   B| 19.0|
| -1.4863010829205867|   A|  0.0|
| -1.1560119533826787|   A|  1.0|
| -0.8257228238447705|   A|  2.0|
|-0.49543369430686224|   A|  3.0|
| -0.1651445647689541|   A|  4.0|
|  0.1651445647689541|   A|  5.0|
| 0.49543369430686224|   A|  6.0|
|  0.8257228238447705|   A|  7.0|
|  1.1560119533826787|   A|  8.0|
|  1.4863010829205867|   A|  9.0|
+--------------------+----+-----+



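The following does not work: a `GROUPED_MAP` pandas UDF can only be applied through `groupby(...).apply(...)`, not used as a window function with `.over(w)`. The snippet also uses `Window` without importing it and assigns `pd.DataFrame` (the class) instead of an instance `pd.DataFrame()`.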
```python
import numpy as np
w = Window.partitionBy('name')
@pandas_udf("new double",functionType=PandasUDFType.GROUPED_MAP)
def group_by_normalize_2(data:pd.Series) -> pd.DataFrame:
    norm = (data - data.mean())/data.std()
    res = pd.DataFrame
    res["new"] = norm
    print(res)
    return res

partition_df.withColumn("new",group_by_normalize_2(col("value")).over(w)).show()
```