In [None]:
!pip install pyspark

In [None]:
import os

# Memory budget requested for both the driver and the executors.
memory = '20g'

# Both flags have to live in ONE argument string: assigning the variable twice
# (once per flag) keeps only the last assignment and silently drops the
# driver-memory setting.
pyspark_submit_args = (
    '--driver-memory ' + memory
    + ' --executor-memory ' + memory
    + ' pyspark-shell'
)
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [None]:
import pyspark
from pyspark.sql import SparkSession

Creating a PySpark Session

In [None]:
# Build (or reuse) the notebook-wide SparkSession: the application is named
# "CarSales" and four executor instances are requested.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.executor.instances", "4")
session_builder = session_builder.appName("CarSales")
spark = session_builder.getOrCreate()


In [None]:
# Echo the session object so the notebook renders it and we can confirm it exists.
spark

In [None]:
# Load the three raw CSV exports from HDFS. Every file has a header row and
# Spark infers the column types.
csv_read_options = {"inferSchema": True, "header": True}
rus25 = spark.read.csv("hdfs://localhost:9000/user/russia/region25_en.csv", **csv_read_options)
rus41 = spark.read.csv("hdfs://localhost:9000/user/russia/region41_en.csv", **csv_read_options)
usa = spark.read.csv("hdfs://localhost:9000/user/usa/us-dealers-used.csv", **csv_read_options)

In [None]:
!pip install pandas

In [None]:
import pandas as pd
import pyspark.sql as sparksql

Data Exploration

In [None]:
# Print the column names and inferred types of the region-25 listings.
rus25.printSchema()

In [None]:
# Look at the first row of the region-25 listings.
rus25.head(1)

In [None]:
# List the distinct raw engineDisplacement values to see how they are formatted.
rus25.select("engineDisplacement").distinct().show()

In [None]:
# Look at the first row of the US listings.
usa.head(1)

In [None]:
# Sample ten trim values and list the distinct engine sizes in the US data.
usa.select("trim").show(10)
usa.select("engine_size").distinct().show()

In [None]:
# describe() only builds a lazy summary DataFrame, and a notebook cell echoes
# just its last expression. Call show() on each summary so that all three are
# actually computed and printed.
for frame in (rus25, rus41, usa):
    frame.describe().show()

In [None]:
# Register every DataFrame as a temporary view so it can be queried with SQL.
for view_name, frame in (("rus25", rus25), ("rus41", rus41), ("usa", usa)):
    frame.createOrReplaceTempView(view_name)

# Exploratory queries.
# 1) Listings per brand, most frequent first.
brand_counts = spark.sql("SELECT brand, count(brand) as brand_count FROM rus25 GROUP BY brand ORDER BY brand_count DESC")
brand_counts.show()
# 2) Share of each body type over the whole table (kept disabled; it uses an un-partitioned window).
# spark.sql("SELECT bodyType, count(bodyType) as b_count, count(bodyType)*100/sum(count(bodyType)) over() as percent FROM rus25 GROUP BY bodyType").show()
# 3) Share of each body type within its production year.
body_type_share = spark.sql("SELECT year, bodyType, count(bodyType) as b_count, count(bodyType)*100/sum(count(bodyType)) over(PARTITION BY year) as percent FROM rus25 GROUP BY year, bodyType")
body_type_share.show()
# 4) Number of US Lexus listings priced above 55000.
is_expensive_lexus = (usa['make'] == 'Lexus') & (usa['price'] > 55000)
usa.filter(is_expensive_lexus).count()

The second query was throwing a warning:
23/06/02 22:04:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

This means that the window operation has no partition defined, so all the data is moved to a single partition. To let Spark distribute the data across multiple partitions and process it in parallel, we can add a partitioning column, which is what I do in the third query.

Another insight is that the usa dataset does not contain a color feature, which can hugely impact the ability of the model to predict prices, because people are not purely rational creatures. If I were investigating this data for business analysis, I would ditch this dataset.

Also, the value "Truck" in the vehicle_type column of the usa dataset seems to be disturbing, because obviously a car is not a truck, right? So I googled a few records just to find out that those are just vans, as described in "body_type" column. So we can drop "vehicle_type" column and not bother.

Here starts the data preprocessing part. Since the rus25 and rus41 dataframes have the same column names, I will merge them, because differentiating between regions is irrelevant for this work. Before doing so, I check that the schemas really are the same, because union neither verifies the schema nor removes duplicates.

