In [None]:
from pyspark.sql import SparkSession


In [None]:
# Start (or reuse) a Spark session configured with the PostgreSQL JDBC package,
# then load the parquet dataset and preview its schema and first five rows.
spark = (
    SparkSession.builder
    .appName("first_paquet")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)
df = spark.read.parquet("../data/dataset.parquet")
df.printSchema()
df.show(5)


In [None]:
#spark.stop()

In [None]:
# Display the first five rows of df.
df.show(5)


In [None]:
# Print the number of rows of df.
print(df.count())
# Number of columns of df; shown as the cell's output value.
len(df.columns)

In [None]:
# Show the Python type of df as the cell's output value.
type(df)

# Count the number of null values in each column

In [None]:
# Alias Spark's `sum` on import so it does not shadow Python's built-in sum()
# for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# Build one aggregated row holding the number of nulls of every column and
# display it. Calling count() on this aggregate would only report how many rows
# the aggregate has (always 1), never the null counts themselves.
# vertical=True prints one "column: value" pair per line, which stays readable
# when the DataFrame has many columns.
df.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c + "_nulles") for c in df.columns]
).show(vertical=True)



# check for outliers

In [None]:
import pyspark as ps
# pdf['fare_amount'].plot.box()


# Deal with null values

In [None]:
# Number of rows left once every row containing at least one null is dropped.
df.na.drop().count()

In [None]:
# Total number of rows of df.
df.count()

# Create dure_trajet Column

In [None]:
# Show the Python type of df as the cell's output value.
type(df)

In [None]:
import pyspark.sql.functions as sf

# Trip duration in minutes: drop-off minus pick-up timestamp (both converted
# to epoch seconds), divided by 60.
df = df.withColumn(
    "dure_trajet",
    (
        sf.unix_timestamp(sf.col("tpep_dropoff_datetime"))
        - sf.unix_timestamp(sf.col("tpep_pickup_datetime"))
    )
    / 60,
)

In [None]:
# Display the first two rows of df.
df.show(2)

In [None]:
# describe() yields one row per statistic (count, mean, stddev, min, max).
# Show all of them: limiting the output to 2 rows hid stddev, min and max.
df.describe().show()

# Search for columns with duplicated values

In [None]:
# Number of distinct values taken by trip_distance.
df.select("trip_distance").distinct().count()

# Select the Categorical Columns

In [None]:
# Keep the target (dure_trajet) next to the categorical / identifier columns.
categrial_df = df.select(
    "dure_trajet",
    "VendorID",
    "RatecodeID",
    "store_and_fwd_flag",
    "payment_type",
    "PULocationID",
    "DOLocationID",
)
categrial_df.show(2)

# Analyse the relation between Categorical data and the target

In [None]:
# categrial_df.plot.countplot(x='VendorID', y='dure_trajet')

from pyspark.sql.functions import count
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import avg
categorical_cols = ["VendorID", "RatecodeID", "store_and_fwd_flag", "payment_type"]

# The loop variable is deliberately not named `col`: that name is the
# pyspark.sql.functions.col helper, and rebinding it to a string makes any
# later col(...) call fail unless the function is imported again.
for cat_col in categorical_cols:
    # One row per category value with the mean of dure_trajet.
    agg = categrial_df.groupBy(cat_col).agg(avg("dure_trajet").alias("avg_duration"))
    pdf = agg.toPandas()  # only the aggregated rows are brought to the driver
    plt.figure(figsize=(25, 3))
    sns.barplot(data=pdf, x=cat_col, y="avg_duration")
    plt.title(f"Average trip duration by {cat_col}")
    plt.show()


In [None]:
from pyspark.sql.functions import avg, desc

high_categorial_cols = ["PULocationID", "DOLocationID"]

# The loop variable is deliberately not named `col`: that name is the
# pyspark.sql.functions.col helper, and rebinding it to a string makes any
# later col(...) call fail unless the function is imported again.
for location_col in high_categorial_cols:
    # Mean of dure_trajet per location, keeping only the 20 highest averages
    # so the bar plot stays readable.
    agg = (
        categrial_df.groupBy(location_col)
        .agg(avg("dure_trajet").alias("avg_duration"))
        .orderBy(desc("avg_duration"))
        .limit(20)
    )
    pdf = agg.toPandas()  # at most 20 rows are brought to the driver
    plt.figure(figsize=(25, 3))
    sns.barplot(data=pdf, x=location_col, y="avg_duration")
    plt.title(f"Average trip duration by {location_col}")
    plt.show()


# Select the Numerical Features

In [None]:
# Keep the target (dure_trajet) next to the numerical columns.
numerical_df = df.select(
    "dure_trajet",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "Airport_fee",
    "cbd_congestion_fee",
)
numerical_df.show(2)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
# Pack every column of numerical_df into one vector column; rows the assembler
# considers invalid are skipped (handleInvalid="skip").
vector_col = "assembled_features"
assembler = VectorAssembler(
    inputCols=numerical_df.columns,
    outputCol=vector_col,
    handleInvalid="skip",
)
print(type(assembler))

# Correlation matrix of the assembled vectors, pulled to the driver as an array.
df_vector = assembler.transform(numerical_df).select(vector_col)
corr_matrix = Correlation.corr(df_vector, vector_col)
matrix_array = corr_matrix.collect()[0][0].toArray()

# Label rows and columns with the feature names.
pd_matrix = pd.DataFrame(
    matrix_array,
    index=numerical_df.columns,
    columns=numerical_df.columns,
)
matrix_df = spark.createDataFrame(pd_matrix, numerical_df.columns)

# Annotated heatmap drawn on a 10x7 figure.
plt.figure(figsize=(10, 7))
sns.heatmap(pd_matrix, annot=True)




# Filter Data

In [None]:
# Filter data
from pyspark.sql.functions import col

# Keep rows with a strictly positive distance below 200, a strictly positive
# duration and at least one passenger; then preview and count what is left.
df_filter = df.filter(
    (col("trip_distance") > 0)
    & (col("trip_distance") < 200)
    & (col("dure_trajet") > 0)
    & (col("passenger_count") > 0)
)
df_filter.show(2)
print(df_filter.count())

# Look for outliers

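I decided to delete these columns: `PULocationID`, `fare_amount`, `mta_tax`, `total_amount`, `cbd_congestion_fee`, `store_and_fwd_flag`, `extra`, `improvement_surcharge`.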

**iqr method**

In [None]:
from pyspark.sql.functions import col, when, dayofweek, hour, month, unix_timestamp
from pyspark.sql.functions import sin, cos, col, lit
import math
def data_cleaning(df, iqr_factor=1.5, relative_error=0.0):
    """Clean the trip DataFrame and derive the columns used for modelling.

    Steps, in order:
      1. drop every row containing a null;
      2. compute ``dure_trajet`` (drop-off minus pick-up, in minutes);
      3. keep rows with 0 < trip_distance < 200, dure_trajet > 0 and
         passenger_count > 0;
      4. drop the columns considered unnecessary;
      5. derive hour / day-of-week / month from both timestamps, then drop
         the two raw timestamp columns;
      6. clamp the continuous columns to the IQR fences
         ``[Q1 - iqr_factor * IQR, Q3 + iqr_factor * IQR]``.

    Args:
        df: Spark DataFrame containing at least the columns referenced below
            (timestamps, trip_distance, passenger_count, amounts, ...).
        iqr_factor: width of the outlier fences in IQR units (default 1.5).
        relative_error: ``relativeError`` passed to ``approxQuantile``;
            0 (the default) requests exact quantiles.

    Returns:
        The cleaned Spark DataFrame. The quartiles of every clamped column are
        printed as a side effect.
    """
    pickup_ts = sf.unix_timestamp(sf.col("tpep_pickup_datetime"))
    dropoff_ts = sf.unix_timestamp(sf.col("tpep_dropoff_datetime"))

    cleaned_df = (
        df.na.drop()
        .withColumn("dure_trajet", (dropoff_ts - pickup_ts) / 60)
        .filter(
            (sf.col("trip_distance") > 0)
            & (sf.col("trip_distance") < 200)
            & (sf.col("dure_trajet") > 0)
            & (sf.col("passenger_count") > 0)
        )
        # drop unnecessary columns
        .drop(
            "PULocationID", "fare_amount", "mta_tax", "total_amount",
            "cbd_congestion_fee", "store_and_fwd_flag", "extra",
            "improvement_surcharge",
        )
        # add date columns
        .withColumn("dropoff_hour", sf.hour("tpep_dropoff_datetime"))
        .withColumn("dropoff_dayofweek", sf.dayofweek("tpep_dropoff_datetime"))
        .withColumn("dropoff_month", sf.month("tpep_dropoff_datetime"))
        .withColumn("pickup_hour", sf.hour("tpep_pickup_datetime"))
        .withColumn("pickup_dayofweek", sf.dayofweek("tpep_pickup_datetime"))
        .withColumn("pickup_month", sf.month("tpep_pickup_datetime"))
        .drop("tpep_dropoff_datetime", "tpep_pickup_datetime")
    )

    # passenger_count is left out of the outlier handling because it only
    # takes a limited set of values.
    # NOTE(review): dure_trajet is the target of the model trained later in
    # this notebook; clamping it changes the label distribution — confirm
    # this is intended.
    outlier_cols = [
        "dure_trajet", "trip_distance", "tip_amount",
        "tolls_amount", "congestion_surcharge", "Airport_fee",
    ]

    # A single approxQuantile call computes the quartiles of every column in
    # one pass instead of launching one Spark job per column.
    all_quartiles = cleaned_df.approxQuantile(
        outlier_cols, [0.25, 0.50, 0.75], relative_error
    )

    for feature, quartilles in zip(outlier_cols, all_quartiles):
        print(feature, quartilles)

        # An empty DataFrame yields no quantiles: nothing to clamp.
        if len(quartilles) < 3:
            continue

        q1, _median, q3 = quartilles
        iqr = q3 - q1
        # With a zero IQR both fences collapse onto the same value and the
        # clamp would overwrite the whole column with that constant, wiping
        # out the feature. Leave such columns untouched.
        if iqr == 0:
            continue

        upper_bound = q3 + iqr_factor * iqr
        lower_bound = q1 - iqr_factor * iqr
        cleaned_df = cleaned_df.withColumn(
            feature,
            sf.when(sf.col(feature) > upper_bound, upper_bound)
            .when(sf.col(feature) < lower_bound, lower_bound)
            .otherwise(sf.col(feature)),
        )

    return cleaned_df
      


In [None]:
# Build the cleaned dataset from df with data_cleaning.
df_clean = data_cleaning(df)


**encode timestamp features**

In [None]:
#df_clean = add_cyclical_time_features(df_clean)
#df_clean.show(2)

In [None]:
from pyspark.sql.functions import col, count, when

# Count the nulls of every column in one aggregation (a single Spark job)
# instead of running a separate filter + count job per column.
# count() only counts non-null inputs, and when(...) without otherwise()
# yields null for rows where the condition is false.
null_counts = df_clean.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df_clean.columns]
).first()

for c in df_clean.columns:
    val = null_counts[c]
    print(f'{c} : {val} nulls')

# Save data to PostgreSQL

In [None]:
# Print the value of spark.jars.packages set on the Spark context, read through
# the public getConf() accessor rather than the private `_conf` attribute.
print(spark.sparkContext.getConf().get("spark.jars.packages"))


In [None]:
import os

# Connection settings are read from the environment so credentials do not have
# to be committed with the notebook; the fallbacks reproduce the previous
# hardcoded local setup.
# NOTE(review): this writes `df`, not `df_clean` — confirm which one is meant
# to land in the table.
(
    df.write
    .format("jdbc")
    .option("url", os.environ.get("TAXI_DB_URL", "jdbc:postgresql://localhost:5432/taxi_eta"))
    .option("dbtable", "taxis")
    .option("user", os.environ.get("TAXI_DB_USER", "postgres"))
    .option("password", os.environ.get("TAXI_DB_PASSWORD", "postgres"))
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")  # replaces the existing table content
    .save()
)

In [None]:
def save_data(df, table="taxis", mode="overwrite"):
    """Write a Spark DataFrame to PostgreSQL through JDBC.

    The JDBC URL, user and password are read from the TAXI_DB_URL,
    TAXI_DB_USER and TAXI_DB_PASSWORD environment variables so credentials do
    not have to live in the notebook; when a variable is unset the previous
    hardcoded local value is used.

    Args:
        df: Spark DataFrame to persist.
        table: destination table name (default "taxis").
        mode: Spark save mode (default "overwrite", which replaces the table).

    Returns:
        None.
    """
    import os  # local import keeps the function usable on a fresh kernel

    (
        df.write
        .format("jdbc")
        .option("url", os.environ.get("TAXI_DB_URL", "jdbc:postgresql://localhost:5432/taxi_eta"))
        .option("dbtable", table)
        .option("user", os.environ.get("TAXI_DB_USER", "postgres"))
        .option("password", os.environ.get("TAXI_DB_PASSWORD", "postgres"))
        .option("driver", "org.postgresql.Driver")
        .mode(mode)
        .save()
    )


# Split Data

In [None]:

def split_data(df_clean, weights=(0.7, 0.3), seed=42):
    """Randomly split a DataFrame into a train part and a test part.

    Args:
        df_clean: object exposing ``randomSplit(weights, seed=...)``
            (a Spark DataFrame in this notebook).
        weights: relative sizes of the two parts (default 70 % / 30 %).
        seed: seed forwarded to ``randomSplit`` so that re-running the
            notebook reuses the same seed instead of a fresh random one.

    Returns:
        A ``(train, test)`` tuple.
    """
    train, test = df_clean.randomSplit(list(weights), seed=seed)
    return train, test
# Split df_clean into the train and test sets via split_data.
train, test = split_data(df_clean)

# Training

In [None]:
# Display one row of the training set.
train.show(1)

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
def training_rf(train, test, seed=42):
    """Fit a random-forest regressor on ``train`` and report RMSE and R2 on ``test``.

    Args:
        train: Spark DataFrame used to fit the model; must contain the feature
            columns listed below and the label column ``dure_trajet``.
        test: Spark DataFrame with the same columns, used for evaluation.
        seed: seed passed to the random forest so repeated runs build the
            same trees on the same data.

    Returns:
        The fitted random-forest model. RMSE and R2 on ``test`` are printed.
    """
    label_col = "dure_trajet"
    prediction_col = "prediction_dure"

    # The label is deliberately absent from this list: assembling dure_trajet
    # into the feature vector hands the model the answer (target leakage) and
    # makes the reported RMSE / R2 meaningless.
    # NOTE(review): the dropoff_* columns come from the drop-off timestamp,
    # which is presumably unknown when a duration has to be predicted —
    # confirm whether they should stay in the feature set.
    feature_cols = [
        "VendorID", "passenger_count", "trip_distance", "RatecodeID",
        "DOLocationID", "payment_type", "tip_amount", "tolls_amount",
        "congestion_surcharge", "Airport_fee",
        "dropoff_hour", "dropoff_dayofweek", "dropoff_month",
        "pickup_hour", "pickup_dayofweek", "pickup_month",
    ]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    train_vec = assembler.transform(train)
    test_vec = assembler.transform(test)

    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol=label_col,
        predictionCol=prediction_col,
        numTrees=10,
        maxDepth=6,
        seed=seed,
    )
    model = rf.fit(train_vec)
    predictions = model.transform(test_vec)

    # The printed labels now match the metric actually computed.
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="rmse")
    print(f"rmse:  {evaluator.evaluate(predictions)}")

    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="r2")
    print(f"r2 : {evaluator.evaluate(predictions)}")

    return model
# Train and evaluate on the split; the returned model is the cell's output value.
training_rf(train, test)