### Spark Practice Notebook

For this practice we will be using the FHV 2019-10 data found [here](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz)


In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
pyspark.__version__

'3.5.1'

In [5]:
pyspark.__file__

'/home/divij/spark/spark-3.5.1-bin-hadoop3/python/pyspark/__init__.py'

In [2]:
#local spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 18:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
from pyspark.sql import types

schema = types.StructType([ 
    types.StructField('dispatching_base_num', 
                types.StringType(), True), 
    types.StructField('pickup_datetime', 
                types.TimestampType(), True), 
    types.StructField('dropOff_datetime', 
                types.TimestampType(), True), 
    types.StructField('PUlocationID', 
                types.IntegerType(), True), 
    types.StructField('DOlocationID', 
                types.IntegerType(), True),
    types.StructField('SR_Flag', 
                types.StringType(), True),
    types.StructField('Affiliated_base_number', 
                types.StringType(), True)     
]) 

In [29]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('/home/divij/notebooks/fhv_tripdata_2019-10.csv')

In [21]:
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
+--------------------+------------------

In [30]:
# create 6 partitions in our dataframe
df = df.repartition(6)

df.write.parquet('/home/divij/notebooks/raw/', 'overwrite')

                                                                                

In [31]:
df_trips_data=df.registerTempTable('trips_data')

In [42]:
# count of records - 15 Oct
spark.sql("""
SELECT 
    COUNT(1) AS number_records
FROM
    trips_data
WHERE
    pickup_datetime > '2019-10-15 00:00:00'
and pickup_datetime <= '2019-10-16 00:00:00'
""").show()

[Stage 55:>                                                         (0 + 4) / 4]

+--------------+
|number_records|
+--------------+
|         62610|
+--------------+





In [39]:
# maximum trip duration in hours
spark.sql("""
SELECT 
extract(day from MAX(dropOff_datetime - pickup_datetime)*24) as DURATION
FROM
    trips_data
""").show()



+--------+
|DURATION|
+--------+
|  631152|
+--------+



                                                                                

In [43]:
df_taxi_zone = spark.read \
    .option("header", "true") \
    .csv('/home/divij/notebooks/taxi_zone_lookup.csv')

In [45]:
df_taxi_zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [47]:

schema = types.StructType([ 
    types.StructField('LocationID', 
                types.IntegerType(), True), 
    types.StructField('Borough', 
                types.StringType(), True),
    types.StructField('Zone', 
                types.StringType(), True),
    types.StructField('service_zone', 
                types.StringType(), True)       
]) 

In [48]:
df_taxi_zone = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('/home/divij/notebooks/taxi_zone_lookup.csv')

In [52]:
df_result = df.join(df_taxi_zone, df.PUlocationID == df_taxi_zone.LocationID)


In [54]:
df_result.select('pickup_datetime', 'Borough', 'Zone').show(10)

                                                                                

+-------------------+-------------+-----------------+
|    pickup_datetime|      Borough|             Zone|
+-------------------+-------------+-----------------+
|2019-10-07 10:41:27|      Unknown|               NV|
|2019-10-04 08:27:31|      Unknown|               NV|
|2019-10-08 08:15:00|      Unknown|               NA|
|2019-10-02 10:38:24|      Unknown|               NV|
|2019-10-01 10:25:21|Staten Island|    Port Richmond|
|2019-10-02 07:52:48|      Unknown|               NV|
|2019-10-07 06:54:42|      Unknown|               NV|
|2019-10-07 12:13:02|      Unknown|               NV|
|2019-10-03 09:44:39|      Unknown|               NV|
|2019-10-03 21:36:36|    Manhattan|Battery Park City|
+-------------------+-------------+-----------------+
only showing top 10 rows



In [55]:
df_result=df_result.registerTempTable('trips_data_zoned')

In [58]:
# least common PU Zone
spark.sql("""
SELECT
    Zone,
    COUNT(1) AS number_records
FROM
    trips_data_zoned
GROUP BY
Zone
ORDER BY count(1) 
""").show(5)

                                                                                

+--------------------+--------------+
|                Zone|number_records|
+--------------------+--------------+
|         Jamaica Bay|             1|
|Governor's Island...|             2|
| Green-Wood Cemetery|             5|
|       Broad Channel|             8|
|     Highbridge Park|            14|
+--------------------+--------------+
only showing top 5 rows



