## Install Snowflake packages

In [None]:
!pip install snowflake
!pip install snowflake-snowpark-python



In [None]:
from snowflake.snowpark.session import Session
import sys
from snowflake.snowpark.functions import col,is_null, lit, sum , max ,mean ,count
# from pyspark.sql.functions import first, col, months_between, lit, sum, max


## Connection parameters

In [None]:
import os
from getpass import getpass

# Never hardcode credentials in a notebook: they end up in version control and in
# every shared copy. Read them from environment variables instead and prompt for
# the password interactively when SNOWFLAKE_PASSWORD is not set.
# NOTE(review): the password previously hardcoded in this cell should be treated
# as leaked and rotated.
snowflake_connection_parameters = {
    "user": os.environ.get("SNOWFLAKE_USER", "BIGDATA"),
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "BNYTXUB-TH07367"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: "),
    "database": "BIG_DATA_PROJECT",
    "schema": "PUBLIC",
}
TABLE_NAME = "TAXI_TRIPS"

In [None]:
# Open a Snowpark session from the connection parameters above; every later
# cell runs its queries through this session.
session_builder = Session.builder.configs(snowflake_connection_parameters)
snowpark_session = session_builder.create()


In [None]:
# Point the session at the project database explicitly (it is also named in the
# connection parameters) and display the status row Snowflake returns.
use_db_statement = "USE DATABASE BIG_DATA_PROJECT"
snowpark_session.sql(use_db_statement).show()

------------------------------------
|"status"                          |
------------------------------------
|Statement executed successfully.  |
------------------------------------



In [None]:
# Preview the 10 trips with the most recent drop-off time.
query = (
    f" SELECT * FROM  {TABLE_NAME}\n"
    "ORDER BY DROPOFF_TIMESTAMP DESC\n"
    "limit 10\n"
)
snowpark_session.sql(query).show()

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"VendorID"  |"tpep_pickup_datetime"  |"tpep_dropoff_datetime"  |"passenger_count"  |"trip_distance"  |"RatecodeID"  |"store_and_fwd_flag"  |"PULocationID"  |"DOLocationID"  |"payment_type"  |"fare_amount"  |"extra"  |"mta_tax"  |"tip_amount"  |"tolls_amount"  |"improvement_surcharge"  |"total_amount"  |"congestion_surcharge"  |"PICKUP_TIMESTAMP"   |"DROPOFF_TIMESTAMP"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Reference the raw trips table as a Snowpark DataFrame.
# sys.getsizeof() is not reported here: on a Snowpark DataFrame it only measures
# the small Python wrapper object (it printed 48 bytes), not the table's data, so
# it was a misleading "memory usage" figure. The row count below is computed by
# Snowflake via count().
taxi_df = snowpark_session.table(TABLE_NAME)
numRows = taxi_df.count()

print(numRows)

Memory usage (bytes):  48
11916662


In [None]:
# Column names and Snowpark types of the raw table. Most names are quoted
# mixed-case identifiers (e.g. '"VendorID"'), which the next section normalizes.
taxi_df.dtypes

[('"VendorID"', 'double'),
 ('"tpep_pickup_datetime"', 'bigint'),
 ('"tpep_dropoff_datetime"', 'bigint'),
 ('"passenger_count"', 'double'),
 ('"trip_distance"', 'double'),
 ('"RatecodeID"', 'double'),
 ('"store_and_fwd_flag"', 'string(16777216)'),
 ('"PULocationID"', 'bigint'),
 ('"DOLocationID"', 'bigint'),
 ('"payment_type"', 'double'),
 ('"fare_amount"', 'double'),
 ('"extra"', 'double'),
 ('"mta_tax"', 'double'),
 ('"tip_amount"', 'double'),
 ('"tolls_amount"', 'double'),
 ('"improvement_surcharge"', 'double'),
 ('"total_amount"', 'double'),
 ('"congestion_surcharge"', 'double'),
 ('PICKUP_TIMESTAMP', 'timestamp'),
 ('DROPOFF_TIMESTAMP', 'timestamp')]

## Cleaning Data

First, strip the `"` characters from every column name and convert the names to upper case.

In [None]:
# Map each quoted, mixed-case column name to a bare upper-case one,
# e.g. '"VendorID"' -> 'VENDORID', so columns can be referenced without quoting.
column_names_mapper = {
    raw_name: raw_name.replace('"', "").upper()
    for raw_name in taxi_df.columns
}

taxi_df = taxi_df.rename(column_names_mapper)
taxi_df.columns

