In [1]:
# Step 1: read the exported CSV into a DataFrame.
# The file has a header row and column types are inferred from the data.
csv_path = '/FileStore/tables/clean_mc/export__1_-d35a2.csv'
csv_reader = sqlContext.read.format('csv').options(header='true', inferSchema='true')
clean_mc = csv_reader.load(csv_path)

# Print the inferred schema as a quick sanity check.
clean_mc.printSchema()

In [2]:
# Expose the DataFrame to SQL cells under the view name "clean_mc".
clean_mc.createOrReplaceTempView(name="clean_mc")

In [3]:
%sql
-- Remove any existing `trans` table (no error if it does not exist).
DROP TABLE IF EXISTS trans

In [4]:
%sql
-- Materialise the rows of the clean_mc temp view as the table `trans`.
-- A create-table-from-query statement needs the AS SELECT form;
-- "CREATE TABLE trans FROM clean_mc" is not valid Spark SQL.
CREATE TABLE trans AS
SELECT * FROM clean_mc

In [5]:
# Read the `trans` table back as a DataFrame and render it.
dataset = spark.table("trans")
display(dataset)

# Keep the column names of the loaded table.
cols = dataset.columns

In [6]:
# Scratch check: register a one-row range as the temp view "dataset" and
# inspect the Python type that spark.table() returns for it.
# This SQL view name is independent of the Python variable `dataset`.
one_row = spark.range(1)
one_row.createOrReplaceTempView("dataset")
type(spark.table("dataset"))

In [7]:
from pyspark.sql.functions import trim
from pyspark.sql.functions import split
# Strip leading/trailing whitespace from purch_location, then preview 5 rows.
trimmed_location = trim(dataset["purch_location"])
df = dataset.withColumn("purch_location", trimmed_location)
df.show(5)


In [8]:
# Split purch_location on spaces and keep only the first token as purch_loc.
# Intended to drop "ON", null values and other odd trailing text; the original
# purch_location column is to be deleted in a later step.
from pyspark.sql.functions import split
splitcol = split(df['purch_location'], ' ')
first_token = splitcol.getItem(0)
df1 = df.withColumn('purch_loc', first_token)

In [9]:
# Original plan: generate a list of commonly visited Canadian cities and label any
# location not in that list as GTA.
# That approach failed, so blank locations are replaced with GTA instead
# (having several values in one column should work better for prediction purposes).
from pyspark.sql.functions import col, when

def blank_as_null(x, fill='GTA'):
    """Return a Column expression that replaces blank values of column `x` with `fill`.

    Despite the name, blanks are NOT converted to NULL: wherever the column is
    not a non-empty string the result is `fill`. NULL inputs also receive
    `fill`, because `NULL != ""` evaluates to NULL and therefore does not
    satisfy the `when` condition.

    Args:
        x: name of the column to clean.
        fill: replacement value for blank/NULL entries (defaults to 'GTA',
            the value this notebook originally hard-coded).

    Returns:
        A pyspark Column expression suitable for `DataFrame.withColumn`.
    """
    return when(col(x) != "", col(x)).otherwise(fill)

# Replace blank purch_loc entries using blank_as_null.
df2 = df1.withColumn("purch_loc", blank_as_null("purch_loc"))

# Drop unnecessary columns by selecting every column not named in drop_list.
drop_list = ['purch_location', 'column2', 'id', 'purch_date']
kept_columns = [name for name in df2.columns if name not in drop_list]
df3 = df2.select(kept_columns)
df3.show(5)

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
# Names of the categorical (string-valued) columns.
categorical_columns = [
    "purch_class",
    "serv-provider",
    "purch_loc",
]

In [11]:
# Attempt 2: string indexing and one-hot encoding (vector assembling follows in a later cell).
# For each categorical column, add a StringIndexer (<col> -> <col>Index)
# followed by a OneHotEncoder (<col>Index -> <col>classVec), in that order.
stages = []
for catCol in categorical_columns:
  indexed_name = catCol + "Index"
  stages.append(StringIndexer(inputCol=catCol, outputCol=indexed_name))
  stages.append(OneHotEncoder(inputCol=indexed_name, outputCol=catCol + "classVec"))