In [None]:
from pyspark.sql.types import StructType

# Report whether the two regional frames share one schema before stacking them.
if rus25.schema == rus41.schema:
    print("The schemas are the same")
else:
    print("The schemas are different")

# unionByName matches columns by name instead of by position. A plain union
# would run even when the schemas differ and could silently place values in
# the wrong column; unionByName raises if a column is missing on either side.
rus = rus25.unionByName(rus41)

In [None]:
# Remove rows that are exact duplicates of another row in the same dataset.
usa, rus = usa.drop_duplicates(), rus.drop_duplicates()

The first thing I need to do is drop the columns that are not useful:
a) USA:
    id, vin, stock_no, street, city, state, zip, trim, vehicle_type
b) Russia:
    link, date, parse_date, location, vehicleConfiguration
    
Both datasets have their better and worse sides. Russian dataset is better because of "color" and "power" features, and USA dataset is better because of "seller_name" and "drivetrain" features. I decided to not remove those four distinct attributes, as they provide a meaningful business insight.

Also, it's possible to make an early sociological conclusion based on the very structure of the dataset, that probably American people are more easily influenced by the selling person or company, meanwhile Russian people care more about technical aspects, such as power of the engine or color of a car.

In [None]:
# Columns removed from each dataset before the two are harmonised.
drop_cols_usa = ["id", "vin", "stock_no", "street", "city", "zip", "state", "trim", "vehicle_type"]
drop_cols_russ = ["link", "date", "parse_date", "location","vehicleConfiguration"]

usa = usa.drop(*drop_cols_usa)
rus = rus.drop(*drop_cols_russ)

# A cell only echoes its LAST expression, so two bare head(1) calls would show
# the Russian row only. show() prints both; vertical=True lists one field per
# line, which stays readable for wide rows.
usa.show(1, vertical=True)
rus.show(1, vertical=True)

The datasets from Russia and USA are quite different, but it's possible to make them more compatible, and overall better. 

Overall, the compatible columns are:
- brand (rus) is make (us)
- price is price, but I have to convert
- mileage (rus) is miles (us), plus conversion needed
- name (rus) is model (us)
- fuelType (rus) is fuel_type (us), and there are some values that need further investigation
- bodyType (rus) is body_type (us), and probably I will convert it to some more general terms
- engineName (rus) is not compatible but I'll try to convert it to values similar to engine_block from usa DF, using chatgpt extension

Moreover, it's still necessary to investigate those columns.

In [None]:
# brand / make
# Print the distinct manufacturer names of both datasets side by side.
usa_makes = usa.select("make").distinct()
usa_makes.show()
rus_brands = rus.select("brand").distinct()
rus_brands.show()
# Only the column name differs between the datasets; the brands themselves are
# spelled the same way, so renaming the Russian column is enough.
rus = rus.withColumnRenamed(existing="brand", new="make")

In [None]:
# Prices of both datasets are converted to euros.
# NOTE: this import shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import round

dolar_eur = 0.8770  # multiplier applied to the US prices
rubel_eur = 0.0122  # multiplier applied to the Russian prices

usa_price_eur = round(usa["price"] * dolar_eur, 3)
usa = usa.withColumn("price", usa_price_eur)
rus_price_eur = round(rus["price"] * rubel_eur, 3)
rus = rus.withColumn("price", rus_price_eur)

# Spot-check a few converted Russian prices.
random_rows = (
    rus.select("price")
       .sample(withReplacement=False, fraction=0.1, seed=42)
       .limit(5)
)
random_rows.show()


In [None]:
# mileage
# The US column is called "miles"; rename it and convert it with the
# mile-to-kilometre factor, rounded to one decimal.
mile_to_km = 1.609344
usa = usa.withColumnRenamed(existing="miles", new="mileage")
mileage_km = round(usa["mileage"] * mile_to_km, 1)
usa = usa.withColumn("mileage", mileage_km)

# Spot-check a few converted values.
random_rows = (
    usa.select("mileage")
       .sample(withReplacement=False, fraction=0.1, seed=42)
       .limit(5)
)
random_rows.show()

