## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/CarPrice_Assignment.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

car_ID,symboling,CarName,fueltype,aspiration,doornumber,carbody,drivewheel,enginelocation,wheelbase,carlength,carwidth,carheight,curbweight,enginetype,cylindernumber,enginesize,fuelsystem,boreratio,stroke,compressionratio,horsepower,peakrpm,citympg,highwaympg,price
1,3,alfa-romero giulia,gas,std,two,convertible,rwd,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000,21,27,13495.0
2,3,alfa-romero stelvio,gas,std,two,convertible,rwd,front,88.6,168.8,64.1,48.8,2548,dohc,four,130,mpfi,3.47,2.68,9.0,111,5000,21,27,16500.0
3,1,alfa-romero Quadrifoglio,gas,std,two,hatchback,rwd,front,94.5,171.2,65.5,52.4,2823,ohcv,six,152,mpfi,2.68,3.47,9.0,154,5000,19,26,16500.0
4,2,audi 100 ls,gas,std,four,sedan,fwd,front,99.8,176.6,66.2,54.3,2337,ohc,four,109,mpfi,3.19,3.4,10.0,102,5500,24,30,13950.0
5,2,audi 100ls,gas,std,four,sedan,4wd,front,99.4,176.6,66.4,54.3,2824,ohc,five,136,mpfi,3.19,3.4,8.0,115,5500,18,22,17450.0
6,2,audi fox,gas,std,two,sedan,fwd,front,99.8,177.3,66.3,53.1,2507,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,19,25,15250.0
7,1,audi 100ls,gas,std,four,sedan,fwd,front,105.8,192.7,71.4,55.7,2844,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,19,25,17710.0
8,1,audi 5000,gas,std,four,wagon,fwd,front,105.8,192.7,71.4,55.7,2954,ohc,five,136,mpfi,3.19,3.4,8.5,110,5500,19,25,18920.0
9,1,audi 4000,gas,turbo,four,sedan,fwd,front,105.8,192.7,71.4,55.9,3086,ohc,five,131,mpfi,3.13,3.4,8.3,140,5500,17,20,23875.0
10,0,audi 5000s (diesel),gas,turbo,two,hatchback,4wd,front,99.5,178.2,67.9,52.0,3053,ohc,five,131,mpfi,3.13,3.4,7.0,160,5500,16,22,17859.167


In [0]:
# Create a view or table

temp_table_name = "CarPrice_Assignment_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `CarPrice_Assignment_csv`

In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "carprice_assignment_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [0]:
spark = SparkSession.builder.appName("MultipleRegression").getOrCreate()


In [0]:
# Load Example Data
df1 = spark.read.csv("/FileStore/tables/CarPrice_Assignment.csv", header=True, inferSchema=True)

In [0]:
df1.columns

Out[37]: ['car_ID',
 'symboling',
 'CarName',
 'fueltype',
 'aspiration',
 'doornumber',
 'carbody',
 'drivewheel',
 'enginelocation',
 'wheelbase',
 'carlength',
 'carwidth',
 'carheight',
 'curbweight',
 'enginetype',
 'cylindernumber',
 'enginesize',
 'fuelsystem',
 'boreratio',
 'stroke',
 'compressionratio',
 'horsepower',
 'peakrpm',
 'citympg',
 'highwaympg',
 'price']

In [0]:
from pyspark.sql.functions import col, sum

# Checking for Missing Values
missing_counts = df1.select([sum(col(c).isNull().cast("int")).alias(c) for c in df1.columns])
print("Missing value counts:")
missing_counts.show()


Missing value counts:
+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+
|car_ID|symboling|CarName|fueltype|aspiration|doornumber|carbody|drivewheel|enginelocation|wheelbase|carlength|carwidth|carheight|curbweight|enginetype|cylindernumber|enginesize|fuelsystem|boreratio|stroke|compressionratio|horsepower|peakrpm|citympg|highwaympg|price|
+------+---------+-------+--------+----------+----------+-------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-----+
|     0|        0|      0|       0|         0|         0|      0|         0|             0|        0|        0|       0|        0|         0|         0|             0|       

