In [1]:
from pyspark.sql import Row 
from pyspark.sql.types import *
import pandas as pdd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
import warnings
import os
from pyspark.sql import SparkSession
# Obtain the Spark session through the builder's getOrCreate() entry point.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
# Bare expression: the notebook renders the session object as the cell output.
spark

In [2]:
# Read the training CSV: first line is the header, column types are inferred.
train_csv_path = "/FileStore/tables/train.csv"
data = spark.read.csv(
    train_csv_path,
    header=True,
    inferSchema=True,
)
# Preview a single row.
data.show(1)

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Un-partitioned window ordered by the year extracted from the Dates column.
# NOTE(review): the sort key is the year only, so the relative order of rows
# within the same year is presumably arbitrary - confirm nothing relies on it.
win = Window.orderBy(year(data.Dates))

# "idx" is the 1-based row number of each row within that window.
row_idx = row_number().over(win)
df_imb = data.withColumn("idx", row_idx)

In [4]:
# Number of rows in the raw training frame (shown as the cell output).
data.count()

In [5]:
# Keep only the row index and the Category column.
c=df_imb.select("idx","Category")

In [6]:
# One row per idx and one column per distinct Category value, holding the
# number of rows with that (idx, Category) pair; absent pairs are filled with 0.
df_imbbb=c.groupBy("idx").pivot("Category").count().fillna(0)

In [7]:
# Column names of the pivoted training frame ("idx" plus the pivoted Category
# columns); displayed as the cell output.
df_col=df_imbbb.columns
df_col

In [8]:
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit , CrossValidator

In [9]:
from pyspark.sql.functions import *
#df2 = df_imb.select(year(data.Dates).alias("Year"),month(data.Dates).alias("Month"),weekofyear(data.Dates).alias('dt_week_no'),"Category",minute(df_imb.Dates).alias("Minute"),unix_timestamp(df_imb.Dates).alias('dt_int').cast("double"),hour(df_imb.Dates).alias("Hour"),"X","Y")
#df2.show(5)
#,"DayOfWeek",,month(data.Dates).alias("Month"),, weekofyear(data.Dates).alias('dt_week_no'),,year(data.Dates).alias("Year"),year(df_imb.Dates).alias("Year"),"Address","DayOfWeek","PdDistrict","Address","DayOfWeek",month(df_imb.Dates).alias("Month"),
# Modelling frame: minute and hour of Dates, Dates as a unix timestamp (double),
# PdDistrict, X / Y cast to double, and the Category column.
df2 = df_imb.select(
    minute(df_imb.Dates).alias("Minute"),
    unix_timestamp(df_imb.Dates).alias('dt_int').cast("double"),
    hour(df_imb.Dates).alias("Hour"),
    "PdDistrict",
    df_imb.X.cast("double"),
    df_imb.Y.cast("double"),
    "Category",
)

In [10]:
# Names of the string-typed columns of df2, excluding Category.
encoding_var = [
    name
    for name, dtype in df2.dtypes
    if dtype == 'string' and name != 'Category'
]
print("encoding_var:",encoding_var)

# Names of the int / double columns of df2, excluding Category.
num_var = [
    name
    for name, dtype in df2.dtypes
    if dtype in ('int', 'double') and name != 'Category'
]
print("num_var:",num_var)

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import IndexToString

# One StringIndexer per string feature column; handleInvalid='keep' routes
# values not seen during fitting to an extra index instead of failing.
string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]

# One-hot encode every indexed feature column (IDX_<col> -> OHE_<col>).
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]

# Fit the label indexer exactly once and keep the FITTED model under the
# original name, so the pipelines built from `label_indexes` reuse it as a
# plain transformer. Previously the unfitted estimator was placed in the
# pipeline and refitted on every training split, while `labels` came from a
# separate fit on df2; the index -> Category mapping of the two fits is not
# guaranteed to agree, in which case IndexToString decodes predictions to the
# wrong Category.
label_indexes = StringIndexer(inputCol = 'Category', outputCol = 'label',handleInvalid = 'keep').fit(df2)
labels = label_indexes.labels

