In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd
from pyspark.sql.functions import col, when, dayofmonth, from_unixtime, expr


In [4]:
# Configure a session builder for a local master ("local[*]") with the
# application name 'test', then create the session or reuse an existing one.
session_builder = SparkSession.builder.master("local[*]").appName('test')
spark = session_builder.getOrCreate()

In [5]:
# Load the October 2019 FHV trip file into a pandas DataFrame.
# The path is relative to the notebook's working directory.
df_pandas = pd.read_parquet("fhv_tripdata_2019-10.parquet")

In [6]:
# Preview the first rows of the pandas frame to check column names and values.
df_pandas.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [7]:
# Write the pandas frame back out under a new file name; this copy is the
# file that Spark reads further down.
output_path = "fhv_tripdata_2019-10_modified.parquet"
df_pandas.to_parquet(output_path)

In [8]:
# (column name, Spark type) pairs in file order. The two datetime columns are
# declared as longs: they are read as raw epoch-nanosecond integers and are
# converted to readable timestamps in a later cell.
fhv_columns = [
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.LongType()),
    ('dropOff_datetime', types.LongType()),
    ('PUlocationID', types.DoubleType()),
    ('DOlocationID', types.DoubleType()),
    ('SR_Flag', types.IntegerType()),
    ('Affiliated_base_number', types.StringType()),
]

# Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in fhv_columns
])

In [9]:
#spark_df = spark.createDataFrame(df_pandas, schema)

In [10]:
#spark_df.head()

In [11]:
# Pin the Spark SQL session time zone to GMT.
# NOTE(review): presumably so that from_unixtime()/hour() in later cells give
# the same wall-clock values regardless of the machine's local zone - confirm.
spark.conf.set("spark.sql.session.timeZone", "GMT")


In [12]:
# Read the re-written parquet copy using the explicit `schema`, so the two
# datetime columns come back as raw long values instead of Spark timestamps.
# No "header" option is set: it belongs to the CSV reader and has no effect
# on parquet files.
df = (
    spark.read
    .option("timeZone", "GMT")
    .schema(schema)
    .parquet("fhv_tripdata_2019-10_modified.parquet")
)

