# Using user defined functions in Spark

- You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data in a without creating a rat's nest of function calls. Here's one place where you can use User Defined Functions to manipulate our DataFrames.

- For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

- The `pyspark.sql.functions` library is available under the alias `F`. The classes from `pyspark.sql.types` are already imported.

## Instructions

- Edit the `getFirstAndMiddle()` function to return a space separated string of names, except the last entry in the names list.
- Define the function as a user-defined function. It should return a string type.
- Create a new column on `voter_df` called `first_and_middle_name` using your UDF.
- Show the Data Frame.

In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Load the json file
data_df = spark.read.json('file:///home/talentum/test-jupyter/Daily/Abohar_30.1445_74.1955_20040101_20081231.json', multiLine=True)

In [4]:
print(data_df.printSchema())

root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- api: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- fill_value: double (nullable = true)
 |    |-- sources: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- start: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- messages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- parameters: struct (nullable = true)
 |    |-- PS: struct (nullable = true)
 |    |    |-- longname: string (nullable = true)
 |    |    |-- units: string (nullable = true)
 |    |-- PSC: struct (nullable = true)
 |    |    |-- longname: string (nullable = true)
 |    |    |-- unit

In [6]:
from pyspark.sql.functions import col

data_1_df = data_df.withColumn("Parameter", data_df.properties.parameter) \
.drop('properties','parameters','times','type','geometry','header','messages') \
.withColumn("PS", col('Parameter')['PS']) \
.withColumn("PSC", col('Parameter')['PSC']) \
.drop("Parameter")

In [7]:
# data_1_df.show()
type(data_1_df)
print(data_1_df.printSchema())


