# $$Preprocessing\ part :$$

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, isnan, isnull, count
from pyspark.sql.functions import lower, lit
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

#### Initialize spark session :

In [2]:
# Build the notebook-wide Spark session (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("Car Price Prediction")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

___________________

## Utils functions :

#### Load the csv : 

In [3]:
def load_data(file_path):
    """
    Read a CSV file into a Spark DataFrame.

    The first row is used as the header and column types are inferred
    by Spark. Relies on the module-level `spark` session.
    """
    csv_options = {"header": True, "inferSchema": True}
    return spark.read.csv(file_path, **csv_options)

#### Replace "nan" placeholder strings with nulls :

In [4]:
def transform_spark(df):
    """
    Null out string cells that contain the "nan" placeholder token.

    Every column whose Spark dtype is 'string' is rewritten: a value that
    matches the pattern (case-insensitively, anywhere in the value) becomes
    null, any other value is kept as is. Non-string columns are untouched.
    """
    # NOTE(review): as written, this raw-string pattern matches a literal
    # backslash before each quote (\"nan\"), not a bare "nan" -- confirm
    # this is what the raw data actually contains.
    nan_pattern = r'(?i)\\"nan\\"'

    string_columns = [name for name, dtype in df.dtypes if dtype == 'string']
    for name in string_columns:
        current = col(name)
        cleaned = when(current.rlike(nan_pattern), None).otherwise(current)
        df = df.withColumn(name, cleaned)
    return df

#### Drop useless columns : 

In [5]:
def drop_useless_column(df, columns):
    """
    Return `df` without the given column(s).

    `columns` may be a single column name or an iterable of names; a lone
    string is treated as one column rather than as a sequence of characters.
    """
    names = [columns] if isinstance(columns, str) else columns
    return df.drop(*names)

#### Save data as csv : 

In [6]:
def save_as_csv(spark_df, file_path="output.csv"):
    """
    Write a PySpark DataFrame to a single CSV file via pandas.

    The frame is first converted with `toPandas()` and then written
    without the pandas index column. Returns nothing.
    """
    spark_df.toPandas().to_csv(file_path, index=False)

#### Handle missing values :

In [7]:
def handle_missing(df) :
    """Return `df` with its missing values dropped via `dropna()` (called with no arguments)."""
    without_missing = df.dropna()
    return without_missing

#### Split the equipment column into one flag column per equipment type :

In [8]:
# Equipment labels searched for when the caller does not supply its own list.
DEFAULT_EQUIPMENT_TYPES = (
    "Jantes aluminium", "Airbags", "Climatisation", "navigation_system",
    "Toit ouvrant", "Sièges cuir", "Radar de recul", "Caméra de recul",
    "Vitres électriques", "ABS", "ESP", "Régulateur de vitesse",
    "Limiteur de vitesse", "CD/MP3/Bluetooth", "Ordinateur de bord",
    "Verrouillage centralisé",
)


def split_equipment(df, equipment_col="equipment", equipment_types=None):
    """
    Add one boolean column per equipment type.

    Parameters
    ----------
    df : DataFrame holding the free-text equipment column.
    equipment_col : name of the column to search (default "equipment").
    equipment_types : iterable of labels to look for. When omitted,
        DEFAULT_EQUIPMENT_TYPES is used. Previously the function read an
        `equipment_types` global that was never defined at module scope,
        so every call raised NameError.

    Returns
    -------
    The DataFrame with one extra column per label. The column name is the
    lower-cased label with spaces, "/" and "-" replaced by "_"; its value is
    True when the lower-cased equipment text contains the lower-cased label
    and False otherwise (including when the text is null).
    """
    if equipment_types is None:
        equipment_types = DEFAULT_EQUIPMENT_TYPES

    for eq in equipment_types:
        needle = eq.lower()
        # Generate a clean column name
        new_col = needle.replace(" ", "_").replace("/", "_").replace("-", "_")
        # when/otherwise (rather than the bare predicate) maps null text to False
        has_equipment = lower(col(equipment_col)).contains(needle)
        df = df.withColumn(
            new_col,
            when(has_equipment, lit(True)).otherwise(lit(False))
        )
    return df

