# 🚕 Yellow Taxi

## 📕 Data Dictionary - Trip Records 

* #️⃣ VendorID A code indicating the TPEP provider that provided the record.
  * 1 = Creative Mobile Technologies, LLC
  * 2 = VeriFone Inc.

* ⏰ tpep_pickup_datetime The date and time when the meter was engaged.
* ⏰ tpep_dropoff_datetime The date and time when the meter was disengaged.

* 🧑‍🤝‍🧑 Passenger_count The number of passengers in the vehicle.    **This is a driver-entered value.**

* 📌 Trip_distance The elapsed trip distance in **miles** reported by the taximeter. 

* 📍 PULocationID TLC Taxi Zone in which the taximeter was **engaged**

* 📍 DOLocationID TLC Taxi Zone in which the taximeter was **disengaged**
 
* 💹 RateCodeID The final rate code in effect at the end of the trip.
  * 1 = Standard rate
  * 2 = JFK
  * 3 = Newark
  * 4 = Nassau or Westchester
  * 5 = Negotiated fare
  * 6 = Group ride

* ✳ Store_and_fwd_flag This flag indicates whether the trip record was held in vehicle memory before being sent to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server.
  * Y = store and forward trip
  * N = not a store and forward trip

* 💳 Payment_type A numeric code signifying how the passenger paid for the trip.
  * 1 = Credit card
  * 2 = Cash
  * 3 = No charge
  * 4 = Dispute
  * 5 = Unknown
  * 6 = Voided trip

* 💰 Fare_amount The time-and-distance fare calculated by the meter. 

* 💰 Extra Miscellaneous extras and surcharges.  Currently, this only includes 
the $0.50 and $1 rush hour and overnight charges. 

* 💰 MTA_tax $0.50 MTA tax that is automatically triggered based on the metered  rate in use. 

* 💰 Improvement_surcharge $0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015. 

* 💰 Tip_amount Tip amount – This field is automatically populated for credit card  tips. **Cash tips are not included.**

* 💰 Tolls_amount Total amount of all tolls paid in trip.  

* 💰 Total_amount The total amount charged to passengers. **Does not include cash tips.** 

* 🗂 Trip_type A code, automatically assigned based on the metered rate in use but alterable by the driver, indicating whether the trip was a street-hail or a dispatch. This field is not present in the yellow trip schema loaded below.
  * 1 = Street-hail
  * 2 = Dispatch
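
Several fields above are stored as numeric codes. Below is a minimal plain-Python sketch of decoding them with lookup tables transcribed from this dictionary. The `decode` helper and its "Unknown code" fallback label are assumptions for illustration, not part of the TLC files. The fallback matters because the trips can contain codes the dictionary does not list: payment_type 0 appears in the data below. The schema also stores RatecodeID as a double.

```python
# Code tables transcribed from the data dictionary above.
PAYMENT_TYPE = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}

RATE_CODE = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}


def decode(code, table, fallback="Unknown code"):
    """Map a numeric code to its label.

    Codes stored as doubles (e.g. RatecodeID 1.0) are converted to int first.
    None and codes missing from the table map to `fallback`, a hypothetical
    label chosen for this sketch.
    """
    if code is None:
        return fallback
    return table.get(int(code), fallback)


print(decode(1, PAYMENT_TYPE))  # Credit card
print(decode(1.0, RATE_CODE))   # Standard rate
print(decode(0, PAYMENT_TYPE))  # Unknown code
```
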
### 📓 ATTENTION!

On 05/13/2022, the NYC TLC made the following changes to the trip record files:

* All files will be stored in the PARQUET format. Please see ‘Working With PARQUET Format’ under the Data Dictionaries and MetaData section.
* Trip data will be published monthly (with a two-month delay) instead of bi-annually.
* HVFHV files will now include 17 more columns (see the High Volume FHV Trips Dictionary for details). The additional columns will be added to the old files as well; the earliest date to include them is February 2019.
* Yellow trip data will now include 1 additional column (‘airport_fee’; see the Yellow Trips Dictionary for details). The additional column will be added to the old files as well; the earliest date to include it is January 2011.

## 📚 Install & Import Libraries

In [29]:
#!pip install -r 'F:/30-Work/DataEngineer/03-Data/TaxiData/Lib.txt'

In [3]:
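# Wildcard imports: pyspark.sql.functions shadows Python built-ins such as
# sum, min, max and round; the cells below rely on the Spark versions.
from pyspark.sql import *
from pyspark.sql.functions import *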

## 💥 Create SparkSession

In [4]:
spark = SparkSession\
        .builder\
        .appName("Taxi")\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://localhost:9084")\
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
        .config("spark.sql.catalogImplementation", "hive")\
        .enableHiveSupport()\
        .config("spark.yarn.queue","prod")\
        .getOrCreate()
spark

23/08/04 14:34:21 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.0.41 instead (on interface ens38)
23/08/04 14:34:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/04 14:34:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/04 14:34:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [6]:
spark.sql("show databases;").show()

+---------+
|namespace|
+---------+
|  default|
|  taxi_db|
+---------+



## 🎯 Data Ingestion

### Check Hadoop Location

In [7]:
%%bash
hdfs dfs  -ls /user/hive

Found 2 items
drwxrwxrwx   - hadoop supergroup          0 2023-08-04 14:01 /user/hive/Lake
drwxrwxrwx   - hadoop supergroup          0 2023-08-04 11:05 /user/hive/warehouse


### TO-DO Move Files to the Hadoop 'Lake'

In [14]:
%%bash
YearMonth=2022-01
hdfs dfs -put -f /home/hadoop/mydata/yellow_tripdata_${YearMonth}.parquet /user/hive/Lake/yellow_tripdata_${YearMonth}.parquet
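
The cell above uploads one month at a time. Because the TLC now publishes trip data monthly, a small Python sketch can generate the matching commands for a range of months. It assumes every month follows the `yellow_tripdata_YYYY-MM.parquet` naming used above and that the files have already been downloaded to the same local folder. The helper names are hypothetical.

```python
# Hypothetical helper: build one `hdfs dfs -put` command per monthly file,
# reusing the local and HDFS paths from the cell above.
LOCAL_DIR = "/home/hadoop/mydata"
LAKE_DIR = "/user/hive/Lake"


def month_range(start, end):
    """Yield (year, month) pairs from start to end, both inclusive."""
    year, month = start
    while (year, month) <= end:
        yield year, month
        month += 1
        if month > 12:
            year, month = year + 1, 1


def put_commands(start, end):
    """Return the upload commands for every month in [start, end]."""
    commands = []
    for year, month in month_range(start, end):
        name = f"yellow_tripdata_{year:04d}-{month:02d}.parquet"
        commands.append(f"hdfs dfs -put -f {LOCAL_DIR}/{name} {LAKE_DIR}/{name}")
    return commands


for cmd in put_commands((2021, 12), (2022, 2)):
    print(cmd)
```

Each printed command could then be run from a `%%bash` cell or passed to `subprocess.run(cmd.split(), check=True)`. The paths contain no spaces, so splitting on whitespace is safe here.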

In [35]:
%%bash
hdfs dfs -put -f /home/hadoop/mydata/MastrData/taxi_location.csv /user/hive/Lake/taxi_location.csv

In [4]:
%%bash
hdfs dfs -put -f /home/hadoop/mydata/MastrData/Payment_type.csv /user/hive/Lake/Payment_type.csv

### TO-DO Check File Size in Hadoop

In [37]:
%%bash
hdfs dfs  -du -h /user/hive

36.4 M  36.4 M  /user/hive/Lake
0       0       /user/hive/warehouse


### TO-DO Read Data from HDFS

In [8]:
dflocation=spark.read.csv("hdfs://localhost:9000/user/hive/Lake/taxi_location.csv",header=True)
dflocation.show(10,truncate=False)


+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |
|7         |Queens       |Astoria                |Boro Zone   |
|8         |Queens       |Astoria Park           |Boro Zone   |
|9         |Queens       |Auburndale             |Boro Zone   |
|10        |Queens       |Baisley Park           |Boro Zone   |
|11        |Brooklyn     |Bath Beach             |Boro Zone   |
|12        |Manhattan    |Battery Park           |Yellow Zone |
+----------+-------------+-----------------------+------------+
only showing top 10 rows



                                                                                

In [39]:
dfPyment=spark.read.csv("hdfs://localhost:9000/user/hive/Lake/Payment_type.csv",header=True)
dfPyment.show(10)

+---+-------------+
| ID|         Name|
+---+-------------+
|  1| Credit card |
|  2|        Cash |
|  3|   No charge |
|  4|     Dispute |
|  5|     Unknown |
|  6| Voided trip |
+---+-------------+



In [11]:
df= spark.read.parquet('hdfs://localhost:9000/user/hive/warehouse/taxi_db.db/taxi_stg/yellow_tripdata_2022-01.parquet')
df.show(10)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.6

                                                                                

## 📺 Exploratory Data Analysis

In [41]:
df.show(vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2022-01-01 00:35:40 
 tpep_dropoff_datetime | 2022-01-01 00:53:29 
 passenger_count       | 2.0                 
 trip_distance         | 3.8                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 142                 
 DOLocationID          | 236                 
 payment_type          | 1                   
 fare_amount           | 14.5                
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 3.65                
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 21.95               
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 1------------------------------------
 VendorID              | 1        

In [42]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [43]:
dflocation.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [44]:
dfPyment.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)



In [45]:
dfPyment.show(10)

+---+-------------+
| ID|         Name|
+---+-------------+
|  1| Credit card |
|  2|        Cash |
|  3|   No charge |
|  4|     Dispute |
|  5|     Unknown |
|  6| Voided trip |
+---+-------------+



In [46]:
# Distinct PULocationID and payment_type values in the trips vs. row counts of the lookup tables
display(df.select('PULocationID').distinct().count())
display(dflocation.count())
display(df.select('payment_type').distinct().count())
display(dfPyment.count())

257

265

6

6

In [47]:
df.filter(df.PULocationID.isNull()).count()

0

In [48]:
df.filter(df.payment_type.isNull()).count()

0

In [49]:
df.select('payment_type').distinct().show()

+------------+
|payment_type|
+------------+
|           0|
|           5|
|           1|
|           3|
|           2|
|           4|
+------------+
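
Both counts above are 6, but they are not the same six codes. The trips contain 0, which Payment_type.csv does not list, while the lookup's 6 (Voided trip) never occurs in this month. Comparing sets rather than counts exposes this. In PySpark the check would be a `left_anti` join against the lookup, after casting the string `ID` column to a number. The plain-Python sketch below applies the same logic to the values printed above.

```python
# Distinct payment_type values printed above, and the IDs in Payment_type.csv.
trip_payment_codes = {0, 5, 1, 3, 2, 4}
lookup_ids = {1, 2, 3, 4, 5, 6}

# Codes used by trips but missing from the lookup (these rows would be
# dropped by an inner join), and lookup entries that no trip uses.
unmatched_in_trips = sorted(trip_payment_codes - lookup_ids)
unused_in_lookup = sorted(lookup_ids - trip_payment_codes)

print(unmatched_in_trips)  # [0]
print(unused_in_lookup)    # [6]
```
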



In [50]:
df.describe(["passenger_Count"]).show()

+-------+------------------+
|summary|   passenger_Count|
+-------+------------------+
|  count|           2392428|
|   mean|1.3894533085217193|
| stddev|0.9829685911276145|
|    min|               0.0|
|    max|               9.0|
+-------+------------------+



In [51]:
df.filter(df.passenger_count <= 0 ).count()

52061

In [52]:
df.describe(["total_amount"]).show(truncate=False,vertical=True)

-RECORD 0-------------------------
 summary      | count             
 total_amount | 2463931           
-RECORD 1-------------------------
 summary      | mean              
 total_amount | 19.16937009586583 
-RECORD 2-------------------------
 summary      | stddev            
 total_amount | 255.9640887564706 
-RECORD 3-------------------------
 summary      | min               
 total_amount | -480.3            
-RECORD 4-------------------------
 summary      | max               
 total_amount | 401095.62         



In [53]:
# Count trips with total_amount <= 0
df.filter(df.total_amount <= 0).count()

13408
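
The two counts above suggest a rejection step before loading: 52,061 trips have passenger_count <= 0 and 13,408 have total_amount <= 0. Below is a plain-Python sketch of such rules, using hypothetical sample rows. Which rules to apply, and whether a null passenger_count should be rejected, are assumptions to adapt. In Spark, a comparison with null evaluates to null, so `passenger_count <= 0` does not count the null rows. The describe() count of 2,392,428 is below the row count, which indicates that nulls exist.

```python
# Hypothetical sample rows; field names follow the trip schema above.
trips = [
    {"passenger_count": 2.0, "total_amount": 21.95},
    {"passenger_count": 0.0, "total_amount": 12.30},
    {"passenger_count": None, "total_amount": -4.80},
    {"passenger_count": 1.0, "total_amount": 0.0},
]


def rejection_reasons(trip):
    """Return the (assumed) rules a trip violates; an empty list means valid."""
    reasons = []
    pc = trip["passenger_count"]
    if pc is None:
        reasons.append("passenger_count missing")
    elif pc <= 0:
        reasons.append("passenger_count <= 0")
    if trip["total_amount"] <= 0:
        reasons.append("total_amount <= 0")
    return reasons


valid, rejected = [], []
for trip in trips:
    reasons = rejection_reasons(trip)
    if reasons:
        rejected.append((trip, reasons))
    else:
        valid.append(trip)

print(len(valid), len(rejected))  # 1 3
```
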

In [54]:
df.select(round(sum(df['tolls_amount'])).alias('Total_Tolls')).show(truncate=False)

+-----------+
|Total_Tolls|
+-----------+
|923918.0   |
+-----------+



### join data with payment_type

## ☢️ Data Cleaning

In [13]:
# Separate date from time
df = df.withColumn("date_pickup", to_date("tpep_pickup_datetime"))
df = df.withColumn("time_pickup", date_format(col("tpep_pickup_datetime"), "HH:mm:ss"))
df = df.withColumn("date_dropoff", to_date("tpep_dropoff_datetime"))
df = df.withColumn("time_dropoff", date_format(col("tpep_dropoff_datetime"), "HH:mm:ss"))

### TO-DO Add Duration_Time

In [14]:
# Add Duration_Time
df=df.withColumn('Duration_Time', (df.tpep_pickup_datetime - df.tpep_dropoff_datetime).substr(14,8))
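
The cell above builds Duration_Time by subtracting the dropoff time from the pickup time, which gives a negative interval for normal trips. It then takes eight characters starting at position 14 from the string form of that interval. The result therefore depends on the subtraction order and on how Spark renders intervals, so trips lasting a day or more, or with dropoff before pickup, may not slice cleanly. Keeping the duration as a number is more robust. In PySpark, one option (assuming your Spark version allows these casts) is to cast both columns to `timestamp` and then to `long` (epoch seconds) and subtract. The plain-Python sketch below shows the arithmetic and a zero-padded display form, applied to the first record of the file. The helper name is hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def trip_duration(pickup: str, dropoff: str) -> tuple[int, str]:
    """Return (seconds, "HH:MM:SS") for dropoff - pickup.

    Negative durations (dropoff before pickup) keep a leading "-" so they
    can be filtered out instead of being silently reformatted.
    """
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    seconds = int(delta.total_seconds())
    sign = "-" if seconds < 0 else ""
    hours, rem = divmod(abs(seconds), 3600)
    minutes, secs = divmod(rem, 60)
    return seconds, f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"


# First record of the January 2022 file shown earlier.
print(trip_duration("2022-01-01 00:35:40", "2022-01-01 00:53:29"))  # (1069, '00:17:49')
```
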

### TO-DO Drop the Date-and-Time Columns

In [15]:
cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
df = df.drop(*cols)

In [16]:
df.show(10)


+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------+------------+-------------+
|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|date_pickup|time_pickup|date_dropoff|time_dropoff|Duration_Time|
+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------+------------+-------------+
|       1|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3

                                                                                

In [19]:
spark.sql("use taxi_db;").show()

++
||
++
++



In [20]:
spark.sql("show tables;").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  taxi_db|payment_type|      false|
|  taxi_db|    taxi_stg|      false|
|  taxi_db|  taxi_years|      false|
+---------+------------+-----------+



In [None]:
spark.sql("SET spark.sql.legacy.createHiveTableByDefault=false")

In [29]:
spark.sql("CREATE TABLE Taxi_Years ,to_date(FROM_UNIXTIME(UNIX_TIMESTAMP(tpep_pickup_datetime,'dd/MM/yyyy'))) as pickup_date AS SELECT  VendorID, tpep_pickup_datetime FROM taxi_stg limit 10;")

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near ','.(line 1, pos 24)

== SQL ==
CREATE TABLE Taxi_Years ,to_date(FROM_UNIXTIME(UNIX_TIMESTAMP(tpep_pickup_datetime,'dd/MM/yyyy')))as pickup_date  AS SELECT  VendorID,tpep_pickup_datetime FROM taxi_stg   limit 10;
------------------------^^^
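
The parser stops at position 24 because the `to_date(...)` expression sits between the table name and `AS`. In a CREATE TABLE ... AS SELECT statement, derived columns belong in the SELECT list. `tpep_pickup_datetime` is already a timestamp in this file, so `to_date` can be applied directly and the `UNIX_TIMESTAMP(..., 'dd/MM/yyyy')` string parsing is not needed. Below is a minimal sketch that assumes the `taxi_years` table listed by `show tables` above may be dropped and recreated. The function name is hypothetical, and the session is passed in so the statements can be checked without a cluster.

```python
def create_taxi_years(spark, limit=10):
    """Rebuild taxi_years from taxi_stg with a derived pickup_date column.

    `spark` is the SparkSession created earlier; any object with a
    compatible .sql(str) method works, which keeps the sketch testable.
    """
    # Destructive: removes the existing taxi_years table before recreating it.
    spark.sql("DROP TABLE IF EXISTS taxi_years")
    spark.sql(
        "CREATE TABLE taxi_years AS "
        "SELECT VendorID, tpep_pickup_datetime, "
        "to_date(tpep_pickup_datetime) AS pickup_date "
        f"FROM taxi_stg LIMIT {int(limit)}"
    )


# Usage in the notebook, after `use taxi_db`:
# create_taxi_years(spark)
```
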


## 📇 Data Load

In [9]:
df.repartition(1).write.parquet(
    '/user/hive/warehouse/FactTaxi.parquet', mode="overwrite")

NameError: name 'df' is not defined

## 📊 RFM (Recency, Frequency, Monetary Value) Analysis

In [79]:
df.select(max(df.date_pickup)).show()



+----------------+
|max(date_pickup)|
+----------------+
|      2022-05-18|
+----------------+
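
The latest pickup date found above is the usual starting point for choosing the reference date for recency. RFM is normally computed per customer, but the trip files have no customer identifier. This sketch therefore assumes a hypothetical grouping key, such as the pickup zone (PULocationID), and uses toy records rather than real results. Recency is the number of days from the last trip to the reference date, frequency is the trip count, and monetary is the summed amount. In PySpark, the same three measures map to `groupBy` with `max`, `count` and `sum`, plus `datediff` against the reference date.

```python
from collections import defaultdict
from datetime import date


def rfm(records, reference_date):
    """records: iterable of (key, trip_date, amount).

    Returns {key: (recency_days, frequency, monetary)}.
    """
    last_seen = {}
    frequency = defaultdict(int)
    monetary = defaultdict(float)
    for key, trip_date, amount in records:
        if key not in last_seen or trip_date > last_seen[key]:
            last_seen[key] = trip_date
        frequency[key] += 1
        monetary[key] += amount
    return {
        key: ((reference_date - last_seen[key]).days, frequency[key], round(monetary[key], 2))
        for key in last_seen
    }


# Hypothetical toy records keyed by pickup zone ID.
toy = [
    (142, date(2022, 1, 1), 21.95),
    (142, date(2022, 1, 20), 10.00),
    (236, date(2022, 1, 5), 15.50),
]
print(rfm(toy, date(2022, 2, 1)))  # {142: (12, 2, 31.95), 236: (27, 1, 15.5)}
```
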



                                                                                

## Data Transformations

In [55]:
# Inner join: trips whose payment_type has no match in Payment_type.csv (e.g. code 0) are dropped.
df=df.join(dfPyment,df['payment_type']== dfPyment['id'])

### Join Data with PULocationID

In [58]:
df=df.join(dflocation,df['PULocationID'] == dflocation['LocationID'])

In [66]:
df\
.groupBy('Borough')\
.agg(round(sum(df.total_amount),1).alias('ToTal_Amount')\
,count('Borough').alias('Count')\
,avg('total_amount').alias('avg'))\
.sort(desc('ToTal_Amount'))\
.show(truncate=False)


+-------------+------------+-------+------------------+
|Borough      |ToTal_Amount|Count  |avg               |
+-------------+------------+-------+------------------+
|Manhattan    |3.51227684E7|2165500|16.219241948908394|
|Queens       |9248307.3   |188237 |49.131187439192445|
|Unknown      |711034.1    |23964  |29.670928476048086|
|Brooklyn     |286291.6    |10854  |26.376600331672766|
|Bronx        |106799.1    |3405   |31.36537444933998 |
|EWR          |30518.4     |347    |87.94939481267988 |
|Staten Island|9104.5      |121    |75.24347107438015 |
+-------------+------------+-------+------------------+
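
The Count column can be reconciled against the row count before the joins. This worked check assumes total_amount has no nulls, so that its describe() count (2,463,931) equals the number of trips. The boroughs add up to 2,392,428, so the two inner joins dropped 71,503 rows. That result is consistent with trips whose payment_type is 0, a code missing from Payment_type.csv, being discarded. A `left` join would keep them.

```python
# Row counts per Borough from the aggregation above.
borough_counts = {
    "Manhattan": 2165500,
    "Queens": 188237,
    "Unknown": 23964,
    "Brooklyn": 10854,
    "Bronx": 3405,
    "EWR": 347,
    "Staten Island": 121,
}

rows_after_joins = sum(borough_counts.values())
rows_before_joins = 2463931  # total_amount count from describe() above (assumed to equal the row count)

print(rows_after_joins, rows_before_joins - rows_after_joins)  # 2392428 71503
```
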



                                                                                

In [67]:
df.groupBy('Name')\
.agg(\
round(sum('tolls_amount'),0).alias('Total_Tolls')\
,count('Name').alias('Count')\
,round(avg('total_amount')).alias('avg'))\
.sort(desc('Total_Tolls'))\
.show(truncate=False)



+-------------+-----------+-------+----+
|Name         |Total_Tolls|Count  |avg |
+-------------+-----------+-------+----+
| Credit card |728147.0   |1874874|20.0|
| Cash        |167073.0   |495171 |16.0|
| No charge   |2182.0     |11709  |8.0 |
| Unknown     |0.0        |1      |12.0|
| Dispute     |-651.0     |10673  |31.0|
+-------------+-----------+-------+----+



In [68]:
df\
.groupBy(date_format('date_pickup','yyyy').alias('DatePickup'))\
.agg(round(sum('tolls_amount')).alias('Amount')\
,count('date_pickup').alias('Count')\
,min('tolls_amount').alias('min')\
,max('tolls_amount').alias('max')\
,mode('tolls_amount').alias('mode')
,round(avg('tolls_amount'),2).alias('avg'))\
.show()


+----------+--------+-------+-----+-----+----+----+
|DatePickup|  Amount|  Count|  min|  max|mode| avg|
+----------+--------+-------+-----+-----+----+----+
|      2009|    19.0|      8|  0.0|18.75| 0.0|2.34|
|      2022|896725.0|2392390|-31.4|193.3| 0.0|0.37|
|      2008|     7.0|      6|  0.0| 6.55| 0.0|1.09|
|      2021|     0.0|     24|  0.0|  0.0| 0.0| 0.0|
+----------+--------+-------+-----+-----+----+----+
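
The January 2022 file also contains pickups dated 2008, 2009 and 2021, and, per the max() above, some as late as 2022-05-18. One assumed cleaning rule is to keep only trips whose pickup date falls inside the month the file covers. In PySpark, that is a `filter` on `date_pickup` between two date literals. Below is a plain-Python sketch of the rule with illustrative dates; the helper name is hypothetical.

```python
from datetime import date


def in_file_month(pickup: date, year: int = 2022, month: int = 1) -> bool:
    """True if pickup falls in [first day of month, first day of next month)."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start <= pickup < end


# Illustrative dates in the years seen above, plus the max pickup date.
samples = [date(2008, 12, 31), date(2009, 1, 1), date(2021, 12, 31),
           date(2022, 1, 15), date(2022, 5, 18)]
print([in_file_month(d) for d in samples])  # [False, False, False, True, False]
```
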



                                                                                

In [78]:
df\
.groupBy(date_format('date_pickup','yyyy-MM').alias('DatePickup'))\
.agg(round(sum('tolls_amount')).alias('Amount'))\
.where(col("DatePickup") == '2022-01')\
.show()



+----------+--------+
|DatePickup|  Amount|
+----------+--------+
|   2022-01|896718.0|
+----------+--------+



                                                                                

## 🧪 Test

df = spark.sql("SELECT * FROM Taxi")
df
df.select("totalAmount").filter(df.totalAmount<=0).count()
df.select("totalAmount", "extra").summary(
    "count", "min", "25%", "50%", "75%", "max").show()


df.filter("state IS NULL AND gender IS NULL").show()
df.filter(df.state.isNull() & df.gender.isNull()).show()

df.write.save("abfss://MyDataLake@onelake.dfs.fabric.microsoft.com/TaxiLake.Lakehouse/Files/All/Trnc.Taxi")

df = spark.sql("SELECT * FROM TaxiLake.TaxiCloud ")
display(df)

df.repartition(1).write.csv(
    'abfss://MyDataLake@onelake.dfs.fabric.microsoft.com/TaxiLake.Lakehouse/Files/All/alldata', mode="overwrite", header=True)

#df.repartition(1).write.parquet(
#    'hdfs://localhost:9000/user/hive/warehouse/yellow_tripdata_2022-01.parquet', mode="overwrite")


%%bigquery
select * from `my-project-1-390410.pTaxi1.pTaxi1`