# Maps the numeric "prediction" column back to the Category string using the
# same label order as the indexer above.
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labels)

# Feature vector = numeric columns followed by the one-hot encoded columns.
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")

In [12]:
# Random split of df2 with weights 0.8 / 0.2 and a fixed seed (1234).
train_data, test_data = df2.randomSplit([.8,.2],seed=1234)

In [13]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder , CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import TrainValidationSplitModel
from pyspark.sql import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import  DecisionTreeClassifier


# Stages common to both pipelines: index + one-hot the string features,
# assemble the feature vector, index the label.
feature_stages = string_indexes + onehot_indexes + [assembler, label_indexes]

# Random forest pipeline; the last stage decodes predictions to label strings.
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
pipe_rf = Pipeline(stages=feature_stages + [rf, label_converter])

# Logistic regression pipeline with the same pre/post-processing stages.
lr = LogisticRegression(labelCol='label', featuresCol='features')
pipe_lr = Pipeline(stages=feature_stages + [lr, label_converter])

In [14]:
# Throwaway default-configured estimator, used only to print the parameter
# documentation of RandomForestClassifier.
rf1 = RandomForestClassifier()

print(rf1.explainParams())

In [15]:
# Throwaway default-configured estimator, used only to print the parameter
# documentation of LogisticRegression.
lr1 = LogisticRegression()

print(lr1.explainParams())

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Single-point grid for the random forest (one value per parameter).
params_rf = (
    ParamGridBuilder()
    .addGrid(rf.cacheNodeIds, [True])
    .addGrid(rf.maxDepth, [5])
    .addGrid(rf.minInfoGain, [0.001])
    .addGrid(rf.minInstancesPerNode, [3])
    .addGrid(rf.numTrees, [100])
    .build()
)
# The label is the indexed Category column, which the notebook treats as
# multi-valued (it is pivoted into one column per value for the submission).
# A binary evaluator is not a valid selection metric for that, so folds are
# scored with a multiclass metric on the "prediction" column instead.
crossval_rf = CrossValidator(
    estimator=pipe_rf,
    estimatorParamMaps=params_rf,
    evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1"),
    numFolds=3,
)

# Single-point grid for logistic regression.
params_lr = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.0])
    .addGrid(lr.fitIntercept, [True])
    .addGrid(lr.maxIter, [100])
    .addGrid(lr.regParam, [0.3])
    .build()
)
crossval_lr = CrossValidator(
    estimator=pipe_lr,
    estimatorParamMaps=params_lr,
    evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1"),
    numFolds=3,
)
                          
                   

In [17]:
# Run the random-forest cross validation on the training split.
mod_rf= crossval_rf.fit(train_data)

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out split with the cross-validated random forest.
predictions_rf = mod_rf.transform(test_data)
# Category is handled as a multi-valued label in this notebook, so the
# hold-out score uses a multiclass metric (weighted F1 over "prediction" vs
# "label") rather than a binary evaluator.
evaluator_rf = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_rf.evaluate(predictions_rf)

In [19]:
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Single definition of the model location so save and load cannot drift apart.
best_rf_model_path = "/to_path/sparkml/best_rf_model"

# Persist the best pipeline found by cross validation (overwriting any
# previous copy), then reload it from storage.
best_rf_model = mod_rf.bestModel
best_rf_model.write().overwrite().save(best_rf_model_path)

model_rf_1 = PipelineModel.load(best_rf_model_path)
# Wrap the reloaded pipeline in a TrainValidationSplitModel.
# NOTE(review): model_rf_1.transform(...) could presumably be called directly;
# the wrapper is kept so `model_rf_2` stays defined.
model_rf_2 = TrainValidationSplitModel(model_rf_1)

