### Libraries and Initializing PySpark

In [1]:
from pyspark import SparkConf, SparkContext

# sc is short for spark context.
# getOrCreate() reuses the context already running in this kernel, so the cell
# can be re-run safely; calling SparkContext(...) a second time raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("IntroToSpark"))

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Distribute a plain Python list across the cluster as a four-element RDD.
# NOTE(review): the values look like one taxi record (date, hour, minute,
# pickups) -- confirm against the dataset.
taxi_entry = sc.parallelize(
    ['2009-01-01', 0, 0, 24]
)

In [None]:
# Printing an RDD shows a description of the RDD object (for example
# "ParallelCollectionRDD[0] at parallelize ..."), not the elements it holds.
print(taxi_entry)

In [None]:
# Expected output of printing the RDD:
# ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [None]:
from pyspark import SparkConf, SparkContext

# Reuse the SparkContext if one is already running in this kernel: only one
# context may be active at a time, and calling SparkContext(...) again raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("IntroTo Spark"))

# Distribute the three values of the weather record as an RDD.
weather_entry = sc.parallelize(['2009-01-01', 15.1, 26.1])

# take(3) brings the first three elements back to the driver as a list.
print(weather_entry.take(3))

In [None]:
from pyspark.sql import SparkSession

APP_NAME = 'sampleApp'

# Build the session (or fetch the one that already exists) under APP_NAME.
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .getOrCreate()
)

### DataFrames in PySpark

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

# SPARK_URL was defined but never handed to the builder; pass it as the master
# URL so the constant actually selects where Spark runs. If a session already
# exists, getOrCreate() returns it unchanged.
spark = (
    SparkSession.builder
    .master(SPARK_URL)
    .appName(APP_NAME)
    .getOrCreate()
)

# Read the CSV with pandas on the driver, then convert it to a Spark DataFrame.
taxi_df = pd.read_csv('/datasets/pickups_terminal_5.csv')
taxi = spark.createDataFrame(taxi_df)

# print() on a Spark DataFrame shows its column names and types, not its rows.
print(taxi)

In [None]:
# Example of the first rows of `taxi`, as a list of Row objects:
# [Row(date='2009-01-01', hour=0, minute=0, pickups=24.0),
#  Row(date='2009-01-01', hour=0, minute=30, pickups=35.0),
#  Row(date='2009-01-01', hour=1, minute=0, pickups=25.0),
#  Row(date='2009-01-01', hour=1, minute=30, pickups=25.0),
#  Row(date='2009-01-01', hour=2, minute=0, pickups=16.0)]

In [None]:
# A long builder chain can be wrapped in parentheses and split across lines
# (an alternative to ending each line with the '\' symbol).
# The config option switches off the console progress bar.
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .config('spark.ui.showConsoleProgress', 'false')
    .getOrCreate()
)

In [None]:
# Read the CSV straight into a Spark DataFrame:
#   format('csv')        - the file format
#   header='true'        - the first line holds the column names
#   inferSchema='true'   - column data types are inferred from the data
taxi = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load('/datasets/pickups_terminal_5.csv')
)

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

# SPARK_URL was defined but never used; pass it to the builder as the master
# URL. If a session already exists, getOrCreate() returns it unchanged.
spark = (
    SparkSession.builder
    .master(SPARK_URL)
    .appName(APP_NAME)
    .getOrCreate()
)

# Load the CSV with a header row and let Spark infer the column types.
taxi = spark.read.load(
    '/datasets/pickups_terminal_5.csv',
    format='csv', header='true', inferSchema='true',
)

# show() prints the table itself and returns None, so it must not be wrapped
# in print() -- that would add a stray "None" line after the table.
taxi.show(5)

In [None]:
# Total number of rows in the DataFrame.
taxi_row_count = taxi.count()
print(taxi_row_count)

In [None]:
# Subset of columns: keep only date, hour and minute and display five rows.
# show() prints the table and returns None, so it is called directly instead
# of being wrapped in print() (which would print an extra "None").
taxi.select('date', 'hour', 'minute').show(5)

### Missing Data in Spark

In [None]:
# Basic statistics per column; the "count" row makes missing values visible.
# show() prints the table and returns None, so no print() wrapper is used.
taxi.describe().show()

In [None]:
# Discard every row that contains at least one missing value,
# then report how many rows are left.
taxi = taxi.na.drop()
print(taxi.count())

In [None]:
# Replace missing numeric values with 0 and re-check the statistics.
taxi = taxi.fillna(0)

