# F5.news Trending News - Machine Learning Exploration

- News Article Sentiment
- Predict Trending Topics
- Topic Categorization

### Installs & Imports

In [1]:
%pip install -q -U boto3 hvac mlflow "pyspark==3.2.4" python-dotenv "pymongo[srv]"

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import hvac
import mlflow

from dotenv import load_dotenv
from datetime import datetime, timedelta

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col

load_dotenv()

True

### Connect to Vault for Mongo connection values

In [3]:
client = hvac.Client(
    url=os.environ.get('VAULT_ADDR'),
    token=os.environ.get('VAULT_TOKEN'),
)

print(client.is_authenticated())

if client.is_authenticated():
    try:
        secret_resp = client.secrets.kv.v2.read_secret_version(
            mount_point='kv', 
            path='f5.news', 
            raise_on_deleted_version=False
        )
        
        if secret_resp['data'] is not None:
            secret_values = secret_resp['data']['data']
            for secret, value in secret_values.items():
                os.environ[str(secret)] = str(value)
        else:
            print("The secret does not exist.")
    except hvac.exceptions.InvalidPath:
        print("The path is invalid or the permission is denied.")
    except hvac.exceptions.Forbidden:
        print("The permission is denied.")
    except hvac.exceptions.VaultError as e:
        print(f"Vault error occurred: {e}")
else:
    print("Failed to connect to HashiVault")

True


### Configs

In [4]:
# General
DEBUG = False
REG_PARAM_VALUE = 0.1 # Experimenting with this value can improve final accuracy
MAX_ITER = 20
DATASET_SPLIT = [0.85, 0.15] # Portion of data to split between training and test datasets
os.environ["PYSPARK_PIN_THREAD"] = "false" # TODO: Move to .env

# Spark

SPARK_MASTER = "spark://localhost:7077"
SPARK_MEMORY = "4g"

# Mongo
URI = os.environ['mongo_uri']
DATABASE = os.environ['database']
COLLECTION = os.environ['collection']

# MLflow
MLFLOW_API = "http://localhost:5000"
EXPERIMENT_NAME = "f5news_upvote_bucket_prediction"

# Minio S3
os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://localhost:9000"
os.environ['AWS_ACCESS_KEY_ID'] = "minio"
os.environ['AWS_SECRET_ACCESS_KEY'] = "minio123" # TODO: Move all of these to .env

### Pull F5 records using pymongo client

