In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, TimestampType
import os

# Initialize the SparkSession
spark = SparkSession.builder \
    .appName("FrontpagePrediction") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.cores", "2") \
    .getOrCreate()


# Define the data schema from the json
schema = StructType([
    StructField("aid", StringType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("votes", LongType(), True),
    StructField("user", StringType(), True),
    StructField("posted_at", StringType(), True),
    StructField("comments", LongType(), True),
    StructField("source_title", StringType(), True),
    StructField("source_text", StringType(), True),
    StructField("frontpage", BooleanType(), True)
])


In [9]:
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Read data from local
data = spark.read.schema(schema).json("/Users/xiaodi/anaconda3/spark/notebooks/A data/*/part*")

# Drop the Nan value
data = data.na.drop()

# Convert frontpage from boolean to string
data = data.withColumn("frontpage", col("frontpage").cast("string"))

# Add a new feature 'text_length'
data = data.withColumn("text_length", length(col("source_text")))

# Tokenize the 'source_text' column into words
tokenizer = Tokenizer(inputCol="source_text", outputCol="words")

# Remove stopwords from the tokenized words
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# Compute term frequencies using HashingTF
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="tf_features", numFeatures=10000)

# Compute inverse document frequencies to get TF-IDF features
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")

# Convert 'domain' into numerical feature
domain_indexer = StringIndexer(inputCol="domain", outputCol="domain_index", handleInvalid="keep")

# One-hot encode 'domain' column
domain_encoder = OneHotEncoder(inputCol="domain_index", outputCol="domain_vec")

# Convert 'frontpage' into numerical labels
label_indexer = StringIndexer(inputCol="frontpage", outputCol="label", handleInvalid="keep")

# Assemble all the feature columns into a single feature vector
assembler = VectorAssembler(
    inputCols=["tfidf_features", "text_length", "votes", "comments", "domain_vec"],
    outputCol="features"
)

# Create a pipeline for future data preprocessing
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf, domain_indexer, domain_encoder, assembler, label_indexer])

# Check the ratio of positive and negative samples
positive_samples = data.filter(col("frontpage") == "true").count()
negative_samples = data.filter(col("frontpage") == "false").count()

print(f"Positive samples: {positive_samples}, Negative samples: {negative_samples}")

# If the samples are imbalanced, perform upsampling or downsampling
if positive_samples < negative_samples:
    # Upsample positive samples
    ratio = negative_samples / positive_samples
    sampled_positive_data = data.filter(col("frontpage") == "true").sample(withReplacement=True, fraction=ratio)
    balanced_data = sampled_positive_data.union(data.filter(col("frontpage") == "false"))
else:
    # Downsample negative samples
    ratio = positive_samples / negative_samples
    sampled_negative_data = data.filter(col("frontpage") == "false").sample(withReplacement=True, fraction=ratio)
    balanced_data = sampled_negative_data.union(data.filter(col("frontpage") == "true"))

# Fit the pipeline model
pipeline_model = pipeline.fit(balanced_data)
balanced_data = pipeline_model.transform(balanced_data)

# Split the dataset into training and test sets
train_data, test_data = balanced_data.randomSplit([0.8, 0.2], seed=42)

# Initialize the Gradient Boosted Trees classifier
gbt = GBTClassifier(featuresCol="features", labelCol="label")

# Train the model and make predictions
gbt_model = gbt.fit(train_data)
predictions = gbt_model.transform(test_data)
predictions.select("source_text", "probability", "prediction", "label").show(n=20)

# Initialize the evaluator for binary classification
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

# Calculate the ROC
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: ", accuracy)

# Calculate other evaluation metrics
tp = predictions[(predictions.label == 1) & (predictions.prediction == 1)].count()  # True Positives
tn = predictions[(predictions.label == 0) & (predictions.prediction == 0)].count()  # True Negatives
fp = predictions[(predictions.label == 0) & (predictions.prediction == 1)].count()  # False Positives
fn = predictions[(predictions.label == 1) & (predictions.prediction == 0)].count()  # False Negatives

precision = tp / (tp + fp) if tp + fp != 0 else 0  # Precision
recall = tp / (tp + fn) if tp + fn != 0 else 0  # Recall
f1 = 2 * precision * recall / (precision + recall) if precision + recall != 0 else 0  # F1 Score

print("Precision: ", precision)
print("Recall: ", recall)
print("F1 Score: ", f1)

# Save Pipeline and Model
pipeline_model.write().overwrite().save("/Users/xiaodi/anaconda3/spark/notebooks/saved_pipeline")
gbt_model.write().overwrite().save("/Users/xiaodi/anaconda3/spark/notebooks/saved_gbt_model")

                                                                                

Positive samples: 650, Negative samples: 3403


24/05/25 20:33:43 WARN DAGScheduler: Broadcasting large task binary with size 1315.3 KiB
24/05/25 20:33:45 WARN DAGScheduler: Broadcasting large task binary with size 1315.4 KiB
24/05/25 20:33:57 WARN DAGScheduler: Broadcasting large task binary with size 1419.8 KiB
24/05/25 20:34:31 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:34:48 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:34:52 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:34:57 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:35:02 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:35:08 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:35:13 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:35:17 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:35:22 WARN DAGScheduler: Broadcas

24/05/25 20:44:17 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:44:24 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:44:35 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/05/25 20:44:48 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/25 20:44:53 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/25 20:44:58 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/25 20:45:05 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/25 20:45:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/05/25 20:45:25 WARN DAGScheduler: Broadcasting large task binary with size 1402.2 KiB
24/05/25 20:45:27 WARN DAGScheduler: Broadcasting large task binary with size 1402.2 KiB
                                                                                

+--------------------+--------------------+----------+-----+
|         source_text|         probability|prediction|label|
+--------------------+--------------------+----------+-----+
|Android 15 can te...|[0.84810569857494...|       0.0|  0.0|
|Federal Appeals C...|[0.93147330635496...|       0.0|  0.0|
|Federal Appeals C...|[0.93147330635496...|       0.0|  0.0|
|New Video of Stro...|[0.94334250077501...|       0.0|  0.0|
|New Video of Stro...|[0.94334250077501...|       0.0|  0.0|
|Google Common Lis...|[0.94221426536922...|       0.0|  0.0|
|Google Common Lis...|[0.94221426536922...|       0.0|  0.0|
|Russian TOR-M2U A...|[0.94221426536922...|       0.0|  0.0|
|Russian TOR-M2U A...|[0.94221426536922...|       0.0|  0.0|
|The invisible sea...|[0.94334250077501...|       0.0|  0.0|
|Inside the disinf...|[0.89022944107950...|       0.0|  0.0|
|Amazon ebooks: Ar...|[0.94334250077501...|       0.0|  0.0|
|Amazon ebooks: Ar...|[0.94334250077501...|       0.0|  0.0|
|NetBSD turns 30 a...|[0

24/05/25 20:45:29 WARN DAGScheduler: Broadcasting large task binary with size 1397.9 KiB
                                                                                

Test Area Under ROC:  0.9882362779223767


24/05/25 20:45:44 WARN DAGScheduler: Broadcasting large task binary with size 1403.0 KiB
24/05/25 20:45:59 WARN DAGScheduler: Broadcasting large task binary with size 1403.0 KiB
24/05/25 20:46:14 WARN DAGScheduler: Broadcasting large task binary with size 1403.0 KiB
24/05/25 20:46:28 WARN DAGScheduler: Broadcasting large task binary with size 1403.0 KiB
                                                                                

Precision:  1.0
Recall:  0.9252615844544095
F1 Score:  0.9611801242236024