### Label-encode categorical columns :

In [9]:
def inplace_label_encode(df, columns):
    """
    Replace each listed column by its StringIndexer index.

    For every name in `columns`, a StringIndexer is fitted on the current
    frame, the indexed values are written to a temporary "<name>_idx"
    column, the original column is dropped and the temporary column is
    renamed back to `<name>`. Returns the resulting DataFrame.
    """
    encoded = df
    for name in columns:
        tmp_name = name + "_idx"
        model = StringIndexer(inputCol=name, outputCol=tmp_name).fit(encoded)
        indexed = model.transform(encoded)
        encoded = indexed.drop(name).withColumnRenamed(tmp_name, name)
    return encoded

______________

## Prepare Data for preprocessing :

In [10]:
def prepare_data(file_path ) :
    """
    Load the raw CSV and get it ready for preprocessing.

    The file is read with `load_data`, passed through `transform_spark`,
    and the columns 'creator', 'source', 'image_folder', 'id' and 'title'
    are dropped. Returns the resulting DataFrame.
    """
    useless = ['creator' , 'source' , 'image_folder' , 'id' , 'title']
    data = transform_spark(load_data(file_path))
    # drop_useless_column accepts a list, so one call removes them all
    return drop_useless_column(data , useless)

______________

## Preprocess the data :

In [11]:
def features_ingeneering(df):
    """
    Derive the date and equipment features from the prepared data.

    Steps, in order:
      1. Drop rows with missing values (`handle_missing`).
      2. Parse `publication_date` (format "dd/MM/yyyy HH:mm") and derive
         publication_Year / publication_Month / publication_Day /
         publication_Weekday / Is_Weekend / Days_since_posted, then drop
         the raw `publication_date` column.
      3. Add one boolean column per known equipment item (named in
         English), true when the lower-cased `equipment` text contains the
         lower-cased item label, then drop the raw `equipment` column.
      4. Write a snapshot to "data_after_feature_inginering.csv".

    Parameters
    ----------
    df : DataFrame containing at least `publication_date` and `equipment`.

    Returns
    -------
    The DataFrame with the engineered feature columns.

    Notes
    -----
    * `Days_since_posted` is relative to the current date, so its values
      change from one run day to the next.
    * The equipment flags are built here directly: the list of labels is
      local to this function, and the previous implementation never handed
      it to the helper it called, which failed with NameError.
    """
    # Drop all missing values :
    data_cleaned = handle_missing(df)

    # Split publication_date column :
    data_cleaned = data_cleaned.withColumn(
        "publication_date",
        F.to_timestamp("publication_date", "dd/MM/yyyy HH:mm"),
    )
    weekday = F.dayofweek("publication_date")
    data_features = (
        data_cleaned
        .withColumn("publication_Year", F.year("publication_date"))
        .withColumn("publication_Month", F.month("publication_date"))
        .withColumn("publication_Day", F.dayofmonth("publication_date"))
        .withColumn("publication_Weekday", weekday)
        # Spark's dayofweek is 1 = Sunday ... 7 = Saturday, so the former
        # `>= 6` test flagged Friday/Saturday instead of Saturday/Sunday.
        # NOTE(review): assumes a Saturday/Sunday weekend -- confirm.
        .withColumn("Is_Weekend", weekday.isin(1, 7).cast(IntegerType()))
        .withColumn(
            "Days_since_posted",
            F.datediff(F.current_date(), "publication_date"),
        )
    )
    # Drop publication_date:
    data_features = drop_useless_column(data_features, "publication_date")

    ## Split equipment column :
    # Maps the label searched for in the equipment text to the (English)
    # name of the resulting flag column; insertion order is column order.
    equipment_columns = {
        "Jantes aluminium": "Alloy_wheels",
        "Airbags": "Airbags",
        "Climatisation": "Air_conditioning",
        "navigation_system": "Navigation_system",
        "Toit ouvrant": "Sunroof",
        "Sièges cuir": "Leather_seats",
        "Radar de recul": "Parking_sensors",
        "Caméra de recul": "Rear_camera",
        "Vitres électriques": "Electric_windows",
        "ABS": "ABS",
        "ESP": "ESP",
        "Régulateur de vitesse": "Cruise_control",
        "Limiteur de vitesse": "Speed_limiter",
        "CD/MP3/Bluetooth": "CD/MP3/Bluetooth",
        "Ordinateur de bord": "On_board_computer",
        "Verrouillage centralisé": "Central_locking",
    }

    data_equipment = data_features
    for label, flag_name in equipment_columns.items():
        has_equipment = F.lower(F.col("equipment")).contains(label.lower())
        # when/otherwise keeps the flag False (not null) for null text
        data_equipment = data_equipment.withColumn(
            flag_name,
            F.when(has_equipment, F.lit(True)).otherwise(F.lit(False)),
        )

    # Drop equipment column :
    data_equipment = drop_useless_column(data_equipment , "equipment")

    save_as_csv(data_equipment , "data_after_feature_inginering.csv")

    return data_equipment



