# PySpark setup and data loading

In [None]:
import os
import sys

# Tell PySpark which Java runtime to use; this is set before the Spark session is created.
os.environ["JAVA_HOME"] = "/lib/jvm/java-11-openjdk-amd64"

# Put the parent directory on the import path so that the `src.*` modules can be imported.
sys.path.append('../')

# Folder that receives the figures exported by this notebook.
os.makedirs("../figures_report/enrich_dataset", exist_ok=True)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from itertools import chain

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

import plotly.graph_objs as go
import plotly.offline as py
import plotly.express as px



In [None]:
from src.ranking_helpers import *
from src.make_and_plot import *
from src.pages_groups_extraction import *
from src.data_aggregation import*

In [None]:
# Spark settings for a single-machine run with 5 local worker threads.
spark_settings = [
    ('spark.driver.memory', '120G'),
    ('spark.executor.memory', '120G'),
    ('spark.driver.maxResultSize', '0'),
    ('spark.executor.cores', '5'),
    ('spark.local.dir', '/scratch/descourt/spark'),
]
conf = pyspark.SparkConf().setMaster("local[5]").setAll(spark_settings)

# Create the session (or reuse one that already exists) and grab its context.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Only show errors in the cell output.
sc.setLogLevel('ERROR')

In [None]:
# Pageviews table read from parquet, tagged with a constant `project` column set to 'en'.
dfs = (
    spark.read
    .parquet("/scratch/descourt/processed_data/en/pageviews_en_2015-2023.parquet")
    .withColumn('project', lit('en'))
)

In [None]:
# Page ids returned by `extract_volume(high=True)` for 2022-11; labelled `is_core = 1` later on.
# NOTE(review): presumably the high-traffic pages; confirm against the helper in `src`.
df_high_volume = (
    extract_volume(dfs, high=True)
    .filter('date = "2022-11"')
    .select('page_id')
    .cache()
)

In [None]:
# Page ids returned by `extract_volume(high=False)` for 2022-11; labelled `is_core = 0` later on.
# NOTE(review): presumably the low-traffic pages; confirm against the helper in `src`.
df_low_volume = (
    extract_volume(dfs, high=False)
    .filter('date = "2022-11"')
    .select('page_id')
    .cache()
)

## Additional datasets

In [None]:
# Topic table; the cells below read its `page_id` and `topics_specific_unique` columns.
df_topics_sp = spark.read.parquet('/scratch/descourt/metadata/topics/topic_en/topics-enwiki-20230320-parsed.parquet')

In [None]:
# Page-level metadata keyed by `page_id`; joined onto the page lists in the "Prepare dataset" section.
df_metadata = spark.read.parquet('/scratch/descourt/metadata/akhils_data/wiki_nodes_bsdk_phili_2022-11_en.parquet')

In [None]:
# Sanity check: how many metadata rows carry a page creation timestamp.
has_creation_timestamp = ~col('page_creation_timestamp').isNull()
df_metadata.where(has_creation_timestamp).count()

In [None]:
# Second per-page table keyed by `page_id`; also joined onto the page lists in "Prepare dataset".
df_nodes = spark.read.parquet('/scratch/descourt/metadata/akhils_data/wiki_nodes_2022-11_en.parquet')

In [None]:
# The topics file is read as one comma-separated string of (optionally quoted) names.
with open("wikipedia_core_events_semantic/topics_list.txt", 'r') as f:
    lines = f.read()

# Remove newlines and single quotes, then split on commas.
topics_list = lines.replace('\n', '').replace("'", '').split(',')

# Topic name -> position in the list.
topics_mapping = {topic: position for position, topic in enumerate(topics_list)}

# Same mapping as a Spark map column: create_map expects key1, value1, key2, value2, ...
topics_mapping_sp = create_map(
    [lit(item) for pair in topics_mapping.items() for item in pair]
)

In [None]:
# One row per distinct (page, topic) pair, with the topic name replaced by its integer code.
topics = (
    df_topics_sp
    .select('page_id', col('topics_specific_unique').alias('topic'))
    .distinct()
    .withColumn('topic', topics_mapping_sp[col("topic")])
    .cache()
)

# Prepare dataset