In [13]:
# Confirm that the columns were loaded with the types declared in `schema`.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: long (nullable = true)
 |-- dropOff_datetime: long (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [14]:
# Look at the first 10 rows as loaded; the datetime columns are still raw
# long integers at this point.
df.head(10)

[Row(dispatching_base_num='B00009', pickup_datetime=1569889380000000000, dropOff_datetime=1569890100000000000, PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=1569888689000000000, dropOff_datetime=1569888802000000000, PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=1569888703000000000, dropOff_datetime=1569890240000000000, PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=1569891389000000000, dropOff_datetime=1569891467000000000, PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=1569889389000000000, dropOff_datetime=1569889707000000000, PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(

In [15]:
from pyspark.sql.functions import month, dayofmonth, hour

# The datetime columns hold epoch nanoseconds; from_unixtime works in seconds.
nanos_per_second = 1000000000

pickup_raw = col("pickup_datetime")
dropoff_raw = col("dropOff_datetime")

# Order matters: the trip duration is derived first, while both datetime
# columns are still numeric; only then are they replaced by formatted
# timestamp strings, from which the pickup month/day/hour are extracted.
df = (
    df
    .withColumn("time_difference_seconds", (dropoff_raw - pickup_raw) / nanos_per_second)
    .withColumn("pickup_datetime", from_unixtime(pickup_raw / nanos_per_second))
    .withColumn("dropOff_datetime", from_unixtime(dropoff_raw / nanos_per_second))
    .withColumn("pickup_month", month("pickup_datetime"))
    .withColumn("pickup_day", dayofmonth("pickup_datetime"))
    .withColumn("pickup_hour", hour("pickup_datetime"))
)

In [16]:
# Inspect the first 10 rows after the conversion: datetime columns are now
# formatted strings and the duration / pickup_* columns have been added.
df.head(10)

[Row(dispatching_base_num='B00009', pickup_datetime='2019-10-01 00:23:00', dropOff_datetime='2019-10-01 00:35:00', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00009', time_difference_seconds=720.0, pickup_month=10, pickup_day=1, pickup_hour=0),
 Row(dispatching_base_num='B00013', pickup_datetime='2019-10-01 00:11:29', dropOff_datetime='2019-10-01 00:13:22', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00013', time_difference_seconds=113.0, pickup_month=10, pickup_day=1, pickup_hour=0),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:11:43', dropOff_datetime='2019-10-01 00:37:20', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B00014', time_difference_seconds=1537.0, pickup_month=10, pickup_day=1, pickup_hour=0),
 Row(dispatching_base_num='B00014', pickup_datetime='2019-10-01 00:56:29', dropOff_datetime='2019-10-01 00:57:47', PUlocationID=264.0, DOlocationID=264.0, SR_F

In [17]:
# Trips whose pickup fell on day 15 of the month; only the pickup timestamp
# column is kept.
picked_up_on_15th = col("pickup_day") == 15
df_filtered = df.where(picked_up_on_15th).select("pickup_datetime")

In [18]:
# Number of trips with a pickup on day 15.
df_filtered.count()

62629

In [19]:
# The ten rows with the largest time_difference_seconds, longest first.
# (Despite the singular name, this is a 10-row DataFrame.)
highest_time_difference_row = (
    df
    .orderBy("time_difference_seconds", ascending=False)
    .limit(10)
)

In [20]:
# Materialise the ten longest trips as a list of Row objects for inspection.
highest_time_difference_row.head(10)

[Row(dispatching_base_num='B02832', pickup_datetime='2019-10-28 09:00:00', dropOff_datetime='2091-10-28 09:30:00', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B02832', time_difference_seconds=2272149000.0, pickup_month=10, pickup_day=28, pickup_hour=9),
 Row(dispatching_base_num='B02832', pickup_datetime='2019-10-11 18:00:00', dropOff_datetime='2091-10-11 18:30:00', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B02832', time_difference_seconds=2272149000.0, pickup_month=10, pickup_day=11, pickup_hour=18),
 Row(dispatching_base_num='B02416', pickup_datetime='2019-10-31 23:46:33', dropOff_datetime='2029-11-01 00:13:00', PUlocationID=None, DOlocationID=None, SR_Flag=None, Affiliated_base_number='B02416', time_difference_seconds=315620787.0, pickup_month=10, pickup_day=31, pickup_hour=23),
 Row(dispatching_base_num='B00746         ', pickup_datetime='2019-10-01 21:43:42', dropOff_datetime='2027-10-01 21:45:23', PUlocationID=

In [43]:
from pyspark.sql.functions import substring, concat, lit

# Some dropOff_datetime values carry an implausible year (e.g. 2091, 2029,
# 2027) while the rest of the timestamp looks sane. Only the year is
# rewritten; month, day and time are taken from the data as-is, so that a
# trip starting late on 2019-10-31 and ending just after midnight keeps its
# November drop-off instead of being moved back to October 1st.
#
# dropOff_datetime is a string formatted "yyyy-MM-dd HH:mm:ss" and
# substring() positions are 1-based:
#   position 5, length 3   -> "-MM"
#   position 8, length 15  -> "-dd HH:mm:ss" (the length overshoots the end
#                             of the string on purpose; it is simply clipped)

# Corrected year part
year_part = lit("2019")

# Keep the original month
month_part = substring(df["dropOff_datetime"], 5, 3)

# Extract day and time parts
day_time_part = substring(df["dropOff_datetime"], 8, 15)

# Concatenate corrected year with the original month, day, and time parts
new_dropOff_datetime = concat(year_part, month_part, day_time_part)

# Replace the original dropOff_datetime column with the corrected values
df_corrected = df.withColumn("dropOff_datetime", new_dropOff_datetime)

df_corrected.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------------+------------+----------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|time_difference_seconds|pickup_month|pickup_day|pickup_hour|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------------+------------+----------+-----------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|       264.0|       264.0|   null|                B00009|                  720.0|          10|         1|          0|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|       264.0|       264.0|   null|                B00013|                  113.0|          10|         1|          0|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|       264.0|       264.0|   null|    

In [27]:
from pyspark.sql.functions import unix_timestamp

In [44]:
# Both datetime columns are strings in this layout; parse them to epoch
# seconds and express the trip duration in hours.
timestamp_layout = "yyyy-MM-dd HH:mm:ss"
seconds_per_hour = 3600

df_corrected = (
    df_corrected
    .withColumn("dropOff_unix_timestamp", unix_timestamp(col("dropOff_datetime"), timestamp_layout))
    .withColumn("pickup_unix_timestamp", unix_timestamp(col("pickup_datetime"), timestamp_layout))
    .withColumn(
        "time_difference_hours",
        (col("dropOff_unix_timestamp") - col("pickup_unix_timestamp")) / seconds_per_hour,
    )
)


In [46]:
# Ten longest trips after the drop-off correction, ranked by duration in
# hours (longest first), returned as a list of Row objects.
highest_time_difference_row = (
    df_corrected
    .orderBy("time_difference_hours", ascending=False)
    .limit(10)
)
highest_time_difference_row.head(10)


[Row(dispatching_base_num='B02532', pickup_datetime='2019-10-04 23:00:00', dropOff_datetime='2019-10-24 12:40:00', PUlocationID=87.0, DOlocationID=171.0, SR_Flag=None, Affiliated_base_number='B02532', time_difference_seconds=1690800.0, pickup_month=10, pickup_day=4, pickup_hour=23, dropOff_unix_timestamp=1571920800, pickup_unix_timestamp=1570230000, time_difference_hours=469.6666666666667),
 Row(dispatching_base_num='B02546', pickup_datetime='2019-10-02 09:00:01', dropOff_datetime='2019-10-20 09:40:09', PUlocationID=264.0, DOlocationID=136.0, SR_Flag=None, Affiliated_base_number='B02546', time_difference_seconds=1557608.0, pickup_month=10, pickup_day=2, pickup_hour=9, dropOff_unix_timestamp=1571564409, pickup_unix_timestamp=1570006801, time_difference_hours=432.6688888888889),
 Row(dispatching_base_num='B01985', pickup_datetime='2019-10-07 20:47:00', dropOff_datetime='2019-10-24 11:45:00', PUlocationID=264.0, DOlocationID=264.0, SR_Flag=None, Affiliated_base_number='B02617', time_diffe