['VENDORID',
 'TPEP_PICKUP_DATETIME',
 'TPEP_DROPOFF_DATETIME',
 'PASSENGER_COUNT',
 'TRIP_DISTANCE',
 'RATECODEID',
 'STORE_AND_FWD_FLAG',
 'PULOCATIONID',
 'DOLOCATIONID',
 'PAYMENT_TYPE',
 'FARE_AMOUNT',
 'EXTRA',
 'MTA_TAX',
 'TIP_AMOUNT',
 'TOLLS_AMOUNT',
 'IMPROVEMENT_SURCHARGE',
 'TOTAL_AMOUNT',
 'CONGESTION_SURCHARGE',
 'PICKUP_TIMESTAMP',
 'DROPOFF_TIMESTAMP']

Count how many null values each column contains.

In [None]:
from snowflake.snowpark.functions import col,is_null, lit, sum , max ,mean
# Quick check on a single column: how many rows have a NULL VENDORID?
# is_null() flags each row; cast to int, the SUM of the flags is the null count.
vendor_null_flag = is_null('VENDORID').cast('int')
vendor_null_count_df = taxi_df.select(sum(vendor_null_flag))
vendor_null_count_df.toPandas().iloc[0].values[0]


0

In [None]:
# Null count and null percentage for every column.
# is_null() flags each row; cast to int, the SUM of the flags is the number of
# nulls and the MEAN is the fraction of null rows. Each column's own flag is used
# (the loop previously checked 'VENDORID' for every column). All aggregates are
# requested in one SELECT, so the table is scanned once instead of twice per column.
columns_to_check = taxi_df.columns
null_flags = [is_null(column).cast('int') for column in columns_to_check]
null_stats = taxi_df.select(
    [sum(flag) for flag in null_flags] + [mean(flag) for flag in null_flags]
).collect()[0]

column_null_values = {}
for i, column in enumerate(columns_to_check):
    number_of_nans = null_stats[i]
    percent_of_nans = null_stats[len(columns_to_check) + i]
    column_null_values[column] = [number_of_nans, str(percent_of_nans * 100) + '%']

column_null_values

{'VENDORID': [91448, '0.767400%'],
 'TPEP_PICKUP_DATETIME': [91448, '0.767400%'],
 'TPEP_DROPOFF_DATETIME': [91448, '0.767400%'],
 'PASSENGER_COUNT': [91448, '0.767400%'],
 'TRIP_DISTANCE': [91448, '0.767400%'],
 'RATECODEID': [91448, '0.767400%'],
 'STORE_AND_FWD_FLAG': [91448, '0.767400%'],
 'PULOCATIONID': [91448, '0.767400%'],
 'DOLOCATIONID': [91448, '0.767400%'],
 'PAYMENT_TYPE': [91448, '0.767400%'],
 'FARE_AMOUNT': [91448, '0.767400%'],
 'EXTRA': [91448, '0.767400%'],
 'MTA_TAX': [91448, '0.767400%'],
 'TIP_AMOUNT': [91448, '0.767400%'],
 'TOLLS_AMOUNT': [91448, '0.767400%'],
 'IMPROVEMENT_SURCHARGE': [91448, '0.767400%'],
 'TOTAL_AMOUNT': [91448, '0.767400%'],
 'CONGESTION_SURCHARGE': [91448, '0.767400%'],
 'PICKUP_TIMESTAMP': [91448, '0.767400%'],
 'DROPOFF_TIMESTAMP': [91448, '0.767400%']}

About 91K rows (≈0.77% of the table) contain nulls; they appear to be entirely empty rows, so we drop them.

In [None]:
# Drop every row that has a NULL in any column (how="any" is dropna's default).
taxi_df = taxi_df.dropna(how="any")

In [None]:
# Row count after dropna(); compare with the raw count to see how many rows were removed.
taxi_df.count()


11825214

In [None]:
def describe_columns(column_name):
    """Show the describe() summary table for one column of the global ``taxi_df``."""
    single_column = taxi_df.select(column_name)
    single_column.describe().show()

# Numeric columns whose distributions are inspected for impossible values.
numercial_columns = ["PASSENGER_COUNT", "TRIP_DISTANCE", "FARE_AMOUNT", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "IMPROVEMENT_SURCHARGE", "TOLLS_AMOUNT", "MTA_TAX", "TIP_AMOUNT", "EXTRA"]
def print_numerical_describe(columns):
    """Show a separate describe() table for each column name in ``columns``, in order."""
    for column_name in columns:
        describe_columns(column_name)

In [None]:
# Summary statistics per numeric column; look for impossible values such as negative distances or fares.
print_numerical_describe(numercial_columns)

