
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

Databricks Steps:

1. Create a Cluster
2. Upload Dataset and create a notebook from that


In [0]:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification

In [0]:
# File location and type
file_location = "/FileStore/tables/tips.csv"
file_type = "csv"

# # CSV options
# infer_schema = "false"
# first_row_is_header = "false"
# delimiter = ","

# # The applied options are for CSV files. For other file types, these will be ignored.
# df = spark.read.format(file_type) \
#   .option("inferSchema", infer_schema) \
#   .option("header", first_row_is_header) \
#   .option("sep", delimiter) \
#   .load(file_location)

# display(df)
df_pyspark = spark.read.csv(file_location, header=True, inferSchema=True)
df_pyspark.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [0]:
df_pyspark.columns

Out[3]: ['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [0]:
# Number of rows
df_pyspark.count()

Out[4]: 244

In [0]:
# Do Males or Females tip more?
df_pyspark.groupBy("sex").agg({"tip": "mean"}).show()

+------+------------------+
|   sex|          avg(tip)|
+------+------------------+
|Female| 2.833448275862069|
|  Male|3.0896178343949052|
+------+------------------+



In [0]:
from pyspark.sql.functions import desc

In [0]:
# What day receieves the highest bill, tip amount, and customer sizel on average?
df_pyspark.groupBy("day").avg().sort(desc("avg(total_bill)")).show()

+----+------------------+-----------------+------------------+
| day|   avg(total_bill)|         avg(tip)|         avg(size)|
+----+------------------+-----------------+------------------+
| Sun|21.410000000000004|3.255131578947369|2.8421052631578947|
| Sat|20.441379310344825|2.993103448275862|2.5172413793103448|
|Thur|17.682741935483865|2.771451612903226|2.4516129032258065|
| Fri|17.151578947368417|2.734736842105263|2.1052631578947367|
+----+------------------+-----------------+------------------+



In [0]:
df_pyspark.select(["total_bill", "smoker"]).orderBy(desc("total_bill")).show()

+----------+------+
|total_bill|smoker|
+----------+------+
|     50.81|   Yes|
|     48.33|    No|
|     48.27|    No|
|     48.17|    No|
|     45.35|   Yes|
|      44.3|   Yes|
|     43.11|   Yes|
|     41.19|    No|
|     40.55|   Yes|
|     40.17|   Yes|
|     39.42|    No|
|     38.73|   Yes|
|     38.07|    No|
|     38.01|   Yes|
|     35.83|    No|
|     35.26|    No|
|     34.83|    No|
|     34.81|    No|
|     34.65|   Yes|
|     34.63|   Yes|
+----------+------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import isnan, isnull, col

In [0]:
# Find null values in a specific column
df_pyspark.select(isnan('day')).show()

+----------+
|isnan(day)|
+----------+
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
|     false|
+----------+
only showing top 20 rows



In [0]:
# Find nan values for a whole dataframe
df_pyspark.select([isnan(i) | col(i).isNull() for i in df_pyspark.columns]).collect()[0]

Out[45]: Row((isnan(total_bill) OR (total_bill IS NULL))=False, (isnan(tip) OR (tip IS NULL))=False, (isnan(sex) OR (sex IS NULL))=False, (isnan(smoker) OR (smoker IS NULL))=False, (isnan(day) OR (day IS NULL))=False, (isnan(time) OR (time IS NULL))=False, (isnan(size) OR (size IS NULL))=False)

In [0]:
df_pyspark.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [0]:
# Given the tip, sex, smoker, day, time, and size (independent features) as inputs, create a ML model that learns to predict the total bill (dependent feature)

In [0]:
# Convert categorical features into numerical values 
from pyspark.ml.feature import StringIndexer

In [0]:
string_type_features = ["sex", "smoker", "day", "time"]
str_indexer = StringIndexer(
    inputCols=string_type_features,
    outputCols=[f"{col_name}_index" for col_name in string_type_features]
              )
str_indexer

Out[85]: StringIndexer_8daf21bc2d3c

In [0]:
df_pyspark_indexed = str_indexer.fit(df_pyspark).transform(df_pyspark)
df_pyspark_indexed.show()

+----------+----+------+------+---+------+----+---------+------------+---------+----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_index|smoker_index|day_index|time_index|
+----------+----+------+------+---+------+----+---------+------------+---------+----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|      1.0|         0.0|      1.0|       0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|      0.0|         0.0|      1.0|       0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|      0.0|         0.0|      1.0|       0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|      0.0|         0.0|      1.0|       0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|      1.0|         0.0|      1.0|       0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|      0.0|         0.0|      1.0|       0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|      0.0|         0.0|      1.0|       0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|      0.0|         0.0|      1.0|

In [0]:
df_pyspark_indexed.columns

Out[87]: ['total_bill',
 'tip',
 'sex',
 'smoker',
 'day',
 'time',
 'size',
 'sex_index',
 'smoker_index',
 'day_index',
 'time_index']

In [0]:
# Group the features together to supply them and the dependent feature to a VectorAssembler

from pyspark.ml.feature import VectorAssembler

In [0]:
vector_assembler = VectorAssembler(
    inputCols = ["tip", "sex_index", "smoker_index", "day_index", "time_index", "size"],
    outputCol = "IndependentFeatures"
)
vector_assembler

Out[89]: VectorAssembler_a4b28d191089

In [0]:
pyspark_features = vector_assembler.transform(df_pyspark_indexed).select(["IndependentFeatures", "total_bill"])
pyspark_features.show()

+--------------------+----------+
| IndependentFeatures|total_bill|
+--------------------+----------+
|[1.01,1.0,0.0,1.0...|     16.99|
|[1.66,0.0,0.0,1.0...|     10.34|
|[3.5,0.0,0.0,1.0,...|     21.01|
|[3.31,0.0,0.0,1.0...|     23.68|
|[3.61,1.0,0.0,1.0...|     24.59|
|[4.71,0.0,0.0,1.0...|     25.29|
|[2.0,0.0,0.0,1.0,...|      8.77|
|[3.12,0.0,0.0,1.0...|     26.88|
|[1.96,0.0,0.0,1.0...|     15.04|
|[3.23,0.0,0.0,1.0...|     14.78|
|[1.71,0.0,0.0,1.0...|     10.27|
|[5.0,1.0,0.0,1.0,...|     35.26|
|[1.57,0.0,0.0,1.0...|     15.42|
|[3.0,0.0,0.0,1.0,...|     18.43|
|[3.02,1.0,0.0,1.0...|     14.83|
|[3.92,0.0,0.0,1.0...|     21.58|
|[1.67,1.0,0.0,1.0...|     10.33|
|[3.71,0.0,0.0,1.0...|     16.29|
|[3.5,1.0,0.0,1.0,...|     16.97|
|(6,[0,5],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [0]:
# Split Data

train_data, test_data = pyspark_features.randomSplit(weights=[.7,.3])
train_data.show(), test_data.show()

+--------------------+----------+
| IndependentFeatures|total_bill|
+--------------------+----------+
|(6,[0,5],[1.25,2.0])|     10.07|
|(6,[0,5],[1.25,2.0])|     10.51|
|(6,[0,5],[1.47,2.0])|     10.77|
|(6,[0,5],[1.75,2.0])|     17.82|
|(6,[0,5],[1.97,2.0])|     12.02|
| (6,[0,5],[2.0,2.0])|     12.69|
| (6,[0,5],[2.0,2.0])|     13.37|
| (6,[0,5],[2.0,3.0])|     16.31|
|(6,[0,5],[2.01,2.0])|     20.23|
|(6,[0,5],[2.24,3.0])|     16.04|
|(6,[0,5],[2.31,3.0])|     18.69|
|(6,[0,5],[2.34,4.0])|     17.81|
| (6,[0,5],[2.5,4.0])|     18.35|
|(6,[0,5],[2.64,3.0])|     17.59|
| (6,[0,5],[3.0,4.0])|     20.45|
|(6,[0,5],[3.18,2.0])|     19.82|
|(6,[0,5],[3.27,2.0])|     17.78|
|(6,[0,5],[3.35,3.0])|     20.65|
|(6,[0,5],[3.39,2.0])|     11.61|
| (6,[0,5],[3.6,3.0])|     24.06|
+--------------------+----------+
only showing top 20 rows

+--------------------+----------+
| IndependentFeatures|total_bill|
+--------------------+----------+
|(6,[0,5],[1.45,2.0])|      9.55|
|(6,[0,5],[2.72,2.0])|

In [0]:
train_data.count(), test_data.count()

Out[99]: (179, 65)

In [0]:
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

In [0]:
model = LinearRegression(featuresCol="IndependentFeatures", labelCol="total_bill")
model

Out[105]: LinearRegression_e87eac14aa65

In [0]:
results = model.fit(train_data)

In [0]:
results.coefficients

Out[108]: DenseVector([3.6393, -1.2737, 2.1496, 0.2143, -0.794, 3.4614])

In [0]:
results.intercept

Out[110]: -0.30353258875047284

In [0]:
test_results = results.evaluate(test_data)
test_results.predictions.show()

+--------------------+----------+------------------+
| IndependentFeatures|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,5],[1.45,2.0])|      9.55|11.896272034862674|
|(6,[0,5],[2.72,2.0])|     13.28|16.518209320862518|
| (6,[0,5],[3.0,2.0])|      14.0|17.537219116201065|
|(6,[0,5],[3.15,3.0])|     20.08|21.544512026865977|
|(6,[0,5],[3.76,2.0])|     18.24|20.303102846405693|
|(6,[0,5],[6.73,4.0])|     48.27|  38.0346749304281|
|[1.17,0.0,1.0,0.0...|     32.83|13.026834439442707|
|[1.25,1.0,0.0,2.0...|      8.51| 9.529334618224246|
|[1.5,0.0,1.0,1.0,...|     15.69|14.442129996911104|
|[1.5,1.0,0.0,0.0,...|     26.41|10.804565530500954|
|[1.5,1.0,0.0,2.0,...|     10.65|10.439164792633663|
|[1.5,1.0,0.0,2.0,...|     11.17|10.439164792633663|
|[1.56,0.0,0.0,1.0...|      9.94|12.510917038850785|
|[1.64,0.0,1.0,0.0...|     15.36|14.737315167332412|
|[1.67,1.0,0.0,1.0...|     10.33|15.098964582366586|
|[1.73,0.0,0.0,2.0...|      9.78|12.5498810923

In [0]:
test_results.predictions.summary().show()

+-------+------------------+------------------+
|summary|        total_bill|        prediction|
+-------+------------------+------------------+
|  count|                65|                65|
|   mean|21.417230769230766|21.741891818415244|
| stddev| 8.868048775606646| 8.150977835804948|
|    min|              7.25| 9.529334618224246|
|    25%|             14.83|15.279461320491764|
|    50%|             20.69|21.099713885275687|
|    75%|             26.41|26.609580928158174|
|    max|             48.27| 44.48288418367021|
+-------+------------------+------------------+



In [0]:
# Regression Performance Metrics
test_results.r2, test_results.meanAbsoluteError, test_results.meanSquaredError, test_results.rootMeanSquaredError

Out[128]: (0.4003754068024289, 4.910609995527969, 46.4303759770008, 6.813983855058713)

In [0]:
# model.save("/FileStore/models/model_ex.pkl")

results.save("ex_model.pkl")

# results.save("ex1_model.model")

In [0]:
# Load model back in 
new_model = LinearRegressionModel.load("/ex_model.pkl")

In [0]:
# Perform inference with the new loaded model
new_model.evaluate(test_data).predictions.show()

+--------------------+----------+------------------+
| IndependentFeatures|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,5],[1.45,2.0])|      9.55|11.896272034862674|
|(6,[0,5],[2.72,2.0])|     13.28|16.518209320862518|
| (6,[0,5],[3.0,2.0])|      14.0|17.537219116201065|
|(6,[0,5],[3.15,3.0])|     20.08|21.544512026865977|
|(6,[0,5],[3.76,2.0])|     18.24|20.303102846405693|
|(6,[0,5],[6.73,4.0])|     48.27|  38.0346749304281|
|[1.17,0.0,1.0,0.0...|     32.83|13.026834439442707|
|[1.25,1.0,0.0,2.0...|      8.51| 9.529334618224246|
|[1.5,0.0,1.0,1.0,...|     15.69|14.442129996911104|
|[1.5,1.0,0.0,0.0,...|     26.41|10.804565530500954|
|[1.5,1.0,0.0,2.0,...|     10.65|10.439164792633663|
|[1.5,1.0,0.0,2.0,...|     11.17|10.439164792633663|
|[1.56,0.0,0.0,1.0...|      9.94|12.510917038850785|
|[1.64,0.0,1.0,0.0...|     15.36|14.737315167332412|
|[1.67,1.0,0.0,1.0...|     10.33|15.098964582366586|
|[1.73,0.0,0.0,2.0...|      9.78|12.5498810923

In [0]:
# # Create a view or table

# temp_table_name = "tips_csv"

# df.createOrReplaceTempView(temp_table_name)

In [0]:
# %sql

# /* Query the created temp table in a SQL cell */

# select * from `tips_csv`

In [0]:
# # With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# # Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# # To do so, choose your table name and uncomment the bottom line.

# permanent_table_name = "tips_csv"

# # df.write.format("parquet").saveAsTable(permanent_table_name)