In [None]:
from pyspark.context import SparkContext # for RDDs
from pyspark.sql import SparkSession # for DFs

# Build (or reuse) a single SparkSession and take the RDD entry point from it.
# Calling SparkContext(...) directly raises "Cannot run multiple SparkContexts at once"
# when this cell is re-run, or when a context already exists (e.g. on Databricks);
# builder.getOrCreate() returns the existing session/context in those cases instead.
spark = (SparkSession.builder
                    .master('local')
                    .appName('DF_Practice')
                    .getOrCreate()
        ) # for DFs
sc = spark.sparkContext # for RDDs

#### Creating dataframes

Common ways to create a DataFrame:
1. From an RDD
2. From a data source (csv, json, parquet, ..., jdbc)
3. From a table
4. From a SQL statement
5. From a list of Row objects

In [None]:
# From RDD
first_rdd = sc.parallelize([
  (1, "Batman"),
  (2, "Superman"),
  (3, "Spiderman")
])

first_df = spark.createDataFrame(first_rdd)

In [None]:
# let's see how the dataframe and its schema look like


#first_df.show(5) # basic display, ASCII

#display(first_df) # Databricks specific way of showing results

#spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # OK for exploration, not great for performance
#first_df
#display(first_df) # but this looks nicer now

#display(first_df.limit(50).toPandas()) #Not recommended unless dealing with small data, pandas pulls everything into driver node memory

#first_df.schema # schema object
#first_df.printSchema() # prints schema of dataframe


In [None]:
# create dataframe from a data source / file
# "spark.read" 
# use "option" for defining additional parameters

crimes_df = (spark.read
             .option("sep", "\t") # separator
             .option("header", True) # file has header row
             .option("inferSchema", True) # spark tries to infer data types
             .csv("Chicago-Crimes-2018.csv") #path
            )

display(crimes_df)

In [None]:
# spark can also read in nested data (json, parquet)

events_df = (spark.read
             .option("inferSchema", True)
             .json("events-500k.json")
            )

events_df.printSchema()

In [None]:
display(events_df)

In [None]:
# Everything is a Row...

from pyspark.sql import Row

# You can create a template for row objects, or you can directly create a row.

# Directly creating a row:
#my_row = Row(ok=1, arr=["hello", 2], more_data=Row(something=1, others="done"))

# Using a template:
my_data = Row("id", "product", "cost")

first_row = my_data(1, "mac", 1000)
second_row = my_data(2, "windows", 500)
third_row = my_data(3, "linux", 700)

#rdd = sc.parallelize([first_row, second_row, third_row])
df = spark.createDataFrame([first_row, second_row, third_row])

#rdd.take(3)
#df.take(3)
display(df)

#### Writing dataframes

1. Into tables
2. Into files

In [None]:
# Save events_df as a table named "event_table"; mode("overwrite") replaces it if it already exists.
(events_df.write
 .mode("overwrite")
 .saveAsTable("event_table")
)
# this is saved as a managed table in the metastore (a Hive table; in Databricks runtime 7+ --> Delta table by default)
# on Databricks the table is persisted and accessible from other clusters (not in the free Databricks community version)
# NOTE(review): in plain local Spark without Hive support the catalog entry may not outlive the session -- confirm for your setup
# you can use df.createOrReplaceTempView("view_name") to create a temporary, session-scoped sql view instead
# Further info: https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

In [None]:
# read whole table into dataframe
events_table_df = spark.table("event_table")

display(events_table_df)

In [None]:
# read sql statement into dataframe (any query over registered tables/views works)
events_sql_df = spark.sql("SELECT device, event_name, CURRENT_DATE() as cdate FROM event_table LIMIT 50")

display(events_sql_df)

In [None]:
# write first_df as parquet files into the "first.parquet" directory
# snappy is already Spark's default parquet codec; the option is shown to illustrate .option()
(first_df.write
  .option("compression", "snappy")
  .mode("overwrite") # overwrites data if exists. other options "append", "ignore", "error"/"errorifexists" (the default)
  .parquet("first.parquet")
)

# test after running overwrite, append, ignore modes:
#display(spark.read.parquet("first.parquet"))

#### Dataframe methods

##### Column transformations

In [None]:
# creating column objects: the three forms below all refer to the same column

import pyspark.sql.functions as F

#F.col("device")
#events_df.device
#events_df["device"]

In [None]:
# selecting columns

#devices_df = events_df.select("user_id", "device")
#devices_df = events_df.select(F.col("user_id"), F.col("device"))
#devices_df = events_df.select(F.col("*")) # selecting all columns, like SQL's SELECT *
#devices_df = events_df.select(F.col("device"), F.col("event_name"), F.col("geo.*")) # "geo.*" expands the struct's fields into top-level columns

# for array type, we need to explode the fields (every element will get its own row)
# explode drops rows whose array is null or empty; explode_outer keeps them with a null element.
# Alias the exploded column differently from "items": together with col("*") the same name
# would otherwise appear twice and become ambiguous to select later.

#devices_df = events_df.select(F.col("*"), F.explode(F.col("items")).alias("item"))
#devices_df = events_df.select(F.col("*"), F.explode_outer(F.col("items")).alias("item"))


#display(devices_df)

In [None]:
# selecting columns in SQL-like manner, you can use "in", "case", etc statements
# each argument to selectExpr is a SQL expression, optionally with "as <alias>"

devices_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
#devices_df = events_df.selectExpr("user_id", "case when device = 'Windows' then 'Microsoft' else 'Other' end as windows_test")
display(devices_df)

In [None]:
# dropping columns (names that do not exist are silently ignored)

anonymous_df = events_df.drop("user_id", "geo", "device")
display(anonymous_df)

In [None]:
# withColumn can be used for adding new column, or replacing an existing column with same name
# here: a boolean column that is true when device is one of the listed values

mobile_df = events_df.withColumn("mobile", F.col("device").isin("iOS", "Android"))
display(mobile_df)

In [None]:
# renaming column names (a no-op if the old name does not exist)

location_df = events_df.withColumnRenamed("geo", "location")
display(location_df)

In [None]:
# deriving a column from conditions (the CASE WHEN equivalent)
# rows matching no when() get the otherwise() value; without otherwise() they would be null

#from pyspark.sql.functions import when

warranty_df = (events_df.select(
                F.col("*")
                , F.when(F.col("event_name") == "warranty","issue")
                  .when(F.col("event_name") == "cart","sale")
                  .otherwise("other").alias("event_class")
              ))
display(warranty_df)

##### Row transformations

In [None]:
# filtering
# filter() (alias: where()) accepts either a SQL expression string or a Column condition

purchases_df = events_df.filter("ecommerce.total_item_quantity > 0")
display(purchases_df)

# with Column conditions, combine with & / | and wrap each comparison in parentheses
# (Python's & binds tighter than > and ==)
#revenue_df = events_df.filter((F.col("ecommerce.purchase_revenue_in_usd").isNotNull()) & (F.col("ecommerce.total_item_quantity") > 1))
#display(revenue_df)

In [None]:
# distinct values

#distinct_events_df = events_df.distinct() # distinct() compares entire rows
#display(distinct_events_df)

# dropDuplicates can be used for considering only a subset of columns
# which row is kept for each event_name is not guaranteed

distinct_event_names_df = events_df.dropDuplicates(["event_name"])
display(distinct_event_names_df)

In [None]:
# creating a new dataframe with at most n rows
# without a preceding sort, which 25 rows are returned is not guaranteed

limit_df = events_df.limit(25)
display(limit_df)

In [None]:
# sorting rows (ascending by default)

increasing_ts_df = events_df.sort("event_timestamp")
display(increasing_ts_df)

# use .desc() on a column for descending order. Alternatively, import desc from functions and use it on string-definition of column as desc("column_name")

#decreasing_ts_df = events_df.sort(F.col("event_timestamp").desc())
#display(decreasing_ts_df)

# orderBy = sort (orderBy is an alias)

#ordered_df = events_df.orderBy(["device", "event_timestamp"]) #.sort(["device", "event_timestamp"])
#display(ordered_df)

### Aggregations

In [None]:
# creating grouped objects
# groupBy returns a GroupedData object, not a DataFrame -- an aggregation must be applied to get results

events_df.groupBy("event_name")
#events_df.groupBy("geo.state", "geo.city")

In [None]:
# group and count (the result column is named "count")

event_counts_df = events_df.groupBy("event_name").count()
display(event_counts_df)

In [None]:
# average
# shortcut methods like avg()/sum() generate the output column name automatically;
# use agg() with alias() (below) to choose it yourself

avg_purchase_per_state_df = events_df.groupBy("geo.state").avg("ecommerce.purchase_revenue_in_usd")
display(avg_purchase_per_state_df)

In [None]:
# sum

total_purchase_grouped_df = events_df.groupBy("geo.state", "geo.city").sum("ecommerce.total_item_quantity")
display(total_purchase_grouped_df)

In [None]:
# use agg() for applying different types of aggregations
# also allows for other transformations on top of the returned column

# NB: importing sum this way shadows Python's built-in sum -- the F. prefix used below avoids that
#from pyspark.sql.functions import sum, avg, approx_count_distinct

state_purchases_DF = events_df.groupBy("geo.state").agg(F.sum("ecommerce.total_item_quantity").alias("total_purchases"))
display(state_purchases_DF)

# several aggregations in one pass; approx_count_distinct trades exactness for speed
#state_aggregates_df = events_df.groupBy("geo.state").agg(
#  F.avg("ecommerce.total_item_quantity").alias("avg_quantity"),
#  F.approx_count_distinct("user_id").alias("distinct_users"))

#display(state_aggregates_df)

### Datetime functions

In [None]:
# casting to datetime
# casting a number to "timestamp" interprets it as seconds since the Unix epoch;
# event_timestamp presumably holds microseconds, hence / 1e6 -- NOTE(review): confirm the unit

timestamp_df = events_df.withColumn("event_timestamp_formatted", (F.col("event_timestamp") / 1e6).cast("timestamp"))
display(timestamp_df)

# alternative: import the specific data type

#from pyspark.sql.types import TimestampType

#timestamp_df = events_df.withColumn("event_timestamp_formatted", (F.col("event_timestamp") / 1e6).cast(TimestampType()))
#display(timestamp_df)

In [None]:
# formatting dates into strings using Spark datetime patterns
# (MMMM = full month name, dd = day, yyyy = year, SSSSSS = fraction of a second to microseconds)

#from pyspark.sql.functions import date_format

format_df = (timestamp_df.withColumn("date string", F.date_format("event_timestamp_formatted", "MMMM dd, yyyy"))
  .withColumn("time string", F.date_format("event_timestamp_formatted", "HH:mm:ss.SSSSSS"))
) 
display(format_df)

In [None]:
# extracting datepart
# dayofweek numbers days 1 = Sunday ... 7 = Saturday

#from pyspark.sql.functions import year, month, dayofweek, minute, second

datetime_df = (timestamp_df.withColumn("year", F.year(F.col("event_timestamp_formatted")))
  .withColumn("month", F.month(F.col("event_timestamp_formatted")))
  .withColumn("dayofweek", F.dayofweek(F.col("event_timestamp_formatted")))
  .withColumn("minute", F.minute(F.col("event_timestamp_formatted")))
  .withColumn("second", F.second(F.col("event_timestamp_formatted")))              
)

display(datetime_df)

In [None]:
# converting to date (drops the time of day)

#from pyspark.sql.functions import to_date

date_df = timestamp_df.withColumn("date", F.to_date(F.col("event_timestamp_formatted")))
display(date_df)

In [None]:
# manipulating dates
# date_add returns a DATE, so the time of day is lost; the interval version below keeps a timestamp

#from pyspark.sql.functions import date_add

plus_df = timestamp_df.withColumn("plus_two_days", F.date_add(F.col("event_timestamp_formatted"), 2))
display(plus_df)

#plus_df = timestamp_df.selectExpr("*","event_timestamp_formatted + interval 2 days") # spark sql allows for +/- interval type of datetime manipulation
#display(plus_df)

### Complex types


* StructType
    * Structure of dataframe, incl. nested fields
* ArrayType
    * List of elements (same type)
* MapType
    * Key-value pairs

In [None]:
# let's import a new dataset

sales_df = spark.read.parquet("sales.parquet")

In [None]:
# one row per purchased item, keeping the buyer's email and the item name split into words
# (split's second argument is a regular expression; " " matches a single space)
details_df = (sales_df.withColumn("items", F.explode("items")) # explode array to rows
  .select("email", "items.item_name")
  .withColumn("details", F.split(F.col("item_name"), " ")) # split from space: returns an array (list)
)
display(details_df)

In [None]:
# array_contains returns True if array contains the value
# use element_at for getting specific element
# NB - 1-based index (negative indexes count from the end)
# NOTE(review): positions assume mattress names look like "<quality> <size> Mattress" -- confirm against the data


mattress_df = (details_df.filter(F.array_contains(F.col("details"), "Mattress"))
  .withColumn("size", F.element_at(F.col("details"), 2))
  .withColumn("quality", F.element_at(F.col("details"), 1))
)           
display(mattress_df)

In [None]:
# similar dataset for pillows
# note the swapped positions: pillow names presumably put the size first -- confirm against the data

pillow_df = (details_df.filter(F.array_contains(F.col("details"), "Pillow"))
  .withColumn("size", F.element_at(F.col("details"), 1))
  .withColumn("quality", F.element_at(F.col("details"), 2))
)           
display(pillow_df)

In [None]:
# unionByName() resolves columns by name
# union() resolves by index
# neither removes duplicate rows (like SQL UNION ALL); add .distinct() if needed

union_df = (mattress_df.unionByName(pillow_df)
  .drop("details"))
display(union_df)

In [None]:
# collect_set - aggregate function, returns distinct set of items
# (nulls are skipped and the order of elements is not guaranteed)

options_df = (union_df.groupBy("email")
  .agg(F.collect_set("size").alias("size options"),
       F.collect_set("quality").alias("quality options"))
)
display(options_df)

### Joining dataframes

In [None]:
# let's create a new dataframe
# distinct users who made a sale = converted

converted_users_df = (sales_df.select("email")
  .distinct()
  .withColumn("converted", F.lit(True))
)
display(converted_users_df)

In [None]:
# let's import whole users dataset for joining

users_df = spark.read.parquet("users.parquet")

# join all users with converted users

conversions_df = (users_df.join(converted_users_df, "email", "outer") # outer join
  .filter(F.col("email").isNotNull()) # only keep users who have e-mail address
  .na.fill(False) # fill all null values with False (only looks for matching data type columns, eg ignores integer type columns)
)
display(conversions_df)

In [None]:
# let's get the cart history for each user

carts_df = (events_df.withColumn("items", F.explode("items"))
  .groupBy("user_id").agg(F.collect_set("items.item_id").alias("cart"))
)
display(carts_df)

In [None]:
# left join for cart history with email address

email_carts_df = conversions_df.join(carts_df, "user_id", "left")
display(email_carts_df)

In [None]:
# using join hints and especially broadcast join can make joins a lot faster if one dataframe is small

# let's create a small dataframe
event_type_df = (events_df.select("event_name")
        .distinct()
        .withColumn("event_type"
                    , F.when((F.col("event_name") == "register") | (F.col("event_name") == "login"), "initial")
                    .when((F.col("event_name") == "checkout") | (F.col("event_name") == "cart") | (F.col("event_name") == "finalize"), "purchase")
                    .otherwise("other")
                   ))

# let's create a large dataframe
events_large_df = events_df.join(events_df.limit(1000), "device")

In [None]:
# view and set the auto broadcast join threshold

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) # default setting is to try broadcasting if one dataframe is smaller than 10MB
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") # disable automatic broadcast
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10*1024*1024) # restore default setting

In [None]:
# try to join the large and small dataframes

spark.catalog.clearCache() # clear cache to ensure fair comparison
events_large_df.join(event_type_df, "event_name").count()


In [None]:
# join the dataframes using broadcast hint

spark.catalog.clearCache() # clear cache to ensure fair comparison
events_large_df.join(event_type_df.hint("broadcast"), "event_name").count()

### UDFs

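User-Defined Functions (UDFs). A Python UDF is called once per row in a separate Python worker process, with data serialized back and forth, so prefer a built-in function from `pyspark.sql.functions` whenever one exists.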

In [None]:
# create a simple Python function

def firstLetterFunction(email):
  """Return the first character of ``email``, or None when it is None or empty.

  This function is wrapped as a Spark UDF below. Spark hands SQL NULLs to a
  Python UDF as None, and indexing None (or "") would raise inside the Python
  worker and fail the whole job; returning None makes the UDF yield NULL instead.
  """
  if not email:
    return None
  return email[0]

firstLetterFunction("annagray@kaufman.com")

In [None]:
# register the function as a Spark UDF
# F.udf without a return type declares StringType. Each row is shipped to a Python worker,
# so a built-in (e.g. F.substring) is usually much faster when one fits the job.

firstLetterUDF = F.udf(firstLetterFunction)

# the output column gets an auto-generated name; add .alias("...") for a readable one
display(sales_df.select(firstLetterUDF(F.col("email"))))

In [None]:
# Geo distance function
# https://gist.github.com/rochacbruno/2883505

from pyspark.sql.types import DoubleType

def distance(startLat, startLon, endLat, endLon):
    """Great-circle (haversine) distance in kilometres between two points.

    Adapted from https://gist.github.com/rochacbruno/2883505 and wrapped below
    as a Spark UDF returning DoubleType.

    Args:
        startLat, startLon: latitude / longitude of the first point, in degrees.
        endLat, endLon: latitude / longitude of the second point, in degrees.

    Returns:
        The distance in kilometres as a float, or None if any coordinate is None.
        Spark passes SQL NULLs to a UDF as None; returning None yields NULL for
        that row instead of a TypeError that would fail the whole job.
    """
    # local import keeps the function self-contained when shipped to Spark workers
    import math

    if any(v is None for v in (startLat, startLon, endLat, endLon)):
        return None

    radius_km = 6371  # mean Earth radius in km

    dlat = math.radians(endLat - startLat)
    dlon = math.radians(endLon - startLon)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(startLat)) * math.cos(math.radians(endLat))
         * math.sin(dlon / 2) ** 2)
    # Floating-point rounding can push `a` a hair outside [0, 1] (e.g. for
    # near-antipodal points), which would make math.sqrt(1 - a) raise
    # "math domain error"; clamp it to the mathematically valid range.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return radius_km * c

# Wrap distance() as a Spark UDF. Declaring DoubleType matters: F.udf defaults to StringType, and a
# Python value that doesn't match the declared type (e.g. an int for DoubleType) typically comes back as null.
geo_distance = F.udf(distance, DoubleType()) # you can define the return type, default is string

Let's read in a dataset with coordinates of countries' geographical centres  
https://raw.githubusercontent.com/google/dspl/master/samples/google/canonical/countries.csv

In [None]:
# read the data in as a dataframe

countries_df = (spark.read
               .option("header","true")
               .option("inferSchema","true")
               .csv("countries.csv"))

display(countries_df)

In [None]:
# Let's compare distances of each country's geographical centre, using our UDF

distance_df = (countries_df
 .filter(F.col("country")=="EE")
 .join(countries_df # no join keys = cartesian/cross join
       #.filter(F.col("country")=="FI")
       .toDF("join_country","join_latitude","join_longitude","join_name") # we can use .toDF() to rename all columns of a dataframe
       .na.drop() # remove countries with null values
      )
 .withColumn("distance_in_km", geo_distance("latitude","longitude","join_latitude","join_longitude")) # here is the UDF
 #.filter(F.col("distance_in_km").cast(DoubleType())>0)
 #.orderBy("distance_in_km")
)

display(distance_df)

### Task 1

Based on the sales dataframe used above, create a new dataframe that has the following fields:
* order_id
* email
* purchase_revenue_in_usd
* value_added_tax

value_added_tax should be computed with a UDF, as follows:
* if purchase_revenue_in_usd > 1500: tax = revenue*0.2
* else: tax = revenue*0.1

The dataframe should be filtered to only include rows where value_added_tax is above 110.

The dataframe should be sorted in descending order of value_added_tax.

In [None]:
# your answer



### Task 2

Join the events dataframe from above and the zips dataset (zips.json).

The join should be done on city and state. String comparison in the join is case-sensitive, so convert these columns to a common case on both sides (e.g. with `F.upper` or `F.lower`) before joining.

Return a dataframe which has the following columns:
* user_id
* latitude
* longitude

Latitude is element 2 in the "loc" column of the zips dataset<br>
Longitude is element 1 in the "loc" column of the zips dataset (positions are 1-based, as with `F.element_at`)

In [None]:
# your answer



### Further reading
https://spark.apache.org/docs/latest/api/sql/index.html<br>
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html