----------------------------------
|"SUMMARY"  |"PASSENGER_COUNT"   |
----------------------------------
|stddev     |1.1489607858471607  |
|min        |0.0                 |
|mean       |1.5104327076025854  |
|count      |11825214.0          |
|max        |9.0                 |
----------------------------------

----------------------------------
|"SUMMARY"  |"TRIP_DISTANCE"     |
----------------------------------
|min        |-22.18              |
|stddev     |3.710358985303801   |
|mean       |2.8242698449262735  |
|count      |11825214.0          |
|max        |369.94              |
----------------------------------

----------------------------------
|"SUMMARY"  |"FARE_AMOUNT"       |
----------------------------------
|min        |-1238.0             |
|mean       |12.482792057716672  |
|stddev     |11.76847690985343   |
|max        |4265.0              |
|count      |11825214.0          |
----------------------------------

----------------------------------
|"SUMMARY"  |"TOT

In [None]:
def count_zero(column):
    """Return the number of rows in the global ``taxi_df`` whose ``column`` is negative.

    Despite the name, zeros are not counted: the filter is strictly ``< 0``.
    """
    negative_rows = taxi_df.filter(taxi_df[column] < 0)
    return negative_rows.count()

# Columns in which a negative value is considered invalid.
non_zero_columns = ["PASSENGER_COUNT", "TRIP_DISTANCE", "FARE_AMOUNT", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "IMPROVEMENT_SURCHARGE", "TOLLS_AMOUNT", "MTA_TAX", "TIP_AMOUNT", "EXTRA"]
def print_non_zero_count(columns):
    """Print one ``<column> :  <negative count>`` line per column name in ``columns``."""
    for column_name in columns:
        negative_count = count_zero(column_name)
        print(column_name, ": ", negative_count)
# Report how many negative values each column holds before cleaning.
print_non_zero_count(non_zero_columns)


PASSENGER_COUNT :  0
TRIP_DISTANCE :  13
FARE_AMOUNT :  36898
TOTAL_AMOUNT :  36897
CONGESTION_SURCHARGE :  29406
IMPROVEMENT_SURCHARGE :  36890
TOLLS_AMOUNT :  757
MTA_TAX :  36100
TIP_AMOUNT :  328
EXTRA :  17791


Drop rows that have a negative value in any of these columns (zeros are kept).

In [None]:
def remove_zero(column):
    """Drop rows of the global ``taxi_df`` whose ``column`` value is negative.

    Despite the name, rows equal to 0 are kept (the filter is ``>= 0``). The result
    is assigned back to the global ``taxi_df``; nothing is returned.
    """
    global taxi_df  # rebind the notebook-level DataFrame, not a local name
    keep_condition = taxi_df[column] >= 0
    taxi_df = taxi_df.filter(keep_condition)

# Columns in which a negative value is considered invalid.
non_zero_columns = ["PASSENGER_COUNT", "TRIP_DISTANCE", "FARE_AMOUNT", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "IMPROVEMENT_SURCHARGE", "TOLLS_AMOUNT", "MTA_TAX", "TIP_AMOUNT", "EXTRA"]
def non_zero(columns):
    """Apply remove_zero to each column name in ``columns``, in order."""
    for column_name in columns:
        remove_zero(column_name)

# Drop rows with a negative value in any of the listed columns (rebinds the global taxi_df).
non_zero(non_zero_columns)


In [None]:
# Row count after dropping rows with negative values.
taxi_df.count()

11788297

In [None]:
# Re-run the negative-value check; every column should now report 0.
print_non_zero_count(non_zero_columns)


PASSENGER_COUNT :  0
TRIP_DISTANCE :  0
FARE_AMOUNT :  0
TOTAL_AMOUNT :  0
CONGESTION_SURCHARGE :  0
IMPROVEMENT_SURCHARGE :  0
TOLLS_AMOUNT :  0
MTA_TAX :  0
TIP_AMOUNT :  0
EXTRA :  0


In [None]:
# Column names come straight from the Snowpark DataFrame. Calling toPandas()
# here would download millions of rows into local memory just to list them.
taxi_df.columns

1.0

