In [0]:
from pyspark.sql.functions import col, max
df = spark.read.table("default.full_data")

df = df.withColumn("frontpage", col("frontpage").cast("integer"))
df.groupBy('frontpage').count().show()
# Group by 'aid' and find the maximum values for 'comment', 'vote', and 'frontpage'
max_values_df = df.groupBy("aid").agg(
    max("comments").alias("max_comments"),
    max("votes").alias("max_votes"),
    max("frontpage").alias("max_frontpage")
)

# Join the original DataFrame with max_values_df
joined_df = df.join(max_values_df, "aid")

# Filter out rows where 'comment', 'vote', or 'frontpage' are lower than the maximum values
filtered_df = joined_df.filter(
    (col("comments") == col("max_comments")) &
    (col("votes") == col("max_votes")) &
    (col("frontpage") == col("max_frontpage"))
)

df = filtered_df.dropDuplicates(["aid"])

df.show()
# Show the resulting DataFrame
df.groupBy('frontpage').count().show()

+---------+-----+
|frontpage|count|
+---------+-----+
|        1| 2129|
|        0| 9252|
+---------+-----+

+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----+------------+---------+-------------+
|     aid|comments|              domain|frontpage|          posted_at|         source_text|        source_title|               title|                 url|         user|votes|max_comments|max_votes|max_frontpage|
+--------+--------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----+------------+---------+-------------+
|39949266|       0|       economist.com|        0|2024-04-06 01:40:43|The AI doctor wil...|The AI doctor wil...|The AI doctor wil...|https://www.econo...|        jdkee|    1|           0|        1|            0|
|39949302|       2|       bloomberg.com|   

In [0]:
from pyspark.sql.functions import col, hour, udf
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer, StopWordsRemover
from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Convert posted_at to timestamp
df = df.withColumn("posted_at", col("posted_at").cast("timestamp"))
df = df.withColumn("frontpage", col("frontpage").cast("integer"))


# Extract hour from posted_at
hour_udf = udf(lambda x: x.hour if x else None, IntegerType())
df = df.withColumn("posted_hour", hour_udf(col("posted_at")))
df = df.fillna({"title": "", "source_text": "", "posted_at": ""})


# Impute missing values for numerical columns
imputer_numeric = Imputer(inputCols=["comments","votes"], outputCols=["comments_imputed","votes_imputed"], strategy="mean")

# Standardize numerical columns
assembler_numeric = VectorAssembler(inputCols=["comments_imputed", "votes_imputed"], outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="numeric_features_scaled")

#Text-mining
tokenizer_source_text = Tokenizer(inputCol="source_text", outputCol="source_text_words")
remover_source_text = StopWordsRemover(inputCol="source_text_words", outputCol="source_text_filtered")
word2vec_source_text = Word2Vec(inputCol="source_text_filtered", outputCol="source_text_w2v", vectorSize=128)

tokenizer_title = Tokenizer(inputCol="title", outputCol="title_words")
remover_title = StopWordsRemover(inputCol="title_words", outputCol="title_filtered")
word2vec_title = Word2Vec(inputCol="title_filtered", outputCol="title_w2v", vectorSize=128)

# Assemble all features

# Assemble all features
assembler_all = VectorAssembler(inputCols=["posted_hour", "numeric_features_scaled", "source_text_w2v", "title_w2v"],
                                outputCol="features")

# Create the pipeline
pipeline = Pipeline(stages=[imputer_numeric, assembler_numeric, scaler, 
                             tokenizer_source_text, remover_source_text, word2vec_source_text,
                             tokenizer_title, remover_title, word2vec_title,
                             assembler_all])


# Split the data into training, validation, and test sets
train_pre_mod, test = df.randomSplit([0.7, 0.3], seed=42)

# Split the DataFrame into majority class (frontpage=0) and minority class (frontpage=1)
majority_df = train_pre_mod.filter(col("frontpage") == 0)
minority_df = train_pre_mod.filter(col("frontpage") == 1)

# Sample the majority class DataFrame to match the size of the minority class DataFrame
undersampled_majority_df = majority_df.sample(withReplacement=False, fraction=1/4, seed=42)

# Concatenate the sampled majority class DataFrame with the minority class DataFrame
train = undersampled_majority_df.union(minority_df)

# Define models
lr = LogisticRegression(featuresCol="features", labelCol="frontpage")
rf = RandomForestClassifier(featuresCol="features", labelCol="frontpage")
gbt = GBTClassifier(featuresCol="features", labelCol="frontpage")

# Create param grids for hyperparameter tuning

paramGridLR = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.1, 0.01])
               .addGrid(lr.elasticNetParam, [0.0, 0.5])  # Adding elasticNetParam
               .addGrid(lr.maxIter, [10, 20])
               .build())

paramGridRF = (ParamGridBuilder()
               .addGrid(rf.numTrees, [10, 20])  # Adding numTrees
               .addGrid(rf.maxDepth, [5, 10])        # Adding maxDepth
               .build())