root
 |-- PS: struct (nullable = true)
 |    |-- 2004010105: double (nullable = true)
 |    |-- 2004010106: double (nullable = true)
 |    |-- 2004010107: double (nullable = true)
 |    |-- 2004010108: double (nullable = true)
 |    |-- 2004010109: double (nullable = true)
 |    |-- 2004010110: double (nullable = true)
 |    |-- 2004010111: double (nullable = true)
 |    |-- 2004010112: double (nullable = true)
 |    |-- 2004010113: double (nullable = true)
 |    |-- 2004010114: double (nullable = true)
 |    |-- 2004010115: double (nullable = true)
 |    |-- 2004010116: double (nullable = true)
 |-- PSC: struct (nullable = true)
 |    |-- 2004010105: double (nullable = true)
 |    |-- 2004010106: double (nullable = true)
 |    |-- 2004010107: double (nullable = true)
 |    |-- 2004010108: double (nullable = true)
 |    |-- 2004010109: double (nullable = true)
 |    |-- 2004010110: double (nullable = true)
 |    |-- 2004010111: double (nullable = true)
 |    |-- 2004010112: double (nul

In [9]:
for column in data_1_df.columns:
    
    data_2_df = data_1_df.withColumn("some", data_1_df.column[])

PS
PSC


In [20]:

data_2_df = data_1_df.withColumn("2004010105", data_1_df.PS['2004010105'])

In [21]:
# data_1_df.show()
type(data_2_df)
print(data_2_df.printSchema())

root
 |-- PS: struct (nullable = true)
 |    |-- 2004010105: double (nullable = true)
 |    |-- 2004010106: double (nullable = true)
 |    |-- 2004010107: double (nullable = true)
 |    |-- 2004010108: double (nullable = true)
 |    |-- 2004010109: double (nullable = true)
 |    |-- 2004010110: double (nullable = true)
 |    |-- 2004010111: double (nullable = true)
 |    |-- 2004010112: double (nullable = true)
 |    |-- 2004010113: double (nullable = true)
 |    |-- 2004010114: double (nullable = true)
 |    |-- 2004010115: double (nullable = true)
 |    |-- 2004010116: double (nullable = true)
 |-- PSC: struct (nullable = true)
 |    |-- 2004010105: double (nullable = true)
 |    |-- 2004010106: double (nullable = true)
 |    |-- 2004010107: double (nullable = true)
 |    |-- 2004010108: double (nullable = true)
 |    |-- 2004010109: double (nullable = true)
 |    |-- 2004010110: double (nullable = true)
 |    |-- 2004010111: double (nullable = true)
 |    |-- 2004010112: double (nul

In [22]:
data_2_df.show()

+--------------------+--------------------+----------+
|                  PS|                 PSC|2004010105|
+--------------------+--------------------+----------+
|[99.45, 99.49, 99...|[101.64, 101.68, ...|     99.45|
+--------------------+--------------------+----------+



In [None]:
header_col = 'Cal'
cols_minus_header = df.columns
cols_minus_header.remove(header_col)

df1 = (df
       .groupBy()
       .pivot('Cal')
       .agg(F.first(F.array(cols_minus_header)))
       .withColumn(header_col, F.array(*map(F.lit, cols_minus_header)))
      )

In [44]:
from pyspark.sql.functions import col

start_date_str = data_df.withColumn("start", col('header.start'))
end_date_str = "2000123123"

type(start_date_str)

start_date_str.show()

+--------------------+--------------------+--------+--------------------+--------------------+-------------+-------+----------+
|            geometry|              header|messages|          parameters|          properties|        times|   type|     start|
+--------------------+--------------------+--------+--------------------+--------------------+-------------+-------+----------+
|[[74.1955, 30.144...|[[POWER Hourly AP...|      []|[[Surface Pressur...|[[[99.45, 99.49, ...|[2.035, 2.25]|Feature|2004010105|
+--------------------+--------------------+--------+--------------------+--------------------+-------------+-------+----------+



In [28]:
import datetime

# Function to convert string in format "YYYYMMDDHH" to datetime object
def parse_date(input_date):
    return datetime.datetime.strptime(input_date, "%Y%m%d%H")

In [31]:
start_date = parse_date(start_date_str)
end_date = parse_date(end_date_str)

print(start_date)

TypeError: strptime() argument 1 must be str, not Column

In [None]:
# Loop through dates and print in the desired format
current_date = start_date
while current_date <= end_date:
    print(current_date.strftime("%Y-%m-%d %H:%M"))
    current_date += datetime.timedelta(hours=1)

In [52]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, DateType

schema = StructType([
    StructField("properties", StructType([
        StructField("parameter", StructType([
            StructField("PS", MapType(StringType(),DoubleType()),True),
            StructField("PSC", StringType(),True),
            StructField("T2M_MAX", StringType(),True),
            StructField("T2M_MIN", StringType(),True),
            StructField("T2MWET", StringType(),True),
            StructField("T2MDEW", StringType(),True),
            StructField("T2M", StringType(),True),
            StructField("T2M_RANGE", StringType(),True),
            StructField("TS", MapType(StringType(),DoubleType()),True),
            StructField("WS10M", StringType(),True),
        ]), True)
    ]), True)
])



# Load the json file
data_df = spark.read.json('file:///home/talentum/test-jupyter/Daily/Abohar_30.1445_74.1955_20040101_20081231.json', multiLine=True, schema=schema)

print(data_df.printSchema())
print(data_df.show())

root
 |-- properties: struct (nullable = true)
 |    |-- parameter: struct (nullable = true)
 |    |    |-- PS: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: double (valueContainsNull = true)
 |    |    |-- PSC: string (nullable = true)
 |    |    |-- T2M_MAX: string (nullable = true)
 |    |    |-- T2M_MIN: string (nullable = true)
 |    |    |-- T2MWET: string (nullable = true)
 |    |    |-- T2MDEW: string (nullable = true)
 |    |    |-- T2M: string (nullable = true)
 |    |    |-- T2M_RANGE: string (nullable = true)
 |    |    |-- TS: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: double (valueContainsNull = true)
 |    |    |-- WS10M: string (nullable = true)

None
+--------------------+
|          properties|
+--------------------+
|[[[2004010105 -> ...|
+--------------------+

None


In [None]:
#data_df.withColumn
final_df = data_df.withColumn("PS", data_df.properties.parameter.PS) \
.withColumn("PSC", data_df.properties.parameter.PSC) \
.withColumn("TS", data_df.properties.parameter.TS) \
.withColumn("T2M", data_df.properties.parameter.T2M) \
.withColumn("T2M_MIN", data_df.properties.parameter.T2M_MIN) \
.withColumn("T2M_MAX", data_df.properties.parameter.T2M_MAX).show()

#data_df['pr']

In [53]:
df1 = data_df.select("properties.parameter.TS")
df2 =  data_df.select(col("properties.parameter.*"))

In [56]:
#df3 = df2.explode()
from pyspark.sql.functions import explode
df2.select(explode(df2.PS).alias("key", "value")).show()

+----------+-----+
|       key|value|
+----------+-----+
|2004010105|99.45|
|2004010106|99.49|
|2004010107|99.53|
|2004010108| 99.6|
|2004010109|99.65|
|2004010110|99.62|
|2004010111|99.52|
|2004010112|99.44|
|2004010113|99.37|
|2004010114|99.31|
|2004010115|99.28|
|2004010116|99.28|
+----------+-----+



In [None]:

# data_df = voter_df.filter(voter_df.VOTER_NAME.isNotNull())
# voter_df = voter_df.fillna("abc")   if null rows are not allowed to delete than fill it with something
# voter_df = voter_df.dropna()       to delete null values row

In [None]:
voter_df.write