In [None]:
def remove_outliers_iqr(df, column, whisker=1.5):
    """Return ``df`` restricted to rows whose ``column`` lies inside the IQR fences.

    The fences are ``[Q1 - whisker * IQR, Q3 + whisker * IQR]`` with ``IQR = Q3 - Q1``.
    Q1 and Q3 are the approximate 25th/75th percentiles of ``column``, computed on
    ``df`` itself.

    Args:
        df: Snowpark DataFrame to filter.
        column: Name of the numeric column to test.
        whisker: Fence multiplier; 1.5 (the default) is the conventional Tukey value.

    Returns:
        The filtered DataFrame. If approx_quantile returns None for either quartile,
        ``df`` is returned unchanged.
    """
    # Quartiles must come from the DataFrame being filtered (the argument), not
    # from the global taxi_df, so the function works on any DataFrame.
    Q1, Q3 = df.stat.approx_quantile(column, [0.25, 0.75])
    if Q1 is None or Q3 is None:
        return df
    IQR = Q3 - Q1
    lower_bound = Q1 - whisker * IQR
    upper_bound = Q3 + whisker * IQR
    return df.filter((df[column] >= lower_bound) & (df[column] <= upper_bound))

# Apply the IQR filter one column at a time; each pass works on the output of
# the previous one, so later quartiles are computed on already-filtered data.
numerical_columns = ['PASSENGER_COUNT', 'TIP_AMOUNT', 'TOLLS_AMOUNT']
for outlier_column in numerical_columns:
    taxi_df = remove_outliers_iqr(taxi_df, outlier_column)

taxi_df.count()

10191114

The IQR filter removed about 1.6M rows (11,788,297 → 10,191,114) as outliers.

In [None]:
#remove trips with zero distance
taxi_df=taxi_df.filter((taxi_df["TRIP_DISTANCE"] > 0))
taxi_df.count()


10106061

Removed about 85K rows (10,191,114 → 10,106,061) whose trip distance was not positive.

Drop the extra columns we don't need.

In [None]:
# Columns not used in the rest of the analysis.
unused_columns = ["TPEP_PICKUP_DATETIME", "TPEP_DROPOFF_DATETIME", "RATECODEID", "STORE_AND_FWD_FLAG"]
taxi_df = taxi_df.drop(*unused_columns)

Convert `PAYMENT_TYPE` to a string, since it represents a category rather than a numerical value.

In [None]:
col_casted_to_string = ['PAYMENT_TYPE']
# Rebuild the projection: listed columns become short strings (payment type is a
# category code, not a quantity) under their original name; all other columns
# pass through unchanged.
final_columnes_to_keep = [
    col(name).cast("string(5)").as_(name) if name in col_casted_to_string else col(name)
    for name in taxi_df.columns
]

taxi_df = taxi_df.select(final_columnes_to_keep)
taxi_df.dtypes

[('VENDORID', 'double'),
 ('PASSENGER_COUNT', 'double'),
 ('TRIP_DISTANCE', 'double'),
 ('PULOCATIONID', 'bigint'),
 ('DOLOCATIONID', 'bigint'),
 ('PAYMENT_TYPE', 'string(5)'),
 ('FARE_AMOUNT', 'double'),
 ('EXTRA', 'double'),
 ('MTA_TAX', 'double'),
 ('TIP_AMOUNT', 'double'),
 ('TOLLS_AMOUNT', 'double'),
 ('IMPROVEMENT_SURCHARGE', 'double'),
 ('TOTAL_AMOUNT', 'double'),
 ('CONGESTION_SURCHARGE', 'double'),
 ('PICKUP_TIMESTAMP', 'timestamp'),
 ('DROPOFF_TIMESTAMP', 'timestamp')]

Save the cleaned version of the data.

In [None]:
# Persist the cleaned DataFrame as a Snowflake table, replacing any existing version.
CLEANED_TABLE_NAME = "TAXI_TRIPS_CLEANED"
taxi_df.write.mode("overwrite").save_as_table(CLEANED_TABLE_NAME)

In [None]:
# Read back a few rows of the saved table to confirm the write succeeded.
query = (
    "\n"
    "SELECT * FROM TAXI_TRIPS_CLEANED\n"
    "limit 10\n"
)

snowpark_session.sql(query).show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"VENDORID"  |"PASSENGER_COUNT"  |"TRIP_DISTANCE"  |"PULOCATIONID"  |"DOLOCATIONID"  |"PAYMENT_TYPE"  |"FARE_AMOUNT"  |"EXTRA"  |"MTA_TAX"  |"TIP_AMOUNT"  |"TOLLS_AMOUNT"  |"IMPROVEMENT_SURCHARGE"  |"TOTAL_AMOUNT"  |"CONGESTION_SURCHARGE"  |"PICKUP_TIMESTAMP"   |"DROPOFF_TIMESTAMP"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1.0         |1.0                |0.9              |148             |79              |1               |6.0            |3.0      |0.5        |1

In [None]:
# Close the Snowpark session opened at the top of the notebook.
snowpark_session.close()