paramGridGBT = (ParamGridBuilder()
                .addGrid(gbt.maxIter, [10, 20])   # Adding maxIter
                .addGrid(gbt.maxDepth, [5, 10])       # Adding maxDepth
                .build())

# Define evaluators
evaluator = MulticlassClassificationEvaluator(labelCol='frontpage', predictionCol='prediction', metricName='f1')

# Define cross-validators
crossvalLR = CrossValidator(estimator=lr,
                            estimatorParamMaps=paramGridLR,
                            evaluator=evaluator,
                            numFolds=5)

crossvalRF = CrossValidator(estimator=rf,
                            estimatorParamMaps=paramGridRF,
                            evaluator=evaluator,
                            numFolds=5)

crossvalGBT = CrossValidator(estimator=gbt,
                             estimatorParamMaps=paramGridGBT,
                             evaluator=evaluator,
                             numFolds=5)

# Create pipeline models with cross-validation
pipelineLR = Pipeline(stages=[pipeline, crossvalLR])
pipelineRF = Pipeline(stages=[pipeline, crossvalRF])
pipelineGBT = Pipeline(stages=[pipeline, crossvalGBT])

# Train models
modelLR = pipelineLR.fit(train)
modelRF = pipelineRF.fit(train)
modelGBT = pipelineGBT.fit(train)

# Evaluate models
predictionsLR = modelLR.transform(train)
predictionsRF = modelRF.transform(train)
predictionsGBT = modelGBT.transform(train)

aucLR = evaluator.evaluate(predictionsLR)
aucRF = evaluator.evaluate(predictionsRF)
aucGBT = evaluator.evaluate(predictionsGBT)

print(f"Logistic Regression AUC: {aucLR}")
print(f"Random Forest AUC: {aucRF}")
print(f"Gradient Boosted Trees AUC: {aucGBT}")

test_predictionsLR = modelLR.transform(test)
test_predictionsRF = modelRF.transform(test)
test_predictionsGBT = modelGBT.transform(test)

test_aucLR = evaluator.evaluate(test_predictionsLR)
test_aucRF = evaluator.evaluate(test_predictionsRF)
test_aucGBT = evaluator.evaluate(test_predictionsGBT)

print(f"Test Logistic Regression AUC: {test_aucLR}")
print(f"Test Random Forest AUC: {test_aucRF}")
print(f"Test Gradient Boosted Trees AUC: {test_aucGBT}")

Logistic Regression AUC: 0.8941716278027485
Random Forest AUC: 0.9517453976078563
Gradient Boosted Trees AUC: 0.9563703659366094
Test Logistic Regression AUC: 0.9439384913156973
Test Random Forest AUC: 0.9431844195460382
Test Gradient Boosted Trees AUC: 0.9479390377520399


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="frontpage")
multi_evaluator = MulticlassClassificationEvaluator(labelCol='frontpage', predictionCol='prediction')

# Calculate metrics
def evaluate_model(predictions, model_name):
    auc = evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderROC'})
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: 'accuracy'})
    f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: 'f1'})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: 'recallByLabel'})
    return {
        'Model': model_name,
        'AUC': auc,
        'Accuracy': accuracy,
        'F1 Score': f1,
        'Recall': recall
    }

# Evaluate all models
metricsLR = evaluate_model(test_predictionsLR, 'Logistic Regression')
metricsRF = evaluate_model(test_predictionsRF, 'Random Forest')
metricsGBT = evaluate_model(test_predictionsGBT, 'Gradient-Boosted Trees')

# Display comparison
import pandas as pd

results = pd.DataFrame([metricsLR, metricsRF, metricsGBT])
print(results)

# Show confusion matrix for each model
from sklearn.metrics import confusion_matrix

def compute_confusion_matrix(predictions):
    y_true = predictions.select("frontpage").rdd.flatMap(lambda x: x).collect()
    y_pred = predictions.select("prediction").rdd.flatMap(lambda x: x).collect()
    return confusion_matrix(y_true, y_pred)

cmLR = compute_confusion_matrix(test_predictionsLR)
cmRF = compute_confusion_matrix(test_predictionsRF)
cmGBT = compute_confusion_matrix(test_predictionsGBT)

print(f"Confusion Matrix for Logistic Regression:\n{cmLR}")
print(f"Confusion Matrix for Random Forest:\n{cmRF}")
print(f"Confusion Matrix for Gradient-Boosted Trees:\n{cmGBT}")


                    Model       AUC  Accuracy  F1 Score    Recall
0     Logistic Regression  0.981078  0.944796  0.943938  0.975197
1           Random Forest  0.986876  0.940724  0.943184  0.930665
2  Gradient-Boosted Trees  0.984733  0.945701  0.947939  0.934047
Confusion Matrix for Logistic Regression:
[[1730   44]
 [  78  358]]