In [None]:
def _attach_page_features(df_pages, core_flag):
    """Join the metadata and node tables onto a page list and add the label and age columns.

    Parameters
    ----------
    df_pages : DataFrame with a `page_id` column.
    core_flag : value written to the constant `is_core` column (1 or 0).

    Returns
    -------
    Cached DataFrame with the joined columns plus `is_core` and `age_in_months`
    (months between 2022-11 and `creation_date`).
    """
    metadata_features = df_metadata.drop(
        'wiki_db', 'item_id', 'page_title', 'page_creation_unix', 'page_creation_timestamp'
    )
    node_features = df_nodes.drop('wiki_db', 'item_id', 'page_title')
    reference_month = to_date(lit("2022-11"), 'yyyy-MM')

    return (
        df_pages
        .join(metadata_features, 'page_id')
        .join(node_features, 'page_id')
        .withColumn('is_core', lit(core_flag))
        .withColumn('age_in_months', months_between(reference_month, col('creation_date')))
        .cache()
    )


# Same pipeline for both page lists; only the label differs.
df_high_volume = _attach_page_features(df_high_volume, 1)
df_low_volume = _attach_page_features(df_low_volume, 0)

In [None]:
# Stack both classes, then discard every row that has a null in any column.
df_labelled = df_high_volume.unionAll(df_low_volume)
df_all = df_labelled.dropna()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns gathered into the model's feature vector.
# 'is_disambiguation' is currently not used as a feature.
numericCols = ['is_orphan', 'is_bot_created', 'pred_qual', 'in_main', 'out_main', 'age_in_months']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")

# `df_all` is overwritten in place, so on a second run of this cell it already
# contains the output column and `transform` would fail. Dropping it first
# (a no-op when the column is absent) makes the cell safe to re-run.
df_all = assembler.transform(df_all.drop(assembler.getOutputCol()))

In [None]:
# 70 % / 30 % train/test split with a fixed seed.
split_weights = [0.7, 0.3]
train, test = df_all.randomSplit(split_weights, seed=2023)

In [None]:
# Per-row training weights so that the rarer `is_core = 1` class counts more.
# NOTE(review): 0.24 and 1.76 look like 2 x the class shares (12 % / 88 %); confirm where they come from.
core_weight = 1 / 0.24
non_core_weight = 1 / 1.76
train = train.withColumn(
    'weightCol',
    when(col('is_core') == 1, core_weight).otherwise(non_core_weight),
)

# Training

* 1 shot
* cross validation

In [None]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# mllib = RDD ! 
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorSlicer

from pyspark.ml.regression import LinearRegression

## Random forest

### CV and best model selection

In [None]:
# Weighted random forest on the assembled feature vector.
# The seed is fixed (same value as the train/test split) so that the forest's
# random sampling gives the same model after a kernel restart.
rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='is_core',
    weightCol='weightCol',
    seed=2023,
)

In [None]:
# Grid of 3 tree depths x 3 forest sizes (9 candidates).
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.numTrees, [5, 20, 50])
    .build()
)

# 3-fold cross-validation scored with the F1 metric of MulticlassClassificationEvaluator.
# The seed fixes the fold assignment so that the selected model is reproducible.
crossval_rf = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="is_core", predictionCol="prediction", metricName='f1'
    ),
    numFolds=3,
    seed=2023,
)

# The following cells read `BestModelRF`; the model used to be stored only as
# `BestModelRF2`, which raised a NameError on a fresh kernel.
fitModelRF = crossval_rf.fit(train)
BestModelRF = fitModelRF.bestModel

# Previous names kept as aliases for any code that still refers to them.
fitModelRF2, BestModelRF2 = fitModelRF, BestModelRF

In [None]:
# Show the description and value of `maxDepth` on the selected random forest.
BestModelRF.explainParam('maxDepth')

In [None]:
# Show the description and value of `numTrees` on the selected random forest.
BestModelRF.explainParam('numTrees')

## Logistic regression

### CV and best model selection

In [None]:
# Weighted logistic regression on the same feature vector, label and weight column as the random forest.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'is_core', weightCol='weightCol')

