In [None]:
from functions import *
from datetime import datetime


# Markdown file that indexes every model run folder created below.
model_indexing_dir = "log_index.md"

images_dir = "images/ML"
models_dir = "models"

# Input dataset.
file = "data/US_Accidents_Dec19.csv"

# setup_spark (project helper from `functions`) loads `file`; judging by how the
# results are used later, these are the DataFrame, the SparkContext and the
# Spark session.
df, sc, spark = setup_spark(file)

# Bare expression: only rendered when it is the last line of a notebook cell.
sc

# Debug tool: fraction of rows to keep. 1.0 keeps the whole dataset; lower it
# (e.g. 0.1) to run the pipeline on a seeded random sample for faster experiments.
sample_fraction = 1.0
df, _ = df.randomSplit([sample_fraction, 1.0 - sample_fraction], 1)
df.count()

# Per-state training run: for every State (largest first) build the feature
# vector, inspect it with PCA / ChiSqSelector, train LR / DT / RF models and log
# everything under models/<timestamp>/.

# ML imports, hoisted out of the loop so they execute once instead of per state.
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, QuantileDiscretizer, StringIndexer, Imputer, StandardScaler, MinMaxScaler
from pyspark.ml.feature import ChiSqSelector, PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from numpy import argmax


def _add_local_hour_month(frame, ts_col, prefix):
    """Return `frame` with `<prefix>_Hour` and `<prefix>_Month` columns added.

    `ts_col` is converted with from_utc_timestamp using the row's `Timezone`
    column, then hour() and month() are extracted from the result.
    """
    # NOTE(review): this treats `ts_col` as UTC. If the CSV already stores local
    # times, the extracted hour is shifted - confirm against the dataset docs.
    local_ts = from_utc_timestamp(frame[ts_col], frame['Timezone'])
    frame = frame.withColumn(prefix + '_Hour', hour(local_ts))
    return frame.withColumn(prefix + '_Month', month(local_ts))


# `df` is reassigned throughout the loop body (state filter, feature columns,
# label). Keep the full dataset so every state starts from the raw rows instead
# of the previous state's feature table.
base_df = df

for region in df.groupBy('State').count().orderBy('count', ascending=False).toPandas()['State']:
    df = base_df

    timeSignature = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    createDirectory(models_dir + "/" + timeSignature + "/")
    logs_dir = models_dir + "/" + timeSignature + "/logs.md"
    printSparkConf(sc, logs_dir)

    filter_state = str(region)
    print(filter_state)

    # 1. Define variables (fresh lists every iteration)

    colLabel = ["Severity"]

    colRem = ['ID',
              'Source',
              'End_Time',
              'End_Lat',
              'End_Lng',
              'Description',
              ]

    df, colCat, colNum = setup_variables(df, sc, colLabel, colRem, logs_dir)

    write2file(logs_dir, "Number of rows", str(df.count()))

    # 2. Preprocessing

    ## Restrict to a single state (shrinks the dataset considerably)
    if filter_state != '':
        df = df.filter(df.State == filter_state)
        write2file(logs_dir, "Specified state", str(filter_state))
    else:
        write2file(logs_dir, "No state specified", "")

    ## Time of day and month, later cast to strings so they are treated as categories
    df = _add_local_hour_month(df, 'Start_Time', 'Start')
    df = _add_local_hour_month(df, 'Weather_Timestamp', 'Weather')

    df.show(1)
    write2file(logs_dir, "Dataset after modifying UTC timestamp", str(df.take(1)))

    ## Position feature
    # Rather than City/County/State/Zipcode/Airport_Code, discretise Start_Lat and
    # Start_Lng into quantile buckets, one-hot encode each, and concatenate the two
    # encodings into a single `position` vector.
    # TODO - Calculate how many buckets are needed (bins)
    discretizer_Lat = QuantileDiscretizer(numBuckets=100, inputCol="Start_Lat", outputCol="Start_Lat_disc")
    discretizer_Lng = QuantileDiscretizer(numBuckets=100, inputCol="Start_Lng", outputCol="Start_Lng_disc")

    # Bucket ids -> categorical indices
    indexer_cord = [StringIndexer(inputCol=c + "_disc", outputCol=c + "_IDX") for c in ["Start_Lat", "Start_Lng"]]

    # One-hot encode each coordinate
    encoder_cord = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol() for indexer in indexer_cord],
        outputCols=["{0}_vec".format(indexer.getOutputCol()) for indexer in indexer_cord])

    pipeline_cord = Pipeline(stages=[discretizer_Lat, discretizer_Lng, *indexer_cord, encoder_cord])
    preprocessed_cord = pipeline_cord.fit(df).transform(df)

    position = VectorAssembler(inputCols=["Start_Lat_IDX_vec", "Start_Lng_IDX_vec"], outputCol="position")
    df = position.transform(preprocessed_cord)

    df.show(1)
    write2file(logs_dir, "Dataset after adding quantiles", str(df.take(1)))

    ## Cast data
    # Cast every kept column so it is classified as numerical (double) or
    # categorical (string). Columns left out produced too large a feature vector,
    # had too many missing values, or were converted into another form above
    # (e.g. TMC: many missing values).
    df = df.select(
        col('Severity').cast('int'),
        col('Start_Hour').cast('string'),
        col('Start_Month').cast('string'),
        col('Weather_Hour').cast('string'),
        col('Weather_Month').cast('string'),
        col('position'),  # already preprocessed
        col('Distance(mi)').cast('double'),
        col('Side').cast('string'),
        col('Temperature(F)').cast('double'),
        col('Wind_Chill(F)').cast('double'),  # many missing values
        col('Humidity(%)').cast('double'),
        col('Pressure(in)').cast('double'),
        col('Visibility(mi)').cast('double'),
        col('Wind_Direction').cast('string'),
        col('Wind_Speed(mph)').cast('double'),  # many missing values
        col('Weather_Condition').cast('string'),
        col('Amenity').cast('string'),
        col('Bump').cast('string'),
        col('Crossing').cast('string'),
        col('Give_Way').cast('string'),
        col('Junction').cast('string'),
        col('No_Exit').cast('string'),
        col('Railway').cast('string'),
        col('Roundabout').cast('string'),
        col('Station').cast('string'),
        col('Stop').cast('string'),
        col('Traffic_Calming').cast('string'),
        col('Traffic_Signal').cast('string'),
        col('Turning_Loop').cast('string'),
        col('Sunrise_Sunset').cast('string'),
        col('Civil_Twilight').cast('string'),
        col('Nautical_Twilight').cast('string'),
        col('Astronomical_Twilight').cast('string')
    )

    # TODO: does df.cache() help here? (earlier note: heartbeat crash otherwise)
    colLabel = ["Severity"]

    # Recompute the categorical / numerical column lists for the reduced column set.
    colCat, colNum = createNewClasses(df, sc, colLabel, logs_dir)

    ## Missing values
    # NaNs are deliberately not dropped: the median Imputer in the pipeline below
    # fills the numerical ones.
    printMissingValues(df, logs_dir)

    ## Remove rare weather conditions
    # Drop rows whose Weather_Condition occurs in at most 0.1% of this state's
    # rows. Based on the exploratory analysis; a lower threshold may give better results.
    original_rows = df.count()
    n = int(original_rows * 0.001)

    weather_freq = df.groupBy('Weather_Condition').count().orderBy('count', ascending=False)
    df_filtered = weather_freq.filter(weather_freq['count'] > n)
    filtered_conditions = df_filtered.select("Weather_Condition").rdd.flatMap(lambda x: x).collect()
    df = df.filter(df['Weather_Condition'].isin(*filtered_conditions))

    df_filtered.show()
    rows_removed = original_rows - df.count()
    print("Rows removed:", rows_removed)

    write2file(logs_dir, "Weather condition", str(df_filtered.collect()) + "\nRows removed: " + str(rows_removed))

    ## Preprocessing pipeline
    # Numerical: median-impute in place -> assemble -> min-max scale to [0, 1].
    imputer = Imputer(inputCols=colNum, outputCols=colNum)
    imputer.setStrategy("median")

    num_assembler = VectorAssembler(inputCols=colNum, outputCol="num_features", handleInvalid="skip")
    scaler = MinMaxScaler(min=0.0, max=1.0, inputCol="num_features", outputCol="scaledFeatures")

    # Categorical: string-index -> one-hot; handleInvalid='skip' drops invalid rows.
    indexers = [StringIndexer(inputCol=c, outputCol=c + '_IDX', handleInvalid='skip') for c in colCat]

    encoder = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol() for indexer in indexers],
        outputCols=["{0}_vec".format(indexer.getOutputCol()) for indexer in indexers],
        handleInvalid="skip")

    numPipeline = Pipeline(stages=[imputer, num_assembler, scaler])
    catPipeline = Pipeline(stages=[*indexers, encoder])

    pipeline = Pipeline(stages=[numPipeline, catPipeline])
    preprocessed_df = pipeline.fit(df).transform(df)

    preprocessed_df.select("position", "scaledFeatures", *[c + "_IDX_vec" for c in colCat]).show(1, False)

    ## Combine features into one vector plus label
    va2 = VectorAssembler(inputCols=["position", "scaledFeatures", *[c + "_IDX_vec" for c in colCat]], outputCol="final_features")

    df = va2.transform(preprocessed_df)

    df = df.withColumn('label', col("Severity"))
    df_features = df.withColumn('features', df.final_features).select("features", "label")
    df_features.show(1, False)

    write2fileModel(df_features, models_dir, "df_features", timeSignature)

    feature_rows = df_features.count()
    write2file(logs_dir, "Feature set size", str(feature_rows) + "\n\n__Feature vector and label:__\n" + str(df_features.take(1)))
    write2file(logs_dir, "Number of rows", str(feature_rows))
    print("Feature set size: ", feature_rows)

    ## Count categorical values (how many classes each column contributes)
    unique_counts = [df.select(countDistinct(c).alias(c)).collect()[0] for c in [*colCat, "position"]]
    print("Unique column values:", unique_counts)

    write2file(logs_dir, "Unique column values", str(unique_counts))

    # 3. Feature importance

    ## PCA
    # Check how much variance the leading components explain (aim: above 90%).
    # PCA requires k <= feature-vector length, which differs per state, so cap it.
    k = min(250, len(df_features.first().features))
    pca = PCA(k=k, inputCol="features", outputCol="pca_features")
    pca_model = pca.fit(df_features)
    pca_df = pca_model.transform(df_features)

    pca_summary = ("Top 50:\n" + str(pca_model.explainedVariance[:50])
                   + "\nNumber of items: " + str(k)
                   + "\nSum of variance: " + str(sum(pca_model.explainedVariance)))
    print("PCA - Feature Variance:", pca_summary)
    write2file(logs_dir, "PCA - Feature variance", pca_summary)
    write2fileModel(pca_df, models_dir, "pca_df", timeSignature)

    ## ChiSqSelector
    # Keep the top k features ranked by a chi-squared test against the label.
    selector = ChiSqSelector(numTopFeatures=k,
                             labelCol='label',
                             featuresCol='features',
                             outputCol="selectedFeatures",
                             selectorType='numTopFeatures',
                             percentile=0.1,
                             fpr=0.05, fdr=0.05, fwe=0.05)

    chi_model = selector.fit(df_features)
    chi_df = chi_model.transform(df_features)

    write2file(logs_dir, "Top selected features according to ChiSqSelector", str(chi_model.selectedFeatures) + "\nNumber of features: " + str(k) + "\nExample data:\n" + str(chi_df.take(5)))
    write2fileModel(chi_df, models_dir, "chi_df", timeSignature)

    print("Top selected features according to ChiSqSelector:", chi_model.selectedFeatures)
    print("Transformed selected features:", chi_df.head().selectedFeatures)
    # show() prints the table itself and returns None, so call it separately.
    print("Transformed feature vector:")
    chi_df.show(10)

    # 4. Machine learning

    # Train on chi_df: it carries the reduced `selectedFeatures` column.
    trainSet, testSet = chi_df.randomSplit([0.8, 0.2], 1)
    # NOTE(review): this accuracy evaluator is never passed to evaluateModel; the
    # calls below use a default MulticlassClassificationEvaluator() (metric f1).
    # Confirm which metric is intended.
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

    trainSet.show(2, False)

    ## Logistic regression
    lr = LogisticRegression(labelCol="label", featuresCol="selectedFeatures")

    paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.1, 0.08, 0.01])
                 .addGrid(lr.maxIter, [5, 10, 15])  # integers: maxIter rejects 10.15
                 .addGrid(lr.elasticNetParam, [0.6, 0.8])
                 .build())
    try:
        _, _ = evaluateModel(lr, paramGrid, "LR_Model", trainSet, testSet, timeSignature, evaluator=MulticlassClassificationEvaluator(), k=10, seed=None, logs_dir=logs_dir, models_dir=models_dir)
    except Exception as e:
        # Best effort: log the failure and continue with the other models.
        write2file(logs_dir, "Error:", str(e))
        print(e)

    ## Decision tree
    dt = DecisionTreeClassifier(labelCol="label", featuresCol="selectedFeatures")

    paramGrid = ParamGridBuilder().build()

    _, _ = evaluateModel(dt, paramGrid, "DT_Model", trainSet, testSet, timeSignature, evaluator=MulticlassClassificationEvaluator(), k=10, seed=None, logs_dir=logs_dir, models_dir=models_dir)

    ## Random forest
    rf = RandomForestClassifier(labelCol="label", featuresCol="selectedFeatures")
    paramGrid = (ParamGridBuilder()
                 .addGrid(rf.numTrees, [10, 15, 20, 25])
                 .build())

    _, _ = evaluateModel(rf, paramGrid, "RF_Model", trainSet, testSet, timeSignature, evaluator=MulticlassClassificationEvaluator(), k=10, seed=None, logs_dir=logs_dir, models_dir=models_dir)

    # Log to this run's log file (previously written to a file literally named "logs_dir").
    write2file(logs_dir, "Program finished!", "")

    # One line per field so the index entry stays readable.
    write2file(model_indexing_dir, "New model folder created",
               "Model finished: Yes\n"
               + "Folder name: " + timeSignature + "\n"
               + "State: " + filter_state + "\n"
               + "Logs directory: " + logs_dir + "\n"
               + "File: " + file + "\n"
               + "Note: ")
    


Config: [('spark.driver.extraClassPath', '/opt/conda/lib/python3.7/site-packages/sparkmonitor/listener.jar'), ('SPARKMONITOR_UI_HOST', '192.168.1.228'), ('spark.executor.id', 'driver'), ('spark.driver.host', 'localhost'), ('spark.app.name', 'Spark Project'), ('spark.executor.cores', '4'), ('executor-memory', '6'), ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'), ('spark.app.id', 'local-1588644394139'), ('spark.num.executors', '6'), ('spark.kryoserializer.buffer.max', '10'), ('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener'), ('spark.rdd.compress', 'True'), ('spark.driver.port', '45469'), ('spark.driver.memory', '8g'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.memory.fraction', '.6'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true')]
CA
Label: ['Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Wind_Direction', 'Weather_Cond