In [67]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
import pandas as pd

In [38]:
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .getOrCreate()
)

In [39]:
df = spark.read.option("header", "true").csv("fhv_tripdata_2019-10.csv.gz")

In [40]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [41]:
df.count()

1897493

In [42]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

### The inferred schema by Spark is really bad so we use pandas

In [43]:
df_sample = df.limit(100)

In [44]:
df_sample_pd = df_sample.to_pandas_on_spark()



In [45]:
df_sample_pd.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014


In [46]:
df_sample_pd.dtypes

dispatching_base_num      object
pickup_datetime           object
dropOff_datetime          object
PUlocationID              object
DOlocationID              object
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [47]:
df_pd = pd.read_csv("fhv_tripdata_2019-10.csv.gz", nrows=100)

In [48]:
df_pd.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID                int64
DOlocationID                int64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [49]:
df_pd.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014


In [50]:
type(df_pd)

pandas.core.frame.DataFrame

In [51]:
df_spark = spark.createDataFrame(df_pd)

AttributeError: 'DataFrame' object has no attribute 'iteritems'

Problem with pandas version dependency and pyspark version

In [59]:
schema = T.StructType([
    T.StructField("dispatching_base_num", T.StringType(), True),
    T.StructField("pickup_datetime", T.TimestampType(), True),
    T.StructField("dropOff_datetime", T.TimestampType(), True),
    T.StructField("PUlocationID", T.IntegerType(), True),
    T.StructField("DOlocationID", T.IntegerType(), True),
    T.StructField("SR_Flag", T.StringType(), True),
    T.StructField("Affiliated_base_number", T.StringType(), True),
])

In [60]:
df_schema = spark.read.option("header", "true").schema(schema).csv("fhv_tripdata_2019-10.csv.gz")

In [61]:
df_schema.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [62]:
df_repartitioned = df_schema.repartition(6)

In [65]:
df_repartitioned.write.mode("overwrite").parquet("six_repartitions")

                                                                                

In [66]:
df_repartitioned.show()

[Stage 29:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B01318|2019-10-23 10:45:08|2019-10-23 10:50:15|         264|         209|   null|                B01318|
|              B03060|2019-10-25 13:57:31|2019-10-25 14:23:09|         264|         132|   null|                B02617|
|              B01145|2019-10-03 01:29:24|2019-10-03 01:36:13|         264|         213|   null|                B02971|
|              B03157|2019-10-24 11:09:21|2019-10-24 11:37:45|         264|         260|   null|                B03157|
|              B00647|2019-10-07 04:31:44|2019-10-07 04:47:33|         264|         147|   null|                B00647|
|              B00256|2019-10-04 19:33:5

                                                                                

In [68]:
target_date = "2019-10-15"
df_filtered = df_repartitioned.filter(F.to_date(F.col("pickup_datetime")) == target_date)

In [69]:
df_filtered.count()

                                                                                

62610

In [70]:
df_repartitioned = df_repartitioned.withColumn("duration_secs", F.col("dropoff_datetime").cast("Long") - F.col("pickup_datetime").cast("Long"))

In [71]:
df_repartitioned = df_repartitioned.withColumn("duration_hours", F.col("duration_secs") / 3600)

In [72]:
df_repartitioned.show()

[Stage 38:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|duration_secs|     duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+-------------------+
|              B01318|2019-10-23 10:45:08|2019-10-23 10:50:15|         264|         209|   null|                B01318|          307|0.08527777777777777|
|              B03060|2019-10-25 13:57:31|2019-10-25 14:23:09|         264|         132|   null|                B02617|         1538| 0.4272222222222222|
|              B01145|2019-10-03 01:29:24|2019-10-03 01:36:13|         264|         213|   null|                B02971|          409|0.11361111111111111|
|              B03157|2019-10-24 11:09:21|2019-10-24 11:37:45|         264| 

                                                                                

In [73]:
max_trip_duration_hours = df_repartitioned.agg(F.max(df_repartitioned.duration_hours)).head()[0]

                                                                                

In [74]:
max_trip_duration_hours

631152.5

In [75]:
taxi_zone_df = spark.read.option("header", "true").option("inferSchema", "true").csv("taxi_zone_lookup.csv")

In [76]:
taxi_zone_df.createTempView("taxi_zone_lookup")

In [77]:
test_result = spark.sql("select * from taxi_zone_lookup limit 10")

In [78]:
test_result.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+



In [79]:
df_repartitioned.createTempView("fhv_tripdata")

In [81]:
least_freq_pickup_loc_zone = spark.sql(
    """
    select
        zone.Zone,
        count(*) as trip_count
    from fhv_tripdata as trip
    left join taxi_zone_lookup as zone
        on trip.PULocationID = zone.LocationID
    group by 1
    order by 2
    """
)

In [83]:
least_freq_pickup_loc_zone.show()

[Stage 52:>                                                         (0 + 1) / 1]

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|         Jamaica Bay|         1|
|Governor's Island...|         2|
| Green-Wood Cemetery|         5|
|       Broad Channel|         8|
|     Highbridge Park|        14|
|        Battery Park|        15|
|Saint Michaels Ce...|        23|
|Breezy Point/Fort...|        25|
|Marine Park/Floyd...|        26|
|        Astoria Park|        29|
|    Inwood Hill Park|        39|
|       Willets Point|        47|
|Forest Park/Highl...|        53|
|  Brooklyn Navy Yard|        57|
|        Crotona Park|        62|
|        Country Club|        77|
|     Freshkills Park|        89|
|       Prospect Park|        98|
|     Columbia Street|       105|
|  South Williamsburg|       110|
+--------------------+----------+
only showing top 20 rows



                                                                                