Confusion Matrix for Random Forest:
[[1651  123]
 [   8  428]]
Confusion Matrix for Gradient-Boosted Trees:
[[1657  117]
 [   3  433]]


In [0]:
# Save the model to DBFS
model_path_LR = "/dbfs/models/logistic_regression_modelw2v"
model_path_RF = "/dbfs/models/random_forest_modelw2v"
model_path_GBT = "/dbfs/models/gradient_boosted_modelw2v"
modelLR.write().overwrite().save(model_path_LR)
modelRF.write().overwrite().save(model_path_RF)
modelGBT.write().overwrite().save(model_path_GBT)


In [0]:
# Move model from DBFS to workspace directory
dbutils.fs.cp(model_path_LR, "dbfs:/FileStore/models/logistic_regression_modelw2v", recurse=True)
dbutils.fs.cp(model_path_RF, "dbfs:/FileStore/models/random_forest_modelw2v", recurse=True)
dbutils.fs.cp(model_path_GBT, "dbfs:/FileStore/models/gradient_boosted_modelw2v", recurse=True)


True

In [0]:
from pyspark.ml.pipeline import PipelineModel
import shap
# Load the pipeline model from DBFS
pipeline_model = PipelineModel.load("dbfs:/FileStore/models/gradient_boosted_modelw2v")
model = pipeline_model.stages[-1]
# Generate SHAP values using the loaded pipeline model
explainer = shap.Explainer(model)
shap_values = explainer(test)

# Continue with the remaining code
shap.summary_plot(shap_values)
# shap.plots.waterfall(shap_values[1])
# shap.initjs()
# shap.plots.force(shap_values[0])
# shap.plots.force(shap_values[1])
# shap.plots.force(shap_values[:500])
# shap.plots.beeswarm(shap_values)
# shap.plots.bar(shap_values)
# shap.plots.scatter(shap_values[:, "Handset_9"], color=shap_values)

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-777522522509054>, line 7[0m
[1;32m      5[0m model [38;5;241m=[39m pipeline_model[38;5;241m.[39mstages[[38;5;241m-[39m[38;5;241m1[39m]
[1;32m      6[0m [38;5;66;03m# Generate SHAP values using the loaded pipeline model[39;00m
[0;32m----> 7[0m explainer [38;5;241m=[39m shap[38;5;241m.[39mExplainer(model)
[1;32m      8[0m shap_values [38;5;241m=[39m explainer(test)
[1;32m     10[0m [38;5;66;03m# Continue with the remaining code[39;00m

File [0;32m/databricks/python/lib/python3.10/site-packages/shap/explainers/_explainer.py:174[0m, in [0;36mExplainer.__init__[0;34m(self, model, masker, link, algorithm, output_names, feature_names, linearize_link, seed, **kwargs)[0m
[1;32m    170[0m             algorithm [38;5;241m=[39m [38;5;124m"[39m[38;5;124mpermutation[

In [0]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [0]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import IntegerType
from pyspark.ml.pipeline import PipelineModel
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pipeline_model = PipelineModel.load("dbfs:/FileStore/models/gradient_boosted_modelw2v")

def process(time, rdd):
    if rdd.isEmpty():
        return

    df = spark.read.json(rdd)
    print("========= %s =========" % str(time))    
    # Convert posted_at to timestamp
    df = df.withColumn("posted_at", col("posted_at").cast("timestamp"))
    df = df.withColumn("frontpage", col("frontpage").cast("integer"))

    # Extract hour from posted_at
    hour_udf = udf(lambda x: x.hour if x else None, IntegerType())
    df = df.withColumn("posted_hour", hour_udf(col("posted_at")))
    df = df.fillna({"title": "", "source_text": "", "posted_at": ""})

    #some preprocessing done when training model
    #applying the pipeline to it
    df_with_preds = pipeline_model.transform(df)
    df_with_preds.select('features','frontpage','predictions').show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
    [0;31m[... skipping hidden 1 frame][0m

File [0;32m<command-777522522509101>, line 8[0m
[1;32m      7[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m [38;5;28;01mimport[39;00m SparkSession
[0;32m----> 8[0m spark [38;5;241m=[39m SparkSession[38;5;241m.[39mbuilder[38;5;241m.[39mgetOrCreate()
[1;32m      9[0m pipeline_model [38;5;241m=[39m PipelineModel[38;5;241m.[39mload([38;5;124m"[39m[38;5;124mdbfs:/FileStore/models/gradient_boosted_modelw2v[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/sql/session.py:544[0m, in [0;36mSparkSession.Builder.getOrCreate[0;34m(self)[0m
[1;32m    542[0m [38;5;28;01melse[39;00m:
[1;32m    543[0m     [38;5;28mgetattr[39m(
[0;32m--> 544[0m         [38;5;28;43mgetattr[

In [0]:
ssc = StreamingContext(sc, 10)

In [0]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [0]:
ssc_t = StreamingThread(ssc)
ssc_t.start()