# show() prints the table and returns None; wrapping it in print() would add
# a stray "None" line to the output.
taxi.describe().show()

### EDA in PySpark SQL

In [None]:
# summary() extends describe() with quartiles (25%, 50%, 75%).
# show() prints the table and returns None, so it is not wrapped in print().
taxi.summary().show()

*For future reference*
We observe that the pickups data is skewed to the right, since the mean is greater than the median (the row with "50%"). We also notice that the maximum value is 310, the mean is 29, and the standard deviation is 22.45. So the maximum is significantly more than three standard deviations from the mean. Therefore, it is clearly an outlier. 

In [None]:
# Expose the DataFrame to Spark SQL under the name "taxi".
# registerTempTable() has been deprecated since Spark 2.0;
# createOrReplaceTempView() is its replacement and is safe to re-run.
taxi.createOrReplaceTempView("taxi")

In [None]:
# Number of rows with more than 100 pickups.
# show() prints the result and returns None, so print() is not needed
# (it would append a stray "None" line).
spark.sql("SELECT COUNT(*) FROM taxi WHERE pickups > 100").show()

+--------+
|count(1)|
+--------+
|    1431|
+--------+

In [None]:
# Earliest and latest date covered by the data.
# show() prints the result and returns None, so it is called without print().
spark.sql("SELECT MIN(CAST(date as DATE)), MAX(CAST(date as DATE)) FROM taxi").show()

+-----------------------+-----------------------+
|min(CAST(date AS DATE))|max(CAST(date AS DATE))|
+-----------------------+-----------------------+
|             2009-01-01|             2016-06-30|
+-----------------------+-----------------------+

#Over seven years!!

In [None]:
# The five rows with the highest number of pickups.
# show() prints the table and returns None, so it is called without print().
spark.sql("SELECT * FROM taxi ORDER BY pickups DESC").show(5)

+-------------------+----+------+-------+
|               date|hour|minute|pickups|
+-------------------+----+------+-------+
|2015-11-01 00:00:00|   1|    30|  310.0|
|2010-09-23 00:00:00|  22|    30|  288.0|
|2012-03-07 00:00:00|  21|     0|  268.0|
|2011-03-02 00:00:00|  20|    30|  264.0|
|2011-03-02 00:00:00|  18|    30|  263.0|
+-------------------+----+------+-------+
only showing top 5 rows

In [None]:
# Number of distinct dates that have at least one row with more than 200 pickups.
# show() prints the result and returns None, so it is called without print().
spark.sql("SELECT COUNT(DISTINCT date) FROM taxi WHERE pickups > 200").show()

+--------------------+
|count(DISTINCT date)|
+--------------------+
|                  21|
+--------------------+

### GroupBy

In [None]:
# Average number of pickups per date.
# mean("pickups") aggregates only the column of interest and yields the
# columns "date" and "avg(pickups)", so no extra select() is required.
# show() prints the table and returns None, so it is not wrapped in print().
taxi.groupBy("date").mean("pickups").show()

+-------------------+------------------+
|               date|      avg(pickups)|
+-------------------+------------------+
|2009-06-10 00:00:00| 28.48936170212766|
|2009-10-15 00:00:00|35.895833333333336|
|2010-03-25 00:00:00|28.458333333333332|
|2010-04-19 00:00:00|33.208333333333336|
|2010-05-03 00:00:00|             44.75|
|2010-08-21 00:00:00|           17.8125|
|2010-10-22 00:00:00|36.208333333333336|
|2010-11-02 00:00:00|            34.625|
|2011-05-25 00:00:00|35.829787234042556|
|2011-10-10 00:00:00| 26.76595744680851|
|2011-12-04 00:00:00|              23.0|
|2012-01-22 00:00:00|31.020833333333332|
|2012-07-11 00:00:00|              31.0|
|2012-10-20 00:00:00|32.041666666666664|
|2013-10-31 00:00:00|41.729166666666664|
|2014-08-04 00:00:00| 40.06382978723404|
|2015-04-26 00:00:00|22.145833333333332|
|2015-07-14 00:00:00|30.630434782608695|
|2015-10-10 00:00:00|            30.375|
|2016-03-11 00:00:00|              23.5|
+-------------------+------------------+
only showing top 20 rows

In [None]:
# Average number of pickups per date, busiest dates first.
# mean("pickups") yields the columns "date" and "avg(pickups)" directly.
# show() prints the table and returns None, so it is not wrapped in print().
(
    taxi.groupBy("date")
    .mean("pickups")
    .sort("avg(pickups)", ascending=False)
    .show()
)

