In [4]:
import findspark

# findspark.init() locates the Spark installation and adds its Python libraries
# to sys.path. It only affects imports that run AFTER it, so it must be called
# before the first `pyspark` import for that import to succeed on a fresh kernel.
findspark.init()

# Now import PySpark and use it
from pyspark.sql import SparkSession

In [5]:
# findspark.init() locates the Spark installation and adds its Python libraries
# to sys.path. It only helps `pyspark` imports that execute after it, so in a
# fresh kernel it has to run before the first `from pyspark... import`.
findspark.init()

In [6]:
# Reuse the active SparkSession if one exists, otherwise start a new one named
# "HighOrderFunctionSession"; every SQL query in this notebook goes through `spark`.
spark = (
    SparkSession.builder
    .appName("HighOrderFunctionSession")
    .getOrCreate()
)

In [1]:
# In Python
from pyspark.sql.types import *

In [2]:
# One column, "celsius", holding an array of integers. StructField and
# ArrayType are left at their nullable defaults.
schema = StructType([
    StructField(name="celsius", dataType=ArrayType(elementType=IntegerType())),
])

In [3]:
# Echo the schema. The repr shows Spark's defaults: the array elements
# (containsNull) and the column itself (nullable) are both nullable (True).
schema

StructType([StructField('celsius', ArrayType(IntegerType(), True), True)])

In [7]:
# Two rows for the single-column schema: each row is a one-element list whose
# only value is that row's array of Celsius readings. The outer container is
# a tuple, written explicitly here instead of via a bare top-level comma.
t_list = (
    [[35, 36, 32, 30, 40, 42, 38]],
    [[31, 32, 34, 55, 56]],
)
t_c = spark.createDataFrame(t_list, schema)

# Expose the DataFrame to SQL under the name "tC" for the queries below.
t_c.createOrReplaceTempView("tC")

In [8]:
# Display the DataFrame. show() truncates long cell values by default, so the
# first row's array is cut off (its 40, 42 and 38 are hidden, including the 38
# the Exists query looks for); use t_c.show(truncate=False) to see full arrays.
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



## Transform

In [9]:
# transform(array, lambda) applies the lambda to every element and returns a
# new array of the same length: here, Celsius -> Fahrenheit per reading.
# `div` is integer division, so fractional degrees are truncated
# (e.g. 36C -> 96F rather than 96.8F).
(
    spark.sql("""
SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
  FROM tC
 """)
    .show()
)

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



## Filter

In [11]:
# filter(array, predicate) returns a new array containing only the elements
# for which the predicate holds: here, readings strictly above 38C.
(
    spark.sql("""
 SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
  FROM tC
 """)
    .show()
)

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



## Exists

In [12]:
# exists(array, predicate) is true when at least one element satisfies the
# predicate: here, whether the row contains a reading of exactly 38C.
(
    spark.sql("""
 SELECT celsius, 
       exists(celsius, t -> t = 38) as threshold
  FROM tC
 """)
    .show()
)

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



## Reduce

In [13]:
# reduce(array, start, merge, finish): `merge` is called as
# (accumulator, element) and folds each reading into the running sum;
# `finish` turns the final sum into the result (average, then Fahrenheit).
# `div` is integer division, so the Celsius average is truncated before the
# conversion (row 1: 253 div 7 = 36 -> 96F).
# nullif(size(celsius), 0) makes an empty array produce NULL instead of
# dividing by zero, which raises an error when ANSI SQL mode is enabled.
spark.sql("""
 SELECT celsius, 
       reduce(
          celsius, 
          0, 
          (acc, t) -> acc + t, 
          acc -> (acc div nullif(size(celsius), 0) * 9 div 5) + 32
        ) as avgFahrenheit 
  FROM tC
 """).show()

+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+

