In [119]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql.functions import date_format
from pyspark.sql.functions import max ,sum
from pyspark.sql.window import Window


In [24]:
# Create (or reuse) a Spark session running locally; "local[*]" asks for
# one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [15]:
# Record the library versions (and where pyspark is loaded from) so the
# results below can be tied to a concrete environment.
print(f"pandas version {pd.__version__}")
print(pyspark.__file__)
print(f"pyspark version {pyspark.__version__}")

pandas version 2.0.1
C:\tools\spark-3.4.2-bin-hadoop3\python\pyspark\__init__.py
pyspark version 3.4.2


## Download FHV 2019-10 

In [None]:
# Download the gzip-compressed FHV trip data for October 2019 into the working directory.
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [17]:
!gzip -d fhv_tripdata_2019-10.csv.gz

In [18]:
# Count the lines of the decompressed CSV as a quick size check.
! wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [20]:
# Write the first 100 lines of the CSV to a small sample file, then confirm
# the sample's line count.
!head -n 100 fhv_tripdata_2019-10.csv > sample_fhv_tripdata_2019-10.csv
! wc -l sample_fhv_tripdata_2019-10.csv

100 sample_fhv_tripdata_2019-10.csv


## Check the FHV 2019-10 file

In [25]:
# Load the sample file with pandas and display the dtypes pandas inferred.
# NOTE(review): `df_smaple` looks like a typo of "sample"; the name is kept
# because later cells reference it.
df_smaple = pd.read_csv('sample_fhv_tripdata_2019-10.csv')
df_smaple.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID                int64
DOlocationID                int64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [149]:
# Preview up to the first 100 rows of the pandas sample frame.
df_smaple.head(100)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
...,...,...,...,...,...,...,...
94,B00310,2019-10-01 00:12:11,2019-10-01 00:28:41,264,254,,B00310
95,B00310,2019-10-01 00:06:02,2019-10-01 00:14:04,264,242,,B00310
96,B00310,2019-10-01 00:03:43,2019-10-01 00:07:26,264,213,,B02534
97,B00310,2019-10-01 00:37:14,2019-10-01 00:51:58,264,241,,B02879


In [26]:
# Convert the pandas sample to a Spark DataFrame and display the schema Spark
# derives from it (presumably the starting point for the explicit schema
# written out in the next cell).
spark.createDataFrame(df_smaple).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', LongType(), True), StructField('DOlocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [31]:
# Explicit schema for the FHV CSV. Every field is nullable.
# Compared with the schema inferred from the pandas sample: the two datetime
# columns are timestamps, the location IDs are 32-bit integers, and SR_Flag
# is read as a string.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [32]:
# Read the full FHV CSV with the explicit schema; the first line is a header.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [35]:
    # Repartition into 6 partitions and write them as Parquet.
    # mode("overwrite") replaces output from a previous run; the default save
    # mode raises an error when the target path already exists, which breaks
    # Restart & Run All.
    (
        df
        .repartition(6)
        .write
        .mode("overwrite")
        .parquet("fhv_tripdata/2019/10")
    )

## 2. Average size of the Parquet files

