# Create predictive model
<br>
## Task
Build a model that predicts whether an influencer will publish a post the next day. Treat it as a binary classification problem.

## Data
* use two datasets about influencers
* the first dataset contains basic information about each influencer
* the second dataset contains the posting history of each influencer for the past 6 months

## Notes
* the posting history is for the period 1.1.2018 - 1.8.2018
* assume it is 31.7.2018 and make a prediction for the next day
* extract the labels for 1.8. to construct the training and test datasets
* extract some features from the available data
* experiment with these models: logistic regression (lr), decision tree (dt), random forest (rf)
* try to build a basic model first and then improve it by adding more features

## Documentation
<br>
* Pyspark documentation of DataFrame API is <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html">here</a>

* Pyspark documentation of ML Pipelines library is <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html">here</a>

* Presentation slides are available <a target="_blank" href = "https://docs.google.com/presentation/d/1XNKIfE5Atj_Mzse0wjmbwLecmVs2YkWm9cqOLqDVWPo/edit?usp=sharing">here</a>

### Import functions and modules

In [4]:
from pyspark.sql.functions import col, max, datediff, count, desc, array_contains, broadcast, explode, length, first, when, expr, regexp_replace, row_number, coalesce, lit, size

from pyspark.sql import Window


from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

### Load Data

In [6]:
infl = spark.table('mlprague.influencers')

posts_history = spark.table('mlprague.infl_posting_history')

### You may want to do some exploratory analytics first

hint:
* see how many records you have
* what is the schema of the dataset
* see some records
* you can use printSchema(), show(), count(), or the proprietary function display()

In [8]:
# your code here:
infl.count()

In [9]:
posts_history.count()

In [10]:
infl.printSchema()

In [11]:
posts_history.printSchema()

In [12]:
display(infl)

influencer_id,languages,interests
85194,List(en),"List(Family and relationships, Entertainment)"
67648,List(fr),List(Shopping and fashion)
184917,List(en),"List(Hobbies and activities, Food and drink, Family and relationships)"
370126,List(en),List(Hobbies and activities)
246388,List(es),"List(Sports and outdoors, Fitness and wellness)"
228457,List(en),"List(Entertainment, Hobbies and activities)"
122770,List(id),"List(Business and industry, Hobbies and activities)"
41916,List(ru),"List(Food and drink, Business and industry, Family and relationships)"
269152,List(hi),"List(Family and relationships, Hobbies and activities)"
204386,List(ru),List(Hobbies and activities)


In [13]:
display(posts_history)

influencer_id,post_date
55769,2018-05-28
185791,2018-03-11
272845,2018-06-17
174005,2018-06-08
320590,2018-07-22
154641,2018-07-16
326720,2018-04-07
349143,2018-04-30
291156,2018-06-04
244638,2018-02-25


### Extract the label

hint:
* use the posts history dataset to see which influencers posted on 1.8.2018 and assign them label 1
 * use the withColumn() transformation together with lit(1), which adds a column with the constant value 1
 * see the lit() function in <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit">docs</a> with an example
* left join this on the influencers; the records with a null value get label 0

In [15]:
label = (
  posts_history
  .filter(col('post_date') == '2018-08-01')
  .select('influencer_id')
  .distinct()
  .withColumn('label', lit(1))
)

influencers_with_label = (
  infl
  .join(label, 'influencer_id', 'left')
  .withColumn('label', coalesce('label', lit(0)))
)

### You may also want to check how many datapoints you have for each class

hint
* use groupBy('label').count()

In [17]:
display(
  influencers_with_label
  .groupBy('label')
  .agg(count('*').alias('ct'))
)

label,ct
1,123810
0,265743
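
The counts above show that the classes are imbalanced, which is worth keeping in mind when judging a model. A minimal plain-Python sketch (no Spark needed) that turns the two counts from the output above into a positive rate and the accuracy of a trivial "always predict the majority class" model; the variable names are illustrative only:

```python
# Class counts copied from the groupBy('label') output above.
counts = {1: 123810, 0: 265743}

total = sum(counts.values())

# Share of influencers who posted on 1.8.2018 (label 1).
positive_rate = counts[1] / total

# Accuracy of a trivial model that always predicts the majority class.
majority_baseline_accuracy = max(counts.values()) / total

print(f"total influencers: {total}")
print(f"positive rate:     {positive_rate:.4f}")
print(f"majority baseline: {majority_baseline_accuracy:.4f}")
```

Roughly a third of the influencers belong to class 1, so a model has to beat about 68% accuracy to be better than always predicting 0.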


In [18]:
display(influencers_with_label)

influencer_id,languages,interests,label
148,List(pt),List(Fitness and wellness),0
463,List(de),"List(Food and drink, Business and industry, Sports and outdoors)",0
471,List(ru),List(Fitness and wellness),0
496,List(en),"List(Hobbies and activities, Business and industry, Entertainment)",0
833,List(es),List(Fitness and wellness),0
1088,List(ru),"List(Business and industry, Hobbies and activities, Shopping and fashion)",1
1238,List(es),"List(Family and relationships, Fitness and wellness)",1
1342,List(en),"List(Hobbies and activities, Fitness and wellness, Business and industry)",1
1580,List(en),List(Hobbies and activities),0
1591,List(en),List(Shopping and fashion),0


### Construct some basic features

hint:
* you may try number of interests, number of languages, age
* the interests and languages columns are of ArrayType
 * you can use the <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.size">size</a> function to count the number of elements
 * slide 48 in the presentation might be useful for using functions on arrays

In [20]:
data_with_basic_features = (
  influencers_with_label
  .withColumn('num_interests', size('interests'))
  .withColumn('num_languages', size('languages'))
)

In [21]:
display(data_with_basic_features)

influencer_id,languages,interests,label,num_interests,num_languages
148,List(pt),List(Fitness and wellness),0,1,1
463,List(de),"List(Food and drink, Business and industry, Sports and outdoors)",0,3,1
471,List(ru),List(Fitness and wellness),0,1,1
496,List(en),"List(Hobbies and activities, Business and industry, Entertainment)",0,3,1
833,List(es),List(Fitness and wellness),0,1,1
1088,List(ru),"List(Business and industry, Hobbies and activities, Shopping and fashion)",1,3,1
1238,List(es),"List(Family and relationships, Fitness and wellness)",1,2,1
1342,List(en),"List(Hobbies and activities, Fitness and wellness, Business and industry)",1,3,1
1580,List(en),List(Hobbies and activities),0,1,1
1591,List(en),List(Shopping and fashion),0,1,1


### Split the data for training and testing

hint
* use the function <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit">randomSplit</a>
* see slide 99 in the presentation

In [23]:
(train, test) = data_with_basic_features.randomSplit([0.7, 0.3], 24)
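
The idea behind a seeded random split can be sketched in plain Python. This is only an illustration of the concept, not Spark's actual implementation: each row is assigned independently with the given probabilities, so the resulting sizes are approximately (not exactly) 70/30, and the same seed reproduces the same split. The helper name `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one part, with probability proportional to its weight."""
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total          # weights are normalized, so [7, 3] behaves like [0.7, 0.3]
        cumulative.append(acc)

    rng = random.Random(seed)     # fixed seed -> reproducible split
    parts = [[] for _ in weights]
    last = len(cumulative) - 1
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(cumulative):
            if u < bound or i == last:
                parts[i].append(row)
                break
    return parts

rows = list(range(1000))          # stand-in for influencer rows
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=24)
print(len(train_rows), len(test_rows))
```

Because every row ends up in exactly one part, the training and test sets never overlap.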

### Construct & fit the pipeline

hint:
* use <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler">VectorAssembler</a> to create the input features 
* choose your model 
 * for LR use <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression">LogisticRegression</a> 
 * for RF use <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier">RandomForestClassifier</a> 
 * for DT use <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier">DecisionTreeClassifier</a> 
* slide 104 in the presentation might be useful for constructing the pipeline
* use train data for training

In [25]:
# features:
features_array = ['num_interests', 'num_languages']

# Assembler:
assembler = VectorAssembler(inputCols=(features_array), outputCol='features')

# Classifier:
rf = RandomForestClassifier(labelCol='label', featuresCol='features', seed=42)

pipeline = Pipeline(stages=[assembler, rf])

rf_model = pipeline.fit(train)

### Evaluate the model

hint: 
* use the <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.BinaryClassificationEvaluator">BinaryClassificationEvaluator</a> 
* slide 106 in the presentation might be useful for evaluating binary classification
* use the test data for evaluation

In [27]:
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

predictions = rf_model.transform(test)

evaluator.evaluate(predictions)

In [28]:
display(predictions)

influencer_id,languages,interests,label,num_interests,num_languages,features,rawPrediction,probability,prediction
148,List(pt),List(Fitness and wellness),0,1,1,"List(1, 2, List(), List(1.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
463,List(de),"List(Food and drink, Business and industry, Sports and outdoors)",0,3,1,"List(1, 2, List(), List(3.0, 1.0))","List(1, 2, List(), List(13.392566048503358, 6.607433951496642))","List(1, 2, List(), List(0.6696283024251679, 0.3303716975748321))",0.0
496,List(en),"List(Hobbies and activities, Business and industry, Entertainment)",0,3,1,"List(1, 2, List(), List(3.0, 1.0))","List(1, 2, List(), List(13.392566048503358, 6.607433951496642))","List(1, 2, List(), List(0.6696283024251679, 0.3303716975748321))",0.0
1088,List(ru),"List(Business and industry, Hobbies and activities, Shopping and fashion)",1,3,1,"List(1, 2, List(), List(3.0, 1.0))","List(1, 2, List(), List(13.392566048503358, 6.607433951496642))","List(1, 2, List(), List(0.6696283024251679, 0.3303716975748321))",0.0
1238,List(es),"List(Family and relationships, Fitness and wellness)",1,2,1,"List(1, 2, List(), List(2.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
1829,List(zh),List(Hobbies and activities),0,1,1,"List(1, 2, List(), List(1.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
2122,List(en),"List(Fitness and wellness, Hobbies and activities)",0,2,1,"List(1, 2, List(), List(2.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
2142,List(pt),List(Hobbies and activities),0,1,1,"List(1, 2, List(), List(1.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
2659,"List(en, pl)",List(Hobbies and activities),0,1,2,"List(1, 2, List(), List(1.0, 2.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0
3749,List(en),List(Food and drink),0,1,1,"List(1, 2, List(), List(1.0, 1.0))","List(1, 2, List(), List(13.714821754258566, 6.2851782457414345))","List(1, 2, List(), List(0.6857410877129283, 0.3142589122870717))",0.0


The score is not very good (note that the evaluator reports the area under the ROC curve, not accuracy). Perhaps we can improve it by adding more predictors.
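
The evaluator above is configured with `metricName='areaUnderROC'`, so the number it returns measures how well the model ranks positives above negatives rather than how many predictions are correct. In the displayed predictions every row is predicted as 0.0 with a class-1 probability of roughly 0.31 to 0.33, which is why ranking quality is the more informative view here. A minimal plain-Python sketch of the pairwise interpretation of AUC, using made-up toy scores (the function name and data are hypothetical; Spark computes the metric from the ROC curve, which should agree closely):

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one example of each class")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Toy data, not taken from the notebook.
labels = [0, 0, 1, 1, 0, 1]
informative_scores = [0.1, 0.7, 0.8, 0.6, 0.4, 0.9]
constant_scores = [0.31] * len(labels)   # same score for everyone

auc_informative = area_under_roc(labels, informative_scores)
auc_constant = area_under_roc(labels, constant_scores)
print(auc_informative, auc_constant)
```

A model that gives everyone the same score gets an AUC of 0.5, which is the value to beat.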

### Try to improve the model

hint:
* you may try also some categorical features like the value of the interest
* slides 88 and 94 in the presentation might be useful for OneHotEncoder and StringIndexer
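
To make the two transformers less of a black box, here is a plain-Python sketch of what they do conceptually. It assumes the default behaviour of recent Spark versions: StringIndexer orders categories by descending frequency (ties broken alphabetically), and OneHotEncoder drops the last category so that it is encoded as an all-zero vector. The helper names are hypothetical, and Spark produces sparse vectors rather than Python lists.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index: most frequent first, ties broken alphabetically."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# First interest of the ten influencers shown in the sample output of this notebook.
interests = [
    "Fitness and wellness", "Food and drink", "Fitness and wellness",
    "Hobbies and activities", "Fitness and wellness", "Business and industry",
    "Family and relationships", "Hobbies and activities",
    "Hobbies and activities", "Shopping and fashion",
]

index = fit_string_index(interests)
encoded = {name: one_hot(i, len(index)) for name, i in index.items()}
for name, vec in encoded.items():
    print(index[name], name, vec)
```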

In [31]:
data_with_catagorical_feature = (
  data_with_basic_features.withColumn('interest', col('interests')[0])
)

In [32]:
display(data_with_catagorical_feature)

influencer_id,languages,interests,label,num_interests,num_languages,interest
148,List(pt),List(Fitness and wellness),0,1,1,Fitness and wellness
463,List(de),"List(Food and drink, Business and industry, Sports and outdoors)",0,3,1,Food and drink
471,List(ru),List(Fitness and wellness),0,1,1,Fitness and wellness
496,List(en),"List(Hobbies and activities, Business and industry, Entertainment)",0,3,1,Hobbies and activities
833,List(es),List(Fitness and wellness),0,1,1,Fitness and wellness
1088,List(ru),"List(Business and industry, Hobbies and activities, Shopping and fashion)",1,3,1,Business and industry
1238,List(es),"List(Family and relationships, Fitness and wellness)",1,2,1,Family and relationships
1342,List(en),"List(Hobbies and activities, Fitness and wellness, Business and industry)",1,3,1,Hobbies and activities
1580,List(en),List(Hobbies and activities),0,1,1,Hobbies and activities
1591,List(en),List(Shopping and fashion),0,1,1,Shopping and fashion


In [33]:
(train, test) = data_with_catagorical_feature.randomSplit([0.7, 0.3], 24)

In [34]:
# features:

features_array = ['num_interests', 'num_languages']

# indexer
interestIndexer = StringIndexer(inputCol='interest', outputCol='indexedInterest')

# OneHotEncoders:
interestEncoder = OneHotEncoder(inputCol='indexedInterest', outputCol='interestVec')

# Assembler:
assembler = VectorAssembler(inputCols=(features_array + ['interestVec']), outputCol='features')

# Classifier:
rf = RandomForestClassifier(featuresCol='features', seed=42)

pipeline = Pipeline(stages=[interestIndexer, interestEncoder, assembler, rf])

rf_model = pipeline.fit(train)

In [35]:
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

predictions = rf_model.transform(test)

evaluator.evaluate(predictions)

The score is slightly better but still not very good. Let's see if we can improve it further:

### Improve the model even more

hint:
* construct some features that capture how frequently the influencer posts
* extract these features from the posting history
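
The important point when building such features is to keep them strictly separate from the label: features may only use posts up to the cutoff date (31.7.2018), while the label comes from the target day (1.8.2018). A plain-Python sketch of that logic on a few made-up records (the data and the helper `build_rows` are hypothetical; the feature names match the ones used in this notebook):

```python
from datetime import date

CUTOFF = date(2018, 7, 31)      # "today": last day allowed in the features
TARGET_DAY = date(2018, 8, 1)   # day we want to predict

# Made-up posting history: (influencer_id, post_date)
posts = [
    (1, date(2018, 7, 14)),
    (1, date(2018, 7, 30)),
    (1, date(2018, 8, 1)),
    (2, date(2018, 3, 2)),
    (2, date(2018, 7, 31)),
    (3, date(2018, 8, 1)),      # only ever posted on the target day
]

def build_rows(posts, cutoff, target_day):
    history = {}                # influencer_id -> (last_post, number_of_posts)
    posted_on_target = set()
    for influencer_id, post_date in posts:
        if post_date == target_day:
            posted_on_target.add(influencer_id)     # used for the label only
        if post_date <= cutoff:                     # used for the features only
            last, n = history.get(influencer_id, (post_date, 0))
            history[influencer_id] = (max(last, post_date), n + 1)

    return {
        influencer_id: {
            "time_from_last_post": (cutoff - last).days,
            "number_of_posts": n,
            "label": int(influencer_id in posted_on_target),
        }
        for influencer_id, (last, n) in history.items()
    }

rows = build_rows(posts, CUTOFF, TARGET_DAY)
for influencer_id, row in sorted(rows.items()):
    print(influencer_id, row)
```

Influencer 3 has no posts before the cutoff and therefore gets no feature row at all. An inner join on the feature tables has the same effect: influencers without any posting history up to the cutoff drop out of the dataset.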

In [38]:
history_for_features = (
  posts_history
  .filter(col('post_date') <= '2018-07-31')
)

time_from_last_post = (
  history_for_features
  .groupBy('influencer_id')
  .agg(
    max('post_date').alias('last_post')
  )
  .withColumn('time_from_last_post', datediff(lit('2018-07-31'), col('last_post')))
  .select('influencer_id', 'time_from_last_post')
)

number_of_posts = (
  history_for_features
  .groupBy('influencer_id')
  .agg(
    count('*').alias('number_of_posts')
  )
)

In [39]:
data_features_improved = (
  data_with_catagorical_feature
  .join(time_from_last_post, 'influencer_id')
  .join(number_of_posts, 'influencer_id')
)

In [40]:
display(data_features_improved)

influencer_id,languages,interests,label,num_interests,num_languages,interest,time_from_last_post,number_of_posts
148,List(pt),List(Fitness and wellness),0,1,1,Fitness and wellness,17,13
463,List(de),"List(Food and drink, Business and industry, Sports and outdoors)",0,3,1,Food and drink,1,91
471,List(ru),List(Fitness and wellness),0,1,1,Fitness and wellness,1,27
496,List(en),"List(Hobbies and activities, Business and industry, Entertainment)",0,3,1,Hobbies and activities,0,123
833,List(es),List(Fitness and wellness),0,1,1,Fitness and wellness,0,92
1088,List(ru),"List(Business and industry, Hobbies and activities, Shopping and fashion)",1,3,1,Business and industry,1,224
1238,List(es),"List(Family and relationships, Fitness and wellness)",1,2,1,Family and relationships,0,142
1342,List(en),"List(Hobbies and activities, Fitness and wellness, Business and industry)",1,3,1,Hobbies and activities,0,205
1580,List(en),List(Hobbies and activities),0,1,1,Hobbies and activities,21,13
1591,List(en),List(Shopping and fashion),0,1,1,Shopping and fashion,22,52


In [41]:
(train, test) = data_features_improved.randomSplit([0.7, 0.3], 24)

In [42]:
# features:
features_array = ['num_interests', 'num_languages', 'time_from_last_post', 'number_of_posts']

# indexer
interestIndexer = StringIndexer(inputCol='interest', outputCol='indexedInterest')

# OneHotEncoders:
interestEncoder = OneHotEncoder(inputCol='indexedInterest', outputCol='interestVec')

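# Assembler (include the encoded interest, otherwise the indexer and encoder stages have no effect on the model):
assembler = VectorAssembler(inputCols=(features_array + ['interestVec']), outputCol='features')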

# Classifier:
rf = RandomForestClassifier(featuresCol='features', seed=42)

pipeline = Pipeline(stages=[interestIndexer, interestEncoder, assembler, rf])

rf_model = pipeline.fit(train)

In [43]:
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

predictions = rf_model.transform(test)

evaluator.evaluate(predictions)

### Try crossvalidation

hint
* slide 108 in the presentation might be useful for tuning hyperparameters
* check in the documentation which parameters your model has (maxDepth, numTrees for Random Forest)

In [45]:
paramGrid = (
  ParamGridBuilder()
  .addGrid(rf.maxDepth, [3, 5, 8])
  .addGrid(rf.numTrees, [50, 100, 150])
  .build()
)

cross_model = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid).fit(train)

rf_model = cross_model.bestModel
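
Cross-validation over a grid gets expensive quickly, so it helps to count the fits before running it. A plain-Python sketch that enumerates the same grid as above; it assumes the number of folds is 3, which is CrossValidator's default when numFolds is not set (as in the cell above):

```python
from itertools import product

max_depth_values = [3, 5, 8]
num_trees_values = [50, 100, 150]
num_folds = 3   # assumption: default numFolds of CrossValidator

# Every combination of the two hyperparameters, like ParamGridBuilder().build().
grid = [
    {"maxDepth": depth, "numTrees": trees}
    for depth, trees in product(max_depth_values, num_trees_values)
]

fits_during_search = len(grid) * num_folds   # one fit per combination and fold
total_fits = fits_during_search + 1          # plus the final refit of the best combination

print(f"{len(grid)} combinations, {fits_during_search} fits in the search, {total_fits} in total")
```

Each fit here trains the whole pipeline, so adding one more value to either list adds three more combinations and nine more fits.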

In [46]:
predictions = rf_model.transform(test)
evaluator.evaluate(predictions)

## See some properties of the final model

Note
* This depends on the model you are using

Hint
* For Random Forest see the API of the model in <a target="_blank" href="https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassificationModel">docs</a>
* Example to see number of trees:
 * rf_model.stages[n].getNumTrees, where rf_model is your trained pipeline model and n is the index of the RF stage in your pipeline

In [48]:
rf_model.stages[3].getNumTrees

In [49]:
rf_model.stages[3].totalNumNodes

In [50]:
rf_model.stages[3].trees

In [51]:
rf_model.stages[3].toDebugString