## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside of Databricks. This notebook assumes that the file you would like to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# Where the uploaded file lives in DBFS, and which reader format to use.
file_location = "/FileStore/tables/CASE_STUDY_DATA_LATE_DELIVERY_ROOT_CAUSE-8a2bf.csv"
file_type = "csv"

# Reader options (only meaningful for CSV input).
# Schema inference is off, so every column is loaded as a string;
# numeric columns are cast explicitly in a later cell.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# Build the reader step by step and load the file into a Spark DataFrame.
# The `_c0` column is kept on purpose: later cells join on it.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)
display(df)

_c0,order_id,country,shipping_method,units_per_order,facility,product_category,on_sale,transit_days,datetime_ordered,datetime_sourced,datetime_product_ready,datetime_planned,deadline_source,deadline_make,deadline_deliver,delivered_to_plan
0,E00000001,UNITED KINGDOM,Ground,1,OXFORD,ACCESSORIES,Y,2,2016-07-03 03:07:29,2016-07-03 04:09:49,2016-07-06 00:59:42,2016-07-08 00:00:00,1612.0,38,3,PASS
1,E00000002,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,N,3,2016-07-03 00:08:43,2016-07-03 02:16:24,2016-07-03 07:17:04,2016-07-06 00:00:00,531.0,8,3,PASS
2,E00000003,FRANCE,Ground,1,ANTWERP,TOPS,Y,3,2016-07-03 00:36:00,2016-07-03 02:16:18,2016-07-03 06:16:57,2016-07-06 00:00:00,504.0,8,3,PASS
3,E00000004,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,Y,5,2016-07-03 00:47:45,2016-07-03 02:16:18,2016-07-03 06:16:55,2016-07-07 00:00:00,492.0,8,5,PASS
4,E00000005,UNITED KINGDOM,Next Day,1,OXFORD,JACKETS & VESTS,Y,1,2016-07-03 03:52:13,2016-07-03 07:56:33,2016-07-05 08:53:19,2016-07-06 00:00:00,1567.0,6,1,PASS
5,E00000006,UNITED KINGDOM,Ground,1,MANCHESTER,JACKETS & VESTS,Y,1,2016-07-03 04:15:07,2016-07-03 07:56:53,2016-07-04 19:04:46,2016-07-07 00:00:00,1544.0,34,2,PASS
6,E00000007,UNITED KINGDOM,Ground,1,OXFORD,TOPS,N,2,2016-07-03 04:21:14,2016-07-05 16:44:24,2016-07-06 18:38:55,2016-07-08 00:00:00,1538.0,25,5,PASS
7,E00000008,UNITED KINGDOM,Ground,1,OXFORD,JACKETS & VESTS,Y,2,2016-07-03 04:28:20,2016-07-03 07:46:47,2016-07-05 13:38:41,2016-07-08 00:00:00,1531.0,31,2,PASS
8,E00000009,FRANCE,Ground,1,ANTWERP,TOPS,Y,4,2016-07-03 01:29:43,2016-07-03 03:17:01,2016-07-03 08:16:25,2016-07-06 00:00:00,450.0,7,4,PASS
9,E00000010,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,Y,3,2016-07-03 01:39:28,2016-07-03 04:16:31,2016-07-03 06:17:19,2016-07-06 00:00:00,440.0,6,3,PASS


In [3]:
# Register the loaded DataFrame as a temporary view so it can be queried
# by name from SQL cells (the next cell selects from this view).

temp_table_name = "CASE_STUDY_DATA_LATE_DELIVERY_ROOT_CAUSE"

df.createOrReplaceTempView(temp_table_name)

In [4]:
%sql

/* Query the temp view registered in the previous cell from a SQL cell; returns every column of every row. */

select * from `CASE_STUDY_DATA_LATE_DELIVERY_ROOT_CAUSE`

