
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Practising...").getOrCreate()

24/02/24 19:44:15 WARN Utils: Your hostname, MacIndamutsa-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.12 instead (on interface en0)
24/02/24 19:44:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/24 19:44:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/24 19:44:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# File location and type
# file_location = "/FileStore/tables/employees_data.csv"
# file_loc = "/FileStore/tables/tips.csv"
file_type = "csv"
file_location = "./employees_data_enhanced.csv"
file_loc = "./tips.csv"

# # CSV options
# infer_schema = "false"
# first_row_is_header = "false"
# delimiter = ","

# # The applied options are for CSV files. For other file types, these will be ignored.
# df = spark.read.format(file_type) \
#   .option("inferSchema", infer_schema) \
#   .option("header", first_row_is_header) \
#   .option("sep", delimiter) \
#   .load(file_location)

# display(df)

df = spark.read.csv(file_location, header=True, inferSchema=True)
data = spark.read.csv(file_loc, header=True, inferSchema=True)
data.printSchema()
data.show()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57

In [7]:
# Handling categorical features
from pyspark.ml.feature import StringIndexer # Converting categorical features into numerical features


In [8]:
indexer = StringIndexer(inputCol="sex", outputCol="sex_indexed")
data_r = indexer.fit(data).transform(data)
data_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [9]:
indexer = StringIndexer(inputCols=["smoker", "day", "time"], outputCols=["smoker_indexed", "day_indexed", "time_indexed"])
data_r = indexer.fit(data).transform(data_r)
data_r.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|           0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|


In [10]:
# Grouping independent features and dependent features separately
from pyspark.ml.feature import VectorAssembler
featureAssembled = VectorAssembler(inputCols=['tip','size','sex_indexed', 'smoker_indexed','day_indexed','time_indexed'], outputCol="Independent Features")
output = featureAssembled.transform(data_r)
output.show()

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|[1.01,2.0,1.0,0.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[1.66,3.0,0.0,0.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|[3.5,3.0,0.0,0.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|[3.31,2.0,0.0,0.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|[3.61,4.0,1.0,0.0...|
|     25.29|4.71|  Male|    No|S

In [11]:
output.select("Independent features").show()

+--------------------+
|Independent features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [12]:
finalized_data = output.select("Independent features", "total_bill")
finalized_data.show()

+--------------------+----------+
|Independent features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [13]:
# Now Let us apply the linear regression
from pyspark.ml.regression import LinearRegression

# Train test split
train_data, test_data = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol="Independent features", labelCol="total_bill")
regressor = regressor.fit(train_data)

24/02/24 19:44:47 WARN Instrumentation: [d2f3e1d6] regParam is zero, which might cause numerical instability and overfitting.
24/02/24 19:44:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/02/24 19:44:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [14]:
regressor.intercept

2.2404576793993254

In [15]:
# Predictions
predictor_model =  regressor.evaluate(test_data)

In [16]:
# Let us predict
predictor_model.predictions.show()

+--------------------+----------+------------------+
|Independent features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.174816488984726|
|(6,[0,1],[2.31,3.0])|     18.69|18.955667890282307|
|(6,[0,1],[2.72,2.0])|     13.28|16.909006923566608|
|(6,[0,1],[3.18,2.0])|     19.82|18.283189003492737|
| (6,[0,1],[9.0,4.0])|     48.33| 42.21253052542308|
|[1.0,1.0,1.0,0.0,...|      7.25| 7.579284596527254|
|[1.01,2.0,1.0,0.0...|     16.99|10.578842896218392|
|[1.1,2.0,1.0,1.0,...|      12.9| 13.09113088505714|
|[1.17,2.0,0.0,1.0...|     32.83|14.220246409189855|
|[1.5,2.0,0.0,0.0,...|     12.46|12.359065100282473|
|[1.5,2.0,0.0,1.0,...|     12.03|14.300700724787253|
|[1.5,2.0,1.0,0.0,...|     26.41|12.344436199618562|
|[1.5,2.0,1.0,0.0,...|     11.17|10.837726535744764|
|[1.56,2.0,0.0,0.0...|      9.94|13.141887547230548|
|[1.71,2.0,0.0,0.0...|     10.27|13.589990399380374|
|[1.8,2.0,1.0,0.0,...|     12.43|11.7339322400

In [17]:
regressor.transform(test_data).show()

+--------------------+----------+------------------+
|Independent features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.174816488984726|
|(6,[0,1],[2.31,3.0])|     18.69|18.955667890282307|
|(6,[0,1],[2.72,2.0])|     13.28|16.909006923566608|
|(6,[0,1],[3.18,2.0])|     19.82|18.283189003492737|
| (6,[0,1],[9.0,4.0])|     48.33| 42.21253052542308|
|[1.0,1.0,1.0,0.0,...|      7.25| 7.579284596527254|
|[1.01,2.0,1.0,0.0...|     16.99|10.578842896218392|
|[1.1,2.0,1.0,1.0,...|      12.9| 13.09113088505714|
|[1.17,2.0,0.0,1.0...|     32.83|14.220246409189855|
|[1.5,2.0,0.0,0.0,...|     12.46|12.359065100282473|
|[1.5,2.0,0.0,1.0,...|     12.03|14.300700724787253|
|[1.5,2.0,1.0,0.0,...|     26.41|12.344436199618562|
|[1.5,2.0,1.0,0.0,...|     11.17|10.837726535744764|
|[1.56,2.0,0.0,0.0...|      9.94|13.141887547230548|
|[1.71,2.0,0.0,0.0...|     10.27|13.589990399380374|
|[1.8,2.0,1.0,0.0,...|     12.43|11.7339322400

In [18]:
predictor_model.meanSquaredError, predictor_model.meanAbsoluteError

(40.32400732982333, 4.482761526036484)

In [20]:
# Let us save the model

# Specifying the path where I save the model in DBFS
# model_path = "/FileStore/tables/my_linear_regression_model"
model_path = "./model_data"
# Save the model
regressor.write().overwrite().save(model_path)


                                                                                

In [22]:
## Load it back
from pyspark.ml.regression import LinearRegressionModel

# Specify the same path where you saved the model
loaded_regressor = LinearRegressionModel.load(model_path)

# Now we can use the loaded model
predictions = loaded_regressor.transform(test_data)
predictions.show()

+--------------------+----------+------------------+
|Independent features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.174816488984726|
|(6,[0,1],[2.31,3.0])|     18.69|18.955667890282307|
|(6,[0,1],[2.72,2.0])|     13.28|16.909006923566608|
|(6,[0,1],[3.18,2.0])|     19.82|18.283189003492737|
| (6,[0,1],[9.0,4.0])|     48.33| 42.21253052542308|
|[1.0,1.0,1.0,0.0,...|      7.25| 7.579284596527254|
|[1.01,2.0,1.0,0.0...|     16.99|10.578842896218392|
|[1.1,2.0,1.0,1.0,...|      12.9| 13.09113088505714|
|[1.17,2.0,0.0,1.0...|     32.83|14.220246409189855|
|[1.5,2.0,0.0,0.0,...|     12.46|12.359065100282473|
|[1.5,2.0,0.0,1.0,...|     12.03|14.300700724787253|
|[1.5,2.0,1.0,0.0,...|     26.41|12.344436199618562|
|[1.5,2.0,1.0,0.0,...|     11.17|10.837726535744764|
|[1.56,2.0,0.0,0.0...|      9.94|13.141887547230548|
|[1.71,2.0,0.0,0.0...|     10.27|13.589990399380374|
|[1.8,2.0,1.0,0.0,...|     12.43|11.7339322400