# wk8 Demo - Advanced Spark - DataFrames and Spark SQL
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2022`__

So far we've been using Spark's low-level APIs. In particular, we've been using the RDD (Resilient Distributed Dataset) API to implement machine learning algorithms from scratch. This week we're going to look at how Spark is used in a production setting: DataFrames, SQL, and UDFs (User Defined Functions). As discussed previously, we still need to understand the internals of Spark, and of MapReduce in general, to write efficient and scalable code.

In class today we'll get some practice working with larger datasets in Spark. We'll start with an introduction to storing data efficiently and approach a large dataset for analysis. After that we'll discuss a ranking problem covered in Chapter 6 of the High Performance Spark book and how we can apply it to our problem. We'll follow up with a discussion of what could be done to make this more efficient.

By the end of this session, you should be able to:
* ... __describe__ differences between data serialization formats.
* ... __choose__ a data serialization format based on use case.
* ... __describe__ DataFrames API, GroupBy and _Spark SQL_.
* ... __describe__ and __create__ a data pipeline for analysis.
* ... __use__ a user defined function (UDF).
* ... __understand__ feature engineering and aggregations in Spark.

__`Additional Resources:`__ Writing performant code in Spark requires a lot of thought. Holden Karau and Rachel Warren's _High Performance Spark_ covers this topic very well. In addition, _Spark: The Definitive Guide_, by Bill Chambers and Matei Zaharia, covers more recent developments.

In [2]:
## Imports
import re
import json
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

app_name = "week8_demo"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.ui.port","42229")\
        .getOrCreate()
sc = spark.sparkContext

## Change the working directory (%cd persists across cells; !cd only affects a throwaway subshell)
%cd /media/notebooks/student-workspace/LiveSessionMaterials/wk08Demo_DataFrames

In [3]:
## Load the data
data = spark.read.format('bigquery') \
  .option('table', 'bigquery-public-data:samples.gsod') \
  .load()

## DataFrames API

Let's showcase some of the important methods available when working with DataFrames.

In [3]:
## show
data.show()

22/06/20 15:32:50 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------+-----------+----+-----+---+-------------------+---------------------+------------------+--------------------------+----------------------+----------------------------------+---------------------+---------------------------------+------------------+---------------------------+------------------+---------------------------+------------------------+-------------------+------------------+------------------------+---------------+------------------------+-------------------+----------+-----+-----+-----+-----+-------+-------+
|station_number|wban_number|year|month|day|          mean_temp|num_mean_temp_samples|    mean_dew_point|num_mean_dew_point_samples|mean_sealevel_pressure|num_mean_sealevel_pressure_samples|mean_station_pressure|num_mean_station_pressure_samples|   mean_visibility|num_mean_visibility_samples|   mean_wind_speed|num_mean_wind_speed_samples|max_sustained_wind_speed|max_gust_wind_speed|   max_temperature|max_temperature_explicit|min_temperature|min_temperature

                                                                                

Here we see `.show()`, which works similarly to Pandas `.head()`: it prints the first 20 rows as a plain-text table. With 31 columns the output wraps badly, so for a more readable display we can take a few rows with `.limit(n)` and convert them to a Pandas DataFrame with `.toPandas()`.

In [4]:
data.limit(10).toPandas().head()

| | station_number | wban_number | year | month | day | mean_temp | num_mean_temp_samples | mean_dew_point | num_mean_dew_point_samples | mean_sealevel_pressure | ... | min_temperature | min_temperature_explicit | total_precipitation | snow_depth | fog | rain | snow | hail | thunder | tornado |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39730 | 99999 | 1929 | 10 | 20 | 52.799999 | 4 | 45.5 | 4 | | ... | | | 0.0 | | False | False | False | False | False | False |
| 1 | 33110 | 99999 | 1929 | 12 | 18 | 47.5 | 4 | 44.0 | 4 | | ... | | | | | False | False | False | False | False | False |
| 2 | 37770 | 99999 | 1931 | 4 | 24 | 50.200001 | 4 | 44.299999 | 4 | | ... | | | | | False | False | False | False | False | False |
| 3 | 726810 | 24131 | 1931 | 6 | 23 | 65.099998 | 24 | 41.5 | 8 | | ... | | | 0.0 | | False | False | False | False | False | False |
| 4 | 726810 | 24131 | 1931 | 3 | 2 | 42.799999 | 24 | 31.5 | 8 | | ... | | | 0.0 | | False | False | False | False | False | False |


This is a public dataset from NOAA, regarding weather stations across the United States. It has a total of 31 columns.

Another important command is `.printSchema()`, which lists the column names and the type of data stored in each column.

In [5]:
data.printSchema()

root
 |-- station_number: long (nullable = false)
 |-- wban_number: long (nullable = true)
 |-- year: long (nullable = false)
 |-- month: long (nullable = false)
 |-- day: long (nullable = false)
 |-- mean_temp: double (nullable = true)
 |-- num_mean_temp_samples: long (nullable = true)
 |-- mean_dew_point: double (nullable = true)
 |-- num_mean_dew_point_samples: long (nullable = true)
 |-- mean_sealevel_pressure: double (nullable = true)
 |-- num_mean_sealevel_pressure_samples: long (nullable = true)
 |-- mean_station_pressure: double (nullable = true)
 |-- num_mean_station_pressure_samples: long (nullable = true)
 |-- mean_visibility: double (nullable = true)
 |-- num_mean_visibility_samples: long (nullable = true)
 |-- mean_wind_speed: double (nullable = true)
 |-- num_mean_wind_speed_samples: long (nullable = true)
 |-- max_sustained_wind_speed: double (nullable = true)
 |-- max_gust_wind_speed: double (nullable = true)
 |-- max_temperature: double (nullable = true)
 |-- max_tempe

In [7]:
%%time
## To see how many rows we have, we can use .count()
print(f"Number of rows is {data.count()} and number of columns is {len(data.columns)}")

Number of rows is 114420316 and number of columns is 31
CPU times: user 2.71 ms, sys: 2.83 ms, total: 5.54 ms
Wall time: 1.53 s


                                                                                

114 million rows! Try to fit that into a Pandas DataFrame! Now let's see how we can filter our DataFrame and create new columns.

We'll leverage a very important set of Spark built-in functions from `pyspark.sql.functions`, conventionally imported as `F`.

In [27]:
# Using built-in Spark functions is generally more efficient than writing your own
from pyspark.sql import types
import pyspark.sql.functions as F

## Let's create a new column called time
data = data.withColumn("time", 
                  F.concat(F.col("year"), 
                  F.lit("-"), F.col("month"), 
                  F.lit("-"), F.col("day")) \
                  .cast(types.TimestampType()))

data.printSchema()

root
 |-- station_number: long (nullable = false)
 |-- wban_number: long (nullable = true)
 |-- year: long (nullable = false)
 |-- month: long (nullable = false)
 |-- day: long (nullable = false)
 |-- mean_temp: double (nullable = true)
 |-- num_mean_temp_samples: long (nullable = true)
 |-- mean_dew_point: double (nullable = true)
 |-- num_mean_dew_point_samples: long (nullable = true)
 |-- mean_sealevel_pressure: double (nullable = true)
 |-- num_mean_sealevel_pressure_samples: long (nullable = true)
 |-- mean_station_pressure: double (nullable = true)
 |-- num_mean_station_pressure_samples: long (nullable = true)
 |-- mean_visibility: double (nullable = true)
 |-- num_mean_visibility_samples: long (nullable = true)
 |-- mean_wind_speed: double (nullable = true)
 |-- num_mean_wind_speed_samples: long (nullable = true)
 |-- max_sustained_wind_speed: double (nullable = true)
 |-- max_gust_wind_speed: double (nullable = true)
 |-- max_temperature: double (nullable = true)
 |-- max_tempe

In [None]:
# To select one or more columns, use the select method
data.select('time').show(5)

In [None]:
# RUN CELL AS IS
data.select(['time', 'tornado']).show(5)

In [None]:
# To pull a few rows back to the driver, use take(n)
data.take(1)

Each element returned is a `Row`, which behaves much like a dictionary: you can reference each field of the `Row` by its key. Also notice that the output of `take` is a list, so you need to index the list first.

In [None]:
# let's get the station_number only
data.take(1)[0]['station_number']

In [None]:
# Now let's see how to filter data, using a second table that describes the weather stations
stations = spark.read.format('bigquery') \
  .option('table', 'bigquery-public-data:noaa_gsod.stations') \
  .load()

In [None]:
## Let's keep only the US-based stations
stations_us = stations.filter(F.col('Country')=='US')

print(f'Total stations are {stations.count()}, total US stations are {stations_us.count()}')

In [None]:
%%time
## Finally, we can describe our dataset using the describe command, similar to Pandas
## Let's select just a few columns
keep_columns = ['station_number', 'mean_temp', 'thunder', 'mean_sealevel_pressure']
data.select(keep_columns).describe().show()

# Data Types

I highly recommend reading the article [Format Wars](http://www.svds.com/dataformats/), which covers the characteristics, structure, and differences between raw text, sequence, Avro, Parquet, and ORC data serializations.

It discusses several characteristics:

* Human Readable
* Row vs Column Oriented
* Read vs Write performance
* Appendable
* Splittable
* Metadata storage

We'll write the data in four formats below:

- Compressed CSV
- Parquet
- Avro
- CSV

Of these, three are row-oriented and one (Parquet) is column-oriented. We have over 100M rows and 31 columns, so columnar compression should do fairly well in this scenario.
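To make the row versus column distinction concrete, here is a minimal sketch using only the Python standard library. The records are invented toy values (not GSOD data), and `zlib` merely stands in for the compression a real format would apply; Parquet additionally uses techniques such as dictionary and run-length encoding, so treat the printed sizes as illustrative only.

```python
import zlib

# Hypothetical toy records: (station, year, fog flag). Invented for illustration.
rows = [(30000 + (i % 5), 1929 + (i % 3), i % 7 == 0) for i in range(1000)]

# Row-oriented layout: the values of one record sit next to each other.
row_bytes = "\n".join(",".join(map(str, r)) for r in rows).encode()

# Column-oriented layout: all values of one column sit next to each other.
columns = list(zip(*rows))
col_bytes = "\n".join(",".join(map(str, c)) for c in columns).encode()

print("row layout   :", len(row_bytes), "->", len(zlib.compress(row_bytes)), "bytes")
print("column layout:", len(col_bytes), "->", len(zlib.compress(col_bytes)), "bytes")

# Reading a single column: the row layout forces a pass over every record,
# while the column layout hands back one contiguous block.
years_from_rows = [r[1] for r in rows]
years_from_cols = list(columns[1])
```

Both layouts hold exactly the same values; only the order on disk differs, which is what changes how well similar values compress and how much data a single-column query has to read.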

In [None]:
# Access the staging bucket and see what's there
import os
GCS_LOCATION = os.getenv('DATA_BUCKET')
GCS_LOCATION

In [None]:
%%time
!gsutil rm -r {GCS_LOCATION}datagzip
data.write.option("compression","gzip").csv(f'{GCS_LOCATION}datagzip')
!gsutil du -sh {GCS_LOCATION}datagzip/*

In [None]:
%%time
!gsutil rm -r {GCS_LOCATION}dataparquet
data.write.format("parquet").save(f'{GCS_LOCATION}dataparquet')
!gsutil du -sh {GCS_LOCATION}dataparquet/*

In [None]:
%%time
!gsutil rm -r {GCS_LOCATION}dataavro
data.write.format("avro").save(f'{GCS_LOCATION}dataavro')
!gsutil du -sh {GCS_LOCATION}dataavro/*

In [None]:
%%time
!gsutil rm -r {GCS_LOCATION}datacsv
data.write.csv(f'{GCS_LOCATION}datacsv')
!gsutil du -sh {GCS_LOCATION}datacsv/*

## Why do we care?

The format and compression of the stored data matter when running different operations and computations. Let's compare three of them.

In [21]:
## Create our dataframes
data_parquet = spark.read.parquet(f'{GCS_LOCATION}dataparquet')
data_csv = spark.read.csv(f'{GCS_LOCATION}datacsv')
data_avro = spark.read.format("avro").load(f'{GCS_LOCATION}dataavro')  # same path as the write above (no extra slash)

In [None]:
%%time
data_parquet.count()

In [None]:
%%time
data_csv.count()

In [None]:
%%time
data_avro.count()

* _What is the compression ratio of the Parquet files relative to the CSV files?_

* _Which serialization would query a column faster?_

* _Which types of columns do you think have the best compression for Parquet?_

* _When should you use flat files vs other data formats?_

* _If we want to do analysis with lots of aggregations what serialization should we use?_

* _Is there any downside to Parquet?_

* _If you had to partition data into days as new data comes in with aggregations happening at end of day how would you operationalize this?_


# Data Aggregation

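Let's perform different aggregations using different methods and `groupBy`. Don't worry! `groupBy` on DataFrames behaves very differently from `groupByKey` on RDDs: the aggregation is planned by the optimizer, which can pre-aggregate within each partition before shuffling.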
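One reason DataFrame aggregations tend to scale better than grouping raw values on an RDD is that partial results can be computed inside each partition before anything is shuffled. The sketch below imitates that idea in plain Python for a per-year mean; the partitions, the values, and the helper names `partial_aggregate` and `merge` are all hypothetical, and this is an analogy rather than Spark's actual execution code.

```python
from collections import defaultdict

# Hypothetical (year, mean_temp) records split across two partitions.
partitions = [
    [(1929, 50.0), (1930, 40.0), (1929, 54.0)],
    [(1930, 44.0), (1929, 46.0)],
]

def partial_aggregate(partition):
    """Combine locally: keep one (sum, count) pair per key per partition."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        acc[key][0] += value
        acc[key][1] += 1
    return {k: tuple(v) for k, v in acc.items()}

def merge(partials):
    """Merge the small partial results, as would happen after the shuffle."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, n) in partial.items():
            total[key][0] += s
            total[key][1] += n
    return {k: s / n for k, (s, n) in total.items()}

partials = [partial_aggregate(p) for p in partitions]
mean_by_year = merge(partials)
print(mean_by_year)  # {1929: 50.0, 1930: 42.0}
```

Only one small `(sum, count)` pair per key leaves each partition, instead of every individual reading.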

In [None]:
%%time
## Let's start with sorting
#data_parquet.sort("mean_temp").show()
#data_parquet.sort("mean_temp").select("mean_temp").show()
data_parquet.sort("mean_temp").select("mean_temp").filter(F.col("mean_temp").isNotNull()).show()

In [None]:
%%time
## Let's compare with avro
data_avro.sort("mean_temp").select("mean_temp").filter(F.col("mean_temp").isNotNull()).show()

In [None]:
%%time
data_parquet.select(F.mean("mean_wind_speed").alias("Avg mean Wind")).show()

In [None]:
%%time
data_parquet.select(F.max("mean_wind_speed")).show()

In [None]:
%%time
data_parquet.select(F.min("mean_wind_speed")).show()

In [None]:
%%time
data_parquet.select(F.stddev("mean_wind_speed")).show()

## GroupBy

In [None]:
%%time
## Let's suppose we want the average temperature by year
data_pandas = data_parquet.groupBy("year").agg(F.mean("mean_temp").alias("Mean Temp")).toPandas()
data_pandas.sort_values("year").set_index("year").plot()

In [None]:
data_parquet.groupBy("year").agg(F.mean("mean_temp").alias("Mean Temp")).count()

In [None]:
%%time
## Let's suppose we want more than one aggregate per group
data_pandas = data_parquet.groupBy("year").agg(F.mean("mean_temp").alias("Mean Temp"), 
                                               F.max("max_temperature").alias("Max Temp")).toPandas()
data_pandas.sort_values("year").set_index("year").plot()

In [None]:
%%time
## Mean with a +/- 2 standard deviation band (a rough spread of readings, not a formal confidence interval)
data_pandas = data_parquet.groupBy("year").agg(F.mean("mean_temp").alias("Mean Temp"), 
                                               F.stddev("mean_temp").alias("SD Mean Temp")).toPandas()
data_pandas = data_pandas.sort_values("year").set_index("year")
data_pandas['Min CI Temp'] = data_pandas['Mean Temp'] - 2*data_pandas['SD Mean Temp']
data_pandas['Max CI Temp'] = data_pandas['Mean Temp'] + 2*data_pandas['SD Mean Temp']
del data_pandas['SD Mean Temp']
data_pandas.plot()
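The band plotted in the cell above is mean ± 2 standard deviations, which describes how widely individual readings spread. An interval for the mean itself would normally divide the standard deviation by the square root of the sample size. The following sketch contrasts the two on a handful of invented temperatures; `statistics.stdev` is the sample standard deviation, which matches what `F.stddev` computes.

```python
import math
import statistics

# Hypothetical daily mean temperatures for one year (not real GSOD values).
temps = [48.0, 52.0, 50.0, 46.0, 54.0]

mean = statistics.mean(temps)
sd = statistics.stdev(temps)  # sample standard deviation

# Band used in the notebook cell: where individual readings tend to fall.
spread_band = (mean - 2 * sd, mean + 2 * sd)

# Rough 95% interval for the mean itself: uses the standard error instead.
se = sd / math.sqrt(len(temps))
mean_interval = (mean - 2 * se, mean + 2 * se)

print("mean            :", mean)
print("+/- 2 SD band   :", spread_band)
print("+/- 2 SE for mean:", mean_interval)
```

With millions of readings per year the standard error becomes tiny, so the two bands would look very different on the plot.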

In [None]:
%%time
## Let's suppose we want to group by more than one column
data_parquet.groupBy(["year", 'month']).agg(F.mean("mean_temp").alias("Mean Temp"), 
                                               F.stddev("mean_temp").alias("SD Mean Temp")).sort(['year','month']).show()

## User Defined Functions

In [39]:
## Let's recall how we created the time column from before
data_parquet_time = data_parquet.withColumn("time", 
                                F.concat(F.col("year"), 
                                F.lit("-"), F.col("month"), 
                                F.lit("-"), F.col("day")) \
                                .cast(types.TimestampType()))

In [None]:
%%time
data_parquet_time.select('time').show(5)

In [41]:
## Can we do it differently? Yes, with a UDF. A UDF is applied row by row to your dataframe
def create_date_from_parts(year, month, day):
    return f'{year}-{month}-{day}'

create_date_udf = F.udf(create_date_from_parts, types.StringType())
data_parquet_time_udf = data_parquet.withColumn("time", create_date_udf('year', 'month', 'day').cast(types.TimestampType()))

In [None]:
%%time
data_parquet_time_udf.select('time').show(5)

UDFs are typically much slower than built-in Spark functionality because the data has to be serialized and deserialized for every row the function is applied to. Recent improvements, such as Pandas UDFs (scalar and grouped-map variants), reduce this cost for some analytical workloads. More information about why UDFs are inefficient can be found here: https://blog.cloudera.com/blog/2017/02/working-with-udfs-in-apache-spark/
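To get a feel for why per-row serialization hurts, here is a rough standard-library analogy: the same function is applied after pickling each value individually versus pickling the whole batch once. This is only a toy model with made-up helper names (`per_row`, `per_batch`); it does not reproduce how Spark actually moves data to Python workers, and the timings will vary by machine.

```python
import pickle
import timeit

values = [float(i) for i in range(20_000)]

def square(x):
    return x * x

def per_row(vals):
    """Serialize and deserialize every value separately, then apply the function."""
    return [square(pickle.loads(pickle.dumps(v))) for v in vals]

def per_batch(vals):
    """Serialize the whole batch once, then apply the function to each element."""
    batch = pickle.loads(pickle.dumps(vals))
    return [square(v) for v in batch]

print("per row  :", timeit.timeit(lambda: per_row(values), number=3), "s")
print("per batch:", timeit.timeit(lambda: per_batch(values), number=3), "s")
```

Both variants return identical results; only the amount of serialization work differs, which is the intuition behind batching data for vectorized UDFs.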

In [None]:
%%time
## Let's look at other examples
from pyspark.sql.functions import udf
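@udf("double")
def squared_udf(s):
    # mean_temp is nullable; Spark passes NULL to Python as None, so guard against it
    return s * s if s is not None else None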


data_udf = data_parquet.withColumn("square_temp", squared_udf(F.col("mean_temp")))
data_udf.select("square_temp").show()

In [None]:
%%time
data_no_udf = data_parquet.withColumn("square_temp", F.col("mean_temp")**2)
data_no_udf.select("square_temp").show()

In [None]:
%%time
## You can also use UDF with select
data_parquet.select("mean_temp", squared_udf("mean_temp").alias("squared_temp")).show()
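Because a UDF wraps an ordinary Python function, its logic can be checked locally before it is registered with Spark. The sketch below does that for a null-safe square (Spark hands SQL NULL to Python as `None`) and for the date-string helper used earlier. The name `squared` is a hypothetical plain-function counterpart of `squared_udf`, and the sample inputs are arbitrary.

```python
from datetime import datetime

def squared(s):
    """Null-safe square: a NULL input arrives as None and should stay None."""
    return None if s is None else s * s

def create_date_from_parts(year, month, day):
    """Same string-building logic as the notebook's UDF."""
    return f"{year}-{month}-{day}"

# Quick local checks, no Spark session required.
print(squared(3.0))    # 9.0
print(squared(None))   # None

date_string = create_date_from_parts(1931, 4, 24)
print(date_string)     # 1931-4-24

# The unpadded month and day still parse as a date.
parsed = datetime.strptime(date_string, "%Y-%m-%d")
print(parsed.date())   # 1931-04-24
```

Testing the plain function first makes failures much easier to read than a stack trace coming back from an executor.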

# Spark SQL

Finally, let's work with Spark SQL. Spark allows us to combine the power of SQL with Spark and the DataFrame API.

In [None]:
%%time
## Let's run an example
# First we need to create a temporary view that we can query
# (createOrReplaceTempView replaces the deprecated registerTempTable)
data_parquet.createOrReplaceTempView('data')
spark.sql(
"""
select mean_temp
from data
""").show()

In [None]:
%%time
data_parquet.select("mean_temp").show()

In [None]:
## Let's run queries similar to the ones we ran before, starting with the squared temperature
spark.sql(
"""
select mean_temp, power(mean_temp, 2) as squared_temp
from data
""").show()

In [None]:
%%time
## Now the mean and standard deviation by year and month
spark.sql(
"""
select 
    year, 
    month, 
    avg(mean_temp) as mean, 
    std(mean_temp) as st_dev
from 
    data
group by
    year,
    month
order by
    year,
    month
""").show()

In [None]:
## We can save the Spark SQL query as a dataframe
df_sql = spark.sql(
"""
select 
    year, 
    month, 
    avg(mean_temp) as mean, 
    std(mean_temp) as st_dev
from 
    data
group by
    year,
    month
order by
    year,
    month
""")

df_sql_pd = df_sql.toPandas()
df_sql_pd = df_sql_pd.set_index(["year", 'month'])
df_sql_pd

In [None]:
%%time
## Let's also join dataframes
stations = spark.read.format('bigquery') \
  .option('table', 'bigquery-public-data:noaa_gsod.stations') \
  .load()

stations_us = stations.filter(F.col('Country')=='US')

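## stations_us is quite small, so it is a good candidate for a broadcast join.
## Uncomment the broadcast version below and compare its timing with the plain join.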
# join_data = data_parquet.join(F.broadcast(stations_us), stations_us.usaf==data_parquet.station_number, 'inner')
join_data = data_parquet.join(stations_us, stations_us.usaf==data_parquet.station_number, 'inner')
join_data.count()
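The idea behind broadcasting the small side of a join can be sketched in plain Python: build a lookup table from the small dataset once, then stream the large dataset past it. The two miniature tables below are invented for illustration (station numbers and states are not meant to be accurate), and the code is an analogy for a broadcast hash join, not Spark's implementation.

```python
# Hypothetical miniature tables (values invented for illustration).
stations_us = [
    {"usaf": 726810, "state": "ID"},
    {"usaf": 722950, "state": "CA"},
]
readings = [
    {"station_number": 726810, "mean_temp": 65.1},
    {"station_number": 39730, "mean_temp": 52.8},
    {"station_number": 722950, "mean_temp": 60.0},
    {"station_number": 726810, "mean_temp": 42.8},
]

# "Broadcast": build a lookup from the small side once ...
lookup = {s["usaf"]: s for s in stations_us}

# ... then stream the large side past it. The large table never has to be
# reshuffled by key, because every worker already holds the full lookup.
joined = [
    {**r, **lookup[r["station_number"]]}
    for r in readings
    if r["station_number"] in lookup  # inner join: drop unmatched rows
]

for row in joined:
    print(row)
```

The reading from station 39730 has no match in the small table, so the inner join drops it.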

In [53]:
join_data.printSchema()

root
 |-- station_number: long (nullable = true)
 |-- wban_number: long (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- mean_temp: double (nullable = true)
 |-- num_mean_temp_samples: long (nullable = true)
 |-- mean_dew_point: double (nullable = true)
 |-- num_mean_dew_point_samples: long (nullable = true)
 |-- mean_sealevel_pressure: double (nullable = true)
 |-- num_mean_sealevel_pressure_samples: long (nullable = true)
 |-- mean_station_pressure: double (nullable = true)
 |-- num_mean_station_pressure_samples: long (nullable = true)
 |-- mean_visibility: double (nullable = true)
 |-- num_mean_visibility_samples: long (nullable = true)
 |-- mean_wind_speed: double (nullable = true)
 |-- num_mean_wind_speed_samples: long (nullable = true)
 |-- max_sustained_wind_speed: double (nullable = true)
 |-- max_gust_wind_speed: double (nullable = true)
 |-- max_temperature: double (nullable = true)
 |-- max_temperatu

In [None]:
%%time
## Let's now use Spark SQL to query this data
## (this replaces the earlier 'data' view with the joined dataframe)
join_data.createOrReplaceTempView('data')
spark.sql(
"""
select 
    state,
    avg(mean_temp) as mean, 
    avg(lat),
    avg(lon)
from 
    data
where
    state in ('CA', 'TX', 'NY')
group by
    state
order by
    state
""").show()