_c0,order_id,country,shipping_method,units_per_order,facility,product_category,on_sale,transit_days,datetime_ordered,datetime_sourced,datetime_product_ready,datetime_planned,deadline_source,deadline_make,deadline_deliver,delivered_to_plan
0,E00000001,UNITED KINGDOM,Ground,1,OXFORD,ACCESSORIES,Y,2,2016-07-03 03:07:29,2016-07-03 04:09:49,2016-07-06 00:59:42,2016-07-08 00:00:00,1612.0,38,3,PASS
1,E00000002,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,N,3,2016-07-03 00:08:43,2016-07-03 02:16:24,2016-07-03 07:17:04,2016-07-06 00:00:00,531.0,8,3,PASS
2,E00000003,FRANCE,Ground,1,ANTWERP,TOPS,Y,3,2016-07-03 00:36:00,2016-07-03 02:16:18,2016-07-03 06:16:57,2016-07-06 00:00:00,504.0,8,3,PASS
3,E00000004,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,Y,5,2016-07-03 00:47:45,2016-07-03 02:16:18,2016-07-03 06:16:55,2016-07-07 00:00:00,492.0,8,5,PASS
4,E00000005,UNITED KINGDOM,Next Day,1,OXFORD,JACKETS & VESTS,Y,1,2016-07-03 03:52:13,2016-07-03 07:56:33,2016-07-05 08:53:19,2016-07-06 00:00:00,1567.0,6,1,PASS
5,E00000006,UNITED KINGDOM,Ground,1,MANCHESTER,JACKETS & VESTS,Y,1,2016-07-03 04:15:07,2016-07-03 07:56:53,2016-07-04 19:04:46,2016-07-07 00:00:00,1544.0,34,2,PASS
6,E00000007,UNITED KINGDOM,Ground,1,OXFORD,TOPS,N,2,2016-07-03 04:21:14,2016-07-05 16:44:24,2016-07-06 18:38:55,2016-07-08 00:00:00,1538.0,25,5,PASS
7,E00000008,UNITED KINGDOM,Ground,1,OXFORD,JACKETS & VESTS,Y,2,2016-07-03 04:28:20,2016-07-03 07:46:47,2016-07-05 13:38:41,2016-07-08 00:00:00,1531.0,31,2,PASS
8,E00000009,FRANCE,Ground,1,ANTWERP,TOPS,Y,4,2016-07-03 01:29:43,2016-07-03 03:17:01,2016-07-03 08:16:25,2016-07-06 00:00:00,450.0,7,4,PASS
9,E00000010,FRANCE,Ground,1,ANTWERP,JACKETS & VESTS,Y,3,2016-07-03 01:39:28,2016-07-03 04:16:31,2016-07-03 06:17:19,2016-07-06 00:00:00,440.0,6,3,PASS


In [5]:
# A temp view is only available to this particular notebook. If you'd like other users to be able to
# query this data, you can also create a table from the DataFrame.
# Once saved, the table persists across cluster restarts and can be queried from other notebooks.
# To do so, choose your table name and uncomment the last line of this cell.

permanent_table_name = "CASE_STUDY_DATA_LATE_DELIVERY_ROOT_CAUSE_original"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [6]:
import pyspark
from pyspark import keyword_only
import pyspark.sql.functions as F
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DateType

In [7]:
# Discard every row that contains at least one null value.
df = df.na.drop()


In [8]:
# Split each "YYYY-MM-DD HH:MM:SS" timestamp column into its date and time parts.
#
# For every stage (ordered, sourced, product_ready, planned) two frames are
# derived from `df`:
#   * a "date" frame:  Date_<stage>, Year_<stage>, Month_<stage>, Day_<stage>
#   * a "time" frame:  Time_<stage>, Hour_<stage>, Minutes_<stage>, Sec_<stage>
# The derived columns are the string pieces returned by split(); they are cast
# to numbers in a later cell. The eight resulting frames (df_1 .. df_8) are
# joined back together on `_c0` in the next cell.


def _split_timestamp(frame, stage):
    """Derive date and time component columns from ``datetime_<stage>``.

    Args:
        frame: Spark DataFrame containing a ``datetime_<stage>`` column.
        stage: Suffix naming the timestamp, e.g. ``'ordered'``.

    Returns:
        A ``(date_frame, time_frame)`` tuple. ``date_frame`` is ``frame`` plus
        ``Date_``, ``Year_``, ``Month_`` and ``Day_<stage>``; ``time_frame`` is
        ``frame`` plus ``Time_``, ``Hour_``, ``Minutes_`` and ``Sec_<stage>``.
    """
    # The date is the piece before the single space, the time the piece after it.
    stamp_parts = F.split(frame['datetime_' + stage], ' ')
    date_frame = frame.withColumn('Date_' + stage, stamp_parts.getItem(0))
    time_frame = frame.withColumn('Time_' + stage, stamp_parts.getItem(1))

    date_parts = F.split(date_frame['Date_' + stage], '-')
    for position, unit in enumerate(('Year', 'Month', 'Day')):
        date_frame = date_frame.withColumn(unit + '_' + stage, date_parts.getItem(position))

    time_parts = F.split(time_frame['Time_' + stage], ':')
    for position, unit in enumerate(('Hour', 'Minutes', 'Sec')):
        time_frame = time_frame.withColumn(unit + '_' + stage, time_parts.getItem(position))

    return date_frame, time_frame


df_1, df_2 = _split_timestamp(df, 'ordered')
df_3, df_4 = _split_timestamp(df, 'sourced')
df_5, df_6 = _split_timestamp(df, 'product_ready')
df_7, df_8 = _split_timestamp(df, 'planned')

In [9]:
# Stitch the eight per-timestamp frames back together, one join at a time,
# matching rows on the `_c0` row-index column.
row_key = ["_c0"]

df_11 = df_1.join(df_2, on=row_key)
df_22 = df_11.join(df_3, on=row_key)
df_33 = df_22.join(df_4, on=row_key)
df_44 = df_33.join(df_5, on=row_key)
df_55 = df_44.join(df_6, on=row_key)
df_66 = df_55.join(df_7, on=row_key)
df_temp = df_66.join(df_8, on=row_key)

In [10]:
# Columns that came from the original file; every joined frame carried its own
# copy of them, so they are removed from df_temp, leaving `_c0` and the derived
# date/time columns.
source_columns = [
    "order_id", "country", "shipping_method", "units_per_order", "facility",
    "product_category", "on_sale", "transit_days", "datetime_ordered",
    "datetime_sourced", "datetime_product_ready", "datetime_planned",
    "deadline_source", "deadline_make", "deadline_deliver", "delivered_to_plan",
]
df_temp = df_temp.drop(*source_columns)

# Attach the derived columns to a single copy of the original columns.
df_final = df.join(df_temp, ["_c0"])

# The raw timestamps and the intermediate Date_/Time_ strings are no longer
# needed once their components exist; what remains is the frame used for training.
raw_timestamp_columns = ["datetime_ordered", "datetime_sourced", "datetime_product_ready", "datetime_planned"]
df_final = df_final.drop(*raw_timestamp_columns)

intermediate_columns = [
    "Date_ordered", "Date_sourced", "Date_product_ready", "Date_planned",
    "Time_ordered", "Time_sourced", "Time_planned", "Time_product_ready",
]
df_final = df_final.drop(*intermediate_columns)

In [11]:
import pandas as pd

# Print the DataFrame's summary representation (not its rows).
print(df_final)


In [12]:
# The row index was only needed as a join key.
df_final = df_final.drop("_c0")

# Categorical inputs stay strings; `order_id` and `delivered_to_plan` are left untouched.
string_columns = ["country", "shipping_method", "facility", "product_category", "on_sale"]

# Every derived timestamp component, plus the numeric columns from the file,
# becomes a float.
timestamp_part_columns = [
    part + "_" + stage
    for stage in ("ordered", "sourced", "product_ready", "planned")
    for part in ("Hour", "Minutes", "Sec", "Day", "Month", "Year")
]
float_columns = timestamp_part_columns + [
    "transit_days", "units_per_order", "deadline_source", "deadline_make", "deadline_deliver",
]

# withColumn with an existing name replaces that column, so each step
# re-types one column of the frame.
training = df_final
for column_name in string_columns:
    training = training.withColumn(column_name, df_final[column_name].cast("string"))
for column_name in float_columns:
    training = training.withColumn(column_name, df_final[column_name].cast("float"))