In [None]:
# fuel type
rus = rus.withColumnRenamed("fuelType", "fuel_type")
rus.select("fuel_type").distinct().show()
# We can see that the only Russian values are Gasoline, Diesel, Electro and null.
# These are relevant and simple. The American DF has 43 unique values, which is
# not informative, so the relevant American values are converted to the Russian
# ones, while gas, methanol and hydrogen are kept as separate categories.
# (These notes used to be bare text inside the cell, which is a SyntaxError.)
unique_values = usa.select("fuel_type").distinct()
unique_values.show(unique_values.count(), truncate=False)

from pyspark.sql.functions import when, col

# The rules are tried from top to bottom and the first match wins, so the
# order matters (e.g. "Biodiesel" and "E85" are decided before the generic
# "Diesel" / "Unleaded" rules). Values matching no rule are kept unchanged.
# NOTE(review): the last rule matches any value containing "Gas", which would
# also include a literal "Gasoline" value -- confirm against the listing above.
usa = usa.withColumn(
    "fuel_type",
    when(col("fuel_type").contains("Electric"), "Electro")
    .when(col("fuel_type").contains("Biodiesel"), "Diesel")
    .when(col("fuel_type").contains("E85"), "Gasoline")
    .when(col("fuel_type").contains("Diesel"), "Diesel")
    .when(col("fuel_type").contains("M85"), "Methanol")
    .when(col("fuel_type").contains("Methanol"), "Methanol")
    .when(col("fuel_type").contains("Unleaded") & ~col("fuel_type").contains("Diesel"), "Gasoline")
    .when(col("fuel_type").contains("Gas"), "Gas")
    .otherwise(usa["fuel_type"]),
)

# Show what is left after the mapping.
unique_values = usa.select("fuel_type").distinct()
unique_values.show()

In [None]:
# model
# The Russian "name" column corresponds to the US "model" column.
rus = rus.withColumnRenamed(existing="name", new="model")

In [None]:
# bodyType
from pyspark.sql.functions import col, when

rus = rus.withColumnRenamed("bodyType", "body_type")

# List every distinct body type of both datasets before harmonising them.
unique_values = rus.select("body_type").distinct()
unique_values.show(unique_values.count(), truncate=False)
unique_values2 = usa.select("body_type").distinct()
unique_values2.show(unique_values2.count(), truncate=False)

rus.createOrReplaceTempView('rus')
spark.sql("SELECT make FROM rus WHERE body_type = 'jeep 3 doors'").show()
# As suspected, "jeep" does not mean the Jeep brand here; it denotes an SUV.


def _relabel_body_type(frame, rules):
    """Rewrite ``body_type`` using ordered ``(substring, label)`` rules.

    The rules are tried in order and the first substring contained in the
    value decides the label; values matching no rule are left unchanged.
    """
    body = col("body_type")
    (first_needle, first_label), *remaining = rules
    relabelled = when(body.contains(first_needle), first_label)
    for needle, label in remaining:
        relabelled = relabelled.when(body.contains(needle), label)
    return frame.withColumn("body_type", relabelled.otherwise(frame["body_type"]))


rus_body_rules = [
    ("jeep", "suv"),
    ("hatchback", "hatchback"),
    ("wagon", "combi"),
    ("SUV", "suv"),
    ("liftback", "sedan"),
]
rus = _relabel_body_type(rus, rus_body_rules)

# "Commercial Wagon" needs no rule of its own: it already contains "Wagon".
usa_body_rules = [
    ("Sedan", "sedan"),
    ("Hatchback", "hatchback"),
    ("Targa", "open"),
    ("Crossover", "suv"),
    ("Chassis Cab", "pickup"),
    ("Convertible", "open"),
    ("Combi", "combi"),
    ("Pickup", "pickup"),
    ("Roadster", "open"),
    ("Wagon", "combi"),
    ("Minivan", "minivan"),
    ("Coupe", "coupe"),
    ("SUV", "suv"),
]
usa = _relabel_body_type(usa, usa_body_rules)

# Remove the van-like / special-purpose body types from the US data.
# NOTE(review): the original note also listed "chassis cowl", but that value
# has never been part of this list -- confirm whether it should be dropped too.
drops = ["Cargo Van", "Mini Mpv", "Van", "Car Van", "Cutaway", "Micro Car", "Passenger Van"]
usa = usa.filter(~col("body_type").isin(drops))

In [None]:
# engineName to engine_block

unique_values2 = usa.select("engine_block").distinct()
unique_values2.show(unique_values2.count(), truncate=False)

!pip install openai

In [None]:
import os

import openai

