In [None]:
%matplotlib inline
import matplotlib
import seaborn as sns
sns.set()
matplotlib.rcParams['figure.dpi'] = 144

In [None]:
import random
from pyspark import SparkContext, SparkConf
sc = SparkContext("local[*]", "pyspark_df")
print(sc.version)

# Alternatively...
# conf = SparkConf().setAppName("pyspark_df").setMaster("local[*]")
# sc = SparkContext(conf=conf)

In [None]:
# needed to convert RDDs into DataFrames
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
sqlContext = SQLContext(sc)

In [None]:
# Resets the HDFS environment.  (It's only necessary if you've already run
# this notebook.)
# Also clears the local file version, in case you wrote to local FS instead
!hadoop fs -rm -r parquet_demo_pyspark
!rm -r parquet_demo_pyspark

# PySpark DataFrames
<!-- requirement: small_data/nycp.csv -->
<!-- requirement: projects/citi-subset -->

## Motivation and Spark SQL


Spark SQL is Spark's module for working with structured data, including support for writing SQL queries. Newer versions can read from and write to Hive, Parquet, and other data sources. [Docs](http://spark.apache.org/docs/latest/sql-programming-guide.html)

The key feature of Spark SQL is the use of DataFrames instead of RDDs. A DataFrame is a distributed collection of data organized into named columns. Operations on DataFrames first pass through an optimizer, called Catalyst, which can simplify and even reorder the requested operations so they execute faster.

Under the hood, operations on DataFrames are boiled down to operations on RDDs, but the RDDs are created by the execution engine, and not directly by the user. It is also possible to convert RDDs to DataFrames and vice versa.

The Spark ML package (`spark.ml`), unlike the older RDD-based MLlib API (`spark.mllib`), uses DataFrames as inputs and outputs.

**Question:** What is an example of a "bad" sequence of operations which should be reordered for optimal performance?

DataFrames are...

* Immutable, like RDDs
* Lineage is remembered, like RDDs (resiliency)
* Lazy execution, like RDDs
* So why do we care?


DataFrames are an abstraction that lets us think of data in a familiar form (a pandas DataFrame, an R data.frame, a SQL table, etc.).

We can use a similar API to RDDs!

We also get access to SQL-like optimizations and cost analysis, because Spark knows the structure of the data (named, typed columns).

What about type safety?

What are these UDF things?

In [None]:
data = (sc.parallelize(range(1,10001))
         .map(lambda x: (random.random(), random.random())))

In [None]:
df = data.toDF()
# Note: this isn't always so easy; you may need to specify a schema explicitly

In [None]:
df.printSchema()

In [None]:
df = df.withColumnRenamed("_1", "x").withColumnRenamed("_2", "y")
df.write.save("parquet_demo_pyspark", format="parquet")
# Equivalent shorthand syntax (a file:// URI writes to the local filesystem)
# df.write.parquet("file:///home/jovyan/datacourse/module5/demo")

Try rerunning the cell above. It fails the second time: the default save mode is `error`, and the output path already exists.

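Save modes:
* error (the default): raise an error if data already exists at the path
* append: add the new data to any existing data
* overwrite: replace any existing data
* ignore: silently do nothing if data already exists (i.e. CREATE TABLE IF NOT EXISTS)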
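To make the four save modes concrete, here is a toy sketch in plain Python that mimics their *semantics* on a single text file. `toy_save` and `toy_read` are hypothetical helpers, not Spark APIs. Spark actually writes a directory of part files, so this shows only the behavior when data already exists at the path.

```python
import os
import tempfile

def toy_save(path, text, mode="error"):
    """Hypothetical helper mimicking Spark's four save modes for one text file."""
    if mode not in ("error", "ignore", "overwrite", "append"):
        raise ValueError(f"unknown save mode: {mode}")
    exists = os.path.exists(path)
    if mode == "error" and exists:
        raise FileExistsError(path)        # default: refuse to touch existing data
    if mode == "ignore" and exists:
        return                             # keep existing data, write nothing
    # "overwrite" truncates the file; "append" adds to the end (or creates it)
    with open(path, "a" if mode == "append" else "w") as f:
        f.write(text)

def toy_read(path):
    with open(path) as f:
        return f.read()

with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "demo.txt")
    toy_save(p, "a")
    try:
        toy_save(p, "b")                   # like rerunning the write cell
    except FileExistsError:
        print("error mode: data already exists")
    toy_save(p, "c", mode="ignore")
    print(toy_read(p))                     # a
    toy_save(p, "d", mode="append")
    print(toy_read(p))                     # ad
    toy_save(p, "e", mode="overwrite")
    print(toy_read(p))                     # e
```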

In [None]:
df.write.mode("ignore").parquet("parquet_demo_pyspark")

In [None]:
dfp = sqlContext.read.parquet("parquet_demo_pyspark")

In [None]:
dfp.describe("x").show()

In [None]:
filtered_dfp = dfp.filter(dfp["x"] < 0.5)

In [None]:
filtered_dfp.count()

## Exploring the Catalyst Optimizer

The Catalyst optimizer uses the structure and type information that DataFrames provide to make execution faster. At the end of the day it still uses RDDs under the hood. It first turns the high-level "query" written with DataFrames (or SQL) into an optimized plan, and then generates code that runs on Spark Core (RDDs). This works much like a compiler, and it uses the same kind of tree-based optimization structure.

We explore this using the `explain` method of a `DataFrame`. Called with `True` (extended mode), it shows:

- *Parsed Logical Plan* (the query as the user wrote it)
- *Analyzed Logical Plan* (column names and types resolved)
- *Optimized Logical Plan* (after optimization)
- *Physical Plan* (execution plan)


Let's take our `filtered_dfp` from before and look at its plans.

In [None]:
filtered_dfp.explain(True)

Now let's see a more interesting example. Let's take `df`, apply a filter on the "x" column, and look at the resulting plans.

In [None]:
filtered_df = df.filter(df["x"] < 0.5)

In [None]:
filtered_df.explain(True)

Let's look at a different plan, where we filter on both "x" and "y".

In [None]:
filtered_df = df.filter(df["x"] < 0.5).filter(df["y"] < 0.5)

In [None]:
filtered_df.explain(True)

In [None]:
filtered_dfp = dfp.filter(dfp["x"] < 0.5).filter(dfp["y"] < 0.5)

In [None]:
filtered_dfp.explain(True)

Under the hood, Catalyst is just manipulating trees by applying rules.
The introductory [blog post](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) has good pictures.
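To see what "manipulating trees based on rules" means, here is a toy sketch in the spirit of Catalyst's `CombineFilters` rule. The rule collapses two stacked filters into one filter whose condition combines both predicates. In the optimized plans above, the two `filter` calls should appear as a single `Filter` node. The `Scan`/`Filter` classes and string conditions are hypothetical simplifications; Catalyst's real trees are Scala objects with typed expressions.

```python
from dataclasses import dataclass
from typing import Union

@dataclass(frozen=True)
class Scan:
    table: str

@dataclass(frozen=True)
class Filter:
    condition: str
    child: "Plan"

Plan = Union[Scan, Filter]

def combine_filters(plan):
    """Rewrite rule: Filter(outer, Filter(inner, child)) -> Filter(inner AND outer, child).

    Applied bottom-up, so any stack of adjacent Filters collapses into one.
    """
    if isinstance(plan, Scan):
        return plan                       # leaf: nothing to rewrite
    child = combine_filters(plan.child)   # rewrite the subtree first
    if isinstance(child, Filter):
        # merge the inner (earlier) condition with the outer one
        return Filter(f"({child.condition}) AND ({plan.condition})", child.child)
    return Filter(plan.condition, child)

plan = Filter("y < 0.5", Filter("x < 0.5", Scan("nums")))
print(combine_filters(plan))
# Filter(condition='(x < 0.5) AND (y < 0.5)', child=Scan(table='nums'))
```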


### Project Tungsten


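* Explicit memory management: data is stored in a compact binary format rather than as JVM objects, which reduces object overhead and garbage-collection pressure
* Cache-aware computation
* Code generation (compile queries into Java bytecode)

Cache-aware computation example (sorting records):
* Case 1: pointer -> key, value
* Case 2: key prefix, pointer -> key, value

When sorting, the CPU must compare keys. In Case 1, every comparison follows a pointer to a record elsewhere in memory, which often causes a cache miss. In Case 2, a prefix of the key sits right next to the pointer in the array being sorted. Most comparisons can therefore use data already in the CPU cache, and only ties on the prefix need to follow the pointer.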
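Python cannot show CPU cache effects directly. A toy sketch can still count how often each layout has to "follow a pointer" during sorting. The records, the 4-character prefix length, and the dereference counter are all hypothetical. The counter is only a proxy for cache misses, and Tungsten's real sort code differs.

```python
from functools import cmp_to_key

# Hypothetical records living "elsewhere in memory"; list indices act as pointers.
records = [("pear", 3), ("apple", 1), ("mango", 7), ("kiwi", 2)]
PREFIX_LEN = 4   # arbitrary prefix length chosen for this sketch
derefs = 0       # how many times a comparison had to follow a pointer

def key_at(ptr):
    """Follow a pointer to fetch the full key (the potential cache miss)."""
    global derefs
    derefs += 1
    return records[ptr][0]

def compare(a, b):
    return (a > b) - (a < b)

def cmp_pointers(p, q):
    # Case 1: the sort array holds only pointers, so every comparison dereferences both.
    return compare(key_at(p), key_at(q))

def cmp_prefixed(e1, e2):
    # Case 2: the sort array holds (key prefix, pointer); only prefix ties need the full key.
    c = compare(e1[0], e2[0])
    return c if c != 0 else compare(key_at(e1[1]), key_at(e2[1]))

derefs = 0
order_pointers = sorted(range(len(records)), key=cmp_to_key(cmp_pointers))
derefs_pointers = derefs

derefs = 0
entries = [(key[:PREFIX_LEN], i) for i, (key, _) in enumerate(records)]
order_prefixed = [i for _, i in sorted(entries, key=cmp_to_key(cmp_prefixed))]
derefs_prefixed = derefs

print(order_pointers, derefs_pointers)
print(order_prefixed, derefs_prefixed)  # same order; 0 dereferences since all prefixes differ
```

Both layouts produce the same order. With the prefix layout, pointers are followed only when two prefixes are equal.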

[More](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)


## SQL and DataFrames

In [None]:
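# Requires Hive to permanently store tables
# registerTempTable only gives df a session-scoped name; it is NOT the same as a
# temp table in SQL proper. (Deprecated since Spark 2.0; use createOrReplaceTempView.)
df.registerTempTable('nums')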
sql_df = sqlContext.sql("select x, y from nums where y > 0.9 limit 3")
sql_df.show()

In [None]:
sql_df.explain(True)

*Reminder:* Check the UI (port 4040 by default) for tables in memory.

*Reminder:* A number of interactive tutorials are available on the Databricks [community cloud](https://community.cloud.databricks.com). I highly recommend making an account and checking out the guide.

This is also a good place to learn about connecting to databases like Cassandra or using the JDBC protocol.

## Adding columns and functions


Because DataFrames are immutable, adding new information means creating a new DataFrame with extra columns (e.g. with `withColumn`). The original DataFrame is left unchanged.

In [None]:
def predictor(threshold):
    def pred(val):
        if val > threshold:
            return 1.0
        else:
            return 0.0
    return pred
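`predictor` is an ordinary Python closure, so it can be checked locally before it is wrapped in `udf`. The sketch below repeats the function in a compact form, and the sample values are hypothetical. It returns floats (`1.0`/`0.0`) to match the declared `DoubleType`. A mismatched Python type, such as an `int`, may come back from a PySpark UDF as `null`.

```python
def predictor(threshold):
    """Return a function that labels values above `threshold` as 1.0, else 0.0."""
    def pred(val):
        return 1.0 if val > threshold else 0.0
    return pred

x_pred = predictor(0.5)   # same thresholds as x_labelizer / y_labelizer
y_pred = predictor(0.9)
print([x_pred(v) for v in (0.2, 0.5, 0.7)])  # [0.0, 0.0, 1.0]
print([y_pred(v) for v in (0.7, 0.95)])      # [0.0, 1.0]
```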

In [None]:
x_labelizer = udf(predictor(0.5), DoubleType())
y_labelizer = udf(predictor(0.9), DoubleType())

In [None]:
new_df = dfp.withColumn("x_label", x_labelizer("x")).withColumn("y_label", y_labelizer("y"))

In [None]:
new_df.show()

## Type safety and Datasets

In [None]:
rdd = new_df.rdd
row = rdd.take(1)
row

In [None]:
# Remember that take always returns a list of results
print(type(row))

In [None]:
row = row[0]
print(type(row))

In Python, we're not too worried about type safety. In Scala/Java, however, these Row objects do not carry the types of the values inside them, so type safety can be lost when converting from RDDs to DataFrames. [Datasets](http://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) (fleshed out in Spark 2.0) are a newer incarnation of DataFrames that add encoding information to preserve that type safety. The typed Dataset API is available only in Scala and Java.

We can, however, drill into Row objects to extract the information we want.

In [None]:
row[1]

In [None]:
row.asDict()
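`pyspark.sql.Row` is a subclass of Python's `tuple`, which is why positional indexing like `row[1]` works. As a rough standard-library analogy (an analogy only, not how `Row` is implemented), a `namedtuple` supports the same two access styles, and its `_asdict()` plays the role of `Row.asDict()`. The field values below are hypothetical.

```python
from collections import namedtuple

# Hypothetical stand-in for a Row with the columns of new_df.
DemoRow = namedtuple("DemoRow", ["x", "y", "x_label", "y_label"])
demo_row = DemoRow(x=0.73, y=0.12, x_label=1.0, y_label=0.0)

print(demo_row[1])         # positional access: 0.12
print(demo_row.x_label)    # access by column name: 1.0
print(demo_row._asdict())  # analogous to Row.asDict()
```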

## DataFrame Optimization

One of the most common ways to slow down PySpark code is to call into Python too often. A Python UDF, for example, sends every row from the JVM to a Python worker process and back. Let's look at an example where we just want to split a string.
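The cost of a Python UDF comes largely from moving data between the JVM and Python. The toy model below, in plain Python, serializes each value with `pickle`, applies the function, and serializes the result back. `toy_python_udf` and the station IDs are hypothetical. Real PySpark batches rows and uses its own serializers (Arrow in the case of pandas UDFs). The sketch only shows that this extra work grows with the number of rows, while a built-in such as `F.split` stays inside the JVM.

```python
import pickle
import timeit

def toy_python_udf(values, fn):
    """Toy model of a row-at-a-time Python UDF: serialize each value out of the
    'JVM', deserialize it in 'Python', apply fn, and serialize the result back."""
    out = []
    for v in values:
        arg = pickle.loads(pickle.dumps(v))             # JVM -> Python worker
        result = fn(arg)                                # the Python function itself
        out.append(pickle.loads(pickle.dumps(result)))  # Python worker -> JVM
    return out

stations = ["STN:%05d" % i for i in range(10_000)]      # hypothetical station IDs
split_colon = lambda s: s.split(":")

t_udf = timeit.timeit(lambda: toy_python_udf(stations, split_colon), number=3)
t_direct = timeit.timeit(lambda: [split_colon(s) for s in stations], number=3)
print(f"with serialization: {t_udf:.3f}s  direct: {t_direct:.3f}s")
```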

In [None]:
import os
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

In [None]:
df = sqlContext.read.csv(localpath('./small_data/nycp.csv'), 
                         header=True)

df.show()

In [None]:
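from pyspark.sql.types import ArrayType, StringType, IntegerType
# str.split returns a list, so the UDF's return type must be an array of strings
split_str = udf(lambda x: x.split(':'), ArrayType(StringType()))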

In [None]:
%%timeit
df.withColumn("split", split_str("STATION")).collect()

In [None]:
df.withColumn("split", split_str("STATION")).show(5)

In [None]:
from pyspark.sql import functions as F

In [None]:
%%timeit
df.withColumn("split", F.split(df.STATION, ":")).collect()

In [None]:
df.withColumn("split", F.split(df['STATION'], ":")).show(5)

This is great, but let's do a real computation. Say we want to find the average temperature in each month and then plot it. Here we can take advantage of the built-in conversion to pandas DataFrames (`toPandas`). If we can reduce our data to a manageable size, we can make plots quite easily.

In [None]:
avg_temp = (df.withColumn("avg", (df["TMIN"] + df["TMAX"]) / 2)
              .withColumn("month",df['DATE'].substr(5, 2).cast(IntegerType()))
              .groupBy("month")
              .avg('avg')
              .withColumnRenamed("avg(avg)", "temp")
           )
avg_temp.show()
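The same monthly aggregation on a few hypothetical rows in plain Python makes one detail explicit. Spark's `Column.substr(5, 2)` is 1-based, so on a `yyyyMMdd` string it picks out characters 5 and 6 (the month). The equivalent Python slice is the 0-based `date[4:6]`. The rows and temperatures are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (DATE as a yyyyMMdd string, TMIN, TMAX)
rows = [("20130701", 70, 86), ("20130702", 72, 90),
        ("20130801", 66, 80), ("20130802", 64, 78)]

sums = defaultdict(float)
counts = defaultdict(int)
for date, tmin, tmax in rows:
    month = int(date[4:6])            # Spark: substr(5, 2), 1-based
    sums[month] += (tmin + tmax) / 2  # the per-day "avg" column
    counts[month] += 1

avg_temp_local = {m: sums[m] / counts[m] for m in sorted(sums)}
print(avg_temp_local)  # {7: 79.5, 8: 72.0}
```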

In [None]:
(avg_temp.toPandas()
         .set_index("month")
         .sort_index()
         .plot.bar())

This calculation takes advantage of both the scalability of Spark and the convenient plotting features of pandas. It can be useful to think of Spark as the tool for handling data at scale, while something like pandas handles the aggregated data for visualization and summary.

## Joins

Often we want to join data sets together. Let's work with a larger data set that we can join with the weather data we just loaded!

The data set we will work with keeps track of individual Citi Bike rides over the course of a few years. We expect the weather to play a role in how many rides there are on a given day, so we will use the Citi Bike data along with the weather data we already have to test this suspicion.

First we have to download the data, which is a few GB in size.

In [None]:
%%bash
cd projects
# Uncomment the following line to trigger the download of the data set.
#DO_DOWNLOAD=true
if [ ! -d "cititrip" ] && [ "$DO_DOWNLOAD" ] ; then
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-07 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-08 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-09 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-10 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-11 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2013-12 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-01 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-02 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-03 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-04 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-05 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-06 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-07 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q "https://s3.amazonaws.com/dataincubator-course/cititrip/2014-08 - Citi Bike trip data.csv.gz" -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201409-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201410-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201411-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201412-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201501-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201502-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201503-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201504-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201505-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201506-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201507-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201508-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201509-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201510-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201511-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201512-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201601-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201602-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201603-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201604-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201605-citibike-tripdata.csv.gz -nc  -P cititrip/.
    wget -q https://s3.amazonaws.com/dataincubator-course/cititrip/201606-citibike-tripdata.csv.gz -nc  -P cititrip/.
fi
echo "Done!"

In [None]:
datadir = 'projects/cititrip' if os.path.isdir('projects/cititrip') else 'projects/citi-subset/'

Now we can read in the data and show the first few rows. It is always smart to do this to get a sense of what the data contains.

In [None]:
df = sqlContext.read.csv(localpath(datadir),
                         header=True)
df.show(5)

Before we can join this data, we need to do a little cleanup to parse out the dates. Let's look at the beginning of the DataFrame.

In [None]:
(df.withColumn("day", F.split("starttime", " ").getItem(0))
   .select("day")
   .show())

Now let's look at another part of the DataFrame. Notice that the dates are not always in the same format.

In [None]:
(df.withColumn("day", F.split("starttime", " ").getItem(0))
   .select("day")
   .filter(F.col("day").contains('-'))
   .show())

We need to handle both cases when converting to dates. For more on date format strings, check out Java's [simple date format](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) patterns. (Spark 3.0 and later document their own, largely compatible, datetime patterns.)
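The two-format logic can be sketched in plain Python with `datetime.strptime`, mirroring the `F.when(...contains("/"))...otherwise(...)` expression used here. `parse_day` and the sample strings are hypothetical. Python's format codes (`%m/%d/%Y`, `%Y-%m-%d`) differ in spelling from Java-style patterns (`MM/dd/yyyy`, `yyyy-MM-dd`).

```python
from datetime import datetime

def parse_day(starttime):
    """Parse the date part of a start time in either 'MM/dd/yyyy ...' or 'yyyy-MM-dd ...' form."""
    st = starttime.split(" ")[0]          # like F.split("starttime", " ").getItem(0)
    if "/" in st:
        return datetime.strptime(st, "%m/%d/%Y").date()
    return datetime.strptime(st, "%Y-%m-%d").date()

print(parse_day("7/1/2013 00:00:00"))     # 2013-07-01
print(parse_day("2015-01-01 00:01:00"))   # 2015-01-01
```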

In [None]:
days = (df.withColumn("st", F.split("starttime", " ").getItem(0))
               .withColumn("day",
                           F.when(F.col("st").contains("/"), 
                                  F.to_date("st", "MM/dd/yyyy"))
                            .otherwise(F.to_date("st", "yyyy-MM-dd")))
               .select("day")
)
days.show()

Now we can aggregate by "day" and count the number of occurrences.

In [None]:
day_counts = (days.select("day")
                  .groupBy("day")
                  .count()
                  .cache()
)

In [None]:
day_counts.show()

Let us take a look at this data.

In [None]:
(day_counts.toPandas()
           .set_index("day")
           .sort_index()
           .plot())

Ridership looks like it might be strongly correlated with the weather. To examine this, let's reload the weather data and join it with the daily counts to get a sense of the correlation. Note that we do the join after we have already computed the count for each day. Why?

In [None]:
df_temp = (sqlContext.read.csv(localpath('./small_data/nycp.csv'), 
                         header=True)
                     .withColumn("avg", 
                                 (F.col("TMIN") + F.col("TMAX")) / 2)
                     .withColumn("dt", F.to_date(F.col("DATE"),
                                                "yyyyMMdd"))
          )
df_temp.select(['avg', 'dt']).show()

In [None]:
joined = df_temp.select(['avg', 'dt']).join(day_counts,
                                            df_temp.dt == day_counts.day,
                                            'inner')
joined.show()
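An inner join keeps only the days that appear in both tables. The minimal hash-join sketch below, in plain Python, shows those semantics on hypothetical data and assumes each day appears only once in the weather table. Spark chooses its own physical join strategy (for example a broadcast hash join or a sort-merge join). This sketch is about the result, not Spark's execution.

```python
# Hypothetical data: one average temperature per day, and daily ride counts.
temps_by_day = {"2013-07-01": 78.0, "2013-07-02": 81.0, "2013-07-03": 84.5}
ride_counts = [("2013-07-01", 100), ("2013-07-02", 120), ("2013-07-04", 90)]

def hash_inner_join(build, probe_rows):
    """Inner join via a hash table: `build` maps key -> value (unique keys assumed);
    each (key, value) in `probe_rows` is kept only if its key is also in `build`."""
    joined = []
    for day, count in probe_rows:
        if day in build:                  # constant-time lookup in the hash table
            joined.append((build[day], day, count))
    return joined

print(hash_inner_join(temps_by_day, ride_counts))
# [(78.0, '2013-07-01', 100), (81.0, '2013-07-02', 120)]
```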

We can now calculate the correlation of these two columns and see whether it is large.

In [None]:
joined.corr("avg", "count")
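`DataFrame.corr(col1, col2)` returns the Pearson correlation coefficient, which is currently the only method it supports:
$r = \frac{\sum (x - \bar{x})(y - \bar{y})}{\sqrt{\sum (x - \bar{x})^2 \sum (y - \bar{y})^2}}$.
The plain-Python sketch below computes the same quantity on small lists. The temperatures and ride counts in the usage line are hypothetical.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    ss_x = sum((x - mean_x) ** 2 for x in xs)
    ss_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(ss_x * ss_y)

temps = [40.0, 55.0, 70.0, 85.0]      # hypothetical daily average temperatures
rides = [9000, 15000, 26000, 30000]   # hypothetical daily ride counts
print(round(pearson(temps, rides), 3))
```

Values near 1 mean the two columns rise together, values near -1 mean one falls as the other rises, and values near 0 mean there is no linear relationship.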

This suggests a strong correlation, which is not too surprising given that people probably don't want to ride when it's cold. Let's plot both columns to see this visually. To make the plot readable, let's also scale each column to the range [0, 1]. The approach below is a somewhat poor one, because each `agg(...).collect()` launches a separate Spark job. In the Machine Learning notebook, we will learn how to use built-in functionality to do this faster!
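The scaling used in the next cells is min-max scaling: each value $v$ becomes $(v - \min)/(\max - \min)$, so the smallest value maps to 0 and the largest to 1. Here is a plain-Python sketch of the formula. `min_max_scale` is a hypothetical helper, and it rejects a constant column because the denominator would be zero.

```python
def min_max_scale(values):
    """Map each value v to (v - min) / (max - min), so the result spans [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        raise ValueError("cannot min-max scale a constant column")
    return [(v - lo) / (hi - lo) for v in values]

print(min_max_scale([30.0, 50.0, 80.0]))  # [0.0, 0.4, 1.0]
```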

In [None]:
min_t = joined.agg({"avg":"min"}).collect()[0][0]
max_t = joined.agg({"avg":"max"}).collect()[0][0]
min_c = joined.agg({"count":"min"}).collect()[0][0]
max_c = joined.agg({"count":"max"}).collect()[0][0]
print(min_t, max_t, min_c, max_c)

In [None]:
(joined.withColumn("scale_temp", 
                   (F.col("avg") - min_t)/(max_t - min_t))
       .withColumn("scale_count",
                  (F.col("count") - min_c)/(max_c - min_c))
       .select(["day", "scale_temp", "scale_count"])
       .toPandas()
       .set_index("day")
       .plot(alpha=.75))

Weather is clearly a strong predictor of bike ridership, but it isn't the only pattern visible in the plot. Do you notice anything else?

*Copyright &copy; 2019 The Data Incubator.  All rights reserved.*