best_model_rf=model_rf_2.transform(test_data)

# Multiclass metric (weighted F1), because Category is handled as a
# multi-valued label; a binary evaluator is not meaningful for it.
best_evaluator_rf = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
best_evaluator_rf.evaluate(best_model_rf)

In [20]:
# Run the logistic-regression cross validation on the training split.
mod_lr = crossval_lr.fit(train_data)

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out split with the cross-validated logistic regression.
predictions_lr = mod_lr.transform(test_data)
# Multiclass metric (weighted F1 over "prediction" vs "label"), because
# Category is handled as a multi-valued label; a binary evaluator is not
# meaningful for it.
evaluator_lr = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator_lr.evaluate(predictions_lr)

In [22]:
# Read the test CSV: first line is the header, column types are inferred.
test_csv_path = "/FileStore/tables/test.csv"
Test = spark.read.csv(
    test_csv_path,
    header=True,
    inferSchema=True,
)
# Preview the first five rows.
Test.show(5)

In [23]:
#year(Test.Dates).alias("Year"),Test.Dates).alias("Month")(month,"Address","DayOfWeek"
# Same derived columns as the training feature frame, plus the "Id" column and
# without Category.
df3 = Test.select(
    "Id",
    minute(Test.Dates).alias("Minute"),
    unix_timestamp(Test.Dates).alias('dt_int').cast("double"),
    hour(Test.Dates).alias("Hour"),
    "PdDistrict",
    Test.X.cast("double"),
    Test.Y.cast("double"),
)
df3.show(5)

In [24]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
win = Window().orderBy("Hour")
df4 = df3.withColumn("idx", row_number().over(win))
df4.show(2)

In [25]:
dff4=df4.select("Id","idx")
dff4.show(5)

In [26]:
df3=df3.drop("Id")

In [27]:
#Kaggle Submission

In [28]:
p=mod_rf.transform(df3)
p.show(1)

In [29]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
win = Window().orderBy("Hour")
df5 = p.withColumn("idx", row_number().over(win))
df5.show(2)

In [30]:
# Keep only the row index and the decoded prediction string.
df6=df5.select("idx","predictedLabel")
df6.show(2)

In [31]:
# One row per idx and one column per distinct predictedLabel value, holding the
# number of rows with that (idx, predictedLabel) pair; absent pairs become 0.
df7=df6.groupBy("idx").pivot("predictedLabel").count().fillna(0)
df7.show(5)

In [32]:
# Columns of the training pivot (df_col) that the prediction pivot (df7) does
# not have, in df_col order.
present_cols = set(df7.columns)
unique_list = [name for name in df_col if name not in present_cols]

unique_list

In [33]:
# Add every column missing from the prediction pivot as a constant 0 column.
for missing_col in unique_list:
  df7 = df7.withColumn(missing_col, lit(0))
df7.show(2)

In [34]:
# Re-select the columns in the order of the training pivot (df_col).
df7 = df7.select(*df_col)

In [35]:
# Attach Id to each pivoted prediction row through the shared idx column.
examp = dff4.join(df7, on="idx")
# idx was only a join key: drop it, then sort the result by Id.
final_dataset = examp.drop("idx").orderBy("Id")


In [36]:
# Preview the first two rows of the final frame.
final_dataset.show(2)

In [37]:
# Print (row count, column count) of the final frame.
print((final_dataset.count(), len(final_dataset.columns)))

In [38]:
# Spot check: show the row(s) whose Id equals 882000.
final_dataset.where(col("Id")==882000).show()

In [39]:
# Remove /FileStore/df (second argument True = recursive) before writing the
# new output there.
dbutils.fs.rm("/FileStore/df",True)

In [40]:
# Collapse to a single partition and write the frame as CSV with a header row.
csv_writer = (
    final_dataset.coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
)
csv_writer.save("dbfs:/FileStore/df/final_dataset.csv")