In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running in local mode under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Location of the gzipped trip-data CSV used throughout the notebook.
data_path = 'data/fhv_tripdata_2019-10.csv.gz'

# First pass over the file: the header row supplies the column names and
# no explicit schema is given.
df = spark.read.csv(data_path, header=True)

# Preview the first rows.
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 22:55:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B0

In [7]:
# Persist the trips as Parquet, redistributed into 6 partitions.
# mode("overwrite") keeps this cell re-runnable: without an explicit mode the
# writer raises as soon as data/pq/ already exists from a previous run.
output_path = 'data/pq/'
(
    df
    .repartition(6)
    .write
    .mode("overwrite")
    .parquet(output_path)
)

                                                                                

In [2]:
from pyspark.sql import types
import pyspark.sql.functions as f

In [15]:
# Inspect the current schema of `df`. The stored output below also lists the
# derived timestamp/duration columns, so this cell was last run after they
# had been added to `df`.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True), StructField('pickup_timestamp', LongType(), True), StructField('dropoff_timestamp', LongType(), True), StructField('time_difference_seconds', LongType(), True)])

In [4]:
# Explicit schema for the trip CSV, declared as (column name, Spark type) pairs.
# Every field is nullable.
schema = types.StructType([
    types.StructField(column, dtype, True)
    for column, dtype in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [5]:
# Re-read the same CSV, this time with the explicit schema applied.
df = spark.read.csv(data_path, header=True, schema=schema)

In [6]:
# Count the trips picked up on 2019-10-15.
# The half-open window [2019-10-15 00:00:00, 2019-10-16 00:00:00) covers the
# whole day, including any pickup with a fractional second after 23:59:59
# that an inclusive "<= 23:59:59" upper bound would drop.
df.filter(
    (f.col('pickup_datetime') >= '2019-10-15 00:00:00')
    & (f.col('pickup_datetime') < '2019-10-16 00:00:00')
).count()

                                                                                

62610

In [16]:
from pyspark.sql.functions import col, unix_timestamp, from_unixtime, max

In [9]:
# Add epoch-second versions of the pickup/drop-off times and the trip duration
# in seconds.
# - The drop-off column is referenced by its exact schema name
#   (dropOff_datetime) so the lookup does not rely on case-insensitive
#   column resolution.
# - No pattern is passed to unix_timestamp: the one used before
#   ("yyyy-MM-dd HH:mm:ss") is the function's default.
df = (
    df
    .withColumn("pickup_timestamp", unix_timestamp(col("pickup_datetime")))
    .withColumn("dropoff_timestamp", unix_timestamp(col("dropOff_datetime")))
    .withColumn(
        "time_difference_seconds",
        col("dropoff_timestamp") - col("pickup_timestamp"),
    )
)


In [13]:
# Preview `df` again; the output now includes the derived timestamp and
# duration columns.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+-----------------+-----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_timestamp|dropoff_timestamp|time_difference_seconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+-----------------+-----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|      1569889380|       1569890100|                    720|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|      1569888689|       1569888802|                    113|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|         

In [17]:
# Longest trip duration (in seconds) over the whole frame.
df.select(f.max(f.col("time_difference_seconds"))).show()

[Stage 6:>                                                          (0 + 1) / 1]

+----------------------------+
|max(time_difference_seconds)|
+----------------------------+
|                  2272149000|
+----------------------------+



                                                                                

In [33]:
# Convert the maximum duration (seconds, taken from the max() output above)
# into hours.
2272149000 / (60 * 60)

631152.5

In [21]:
# Load the taxi zone lookup table; the header row supplies the column names.
df_zones = spark.read.csv('data/taxi_zone_lookup.csv', header=True)

# Preview the lookup rows.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [22]:
# Full outer join of trips to zones on the pickup location id: rows from
# either side that have no match are kept, with nulls for the other side.
df_join = df.join(
    df_zones,
    on=df["PUlocationID"] == df_zones["LocationID"],
    how="outer",
)

In [23]:
# Preview the joined trips/zones frame.
df_join.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+-----------------+-----------------------+----------+-------+----+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|pickup_timestamp|dropoff_timestamp|time_difference_seconds|LocationID|Borough|Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+-----------------+-----------------------+----------+-------+----+------------+
|              B02416|2019-10-01 00:56:00|2019-10-01 01:05:00|        null|        null|   null|                B02416|      1569891360|       1569891900|                    540|      null|   null|null|        null|
|              B02416|2019-10-01 01:01:00|2019-10-01 01:17:12|        null|        null|   null|                B02416|      1569891660|

                                                                                

In [26]:
# Expose the joined frame to Spark SQL under the view name 'df'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable, and it can be re-run without error.
df_join.createOrReplaceTempView('df')



In [30]:
# Count trips per pickup zone from the 'df' view, keeping only rows that have
# a pickup location id, and list the least frequent zones first.
# GROUP BY 1 / ORDER BY 2 refer to the select-list positions (zone, count).
spark.sql("""
SELECT 
    zone, count(zone)
FROM
    df
    where PUlocationID is not null
GROUP BY
    1
    order by 2 asc
""").show()

[Stage 46:>                                                         (0 + 1) / 1]

+--------------------+-----------+
|                zone|count(zone)|
+--------------------+-----------+
|         Jamaica Bay|          1|
|Governor's Island...|          2|
| Green-Wood Cemetery|          5|
|       Broad Channel|          8|
|     Highbridge Park|         14|
|        Battery Park|         15|
|Saint Michaels Ce...|         23|
|Breezy Point/Fort...|         25|
|Marine Park/Floyd...|         26|
|        Astoria Park|         29|
|    Inwood Hill Park|         39|
|       Willets Point|         47|
|Forest Park/Highl...|         53|
|  Brooklyn Navy Yard|         57|
|        Crotona Park|         62|
|        Country Club|         77|
|     Freshkills Park|         89|
|       Prospect Park|         98|
|     Columbia Street|        105|
|  South Williamsburg|        110|
+--------------------+-----------+
only showing top 20 rows



                                                                                