In [1]:
import os

os.environ['SPARK_HOME'] = '/home/envmodules/lib/spark-2.2.0-bin-hadoop2.7/'
import findspark

findspark.init()

from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col

import pandas as pd

spark = SparkSession.builder.appName('733').getOrCreate()

# Using the integrated file to start working on
integrated_df = spark.read.parquet('/user/vcs/annual_integrated_dataset_with_labels_ibes_fix_v2.parquet').cache()


def find_performance_metrics(res, model_used):
    res = res.withColumn('correct', res.label == res.prediction)

    num_rows = res.count()
    accuracy = res.filter(res.label == res.prediction).count() / res.count()

    # positive class (misstatements)
    true_positives_df = res.filter(res.prediction == 1.0).filter(res.label == 1.0)
    ground_truth_positives_df = res.filter(res.label == 1.0)
    misstatement_recall = true_positives_df.count() / ground_truth_positives_df.count()

    new_all_predicted_positive_df = res.filter(res.prediction == 1.0)
    misstatement_precision = true_positives_df.count() / new_all_predicted_positive_df.count()

    # negative class (non misstatements)
    true_negative_df = res.filter(res.prediction == 0.0).filter(res.label == 0.0)
    ground_truth_negative_df = res.filter(res.label == 0.0)
    non_misstatement_recall = true_negative_df.count() / ground_truth_negative_df.count()

    new_all_predicted_negative_df = res.filter(res.prediction == 0.0)
    non_misstatement_precision = true_negative_df.count() / new_all_predicted_negative_df.count()

    d = {'model_used': model_used, 'accuracy': accuracy, \
         'misstatement_precision': misstatement_precision, \
         'misstatement_recall': misstatement_recall}
    df = pd.DataFrame(data=d, index=[0])
    file_name = "performance_metrics" + "".join(model_used.split()) + ".csv"
    df.to_csv(file_name, encoding='utf-8')

    print("Using {}".format(model_used))
    print('accuracy is {}'.format(accuracy))
    print('misstatement_precision is {}, misstatement recall is {}'.format(misstatement_precision, misstatement_recall))
    print('non_misstatement_precision is {}, non_misstatement recall is {}'.format(non_misstatement_precision,
                                                                                   non_misstatement_recall))


# Downsampling:
misstated_df = integrated_df.filter(integrated_df.label == 1.0)
misstated_count = misstated_df.count()
non_misstated_df = integrated_df.filter(integrated_df.label == 0.0).limit(misstated_count)
integrated_df = misstated_df.union(non_misstated_df).cache()

# Using nullcounts to filter columns to keep
nullcounts = integrated_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in integrated_df.columns])
nc = list(nullcounts.first())

# Services-packaged software category selection (from EDA)
services_prepacked_software = integrated_df  # .filter(integrated_df.sic == '7372')
print('Total records in integrated file: ', integrated_df.count())
print('Number of records in Services-packaged software industrial category: ', services_prepacked_software.count())

# Reusing preprocessing steps implemented by Vincent
# filling nulls and nones with zeroes.
some_dict = {}
for x in services_prepacked_software.columns:
    some_dict[x] = 0

nwdf = services_prepacked_software.fillna(some_dict)

good_columns = []
for i in range(0, len(nc)):
    if nc[i] == 0:
        good_columns.append(i)

great_columns = [nwdf.columns[i] for i in good_columns]
great_columns.append('rea')
nwdf = nwdf.fillna(some_dict)

# dropping all string columns
non_string_columns = [k for (k, v) in nwdf.dtypes if v != 'string']
nwdf_no_strings = nwdf.select(*non_string_columns)
feature_columns = [item for item in nwdf_no_strings.columns if item not in ['rea', 'features', 'label', 'rea_label']]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
final_df = assembler.transform(nwdf_no_strings)
final_final_df = final_df.drop(*feature_columns).cache()

# String indexing not required
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(final_final_df)
td = si_model.transform(final_final_df)

# Evaluators
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
eval = BinaryClassificationEvaluator()

# RandomForest classifier
rf = RandomForestClassifier(numTrees=100, maxDepth=16, labelCol="indexed", seed=42)
model = rf.fit(td)
result = model.transform(final_final_df)
print('Accuracy on training data: ', evaluator.evaluate(result))

# Train test split for model evaluation
train, test = final_final_df.randomSplit([0.7, 0.3], seed=12345)
train.cache()
test.cache()

Total records in integrated file:  3388
Number of records in Services-packaged software industrial category:  3388
Accuracy on training data:  0.9902597402597403


DataFrame[rea: double, label: double, features: vector]

In [7]:
final_final_df.count()

3388

In [2]:
# ---------------
# Random Forest:
# ---------------

