### Import Libs :

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, when, hour, dayofweek, month

### Create the spark Session:

In [54]:
spark = SparkSession.builder.appName("App").getOrCreate()

### Read Data :

In [55]:
def ReadDataset(file_path):
    df = spark.read.parquet(file_path)
    if df is None:
        raise Exception("Failed to read dataset")
    else :
        return df

file_path = 'data/Bronze/dataset.parquet'
df = ReadDataset(file_path)
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

### Show Data :

In [56]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

### Display Type and names of columns :

In [57]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



### Get Number of columns :

In [58]:
num_rows = df.count()
print(num_rows)

3475226


### Check for null values :

In [59]:
def CheckNull(Data):
    num_rows = Data.count()       
    Columns_list = Data.columns

    for c in Columns_list:
        num_null = Data.filter(col(c).isNull()).count()
        if num_null > 0:
            null_percent = (num_null / num_rows) * 100
            print(f"Column {c} has {num_null} null values ({null_percent:.2f}%)")
            
            if null_percent < 5:
                Data = Data.na.drop(subset=[c])
            else:
                try:
                    mean_value = Data.select(mean(c)).collect()[0][0]
                    Data = Data.fillna({c: mean_value})
                except:
                    mode_value = Data.groupBy(c).count().orderBy(col("count").desc()).first()[0]
                    Data = Data.fillna({c: mode_value})
    return df

df = CheckNull(df)
            

Column passenger_count has 540149 null values (15.54%)
Column RatecodeID has 540149 null values (15.54%)
Column store_and_fwd_flag has 540149 null values (15.54%)
Column congestion_surcharge has 540149 null values (15.54%)
Column Airport_fee has 540149 null values (15.54%)


### Check Duplicate values :

In [60]:
def CheckDuplicated(Data):
    num_rows = Data.count()
    num_rows_no_duplicate = Data.distinct().count()
    num_duplicate_values = num_rows - num_rows_no_duplicate
    if num_duplicate_values == 0:
        print("you don't have any duplicated values !!")
    else:
        Data = Data.distinct()
        return Data
    return Data

df = CheckDuplicated(df)
df.show()

26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
26/01/07 00:16:55 WARN RowBasedKeyValueBatch: Calling spill() on

you don't have any duplicated values !!
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|       

                                                                                

### Time Converting:

In [61]:
def Time_Cleaning(Data):
    Data = Data.withColumn("Durée", col("tpep_dropoff_datetime") - col("tpep_pickup_datetime"))
    Data = Data.withColumn("Durée_minutes", (col("Durée").cast("long")/60).cast("int"))

    Data = Data.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
    Data = Data.withColumn("pickup_day_week", dayofweek(col("tpep_pickup_datetime")))
    Data = Data.withColumn("pickup_month", month(col("tpep_pickup_datetime")))

    Data = Data.drop("Durée", "tpep_dropoff_datetime", "tpep_pickup_datetime")
    return Data

df = Time_Cleaning(df)
df.show()


+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------+-----------+---------------+------------+
|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|Durée_minutes|pickup_hour|pickup_day_week|pickup_month|
+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+-------------+-----------+---------------+------------+
|       1|              1|          1.6|         1|                 N|         229|         23

### Outlier trips filtering:

In [64]:
# Distance ≤ 0 ou > 200 miles | Durée ≤ 0 minutes | Passagers ≤ 0
from pyspark.sql.functions import col

def Outliers_filtering(Data):
    Data = Data.filter(
        (col("trip_distance") > 0) & 
        (col("trip_distance") <= 200) & 
        (col("passenger_count") > 0) & 
        (col("Durée_minutes") > 0)      
    )
    return Data

df = Outliers_filtering(df)
df.count()


                                                                                

2859779

### Save New Data Like a Silver Data in local And data base : 

In [None]:
# save it in local
def Silver_Local(Data):
    Data.write.mode('overwrite').format("parquet").save("/media/rachid/d70e3dc6-74e7-4c87-96bc-e4c3689c979a/lmobrmij/Projects/Smart-LogiTrack-Syst-me-Pr-dictif-de-Transport-Urbain-ETA-/data/Silver")


#### Check if data saved With Succe:

In [73]:
Silver_Local(df)

df_get = ReadDataset('data/Silver/')
df_get.count()

                                                                                

2859779

#### Save Data in db: