## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS, and then uses that data to train and evaluate a logistic regression model. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
from pyspark.sql import SparkSession

In [0]:
# File location and type
file_location = "/FileStore/tables/INPUTS_1000-3.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

EFF_AGE,ACT_AGE,LND_SQFOOT,TOT_LVG_AREA,S_LEGAL,CONST_CLASS,IMP_QUAL,JV,LND_VAL,NO_BULDNG,NCONST_VAL,DEL_VAL,SPEC_FEAT_VAL,MonthDifference,SALE_PRC1,Target_Var
23,23,20850,1453,LOTS 13 TO 18 INC BLK L 4TH AD,0,3,154349,13755,1,0,0,13661,5,205000,0
18,22,10000,1330,LOT 8 BLK 266 DELTONA LAKES UN,0,3,149271,20400,1,0,0,278,3,210000,0
7,30,10000,1395,LOT 9 BLK 322 DELTONA LAKES UN,0,3,169016,21200,1,0,0,0,5,199900,0
12,20,10000,1551,LOT 9 BLK 370 DELTONA LAKES UN,0,3,167252,21200,1,0,0,142,4,186000,0
42,42,10250,1702,LOT 7 BLK 366 DELTONA LAKES UN,0,3,130450,24600,1,0,0,897,4,219900,0
16,19,12180,1785,LOT 11 BLK 389 DELTONA LAKES U,0,3,183769,21408,1,0,0,0,2,243000,0
38,47,9625,1554,LOT 6 BLK 55 DELTONA LAKES UNI,0,3,133978,24640,1,0,0,87,0,128000,1
32,42,39060,1748,LOT 10 BLK 375 DELTONA LAKES U,0,3,175249,47709,1,0,0,83,1,239900,0
19,44,10000,1402,LOT 17 BLK 292 DELTONA LAKES U,0,3,170472,20400,1,0,0,9612,5,215000,0
34,44,14416,1948,LOT 17 BLK 304 DELTONA LAKES U,0,3,182709,24449,1,0,0,779,6,215000,0


In [0]:
# Create a view or table

temp_table_name = "INPUTS_1000"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `INPUTS_1000`

EFF_AGE,ACT_AGE,LND_SQFOOT,TOT_LVG_AREA,S_LEGAL,CONST_CLASS,IMP_QUAL,JV,LND_VAL,NO_BULDNG,NCONST_VAL,DEL_VAL,SPEC_FEAT_VAL,MonthDifference,SALE_PRC1,Target_Var
23,23,20850,1453,LOTS 13 TO 18 INC BLK L 4TH AD,0,3,154349,13755,1,0,0,13661,5,205000,0
18,22,10000,1330,LOT 8 BLK 266 DELTONA LAKES UN,0,3,149271,20400,1,0,0,278,3,210000,0
7,30,10000,1395,LOT 9 BLK 322 DELTONA LAKES UN,0,3,169016,21200,1,0,0,0,5,199900,0
12,20,10000,1551,LOT 9 BLK 370 DELTONA LAKES UN,0,3,167252,21200,1,0,0,142,4,186000,0
42,42,10250,1702,LOT 7 BLK 366 DELTONA LAKES UN,0,3,130450,24600,1,0,0,897,4,219900,0
16,19,12180,1785,LOT 11 BLK 389 DELTONA LAKES U,0,3,183769,21408,1,0,0,0,2,243000,0
38,47,9625,1554,LOT 6 BLK 55 DELTONA LAKES UNI,0,3,133978,24640,1,0,0,87,0,128000,1
32,42,39060,1748,LOT 10 BLK 375 DELTONA LAKES U,0,3,175249,47709,1,0,0,83,1,239900,0
19,44,10000,1402,LOT 17 BLK 292 DELTONA LAKES U,0,3,170472,20400,1,0,0,9612,5,215000,0
34,44,14416,1948,LOT 17 BLK 304 DELTONA LAKES U,0,3,182709,24449,1,0,0,779,6,215000,0


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "INPUTS_1000"

#df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
df.printSchema()

root
 |-- EFF_AGE: integer (nullable = true)
 |-- ACT_AGE: integer (nullable = true)
 |-- LND_SQFOOT: integer (nullable = true)
 |-- TOT_LVG_AREA: integer (nullable = true)
 |-- S_LEGAL: string (nullable = true)
 |-- CONST_CLASS: integer (nullable = true)
 |-- IMP_QUAL: integer (nullable = true)
 |-- JV: integer (nullable = true)
 |-- LND_VAL: integer (nullable = true)
 |-- NO_BULDNG: integer (nullable = true)
 |-- NCONST_VAL: integer (nullable = true)
 |-- DEL_VAL: integer (nullable = true)
 |-- SPEC_FEAT_VAL: integer (nullable = true)
 |-- MonthDifference: integer (nullable = true)
 |-- SALE_PRC1: integer (nullable = true)
 |-- Target_Var: integer (nullable = true)



In [0]:
# Import the required libraries

from pyspark.sql.functions import datediff,date_format,to_date,to_timestamp
import pyspark.sql.functions as f

In [0]:
#Converting CONST_CLASS and IMP_QUAL from int to string data types
df = df.withColumn('CONST_CLASS',df.CONST_CLASS.cast('string'))
df = df.withColumn('IMP_QUAL',df.IMP_QUAL.cast('string'))
        

In [0]:
df.printSchema()

root
 |-- EFF_AGE: integer (nullable = true)
 |-- ACT_AGE: integer (nullable = true)
 |-- LND_SQFOOT: integer (nullable = true)
 |-- TOT_LVG_AREA: integer (nullable = true)
 |-- S_LEGAL: string (nullable = true)
 |-- CONST_CLASS: string (nullable = true)
 |-- IMP_QUAL: string (nullable = true)
 |-- JV: integer (nullable = true)
 |-- LND_VAL: integer (nullable = true)
 |-- NO_BULDNG: integer (nullable = true)
 |-- NCONST_VAL: integer (nullable = true)
 |-- DEL_VAL: integer (nullable = true)
 |-- SPEC_FEAT_VAL: integer (nullable = true)
 |-- MonthDifference: integer (nullable = true)
 |-- SALE_PRC1: integer (nullable = true)
 |-- Target_Var: integer (nullable = true)



In [0]:
# Create an approximate 70-30 train/test split (no seed is set, so the split differs between runs)
train_data, test_data = df.randomSplit([0.7,0.3])
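
The split sizes are approximate because each row is assigned to a split by an independent random draw, not by counting off exact fractions. The block below is a plain-Python sketch of that idea, not Spark's implementation; `random_split`, the 1000 row ids, and the seed value are hypothetical names and values chosen for illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by a uniform random draw (conceptual sketch)."""
    total = sum(weights)
    # Cumulative upper bounds of each split's share, e.g. [0.7, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Default to the last split so a rounding shortfall in the final bound is harmless
        index = len(bounds) - 1
        for i, upper in enumerate(bounds):
            if draw < upper:
                index = i
                break
        splits[index].append(row)
    return splits

# Hypothetical data: 1000 row ids split roughly 70-30
train_rows, test_rows = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the assignment repeatable in this sketch. In PySpark, `randomSplit` accepts an optional `seed` argument for the same purpose.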

### Building the Logistic Regression model

In [0]:
# Import the required libraries

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer ,OneHotEncoder
from pyspark.ml import Pipeline

In [0]:
# Use StringIndexer to convert the categorical columns to hold numerical data

s_legal_indexer = StringIndexer(inputCol='S_LEGAL',outputCol='s_legal_index',handleInvalid='keep')
imp_qual_indexer = StringIndexer(inputCol='IMP_QUAL',outputCol='imp_qual_index',handleInvalid='keep')
const_class_indexer = StringIndexer(inputCol='CONST_CLASS',outputCol='const_class_index',handleInvalid='keep')
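
To make the indexing step concrete, here is a plain-Python sketch of what a frequency-based string indexer does conceptually, including the effect of `handleInvalid='keep'`. It is an illustration only, not Spark's code: the helper names and the sample `IMP_QUAL` values are hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically here (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def transform_keep(mapping, values):
    """Mimic handleInvalid='keep': all unseen labels share one extra index."""
    unseen_index = len(mapping)
    return [mapping.get(value, unseen_index) for value in values]

# Hypothetical training values for a categorical column such as IMP_QUAL
train_labels = ["3", "3", "4", "3", "2", "4"]
mapping = fit_string_indexer(train_labels)

# "9" never appeared during fitting, so it receives the extra index
indexed = transform_keep(mapping, ["3", "4", "9"])
print(mapping)
print(indexed)
```

With `'keep'`, a label that appears only in the test data does not raise an error at transform time. That matters for a high-cardinality column such as `S_LEGAL`.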


In [0]:
# OneHotEncoder converts the indexed columns into one-hot vectors that the Logistic Regression model can use as features
data_encoder = OneHotEncoder(inputCols = ['s_legal_index','imp_qual_index','const_class_index'], outputCols = ['s_legal_vec','imp_qual_vec','const_class_vec'], handleInvalid = 'keep')
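
The following plain-Python sketch illustrates one-hot encoding with an extra slot for invalid values and with the last slot dropped, which is how `handleInvalid='keep'` combined with the default `dropLast=True` is documented to behave. The function `one_hot` and its arguments are hypothetical. Spark itself returns sparse vectors; dense lists are used here only for readability.

```python
def one_hot(index, num_categories, keep_invalid=True, drop_last=True):
    """Encode a category index as a one-hot list (conceptual sketch)."""
    if index < 0 or index >= num_categories:
        if not keep_invalid:
            raise ValueError(f"unseen category index: {index}")
        # All invalid values share one extra slot after the known categories
        index = num_categories
    size = num_categories + (1 if keep_invalid else 0)
    vector = [0.0] * size
    vector[index] = 1.0
    # Dropping the last slot removes a redundant column from the encoding
    return vector[:-1] if drop_last else vector

print(one_hot(0, 3))  # first known category
print(one_hot(2, 3))  # last known category
print(one_hot(7, 3))  # invalid index: its slot is dropped, leaving all zeros
```

In this sketch the invalid slot is the one that gets dropped, so every known category keeps its own position and invalid values encode as an all-zero vector.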

In [0]:
# Vector assembler is used to create a vector of input features
assembler = VectorAssembler(inputCols = ["s_legal_vec",'imp_qual_vec','const_class_vec'], outputCol = "features")

In [0]:
#Creating an object for the Logistic Regression model
lr_model = LogisticRegression(labelCol ='Target_Var')


In [0]:
# Pipeline chains the indexers, encoder, assembler, and model so that the data passes through each stage in order.
# It also ensures that the test data is pre-processed in the same way as the train data.

pipe = Pipeline(stages = [s_legal_indexer, imp_qual_indexer, const_class_indexer,data_encoder, assembler, lr_model])



In [0]:

fit_model = pipe.fit(train_data)

In [0]:
# Storing the results in a dataframe

results = fit_model.transform(test_data)

In [0]:
results.select(['Target_Var','prediction']).show()

+----------+----------+
|Target_Var|prediction|
+----------+----------+
|         1|       0.0|
|         0|       0.0|
|         1|       0.0|
|         0|       0.0|
|         0|       0.0|
|         1|       0.0|
|         1|       1.0|
|         0|       0.0|
|         0|       0.0|
|         0|       0.0|
|         1|       0.0|
|         0|       0.0|
|         1|       0.0|
|         0|       0.0|
|         1|       1.0|
|         0|       0.0|
|         0|       0.0|
|         0|       0.0|
|         0|       0.0|
|         0|       0.0|
+----------+----------+
only showing top 20 rows



### Evaluating the model

#####  1. Area under the ROC curve

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
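# Note: rawPredictionCol is the hard 0/1 'prediction' column here, so the ROC has a single operating point; 'rawPrediction' would give a score-based AUC
AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Target_Var',metricName='areaUnderROC')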

In [0]:
AUC = AUC_evaluator.evaluate(results)

In [0]:
print("The area under the curve is {}".format(AUC))

The area under the curve is 0.5191627832235257
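
Because the evaluator was given the hard 0/1 `prediction` column, the ROC curve has only one operating point between (0, 0) and (1, 1). As a sanity check, the sketch below recomputes the area with the trapezoid rule from the counts in the confusion matrix printed at the end of this notebook. The variable names are illustrative, and the calculation assumes the curve consists of exactly those three points.

```python
# Counts taken from the confusion matrix at the end of this notebook:
# rows are actual classes (0, 1), columns are predicted classes (0, 1)
tn, fp = 243, 11
fn, tp = 45, 4

tpr = tp / (tp + fn)  # true positive rate (recall for class 1)
fpr = fp / (fp + tn)  # false positive rate

# ROC curve with a single operating point
points = [(0.0, 0.0), (fpr, tpr), (1.0, 1.0)]

# Trapezoid rule over consecutive points
auc = sum((x1 - x0) * (y0 + y1) / 2
          for (x0, y0), (x1, y1) in zip(points, points[1:]))
print(auc)
```

The result agrees with the reported value of about 0.519. It also equals the average of the true positive rate and the true negative rate, so it summarizes a single threshold and says nothing about how well the model ranks examples.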


#####  2. Area under the PR curve

In [0]:
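# Note: as with the ROC metric, this evaluates the hard 0/1 'prediction' column instead of the model's scores
PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Target_Var',metricName='areaUnderPR')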

In [0]:
PR = PR_evaluator.evaluate(results)

In [0]:
print("The area under the PR curve is {}".format(PR))

The area under the PR curve is 0.2184751128174042


#####  3. Accuracy

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="Target_Var", predictionCol="prediction", metricName="accuracy")

In [0]:
accuracy = ACC_evaluator.evaluate(results)

In [0]:
print("The accuracy of the model is {}".format(accuracy))

The accuracy of the model is 0.8151815181518152


#####  4. Confusion Matrix

In [0]:
from sklearn.metrics import confusion_matrix

In [0]:
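# Collect both columns in one pass so labels and predictions stay row-aligned
pdf = results.select("Target_Var", "prediction").toPandas()
y_true = pdf["Target_Var"]
y_pred = pdf["prediction"]

# Rows are actual classes and columns are predicted classes
cnf_matrix = confusion_matrix(y_true, y_pred)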
print("Below is the confusion matrix \n {}".format(cnf_matrix))

Below is the confusion matrix 
 [[243  11]
 [ 45   4]]
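
The confusion matrix can be turned into a few summary numbers to put the accuracy figure in context. The sketch below uses only the four counts printed above; the variable names are illustrative, and the layout follows scikit-learn's convention of rows for actual classes and columns for predicted classes.

```python
# Confusion matrix printed above: rows are actual (0, 1), columns are predicted (0, 1)
cnf = [[243, 11],
       [45, 4]]
(tn, fp), (fn, tp) = cnf
total = tn + fp + fn + tp

accuracy = (tn + tp) / total   # share of all rows predicted correctly
precision = tp / (tp + fp)     # share of predicted 1s that are actually 1
recall = tp / (tp + fn)        # share of actual 1s that were found

# Accuracy of always predicting the more common class
majority_baseline = max(tn + fp, fn + tp) / total

print(f"accuracy={accuracy:.4f} precision={precision:.4f} "
      f"recall={recall:.4f} baseline={majority_baseline:.4f}")
```

The accuracy reproduces the value reported in section 3. It is lower than the majority-class baseline, and the model finds only 4 of the 49 positive rows, so accuracy alone overstates how useful these predictions are.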