# The key must never be written into the notebook. Read it from the
# OPENAI_API_KEY environment variable; a missing variable fails right here
# with a KeyError instead of later inside the first API call.
openai.api_key = os.environ["OPENAI_API_KEY"]

def generate(user_input):
    """Send ``user_input`` as a completion prompt and return the answer text.

    Requests a single completion (at most 50 tokens, temperature 0.7) from the
    'text-davinci-003' engine and returns the text of the first choice with
    surrounding whitespace removed.
    """
    request = {
        "engine": 'text-davinci-003',
        "prompt": user_input,
        "max_tokens": 50,
        "n": 1,
        "stop": None,
        "temperature": 0.7,
    }
    completion = openai.Completion.create(**request)
    first_choice = completion.choices[0]
    return first_choice.text.strip()

rus = rus.withColumnRenamed("engineName", "engine_block")
uniqs = rus.select("engine_block").distinct().collect()
# Skip null engine names: a prompt about 'None' costs a request and cannot
# produce a meaningful classification.
unilst = [row["engine_block"] for row in uniqs if row["engine_block"] is not None]

# Letter returned by the model -> label used in the result table.
conversion_map = {
    'V': 'V Engine',
    'I': 'I Engine',
    'H': 'H Engine'
}

converted_values = []
for value in unilst:
    user_input = f"What is the engine block type for '{value}', you can choose V, I or H?"
    response = generate(user_input)
    # The answer may come back as e.g. "v", "V." or "'V'"; normalise case and
    # strip quotes / trailing punctuation before the lookup so those answers
    # are not all reported as 'Unknown'.
    answer = response.strip(" \t\r\n\"'.").upper()
    converted_value = conversion_map.get(answer, 'Unknown')
    converted_values.append(converted_value)

# Create a DataFrame with the original and converted values
data = {'Engine Name': unilst, 'Engine Block Type': converted_values}
df = pd.DataFrame(data)

# Display the DataFrame
print(df)

In [None]:
# engine_size
from pyspark.sql.functions import regexp_extract, when

rus = rus.withColumnRenamed("engineDisplacement", "engine_size")

# Keep only the first number (integer or decimal) found in the raw text.
# regexp_extract returns an EMPTY STRING, not null, when nothing matches, and
# an empty string is not treated as missing by dropna(). Turn it into a real
# null and store the value as a number instead of text.
engine_size_text = regexp_extract(rus['engine_size'], r'(\d+\.\d+|\d+)', 1)
rus = rus.withColumn(
    'engine_size',
    when(engine_size_text != '', engine_size_text).cast('double'),
)

# Show the updated column
# rus.select('engine_size').show()

In [None]:
# drop engine name/block
# The rename is a no-op when the column has already been renamed; afterwards
# engine_block is removed from both datasets.
rus = rus.withColumnRenamed(existing="engineName", new="engine_block")
usa, rus = usa.drop("engine_block"), rus.drop("engine_block")

In [None]:
# manage null values
# first check how many of them there are
# NOTE: this import shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, sum, isnan


def _missing_per_column(frame):
    """Return a one-row DataFrame holding, per column, nulls plus NaNs."""
    counters = []
    for name in frame.columns:
        is_missing = col(name).isNull().cast("int") + isnan(col(name)).cast("int")
        counters.append(sum(is_missing).alias(name))
    return frame.select(counters)


nuls = _missing_per_column(usa)
nuls.show()
total = usa.count()
print(total)

nuls2 = _missing_per_column(rus)
nuls2.show()
total2 = rus.count()
print(total2)

In [None]:
# Drop every row that lacks at least one of the listed key attributes
# (the two datasets have different key columns).
dropus = ['price', 'drivetrain', 'transmission', 'seller_name', 'fuel_type', 'engine_size']
dropru = ['price', 'transmission', 'power', 'fuel_type', 'engine_size']
rus = rus.na.drop(subset=dropru)
usa = usa.na.drop(subset=dropus)

In [None]:
# year: impute a missing year with the (integer) mean year of the cars that
# fall into the same 10000-wide mileage bucket.
from pyspark.sql.functions import avg, when, col, coalesce