## Variable encoding :

In [12]:

def variable_encoding(df):
    """
    Cast and encode the engineered features so every column is numeric.

    * The equipment flag columns are cast to integer.
    * The numeric columns are cast to double.
    * The categorical columns are replaced by their label index
      (`inplace_label_encode`).
    * The encoded frame is written to "data_after_variable_encoding.csv".

    Parameters
    ----------
    df : DataFrame holding the flag, numeric and categorical columns
        listed in the body.

    Returns
    -------
    The encoded DataFrame.

    Notes
    -----
    Two defects of the previous version are fixed here: the working
    variable was read before it was ever assigned (UnboundLocalError on the
    first cast), and the snapshot CSV was written from the un-encoded input
    instead of the encoded result.
    """
    bool_cols = ["Alloy_wheels", "Airbags", "Air_conditioning", "Navigation_system", "Sunroof",
                 "Leather_seats", "Parking_sensors", "Rear_camera", "Electric_windows", "ABS",
                 "ESP", "Cruise_control", "Speed_limiter", "CD/MP3/Bluetooth", "On_board_computer",
                 "Central_locking" ]

    # Cast numeric columns to correct type
    numeric_cols = ["door_count", "fiscal_power", "mileage", "price", "year",
                    "publication_Year", "publication_Month", "publication_Day",
                    "Days_since_posted"]

    # Categorical columns :
    categorical_cols = ["brand", "condition", "fuel_type", "model", "origin", "first_owner",
                        "sector", "seller_city", "transmission"]

    # Start from the input frame; every step below returns a new frame.
    dt = df

    for col_name in bool_cols:
        dt = dt.withColumn(col_name, col(col_name).cast(IntegerType()))

    for col_name in numeric_cols:
        dt = dt.withColumn(col_name, col(col_name).cast("double"))

    dt = inplace_label_encode(dt, categorical_cols)

    # Snapshot the encoded result (not the raw input).
    save_as_csv(dt , "data_after_variable_encoding.csv")

    return dt

__________

## Main :

In [13]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, when, isnan, isnull, count, lower, lit
from pyspark.sql.types import IntegerType

In [None]:
def main() :
    '''
    Run the whole preprocessing pipeline on "raw.csv".

    Prepares the raw data, engineers the features, encodes the variables,
    writes the result to "preprocessed_data.csv" and returns the encoded
    DataFrame. Progress is reported with print statements.
    '''
    raw_csv = "raw.csv"     # Raw data
    final_csv = "preprocessed_data.csv"

    print("Starting Data preprocessing pipeline....")

    # Step 1 : prepare the data for preprocessing
    print("Step 1 : Loading and preparing the data ....")
    prepared = prepare_data(raw_csv)

    # Step 2 : feature engineering
    print("Step 2 : Performing features ingeniering ....")
    engineered = features_ingeneering(prepared)

    # Step 3 : encode the variables
    print("Step 3 : encoding categorical and binary column....")
    encoded = variable_encoding(engineered)

    # Step 4 : persist the final frame
    print("Final Step : Save the preprocessed data to ", final_csv , "....")
    save_as_csv(encoded , final_csv)

    print("Congratulations , preprocessing is all completed now !")

    return encoded
