In [40]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

In [41]:
pd.DataFrame.iteritems = pd.DataFrame.items

In [42]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [43]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
# !find . -name 'fhv_tripdata_2019-10.csv.gz' -exec gzip -d {} \;

In [44]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [45]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

# df = spark.read \
#     .options( 
#     header = "true", \
#     inferSchema = "true" \
#             ) \
#     .csv('fhv_tripdata_2019-10.csv')

In [None]:
df.head(5)

In [None]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [None]:
df_pandas = pd.read_csv('head.csv')

In [None]:
df_pandas.dtypes

In [None]:
df_pandas

In [None]:
# Drop column that is causing issues
df_pandas.drop("Affiliated_base_number", axis=1, inplace=True)

In [None]:
spark.createDataFrame(df_pandas)

In [None]:
spark.createDataFrame(df_pandas).show()

In [46]:
from pyspark.sql import types

In [47]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [48]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [49]:
df = df.repartition(6)

In [50]:
df.write.mode('overwrite').parquet('fhvhv/2019/10/')

24/03/03 06:24:02 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 6
CSV file: file:///home/buzzkanga1/code/fhv_tripdata_2019-10.csv


                                                                                

In [51]:
df = spark.read.parquet('fhvhv/2019/10/')

In [52]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [53]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+--------------------+-------------------+-------------------+------------+------------+-------+
|              B02947|2019-10-05 06:30:00|2019-10-05 06:34:11|         167|          69|   null|
|              B01437|2019-10-06 16:05:09|2019-10-06 16:10:16|         264|         134|   null|
|     B01467         |2019-10-02 07:15:02|2019-10-02 07:49:28|         193|         157|   null|
|              B00459|2019-10-06 17:58:34|2019-10-06 18:05:28|         264|          76|   null|
|              B01196|2019-10-01 22:37:52|2019-10-01 22:53:12|         264|         254|   null|
|              B00937|2019-10-03 03:13:10|2019-10-03 03:19:57|         264|         243|   null|
|              B01083|2019-10-07 17:06:53|2019-10-07 17:10:42|         264|         243|   null|
|              B02677|2019-10-

In [54]:
df.select('dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID').filter(df.dispatching_base_num == 'B00256').show()

+--------------------+-------------------+-------------------+------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+--------------------+-------------------+-------------------+------------+------------+
|              B00256|2019-10-03 12:54:53|2019-10-03 13:30:55|         264|         264|
|              B00256|2019-10-03 21:33:19|2019-10-03 22:27:33|         264|         264|
|              B00256|2019-10-04 10:45:56|2019-10-04 11:19:49|         264|         264|
|              B00256|2019-10-01 14:25:46|2019-10-01 14:48:25|         264|         264|
|              B00256|2019-10-05 08:35:36|2019-10-05 08:54:40|         264|         264|
|              B00256|2019-10-07 13:17:12|2019-10-07 13:58:16|         264|         264|
|              B00256|2019-10-06 13:30:24|2019-10-06 14:10:33|         264|         264|
|              B00256|2019-10-03 07:03:45|2019-10-03 07:49:57|         264|         264|
|              B00256

In [55]:
from pyspark.sql import functions as F

In [None]:
!head -n 10 head.csv

In [None]:
# def crazy_stuff(base_num):
#     num = int(base_num[1:])
#     if num % 7 == 0:
#         return f's/{num:03x}'
#     elif num % 3 == 0:
#         return f'a/{num:03x}'
#     else:
#         return f'e/{num:03x}'

In [None]:
# crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [None]:
# df \
#     .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
#     .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
#     .withColumn('base_id', crazy_stuff_udf(df.hvfhs_license_num)) \
#     .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
#     .show()

In [66]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('length', ((df.dropoff_datetime.cast('double') - df.pickup_datetime.cast('double'))/3600)) \
    .sort(F.col('length').desc()) \
    .select('dispatching_base_num', 'pickup_datetime', 'dropoff_datetime', 'pickup_date', 'dropoff_date', 'length', 'PULocationID', 'DOLocationID') \
    .show()



+--------------------+-------------------+-------------------+-----------+------------+------------------+------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pickup_date|dropoff_date|            length|PULocationID|DOLocationID|
+--------------------+-------------------+-------------------+-----------+------------+------------------+------------+------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00| 2019-10-28|  2091-10-28|          631152.5|         264|         264|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00| 2019-10-11|  2091-10-11|          631152.5|         264|         264|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00| 2019-10-31|  2029-11-01| 87672.44083333333|        null|        null|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23| 2019-10-01|  2027-10-01| 70128.02805555555|         159|         264|
|              B02921|2019-10-17 14:00:00|2020-10-18 00:00:00| 2019-1

                                                                                

In [None]:
df.select('dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID').filter(df.pickup_datetime >= '2019-10-16').count()

In [None]:
df.count()

In [None]:
date = datetime.datetime.strptime(string_date, "2019-10-15")

In [None]:
date.dtypes

In [77]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('length', ((df.dropoff_datetime.cast('double') - df.pickup_datetime.cast('double'))/3600)) \
    .select('dispatching_base_num', 'pickup_datetime', 'dropoff_datetime', 'pickup_date', 'dropoff_date', 'length', 'PULocationID', 'DOLocationID') \
    .show()

# Group by PULocationID and count occurrences
grouped_df = df.groupBy(F.col('PULocationID')).count()

# Sort by count in descending order
sorted_grouped_df = grouped_df.orderBy(F.col('count').asc())

# Show the results
sorted_grouped_df.show()

+--------------------+-------------------+-------------------+-----------+------------+-------------------+------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pickup_date|dropoff_date|             length|PULocationID|DOLocationID|
+--------------------+-------------------+-------------------+-----------+------------+-------------------+------------+------------+
|              B02947|2019-10-05 06:30:00|2019-10-05 06:34:11| 2019-10-05|  2019-10-05|0.06972222222222223|         167|          69|
|              B01437|2019-10-06 16:05:09|2019-10-06 16:10:16| 2019-10-06|  2019-10-06|0.08527777777777777|         264|         134|
|     B01467         |2019-10-02 07:15:02|2019-10-02 07:49:28| 2019-10-02|  2019-10-02| 0.5738888888888889|         193|         157|
|              B00459|2019-10-06 17:58:34|2019-10-06 18:05:28| 2019-10-06|  2019-10-06|              0.115|         264|          76|
|              B01196|2019-10-01 22:37:52|2019-10-01 22:53:12|