In [13]:
# Distribution of the target variable (`delivered_to_plan`).
# The Spark frame is pulled into pandas first so seaborn can plot it.

import matplotlib.pyplot as plt
import seaborn as sns
# %matplotlib inline

df_pd = training.toPandas()
print(len(df_pd))

plt.figure(figsize=(12,10))

# Bars are ordered from the most to the least frequent label.
label_order = df_pd['delivered_to_plan'].value_counts().index
sns.countplot(x='delivered_to_plan', data=df_pd, order=label_order)

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [15]:
# Count the categorical (string) and numerical (float/int) columns
# by inspecting the dtypes of the training frame.

cat_cols_1 = [name for name, dtype in training.dtypes if dtype.startswith('string')]
print(str(len(cat_cols_1)) + '  categorical features')
print(cat_cols_1)

# NOTE(review): the trailing [1:] discards the first numeric column. `_c0` has already
# been dropped from the frame, so this looks like it removes a real feature
# (presumably `units_per_order`) - confirm whether that is intended.
num_cols = [name for name, dtype in training.dtypes if dtype.startswith(('float', 'int'))][1:]

print(str(len(num_cols)) + '  numerical features')
# print(cat_cols)
print(training.columns)

In [16]:
# Turn each categorical column into a one-hot vector, then combine those vectors
# with the numerical columns into a single `features` vector column.
#
# For every categorical column:
#   1. StringIndexer maps the string values to indices  -> <col>Index
#   2. OneHotEncoder turns the index into a vector      -> <col>classVec
# OneHotEncoder is used here as a plain transformer: transform() is called
# without a preceding fit().
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder
stages = []  # kept for compatibility; nothing in this cell appends to it

cat_cols = ['country', 'shipping_method', 'facility', 'product_category', 'on_sale']
for categoricalCol in cat_cols:
    index_col = categoricalCol + "Index"
    vector_col = categoricalCol + "classVec"

    indexer = StringIndexer().setInputCol(categoricalCol).setOutputCol(index_col).fit(training)
    training = indexer.transform(training)

    encoder = OneHotEncoder().setInputCol(index_col).setOutputCol(vector_col)
    training = encoder.transform(training)

# One-hot vectors first, then the numerical columns (num_cols comes from the previous cell).
assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

training = assembler.transform(training)

# The original `training.show` lacked call parentheses and therefore displayed nothing.
training.show(5)
print(training)

In [17]:
# Map the string target `delivered_to_plan` to a numeric `label` column.
# The indexer is fitted on the whole dataset so that every label value
# is included in the index.
label_stage = StringIndexer(inputCol="delivered_to_plan", outputCol="label")
labelIndexer = label_stage.fit(training)
print(labelIndexer)


In [18]:
# Split the data, train a random forest inside a Pipeline, and score the held-out set.

from pyspark.ml import Pipeline

cols = training.columns

# Fixed seed so the split and the forest can be reproduced between runs.
# NOTE(review): a seeded randomSplit is only repeatable if the rows of `training`
# arrive in the same partitions/order on every run - confirm for this join-built frame.
SPLIT_SEED = 42

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = training.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=200, seed=SPLIT_SEED)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain the already-fitted label indexer, the forest and the label converter.
pipeline = Pipeline(stages=[labelIndexer, rf, labelConverter])

# Fit the pipeline on the training split.
model = pipeline.fit(trainingData)

# Make predictions on the held-out split.
predictions = model.transform(testData)


In [19]:
# Read a local CSV file into a pandas DataFrame.
import pandas as pd
from pyspark.sql import SQLContext
# `sc` is not defined in this notebook; presumably the SparkContext supplied by the runtime - confirm.
print (sc)
# NOTE(review): this rebinds `df`, which held the Spark DataFrame loaded at the top of
# the notebook, to a pandas DataFrame; earlier cells that use `df` will not re-run
# correctly after this one.
df = pd.read_csv("test_1234.csv")
print (type(df))
# print (df)
# SQLContext built from `sc`; it is only referenced by the commented-out line below.
sqlCtx = SQLContext(sc)
# sqlCtx.createDataFrame(df).show()

In [20]:
# Turn the pandas DataFrame into a Spark DataFrame.
spark_df = spark.createDataFrame(df)
print(spark_df.columns)