def _impute_year_from_mileage(frame, bucket_name, mean_name):
    """Fill null ``year`` values from the mean year of the row's mileage bucket.

    ``bucket_name`` and ``mean_name`` are the names of the two helper columns
    used during the join; both are dropped again before returning.
    """
    mean_year_per_bucket = (
        frame.groupBy(((col("mileage") / 10000).cast("integer") * 10000).alias(bucket_name))
             .agg(avg("year").cast("integer").alias(mean_name))
    )
    row_bucket = (frame["mileage"] / 10000).cast("integer") * 10000
    joined = frame.join(mean_year_per_bucket, row_bucket == col(bucket_name), "left")
    filled = joined.withColumn("year", coalesce(col("year"), col(mean_name)))
    return filled.drop(bucket_name, mean_name)


usa = _impute_year_from_mileage(usa, "mileage_range", "average_year")
# Rows whose year could not be inferred are removed.
usa = usa.dropna(subset=["year"], how="any")

rus = _impute_year_from_mileage(rus, "mileage_range_rus", "average_rok")

# Re-check the remaining missing values of the Russian data.
nuls = rus.select([sum(col(c).isNull().cast("int") + isnan(col(c)).cast("int")).alias(c) for c in rus.columns])
nuls.show()

rus = rus.dropna(subset=["year"], how="any")

In [None]:
# color: impute MISSING colors with a random pick from the observed colors
from pyspark.sql.functions import array, coalesce, col, lit, rand

colors = rus.select("color").distinct().collect()
# distinct() also returns the null "color"; it must not be a candidate value.
c_list = [row["color"] for row in colors if row["color"] is not None]

if c_list:
    palette = array([lit(c) for c in c_list])
    # rand() is in [0, 1), so the index is always within 0 .. len(c_list) - 1.
    # The seed keeps the imputation reproducible between runs.
    random_color = palette[(rand(seed=42) * len(c_list)).cast("int")]
    # coalesce keeps every known color and only fills the missing ones.
    # Assigning random_color directly would overwrite the real color of every
    # single row.
    rus = rus.withColumn("color", coalesce(col("color"), random_color))
# rus.select("color").show()
rus = rus.dropna(subset=["color"], how="any")


In [None]:
# for mileage: inpute with average mileage of cars from same year
# coalesce() func is used to return the first non-null val among specified cols
average_mileage_rus = rus.groupBy("year").agg(avg("mileage").alias("average_mileage_rus"))
rus = rus.join(average_mileage_rus, "year", "left")
rus = rus.withColumn("mileage", coalesce(col("mileage"), col("average_mileage_rus")))
rus = rus.drop("average_mileage_rus")

nuls = rus.select([sum(col(c).isNull().cast("int") + isnan(col(c)).cast("int")).alias(c) for c in rus.columns])
# nuls.show()
# as we can see, we still get 57 records with missing data, where it wasn't possible to infer the value from years
# I drop them
rus = rus.dropna(subset=["mileage"], how="any")

average_mileage_us = usa.groupBy("year").agg(avg("mileage").alias("average_mileage_us"))
usa = usa.join(average_mileage_us, "year", "left")
usa = usa.withColumn("mileage", coalesce(col("mileage"), col("average_mileage_us")))
usa = usa.drop("average_mileage_us")

nuls = usa.select([sum(col(c).isNull().cast("int") + isnan(col(c)).cast("int")).alias(c) for c in usa.columns])
# nuls.show()
# no need to drop anything

In [None]:
!pip install matplotlib
!pip install seaborn

What is the most common brand, color, fuel type during years, for USA and Russia?

In [None]:
from pyspark.sql.functions import desc
import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
import seaborn as sns


def _barplot_counts(ax, counts_pd, hue, title, **barplot_kwargs):
    """Draw a per-year bar chart of ``count`` split by ``hue`` on ``ax``.

    NOTE(review): the x axis of a seaborn bar plot is categorical; verify that
    the years shown after thinning the ticks with MultipleLocator(5) are the
    right ones in the rendered figure.
    """
    sns.barplot(data=counts_pd, x="year", y="count", hue=hue, ax=ax, **barplot_kwargs)
    ax.set_title(title)
    ax.xaxis.set_major_locator(ticker.MultipleLocator(5))
    ax.tick_params(axis="x", labelrotation=90)
    ax.legend(bbox_to_anchor=(1, 1), loc="upper left")


# ---- Russia ---------------------------------------------------------------
make_counts = rus.groupBy("year", "make").count().orderBy("year", desc("count"))
color_counts = rus.groupBy("year", "color").count().orderBy("year", desc("count"))
fuel_type_counts = rus.groupBy("year", "fuel_type").count().orderBy("year", desc("count"))

