In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session for this notebook. The "local[*]" master
# runs Spark inside this machine rather than against a cluster.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)


22/02/26 14:25:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/26 14:25:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
# NOTE(review): `spark.po` looks like an unfinished expression (this cell has no
# execution count). It will presumably raise AttributeError on Restart & Run All;
# confirm the intent, then complete or delete this cell.
spark.po

## Question 1. Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local spark session 
* Execute `spark.version`

What's the output?  
`Ans:` 3.0.3

In [4]:
# Version string of the Spark runtime behind this session (answer to Question 1).
spark.version

'3.0.3'


## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

```bash
wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
```

Read it with Spark using the same schema as we did 
in the lessons. We will use this dataset for all
the remaining questions.

Repartition it to 24 partitions and save it to
parquet.

What's the size of the folder with results (in MB)?  
`Ans:` 208M

In [5]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-26 14:26:44--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.42.220
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.42.220|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.1’


2022-02-26 14:26:55 (64.3 MB/s) - ‘fhvhv_tripdata_2021-02.csv.1’ saved [733822658/733822658]



In [6]:
# Count the lines of the downloaded CSV (a header line, if present, is counted too).
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


In [7]:
from pyspark.sql import types

In [8]:
# Explicit schema for the FHVHV trip CSV: (column name, Spark type) pairs in
# file order. Every field is declared nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [9]:
# Load the raw trip CSV with the explicit schema; the first line of the file
# is treated as a header rather than as data.
df = spark.read.csv(
    'fhvhv_tripdata_2021-02.csv',
    header=True,
    schema=schema,
)

In [None]:
# Question 2 asks for 24 partitions, which yields 24 parquet part files on write.
df = df.repartition(numPartitions=24)

In [None]:
# Persist the repartitioned trips as parquet.
# mode('overwrite') keeps this cell re-runnable: the default save mode raises
# an error when 'fhvhv/2021/02/' already exists, which breaks Restart & Run All.
# NOTE: only run this while `df` still comes from the CSV read above;
# overwriting a directory that `df` itself is being read from is not safe.
df.write.mode('overwrite').parquet('fhvhv/2021/02/')

In [10]:
# From here on, work from the parquet copy instead of the raw CSV.
df = spark.read.format('parquet').load('fhvhv/2021/02/')

                                                                                

In [11]:
# Print the column names, types and nullability of the DataFrame read back from parquet.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [13]:
ls -lh fhvhv/2021/02

total 208M
-rw-r--r-- 1 dexhrestha dexhrestha    0 Feb 26 14:07 _SUCCESS
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00000-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00001-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00002-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00003-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00004-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00005-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00006-d17f6cd5-b801-4ca8-95b4-f5cc6eb06ad7-c000.snappy.parquet
-rw-r--r-- 1 dexhrestha dexhrestha 8.7M Feb 26 14:06 part-00007-d17f6cd

## Question 3. Count records 

How many taxi trips were there on February 15?  
Consider only trips that started on February 15.  
`Ans:` 367170

In [12]:
from pyspark.sql import functions as F

In [15]:
# Question 3: number of trips whose pickup falls on 2021-02-15.
# The pickup timestamp is truncated to a date and compared with the target date.
(
    df
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .select('pickup_date', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .filter(F.col('pickup_date') == F.to_date(F.lit('2021-02-15')))
    .count()
)

                                                                                

367170


## Question 4. Longest trip for each day

Now calculate the duration for each trip.

On which day did the longest trip start?   

`Ans:` 2021-02-11

In [17]:
# Peek at the first two rows (returned as a list of Row objects) to check the values.
df.head(2)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02887', pickup_datetime=datetime.datetime(2021, 2, 6, 1, 18, 35), dropoff_datetime=datetime.datetime(2021, 2, 6, 1, 40, 34), PULocationID=163, DOLocationID=235, SR_Flag=None),
 Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 5, 7, 13, 6), dropoff_datetime=datetime.datetime(2021, 2, 5, 7, 31, 56), PULocationID=225, DOLocationID=181, SR_Flag=None)]

In [157]:
# Question 4: trip duration in seconds, taken as the difference between the
# drop-off and pickup timestamps cast to epoch seconds. Sorting by duration,
# longest first, puts the pickup date of the longest trip in the top row.
(
    df
    .withColumn(
        'duration',
        F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long'),
    )
    .select(
        F.col('pickup_datetime').cast('date').alias('pickup_date'),
        'dropoff_datetime',
        'duration',
    )
    .orderBy(F.col('duration').desc())
    .show(1)
)