In [None]:
# Grid of 3 regularisation strengths x 3 elastic-net mixing values (9 candidates).
paramGridLR = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0, 0.01, 0.1])
    .build()
)

# 3-fold cross-validation scored with the F1 metric of MulticlassClassificationEvaluator.
# (An alternative that was tried: metricName='truePositiveRateByLabel', metricLabel=1.0.)
# The seed fixes the fold assignment so that the selected model is reproducible.
crossval_lr = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGridLR,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="is_core", predictionCol="prediction", metricName='f1'
    ),
    numFolds=3,
    seed=2023,
)
fitModelLR = crossval_lr.fit(train)
BestModelLR = fitModelLR.bestModel

In [None]:
# Show the description and value of `regParam` on the selected logistic regression.
BestModelLR.explainParam('regParam')

In [None]:
# Show the description and value of `elasticNetParam` on the selected logistic regression.
BestModelLR.explainParam('elasticNetParam')

# Evaluation

* Feature importance
* Accuracy
* Confusion matrix, because the classes are highly unbalanced

In [None]:
def evaluate(predictions, evaluator):
    """Print accuracy, test error, F1, precision and recall for a set of predictions.

    Parameters
    ----------
    predictions : object passed unchanged to ``evaluator.evaluate``.
    evaluator : object exposing ``metricName`` and ``evaluate(predictions, params)``;
        the metric is overridden on each call through the ``params`` dict.

    Returns
    -------
    None. The scores are only printed.

    Notes
    -----
    "Precision" and "Recall" are the ``weightedPrecision`` / ``weightedRecall``
    metrics of the evaluator, not the scores of a single class.
    """
    # (line template, metric requested from the evaluator), in print order.
    report_lines = (
        ("Accuracy = %s", "accuracy"),
        ("F1 score = %s", "f1"),
        ("Precision = %s", "weightedPrecision"),
        ("Recall = %s", "weightedRecall"),
    )
    for template, metric in report_lines:
        score = evaluator.evaluate(predictions, {evaluator.metricName: metric})
        print(template % score)
        if metric == "accuracy":
            # The error rate is derived from the accuracy and printed right after it.
            print("Test Error = %s" % (1.0 - score))

In [None]:
# Shared evaluator for both models; `evaluate` overrides its metric on every call.
evaluator = MulticlassClassificationEvaluator(labelCol="is_core", predictionCol="prediction")

## Random forest

In [None]:
# Score the held-out test split with the selected random forest.
bestPredictions = BestModelRF.transform(test)

In [None]:
# Feature importances of the selected forest, listed in the same order as `numericCols`.
rf_importances = BestModelRF.featureImportances.toArray()
df_features = pd.DataFrame({'importance': rf_importances, 'features': numericCols})

# Display the table from most to least important.
df_features.sort_values('importance', ascending=False)

In [None]:
# Bar labels: importances rounded to two decimals.
df_features['text'] = np.round(df_features['importance'] * 100) / 100

# Horizontal bars, smallest importance first.
fig = px.bar(
    df_features.sort_values('importance'),
    x='importance',
    y='features',
    orientation='h',
    text='text',
)

# Square figure with 20 pt axis titles and tick labels.
fig.update_layout(
    height=600,
    width=600,
    yaxis_title=dict(font=dict(size=20), text='Features'),
    xaxis_title=dict(font=dict(size=20), text='Features importance'),
    yaxis=dict(tickfont=dict(size=20)),
    xaxis=dict(tickfont=dict(size=20)),
)
fig.update_traces(textposition='outside', textfont=dict(size=20))

# Fixed x range so the labels drawn outside the bars have room.
fig.update_xaxes(range=[0, 0.65])

fig.show()
fig.write_image("../figures_report/enrich_dataset/rf_feat.pdf")

In [None]:
# Print accuracy, F1 and weighted precision/recall of the random forest on the test split.
evaluate(bestPredictions, evaluator)

