## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [None]:
# File location and type
file_location = "/FileStore/tables/tips.csv"
file_type = "csv"



# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.csv(file_location,header=True,inferSchema=True)

display(df)

total_bill,tip,sex,smoker,day,time,size
16.99,1.01,Female,No,Sun,Dinner,2
10.34,1.66,Male,No,Sun,Dinner,3
21.01,3.5,Male,No,Sun,Dinner,3
23.68,3.31,Male,No,Sun,Dinner,2
24.59,3.61,Female,No,Sun,Dinner,4
25.29,4.71,Male,No,Sun,Dinner,4
8.77,2.0,Male,No,Sun,Dinner,2
26.88,3.12,Male,No,Sun,Dinner,4
15.04,1.96,Male,No,Sun,Dinner,2
14.78,3.23,Male,No,Sun,Dinner,2


In [None]:
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [None]:
df.columns

Out[14]: ['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="sex",outputCol="sex_indexed")
df_preprocessed = indexer.fit(df).transform(df)
df_preprocessed.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [None]:
df_preprocessed = df_preprocessed.drop("sex")
display(df_preprocessed)

total_bill,tip,smoker,day,time,size,sex_indexed
16.99,1.01,No,Sun,Dinner,2,1.0
10.34,1.66,No,Sun,Dinner,3,0.0
21.01,3.5,No,Sun,Dinner,3,0.0
23.68,3.31,No,Sun,Dinner,2,0.0
24.59,3.61,No,Sun,Dinner,4,1.0
25.29,4.71,No,Sun,Dinner,4,0.0
8.77,2.0,No,Sun,Dinner,2,0.0
26.88,3.12,No,Sun,Dinner,4,0.0
15.04,1.96,No,Sun,Dinner,2,0.0
14.78,3.23,No,Sun,Dinner,2,0.0


In [None]:
indexer = StringIndexer(inputCols=['smoker','day','time'],outputCols=['smoker_indexed','day_indexed','time_indexed'])
df_preprocessed = indexer.fit(df_preprocessed).transform(df_preprocessed)
df_preprocessed.show()

+----------+----+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     26.88|3.12|    No|Sun|Dinner|   4|        0.0|           0.0|   

In [None]:
columns_to_drop = ['smoker','day','time']
df_preprocessed = df_preprocessed.select([column for column in df_preprocessed.columns if column not in columns_to_drop])

df_preprocessed.show()

+----------+----+----+-----------+--------------+-----------+------------+
|total_bill| tip|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+----+-----------+--------------+-----------+------------+
|     16.99|1.01|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|   2|        0.0|           0.0|        1.0|         0.0|
|     26.88|3.12|   4|        0.0|           0.0|        1.0|         0.0|
|     15.04|1.96|   2|        0.0|           0.0|        1.0|         0.0|
|     14.78|3.23|   2|        0.0|           0.0|        1.0|         0.0|
|     10.27|1.71|   2|   

In [None]:
df_preprocessed.columns

Out[21]: ['total_bill',
 'tip',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed']

In [None]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols=['tip',
 'size',
 'sex_indexed',
 'smoker_indexed',
 'day_indexed',
 'time_indexed'],outputCol='independent_features')

output = feature_assembler.transform(df_preprocessed)

In [None]:
output.select("independent_features").show()

+--------------------+
|independent_features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [None]:
finilized_data = output.select('independent_features','total_bill')


In [None]:
finilized_data.show()

+--------------------+----------+
|independent_features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [None]:
from pyspark.ml.regression import LinearRegression
train_data,test_data=finilized_data.randomSplit([0.75,0.25])
model = LinearRegression(featuresCol='independent_features',labelCol='total_bill')
model = model.fit(train_data)

In [None]:
model.coefficients

Out[33]: DenseVector([3.2977, 3.2088, -1.7932, 2.2824, -0.1727, -0.8123])

In [None]:
model.intercept

Out[35]: 1.869861034535951

In [None]:
pred_results = model.evaluate(test_data)

In [None]:
pred_results.predictions.show()

+--------------------+----------+------------------+
|independent_features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.135206204705753|
|(6,[0,1],[1.75,2.0])|     17.82|14.058572494237318|
| (6,[0,1],[2.0,2.0])|     13.37| 14.88300668131907|
|(6,[0,1],[2.24,3.0])|     16.04|  18.8832995759821|
|(6,[0,1],[2.34,4.0])|     17.81| 22.42190932587934|
| (6,[0,1],[3.0,2.0])|      14.0|18.180743429646085|
|(6,[0,1],[3.27,2.0])|     17.78| 19.07113235169438|
|(6,[0,1],[5.92,3.0])|     29.03|31.018970809825518|
|[1.0,1.0,1.0,1.0,...|      3.07|  8.86571527445954|
|[1.32,2.0,0.0,0.0...|      9.68|12.467799978610227|
|[1.5,2.0,0.0,0.0,...|     12.46|12.715901165616144|
|[1.56,2.0,0.0,0.0...|      9.94|13.259256798208712|
|[1.57,2.0,0.0,0.0...|     15.42|13.292234165691982|
|[1.58,2.0,0.0,1.0...|     13.42|14.449850127884442|
|[1.63,2.0,1.0,0.0...|     11.87|10.711892271732836|
|[1.64,2.0,0.0,1.0...|     15.36|15.9782573638

In [None]:
pred_results.r2,pred_results.meanSquaredError

Out[45]: (0.4649539490953174, 33.644044774994704)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
lr = LinearRegression(featuresCol="independent_features", labelCol="total_bill")

# Create a ParamGridBuilder to specify the hyperparameter grid
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1, 0.2]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Create an evaluator for the regression task
evaluator = RegressionEvaluator(metricName="rmse", labelCol="total_bill")

# Create a CrossValidator with the Linear Regression model, ParamGridBuilder, and evaluator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

# Fit the CrossValidator to the training data
cv_model = crossval.fit(train_data)

# Make predictions on the test data using the best model
predictions = cv_model.transform(test_data)

# Evaluate the best model using the evaluator
rmse = evaluator.evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
# Print the best model's parameters and the RMSE
best_model = cv_model.bestModel
print("Best Model Parameters:")
print("regParam =", best_model.getRegParam())
print("elasticNetParam =", best_model.getElasticNetParam())
print("RMSE:", rmse)
print("R-squared:", r2)

Best Model Parameters:
regParam = 0.1
elasticNetParam = 1.0
RMSE: 5.788075747793521
R-squared: 0.46721575630061396
