In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build a Spark session in local mode, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# Evaluate the session object as the cell's last expression so the notebook renders it.
spark

In [4]:
# First pass over the FHV trips CSV: take column names from the header row.
# No schema is supplied here, so every column is loaded as a string.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

# Preview the first rows.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [5]:
# Inspect the schema Spark assigned to the header-only read above.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
from pyspark.sql import types

# Explicit schema for the FHV trips file: timestamps for the pickup and
# drop-off columns, integers for the location IDs, strings for the rest.
# Every field is declared nullable.
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [11]:
# Re-read the same CSV, this time applying the explicit schema defined above.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [12]:
# Redistribute the rows across 6 partitions.
df_partitioned = df.repartition(6)

In [13]:
# Persist the repartitioned data as Parquet, overwriting any previous output
# in the 'fhv_partitioned' directory.
df_partitioned.write.mode('overwrite').parquet('fhv_partitioned')

In [14]:
from pyspark.sql import functions as F

In [15]:
# Add a date-only column derived from the pickup timestamp.
df = df.withColumn('pickup_date', F.to_date('pickup_datetime'))

In [16]:
# Expose the trips DataFrame to Spark SQL under the name `trips_data_sql`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it likewise replaces any existing
# view of the same name, so re-running this cell is safe.
df.createOrReplaceTempView('trips_data_sql')



In [17]:
# Count the trips whose pickup falls on 2019-10-15.
trips_on_date_query = """
SELECT COUNT(*)
FROM trips_data_sql
WHERE DATE(pickup_datetime) = '2019-10-15'
"""
spark.sql(trips_on_date_query).show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [36]:
# Add timestamp versions of the pickup / drop-off columns, then the trip
# length in seconds as the difference of their Unix epoch values.
df = (
    df
    .withColumn('pickup_timestamp', F.to_timestamp(F.col('pickup_datetime')))
    .withColumn('dropoff_timestamp', F.to_timestamp(F.col('dropoff_datetime')))
    .withColumn(
        'trip_duration_seconds',
        F.unix_timestamp("dropoff_timestamp") - F.unix_timestamp('pickup_timestamp'),
    )
)

In [37]:
# Re-register the view so SQL queries see the current `df`, including the
# newly added duration columns. createOrReplaceTempView is the supported
# replacement for registerTempTable, which has been deprecated since Spark 2.0,
# and overwrites the earlier `trips_data_sql` view in place.
df.createOrReplaceTempView('trips_data_sql')

In [38]:
# Longest trip in the data set, converted from seconds to hours.
longest_trip_hours_query = """
SELECT MAX(trip_duration_seconds)/60/60
FROM trips_data_sql
"""
spark.sql(longest_trip_hours_query).show()

+----------------------------------------+
|((max(trip_duration_seconds) / 60) / 60)|
+----------------------------------------+
|                                631152.5|
+----------------------------------------+



In [43]:
# Load the taxi zone lookup table; column names come from the CSV header row.
lookup_df = (
    spark.read
    .option("header", "true")
    .csv('taxi_zone_lookup.csv')
)

# Preview the first rows.
lookup_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [44]:
# Expose the zone lookup table to Spark SQL under the name `lookup_df_sql`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it likewise replaces any existing
# view of the same name, so re-running this cell is safe.
lookup_df.createOrReplaceTempView('lookup_df_sql')



In [46]:
# Trips per pickup zone, least frequent first. The LEFT JOIN keeps trips whose
# PUlocationID has no match in the lookup table (their zone is NULL).
pickup_zone_counts_query = """
SELECT lookup_df_sql.zone, COUNT(*)
FROM trips_data_sql
LEFT JOIN lookup_df_sql
    ON trips_data_sql.PUlocationID = lookup_df_sql.LocationID
GROUP BY 1
ORDER BY 2
"""
spark.sql(pickup_zone_counts_query).show()

+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows

