In [None]:
import pyspark
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, IntegerType
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import StringIndexer, VectorAssembler
from distutils.version import LooseVersion
# Obtain the SparkSession used by every cell below (getOrCreate reuses an active session if one exists).
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [None]:
# Explicit read schema for weatherAUS.csv: (column name, Spark type) in file order.
# Every column is nullable so unparsable or missing cells load as null and can be dropped below.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('Date', StringType()),
        ('Location', StringType()),
        ('MinTemp', DoubleType()),
        ('MaxTemp', DoubleType()),
        ('Rainfall', DoubleType()),
        ('Evaporation', DoubleType()),
        ('Sunshine', DoubleType()),
        ('WindGustDir', StringType()),
        ('WindGustSpeed', IntegerType()),
        ('WindDir9am', StringType()),
        ('WindDir3pm', StringType()),
        ('WindSpeed9am', IntegerType()),
        ('WindSpeed3pm', IntegerType()),
        ('Humidity9am', IntegerType()),
        ('Humidity3pm', IntegerType()),
        ('Pressure9am', DoubleType()),
        ('Pressure3pm', DoubleType()),
        ('Cloud9am', IntegerType()),
        ('Cloud3pm', IntegerType()),
        ('Temp9am', DoubleType()),
        ('Temp3pm', DoubleType()),
        ('RainToday', StringType()),
        ('RainTomorrow', StringType()),
    ]
])

# Load the raw CSV with the schema above (the first line of the file is the header).
# NOTE(review): if the file marks missing values with a token such as 'NA', string columns
# will keep that token instead of null — confirm against the data and pass nullValue=... if so.
indata = spark.read.csv('weatherAUS.csv', header=True, schema=schema)

# Keep only rows in which every column is non-null.
dataset = indata.na.drop(how='any')

In [None]:
#Step 1
# 80/20 train/test split of the cleaned rows; the fixed seed keeps the split repeatable.
trainingData, testingData = dataset.randomSplit(weights=[0.8, 0.2], seed=12345)

In [None]:
#Step 2
# Columns that are string-indexed and then one-hot encoded.
# NOTE(review): MinTemp and MaxTemp are DoubleType in the read schema, so continuous values
# are being treated as categories here — confirm that this is intended.
categorical_columns = ['MinTemp', 'MaxTemp']

# Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder. Resolve the class once,
# rather than re-checking the version and re-importing on every loop iteration.
if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
    from pyspark.ml.feature import OneHotEncoderEstimator
    encoder_cls = OneHotEncoderEstimator
else:
    from pyspark.ml.feature import OneHotEncoder
    encoder_cls = OneHotEncoder

# Pipeline stages are accumulated in order: indexer then encoder for each column.
stage = []
for cat_col in categorical_columns:
    stringIndexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "Index")
    # Both encoder classes take a flat list of input column names; the indexer's single
    # output column is read with getOutputCol() (singular) on every Spark version.
    encoder = encoder_cls(inputCols=[stringIndexer.getOutputCol()], outputCols=[cat_col + 'classVec'])
    stage += [stringIndexer, encoder]

In [None]:
# Index the RainTomorrow strings into a numeric 'label' column and queue the indexer as the next stage.
label_stringIdx = StringIndexer(outputCol='label', inputCol='RainTomorrow')
stage.append(label_stringIdx)

In [None]:
# Raw numeric columns that are passed to the assembler unchanged.
numericCols = ['MinTemp', 'MaxTemp']

# Assembler inputs: the one-hot vectors (named '<column>classVec') first, then the raw numeric columns.
asemblerInput = [name + 'classVec' for name in categorical_columns]
asemblerInput.extend(numericCols)

# Concatenate all inputs into a single 'features' vector column and queue the assembler as a stage.
asembler = VectorAssembler(outputCol='features', inputCols=asemblerInput)
stage.append(asembler)

In [None]:
# Chain the accumulated preprocessing stages into one pipeline.
partialPipeline = Pipeline(stages=stage)

# Fit the preprocessing stages on the full cleaned dataset, then apply them to that same dataset.
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [None]:
# Python form of the Scala snippet that was pasted here (`val` / `new` are not valid Python).
# It gets its own name so the CSV read schema bound to `schema` earlier is not overwritten.
features_schema = StructType().add("features", VectorUDT())

In [None]:
# Shallow decision tree over the assembled feature vector.
# featuresCol must name a vector column; 'MinTemp' is a plain double column and is rejected at fit time.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# The raw training split has neither 'label' nor 'features'; both are produced by the fitted
# preprocessing pipeline, so the split is passed through it before training.
dtModel = dt.fit(pipelineModel.transform(trainingData))

In [None]:
# Render the fitted decision-tree model as this cell's output.
display(dtModel)

In [None]:
# Score the held-out split. It is named `testingData` by the split cell (`testData` was never defined),
# and it is passed through the fitted preprocessing pipeline so it carries the engineered columns.
predictions = dtModel.transform(pipelineModel.transform(testingData))

# The original misspelled name stays bound as an alias because a later cell still selects from `preditions`.
preditions = predictions

In [None]:
# Print the column names and types of the scored DataFrame.
predictions.printSchema()

In [None]:
# Project the scored rows down to the two temperature columns and show the result.
selected = preditions.select(['MinTemp', 'MaxTemp'])
display(selected)

In [None]:
# Area-under-ROC evaluator; the arguments spell out the evaluator's default column names and metric.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)

# Score the predictions; the metric value is this cell's output.
evaluator.evaluate(predictions)

In [None]:
#Step 3
# Hyper-parameter grid for the decision tree: 3 x 3 x 3 x 2 = 54 parameter maps.
# Every addGrid call needs an explicit list of values, and build() turns the builder
# into the list of param maps that CrossValidator expects.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.maxBins, [20, 40, 80])
    .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
    .addGrid(dt.impurity, ['gini', 'entropy'])
    .build()
)

In [None]:
#Step 4
# 4-fold cross-validation of the decision tree over the parameter grid.
# The seed fixes the fold assignment so repeated runs select the same model.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4, seed=12345)

# The raw training split has no 'label' / 'features' columns; run it through the fitted
# preprocessing pipeline before handing it to the cross-validator.
# NOTE(review): this requires `dt` to read the assembled vector (featuresCol='features') — verify its definition.
cvModel = cv.fit(pipelineModel.transform(trainingData))

In [None]:
# Score the held-out split with the best cross-validated model. The split is named `testingData`
# (`testData` was never defined) and is preprocessed first so it carries the engineered columns.
predictions = cvModel.transform(pipelineModel.transform(testingData))

In [None]:
#Step 5
# End-to-end pipeline: every preprocessing stage followed by the classifier.
# `stage` already holds the indexers, encoders, the label indexer and the assembler, so the
# label indexer is not repeated here. Without those stages the classifier has no feature column.
# NOTE(review): this requires `dt` to read the assembled vector (featuresCol='features') — verify its definition.
pipeline = Pipeline(stages=stage + [dt])

In [None]:
#Step 6
# A Pipeline is an estimator, not a DataFrame, so it has no printSchema().
# Show its configured stages instead; the list is this cell's output.
pipeline.getStages()