In [None]:
# (prediction, label) pairs, with the label cast to float as MulticlassMetrics works on numeric pairs.
# The confusion matrix does not depend on row order, so no sorting is needed,
# and the two columns are selected once.
preds_and_labels = bestPredictions.select(
    'prediction',
    col('is_core').cast(FloatType()).alias('is_core'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Rows are the actual classes and columns the predicted classes, both in ascending label order.
confmat_rf = metrics.confusionMatrix().toArray()
print(confmat_rf)

In [None]:
# MulticlassMetrics puts actual classes in rows and predicted classes in columns, so
# row 0 holds the actual negatives: [TN, FP]. TNR = TN / (TN + FP).
# (Dividing by column 0, TN + FN, would give the negative predictive value instead.)
print("True Negative Rate % s" % (confmat_rf[0][0] / (confmat_rf[0][0] + confmat_rf[0][1])))

In [None]:
# MulticlassMetrics puts actual classes in rows and predicted classes in columns, so
# row 1 holds the actual positives: [FN, TP]. TPR = TP / (FN + TP).
print("True Positive Rate % s" % (confmat_rf[1][1] / (confmat_rf[1][0] + confmat_rf[1][1])))

## Logistic regression

In [None]:
# Score the held-out test split with the selected logistic regression.
bestPredictionsLR = BestModelLR.transform(test)

In [None]:
# Coefficients of the selected logistic regression, listed in the same order as `numericCols`.
# They are stored in the 'importance' column so the table has the same layout as the forest's.
lr_coefficients = BestModelLR.coefficients.toArray()
df_features_lr = pd.DataFrame({'importance': lr_coefficients, 'features': numericCols})

# Display the table from largest to smallest coefficient.
df_features_lr.sort_values('importance', ascending=False)

In [None]:
# Bar labels: coefficients rounded to two decimals.
df_features_lr['text'] = np.round(df_features_lr['importance'] * 100) / 100

# Horizontal bars, smallest coefficient first.
fig = px.bar(
    df_features_lr.sort_values('importance'),
    x='importance',
    y='features',
    orientation='h',
    text='text',
)

# Square figure with 20 pt axis titles and tick labels.
fig.update_layout(
    height=600,
    width=600,
    yaxis_title=dict(font=dict(size=20), text='Features'),
    xaxis_title=dict(font=dict(size=20), text='Features importance'),
    yaxis=dict(tickfont=dict(size=20)),
    xaxis=dict(tickfont=dict(size=20)),
)
fig.update_traces(textposition='outside', textfont=dict(size=20))

# Fixed x range covering negative and positive coefficients.
fig.update_xaxes(range=[-5, 9])

fig.show()
fig.write_image("../figures_report/enrich_dataset/lr_feat.pdf")

In [None]:
# Print accuracy, F1 and weighted precision/recall of the logistic regression on the test split.
evaluate(bestPredictionsLR, evaluator)

# (prediction, label) pairs, with the label cast to float as MulticlassMetrics works on numeric pairs.
# The confusion matrix does not depend on row order, so no sorting is needed.
preds_and_labels = bestPredictionsLR.select(
    'prediction',
    col('is_core').cast(FloatType()).alias('is_core'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Rows are the actual classes and columns the predicted classes:
#   row 0 = actual negatives [TN, FP], row 1 = actual positives [FN, TP].
confmat = metrics.confusionMatrix().toArray()

# TNR = TN / (TN + FP). Dividing by column 0 (TN + FN) would give the negative predictive value.
print("True Negative Rate % s" % (confmat[0][0] / (confmat[0][0] + confmat[0][1])))
# TPR = TP / (FN + TP).
print("True Positive Rate % s" % (confmat[1][1] / (confmat[1][0] + confmat[1][1])))

# Generalized linear model (binomial family, logit link)

## Train

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression

In [None]:
# Binomial GLM with a logit link, fitted on the weighted training split.
glr = GeneralizedLinearRegression(
    featuresCol='features',
    labelCol='is_core',
    weightCol='weightCol',
    family='binomial',
    link='logit',
)
glr_model = glr.fit(train)

## Evaluate

In [None]:
# Display the training summary of the fitted GLM.
glr_model.summary

In [None]:
# Drop in deviance from the null model to the fitted model.
# The backslash is doubled because `\c` is not a valid escape sequence in a normal
# (non-raw) string; the printed text is unchanged.
print(f"$\\chi^2$ test : {glr_model.summary.nullDeviance - glr_model.summary.deviance}")