In [61]:
import pyspark
from pyspark.sql import SparkSession

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

df.show()

In [11]:
pyspark.__version__

'3.4.0'

In [12]:
pyspark.__file__

'/usr/local/Cellar/apache-spark/3.4.0/libexec/python/pyspark/__init__.py'

In [14]:
df.write.parquet('zones_test', mode='overwrite')

In [15]:
!head taxi+_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


In [16]:
!ls -lh

total 72
-rw-r--r--  1 arunsuresh  staff    16K Apr 27 10:07 Untitled.ipynb
-rw-r--r--  1 arunsuresh  wheel    12K Aug 17  2016 taxi+_zone_lookup.csv
-rw-r--r--  1 arunsuresh  wheel   2.1K Apr 27 10:15 test.ipynb
drwxr-xr-x@ 6 arunsuresh  staff   192B Apr 27 10:16 [34mzones_test[m[m


In [18]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet

--2023-04-27 10:18:16--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.138.245.58, 108.138.245.96, 108.138.245.225, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.138.245.58|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 308924937 (295M) [application/x-www-form-urlencoded]
Saving to: 'fhvhv_tripdata_2021-01.parquet'


2023-04-27 10:20:27 (2.25 MB/s) - 'fhvhv_tripdata_2021-01.parquet' saved [308924937/308924937]



In [19]:
!head -n 1001 fhvhv_tripdata_2021-01.parquet > head.parquet


In [48]:
# df_pandas = \
# pq.read_table('fhvhv_tripdata_2021-01.parquet')\
# .to_pandas(safe=False)

In [87]:
table = \
pq.read_table("fhvhv_tripdata_2021-01.parquet")

df_pandas = table.filter(
    pc.less_equal(table["dropoff_datetime"],
    pa.scalar(pd.Timestamp.max))
).to_pandas()

In [88]:
df_pandas = df_pandas.iloc[:1001, ]
# df_pandas = \
# df_pandas.loc[:, ['hvfhs_license_num', 'dispatching_base_num', 'pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID', 'shared_request_flag']]


In [89]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True), StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag',

In [85]:
from pyspark.sql import types

In [99]:
schema = types.StructType(
    [
        types.StructField('hvfhs_license_num', types.StringType(), True), types.StructField('dispatching_base_num', types.StringType(), True), types.StructField('originating_base_num', types.StringType(), True), types.StructField('request_datetime', types.TimestampType(), True), types.StructField('on_scene_datetime', types.TimestampType(), True), types.StructField('pickup_datetime', types.TimestampType(), True), types.StructField('dropoff_datetime', types.TimestampType(), True), types.StructField('PULocationID', types.LongType(), True), types.StructField('DOLocationID', types.LongType(), True), types.StructField('trip_miles', types.DoubleType(), True), types.StructField('trip_time', types.LongType(), True), types.StructField('base_passenger_fare', types.DoubleType(), True), types.StructField('tolls', types.DoubleType(), True), types.StructField('bcf', types.DoubleType(), True), types.StructField('sales_tax', types.DoubleType(), True), types.StructField('congestion_surcharge', types.DoubleType(), True), types.StructField('airport_fee', types.DoubleType(), True), types.StructField('tips', types.DoubleType(), True), types.StructField('driver_pay', types.DoubleType(), True), types.StructField('shared_request_flag', types.StringType(), True), types.StructField('shared_match_flag', types.StringType(), True), types.StructField('access_a_ride_flag', types.StringType(), True), types.StructField('wav_request_flag', types.StringType(), True), types.StructField('wav_match_flag', types.StringType(), True)
    ]
)

In [100]:
# df = spark.read \
#     .schema(schema) \
#     .csv('fhvhv_tripdata_2021-01.csv')

df = spark.read \
    .schema(schema) \
    .parquet('fhvhv_tripdata_2021-01.parquet')

In [103]:
# df.show()

In [104]:
# df.head(10)

In [106]:
df = df.repartition(24)

In [108]:
# df.write.parquet('fhvhv/2021/01', mode='overwrite')

df.write.parquet('fhvhv/2021/01')



AnalysisException: [PATH_ALREADY_EXISTS] Path file:/Users/arunsuresh/Documents/arun-data-eng-zoomcamp/week_5/code/notebooks/fhvhv/2021/01 already exists. Set mode as "overwrite" to overwrite the existing path.

In [109]:
df = spark.read.parquet('files/fhvhv/2021/01')

                                                                                

In [114]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [122]:
from pyspark.sql import functions as F

In [126]:
# df.show()

In [127]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    else:
        return f'e/{num:03x}'

In [128]:
crazy_stuff('B02884')

's/b44'

In [129]:
crazy_stuff_udf = \
F.udf(crazy_stuff, returnType=types.StringType())

In [130]:
df \
.withColumn('pickup_date', F.to_date(df.pickup_datetime))\
.withColumn('dropoff_date', F.to_date(df.dropoff_datetime))\
.withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num))\
.select('base_id',\
       'pickup_date', \
       'dropoff_date',\
       'PULocationID', \
       'DOLocationID')\
.show()


[Stage 27:>                                                         (0 + 1) / 1]

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/acc| 2021-01-11|  2021-01-11|         262|         231|
|  e/a39| 2021-01-05|  2021-01-05|          61|         181|
|  e/9ce| 2021-01-02|  2021-01-02|         100|           1|
|  e/b42| 2021-01-31|  2021-01-31|         232|           4|
|  s/af0| 2021-01-05|  2021-01-05|         162|           1|
|  e/b43| 2021-01-27|  2021-01-27|          68|          68|
|  e/9ce| 2021-01-17|  2021-01-17|         205|         205|
|  e/b35| 2021-01-30|  2021-01-30|         256|         255|
|  e/b3b| 2021-01-15|  2021-01-15|          89|          91|
|  e/9ce| 2021-01-04|  2021-01-04|         132|         102|
|  e/acc| 2021-01-11|  2021-01-11|          97|          61|
|  e/9ce| 2021-01-21|  2021-01-21|          79|          37|
|  e/b32| 2021-01-02|  2021-01-03|          26|         178|
|  e/b49| 2021-01-14|  2

                                                                                

In [121]:
df.\
select('pickup_datetime', \
       'dropoff_datetime',\
       'PULocationID', \
       'DOLocationID').filter(df.hvfhs_license_num == 'HV0003') \
.head(5)


[Row(pickup_datetime=datetime.datetime(2021, 1, 11, 10, 40, 22), dropoff_datetime=datetime.datetime(2021, 1, 11, 11, 15, 49), PULocationID=262, DOLocationID=231),
 Row(pickup_datetime=datetime.datetime(2021, 1, 5, 7, 13, 22), dropoff_datetime=datetime.datetime(2021, 1, 5, 7, 27, 50), PULocationID=61, DOLocationID=181),
 Row(pickup_datetime=datetime.datetime(2021, 1, 31, 10, 42, 9), dropoff_datetime=datetime.datetime(2021, 1, 31, 10, 59, 52), PULocationID=232, DOLocationID=4),
 Row(pickup_datetime=datetime.datetime(2021, 1, 27, 14, 24, 36), dropoff_datetime=datetime.datetime(2021, 1, 27, 14, 26, 43), PULocationID=68, DOLocationID=68),
 Row(pickup_datetime=datetime.datetime(2021, 1, 30, 0, 35, 46), dropoff_datetime=datetime.datetime(2021, 1, 30, 0, 39, 42), PULocationID=256, DOLocationID=255)]