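# Cars data cleanup

This notebook does three things:

- It joins the car catalogue JSON exports in the `carsdata` blob container (makes, models, generations, series, trims and specifications) into a single denormalised table.
- It writes that table back to the container as CSV (`cars_complete_csv`).
- It re-reads the CSV to check that the row count survived the round trip.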

In [2]:
# Notebook/job parameters identifying the Azure storage account that holds the `carsdata` container.
# Both default to "" and are read back in the configuration cell.
# NOTE(review): a text widget shows the account key in the notebook UI and in job parameters;
# consider a Databricks secret scope (dbutils.secrets.get) for the key instead.
dbutils.widgets.text("storage_account_name", "")
dbutils.widgets.text("storage_account_key", "")

In [3]:
import pyspark.sql.functions as F
from os.path import splitext

In [4]:
# Read the storage credentials from the widgets and register the account key with Spark,
# so that `wasbs://` paths on this account can be read and written.
storage_account_name = dbutils.widgets.get("storage_account_name")
storage_account_key = dbutils.widgets.get("storage_account_key")

# Fail fast: both widgets default to "". An empty account name would yield a malformed
# host ("carsdata@.blob.core.windows.net"), and an empty key would only fail later,
# when the first blob is accessed.
if not storage_account_name or not storage_account_key:
  raise ValueError("Both the storage_account_name and storage_account_key widgets must be set.")

blob_path_root = f"wasbs://carsdata@{storage_account_name}.blob.core.windows.net/"

spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [5]:
def get_file_url(file_name):
  """Return the absolute blob URL of ``file_name`` under ``blob_path_root``.

  The two parts are joined with exactly one ``/`` whether or not ``blob_path_root``
  ends with a slash (and whether or not ``file_name`` starts with one). This avoids
  producing ``...windows.net//Make.json``.

  Args:
    file_name: Blob or directory name relative to the container root, e.g. ``"Make.json"``.

  Returns:
    The joined URL as a string.
  """
  return f"{blob_path_root.rstrip('/')}/{file_name.lstrip('/')}"

def write_csv(data_frame, file_name):
  """Save ``data_frame`` as CSV, with a header row, at ``file_name`` in the container.

  Existing output at that location is replaced (save mode ``"overwrite"``). Spark writes
  a directory of part files at the target path rather than a single file.

  Args:
    data_frame: The Spark DataFrame to write.
    file_name: Target name relative to the container root (resolved via ``get_file_url``).
  """
  target_url = get_file_url(file_name)
  csv_writer = data_frame.write.format("csv").option("header", "true").mode("overwrite")
  csv_writer.save(target_url)

In [6]:
# Source tables: one JSON export per table in the container.
files = ["Make.json", "MakeModel.json", "Option.json", "OptionValue.json", "Series.json", "Specification.json", "SpecificationValue.json", "Trim.json", "Generation.json"]

def get_key(file_name):
  """Return the ``data_frames`` key for ``file_name``: its name without extension, lower-cased.

  For example, ``"MakeModel.json"`` becomes ``"makemodel"``.
  """
  return splitext(file_name)[0].lower()

# Load every table, keyed by get_key(). The comprehension keeps the loop temporaries
# out of the notebook's global namespace, so no stray `file`/`df` variables are left behind.
data_frames = {get_key(file_name): spark.read.json(get_file_url(file_name)) for file_name in files}

In [7]:
# Give each loaded table its own variable; the keys are the lower-cased file base names.
# NOTE(review): options_df and optionvalues_df are not used by any later cell shown here.
(
  makes_df,
  models_df,
  options_df,
  optionvalues_df,
  series_df,
  specs_df,
  specvalues_df,
  trims_df,
  generations_df,
) = (
  data_frames[table_key]
  for table_key in (
    "make",
    "makemodel",
    "option",
    "optionvalue",
    "series",
    "specification",
    "specificationvalue",
    "trim",
    "generation",
  )
)

In [8]:
# A specification row points at its parent row via `parent_id`, and the parent's name
# is used as the specification's category.
# The inner self-join keeps only specifications whose parent exists. Rows without a
# matching parent (presumably the top-level categories themselves) are excluded.
specs_with_parent_names_df = (
  specs_df.alias("parent")
  .join(
    specs_df.alias("child"),
    F.col("parent.id") == F.col("child.parent_id"),
    "inner",
  )
  .select(
    F.col("child.id").alias("specification_id"),
    F.col("child.name").alias("specification_name"),
    F.col("parent.name").alias("specification_category"),
  )
)


In [9]:
# Attach the specification name and category to every specification value, and
# rename value/unit so they stay unambiguous in the final table.
specs_with_values_df = (
  specvalues_df
  .join(specs_with_parent_names_df, on="specification_id", how="inner")
  .select(
    "trim_id",
    "specification_name",
    F.col("value").alias("specification_value"),
    F.col("unit").alias("specification_unit"),
    "specification_category",
  )
)

In [10]:
# Rename the trim's generic `id`/`name` columns so they stay distinguishable after the joins below.
trims_df = (
  trims_df
  .withColumnRenamed("name", "trim_name")
  .withColumnRenamed("id", "trim_id")
)

In [11]:
# One row per matching (trim, specification value) pair.
trim_specs_df = trims_df.join(specs_with_values_df, on="trim_id", how="inner")

In [12]:
# Join each series with its trims' specification rows.
# Built from the raw table in `data_frames` rather than from `series_df` itself. Writing
# `series_df = series_df...join(...)` would re-join the cell's own output whenever the
# cell is run a second time, duplicating columns and multiplying rows.
series_df = (
  data_frames["series"]
  .withColumnRenamed("id", "series_id")
  .withColumnRenamed("name", "series_name")
  .join(trim_specs_df, ["series_id"], "inner")
  # drop() ignores column names that are not present.
  # NOTE(review): "mode_id" looks like a typo and is presumably absent (a no-op); confirm and remove.
  .drop("mode_id", "trim_id", "model_id")
)

In [13]:
# Prefix the generation columns so they stay distinguishable from other tables' columns after the join.
generations_df = (
  generations_df
  .withColumnRenamed("id", "generation_id")
  .withColumnRenamed("name", "generation_name")
  .withColumnRenamed("year_start", "generation_year_start")
  .withColumnRenamed("year_end", "generation_year_end")
)

In [14]:
# Attach generation details to every series/trim/specification row.
series_generations_df = series_df.join(generations_df, on="generation_id", how="inner")

In [15]:
# Attach model names, then drop the join keys that are no longer needed.
models_df = models_df.withColumnRenamed("name", "model_name")
# The production-year columns get a `trim_` prefix.
# NOTE(review): presumably they originate from the trim table; confirm against the source schema.
model_details_df = (
  models_df
  .join(series_generations_df, "model_id", "inner")
  .drop("model_id", "generation_id", "series_id")
  .withColumnRenamed("year_production_end", "trim_year_production_end")
  .withColumnRenamed("year_production_start", "trim_year_production_start")
)

In [16]:
# Attach the make name; the make key is only needed for the join.
makes_df = (
  makes_df
  .withColumnRenamed("id", "make_id")
  .withColumnRenamed("name", "make_name")
)
make_model_details_df = (
  makes_df
  .join(model_details_df, "make_id", "inner")
  .drop("make_id")
)

In [17]:
# The fully joined table that is exported as CSV.
cars_df = make_model_details_df

In [18]:
# Row count of the joined table, checked against the re-read CSV in a later cell.
count1 = cars_df.count()

In [19]:
# Export the joined table (overwriting any previous export).
write_csv(data_frame=cars_df, file_name="cars_complete_csv")

In [20]:
# Read the export back to verify the round trip; every column is read as a string.
# NOTE(review): if any value contains a newline, reading without multiLine=True splits it
# into several records and the row-count check fails.
test_df = spark.read.csv(get_file_url("cars_complete_csv"), header=True)

In [21]:
# Row count of the CSV as read back from storage.
count2 = test_df.count()

In [22]:
# Validate the export. The CSV must contain exactly the rows that were written, and the
# total must match the expected dataset size. Asserting (rather than only displaying the
# comparison) makes a mismatch stop a Run All instead of silently showing `False`.
# NOTE(review): presumably the row count observed for the current source snapshot;
# update it when the source data changes.
EXPECTED_ROW_COUNT = 2025118
assert count1 == count2, f"CSV round trip changed the row count: wrote {count1}, read back {count2}"
assert count1 == EXPECTED_ROW_COUNT, f"Expected {EXPECTED_ROW_COUNT} rows, got {count1}"
count1 == count2 == EXPECTED_ROW_COUNT