### Decision Tree Regression on the all_vars_for_zeroinf_analysis Data

- This Python notebook demonstrates creating an ML Pipeline to preprocess a dataset, train a Machine Learning model, and make predictions.

- Data: The dataset is the `all_vars_for_zeroinf_analysis` CSV file, loaded from `/FileStore/tables/all_vars_for_zeroinf_analysis.csv`.

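- Goal: We want to learn to predict confirmed deaths from information such as length of lockdown, confirmed cases, 2018 population estimate, ICU beds, adult obesity, smoking and excessive-drinking percentages, population per square mile, number of specialist physicians, quality-of-life / clinical-care / physical-environment ranks, and the number of tests with results per 1,000 population.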

- Approach: We will use Spark ML Pipelines, which help users piece together parts of a workflow such as feature processing and model training. The model is a decision tree regressor, and we will also demonstrate model selection (a.k.a. hyperparameter tuning) using cross-validation in order to fine-tune and improve it.

In [2]:
%sh 
pip install mleap
# Install the mleap Python package from a shell cell.
# NOTE(review): presumably required by the mlflow.mleap.log_model calls later in
# this notebook. On Databricks, %pip is the documented way to install
# notebook-scoped libraries (%sh pip only touches the driver) — confirm.

In [3]:
# NOTE(review): `log` is not used in any cell visible here; possibly intended for a
# log transform of the skewed death counts — confirm before removing.
from pyspark.sql.functions import log

In [4]:
%sh ls /dbfs/FileStore/tables/
# List the uploaded files to check that the CSV read below is present
# (/dbfs/FileStore/tables is the local-path view of /FileStore/tables).

In [5]:
# Where the uploaded source file lives and what format it is in
file_location = "/FileStore/tables/all_vars_for_zeroinf_analysis.csv"
file_type = "csv"

# CSV reader settings, passed to Spark as strings
infer_schema = "true"         # let Spark guess each column's type
first_row_is_header = "true"  # first line holds the column names
delimiter = ","

# Load the all_vars_for_zeroinf_analysis table into a Spark DataFrame.
# These options only apply to CSV input; other formats ignore them.
_csv_reader_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
zeroinf_analysis = (
    spark.read.format(file_type)
    .options(**_csv_reader_options)
    .load(file_location)
)

In [6]:
# Uncomment to render the raw table in the notebook UI.
#display(zeroinf_analysis)

In [7]:
# Number of rows loaded from the CSV.
zeroinf_analysis.count()

In [8]:
# (column name, Spark type) pairs produced by schema inference.
zeroinf_analysis.dtypes

In [9]:
# Replace '.' with '_' in every column name. Spark interprets a dot in a column
# reference as nested-field access, so dotted names are awkward to select.
new_column_name_list= list(map(lambda x: x.replace(".", "_"), zeroinf_analysis.columns))

zeroinf_analysis = zeroinf_analysis.toDF(*new_column_name_list)

In [10]:
# Rows whose target value (confirmed_deaths) is missing.
zeroinf_analysis.where( zeroinf_analysis['confirmed_deaths'].isNull() ).count()

In [11]:
# Rows whose pop_fraction value is missing.
zeroinf_analysis.where( zeroinf_analysis['pop_fraction'].isNull() ).count()

In [12]:
def count_nulls(df, include_strings=True):
    """Count the null values in each column of a Spark DataFrame.

    All counts are computed in a single aggregation pass (one Spark job)
    instead of one ``where(...).count()`` job per column.

    Args:
        df: the Spark DataFrame to inspect.
        include_strings: when False, columns whose Spark type is 'string'
            are skipped (the helper's previous behaviour). String columns
            CAN hold nulls, so they are included by default.

    Returns:
        A list of ``(column_name, null_count)`` tuples in ``df.dtypes`` order.
    """
    from pyspark.sql import functions as F

    names = [name for name, dtype in df.dtypes
             if include_strings or dtype != 'string']
    if not names:
        return []
    # count() ignores nulls and when() is null unless its condition holds, so each
    # expression counts exactly the null rows (and yields 0 on an empty DataFrame).
    row = df.select([F.count(F.when(df[name].isNull(), 1)).alias(name)
                     for name in names]).first()
    # Zip positionally so duplicate column names cannot be confused.
    return list(zip(names, row))

# Per-column null counts for the raw table, as (column, count) tuples.
null_counts = count_nulls(zeroinf_analysis)

In [13]:
null_counts

In [14]:
# Spark's built-in summary statistics; describe() returns its values as strings.
zeroinf_analysis_described = zeroinf_analysis.describe()
zeroinf_analysis_described.show()

In [15]:
from pyspark.sql.functions import skewness, kurtosis
# NOTE(review): var_pop, var_samp, stddev, stddev_pop, sumDistinct and ntile are not
# used in the cells visible here; sumDistinct is deprecated in newer Spark releases
# in favour of sum_distinct — confirm before relying on it.
from pyspark.sql.functions import var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
# Skewness of the target column confirmed_deaths.
zeroinf_analysis.select(skewness('confirmed_deaths')).show()


In [16]:
from pyspark.sql import Row

# Extra statistics to append below describe(): (row label, aggregate function) pairs.
_extra_stats = [('skew', skewness), ('kurtosis', kurtosis)]

columns = zeroinf_analysis_described.columns   # describe()'s column names, 'summary' first
funcs = [agg for _, agg in _extra_stats]       # aggregate functions, same order as fnames
fnames = [label for label, _ in _extra_stats]  # row labels for the 'summary' column

def new_item(func, column, df=None):
    """Apply one aggregate function to one column and return the result as a string.

    The value is stringified because ``DataFrame.describe()`` reports its
    statistics as strings, and these values are unioned with that output.

    Args:
        func: an aggregate function from ``pyspark.sql.functions`` that accepts
            a column name, e.g. ``skewness`` or ``kurtosis``.
        column: name of the column to aggregate.
        df: DataFrame to aggregate. Defaults to the notebook-global
            ``zeroinf_analysis`` so existing two-argument calls keep working.

    Returns:
        ``str`` of the single aggregated value (``'None'`` when Spark returns null).
    """
    if df is None:
        df = zeroinf_analysis
    return str(df.select(func(column)).first()[0])

# One Row per extra statistic, laid out like describe()'s rows:
# 'summary' holds the statistic's label, every other column holds its value.
new_data = []
for stat_func, stat_name in zip(funcs, fnames):
    stat_row = {'summary': stat_name}
    # columns[0] is describe()'s own 'summary' column, so only data columns are computed
    stat_row.update({name: new_item(stat_func, name) for name in columns[1:]})
    new_data.append(Row(**stat_row))

print(new_data)

In [17]:
zeroinf_analysis_described.collect()

In [18]:
new_describe = sc.parallelize(new_data).toDF()           #turns the results from our loop into a dataframe
new_describe = new_describe.select(zeroinf_analysis_described.columns) #forces the columns into the same order

expanded_describe = zeroinf_analysis_described.unionAll(new_describe)  #merges the new stats with the original describe
expanded_describe.show()


In [19]:
# NOTE(review): pandas is not used in any cell visible here — confirm before removing.
import pandas as pd
#import matplotlib.pyplot as plt

#%matplotlib inline #tells the Jupyter Notebook to display graphs inline (rather than in a separate window)

In [20]:
# Pull the confirmed_deaths and confirmed_cases columns back to the driver as lists of Rows.
# NOTE(review): collect() materialises the entire column on the driver; if only a
# preview is needed, take()/limit() would avoid that — confirm these lists are used later.
confirmed_deaths = zeroinf_analysis[['confirmed_deaths']].collect()
confirmed_cases = zeroinf_analysis[['confirmed_cases']].collect()

In [21]:
# Preview the first five collected rows of each column.
print(confirmed_deaths[:5])
print(confirmed_cases[:5])

In [22]:
zeroinf_analysis.columns


In [23]:
from pyspark.sql.functions import col

# Keep only the modelling columns: the target (confirmed_deaths) plus the
# candidate predictors, in this fixed order.
_modelling_columns = [
    'length_of_lockdown',
    'confirmed_cases',
    'confirmed_deaths',
    'POP_ESTIMATE_2018',
    'ICU_Beds',
    'Adult_obesity_percentage',
    'Quality_of_Life_rank',
    'Excessive_drinking_percentage',
    'Population_per_sq_mile',
    'Clinical_Care_rank',
    'Adult_smoking_percentage',
    'Total_Specialist_Physicians__2019_',
    'Physical_Environment_rank',
    'Number_of_Tests_with_Results_per_1_000_Population',
]
dataset = zeroinf_analysis.select([col(name) for name in _modelling_columns])

In [24]:

# Preview the selected modelling columns.
dataset.show(5)

In [25]:
# Inferred types of the modelling columns; everything packed into the feature
# vector later must be numeric.
dataset.dtypes

In [26]:
from pyspark.sql.functions import col


# The regressor and evaluator below read the target from a column named 'label'.
dataset = dataset.withColumnRenamed('confirmed_deaths', 'label')

In [28]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor, GeneralizedLinearRegression, AFTSurvivalRegression
from pyspark.ml.feature import VectorIndexer

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType
 
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder  

In [29]:
valuableColumns = list(dataset.columns)

In [30]:
valuableColumns

In [31]:
for col in valuableColumns[:-1]:
    # Of cause we can't change immutable values, but we can owerwrite them
    dataset = dataset.withColumn(col+"_d", dataset[col].cast("double"))
    
    
dataset = dataset.fillna(-1., subset=valuableColumns)  
dataset

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [33]:
dataset=dataset.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
dataset.show()

In [34]:
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dataset)

In [35]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3])

In [36]:
# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

In [37]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

In [38]:
# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)


In [39]:
# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

In [40]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [41]:
treeModel = model.stages[1]
# summary only
print(treeModel)

In [42]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [43]:
grid = ParamGridBuilder() \
  .addGrid(dt.maxDepth, [2, 3, 4, 5, 6, 7, 8]) \
  .addGrid(dt.maxBins, [2, 4, 8]) \
  .build()

In [44]:
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid, numFolds=3)

In [45]:
# Explicitly create a new run.
# This allows this cell to be run multiple times.
# If you omit mlflow.start_run(), then this cell could run once,
# but a second run would hit conflicts when attempting to overwrite the first run.
import mlflow
import mlflow.mleap
import pyspark

with mlflow.start_run():
    cvModel = cv.fit(trainingData)
    mlflow.set_tag('owner_team', 'UX Data Science')  # Logs user-defined tags
    # Score the best cross-validated model on the held-out test split.
    test_metric = evaluator.evaluate(cvModel.transform(testData))
    mlflow.log_metric('testData_' + evaluator.getMetricName(), test_metric)  # Logs additional metrics
    # Log the best model via mleap. artifact_path is relative to this run's
    # artifact root (the experiment decides where that root lives); it is not
    # an absolute dbfs:/ URI.
    mlflow.mleap.log_model(spark_model=cvModel.bestModel, sample_input=testData, artifact_path='model')


In [46]:
import mlflow
import mlflow.mleap
from mlflow import log_metric, log_param, log_artifacts
def fit_model():
    """Cross-validate the decision-tree pipeline and log the best model to MLflow.

    Runs inside a fresh MLflow run and uses the notebook-level ``dt``,
    ``pipeline``, ``evaluator`` and ``trainingData`` objects.

    Returns:
        The best model found by cross-validation. Callers that ignore the
        return value are unaffected.
    """
    import mlflow
    import mlflow.mleap

    with mlflow.start_run():
        # DecisionTree requires maxBins >= the number of values of every
        # categorical feature; the pipeline's VectorIndexer treats features with
        # up to 4 distinct values as categorical, so values below 4 can fail.
        grid = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [2, 3, 4, 5, 6, 7, 8])
                .addGrid(dt.maxBins, [4, 8, 16, 32])
                .build())
        cv = CrossValidator(estimator=pipeline, evaluator=evaluator,
                            estimatorParamMaps=grid, numFolds=3)
        cvModel = cv.fit(trainingData)
        model = cvModel.bestModel

        # Log the model within the MLflow run. artifact_path is relative to the
        # run's artifact root, not an absolute dbfs:/ URI.
        mlflow.mleap.log_model(spark_model=model, sample_input=trainingData,
                               artifact_path="model")
    return model

In [47]:
# Run the cross-validated fit and log the best model in a new MLflow run.
fit_model()