In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-2.3.0.tar.gz (211.9MB)
[K    100% |████████████████████████████████| 211.9MB 1.5kB/s eta 0:00:01 1% |▋                               | 4.1MB 6.6MB/s eta 0:00:32    7% |██▎                             | 15.3MB 11.2MB/s eta 0:00:18 14.7MB/s eta 0:00:12    21% |███████                         | 45.8MB 9.2MB/s eta 0:00:19    23% |███████▌                        | 49.6MB 6.4MB/s eta 0:00:26    24% |███████▉                        | 51.9MB 14.7MB/s eta 0:00:11/s eta 0:00:11[K    33% |██████████▊                     | 71.1MB 7.5MB/s eta 0:00:19:00:11    38% |████████████▍                   | 82.0MB 15.7MB/s eta 0:00:09            | 97.5MB 7.7MB/s eta 0:00:15   | 101.7MB 14.3MB/s eta 0:00:08MB/s eta 0:00:08��███████▉               | 111.8MB 19.2MB/s eta 0:00:06  54% |█████████████████▌              | 116.0MB 17.1MB/s eta 0:00:06    55% |█████████████████▉              | 118.2MB 12.2MB/s eta 0:00:08    56% |██████████████████              | 118.8MB 7.6M

### Download requisite data to make available in notebook

In [2]:
%%sh
wget --no-verbose https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2017-06.csv
wget -O taxi_zone_lookup.csv --no-verbose https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

2018-01-25 02:22:47 URL:https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2017-06.csv [87477546/87477546] -> "green_tripdata_2017-06.csv.1" [1]
2018-01-25 02:22:47 URL:https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv [12322/12322] -> "taxi_zone_lookup.csv" [1]


### Register special SQL magics

In [3]:
from IPython.core.magic import register_line_cell_magic

# https://github.com/LucaCanali/Miscellaneous/blob/master/Pyspark_SQL_Magic_Jupyter/IPython_Pyspark_SQL_Magic.ipynb
# Configuration parameters
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True     # Set to False if you want to see only the physical plan when running explain


@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line 
    return spark.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line 
    return spark.sql(val).show(max_show_lines) 

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return spark.sql(val).limit(max_show_lines).toPandas() 

@register_line_cell_magic
def sql_explain(line, cell=None):
    "Display the execution plan of the sql. Use: %sql_explain or %%sql_explain"
    val = cell if cell is not None else line 
    return spark.sql(val).explain(detailed_explain)

### Build a SparkSession, the gateway to everything in Spark (2.x)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import numpy as np
import time
import plotly
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, iplot
init_notebook_mode(connected=True)

In [3]:
spark = SparkSession.builder.appName(name="PySpark Intro").master("local[*]").getOrCreate().newSession()
# .config("spark.jars", "hadoop-aws-2.7.3.jar")
spark

In [7]:
spark.sparkContext.uiWebUrl

'http://10.36.57.120:4040'

In [9]:
spark.sparkContext._jsc.addJar("hadoop-aws-2.7.3.jar")

### Create Spark DataFrame from CSV with header and inferred Schema

In [6]:
%%time
green_trips = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("green_tripdata_2017-06.csv")

CPU times: user 0 ns, sys: 12.3 ms, total: 12.3 ms
Wall time: 39.1 s


In [7]:
green_trips.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)



In [8]:
green_trips.createOrReplaceTempView("green_trips")
revenue_by_hour = spark.sql("""
SELECT hour(lpep_pickup_datetime), SUM(total_amount) AS total
FROM green_trips
GROUP BY hour(lpep_pickup_datetime)
ORDER BY hour(lpep_pickup_datetime) ASC""")
revenue_by_hour.cache().count()

24

![caching](Screen Shot 2018-01-24 at 12.39.47 PM.png)

In [9]:
revenue_by_hour.explain()

== Physical Plan ==
InMemoryTableScan [hour(lpep_pickup_datetime)#54, total#52]
   +- InMemoryRelation [hour(lpep_pickup_datetime)#54, total#52], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Sort [hour(lpep_pickup_datetime)#54 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(hour(lpep_pickup_datetime)#54 ASC NULLS FIRST, 200)
               +- *HashAggregate(keys=[hour(lpep_pickup_datetime#13, Some(Etc/UTC))#59], functions=[sum(total_amount#28)])
                  +- Exchange hashpartitioning(hour(lpep_pickup_datetime#13, Some(Etc/UTC))#59, 200)
                     +- *HashAggregate(keys=[hour(lpep_pickup_datetime#13, Some(Etc/UTC)) AS hour(lpep_pickup_datetime#13, Some(Etc/UTC))#59], functions=[partial_sum(total_amount#28)])
                        +- *FileScan csv [lpep_pickup_datetime#13,total_amount#28] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/nbuser/library/green_tripdata_2017-06.csv], PartitionFilter

### Write out revenue by hour via Spark

In [10]:
# revenue_by_hour.write.csv("green_revenue_by_hour")

In [11]:
!ls -alrth green_revenue_by_hour/part*
# 25 files, each with 1 line, and 1 file with 0 lines (!!!)

-rw-r--r-- 1 nbuser nbuser 21 Jan 24 20:11 green_revenue_by_hour/part-00022-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 20 Jan 24 20:11 green_revenue_by_hour/part-00006-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 21 Jan 24 20:11 green_revenue_by_hour/part-00003-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 20 Jan 24 20:11 green_revenue_by_hour/part-00016-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 21 Jan 24 20:11 green_revenue_by_hour/part-00020-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 20 Jan 24 20:11 green_revenue_by_hour/part-00008-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 21 Jan 24 20:11 green_revenue_by_hour/part-00014-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbuser nbuser 21 Jan 24 20:11 green_revenue_by_hour/part-00015-2c744d1b-8d98-4de9-9fd8-55f0ddd1041d-c000.csv
-rw-r--r-- 1 nbu

In [12]:
pandas_rev_df = revenue_by_hour.toPandas()

### Write revenue by hour to single CSV file via Pandas
#### Note: Spark jobs are distributed and writing is done by each executor when using Spark's write API. Calling .toPandas() is _not_ free and will throw errors on larger datasets, because the *single driver machine* now has to hold the _entire dataset_ in memory

In [13]:
# pandas_rev_df.to_csv("revenue_by_hour.csv")

In [14]:
!ls -alrth revenue_by_hour.csv
# Ahhhh, back to 1 file :phew:

-rw-r--r-- 1 nbuser nbuser 592 Jan 24 20:18 revenue_by_hour.csv


In [15]:
display(green_trips.sort(F.rand()).limit(5).toPandas())

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type
0,2,2017-06-15 18:26:02,2017-06-15 18:28:10,N,1,80,80,1,0.43,3.5,1.0,0.5,1.59,0.0,,0.3,6.89,1,1
1,2,2017-06-30 20:40:36,2017-06-30 20:48:45,N,1,112,255,1,1.18,7.0,0.5,0.5,1.66,0.0,,0.3,9.96,1,1
2,2,2017-06-20 15:23:06,2017-06-20 16:04:21,N,1,40,255,1,4.55,24.5,0.0,0.5,5.06,0.0,,0.3,30.36,1,1
3,2,2017-06-16 16:53:06,2017-06-16 17:00:59,N,1,95,95,1,0.77,6.5,1.0,0.5,0.0,0.0,,0.3,8.3,2,1
4,1,2017-06-18 19:36:42,2017-06-18 20:14:34,N,1,75,33,1,9.6,33.0,0.0,0.5,3.0,0.0,,0.3,36.8,1,1


In [16]:
green_trips.rdd.getNumPartitions()

2

In [17]:
taxi_zones = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("taxi_zone_lookup.csv")
taxi_zones.cache().count()

265

In [18]:
pu_taxi_zones = taxi_zones.toDF("PULocationID", "PUBorough", "PUZone", "PUservice_zone") 
# select([F.col(c).alias("PU{c}".format(c=c)) for c in taxi_zones.columns])
green_trips_with_locations = green_trips.join(pu_taxi_zones, "PULocationID")

do_taxi_zones = taxi_zones.toDF("DOLocationID", "DOBorough", "DOZone", "DOservice_zone") 
# .select([F.col(c).alias("DO{c}".format(c=c)) for c in taxi_zones.columns])
green_trips_with_locations = green_trips_with_locations.join(do_taxi_zones, "DOLocationID")

In [19]:
display(green_trips_with_locations.limit(5).toPandas())

Unnamed: 0,DOLocationID,PULocationID,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,passenger_count,trip_distance,fare_amount,...,improvement_surcharge,total_amount,payment_type,trip_type,PUBorough,PUZone,PUservice_zone,DOBorough,DOZone,DOservice_zone
0,265,35,2,2017-06-01 00:33:45,2017-06-01 01:39:52,N,4,1,90.41,404.5,...,0.3,413.51,2,1,Brooklyn,Brownsville,Boro Zone,Unknown,,
1,75,263,2,2017-06-01 00:33:55,2017-06-01 23:36:23,N,1,1,0.38,4.0,...,0.3,6.89,1,1,Manhattan,Yorkville West,Yellow Zone,Manhattan,East Harlem South,Boro Zone
2,43,255,2,2017-06-01 00:12:07,2017-06-01 00:12:07,N,1,1,9.49,31.0,...,0.3,40.38,1,1,Brooklyn,Williamsburg (North Side),Boro Zone,Manhattan,Central Park,Yellow Zone
3,256,256,2,2017-06-01 00:17:26,2017-06-01 00:20:01,N,1,1,0.59,4.0,...,0.3,6.36,1,1,Brooklyn,Williamsburg (South Side),Boro Zone,Brooklyn,Williamsburg (South Side),Boro Zone
4,135,130,2,2017-06-01 00:17:22,2017-06-01 00:24:21,N,1,3,2.08,8.5,...,0.3,9.8,2,1,Queens,Jamaica,Boro Zone,Queens,Kew Gardens Hills,Boro Zone


In [20]:
green_trips_with_locations.createOrReplaceTempView("green_trips_with_locations")

In [21]:
%%sql_display
SELECT total_amount > 0 AS is_net_fare, COUNT(1)
FROM green_trips_with_locations
GROUP BY total_amount > 0

Unnamed: 0,is_net_fare,count(1)
0,True,972114
1,False,4353


### Quartiles by Total Revenue

In [22]:
quartile_df = green_trips_with_locations.filter("total_amount > 0")\
  .select(
    # assign quartiles by total_amount    
    F.ntile(4).over(Window.partitionBy().orderBy("total_amount")).alias("quartile"), 
    F.col("total_amount"),
    F.col("tip_amount")
  )
# quartile_df.cache().count() # override lazy caching to eagerly cache by forcing the count action

In [23]:
display(quartile_df.sort(F.rand()).limit(10).toPandas())

Unnamed: 0,quartile,total_amount,tip_amount
0,4,21.3,0.0
1,3,16.25,0.0
2,3,13.56,2.26
3,4,21.96,3.66
4,3,11.8,0.0
5,2,10.38,2.08
6,2,10.38,2.08
7,2,10.8,0.0
8,3,13.3,0.0
9,4,22.85,4.55


### Analyze Revenue, Tips and % of Total Revenue by Quartile 

In [24]:
quartile_df_agg =  quartile_df.groupBy("quartile").agg(
  F.round(F.avg("total_amount"),2).alias("avg_total"), 
  F.round(F.expr("percentile(total_amount, 0.5)"),2).alias("median_total"), # percentile is a hive UDAF
  F.round(F.avg("tip_amount"),2).alias("avg_tip"),
  F.expr("percentile(tip_amount, 0.5)").alias("median_tip"), # percentile is a hive UDAF
  F.round(F.avg(F.col("tip_amount")/F.col("total_amount"))*100,2).alias("avg_tip_pct"),
  F.count(F.lit(1)).alias("trips"), 
  F.round(F.sum("total_amount"),2).alias("revenue")
)
total_revenue = quartile_df_agg.agg(F.sum("revenue")).first()["sum(revenue)"]
display(quartile_df_agg.select("*", F.round((F.col("revenue")/F.lit(total_revenue))*100,2).alias("rev_pct")).toPandas())

Unnamed: 0,quartile,avg_total,median_total,avg_tip,median_tip,avg_tip_pct,trips,revenue,rev_pct
0,1,6.48,6.8,0.23,0.0,3.33,243029,1574406.33,11.1
1,2,9.57,9.36,0.65,0.0,6.73,243029,2325976.0,16.4
2,3,13.91,13.8,1.12,0.0,7.96,243028,3381087.79,23.84
3,4,28.4,24.35,2.85,2.96,9.67,243028,6902469.09,48.66


In [34]:
grouped_by_hour = green_trips_with_locations \
.filter("lpep_pickup_datetime LIKE '2017-06-27%'") \
.groupBy(
  F.window("lpep_pickup_datetime", "60 minutes").getField("start").alias("pickup_window")
)
pickups_by_hour_pdf = grouped_by_hour \
  .count() \
  .withColumnRenamed("count", "hour_count") \
  .sort(F.asc("pickup_window")) \
  .toPandas()

In [35]:
from plotly.graph_objs import *

data = [Bar(x=pickups_by_hour_pdf.pickup_window,
            y=pickups_by_hour_pdf.hour_count)]

layout = Layout(title="Pickups on June 27, 2016 by Hour",
                xaxis=dict(title='Hour of Day'),
                yaxis=dict(title='Pickups'))
fig = Figure(data=data, layout=layout)

iplot(fig, filename='jupyter/basic_bar')

In [36]:
tip_pct_by_hour_pdf = grouped_by_hour.agg(
    F.round(F.avg(F.col("tip_amount")/F.col("total_amount"))*100,2).alias("avg_tip_pct")
  )\
  .sort(F.asc("pickup_window"))\
  .toPandas()

In [37]:
from plotly.graph_objs import *

data = [Bar(x=tip_pct_by_hour_pdf.pickup_window,
            y=tip_pct_by_hour_pdf.avg_tip_pct)]

layout = Layout(title="Avg Tip % on June 27, 2016 by Hour",
                xaxis=dict(title='Hour of Day'),
                yaxis=dict(title='Avg Tip %'))
fig = Figure(data=data, layout=layout)

iplot(fig, filename='jupyter/basic_bar')

In [38]:
tip_pct_by_hour_pdf

Unnamed: 0,pickup_window,avg_tip_pct
0,2017-06-27 00:00:00,6.09
1,2017-06-27 01:00:00,5.17
2,2017-06-27 02:00:00,5.2
3,2017-06-27 03:00:00,5.08
4,2017-06-27 04:00:00,4.54
5,2017-06-27 05:00:00,6.74
6,2017-06-27 06:00:00,6.96
7,2017-06-27 07:00:00,7.63
8,2017-06-27 08:00:00,7.6
9,2017-06-27 09:00:00,7.9


In [29]:
trace1 = Bar(x=pickups_by_hour_pdf.pickup_window,
            y=pickups_by_hour_pdf.hour_count,
            name="Pickups",
            yaxis='y2')
trace2 = Scatter(x=tip_pct_by_hour_pdf.pickup_window,
            y=tip_pct_by_hour_pdf.avg_tip_pct,
            name="Tip %")
data = [trace1, trace2]
layout = Layout(
    title='Pickups and Tip % by Hour',
    yaxis=dict(
        title='Tip %'
    ),
    yaxis2=dict(
        title='Pickups',
        titlefont=dict(
            color='rgb(148, 103, 189)'
        ),
        tickfont=dict(
            color='rgb(148, 103, 189)'
        ),
        overlaying='y',
        side='right'
    )
)
fig = Figure(data=data, layout=layout)
plot_url = iplot(fig, filename='multiple-axes-double')

#### Accumulator

In [30]:
rows_accum = spark.sparkContext.accumulator(0)
revenue_by_hour.foreach(lambda r: rows_accum.add(1)) # rowsAccum.add(1))
print("Accumulator Rows: {ar}".format(ar=rows_accum.value))
print("DF Row Count: {c}".format(c=revenue_by_hour.count()))

Accumulator Rows: 24
DF Row Count: 24


### Python-specific Advice
#### My blog post [here](http://garrens.com/blog/2018/01/06/scaling-python-for-data-science-using-spark/) covers the topic of scaling Python for Spark with examples and thorough explanation

#### Avoid Python-based UDFs - instead using *PySpark* APIs like pyspark.sql.functions, which utilize the JVM for high performance.  

Note: in this notebook, the performance doesn't appear to be much different, but in many other circumstances, expect the performance hit from using python APIs in Spark to be more pronounced. With Spark 2.3 coming out, vectorized UDFs will be supported for Python which will greatly improve performance. See [this post](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html) for more details

In [31]:
%%time
timestamp_to_epoch = F.udf(lambda t: int(t.strftime("%s")))
green_trips_with_locations.select(
    timestamp_to_epoch(green_trips_with_locations.lpep_pickup_datetime),
    F.col("DOBorough"),
    F.col("DOZone")
    ).distinct().count()

CPU times: user 18.1 ms, sys: 12.3 ms, total: 30.4 ms
Wall time: 36.9 s


In [32]:
%%time
print(green_trips_with_locations.select(
    F.unix_timestamp(green_trips_with_locations.lpep_pickup_datetime),
    F.col("DOBorough"),
    F.col("DOZone")
).distinct().count())

970934
CPU times: user 853 µs, sys: 10.3 ms, total: 11.1 ms
Wall time: 34.7 s


#### Be cautious when using [your favorite Python packages here] with Spark

Python package code (such as pandas, numpy, etc) executes on one machine (the driver) using 1 core while Spark is distributed. 

You probably **can** use any package you like, but just because you can **doesn’t mean you should**.

#### Friends don't let friends use python packages with spark unwittingly

#### No, don’t use RDDs; use [Spark] SQL!

RDDs are the foundational data structure of Spark, but they are not optimized. DataFrames and all other SQL functionality is automatically optimized through Catalyst and Tungsten. Python is exceptionally slow when using RDDs due to the [de]serialization and python process overhead compared to the DataFrame APIs.

![Spark SQL](Screen Shot 2018-01-24 at 11.22.49 AM.png)

### Common problems

In [33]:
df = spark.read.csv("i_dont_exist.csv")

# Py4JJavaError: An error occurred while calling o33.csv.
# : org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/nbuser/library/i_dont_exist.csv;

AnalysisException: 'Path does not exist: file:/home/nbuser/library/i_dont_exist.csv;'

#### Truck loads of small files = death by 1,000 very slow cuts

In [None]:
%%time
df = spark.read.csv("small_files/")
df.count()

# CPU times: user 13.9 ms, sys: 1.33 ms, total: 15.2 ms
# ** Wall time: 1min 7s**
# mkdir small_files
# split -l 2000 green_tripdata_2017-06.csv small_files/
# ~500 files 150-200KB each :(
# http://garrens.com/blog/2017/11/04/big-data-spark-and-its-small-files-problem/

#### GZip is _not splittable_ therefore cannot be parallelized across cluster - re: 1 unsplittable file = 1 partition
options: 

1) don't use GZIP

2) call repartition(num_partitions) immediately after reading in file(s), which will shuffle the data across the network to each executor roughly equally. Expensive network-wise, but worthwhile if you're accessing the data frequently.

In [None]:
%%time
df = spark.read.csv("green_tripdata_2017-06.csv.gz")

# CPU times: user 2.08 ms, sys: 756 µs, total: 2.83 ms
# Wall time: 289 ms

# cat green_tripdata_2017-06.csven_tripdata_2017-06.csv | gzip --fast > green_tripdata_2017-06.csv.gz

#### Most basic incarnation of reading CSV file without header OR inferred Schema, therefore every column has a generic name and is of String type

In [39]:
%%time
df = spark.read.csv("green_tripdata_2017-06.csv")
df.count()
# CPU times: user 1.56 ms, sys: 566 µs, total: 2.12 ms
# Wall time: ** 259 ms **

CPU times: user 6.93 ms, sys: 2.06 ms, total: 8.99 ms
Wall time: 33.7 s
