In [37]:
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import as_time, as_date, to_date, to_time

In [3]:
from snowflake.snowpark.types import IntegerType,\
StringType,\
StructType,\
StructField,\
FloatType,\
DateType,\
TimestampType

In [4]:
import pandas as pd

In [5]:
connection_parameters = {
    "account":"######.us-east-2.aws",
    "user": "#####",
    "password": "#####",
    "role" : "ACCOUNTADMIN",
    "database": "uber_data"
}

In [6]:
session = Session.builder.configs(connection_parameters).create()

In [7]:
session.sql("use uber_data;").show()

------------------------------------
|"status"                          |
------------------------------------
|Statement executed successfully.  |
------------------------------------



In [7]:
session.sql('''create or replace
            file format csv_format
            type = 'csv'
            field_delimiter = '|'
            skip_header = 1;
            ''').show()

------------------------------------------------
|"status"                                      |
------------------------------------------------
|File format CSV_FORMAT successfully created.  |
------------------------------------------------



In [9]:
session.sql('''create or replace stage csv_stage
                file_format = csv_format;

''').show()

----------------------------------------------
|"status"                                    |
----------------------------------------------
|Stage area CSV_STAGE successfully created.  |
----------------------------------------------



In [10]:
session.file.put("file://E:/GritFeat/SQL/Uber_data/2023_Uber_data.csv","csv_stage")