# Remove the 'Unnamed: 0' column, then persist the result as a table,
# replacing any existing table of the same name.
spark_df = spark_df.drop('Unnamed: 0')
# print(spark_df)
table_writer = spark_df.write.mode("overwrite")
table_writer.saveAsTable("table_name")

In [21]:
# Show a handful of predictions next to the true labels.
preview_columns = ["predictedLabel", "delivered_to_plan", "features"]
predictions.select(preview_columns).show(5)

# Accuracy of the predicted index against the true label index on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % accuracy)

# The forest is the second stage of the fitted pipeline (index 1).
rfModel = model.stages[1]
print(rfModel)  # summary only


In [22]:
# Keep only the true label index and the predicted label, and inspect their types.
selected = predictions.select("label", "predictedLabel")
selected.dtypes
# selected.write.csv("rf_model_output_1.csv")

In [23]:
# Area under the ROC curve for the test-set predictions.
# Only the evaluator class is imported (instead of `import *`) so it is clear where the name comes from.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# The evaluator is built with its default columns; the metric is selected at evaluate() time.
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [24]:
# from dbmlModelExport import ModelExport
# for modelName in ModelExport.supportedModels:
#     print(modelName)

In [25]:
# Can remove an old model file, if needed.
# dbutils.fs.rm( "/tmp/ml_python_model_export/pipeline", recurse=True)

In [26]:
# Run the fitted pipeline over the full `training` frame; the transformed frame
# is passed to serializeToBundle alongside the model in a later cell.
sparkTransformed = model.transform(training)

In [27]:
%sh 
# Start from an empty export directory: delete any previous export, then recreate the folder.
rm -rf /tmp/mleap_python_model_export
mkdir /tmp/mleap_python_model_export

In [28]:
# Export the fitted pipeline as an MLeap bundle on the driver's local disk.
# NOTE(review): `import mleap.pyspark` is presumably what makes serializeToBundle
# available on the fitted model - confirm against the MLeap documentation.
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Arguments: the bundle URI and the frame produced by model.transform(training).
# NOTE(review): the URI uses the `jar:file:` scheme but a `.json` file name - verify the
# resulting file format is what downstream consumers expect.
model.serializeToBundle("jar:file:/tmp/mleap_python_model_export/1pipeline-json.json", sparkTransformed)

In [29]:
# Locations the exported bundle passes through.
local_bundle_uri = "file:/tmp/mleap_python_model_export/1pipeline-json.json"
example_bundle_uri = "dbfs:/example/1pipeline-json.json"
filestore_bundle_uri = "dbfs:/FileStore/pipeline.json"

# Driver-local file -> DBFS, then list the target folder to confirm the copy.
dbutils.fs.cp(local_bundle_uri, example_bundle_uri)
display(dbutils.fs.ls("dbfs:/example"))

# Second copy, into FileStore.
dbutils.fs.cp(example_bundle_uri, filestore_bundle_uri)

path,name,size
dbfs:/example/1pipeline-json.json,1pipeline-json.json,208840
dbfs:/example/20news_pipeline-json.zip,20news_pipeline-json.zip,208840


In [30]:
# Area under the ROC curve via the RDD-based BinaryClassificationMetrics.
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = predictions.select(['probability', 'label'])

# Build (score, label) pairs on the driver.
# The score is the first entry of the probability vector and the label is
# flipped (1 - label), so label index 0 is scored as the positive class.
results_collect = results.collect()
results_list = []
for row in results_collect:
    first_class_probability = float(row[0][0])
    flipped_label = 1.0 - float(row[1])
    results_list.append((first_class_probability, flipped_label))
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

In [31]:
# Plot the ROC curve from the (score, label) pairs collected in the previous cell.
from sklearn.metrics import roc_curve, auc

# Each entry of results_list is (score, label).
y_test = [pair[1] for pair in results_list]
y_score = [pair[0] for pair in results_list]

# fpr / tpr are assigned directly from roc_curve and roc_auc from auc(); the previous
# dict() placeholders were overwritten before ever being read and have been removed.
fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

# %matplotlib inline
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()