make_counts_pd = make_counts.toPandas()
color_counts_pd = color_counts.toPandas()
fuel_type_counts_pd = fuel_type_counts.toPandas()

# Get the top 10 most popular makes (this heading used to be bare text in the
# cell, which is a SyntaxError).
top_10_makes = make_counts_pd.groupby('make')['count'].sum().nlargest(10).index
make_counts_pd_filtered = make_counts_pd[make_counts_pd['make'].isin(top_10_makes)]

fig, (ax_make, ax_color, ax_fuel) = plt.subplots(3, 1, figsize=(12, 8))
_barplot_counts(ax_make, make_counts_pd_filtered, "make",
                "Most Common Brand during Years - Russia", palette='tab10')
_barplot_counts(ax_color, color_counts_pd, "color",
                "Most Common Color during Years - Russia")
_barplot_counts(ax_fuel, fuel_type_counts_pd, "fuel_type",
                "Most Common Fuel Type during Years - Russia")
fig.tight_layout()
plt.show()

# ---- USA (this dataset has no color column) -------------------------------
usa_make_counts = usa.groupBy("year", "make").count().orderBy("year", desc("count"))
usa_fuel_type_counts = usa.groupBy("year", "fuel_type").count().orderBy("year", desc("count"))

usa_make_counts_pd = usa_make_counts.toPandas()
usa_fuel_type_counts_pd = usa_fuel_type_counts.toPandas()

# Get the top 10 most popular makes
usa_top_10_makes = usa_make_counts_pd.groupby('make')['count'].sum().nlargest(10).index
usa_make_counts_pd_filtered = usa_make_counts_pd[usa_make_counts_pd['make'].isin(usa_top_10_makes)]

fig, (ax_make, ax_fuel) = plt.subplots(2, 1, figsize=(12, 8))
_barplot_counts(ax_make, usa_make_counts_pd_filtered, "make",
                "Most Common Brand during Years - USA", palette='tab10')
_barplot_counts(ax_fuel, usa_fuel_type_counts_pd, "fuel_type",
                "Most Common Fuel Type during Years - USA")
# The second figure is laid out and shown explicitly, like the first one.
fig.tight_layout()
plt.show()

In [None]:
# check whether the data follow a normal distribution
import matplotlib.pyplot as plt


def _plot_histograms(frame, columns, title, n_rows=3, n_cols=4, bins=20):
    """Draw one histogram per column of ``frame`` on a single grid figure.

    Each column is collected to pandas separately, so only one column is held
    in driver memory at a time. Missing values are removed before plotting.
    Grid cells without a column are hidden.
    """
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 10))
    fig.suptitle(title)
    panels = list(axes.flat)
    for ax, col_name in zip(panels, columns):
        values = frame.select(col(col_name)).toPandas()[col_name].dropna()
        ax.hist(values, bins=bins)
        ax.set_title(col_name)
        ax.set_xlabel(col_name)
        ax.set_ylabel("Frequency")
    for ax in panels[len(columns):]:
        ax.set_visible(False)
    fig.tight_layout()
    plt.show()


# Every histogram is drawn on its own axes of the grid. Opening a new figure
# inside the loop would leave the grid (and its title) empty.
cols = ["year", "mileage", "power", "price", "engine_size", "make", "model", "body_type", "color", "fuel_type", "transmission"]
_plot_histograms(rus, cols, "Histograms - Russia")

colu = ["year", "mileage", "price", "engine_size", "make", "model", "body_type", "fuel_type", "transmission", "seller_name", "drivetrain"]
_plot_histograms(usa, colu, "Histograms - USA")

Median Car Price by Year

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Median price per production year for each market, collected to pandas.
rus_median_price = rus.groupBy("year").agg({"price": "median"}).orderBy("year")
rus_median_price_pd = rus_median_price.toPandas()
usa_median_price = usa.groupBy("year").agg({"price": "median"}).orderBy("year")
usa_median_price_pd = usa_median_price.toPandas()

# One line chart per market, side by side.
median_panels = (
    (rus_median_price_pd, "Median Car Price by Year - Russia"),
    (usa_median_price_pd, "Median Car Price by Year - USA"),
)
plt.figure(figsize=(12, 6))
for position, (median_pd, panel_title) in enumerate(median_panels, start=1):
    plt.subplot(1, 2, position)
    sns.lineplot(data=median_pd, x="year", y="median(price)")
    plt.title(panel_title)

