In [1]:
import pyspark
# Import optimus
#import optimus as op
import pandas as pd

In [2]:
from pyspark.sql.session import SparkSession
# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [10]:

import pandas as pd
from pyspark.sql.functions import udf

## Examples adapted from https://github.com/changhsinlee/changhsinlee.github.io/blob/master/notebook/20180129-python-function-pyspark-udf/example-notebook.ipynb

In [6]:
# Small pandas frame with an integer column, a float column and a column of
# integer lists, converted to a Spark DataFrame whose inferred schema is printed.
df_pd = pd.DataFrame({
    'integers': [1, 2, 3],
    'floats': [-1.0, 0.5, 2.7],
    'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]],
})
df = spark.createDataFrame(df_pd)
df.printSchema()

root
 |-- floats: double (nullable = true)
 |-- integer_arrays: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- integers: long (nullable = true)



In [7]:
# Print the rows as an ASCII table (n=20 and truncate=True are show()'s defaults).
df.show(n=20, truncate=True)

+------+--------------+--------+
|floats|integer_arrays|integers|
+------+--------------+--------+
|  -1.0|        [1, 2]|       1|
|   0.5|     [3, 4, 5]|       2|
|   2.7|  [6, 7, 8, 9]|       3|
+------+--------------+--------+



In [8]:
def square(x):
    """Return ``x`` squared, keeping the numeric type of the input.

    An ``int`` argument yields an ``int`` and a ``float`` argument yields a
    ``float``. This matters when the function is wrapped in a Spark UDF with a
    fixed return type: values of the "wrong" Python type come back as null.

    ``None`` (a SQL null reaching a Python UDF) is returned unchanged instead
    of raising ``TypeError``. Without this, a single null would fail the task.
    """
    if x is None:
        return None
    return x ** 2

In [11]:
from pyspark.sql.types import IntegerType
# UDF declared as returning IntegerType(): results that are not Python ints
# (the squares of the float column) come back as null in the output below.
square_udf_int = udf(f=lambda value: square(value), returnType=IntegerType())

In [13]:
# Apply the IntegerType UDF to both numeric columns.
(
    df
    .select(
        df.integers,
        df.floats,
        square_udf_int(df.integers).alias('int_squared'),
        square_udf_int(df.floats).alias('float_squared'),
    )
    .show()
)

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|          1|         null|
|       2|   0.5|          4|         null|
|       3|   2.7|          9|         null|
+--------+------+-----------+-------------+



In [14]:
from pyspark.sql.types import FloatType
# Same square function, now declared as returning FloatType(): the int squares
# of the integer column are not Python floats, so they come back as null.
square_udf_float = udf(f=lambda value: square(value), returnType=FloatType())

In [15]:
# Apply the FloatType UDF to both numeric columns.
(
    df
    .select(
        df.integers,
        df.floats,
        square_udf_float(df.integers).alias('int_squared'),
        square_udf_float(df.floats).alias('float_squared'),
    )
    .show()
)

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|       null|          1.0|
|       2|   0.5|       null|         0.25|
|       3|   2.7|       null|         7.29|
+--------+------+-----------+-------------+



In [16]:
## Force the output to be a Python float so it matches a FloatType() UDF.
def square_float(x):
    """Return ``x`` squared, always converted to a Python ``float``.

    The explicit ``float(...)`` cast makes integer inputs (e.g. ``3`` -> ``9.0``)
    match a ``FloatType()`` return type, so they are not turned into nulls.

    ``None`` (a SQL null reaching a Python UDF) is returned unchanged instead
    of raising ``TypeError`` from ``None ** 2``.
    """
    if x is None:
        return None
    return float(x ** 2)
# FloatType() UDF built on square_float, which always returns a Python float.
square_udf_float2 = udf(f=lambda value: square_float(value), returnType=FloatType())

In [17]:
# Apply the float-casting UDF to both numeric columns; both columns are now filled.
(
    df
    .select(
        df.integers,
        df.floats,
        square_udf_float2(df.integers).alias('int_squared'),
        square_udf_float2(df.floats).alias('float_squared'),
    )
    .show()
)

+--------+------+-----------+-------------+
|integers|floats|int_squared|float_squared|
+--------+------+-----------+-------------+
|       1|  -1.0|        1.0|          1.0|
|       2|   0.5|        4.0|         0.25|
|       3|   2.7|        9.0|         7.29|
+--------+------+-----------+-------------+



## Creating our first own functions

In [19]:
from pyspark.sql.functions import max as max_
from pyspark.sql.window import Window


In [24]:
from pyspark.sql import functions as F
# Column maximum through the DataFrame API; collect() returns a list of Rows.
# (The pasted ">>> " REPL prompt was removed: it is only tolerated by IPython,
# and it is a SyntaxError once the notebook is exported or run as a script.)
df.agg(F.max(df.integers)).collect()

[Row(max(integers)=3)]

In [25]:
# Same maximum via the RDD API: rdd.max() returns the largest single-field Row,
# and its `integers` field is the value itself.
df.select(df.integers).rdd.max().integers

3