In [63]:
# Average the sizes of the written Parquet part files: awk sums the 5th
# column of `ls -l` (the size in bytes), divides by the number of files, and
# converts bytes to MB by dividing by 1024 twice.
!ls -l  fhv_tripdata/2019/10/*.parquet| awk '{sum += $5; count++} END {print "average size of the Parquet is "sum/count/1024/1024 "MB"}'

average size of the Parquet is 5.85827MB


## data exploration

## 3.How many taxi trips were there on the 15th of October?

In [114]:
# Count the trips whose pickup timestamp, formatted as yyyy-MM-dd, equals
# 2019-10-15. Despite its name, `df_15th_records` holds the integer row
# count, not a DataFrame.
df_15th_records = (
    df
    .filter(date_format(df.pickup_datetime, 'yyyy-MM-dd') == '2019-10-15')
    .count()
)

print(f"taxi trips were there on the 15th of October is {df_15th_records}")

taxi trips were there on the 15th of October is 62610


## 4. Longest trip in the dataset (in hours)

In [146]:
# Trip duration in seconds: casting a timestamp to long gives epoch seconds,
# so the difference of the two casts is the elapsed time.
df_trip_duration = df.select(
    (df.dropOff_datetime.cast('long') - df.pickup_datetime.cast('long')).alias('trip_duration'),
    'pickup_datetime',
    'dropOff_datetime',
)

# Largest duration over the whole dataset, converted from seconds to hours.
# NOTE: `max` here is pyspark.sql.functions.max (imported at the top of the
# notebook), not the Python builtin.
max_duration_hours = df_trip_duration.select(
    (max('trip_duration') / 60 / 60).alias('Longest trip by hours')
)

max_duration_hours.show()


+---------------------+
|Longest trip by hours|
+---------------------+
|             631152.5|
+---------------------+



## 5.Spark UI port 
The Spark UI is served on port 4040 by default.

## 6.Least frequent pickup location zone

### 6.1.download taxi_zone_lookup

In [154]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-04 03:48:16--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T014817Z&X-Amz-Expires=300&X-Amz-Signature=fa71e4779989395405917e86121022ad6a136bd22db24525394b6e83aa691570&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-04 03:48:17--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

## data exploration

In [148]:
# Count the lines of the zone lookup file as a quick size check.
! wc -l taxi_zone_lookup.csv

266 taxi_zone_lookup.csv


In [198]:
# Load the zone lookup with pandas and preview its first 10 rows.
taxi_zone_lookup_df=pd.read_csv('taxi_zone_lookup.csv')
taxi_zone_lookup_df.head(10)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
5,6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
6,7,Queens,Astoria,Boro Zone
7,8,Queens,Astoria Park,Boro Zone
8,9,Queens,Auburndale,Boro Zone
9,10,Queens,Baisley Park,Boro Zone


In [176]:
# Convert the pandas zone lookup to a Spark DataFrame and display the schema
# Spark derives from it.
spark.createDataFrame(taxi_zone_lookup_df).schema

StructType([StructField('LocationID', LongType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [177]:
# Explicit schema for the zone lookup CSV. Every field is nullable;
# LocationID is a 32-bit integer rather than the inferred long.
schema_zone = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])


In [178]:
# Read the zone lookup CSV with the explicit schema; the first line is a header.
df_zone = (
    spark.read
    .option("header", "true")
    .schema(schema_zone)
    .csv('taxi_zone_lookup.csv')
)

In [179]:
# Repartition the zone lookup into 4 partitions and write it as Parquet.
# mode("overwrite") replaces output from a previous run; the default save
# mode raises an error when the target path already exists, which breaks
# Restart & Run All.
(
    df_zone
    .repartition(4)
    .write
    .mode("overwrite")
    .parquet("taxizonelookup")
)

In [180]:
# Print the leading rows of the zone lookup as a text table.
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [193]:
# Count trips per pickup location, sort by ascending count and keep the
# single least-frequent location.
# NOTE: if several locations tie for the lowest count, limit(1) may keep any
# one of them.
Least_frequent_PU_df = (
    df
    .groupBy('PUlocationID')
    .count()
    .orderBy('count', ascending=True)
    .limit(1)
)

# Show the frame defined in this cell. The previous code called .show() on
# `Least_frequent_pickup_location_df`, a name never assigned in this
# notebook, which raises NameError on a fresh kernel.
Least_frequent_PU_df.show()

+------------+-----+
|PUlocationID|count|
+------------+-----+
|           2|    1|
+------------+-----+



In [195]:
# Attach the zone name to the least-frequent pickup location by joining on
# the location ID (join() defaults to an inner join).
name_Least_frequent_PU_df = Least_frequent_PU_df.join(
    df_zone,
    df_zone.LocationID == Least_frequent_PU_df.PUlocationID,
)

# Show the frame defined in this cell. The previous code referenced
# `name_Least_frequent_pickup_location_df`, a name never assigned in this
# notebook, which raises NameError on a fresh kernel.
name_Least_frequent_PU_df.select('Zone', 'count').show(10)

+-----------+-----+
|       Zone|count|
+-----------+-----+
|Jamaica Bay|    1|
+-----------+-----+