plt.tight_layout()
plt.show()

Average prices of cars by make

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
from pyspark.sql.functions import avg

# The three makes with the most Russian listings.
make_frequency = rus.groupBy("make").count().orderBy("count", ascending=False)
top_brands = [row["make"] for row in make_frequency.limit(3).select("make").collect()]

# Average price per (make, year) for those makes, as a pandas frame.
top_brand_rows = rus.filter(rus["make"].isin(top_brands))
brand_prices = (
    top_brand_rows.groupBy(["make", "year"])
                  .agg(avg("price").alias("average_price"))
                  .orderBy("make", "year")
                  .toPandas()
)

# Every fifth distinct year (not used by the plot below).
years = brand_prices["year"].unique()[::5]

plt.figure(figsize=(12, 6))
sns.barplot(data=brand_prices, x="year", y="average_price", hue="make")

axis = plt.gca()
axis.xaxis.set_major_locator(ticker.MultipleLocator(5))
plt.title("Average Car Prices by Make - Russia")
plt.xlabel("Year")
plt.ylabel("Average Price")
plt.xticks(rotation=90)
plt.legend(bbox_to_anchor=(1, 1), loc="upper left")
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import avg


def _plot_avg_price_by_fuel_type(frame, title, group_width=0.8, opacity=0.8):
    """Plot the average price per year as grouped bars, one bar per fuel type.

    Returns the aggregated pandas frame (columns fuel_type, year,
    average_price) that the figure is based on.
    """
    # Group by fuel type and calculate the average price for each year.
    prices = (
        frame.groupby(["fuel_type", "year"])
             .agg(avg("price").alias("average_price"))
             .orderBy("fuel_type", "year")
             .toPandas()
    )
    fuel_types = prices["fuel_type"].unique()
    n_fuel_types = len(fuel_types)
    bar_width = group_width / max(n_fuel_types, 1)

    plt.figure(figsize=(12, 6))
    for i, fuel_type in enumerate(fuel_types):
        subset = prices[prices["fuel_type"] == fuel_type]
        # Shift each fuel type sideways inside its year group. Without the
        # offset all bars of a year share the same x position and are drawn
        # on top of each other.
        offset = (i - (n_fuel_types - 1) / 2) * bar_width
        plt.bar(subset["year"] + offset, subset["average_price"],
                width=bar_width, alpha=opacity, label=fuel_type)

    plt.xlabel("Year")
    plt.ylabel("Average Price")
    plt.title(title)
    # Label every fifth year, in ascending order (the frame is ordered by fuel
    # type first, so the raw unique() order is not chronological).
    plt.xticks(sorted(prices["year"].dropna().unique())[::5])
    plt.legend()
    plt.tight_layout()
    plt.show()
    return prices


fuel_type_prices = _plot_avg_price_by_fuel_type(rus, "Average Car Prices by Fuel Type - Russia")
fuel_type_prices = _plot_avg_price_by_fuel_type(usa, "Average Car Prices by Fuel Type - USA")


Average price for specific fuel type

In [None]:
# The analysis is done: remove the columns that exist in only one of the two
# datasets.
usa_only_cols = ("seller_name", "drivetrain")
rus_only_cols = ("power", "color")
usa = usa.drop(*usa_only_cols)
rus = rus.drop(*rus_only_cols)

In [None]:
# Print the current columns and types of the Russian data.
rus.printSchema()

In [None]:
# encoding and handling data types
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.functions import col
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

numeric_cols = ["year", "mileage","price", "engine_size"]
cat_cols = ["make", "model", "body_type", "fuel_type", "transmission"]


def _cast_numeric(frame):
    """Cast every column listed in ``numeric_cols`` to float."""
    for name in numeric_cols:
        frame = frame.withColumn(name, col(name).cast("float"))
    return frame


def _one_hot_encode(frame):
    """Replace every column in ``cat_cols`` by a ``<name>_encoded`` vector.

    Each column is first indexed (``<name>_index``) and then one-hot encoded;
    the original column and the index column are dropped afterwards.
    """
    for name in cat_cols:
        indexer = StringIndexer(inputCol=name, outputCol=name + "_index")
        frame = indexer.fit(frame).transform(frame)
    for name in cat_cols:
        encoder = OneHotEncoder(inputCols=[name + "_index"], outputCols=[name + "_encoded"])
        frame = encoder.fit(frame).transform(frame)
    helper_columns = cat_cols + [name + "_index" for name in cat_cols]
    return frame.drop(*helper_columns)