[PutResult(source='2023_Uber_data.csv', target='2023_Uber_data.csv.gz', source_size=348446367, target_size=64091984, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

In [16]:
session.sql('list @csv_stage').show()

-----------------------------------------------------------------------------------------------------------------
|"name"                           |"size"    |"md5"                             |"last_modified"                |
-----------------------------------------------------------------------------------------------------------------
|csv_stage/2023_Uber_data.csv.gz  |64091984  |2af75b6adc5d8a96c9f33c2eff02aacd  |Sun, 21 May 2023 18:58:23 GMT  |
-----------------------------------------------------------------------------------------------------------------



In [18]:
raw_schema = StructType([ StructField("ID",IntegerType()),
                        StructField("VendorID", IntegerType()),
                        StructField("tpep_pickup_datetime", TimestampType()),
                        StructField("tpep_dropoff_datetime", TimestampType()),
                        StructField("passenger_count",IntegerType()),
                        StructField("trip_distance",FloatType()),
                        StructField("rate_code_ID",IntegerType()),
                        StructField("store_n_forward_flag" ,StringType()),
                        StructField("PU_locationID",IntegerType()),
                        StructField("DO_locationID" ,IntegerType()),
                        StructField("paymentTypeID",IntegerType()),
                        StructField("fare_amount",FloatType()),
                        StructField("extra",FloatType()),
                        StructField("mta_tax",FloatType()),
                        StructField("tip_amount",FloatType()),
                        StructField("tolls_amount",FloatType()),
                        StructField("improvement_surcharge",FloatType()),
                        StructField("total_amount",FloatType()),
                        StructField("congestion_surcharge",FloatType()),
                        StructField("airport_fee",FloatType())
                        ])

In [19]:
filepath = "@csv_stage/2023_Uber_data.csv.gz"
data = session.read\
                .schema(raw_schema)\
                .options({"field_delimiter":",",
                                "skip_header":1,
                                "compression":"gzip"})\
                    .csv(filepath)

In [182]:
data.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [138]:
data.write.mode('append').save_as_table('uber_data_raw')

In [139]:
data.select_expr("distinct VendorID").show()

--------------
|"VENDORID"  |
--------------
|2           |
|1           |
--------------



In [35]:
session.sql('''
create or replace table Vendor_DIM(
VendorID int identity,
FirstName varchar,
LastName varchar,
Contact int,
Email varchar,
Remarks varchar(50),
primary key (VendorID)
);
''').show()

------------------------------------------
|"status"                                |
------------------------------------------
|Table VENDOR_DIM successfully created.  |
------------------------------------------



In [36]:
session.sql('''
insert into Vendor_DIM values
('1','ABC','XYZ','98111111','abc@email.com','vendor1'),
('2','PQR','XYZ','98222222','pqr@email.com','vendor2'),
('3','UVW','XYZ','98333333','uvw@email.com','vendor3');
''').show()

-----------------------------
|"number of rows inserted"  |
-----------------------------
|3                          |
-----------------------------



In [39]:
session.sql('''
create or replace table  Payment_DIM(
PaymentID int,
PaymentType varchar,
primary key(PaymentID)
);
''').show()

-------------------------------------------
|"status"                                 |
-------------------------------------------
|Table PAYMENT_DIM successfully created.  |
-------------------------------------------



In [40]:
session.sql('''
insert into Payment_DIM values
(1,'Credit card'),
(2, 'Cash'),
(3, 'No charge'),
(4, 'Dispute'),
(5, 'Unknown'),
(6, 'Voided trip');
''').show()

-----------------------------
|"number of rows inserted"  |
-----------------------------
|6                          |
-----------------------------



In [298]:
session.sql('''
create or replace table Store_n_Forward_DIM(
FlagID int,
Store_N_Forward_Flag char,
Description varchar,
primary key(FlagID)
);
''').show()

---------------------------------------------------
|"status"                                         |
---------------------------------------------------
|Table STORE_N_FORWARD_DIM successfully created.  |
---------------------------------------------------



In [299]:
session.sql('''
insert into store_n_forward_DIM
values
(1,'Y', 'store and forward trip'),
(2,'N', 'not a store and forward trip');
''').show()

-----------------------------
|"number of rows inserted"  |
-----------------------------
|2                          |
-----------------------------



In [43]:
session.sql('''
create or replace table rateCode_DIM(
RateCodeID int,
RateType varchar,
primary key (RateCodeID)
);
''').show()

--------------------------------------------
|"status"                                  |
--------------------------------------------
|Table RATECODE_DIM successfully created.  |
--------------------------------------------



In [44]:
session.sql('''
insert into RateCode_DIM
values
(1,'Standard rate'),
(2,'JFK'),
(3,'Newark'),
(4,'Nassau or Westchester'),
(5,'Negotiated fare'),
(6,'Group ride');
''').show()

-----------------------------
|"number of rows inserted"  |
-----------------------------
|6                          |
-----------------------------



In [45]:
session.sql('''
select max(PU_locationID),min(PU_locationID) from uber_data_raw
''').show()

-----------------------------------------------
|"MAX(PU_LOCATIONID)"  |"MIN(PU_LOCATIONID)"  |
-----------------------------------------------
|265                   |1                     |
-----------------------------------------------



In [46]:
session.sql('''
select max(DO_locationID),min(DO_locationID) from uber_data_raw
''').show()

-----------------------------------------------
|"MAX(DO_LOCATIONID)"  |"MIN(DO_LOCATIONID)"  |
-----------------------------------------------
|265                   |1                     |
-----------------------------------------------



In [13]:
session.sql('''
select count(distinct DO_locationid) from uber_data_raw 

''').show()

-----------------------------------
|"COUNT(DISTINCT DO_LOCATIONID)"  |
-----------------------------------
|261                              |
-----------------------------------



In [7]:
session.file.put("file://E:/GritFeat/SQL/Uber_data/list_of_real_usa_addresses.csv","csv_stage")

[PutResult(source='list_of_real_usa_addresses.csv', target='list_of_real_usa_addresses.csv.gz', source_size=10788, target_size=4432, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

In [39]:
location_schema = StructType([ 
                        StructField("Address", StringType()),
                        StructField("City", StringType()),
                        StructField("State", StringType()),
                        StructField("Zip" , StringType()),
                        ])

In [9]:
session.sql('list @csv_stage;').show()

-----------------------------------------------------------------------------------------------------------------------------
|"name"                                       |"size"    |"md5"                             |"last_modified"                |
-----------------------------------------------------------------------------------------------------------------------------
|csv_stage/2023_Uber_data.csv.gz              |64091984  |2af75b6adc5d8a96c9f33c2eff02aacd  |Sun, 21 May 2023 18:58:23 GMT  |
|csv_stage/list_of_real_usa_addresses.csv.gz  |4432      |de7d4dba45a086d08f99dd010f3e7d23  |Mon, 22 May 2023 11:33:52 GMT  |
-----------------------------------------------------------------------------------------------------------------------------



In [40]:
filepath = "@csv_stage/list_of_real_usa_addresses.csv.gz"
location_data = session.read\
                .schema(location_schema)\
                .options({"field_delimiter":",",
                                "skip_header":1,
                                "compression":"gzip"})\
                    .csv(filepath)

In [41]:
location_data.show()

---------------------------------------------------------------------
|"ADDRESS"                     |"CITY"           |"STATE"  |"ZIP"   |
---------------------------------------------------------------------
|"777 Brockton Avenue"         |"Abington"       |"MA"     |"2351"  |
|"30 Memorial Drive"           |"Avon"           |"MA"     |"2322"  |
|"250 Hartford Avenue"         |"Bellingham"     |"MA"     |"2019"  |
|"700 Oak Street"              |"Brockton"       |"MA"     |"2301"  |
|"66-4 Parkhurst Rd"           |"Chelmsford"     |"MA"     |"1824"  |
|"591 Memorial Dr"             |"Chicopee"       |"MA"     |"1020"  |
|"55 Brooksby Village Way"     |"Danvers"        |"MA"     |"1923"  |
|"137 Teaticket Hwy"           |"East Falmouth"  |"MA"     |"2536"  |
|"42 Fairhaven Commons Way"    |"Fairhaven"      |"MA"     |"2719"  |
|"374 William S Canning Blvd"  |"Fall River"     |"MA"     |"2721"  |
---------------------------------------------------------------------



In [42]:
locationID = session.range(1, 300, 1).to_df("LocationID")

In [43]:
locationID.show()

----------------
|"LOCATIONID"  |
----------------
|1             |
|2             |
|3             |
|4             |
|5             |
|6             |
|7             |
|8             |
|9             |
|10            |
----------------



In [56]:
locationID = locationID.to_pandas()

In [57]:
location_data = location_data.to_pandas()

In [58]:
location_df = pd.concat([locationID,location_data],axis=1)
location_df.head(2)

Unnamed: 0,LOCATIONID,ADDRESS,CITY,STATE,ZIP
0,1,"""777 Brockton Avenue""","""Abington""","""MA""","""2351"""
1,2,"""30 Memorial Drive""","""Avon""","""MA""","""2322"""


In [60]:
session.sql('''
create or replace table location_DIM(
LocationID int identity ,
Address varchar,
City varchar,
State varchar,
ZipCode varchar,
primary key (LocationID));
''').show()

--------------------------------------------
|"status"                                  |
--------------------------------------------
|Table LOCATION_DIM successfully created.  |
--------------------------------------------



In [62]:
location_df = session.createDataFrame(location_df)

In [63]:
location_df.write.mode('append').save_as_table('location_dim')

In [64]:
session.sql('''
select * from location_dim
''').show()

---------------------------------------------------------------------------------------
|"LOCATIONID"  |"ADDRESS"                     |"CITY"           |"STATE"  |"ZIPCODE"  |
---------------------------------------------------------------------------------------
|1             |"777 Brockton Avenue"         |"Abington"       |"MA"     |"2351"     |
|2             |"30 Memorial Drive"           |"Avon"           |"MA"     |"2322"     |
|3             |"250 Hartford Avenue"         |"Bellingham"     |"MA"     |"2019"     |
|4             |"700 Oak Street"              |"Brockton"       |"MA"     |"2301"     |
|5             |"66-4 Parkhurst Rd"           |"Chelmsford"     |"MA"     |"1824"     |
|6             |"591 Memorial Dr"             |"Chicopee"       |"MA"     |"1020"     |
|7             |"55 Brooksby Village Way"     |"Danvers"        |"MA"     |"1923"     |
|8             |"137 Teaticket Hwy"           |"East Falmouth"  |"MA"     |"2536"     |
|9             |"42 Fairhaven Co

In [95]:
session.sql('''
create or replace table Time_DIM(
TimeID int identity,
Time time,
Hour int,
Minute int,
second int,
primary key (TimeID)
);
''').show()

----------------------------------------
|"status"                              |
----------------------------------------
|Table TIME_DIM successfully created.  |
----------------------------------------



In [82]:
session.sql('''
create or replace table Date_DIM(
DateID int identity,
Date date,
Day int,
Month int,
Year int,
primary key (DateID)
);
''').show()

----------------------------------------
|"status"                              |
----------------------------------------
|Table DATE_DIM successfully created.  |
----------------------------------------



In [77]:

from snowflake.snowpark.functions import date_part,as_date,date_trunc, to_date
from snowflake.snowpark.functions import col
import datetime

date_PU_DF =  data.select((to_date(col("tpep_pickup_datetime"))),
                            date_part("year",col("tpep_pickup_datetime")),
                         date_part("month",col("tpep_pickup_datetime")),
                         date_part("day",col("tpep_pickup_datetime"))).distinct()
date_PU_DF.show(3)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"TO_DATE(""TPEP_PICKUP_DATETIME"")"  |"DATE_PART('YEAR', ""TPEP_PICKUP_DATETIME"")"  |"DATE_PART('MONTH', ""TPEP_PICKUP_DATETIME"")"  |"DATE_PART('DAY', ""TPEP_PICKUP_DATETIME"")"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-01-01                           |2023                                           |1                                               |1                                             |
|2022-12-31                           |2022                                           |12                                              |31                                            |
|2022-10-24                           |2022                                     

In [78]:

date_DO_DF =  data.select((to_date(col("tpep_dropoff_datetime"))),
                            date_part("year",col("tpep_dropoff_datetime")),
                         date_part("month",col("tpep_dropoff_datetime")),
                         date_part("day",col("tpep_dropoff_datetime"))).distinct()
date_DO_DF.show(3)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"TO_DATE(""TPEP_DROPOFF_DATETIME"")"  |"DATE_PART('YEAR', ""TPEP_DROPOFF_DATETIME"")"  |"DATE_PART('MONTH', ""TPEP_DROPOFF_DATETIME"")"  |"DATE_PART('DAY', ""TPEP_DROPOFF_DATETIME"")"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-01-01                            |2023                                            |1                                                |1                                              |
|2022-12-31                            |2022                                            |12                                               |31                                             |
|2023-01-02                            |2023                

In [79]:
date_df = date_PU_DF.union(date_DO_DF)

In [80]:
date_df = date_df.distinct()

In [81]:
date_df.count()

38

In [83]:
dateID = session.range(1, 40, 1).to_df("DateID")

In [84]:

dateID = dateID.to_pandas()
date_df = date_df.to_pandas()

In [85]:
date_df = pd.concat([dateID,date_df],axis = 1)
date_df.head(2)

Unnamed: 0,DATEID,"TO_DATE(""TPEP_PICKUP_DATETIME"")","DATE_PART('YEAR', ""TPEP_PICKUP_DATETIME"")","DATE_PART('MONTH', ""TPEP_PICKUP_DATETIME"")","DATE_PART('DAY', ""TPEP_PICKUP_DATETIME"")"
0,1,2023-01-01,2023.0,1.0,1.0
1,2,2022-12-31,2022.0,12.0,31.0


In [86]:
date_df = session.createDataFrame(date_df)

In [87]:
date_df.write.mode('append').save_as_table('date_dim')

In [89]:
session.sql('''
select * from date_dim limit 2''').show()

----------------------------------------------------
|"DATEID"  |"DATE"      |"DAY"  |"MONTH"  |"YEAR"  |
----------------------------------------------------
|1         |2023-01-01  |2023   |1        |1       |
|2         |2022-12-31  |2022   |12       |31      |
----------------------------------------------------



In [93]:
from snowflake.snowpark.functions import hour,minute,second,to_time
from snowflake.snowpark.functions import col
import datetime

time_PU_DF =  data.select((to_time(col("tpep_pickup_datetime"))),
                            hour("tpep_pickup_datetime"),
                         minute("tpep_pickup_datetime"),
                         second("tpep_pickup_datetime")).distinct()
time_PU_DF.show(3)

----------------------------------------------------------------------------------------------------------------------------------------------------
|"TO_TIME(""TPEP_PICKUP_DATETIME"")"  |"HOUR(""TPEP_PICKUP_DATETIME"")"  |"MINUTE(""TPEP_PICKUP_DATETIME"")"  |"SECOND(""TPEP_PICKUP_DATETIME"")"  |
----------------------------------------------------------------------------------------------------------------------------------------------------
|00:32:10                             |0                                 |32                                  |10                                  |
|00:55:08                             |0                                 |55                                  |8                                   |
|00:25:04                             |0                                 |25                                  |4                                   |
----------------------------------------------------------------------------------------------------------

In [94]:
time_DO_DF =  data.select((to_time(col("tpep_dropoff_datetime"))),
                            hour("tpep_dropoff_datetime"),
                         minute("tpep_dropoff_datetime"),
                         second("tpep_dropoff_datetime")).distinct()
time_DO_DF.show(3)

--------------------------------------------------------------------------------------------------------------------------------------------------------
|"TO_TIME(""TPEP_DROPOFF_DATETIME"")"  |"HOUR(""TPEP_DROPOFF_DATETIME"")"  |"MINUTE(""TPEP_DROPOFF_DATETIME"")"  |"SECOND(""TPEP_DROPOFF_DATETIME"")"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------
|00:40:36                              |0                                  |40                                   |36                                   |
|01:01:27                              |1                                  |1                                    |27                                   |
|00:37:49                              |0                                  |37                                   |49                                   |
----------------------------------------------------------------------------------

In [10]:
time_PU_df.show()

-------------------------------------------
|"TIME"    |"HOUR"  |"MINUTE"  |"SECOND"  |
-------------------------------------------
|12:32:02  |12      |32        |2         |
|12:53:23  |12      |53        |23        |
|12:39:44  |12      |39        |44        |
|12:50:54  |12      |50        |54        |
|12:08:00  |12      |8         |0         |
|12:54:38  |12      |54        |38        |
|12:43:22  |12      |43        |22        |
|12:55:58  |12      |55        |58        |
|12:44:44  |12      |44        |44        |
|19:15:06  |19      |15        |6         |
-------------------------------------------



In [96]:
time_df = time_PU_DF.union(time_DO_DF)

In [97]:
time_df = time_df.distinct()

In [98]:
time_df.count()

86400

In [14]:
time_df.show()

-------------------------------------------
|"TIME"    |"HOUR"  |"MINUTE"  |"SECOND"  |
-------------------------------------------
|12:32:02  |12      |32        |2         |
|12:53:23  |12      |53        |23        |
|12:18:19  |12      |18        |19        |
|13:02:52  |13      |2         |52        |
|12:17:45  |12      |17        |45        |
|12:14:28  |12      |14        |28        |
|12:50:22  |12      |50        |22        |
|12:27:36  |12      |27        |36        |
|12:14:58  |12      |14        |58        |
|12:58:22  |12      |58        |22        |
-------------------------------------------



In [15]:
time_df.count()

86400

In [99]:
timeID = session.range(1, 86500, 1).to_df("TimeID")
timeID = timeID.to_pandas()
time_df = time_df.to_pandas()

In [100]:
time_df = pd.concat([timeID,time_df],axis = 1)
time_df.head(2)

Unnamed: 0,TIMEID,"TO_TIME(""TPEP_PICKUP_DATETIME"")","HOUR(""TPEP_PICKUP_DATETIME"")","MINUTE(""TPEP_PICKUP_DATETIME"")","SECOND(""TPEP_PICKUP_DATETIME"")"
0,1,00:32:10,0.0,32.0,10.0
1,2,00:55:08,0.0,55.0,8.0


In [101]:
time_df = session.createDataFrame(time_df)

In [102]:
time_df.write.mode('append').save_as_table('time_dim')

In [103]:
session.sql('''
select * from time_Dim limit 2
''').show()

------------------------------------------------------
|"TIMEID"  |"TIME"    |"HOUR"  |"MINUTE"  |"SECOND"  |
------------------------------------------------------
|1         |00:32:10  |0       |32        |10        |
|2         |00:55:08  |0       |55        |8         |
------------------------------------------------------



In [105]:
session.sql('''
create or replace table taxiTrip_fact(

taxitrip_ID int identity,
VendorID int,

Pickup_dateID int,
dropOff_dateID int,
Pickup_timeID int,
Dropoff_timeID int,

passenger_count int,
trip_distance float,
RateCodeID int,
Store_n_forwardID int,

pickup_locationID int,
dropoff_locationID int,

PaymentID int,
fare_amount float,
extra float,
mta_tax float,
tip_amount float,
tolls_amount float,
improvement_surcharge float,
total_amount float,
congestion_surcharge float,
airport_fee float,

foreign key (pickup_locationID) references Location_DIM(locationID),
foreign key (dropoff_locationID) references Location_DIM(locationID),
foreign key (VendorID) references Vendor_DIM(VendorID),
foreign key (Store_n_forwardID) references Store_n_Forward_DIM(FlagID),
foreign key (RateCodeID) references RateCode_DIM(RateCodeID),
foreign key (PaymentID) references Payment_DIM(PaymentID),
foreign key (Pickup_dateID) references Date_DIM(DateID),
foreign key (dropOff_dateID) references Date_DIM(DateID),
foreign key (Pickup_timeID) references Time_DIM(TimeID),
foreign key (Dropoff_timeID) references Time_DIM(TimeID),
primary key (taxitrip_ID)
);


''').show()

---------------------------------------------
|"status"                                   |
---------------------------------------------
|Table TAXITRIP_FACT successfully created.  |
---------------------------------------------



In [187]:
fact_table.show(3)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"VENDORID"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"FLOAT"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|0     |2           |1                  |0.97             |1               |161              |141              |9.3            |1.0      |0.5        |0.0           |0.0             |1.0                      |14.3     

In [203]:
time_key_table = session.sql('''
select TimeID,time 
from
time_dim
''')

In [204]:
time_key_table.show()

-----------------------
|"TIMEID"  |"TIME"    |
-----------------------
|1         |06:53:55  |
|2         |06:27:37  |
|3         |06:43:59  |
|4         |06:35:37  |
|5         |06:35:16  |
|6         |06:47:25  |
|7         |06:42:59  |
|8         |06:55:10  |
|9         |06:50:28  |
|10        |06:54:02  |
-----------------------



In [202]:
data.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [106]:
PU_fact_time = session.sql('''
select TimeID as PU_TimeID,* from
uber_data_raw inner join
time_dim
on time_dim.time = time(uber_data_raw.tpep_pickup_datetime)
''')

In [107]:
PU_fact_time.show(3)

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PU_TIMEID"  |"ID"     |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |"TIMEID"  |"TIME"    |"HOUR"  |"MINUTE"  |"SECOND"  |
------------------------------------------------------------------------------------------------------------------------------------------

In [108]:
DO_fact_time = session.sql('''
select TimeID as DO_TimeID,ID 
from
uber_data_raw inner join
time_dim
on time_dim.time = time(uber_data_raw.tpep_dropoff_datetime)
''')


In [255]:
DO_fact_time.show(3)

-------------------------
|"DO_TIMEID"  |"ID"     |
-------------------------
|9175         |2654208  |
|9079         |2654209  |
|29986        |2654210  |
-------------------------



In [109]:
factTime = PU_fact_time.natural_join(DO_fact_time)

In [110]:
factTime.show(2)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"     |"PU_TIMEID"  |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |"TIMEID"  |"TIME"    |"HOUR"  |"MINUTE"  |"SECOND"  |"DO_TIMEID"  |
--------------------------------------------------------------------------------------------------------------

In [111]:
PU_fact_time.count()

3066766

In [112]:
factTime.count()

3066766

In [113]:
factTime = factTime.drop(["TimeID","Time","hour","minute","second"])
factTime.show(2)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"     |"PU_TIMEID"  |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |"DO_TIMEID"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [115]:
PU_fact_date = session.sql('''
select DateID as PU_DateID,* from
uber_data_raw inner join
date_dim
on date_dim.date = date(uber_data_raw.tpep_pickup_datetime)
''')

In [264]:
PU_fact_date.show(3)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PU_DATEID"  |"ID"     |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |"DATEID"  |"DATE"      |"DAY"  |"MONTH"  |"YEAR"  |
----------------------------------------------------------------------------------------------------------------------------------------------

In [116]:
DO_fact_date = session.sql('''
select dateID as DO_dateID,ID 
from
uber_data_raw inner join
date_dim
on date_dim.date = date(uber_data_raw.tpep_dropoff_datetime)
''')
DO_fact_date.show(3)

-------------------------
|"DO_DATEID"  |"ID"     |
-------------------------
|15           |1769472  |
|15           |1769473  |
|15           |1769474  |
-------------------------



In [117]:
factdate= PU_fact_date.natural_join(DO_fact_date)

In [118]:
factdate.count()

3066766

In [119]:
fact = factdate.natural_join(factTime)

In [120]:
fact.count()

2995023

In [331]:
fact.show(2)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"     |"VENDORID"  |"TPEP_PICKUP_DATETIME"  |"TPEP_DROPOFF_DATETIME"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"STORE_N_FORWARD_FLAG"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |"PU_DATEID"  |"DATEID"  |"DATE"      |"DAY"  |"MONTH"  |"YEAR"  |"DO_DATEID"  |"PU_TIMEID"  |"DO_TIMEID"  |
----------------------------------------------------------

In [121]:
fact = fact.drop(["tpep_pickup_datetime","tpep_dropoff_datetime","dateId","date","day","month","year"])

In [346]:
fact.show(2)

In [122]:
strfwd = session.sql('''
select * 
from
store_n_forward_dim
''')

In [125]:
strfwd.show()

--------------------------------------------------------------------
|"FLAGID"  |"STORE_N_FORWARD_FLAG"  |"DESCRIPTION"                 |
--------------------------------------------------------------------
|1         |Y                       |store and forward trip        |
|2         |N                       |not a store and forward trip  |
--------------------------------------------------------------------



In [123]:
fact = fact.natural_join(strfwd,how="left")

In [124]:
fact.count()

2995023

In [338]:
fact.show(2)

In [126]:
fact = fact.drop("Store_n_forward_flag")

In [127]:
fact_attr = fact.select_expr("ID",
                        "VendorID",
                        "PU_dateID",
                        "Do_dateID",
                        "PU_timeID",
                        "Do_timeID",
                        "passenger_count",
                        "trip_distance",
                        "Rate_Code_ID",
                        "FlagID",
                        "pU_locationID",
                        "do_locationID",
                        "PaymenttypeID",
                        "fare_amount",
                        "extra",
                        "mta_tax",
                        "tip_amount",
                        "tolls_amount",
                        "improvement_surcharge",
                        "total_amount",
                        "congestion_surcharge",
                        "airport_fee"
)

In [341]:
fact_attr.show(2)

In [131]:
fact_attr.write.mode('overwrite').save_as_table('taxiTrip_fact')

In [132]:
session.sql('''
select * from taxiTrip_fact limit 3
''').show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"     |"VENDORID"  |"PU_DATEID"  |"DO_DATEID"  |"PU_TIMEID"  |"DO_TIMEID"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"RATE_CODE_ID"  |"FLAGID"  |"PU_LOCATIONID"  |"DO_LOCATIONID"  |"PAYMENTTYPEID"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"AIRPORT_FEE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [133]:
session.close()