In [5]:
# Create a new client and connect to the server
client = MongoClient(URI, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Successfully connected to MongoDB...")
except Exception as e:
    print(e)

try:
    database = client[DATABASE]
    collection = database[COLLECTION]

    # Query all documents in the collection
    documents = collection.find({"sub": "politics"}).sort({"upvoteCount": -1, "fetchedAt": -1})

    if(DEBUG == True):
        # Iterate over the cursor to access the documents
        for doc in documents:
            print(doc["title"])
            print(doc["fetchedAt"])
            print(doc["upvoteCount"], "upvotes")
            print()
    else:
        print("Mongo documents loaded successfully!")
except Exception as e:
    print(e)

Successfully connected to MongoDB...
Mongo documents loaded successfully!


### Setup MLflow runner

In [6]:
global_run_name = None
start_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Set MLflow configs
mlflow.set_tracking_uri(MLFLOW_API)
mlflow.set_experiment(EXPERIMENT_NAME)

def start_mlflow_run(run_name: str = None):
    global global_run_name, start_time
    if run_name is None:
        run_name = start_time
    else:
        run_name = run_name + start_time
    global_run_name = run_name
    mlflow.start_run(run_name=run_name, description=EXPERIMENT_NAME)

### Connect to Spark and load dataset

In [7]:
# Create MLflow Run Instance
start_mlflow_run()

# Log parameters
start_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
mlflow.log_param("start_time", start_time)

try:
    # Create a SparkSession
    spark = SparkSession.builder \
        .appName("F5news") \
        .master(SPARK_MASTER) \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.mlflow:mlflow-spark:2.8.1") \
        .getOrCreate()
    
    # Setup Spark AutoLog
    mlflow.autolog()

    # Get Spark version
    spark_version = spark.version
    print("Spark Version:", spark_version)

    # Get the SparkContext from the SparkSession
    sc = spark.sparkContext

    # Get the master URL from the SparkContext
    master_url = sc.master

    # Check if the master URL indicates local mode or a specific cluster mode
    if "local" in master_url:
        print("PySpark is running in local mode.")
    else:
        print("PySpark is running in cluster mode with master URL:", master_url)

    # Load data from MongoDB into a DataFrame
    df = spark.read.format("mongo").option("uri", URI).option("database", DATABASE).option("collection", COLLECTION).load()
    print("Data loaded successfully from MongoDB!")
except Exception as e:
    # Error occurred during data loading or model training
    print("Error:", str(e))

    # Stop SparkSession
    spark.stop()

    # End MLflow run
    mlflow.end_run()

Ivy Default Cache set to: /home/mgmtadmin/.ivy2/cache
The jars for the packages stored in: /home/mgmtadmin/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.mlflow#mlflow-spark added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-162f9160-45e1-43a6-9807-a3c7f8ce9671;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.mlflow#mlflow-spark;2.8.1 in central
	found org.slf4j#slf4j-api;1.7.25 in central
:: resolution report :: resolve 83ms :: artifacts dl 4ms
	:: modules in use:
	org.mlflow#mlflow-spark;2.8.1 from central in [default]
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.sp

:: loading settings :: url = jar:file:/home/mgmtadmin/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


24/03/17 04:34:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024/03/17 04:34:37 INFO mlflow.spark.autologging: Autologging successfully enabled for spark.
2024/03/17 04:34:37 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2024/03/17 04:34:37 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


Spark Version: 3.2.4
PySpark is running in local mode.


                                                                                

Data loaded successfully from MongoDB!


### Show Loaded Data

In [8]:
if DEBUG:
    df.show(5,truncate=False)

### Filter Out Recent Posts

In [9]:
# Get document initial count
print('Documents Loaded:', df.count())
mlflow.log_param("loaded_documents", df.count())

# Convert to SQL for familiar data query ability
df.createOrReplaceTempView("temp")
df = spark.sql("SELECT title, upvoteCount, fetchedAt from temp") 

# Filter out new posts
oneDayAgo = d = datetime.today() - timedelta(days=1)
df = df.filter(df.fetchedAt < oneDayAgo)
print('Total Filtered Documents:', df.count())

mlflow.log_param("filtered_documents", df.count())

                                                                                

Documents Loaded: 5520


                                                                                

Total Filtered Documents: 5205


5205

### Bucketize by Upvote Count

In [10]:
def upvoteCategorizer(upvotes):
    if upvotes < 1000:
        return "0-999"
    if upvotes < 5000:
        return "1000-4999"
    if upvotes < 10000:
        return "5000-9999"
    elif upvotes < 25000:
        return "10000-24999"
    elif upvotes < 50000:
        return "25000-49000"
    else: 
        return "50000+"
    
bucket_udf = udf(upvoteCategorizer, StringType() )
df = df.withColumn("bucket", bucket_udf("upvoteCount"))

if DEBUG:
    df.groupBy("bucket").count().orderBy(col("count").desc()).show()

### Preview a Random Sample of Bucketized Dataset

In [11]:
if DEBUG:
    sample_count = 10
    pandas_random_sample = df.toPandas().sample(n=sample_count) # Convert to pandas dataframe to take sample
    pyspark_random_sample = spark.createDataFrame(pandas_random_sample) # Convert back to pyspark dataframe
    pyspark_random_sample.show()

### Define Data Prep Pipeline Steps

- **Regular Expression Tokenizer**: Breaks title into array of words via regex
- **Stop Words Remover**: Removes undesireable words from Regex Tokenizer output
- **Bag of Words Counter**: Creates vector representation of the array of words extracted from original title string
- **Create Label**: Maps all possible values in bucket columns to numeric values (their index position in an array of unique bucket values)

In [12]:
# Regular Expression Tokenizer
regexTokenizer = RegexTokenizer(inputCol="title", outputCol="words", pattern="\\W")

# Stop Words Remover
add_stopwords = ["http","https","amp","rt","t","c","the"] # TODO: Update stopwords to match dataset
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Bag of Words Counter
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=30000, minDF=5)

# Create Label
label_stringIdx = StringIndexer(inputCol = "bucket", outputCol = "label")

### Assemble Data Prep Pipeline

Creates the `features` columns. We split titles to words, remove the words we don't want, vectorize the resulting array of words, then label based on bucket column.

In [13]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

### Run the Data Prep Pipeline

In [14]:
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)

