In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder\
    .master("local[*]") \
    .appName("app") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/09 18:14:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [8]:
df = spark.read \
        .option('header','true') \
        .schema(schema) \
        .csv('fhvhv_tripdata_2021-06.csv')

In [9]:
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+-------------------+-------------------+------------+------------+-------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|
+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 5 rows



23/03/09 18:16:58 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/rohit/data-engineering-zoomcamp/week_5_batch_processing/homework/fhvhv_tripdata_2021-06.csv


In [10]:
df = df.repartition(12)

In [13]:
df.write.parquet("fhvh/2021/06", mode='overwrite')

23/03/05 17:57:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, SR_Flag, Affiliated_base_number
 Schema: hvfhs_license_num, dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, SR_Flag
Expected: hvfhs_license_num but found: dispatching_base_num
CSV file: file:///home/rohit/data-engineering-zoomcamp/week_5_batch_processing/homework/fhvhv_tripdata_2021-06.csv
                                                                                

In [11]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [12]:
df.registerTempTable("trip_data")



In [61]:
#ANSWER1:  how many taxi trips were started on june 15
res1 = spark.sql(
"""
SELECT count(*)
FROM trip_data
where DATE(pickup_datetime)='2021-06-15'
""")
res1.show()



+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

In [16]:
cols = [
 'dispatching_base_num',
 'pickup_date',
 'dropoff_date',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [17]:
df = df.withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)).select(cols)

In [20]:
df.show(2)

23/03/09 18:24:25 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/rohit/data-engineering-zoomcamp/week_5_batch_processing/homework/fhvhv_tripdata_2021-06.csv

+--------------------+-----------+------------+------------+------------+-------+
|dispatching_base_num|pickup_date|dropoff_date|PULocationID|DOLocationID|SR_Flag|
+--------------------+-----------+------------+------------+------------+-------+
|              B02875| 2021-06-01|  2021-06-01|         138|         144|      N|
|              B02884| 2021-06-04|  2021-06-04|          83|         265|      N|
+--------------------+-----------+------------+------------+------------+-------+
only showing top 2 rows



                                                                                

In [38]:
# Answer 2: longest trip duration
spark.sql('''

select  EXTRACT(days from  (dropoff_datetime - pickup_datetime))*24 + EXTRACT(hours from  (dropoff_datetime - pickup_datetime)) as trip_time
from trip_data order by trip_time desc LIMIT 1
''').show()

                                                                                

+---------+
|trip_time|
+---------+
|       66|
+---------+



In [39]:
df_zones = spark.read.option('header','true').csv('taxi_zone_lookup.csv')

In [42]:
df_zones.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [45]:
#ANSWER 4: name of most frequent pickup location zones

combined_df = df.join(df_zones,  df.PULocationID == df_zones.LocationID)
combined_df = combined_df.drop('LocationID')

In [47]:
combined_df.show(3)

23/03/09 19:16:36 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/rohit/data-engineering-zoomcamp/week_5_batch_processing/homework/fhvhv_tripdata_2021-06.csv

+--------------------+-----------+------------+------------+------------+-------+----------+-------+-----------------+------------+
|dispatching_base_num|pickup_date|dropoff_date|PULocationID|DOLocationID|SR_Flag|LocationID|Borough|             Zone|service_zone|
+--------------------+-----------+------------+------------+------------+-------+----------+-------+-----------------+------------+
|              B02875| 2021-06-01|  2021-06-01|         138|         144|      N|       138| Queens|LaGuardia Airport|    Airports|
|              B02884| 2021-06-04|  2021-06-04|          83|         265|      N|        83| Queens| Elmhurst/Maspeth|   Boro Zone|
|              B02875| 2021-06-04|  2021-06-04|         129|         255|      N|       129| Queens|  Jackson Heights|   Boro Zone|
+--------------------+-----------+------------+------------+------------+-------+----------+-------+-----------------+------------+
only showing top 3 rows



                                                                                

In [49]:
combined_df.registerTempTable("combined_data")



In [60]:
#ANSWER 4: name of most frequent pickup location zones
val = spark.sql('''

select PULocationID, Zone, count(*) as pickup_count
from combined_data 
group by PULocationID, Zone HAVING
pickup_count = (select MAX(pickup_count) FROM
(select PULocationID, count(*) as pickup_count from combined_data 
GROUP BY PULocationID) AS pickup_counts)
limit 1
''')
val.show()

                                                                                

+------------+-------------------+------------+
|PULocationID|               Zone|pickup_count|
+------------+-------------------+------------+
|          61|Crown Heights North|      231279|
+------------+-------------------+------------+



In [59]:
combined_df.columns

['dispatching_base_num',
 'pickup_date',
 'dropoff_date',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Borough',
 'Zone',
 'service_zone']