# Classification model

[Pyspark blog](https://bryancutler.github.io/)

[ML w/ Pyspark](https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa)

[GitHub event types](https://developer.github.com/v3/activity/events/types/)

[Churn modeling](https://www.urbanairship.com/blog/churn-prediction-our-machine-learning-model)

[Interpreting Trees](https://towardsdatascience.com/interpretable-machine-learning-with-xgboost-9ec80d148d27)

[Intro to Boosted Trees](https://xgboost.readthedocs.io/en/latest/tutorials/model.html#why-introduce-the-general-principle)

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
import pandas as pd
import pickle

from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Binarizer#, OneHotEncoderEstimator, StringIndexer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, GBTClassifier, RandomForestClassifier
from pyspark.sql import functions as F

from pyspark.sql.functions import to_timestamp, datediff, unix_timestamp

import helper as h

### Model name and filter flags for segmentation

In [2]:
company_filter = 0
high_low_filter = 1
if company_filter:
    model_name = 'company_' + str(company_filter)
else:
    model_name = 'company_' + str(company_filter) + 'high_low_' + str(high_low_filter)

## Data ingestion 

In [None]:
_data = h.get_merged_data('classification')

In [None]:
#_data = _data.withColumn("end_date", to_timestamp('2016-06-02 23:59:59+00:00'))
#_data = _data.withColumn("T", datediff(_data.end_data, _data.created_at))    

In [None]:
_data.printSchema()

In [None]:
#_data.head()

### Load K-Means classifier

Classify users as high or low use.

In [None]:
_data = h.add_high_low_flag(_data)

### Scale data

In [None]:
_data = h.feature_scaling(_data)

### Filter and segment users

In [None]:
churn_data = _data.filter((_data.company == company_filter)) 
if not company_filter:
    churn_data = churn_data[churn_data.high_low_user == high_low_filter]

### Summarize

In [None]:
h.print_user_churn(churn_data)

In [None]:
numeric_features = [t[0] for t in churn_data.dtypes if t[1] == 'int' or t[1] == 'double']
numeric_features.remove('second_period_event_count')
numeric_features.remove('frequency')
numeric_features.remove('non_passive_events')
numeric_features.remove('public_repos_gists')
numeric_features.remove('high_low_user')
numeric_features.remove('company')
numeric_features.remove('time_between_first_last_event')
numeric_features.remove('recency')
#churn_data.select(numeric_features).describe().toPandas().transpose()

## Build PySpark pipeline

In [None]:
stages = []
# binarizer needs double type or it throws an error.
churn_data = churn_data.withColumn("second_period_event_count", 
                                   churn_data.second_period_event_count.cast(DoubleType()))
binarizer = Binarizer(threshold=0.5, 
                      inputCol="second_period_event_count", 
                      outputCol="label")

stages += [binarizer]
assembler = VectorAssembler(inputCols=numeric_features, 
                            outputCol="features").setHandleInvalid("skip")
stages += [assembler]

In [None]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(churn_data)
churn_data = pipelineModel.transform(churn_data)
selectedCols = ['label', 'features'] + numeric_features
churn_data = churn_data.select(selectedCols)
churn_data.printSchema()

In [None]:
# save pipeline
pipeline.write().overwrite().save('pipeline')

In [None]:
#examples = pd.DataFrame(churn_data.take(5), columns=churn_data.columns).transpose()
#examples

### Training/Test split

In [None]:
train, test = churn_data.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: {0}".format(train.count()))
print("Test Dataset Count: {0}".format(test.count()))

## Logistic Regression

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=50,
                        #regParam=0.3, elasticNetParam=0.08
                       )
lrModel = lr.fit(train)

In [None]:
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
pd.DataFrame({'features': numeric_features, # np.asarray(numeric_features)[lrModel.coefficients.indices], 
              'weights': lrModel.coefficients.values}
            ).sort_values(by='weights', ascending=False)


In [None]:
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()

fig, ax = plt.subplots(1,1)
fig.set_tight_layout(True)
pd.DataFrame({'features': numeric_features, # np.asarray(numeric_features)[lrModel.coefficients.indices], 
              'weights': lrModel.coefficients.values}
            ).sort_values(by='weights', ascending=False)
ax.plot(roc['FPR'],roc['TPR'])
ax.set_ylabel('True Positive Rate')
ax.set_xlabel('False Positive Rate')
#ax.title('ROC Curve')
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))


fig.savefig('figures/logistic_regression_ROC.png')

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.ylim([0, 1])
plt.xlim([0, 1])
plt.show()

In [None]:
#lrModel._java_obj.setThreshold(0.75)
print('Threshold: {0}'.format(lrModel._java_obj.getThreshold()))
predictions = lrModel.transform(test)
show_cols = ['followers_count', 'blog', 
             'label', 'rawPrediction', 'prediction', 'probability']
predictions.select(show_cols).show(10)

In [None]:
h.eval_metrics(predictions)

In [None]:
lrModel.write().overwrite().save('lrModel_' + model_name)

## Gradient Boosted Trees

[Gradient Boost vs XGboost](https://datascience.stackexchange.com/questions/16904/gbm-vs-xgboost-key-differences)

In [25]:
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
h.write_tree_to_file(gbtModel.toDebugString, 'gbt_trees_' + model_name)

predictions = gbtModel.transform(test)
predictions.select(show_cols).show(10)

h.eval_metrics(predictions)

Saved to fullfile
+-------------------+-------+----+-----+--------------------+----------+--------------------+
|    followers_count|company|blog|label|       rawPrediction|prediction|         probability|
+-------------------+-------+----+-----+--------------------+----------+--------------------+
|0.47712125471966244|      0|   1|  0.0|[0.27226970042930...|       0.0|[0.63286776479089...|
| 1.0413926851582251|      0|   0|  0.0|[-0.1774756260228...|       1.0|[0.41218227251353...|
| 0.6020599913279624|      0|   0|  0.0|[0.28653914035891...|       0.0|[0.63947316852387...|
| 0.3010299956639812|      0|   0|  0.0|[0.44483160176320...|       0.0|[0.70882066470747...|
|0.47712125471966244|      0|   0|  0.0|[-0.0731294011844...|       1.0|[0.46350034184256...|
|0.47712125471966244|      0|   0|  0.0|[0.83078297981233...|       0.0|[0.84044810251930...|
|                0.0|      0|   1|  0.0|[-0.3728070131871...|       1.0|[0.32177773262117...|
|                0.0|      0|   0|  0.0|[0

### Feature importances
[pyspark feature imortances doc](http://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.classification.GBTClassificationModel.featureImportances):

Estimate of the importance of each feature.

Each feature’s importance is the average of its importance across all trees in the ensemble. The importance vector is normalized to sum to 1. This method is suggested by Hastie et al. (Hastie, Tibshirani, Friedman. “The Elements of Statistical Learning, 2nd Edition.” 2001.) and follows the implementation from scikit-learn.

In [26]:
print(gbtModel.featureImportances.indices)
numeric_features = np.asarray(numeric_features)
pd.DataFrame([numeric_features[gbtModel.featureImportances.indices], 
              gbtModel.featureImportances.values]).T.sort_values(by=[1], ascending=False)

[ 0  1  2  3  4  5  6  7  8  9 10 12 13 14 15 16 17 18 19 20]


Unnamed: 0,0,1
3,public_repos_count,0.221444
6,time_between_first_last_event,0.170778
8,CreateEvent_count,0.131313
1,following_count,0.0890643
12,IssuesEvent_count,0.0853046
10,ForkEvent_count,0.0680775
17,PushEvent_count,0.056275
19,WatchEvent_count,0.0502538
11,IssueCommentEvent_count,0.0398236
0,followers_count,0.0360821


In [59]:
'''evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(
    evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))'''

'evaluator = BinaryClassificationEvaluator()\nprint("Test Area Under ROC: " + str(\n    evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))'

## Random forest classification

In [60]:
'''rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
h.write_tree_to_file(rfModel.toDebugString, 'rf_trees_' + model_name)
predictions = rfModel.transform(test)
predictions.select(show_cols).show(10)
h.eval_metrics(predictions)'''

"rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')\nrfModel = rf.fit(train)\nh.write_tree_to_file(rfModel.toDebugString, 'rf_trees_' + model_name)\npredictions = rfModel.transform(test)\npredictions.select(show_cols).show(10)\nh.eval_metrics(predictions)"

## Grid search with cross validation

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()

paramGrid = (ParamGridBuilder()
             .addGrid(lrModel.elasticNetParam, [0, 0.8, 0.08])
             .addGrid(lrModel.regParam, [0, 0.3, 0.003])
             .addGrid(lrModel.maxIter, [50, 100])
             .build())
cv = CrossValidator(estimator=lrModel, estimatorParamMaps=paramGrid, 
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

Py4JJavaError: An error occurred while calling o908.cache.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at scala.collection.immutable.HashSet$HashTrieSet.updated0(HashSet.scala:557)
	at scala.collection.immutable.HashSet.$plus(HashSet.scala:84)
	at scala.collection.immutable.HashSet.$plus(HashSet.scala:35)
	at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
	at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:20)
	at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
	at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:20)
	at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
	at scala.collection.AbstractTraversable.to(Traversable.scala:104)
	at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:304)
	at scala.collection.AbstractTraversable.toSet(Traversable.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88)
	at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:88)
	at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$1(TreeNode.scala:213)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$1$$anonfun$apply$6.apply(TreeNode.scala:224)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$1.apply(TreeNode.scala:224)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:217)
	at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized$lzycompute(Expression.scala:193)
	at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:191)
	at org.apache.spark.sql.catalyst.expressions.ExpressionSet.add(ExpressionSet.scala:63)
	at org.apache.spark.sql.catalyst.expressions.ExpressionSet$$anonfun$$plus$plus$1.apply(ExpressionSet.scala:79)


In [None]:
pd.DataFrame({'features': numeric_features, # np.asarray(numeric_features)[lrModel.coefficients.indices], 
              'weights': lrModel.coefficients.values}
            ).sort_values(by='weights', ascending=False)

In [None]:
h.eval_metrics(predictions)