In [1]:
from pyspark.sql import SparkSession

# Create the demo's SparkSession, or return the already-running one
# (getOrCreate).
# The Arrow flag switches on columnar transfer for toPandas() and
# createDataFrame(pandas_df). Pandas UDFs use Arrow regardless of this flag.
# NOTE(review): Spark 3.x renamed this key to
# "spark.sql.execution.arrow.pyspark.enabled" (the old key is deprecated).
spark = (
    SparkSession.builder
    .appName("Python Spark Demo")
    .config("spark.sql.execution.arrow.enabled", "true")
    .getOrCreate()
)

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, count, lit
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

In [37]:
# Benchmark input frame. It has NUM_ROWS rows and two columns:
#   - integer `id`: ROWS_PER_ID consecutive rows share each id;
#   - uniform random double `v` in [0, 1).
# The frame is not cached, so every action timed below recomputes it from
# this lineage (data generation is part of each measurement).
NUM_ROWS = 1000 * 1000
ROWS_PER_ID = 10000
SEED = 42  # fixed so reruns (same partitioning) produce the same `v` values

df = (
    spark.range(0, NUM_ROWS)
    .withColumn('id', (col('id') / ROWS_PER_ID).cast('integer'))
    .withColumn('v', rand(seed=SEED))
)
df.count()

1000000

In [None]:
@udf('double')
def plus_one(v):
    return v + 1

%timeit df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

In [None]:
@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

%timeit df.withColumn('v', pandas_plus_one(df.v)).agg(count(col('v'))).show()

In [None]:
import pandas as pd
from scipy import stats

@udf('double')
def cdf(v):
    # stats.norm.cdf works on ints, lists, Pandas Series and NumPy arrays
    return float(stats.norm.cdf(v))

%timeit df.withColumn('cumulative_probability', cdf(df.v)).agg(count(col('cumulative_probability'))).show()

In [None]:
import pandas as pd
from scipy import stats

# returnType=PandasUDFType.SCALAR is implied
@pandas_udf('double')
def pandas_cdf(v):
    return pd.Series(stats.norm.cdf(v))

%timeit df.withColumn('cumulative_probability', pandas_cdf(df.v)).agg(count(col('cumulative_probability'))).show()

In [27]:
# Grouped Map
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Give this small example frame its own name. Rebinding `df` here would
# silently replace the 1,000,000-row benchmark frame used by the other cells,
# so results would depend on the order in which cells were executed.
fruit_df = spark.createDataFrame(
    [(1, 1.0, "apple"),
     (1, 2.0, "pear"),
     (2, 3.0, "orange"),
     (2, 5.0, "grape"),
     (2, 10.0, "peach")],
    ("id", "v", "fruit"))

@pandas_udf("id long, v double, norm double, fruit string", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    """Grouped-map UDF: z-score `v` within one `id` group.

    Args:
        pdf: pandas DataFrame holding every row of a single group.

    Returns:
        The group's rows with an added `norm` column, equal to
        (v - mean) / std. pandas' ``std`` is the sample standard deviation
        (ddof=1), so a single-row group gets NaN.
    """
    v = pdf.v
    normalized = pdf.assign(norm=(v - v.mean()) / v.std())
    # assign() appends `norm` as the last column. Reorder to match the
    # declared schema so the result does not depend on whether Spark matches
    # returned columns by name or by position.
    return normalized[["id", "v", "norm", "fruit"]]

fruit_df.groupby("id").apply(normalize).show()

+---+----+-------------------+------+
| id|   v|               norm| fruit|
+---+----+-------------------+------+
|  1| 1.0|-0.7071067811865475| apple|
|  1| 2.0| 0.7071067811865475|  pear|
|  2| 3.0|-0.8320502943378437|orange|
|  2| 5.0|-0.2773500981126146| grape|
|  2|10.0| 1.1094003924504583| peach|
+---+----+-------------------+------+



In [68]:
# Importing Koalas also attaches `to_koalas()` to Spark DataFrames, so this
# import is needed even though `ks` is not referenced directly.
import databricks.koalas as ks
import numpy as np

# NOTE(review): `df` is whichever DataFrame was bound most recently. The
# output recorded below (1000 rows shown, all id == 0) came from the
# 1,000,000-row benchmark frame.
kdf = df.to_koalas()

def times_two(value) -> np.int64:
    """Return ``value * 2``.

    The ``np.int64`` return hint lets Koalas declare the result column's type
    instead of inferring it by running the function on a sample.
    """
    return value * 2

kdf['id'].apply(times_two)

0      0
1      0
2      0
3      0
4      0
5      0
6      0
7      0
8      0
9      0
10     0
11     0
12     0
13     0
14     0
15     0
16     0
17     0
18     0
19     0
20     0
21     0
22     0
23     0
24     0
25     0
26     0
27     0
28     0
29     0
      ..
970    0
971    0
972    0
973    0
974    0
975    0
976    0
977    0
978    0
979    0
980    0
981    0
982    0
983    0
984    0
985    0
986    0
987    0
988    0
989    0
990    0
991    0
992    0
993    0
994    0
995    0
996    0
997    0
998    0
999    0
Name: id, Length: 1000, dtype: int64
Showing only the first 1000