# Russia: only cars with a year after 2010 are kept.
rus = _cast_numeric(rus)
rus = rus.filter(col("year") > 2010)
rus = _one_hot_encode(rus)

# USA: same casting and encoding, without the year filter.
usa = _cast_numeric(usa)
usa = _one_hot_encode(usa)

In [None]:
# Split each dataset 80/20 into train and test with a fixed seed.
split_weights = [0.8, 0.2]
split_seed = 42
rus_train, rus_test = rus.randomSplit(split_weights, seed=split_seed)
usa_train, usa_test = usa.randomSplit(split_weights, seed=split_seed)

In [None]:
# train Linear Regression
import matplotlib.pyplot as plt
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

input_cols = ["year", "mileage", "engine_size", "make_encoded", "model_encoded", "body_type_encoded", "fuel_type_encoded", "transmission_encoded"]

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Both training sets need the assembled "features" column before fitting.
# The assembled frames get their own names so rus_train / usa_train stay
# untouched; drop("features") is a no-op on a fresh frame and makes the cell
# safe to re-run.
rus_train_vec = assembler.transform(rus_train.drop("features"))
usa_train_vec = assembler.transform(usa_train.drop("features"))

lr = LinearRegression(featuresCol="features", labelCol="price")
rus_lr_model = lr.fit(rus_train_vec)
usa_lr_model = lr.fit(usa_train_vec)

# predictions
rus_predictions = rus_lr_model.transform(assembler.transform(rus_test))
usa_predictions = usa_lr_model.transform(assembler.transform(usa_test))

# MAE
evaluator = RegressionEvaluator(labelCol="price", metricName="mae")
rus_mae = evaluator.evaluate(rus_predictions)
usa_mae = evaluator.evaluate(usa_predictions)

print("MAE for rus model:", rus_mae)
print("MAE for usa model:", usa_mae)

# RMSE
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")

rus_rmse = evaluator.evaluate(rus_predictions)
usa_rmse = evaluator.evaluate(usa_predictions)

print("RMSE for rus model:", rus_rmse)
print("RMSE for usa model:", usa_rmse)

# loss function: the training summary exposes the per-iteration objective as
# `objectiveHistory`.
for market, model in (("Russia", rus_lr_model), ("USA", usa_lr_model)):
    loss_values = model.summary.objectiveHistory
    plt.figure()
    plt.plot(loss_values)
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Loss Function - ' + market)
    plt.show()

In [None]:
# train Random Forest
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

input_cols = ["year", "mileage", "engine_size", "make_encoded", "model_encoded", "body_type_encoded", "fuel_type_encoded", "transmission_encoded"]

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# The training frames may already carry a "features" column from an earlier
# model cell; assembling onto it again fails because the output column exists.
# Drop it first (a no-op when absent) and keep the assembled frames under
# their own names.
rus_train_rf = assembler.transform(rus_train.drop("features"))
usa_train_rf = assembler.transform(usa_train.drop("features"))

rf = RandomForestRegressor(featuresCol="features", labelCol="price")
rus_rf_model = rf.fit(rus_train_rf)
usa_rf_model = rf.fit(usa_train_rf)

# predictions
rus_predictions = rus_rf_model.transform(assembler.transform(rus_test))
usa_predictions = usa_rf_model.transform(assembler.transform(usa_test))
# The USA predictions are exposed under both names so that a later cell can
# use either one.
usa_rf_predictions = usa_predictions



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the random-forest predictions of both markets. The USA predictions
# are stored as `usa_predictions`, the name the training cell assigns.

# MAE
evaluator = RegressionEvaluator(labelCol="price", metricName="mae")

rus_mae = evaluator.evaluate(rus_predictions)
usa_mae = evaluator.evaluate(usa_predictions)

print("MAE for rus Random Forest model:", rus_mae)
print("MAE for usa Random Forest model:", usa_mae)

# RMSE
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")

rus_rmse = evaluator.evaluate(rus_predictions)
usa_rmse = evaluator.evaluate(usa_predictions)

print("RMSE for rus Random Forest model:", rus_rmse)
print("RMSE for usa Random Forest model:", usa_rmse)