In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session named 'test' on the local master,
# using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

22/03/01 23:59:37 WARN Utils: Your hostname, ZPS713 resolves to a loopback address: 127.0.1.1; using 172.26.238.3 instead (on interface eth0)
22/03/01 23:59:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/01 23:59:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Display the Spark version of the active session.
spark.version

'3.0.3'

In [7]:
from pyspark.sql import types

In [8]:
# Explicit schema for the FHVHV trip CSV: two timestamp columns, two integer
# location ids, and string columns for the rest. Every column is nullable.
schema = (
    types.StructType()
    .add('hvfhs_license_num', types.StringType(), True)
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
)

In [9]:
# Load the February 2021 FHVHV CSV, treating the first row as the header
# and applying the explicit schema rather than inferring column types.
df = spark.read.csv(
    'fhvhv_tripdata_2021-02.csv',
    schema=schema,
    header=True,
)

In [17]:
# Redistribute the rows across 24 partitions before the data is written out.
df = df.repartition(numPartitions=24)

In [18]:
# Persist the repartitioned data as parquet, replacing any previous output
# already present in the target directory.
df.write.mode("overwrite").parquet('fhvhv/2021/02')

                                                                                

In [40]:
# Read back every parquet file written under fhvhv/2021/02.
fhvhv_df = spark.read.format('parquet').load('fhvhv/2021/02/*')



In [41]:
# Print column names, types and nullability of the parquet-backed DataFrame.
fhvhv_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [20]:
from pyspark.sql import functions as F

In [55]:
# Add a trip_duration column: the gap between dropoff and pickup, in minutes.
# NOTE(review): both columns are already timestamps per the schema, so the
# format argument is presumably redundant here — confirm before removing it.
timeFmt = "yyyy-MM-dd HH:mm:ss"
dropoff_seconds = F.unix_timestamp('dropoff_datetime', format=timeFmt)
pickup_seconds = F.unix_timestamp('pickup_datetime', format=timeFmt)
# unix_timestamp yields seconds, so divide the difference by 60 for minutes.
timeDiff = (dropoff_seconds - pickup_seconds)/60
fhvhv_df = fhvhv_df.withColumn("trip_duration", timeDiff)

In [56]:
# Expose the DataFrame (including trip_duration) to Spark SQL under the name
# 'fhvhv_trips_data'. Uses createOrReplaceTempView instead of the deprecated
# registerTempTable, matching the other temp views in this notebook, and so
# re-running the cell simply replaces the existing view.
fhvhv_df.createOrReplaceTempView('fhvhv_trips_data')

In [44]:
# Question 3: number of trip records picked up on 2021-02-15.
# The pickup timestamp is formatted as yyyyMMdd and compared to the target day.
feb15_trip_count_sql = """
SELECT
    count(1)
FROM
    fhvhv_trips_data
WHERE
    date_format(pickup_datetime, 'yyyyMMdd') = '20210215'
"""
spark.sql(feb15_trip_count_sql).show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+





In [82]:
# Question 4: pickup day of the longest trip.
# The subquery finds the maximum trip_duration; the outer query returns the
# pickup date of every row whose duration equals that maximum.
longest_trip_day_sql = """
SELECT date_format(pickup_datetime, 'yyyy-MM-dd')
FROM fhvhv_trips_data 
WHERE trip_duration =
    (SELECT
        MAX(trip_duration)
    FROM
        fhvhv_trips_data) 
"""
spark.sql(longest_trip_day_sql).show()

[Stage 130:>                                                        (0 + 3) / 3]

+----------------------------------------+
|date_format(pickup_datetime, yyyy-MM-dd)|
+----------------------------------------+
|                              2021-02-11|
+----------------------------------------+





In [81]:
# Question 5: stages needed to find the most frequent dispatching_base_num.
# Step 1: count trips per dispatching base and expose the counts as a temp view.
dispatch_base_counts = spark.sql("""
    (SELECT dispatching_base_num,
    COUNT(1) as count_dispatch_base_num
    FROM fhvhv_trips_data 
    GROUP BY dispatching_base_num)
""")
dispatch_base_counts.createOrReplaceTempView("count_of_dispatching_base_num")

# Step 2: keep the base(s) whose count equals the maximum count.
most_frequent_base = spark.sql("""
SELECT dispatching_base_num 
FROM count_of_dispatching_base_num
WHERE count_dispatch_base_num =
    (SELECT MAX(count_dispatch_base_num)
    FROM count_of_dispatching_base_num)
""")
most_frequent_base.show()

                                                                                

+--------------------+
|dispatching_base_num|
+--------------------+
|              B02510|
+--------------------+



In [83]:
# Question 6: most common pickup/dropoff location pair.
# Step 1: build a "PULocationID|DOLocationID" key per trip, count trips per
# key, and expose the counts as a temp view.
location_pair_counts = spark.sql("""
    (SELECT concat(PULocationID, '|', DOLocationID) AS location_pair,
    COUNT(concat(PULocationID, '|', DOLocationID)) as count_location_pair
    FROM fhvhv_trips_data 
    GROUP BY concat(PULocationID, '|', DOLocationID) )
""")
location_pair_counts.createOrReplaceTempView("count_of_location_pair")

# Step 2: keep the pair(s) whose count equals the maximum count.
most_common_pair = spark.sql("""
    SELECT location_pair 
    FROM count_of_location_pair 
    WHERE count_location_pair =
    (SELECT MAX(count_location_pair)
    FROM count_of_location_pair)
""")
most_common_pair.show()

                                                                                

+-------------+
|location_pair|
+-------------+
|        76|76|
+-------------+

