Introduction: Let's generate some random and noisy data about a table of a car sales company

In [1]:
import random
import csv
import numpy as np

def generate_car_data(n_rows):
    """Generates a CSV file with random car sales data.

    Args:
        n_rows: Number of rows to generate.
    """

    with open('car_sales.csv', 'w', newline='') as csvfile:
        fieldnames = ['id', 'model', 'year', 'mileage', 'price', 'color']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()

        for i in range(n_rows):
            row = {}
            row['id'] = i + 1
            row['model'] = random.choice(['Camry', 'Civic', 'F-150', 'Silverado', 'Altima', 'Sierra', 'Model Y', 'RAV4', 'Wrangler'])
            row['year'] = random.randint(2010, 2024) + np.random.choice([-10, 0, 10], p=[0.05, 0.85, 0.1])  # Introduce outliers in year
            row['mileage'] = random.randint(10000, 100000) + np.random.choice([-50000, 0, 50000], p=[0.05, 0.85, 0.1])  # Introduce outliers in mileage
            row['price'] = random.randint(10000, 50000)
            row['price'] = None if random.random() < 0.05 else row['price']  # Introduce null values in price
            row['mileage'] = None if random.random() < 0.05 else row['mileage']  # Introduce null values in mileage
            row['color'] = random.choice(['Red', 'Blue', 'White', 'Black', 'Silver'])
            writer.writerow(row)

In [2]:
n_rows = 1000
generate_car_data(n_rows)

# Data Cleaning


Let's create the spark session and load the data previuosly created.

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()


24/08/27 18:05:28 WARN Utils: Your hostname, air.local resolves to a loopback address: 127.0.0.1; using 192.168.5.143 instead (on interface en0)
24/08/27 18:05:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 18:05:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.csv("car_sales.csv", header=True, inferSchema=True)

                                                                                

## Missing Values

Let's find the number of missing values in each column

In [7]:
missing_values_dict = {col: df.filter(df[col].isNull()).count() for col in df.columns}
print("Number of missing values:", missing_values_dict)

Number of missing values: {'id': 0, 'model': 0, 'year': 0, 'mileage': 52, 'price': 40, 'color': 0}


Depending on the use case we have different strategies to follow.

### Drop rows

In [8]:
df_dropped = df.dropna()
print(df_dropped.count()) # Count the final number of rows after removing the nulls

912


### Impute the mileage with zero


In [11]:
df_zero_imp = df.fillna(0)

print("number of missing values in mileage column", df_zero_imp.filter(df_zero_imp['mileage'].isNull()).count() )

number of missing values in mileage column 0


### Mean imputation. 
Let's impute the price with the mean value

In [14]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["price"], outputCols=["imputed_price"], strategy="mean")
df_mean_imputed = imputer.fit(df).transform(df)


In [16]:
missing_values_mean_imp = {col: df_mean_imputed.filter(df_mean_imputed[col].isNull()).count() for col in df_mean_imputed.columns}
print("Number of missing values after mean imputation:", missing_values_mean_imp)

Number of missing values after mean imputation: {'id': 0, 'model': 0, 'year': 0, 'mileage': 52, 'price': 40, 'color': 0, 'imputed_price': 0}


### Mode imputation. 
Let's impute the mileage with the mode (no reason to do it other than the exercise purpose)

In [18]:
imputer = Imputer(inputCols=["mileage"], outputCols=["imputed_mileage"], strategy="mode")
df_mode_imputed = imputer.fit(df).transform(df)

missing_values_mode_imp = {col: df_mode_imputed.filter(df_mode_imputed[col].isNull()).count() for col in df_mode_imputed.columns}
print("Number of missing values after mode imputation:", missing_values_mode_imp)

Number of missing values after mode imputation: {'id': 0, 'model': 0, 'year': 0, 'mileage': 52, 'price': 40, 'color': 0, 'imputed_mileage': 0}


Other method can be interpolation. Or impute based on grouping conditions, we will review this later

## Outliers

### Identify Outliers: Z Score Method

In [22]:
from pyspark.sql.functions import stddev_pop, mean

z_score = (col("price") - mean(col("price"))) / stddev_pop(col("price"))
print(z_score)
#df_outliers_z = df.filter(abs(z_score) > 3)

#df_cleaned_z = df.subtract(df_outliers_z)

#print("number of rows after z score removal", df_cleaned_z.count())

Column<'((price - avg(price)) / stddev_pop(price))'>


## Replace Values