In [12]:
# Index the target column "y" into the "label" column and add it to the pipeline stages.
label_stringIdx = StringIndexer(inputCol="y", outputCol="label")
stages.append(label_stringIdx)

In [13]:
# Numeric feature columns passed to the assembler unchanged.
numCols = ["tot_amt"]

# One-hot encoded vectors (<col>classVec) followed by the numeric columns.
# A list comprehension is used instead of map(): on Python 3 map() returns an
# iterator, and `map(...) + list` raises TypeError.
assemblerInputs = [c + "classVec" for c in categorical_columns] + numCols

# Combine all inputs into a single "features" vector column.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [14]:
# Fit every collected stage on df3.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df3)

# Keep the original df3 columns plus the generated 'features' and 'label'.
cols = df3.columns
final_columns = cols + ['features', 'label']
transformed = pipeline_model.transform(df3)
df_p = transformed.select(final_columns)

display(df_p)

In [15]:
# Split 80/20 into training and test sets; the fixed seed keeps the split repeatable.
training, test = df_p.randomSplit(weights=[0.8, 0.2], seed=123)


In [16]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model (depth capped at 4)
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=4)

# Train model with Training Data
dtModel = dt.fit(training)

# Report the size of the fitted tree.
# Single-argument print(...) works on both Python 2 and Python 3 (the
# Python 2 print statement is a SyntaxError on Python 3); the "%s %s" form
# reproduces the original output exactly.
print("%s %s" % ("numNodes = ", dtModel.numNodes))
print("%s %s" % ("depth = ", dtModel.depth))

In [17]:
# Score the test set with the decision tree and show the columns of interest.
predictions = dtModel.transform(test)
report_columns = ["label", "prediction", "probability", "tot_amt", "purch_class"]
selected = predictions.select(report_columns)
display(selected)

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
# The evaluator is built with its default settings.
# NOTE(review): a binary evaluator assumes "label" has exactly two classes - confirm for column "y".
evaluator = BinaryClassificationEvaluator()
dt_metric = evaluator.evaluate(predictions)

# Only the last expression of a cell is echoed, so the metric has to be
# printed explicitly or it is silently discarded.
print("%s = %s" % (evaluator.getMetricName(), dt_metric))

# Impurity setting of the (unfitted) estimator, shown as the cell output.
dt.getImpurity()


In [19]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Grid of 4 maxDepth values x 3 maxBins values (12 combinations).
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(dt.maxDepth, [1, 2, 3, 4])
grid_builder = grid_builder.addGrid(dt.maxBins, [20, 40, 80])
paramGrid = grid_builder.build()

In [20]:
# Create 5-fold CrossValidator
# The seed is pinned (same value as the train/test split) so the fold
# assignment, and therefore the selected model, is repeatable across runs.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=123,
)

# Run cross validations
cvModel = cv.fit(training)

In [21]:
# Report the size of the best model found by cross-validation.
# Single-argument print(...) works on both Python 2 and Python 3 (the
# Python 2 print statement is a SyntaxError on Python 3); the "%s %s" form
# reproduces the original output exactly.
print("%s %s" % ("numNodes = ", cvModel.bestModel.numNodes))
print("%s %s" % ("depth = ", cvModel.bestModel.depth))

In [22]:
# Score the test set with the cross-validated model (this replaces the
# earlier `predictions`) and show the evaluator's metric as the cell output.
predictions = cvModel.transform(test)
cv_metric = evaluator.evaluate(predictions)
cv_metric


In [23]:
# Show the label, prediction, probability and amount for the cross-validated predictions.
report_columns = ["label", "prediction", "probability", "tot_amt"]
selected = predictions.select(report_columns)
display(selected)