24/03/17 04:34:52 WARN StringIndexerModel: Input column bucket does not exist during transformation. Skip StringIndexerModel for this column.
24/03/17 04:34:52 WARN StringIndexerModel: Input column bucket does not exist during transformation. Skip StringIndexerModel for this column.
24/03/17 04:34:52 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.m

### Preview Dataset Before Training

In [15]:
if DEBUG:
    dataset.show(5)

### Split Data into Training and Test datasets

In [16]:
(trainingData, testData) = dataset.randomSplit(DATASET_SPLIT, seed = 123456)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

mlflow.log_metric("trainingData", trainingData.count())
mlflow.log_metric("testData", testData.count())

                                                                                

Training Dataset Count: 4444
Test Dataset Count: 761


                                                                                

### Train a Logistic Regression Model (with CrossValidation)

In [17]:
# Init linear regression model with column names
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Create ParamGrid for Cross Validation
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
    .addGrid(lr.maxIter, [10, 20, 50]) #Number of iterations
    .build()
)

# define evaluator for cross validator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Create 5-fold CrossValidator
cv = CrossValidator(
    estimator=lr, \
    estimatorParamMaps=paramGrid, \
    evaluator=evaluator, \
    numFolds=5
)

# Train using cross validator and pick the best model
lr_model = cv.fit(trainingData) # TODO: How do we extract the params that were chose as the 'best_model'

24/03/17 04:36:03 WARN TaskSetManager: Lost task 0.0 in stage 45.0 (TID 33) (172.31.0.7 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 2878, in pipeline_func
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 636, in func
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/mlflow/data/spark_dataset.py", line -1, in <lambda>
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/mlflow/data/spark_dataset.py", line -1, in <genexpr>
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in load_stream
    yield self._read_with_length(stream)
  File "/opt/bitn

### Evaluate the Model Using Test Data

- **Bucket 1**: 0 - 999 upvotes
- **Bucket 2**: 1,000 - 4,999 upvotes
- **Bucket 3**: 5,000 - 9,999 upvotes
- **Bucket 4**: 10,000 - 24,999 upvotes
- **Bucket 5**: 25,000 - 49,999 upvotes
- **Bucket 6**: > 50,000 upvotes

In [18]:
# Make Predictions for entire test data set
predictions = lr_model.transform(testData)

# Show a few predictions
# - change filter params such as prediction == 1 # TODO: Document what this does
if DEBUG:
    predictions.filter(predictions['prediction'] == 1).select("title","bucket","probability","label","prediction") \
    .orderBy("probability", ascending=False).show(n = 10, truncate = 50)

# Calculate & Log RMSE
rmse = predictions.selectExpr("sqrt(avg(pow(label - prediction, 2))) as RMSE").collect()[0]["RMSE"]
print("Root Mean Squared Error (RMSE) on Test Data:", rmse) # TODO: Determine output label
mlflow.log_metric("rmse", rmse)

# Calculate & Log Accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
lr_accuracy = evaluator.evaluate(predictions)
print("Logistical Regression Accuracy:", lr_accuracy)
mlflow.log_metric("lr_accuracy", lr_accuracy)

                                                                                

Root Mean Squared Error (RMSE) on Test Data: 0.7586557029970062


24/03/17 04:39:52 WARN TaskSetManager: Lost task 0.0 in stage 4116.0 (TID 4104) (172.31.0.6 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 3070, in pipeline_func
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/pyspark/rdd.py", line 636, in func
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/mlflow/data/spark_dataset.py", line -1, in <lambda>
  File "/home/mgmtadmin/.local/lib/python3.10/site-packages/mlflow/data/spark_dataset.py", line -1, in <genexpr>
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in load_stream
    yield self._read_with_length(stream)
  File "/opt/

Logistical Regression Accuracy: 0.7152008983922494


### Log Final Model to MLflow

In [19]:
# Log trained model
mlflow.spark.log_model(lr_model, "model")



### Save the Final Model to Disk

In [None]:
top_level_dir = "models"
os.makedirs(top_level_dir, exist_ok=True)

model_dir = os.path.join(top_level_dir, EXPERIMENT_NAME)
os.makedirs(model_dir, exist_ok=True)

lr_model.save(os.path.join(model_dir, start_time))

### Close Out Sessions

In [None]:
# Stop SparkSession
try:
    spark.stop()
except:
    pass

# End MLflow run
mlflow.end_run()