In [0]:
# Find categorical columns
categorical_columns = [col for col in df1.columns if df1.select(col).dtypes[0][1] in ["string", "int"]]

print("Categorical columns:", categorical_columns)

Categorical columns: ['car_ID', 'symboling', 'CarName', 'fueltype', 'aspiration', 'doornumber', 'carbody', 'drivewheel', 'enginelocation', 'curbweight', 'enginetype', 'cylindernumber', 'enginesize', 'fuelsystem', 'horsepower', 'peakrpm', 'citympg', 'highwaympg']


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [0]:
df1

Out[41]: DataFrame[car_ID: int, symboling: int, CarName: string, fueltype: string, aspiration: string, doornumber: string, carbody: string, drivewheel: string, enginelocation: string, wheelbase: double, carlength: double, carwidth: double, carheight: double, curbweight: int, enginetype: string, cylindernumber: string, enginesize: int, fuelsystem: string, boreratio: double, stroke: double, compressionratio: double, horsepower: int, peakrpm: int, citympg: int, highwaympg: int, price: double]

In [0]:
columns = ['car_ID', 'symboling', 'CarName', 'fueltype', 'aspiration', 'doornumber', 'carbody', 'drivewheel', 'enginelocation', 'curbweight', 'enginetype', 'cylindernumber', 'enginesize', 'fuelsystem', 'horsepower', 'peakrpm', 'citympg', 'highwaympg']


# StringIndexer for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col + "Index") for col in columns]
indexer_pipeline = Pipeline(stages=indexers)
indexed_df = indexer_pipeline.fit(df1).transform(df1)

# OneHotEncoder for each indexed column
encoders = [OneHotEncoder(inputCol=col + "Index", outputCol=col + "Vec") for col in columns]
encoder_pipeline = Pipeline(stages=encoders)
encoded_df = encoder_pipeline.fit(indexed_df).transform(indexed_df)

In [0]:
# Feature Engineering
feature_cols = ['CarName',
 'fueltype',
 'aspiration',
 'doornumber',
 'carbody',
 'drivewheel',
 'enginelocation',
 'carlength',
 'carwidth',
 'carheight',
 'curbweight',
 'enginetype',
 'enginesize',
 'fuelsystem',
 'stroke',
 'compressionratio',
 'horsepower',
 'peakrpm',
 'citympg',
 'highwaympg']
# Feature Engineering
feature_cols = [col + "Vec" for col in columns]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = assembler.transform(encoded_df)

In [0]:
# Split Data
train_data, test_data = assembled_data.randomSplit([0.8, 0.2], seed=123)

In [0]:
# Create and Train Multiple Linear Regression Model
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train_data)

In [0]:
# Model Evaluation
predictions = lr_model.transform(test_data)
predictions.show()

+------+---------+--------------------+--------+----------+----------+---------+----------+--------------+---------+---------+--------+---------+----------+----------+--------------+----------+----------+---------+------+----------------+----------+-------+-------+----------+-------+-----------+--------------+------------+-------------+---------------+---------------+------------+---------------+-------------------+---------------+---------------+-------------------+---------------+---------------+---------------+------------+------------+---------------+-----------------+-------------+----------------+-------------+-------------+-------------+-------------+-------------+-----------------+-----------------+-------------+-----------------+---------------+-------------+---------------+---------------+---------------+---------------+--------------------+------------------+
|car_ID|symboling|             CarName|fueltype|aspiration|doornumber|  carbody|drivewheel|enginelocation|wheelbase|c

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Create a RegressionEvaluator instance
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

In [0]:
# Calculate the RMSE
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 2736.0290109174443


In [0]:
# Other metrics
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print("Mean Absolute Error (MAE):", mae)
print("R-squared:", r2)

Mean Absolute Error (MAE): 1990.4220463735157
R-squared: 0.8175655988657071