rf = RandomForestClassifier(numTrees=100, maxDepth=16, labelCol="label", seed=42)
print('Training RandomForest model on training set. \n Model parameters: {}'.format(rf._paramMap))
trained_model = rf.fit(train)
res = trained_model.transform(test)
metrics = MulticlassMetrics(res.select(['label', 'prediction']).rdd)
print('Accuracy on test set: ', evaluator.evaluate(res))
print('Area under ROC curve: ', eval.evaluate(res))
find_performance_metrics(res, "random forest")

Training RandomForest model on training set. 
 Model parameters: {Param(parent='RandomForestClassifier_43fe9cefe089525d2408', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 16, Param(parent='RandomForestClassifier_43fe9cefe089525d2408', name='numTrees', doc='Number of trees to train (>= 1).'): 100, Param(parent='RandomForestClassifier_43fe9cefe089525d2408', name='seed', doc='random seed.'): 42, Param(parent='RandomForestClassifier_43fe9cefe089525d2408', name='labelCol', doc='label column name.'): 'label'}
Accuracy on test set:  0.8317214700193424
Area under ROC curve:  0.9129208749480402
Using random forest
accuracy is 0.8317214700193424
misstatement_precision is 0.8044692737430168, misstatement recall is 0.8622754491017964
non_misstatement_precision is 0.8611670020120724, non_misstatement recall is 0.8030018761726079


In [3]:
# ---------------
# Logistic regression:
# ---------------
print('Training LogisticRegression model on training set.')
logistic = LogisticRegression(regParam=0.1, labelCol="label")  # , thresholds = [0.2, 0.5])
trained_model = logistic.fit(train)
res = trained_model.transform(test)
metrics = MulticlassMetrics(res.select(['label', 'prediction']).rdd)
print('Accuracy on test set: ', evaluator.evaluate(res))
print('Area under ROC curve: ', eval.evaluate(res))
find_performance_metrics(res, "logistic regression")

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = trained_model.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']).select('threshold').head()[
    'threshold']
logistic.setThreshold(bestThreshold)
print('best threshold is:' + str(bestThreshold))

print("For Logistic regression:")
trained_model = logistic.fit(train)
res = trained_model.transform(test)
metrics = MulticlassMetrics(res.select(['label', 'prediction']).rdd)
print('Accuracy on test set: ', evaluator.evaluate(res))
print('Area under ROC curve: ', eval.evaluate(res))
# find_performance_metrics(res, "logistic regression")
find_performance_metrics(res, "logistic regression with best threshold")

df = pd.DataFrame(
    {'lr_coeff': trained_model.coefficients,
     'feature_column': feature_columns,
     })

df['abs_lr_coeff'] = df['lr_coeff'].abs()
df = df = df.sort_values('abs_lr_coeff', ascending=False).reset_index()
print(df.head())

Training LogisticRegression model on training set.
Accuracy on test set:  0.7156673114119922
Area under ROC curve:  0.79554586886265
Using logistic regression
accuracy is 0.7156673114119922
misstatement_precision is 0.6757215619694398, misstatement recall is 0.7944111776447106
non_misstatement_precision is 0.7685393258426966, non_misstatement recall is 0.6416510318949343
objectiveHistory:
0.693054780889867
0.6832815629719761
0.6665432637919997
0.6566143166735775
0.6523768157731895
0.6340166196417919
0.6038257285703489
0.5999231777516096
0.5987003389535266
0.5983496107895722
0.5973352637699861
0.5957638683527767
0.5936116465903114
0.5930597269206699
0.5926519647087672
0.5924898476386136
0.5922993564142169
0.591472652293314
0.5911468466977239
0.5910659212121527
0.591032521735034
0.5910105861790761
0.5909677331851787
0.5909506214723904
0.5909388641217875
0.5909379351749218
0.5909357218358776
0.5909217636609192
0.590919500921314
0.5909174359066834
0.5909109936801681
0.5908988636347207
0.59

In [None]:
# ------------------------------------------------------------
# Code for making use of validation set for parameter tuning
train, test = final_final_df.randomSplit([0.9, 0.1], seed=12345)

lr = LogisticRegression()

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.2, 0.15, 0.1, 0.01]) \
    .addGrid(lr.threshold, [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 1.0])\
    .build()

# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=BinaryClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
res = model.transform(test)
find_performance_metrics(res, 'logistic_with_validation')

In [None]:
# ---------------------------------------------------------
# For RandomForest
rf = RandomForestClassifier()

paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [50, 100, 150, 200]) \
    .addGrid(rf.maxDepth, [4, 8, 12, 16, 18, 20])\
    .build()
    
tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=paramGrid,
                           evaluator=BinaryClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)
model = tvs.fit(train)
res = model.transform(test)
find_performance_metrics(res, 'rf_with_validation')