+-----------+-------------------+--------+
|pickup_date|   dropoff_datetime|duration|
+-----------+-------------------+--------+
| 2021-02-11|2021-02-12 10:39:44|   75540|
+-----------+-------------------+--------+
only showing top 1 row



                                                                                

## Question 5. Most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` 
in this dataset.

How many stages does this Spark job have?  
`Ans:` 2 stages
> Note: the answer may depend on how you write the query,
> so there are multiple correct answers. 
> Select the one you have.


In [36]:
# Expose the trips DataFrame to Spark SQL under the name `fhv`.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it replaces an existing view of the same name.
df.createOrReplaceTempView('fhv')

In [41]:
# Question 5 (SQL variant): trips per dispatching base, most frequent first,
# queried from the `fhv` temp view. count(dispatching_base_num) counts only
# non-NULL base numbers.
dispatching_base_count = spark.sql('''
    SELECT dispatching_base_num,count(dispatching_base_num) as count
    FROM fhv 
    GROUP BY (dispatching_base_num)
    ORDER BY(count) DESC
''')

In [43]:
# Display the three most frequent dispatching bases from the SQL query.
dispatching_base_count.show(3)



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
+--------------------+-------+
only showing top 3 rows



                                                                                

In [52]:
# Question 5 (DataFrame API variant): same ranking as the SQL query, built
# from groupBy/count and ordered by the generated `count` column, largest first.
(
    df
    .select('dispatching_base_num')
    .groupBy('dispatching_base_num')
    .count()
    .orderBy('count', ascending=False)
    .show(3)
)



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
+--------------------+-------+
only showing top 3 rows



                                                                                


## Question 6. Most common locations pair

Find the most common pickup-dropoff pair. 

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash  
`Ans:`  East New York / East New York


If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East". 

In [55]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-02-26 14:59:59--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.97.77
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.97.77|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv.1’


2022-02-26 15:00:00 (89.2 MB/s) - ‘taxi+_zone_lookup.csv.1’ saved [12322/12322]



In [71]:
# Schema for the taxi zone lookup CSV: (column name, Spark type) pairs, all nullable.
# NOTE: this rebinds `schema`, which earlier held the trip schema.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("LocationID", types.IntegerType()),
        ("Borough", types.StringType()),
        ("Zone", types.StringType()),
        ("service_zone", types.StringType()),
    )
])

In [77]:
# Load the zone lookup table with its explicit schema and preview a few rows.
zones = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('taxi+_zone_lookup.csv')
)
zones.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [121]:
# Expose the zone lookup to Spark SQL under the name `zones`.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it replaces an existing view of the same name.
zones.createOrReplaceTempView('zones')

In [156]:
# Question 6: most common pickup / drop-off zone pair.
#
# The lookup table is projected twice with role-specific column names, so each
# join is a plain equi-join on a shared column name. This avoids referencing
# `df` columns after the aggregation and joining the same `zones` frame twice.
pu_zones = zones.select(
    F.col('LocationID').alias('PULocationID'),
    F.col('Zone').alias('PUZone'),
)
do_zones = zones.select(
    F.col('LocationID').alias('DOLocationID'),
    F.col('Zone').alias('DOZone'),
)

(
    df
    .groupBy('PULocationID', 'DOLocationID')
    .count()
    # Left joins keep pairs whose location id has no match in the lookup.
    .join(pu_zones, on='PULocationID', how='left')
    .join(do_zones, on='DOLocationID', how='left')
    # concat() returns NULL if any input is NULL, so a missing zone name is
    # replaced with "Unknown" first, as the question requires.
    .withColumn(
        'Zone Pair',
        F.concat(
            F.coalesce(F.col('PUZone'), F.lit('Unknown')),
            F.lit(' / '),
            F.coalesce(F.col('DOZone'), F.lit('Unknown')),
        ),
    )
    .select('Zone Pair', 'PULocationID', 'DOLocationID', 'count')
    .orderBy('count', ascending=False)
    .show(3, truncate=False)
)



+-----------------------------+------------+------------+-----+
|Zone Pair                    |PULocationID|DOLocationID|count|
+-----------------------------+------------+------------+-----+
|East New York / East New York|76          |76          |45041|
|Borough Park / Borough Park  |26          |26          |37329|
|Canarsie / Canarsie          |39          |39          |28026|
+-----------------------------+------------+------------+-----+
only showing top 3 rows



                                                                                

## Bonus question. Join type

(not graded) 

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?  
`Ans:`  The join type is left join  

And how many stages does your Spark job have?  
`Ans:`  There are 2 stages