+-------------------+------------------+
|               date|      avg(pickups)|
+-------------------+------------------+
|2012-03-07 00:00:00| 84.41666666666667|
|2011-03-02 00:00:00|             82.75|
|2012-03-09 00:00:00| 71.52083333333333|
|2014-03-05 00:00:00| 69.02083333333333|
|2012-03-10 00:00:00| 66.41666666666667|
|2012-03-22 00:00:00|             64.25|
|2013-03-22 00:00:00|            64.125|
|2013-11-01 00:00:00| 64.02083333333333|
|2013-03-06 00:00:00|62.723404255319146|
|2012-03-08 00:00:00|              62.0|
|2014-03-20 00:00:00|            61.625|
|2011-03-17 00:00:00| 60.97872340425532|
|2011-03-05 00:00:00|60.270833333333336|
|2011-03-04 00:00:00|           58.4375|
|2011-03-06 00:00:00|              58.0|
|2009-03-04 00:00:00|56.645833333333336|
|2011-05-04 00:00:00|             55.75|
|2014-03-06 00:00:00| 54.97872340425532|
|2015-03-04 00:00:00|54.854166666666664|
|2012-03-23 00:00:00|54.645833333333336|
+-------------------+------------------+
only showing top 20 rows

In [None]:
# If your query is too long, you can split the string in two.
# Python automatically joins adjacent string literals if there is no comma.
# show() prints the table and returns None, so it is called without print()
# (which would append a stray "None" line).
spark.sql('SELECT date, AVG(pickups) FROM taxi '
          'GROUP BY date ORDER BY AVG(pickups) DESC').show()

+-------------------+------------------+
|               date|      avg(pickups)|
+-------------------+------------------+
|2012-03-07 00:00:00| 84.41666666666667|
|2011-03-02 00:00:00|             82.75|
|2012-03-09 00:00:00| 71.52083333333333|
|2014-03-05 00:00:00| 69.02083333333333|
|2012-03-10 00:00:00| 66.41666666666667|
|2012-03-22 00:00:00|             64.25|
|2013-03-22 00:00:00|            64.125|
|2013-11-01 00:00:00| 64.02083333333333|
|2013-03-06 00:00:00|62.723404255319146|
|2012-03-08 00:00:00|              62.0|
|2014-03-20 00:00:00|            61.625|
|2011-03-17 00:00:00| 60.97872340425532|
|2011-03-05 00:00:00|60.270833333333336|
|2011-03-04 00:00:00|           58.4375|
|2011-03-06 00:00:00|              58.0|
|2009-03-04 00:00:00|56.645833333333336|
|2011-05-04 00:00:00|             55.75|
|2014-03-06 00:00:00| 54.97872340425532|
|2015-03-04 00:00:00|54.854166666666664|
|2012-03-23 00:00:00|54.645833333333336|
+-------------------+------------------+
only showing top 20 rows

In [None]:
# Average number of pickups per calendar month, busiest month first.
# show() prints the table and returns None, so it is called without print().
spark.sql('SELECT EXTRACT(month FROM date), AVG(pickups) FROM taxi '
          'GROUP BY EXTRACT(month FROM date) ORDER BY AVG(pickups) DESC').show()

+-------------------------+------------------+
|month(CAST(date AS DATE))|      avg(pickups)|
+-------------------------+------------------+
|                        3| 34.61413319776309|
|                       10|31.492839171666343|
|                        2|29.856671982987773|
|                        5| 29.81593638978176|
|                        4|29.313725490196077|
|                        9|29.158446485623003|
|                       11|28.860367558929283|
|                        1|  28.5473244004438|
|                        6| 27.03835736129314|
|                        7| 26.45983005021244|
|                       12| 26.45916884626562|
|                        8| 25.88592750533049|
+-------------------------+------------------+

In [None]:
# Average number of pickups per hour of the day; display the ten busiest hours.
# show() prints the table and returns None, so it is called without print().
spark.sql('SELECT hour, AVG(pickups) FROM taxi '
          'GROUP BY hour ORDER BY AVG(pickups) DESC').show(10)

+----+------------------+
|hour|      avg(pickups)|
+----+------------------+
|   8| 48.98208348725527|
|   9| 45.74220335855324|
|  18|45.131967515688444|
|  19| 40.18456995201181|
|  17| 37.68493909191584|
|  12| 36.91678966789668|
|  10|36.391031555637575|
|  14|35.965867158671585|
|   7| 35.93711855002774|
|  13| 35.34939091915836|
+----+------------------+