In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

# Create the Spark session used by every later cell (an already-running
# session is reused instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Craigslist_Vehicles_Analysis")
    .getOrCreate()
)

In [35]:
# Location of the raw Craigslist listings, relative to the notebook's working directory.
data_path = "data/craigslist_vehicles.csv"

# The first row is the header and column types are inferred from the values.
# NOTE(review): free-text columns may contain embedded newlines/quotes; confirm
# whether multi-line CSV parsing is needed for this file.
data = spark.read.options(header=True, inferSchema=True).csv(data_path)

                                                                                

In [36]:
# Preview the first two rows to sanity-check the load.
data.show(n=2)

+------+----------+--------------------+-------+--------------------+-----+------+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+--------------------+--------------------+------+-----+---------+----------+--------------------+--------------------+
|   _c0|        id|                 url| region|          region_url|price|  year|manufacturer|model|condition|cylinders|fuel|odometer|title_status|transmission| VIN|drive|size|type|paint_color|           image_url|         description|county|state|      lat|      long|        posting_date|        removal_date|
+------+----------+--------------------+-------+--------------------+-----+------+------------+-----+---------+---------+----+--------+------------+------------+----+-----+----+----+-----------+--------------------+--------------------+------+-----+---------+----------+--------------------+--------------------+
|362773|7307679724|https://abilene.c...|abilene|https://abile

23/08/06 14:17:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , id, url, region, region_url, price, year, manufacturer, model, condition, cylinders, fuel, odometer, title_status, transmission, VIN, drive, size, type, paint_color, image_url, description, county, state, lat, long, posting_date, removal_date
 Schema: _c0, id, url, region, region_url, price, year, manufacturer, model, condition, cylinders, fuel, odometer, title_status, transmission, VIN, drive, size, type, paint_color, image_url, description, county, state, lat, long, posting_date, removal_date
Expected: _c0 but found: 
CSV file: file:///home/grayhat/Desktop/int-athena/data/craigslist_vehicles.csv


In [37]:
# List the column names of the loaded DataFrame.
data.columns

['_c0',
 'id',
 'url',
 'region',
 'region_url',
 'price',
 'year',
 'manufacturer',
 'model',
 'condition',
 'cylinders',
 'fuel',
 'odometer',
 'title_status',
 'transmission',
 'VIN',
 'drive',
 'size',
 'type',
 'paint_color',
 'image_url',
 'description',
 'county',
 'state',
 'lat',
 'long',
 'posting_date',
 'removal_date']

In [38]:
# Columns that are not needed for the analysis.
# The CSV's first header cell is blank: Spark names that column '_c0'
# (pandas would call it 'Unnamed: 0'), so both spellings are listed to make
# sure the leftover index column is removed however the file was read.
columns_to_drop = ['_c0', 'Unnamed: 0', 'url', 'region_url', 'VIN', 'image_url',
                   'description', 'county', 'lat', 'long', 'removal_date']

# Keep only the names actually present in the DataFrame.
# (The loop variable is deliberately not called `col`, to avoid confusion with
# the `col` function imported from pyspark.sql.functions.)
columns_to_drop_existing = [name for name in columns_to_drop if name in data.columns]

# Drop the existing columns
data = data.drop(*columns_to_drop_existing)


In [39]:
# Convert posting_date to a timestamp column; withColumn already names the
# result, so no extra alias is required.
posting_ts = col('posting_date').cast('timestamp')
data = data.withColumn('posting_date', posting_ts)


In [40]:
from pyspark.sql import functions as F

def handle_missing_values(
    data,
    numerical_columns=('year', 'odometer'),
    categorical_columns=('manufacturer', 'model', 'condition', 'cylinders', 'fuel',
                         'title_status', 'transmission', 'drive', 'size', 'type',
                         'paint_color', 'posting_date'),
):
    """Fill nulls with the column mean (numeric) or the most frequent value (others).

    Args:
        data: Spark DataFrame containing every column named in the two lists.
        numerical_columns: columns whose nulls are replaced by the column mean.
        categorical_columns: columns whose nulls are replaced by the most
            frequent non-null value. `posting_date` is handled this way too.

    Returns:
        A new DataFrame with the same columns, with nulls in the listed columns
        replaced. A column that has no non-null value at all is left unchanged,
        because there is nothing to impute from.
    """
    numerical_columns = list(numerical_columns)
    categorical_columns = list(categorical_columns)

    # All statistics are taken from the input frame: filling one column never
    # changes another column's mean or mode.
    source = data
    fill_values = {}

    if numerical_columns:
        # One aggregation job for every mean instead of one job per column.
        means = source.agg(
            *[F.avg(F.col(name)).alias(name) for name in numerical_columns]
        ).first()
        for name in numerical_columns:
            if means[name] is not None:  # None means the column is entirely null
                fill_values[name] = means[name]

    for name in categorical_columns:
        # Nulls are excluded first; otherwise groupBy keeps a null group, which
        # can win the count and make the "mode" null, so nothing is filled.
        # The secondary sort key makes ties deterministic.
        top = (
            source.where(F.col(name).isNotNull())
            .groupBy(name)
            .count()
            .orderBy(F.col('count').desc(), F.col(name).asc())
            .first()
        )
        if top is not None:  # None means the column is entirely null
            fill_values[name] = top[name]

    # coalesce keeps the existing value and falls back to the fill value on null.
    for name, value in fill_values.items():
        data = data.withColumn(name, F.coalesce(F.col(name), F.lit(value)))

    return data

data = handle_missing_values(data)

                                                                                

### **Spark will save the cleaned data in partitions, which we can easily use for creating time-series models.**

- Since the modeling is done in the "Pandas Explore the Data and Build the Model - Complete.ipynb" notebook, I will stop here.