# Installations
1. Pyspark
2. Confluent Kafka
3. News API
4. Scikit-learn
5. Pandas
6. Numpy
7. Matplotlib
8. PyTorch
9. Transformers
10. SpaCy
11. NLTK

In [None]:
# Install the third-party packages listed in the "Installations" section above.
# NOTE(review): no versions are pinned, so a later run may pull different releases.
!pip install pyspark
!pip install confluent_kafka
!pip install newsapi-python
!pip install scikit-learn
!pip install pandas
!pip install numpy
!pip install matplotlib
!pip install torch
!pip install transformers
!pip install spacy
# Download the small English spaCy model
!python -m spacy download en_core_web_sm
!pip install nltk

Collecting confluent_kafka
  Downloading confluent_kafka-2.6.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (2.3 kB)
Downloading confluent_kafka-2.6.0-cp310-cp310-manylinux_2_28_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: confluent_kafka
Successfully installed confluent_kafka-2.6.0
Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl.metadata (1.2 kB)
Downloading newsapi_python-0.2.7-py2.py3-none-any.whl (7.9 kB)
Installing collected packages: newsapi-python
Successfully installed newsapi-python-0.2.7
Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m64.5 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installatio

In [None]:




# Install a headless Java 8 JDK; output is discarded to keep the cell quiet
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Spark 3.1.2 (Hadoop 2.7 build) archive with a progress bar,
# then unpack it into the current working directory
!wget -q --show-progress https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz


# Install findspark quietly
!pip install -q findspark




In [None]:
import os

# Point Spark tooling at the Java and Spark installations from the install cell.
# NOTE(review): SPARK_HOME assumes the archive was unpacked in /content — confirm when
# running outside that directory.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


In [None]:
import findspark

# Initialize findspark; this must run after SPARK_HOME has been set
# (presumably it uses SPARK_HOME to make pyspark importable — verify against findspark docs)
findspark.init()


# Kafka and NewsAPI Setup

## Setup NewsAPI Client

In [None]:
# Import the NewsAPI library
import os

from newsapi import NewsApiClient

# Step 1: Initialize the NewsAPI client.
# The key is read from the NEWSAPI_KEY environment variable so that no secret is stored
# in the notebook. Any key that was previously committed in this file should be treated
# as compromised and rotated.
NEWSAPI_KEY = os.environ.get("NEWSAPI_KEY")
if not NEWSAPI_KEY:
    raise RuntimeError("Set the NEWSAPI_KEY environment variable to your NewsAPI key.")
newsapi = NewsApiClient(api_key=NEWSAPI_KEY)

# Pagination state read and updated by fetch_single_article()
page_num = 1  # next page to request
articles_on_page = []  # articles of the most recently fetched page
article_index = 0  # position of the next unread article in articles_on_page

# Step 2: Function to fetch one article at a time using pagination
def fetch_single_article():
    """Return the next English top-headline article, fetching a new page when needed.

    Uses the module-level pagination state (`page_num`, `articles_on_page`,
    `article_index`): pages of 5 headlines are requested from NewsAPI and handed out
    one article per call.

    Returns:
        dict: keys 'title', 'description' and 'content'. The placeholder strings are
        used only when a key is absent from the article; a key that is present with a
        null value is passed through as None.

    Raises:
        IndexError: if NewsAPI returns a page without articles. The pagination state
        is left untouched in that case so the call can be retried later.
    """
    global page_num, articles_on_page, article_index

    # Current page exhausted (or nothing fetched yet): request the next page
    if article_index >= len(articles_on_page):
        response = newsapi.get_top_headlines(language='en', page_size=5, page=page_num)
        fetched = response.get('articles') or []

        # Fail with a descriptive message instead of indexing into an empty list,
        # and do so before the page counter is advanced.
        if not fetched:
            raise IndexError(f"NewsAPI returned no articles for page {page_num}")

        articles_on_page = fetched
        page_num += 1  # Move to the next page in the next fetch
        article_index = 0  # Reset article index for the new page

    # Hand out the next unread article of the current page
    article = articles_on_page[article_index]
    article_index += 1

    return {
        'title': article.get('title', 'No title available'),
        'description': article.get('description', 'No description available'),
        'content': article.get('content', 'No content available')
    }

## Configure Confluent Kafka Producer



In [None]:


# Import Kafka Producer
import os

from confluent_kafka import Producer

# Step 1: Configure Kafka producer
# Credentials come from the environment (KAFKA_API_KEY / KAFKA_API_SECRET) so that no
# secret is stored in the notebook; a missing variable raises KeyError here. Any
# key/secret that was previously committed in this file should be rotated.
conf = {
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS', 'pkc-12576z.us-west2.gcp.confluent.cloud:9092'
    ),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.environ['KAFKA_API_KEY'],  # Confluent Cloud API Key
    'sasl.password': os.environ['KAFKA_API_SECRET'],  # Confluent Cloud API Secret
    'client.id': 'news-producer'
}

# Create Kafka producer
producer = Producer(conf)

# Delivery-report callback passed to producer.produce()
def delivery_report(err, msg):
    """Print the outcome of one message delivery.

    Args:
        err: delivery error, or None when the message was delivered.
        msg: the message object; its topic() and partition() are printed on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')


## Send News Data to Kafka

In [None]:
import json

# Step 2: Function to send news article to Kafka
def produce_news_to_kafka(topic='topic_0'):
    """Fetch one article and publish it to Kafka as UTF-8 encoded JSON.

    Args:
        topic: name of the Kafka topic to publish to. Defaults to 'topic_0', the
            topic this notebook used before the name became a parameter.
    """
    # Fetch a single article
    article = fetch_single_article()

    # Serialize and send the article; delivery_report is called with the outcome
    payload = json.dumps(article).encode('utf-8')
    producer.produce(topic, payload, callback=delivery_report)

    # Wait for any outstanding messages to be sent
    producer.flush()


## Run the Producer periodically

In [None]:
import time

# Step 3: Run the producer periodically to fetch and send news to Kafka.
# The loop runs until the cell is interrupted (KeyboardInterrupt).
try:
    while True:
        print("Fetching and sending news to Kafka...")
        produce_news_to_kafka()

        # Pause 10 seconds between articles
        # NOTE(review): this comment previously said 60 seconds — confirm the intended interval
        time.sleep(10)

except KeyboardInterrupt:
    print("Stopped fetching and sending articles.")


Fetching and sending news to Kafka...
Message delivered to topic_0 [5]
Fetching and sending news to Kafka...
Message delivered to topic_0 [5]
Fetching and sending news to Kafka...
Message delivered to topic_0 [1]
Fetching and sending news to Kafka...
Message delivered to topic_0 [4]
Fetching and sending news to Kafka...
Message delivered to topic_0 [3]
Fetching and sending news to Kafka...
Message delivered to topic_0 [3]
Fetching and sending news to Kafka...
Message delivered to topic_0 [5]
Stopped fetching and sending articles.


## Kafka Consumer

In [None]:
import json
import os
import time

from confluent_kafka import Consumer, KafkaError, KafkaException

# Kafka consumer configuration
# Credentials come from the environment (KAFKA_API_KEY / KAFKA_API_SECRET) so that no
# secret is stored in the notebook; a missing variable raises KeyError here.
# NOTE(review): with 'auto.offset.reset': 'latest', a consumer group that has no
# committed offsets only sees messages produced after it joins — confirm this is
# intended given that the producer cell runs before this one.
conf = {
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS', 'pkc-12576z.us-west2.gcp.confluent.cloud:9092'
    ),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.environ['KAFKA_API_KEY'],  # Confluent Cloud API Key
    'sasl.password': os.environ['KAFKA_API_SECRET'],  # Confluent Cloud API Secret
    'group.id': 'news-consumer-group',
    'auto.offset.reset': 'latest'
}

# Create Kafka consumer
consumer = Consumer(conf)
consumer.subscribe(['topic_0'])  # Subscribe to your Kafka topic

# List to store all received articles
articles = []

# The loop stops once more than this many consecutive polls return no message
MAX_EMPTY_POLLS = 10
timeout_counter = 0  # Consecutive polls that returned no message

try:
    while True:
        # Poll Kafka for new messages (waits up to 1 second)
        msg = consumer.poll(timeout=1.0)

        # No message: count the idle poll and stop when the limit is exceeded
        if msg is None:
            timeout_counter += 1
            if timeout_counter > MAX_EMPTY_POLLS:
                print("No new articles received. Stopping consumer.")
                break
            continue

        # Reset timeout counter when new message is received
        timeout_counter = 0

        # Check for any errors
        if msg.error():
            # End-of-partition is informational, not a failure. The constant lives on
            # KafkaError; KafkaException has no _PARTITION_EOF attribute.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        # Decode and store the message (article)
        article = json.loads(msg.value().decode('utf-8'))
        articles.append(article)  # Store article in the list

except KeyboardInterrupt:
    print("Consumer stopped.")
finally:
    # Close the consumer cleanly
    consumer.close()



No new articles received. Stopping consumer.


# Defining the article categories using a zero-shot BART (MNLI) model

In [None]:
# Show every article collected by the consumer, pretty-printed as JSON
print("All Articles Received:")
for position, received in enumerate(articles, start=1):
    pretty = json.dumps(received, indent=4)
    print(f"Article {position}: {pretty}")


All Articles Received:
Article 1: {
    "title": "Orionids meteor shower hits peak activity this weekend: When and where to watch - USA TODAY",
    "description": "Skygazers should have a chance to see the Orionids, one of the year's most beautiful meteor showers, just in time for Halloween.",
    "content": "As long as the moon and skies are all treats and no tricks, skygazers should have a chance to see the Orionids, one of the year's most striking meteor showers, just in time for Halloween.\r\nThe Orioni\u2026 [+4195 chars]"
}
Article 2: {
    "title": "North Korean troops in Ukraine war called \u2018huge\u2019 escalation risk - POLITICO Europe",
    "description": "South Korea\u2019s National Intelligence Service said on Friday that North Korea had already deployed 1,500 special forces troops to Russia.",
    "content": "Both the Kremlin and Pyongyang deny they have engaged in military transfers. And NATO Secretary-General Mark Rutte\u00a0said on Friday that he could not confirm re

In [None]:
from transformers import pipeline

In [None]:
import csv
import os

import pandas as pd
from transformers import pipeline

# Keep only articles whose description is a non-blank string. Articles can carry a
# null description, and blank text gives the classifier nothing to work with.
descriptions = []
for article in articles:
    description = article.get('description')
    if isinstance(description, str) and description.strip():
        descriptions.append(description)

# Initialize the zero-shot classifier.
# The checkpoint is named explicitly; it is the one the pipeline fell back to when no
# model was supplied, so the labels stay comparable between runs.
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

# Define candidate labels for classification
candidate_labels = ['entertainment', 'politics', 'business', 'sports', 'climate', 'science']

# List of [description, category] rows
classified_data = []

# Classify each description and keep the strongest label
for description in descriptions:
    result = classifier(description, candidate_labels=candidate_labels)

    # The label with the highest score is always at index 0
    strongest_label = result['labels'][0]

    classified_data.append([description, strongest_label])

# Define the CSV file name
csv_file = 'article_descriptions_with_labels.csv'

# The header is written only when the file is created by this run
file_exists = os.path.isfile(csv_file)

# Append the rows so that results from earlier runs are kept
with open(csv_file, mode='a', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)

    if not file_exists:
        writer.writerow(['Description', 'Category'])  # Writing the header

    writer.writerows(classified_data)

print(f"Descriptions and categories saved to {csv_file}")




No model was supplied, defaulted to facebook/bart-large-mnli and revision c626438 (https://huggingface.co/facebook/bart-large-mnli).
Using a pipeline without specifying a model name and revision in production is not recommended.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.15k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]



Descriptions and categories saved to article_descriptions_with_labels.csv


In [None]:
# Load the labelled descriptions written by the classification cell and preview them.
# NOTE(review): the displayed frame has an 'Unnamed: 0' column, which the writer cell
# above does not produce — confirm which file is actually being read.
df=pd.read_csv('article_descriptions_with_labels.csv')
df.head()

Unnamed: 0,Description,Category
0,Israel is expanding its bombardment of militan...,politics
1,The unique communal structure of the kibbutz p...,climate
2,As the anniversary of the Hamas attack on Isra...,politics
3,Less than 10 days after Hurricane Helene made ...,climate
4,The Northern Lights can potentially be seen fr...,science


# NLP Pipeline

## Text Cleaning

In [None]:
import re

# Function to clean text
def clean_text(text):
    """Normalise a description to lowercase letters separated by single spaces.

    Args:
        text: the raw description. Anything that is not a string (for example None,
            or the NaN pandas uses for an empty CSV field) is treated as empty text.

    Returns:
        str: the text with URLs, HTML tags and every non-letter character removed,
        lowercased, with whitespace runs collapsed and the ends stripped.
    """
    # Missing values would otherwise make re.sub raise TypeError
    if not isinstance(text, str):
        return ''

    # Remove URLs
    text = re.sub(r'http\S+', '', text)

    # Remove HTML tags (if any)
    text = re.sub(r'<.*?>', '', text)

    # Keep only letters and whitespace. This also removes digits, so no separate
    # number-removal step is needed.
    text = re.sub(r'[^a-zA-Z\s]', '', text)

    # Convert to lowercase
    text = text.lower()

    # Collapse whitespace runs and trim the ends
    return re.sub(r'\s+', ' ', text).strip()




In [None]:
# Add the cleaned version of every description (see clean_text) and preview the frame
df['cleaned_text'] = df['Description'].apply(clean_text)
df.head()

Unnamed: 0,Description,Category,cleaned_text
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...


## Tokenization

In [None]:
import nltk
from nltk.tokenize import word_tokenize

In [None]:
# Tokenizer data used by word_tokenize. Recent NLTK releases look up the 'punkt_tab'
# resource instead of 'punkt', so both are requested to work with either version.
nltk.download('punkt')
nltk.download('punkt_tab')

def tokenize_text(text):
    """Split a text string into word tokens with NLTK's word_tokenize."""
    return word_tokenize(text)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
# Tokenize the cleaned text of every row (see tokenize_text) and preview the frame
df['tokenized_text'] = df['cleaned_text'].apply(tokenize_text)
df.head()

Unnamed: 0,Description,Category,cleaned_text,tokenized_text
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...,"[israel, is, expanding, its, bombardment, of, ..."
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...,"[the, unique, communal, structure, of, the, ki..."
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...,"[as, the, anniversary, of, the, hamas, attack,..."
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...,"[less, than, days, after, hurricane, helene, m..."
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...,"[the, northern, lights, can, potentially, be, ..."


## Stopword Removal

In [None]:
from nltk.corpus import stopwords
# Fetch the stop-word corpus read by remove_stopwords()
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from functools import lru_cache


@lru_cache(maxsize=1)
def _english_stop_words():
    """Load the English stop-word list once and reuse it for every later call."""
    return frozenset(stopwords.words('english'))


def remove_stopwords(tokens):
    """Return the tokens that are not English stop words.

    The comparison is case-insensitive; the surviving tokens keep their original
    casing and order.

    Args:
        tokens: iterable of word strings.

    Returns:
        list: the tokens whose lowercase form is not in NLTK's English stop-word list.
    """
    # The stop-word set is cached: this function is applied once per row, and
    # rebuilding the set on each call repeats the same work.
    stop_words = _english_stop_words()
    return [word for word in tokens if word.lower() not in stop_words]

In [None]:
# Drop stop words from every token list (see remove_stopwords) and preview the frame
df['tokenized_without_stopwords'] = df['tokenized_text'].apply(remove_stopwords)
df.head()

Unnamed: 0,Description,Category,cleaned_text,tokenized_text,tokenized_without_stopwords
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...,"[israel, is, expanding, its, bombardment, of, ...","[israel, expanding, bombardment, militant, gro..."
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...,"[the, unique, communal, structure, of, the, ki...","[unique, communal, structure, kibbutz, plays, ..."
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...,"[as, the, anniversary, of, the, hamas, attack,...","[anniversary, hamas, attack, israel, approache..."
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...,"[less, than, days, after, hurricane, helene, m...","[less, days, hurricane, helene, made, landfall..."
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...,"[the, northern, lights, can, potentially, be, ...","[northern, lights, potentially, seen, several,..."


## Lemmatization and Stemming

In [None]:
from nltk.stem import WordNetLemmatizer, PorterStemmer
nltk.download('wordnet')

lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()

def lemmatize_and_stem(tokens):
    """Lemmatize each token with WordNet, then stem the lemma with the Porter stemmer.

    Args:
        tokens: iterable of word strings.

    Returns:
        list: one stemmed lemma per input token, in the same order.
    """
    processed = []
    for token in tokens:
        lemma = lemmatizer.lemmatize(token)
        processed.append(stemmer.stem(lemma))
    return processed


[nltk_data] Downloading package wordnet to /root/nltk_data...


In [None]:
# Lemmatize and stem the stop-word-free tokens of every row and preview the frame
df['lemmatized_and_stemmed_tokens'] = df['tokenized_without_stopwords'].apply(lemmatize_and_stem)
df.head()

Unnamed: 0,Description,Category,cleaned_text,tokenized_text,tokenized_without_stopwords,lemmatized_and_stemmed_tokens
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...,"[israel, is, expanding, its, bombardment, of, ...","[israel, expanding, bombardment, militant, gro...","[israel, expand, bombard, milit, group, lebano..."
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...,"[the, unique, communal, structure, of, the, ki...","[unique, communal, structure, kibbutz, plays, ...","[uniqu, commun, structur, kibbutz, play, vital..."
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...,"[as, the, anniversary, of, the, hamas, attack,...","[anniversary, hamas, attack, israel, approache...","[anniversari, hama, attack, israel, approach, ..."
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...,"[less, than, days, after, hurricane, helene, m...","[less, days, hurricane, helene, made, landfall...","[le, day, hurrican, helen, made, landfal, flor..."
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...,"[the, northern, lights, can, potentially, be, ...","[northern, lights, potentially, seen, several,...","[northern, light, potenti, seen, sever, northe..."


## Vectorization

### TF-IDF Vector Embeddings

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

# Identity tokenizer to avoid re-tokenization
def identity_tokenizer(text):
    """Return the argument unchanged.

    Passed to TfidfVectorizer as its tokenizer, so whatever the vectorizer hands over
    is used as the token sequence as-is.
    """
    return text

# Function to vectorize a column of preprocessed articles (each article is a list of tokens)
def vectorize_articles(df, column_name):
    """Compute TF-IDF vectors for a column that holds one list of tokens per row.

    Args:
        df: DataFrame containing the token column.
        column_name: name of the column; every cell must be a list of token strings.

    Returns:
        tuple: (tfidf_vectors, feature_names), where tfidf_vectors is a dense 2-D
        array with one row per article and one column per vocabulary term, and
        feature_names lists the terms in column order.
    """
    # The token lists are passed to the vectorizer as they are. The identity function
    # serves as both preprocessor and tokenizer, so each list element becomes exactly
    # one term. Joining the tokens into a string first would make the identity
    # tokenizer return that string, which the vectorizer then iterates character by
    # character, giving per-character features instead of per-token ones.
    vectorizer = TfidfVectorizer(
        tokenizer=identity_tokenizer,
        preprocessor=identity_tokenizer,
        token_pattern=None,  # not used when a custom tokenizer is supplied
        lowercase=False,
    )

    # Fit on the whole column and transform it in one step
    tfidf_matrix = vectorizer.fit_transform(df[column_name])

    # Get the feature names (terms) and the TF-IDF matrix as a dense array
    feature_names = vectorizer.get_feature_names_out()
    tfidf_vectors = tfidf_matrix.toarray()

    return tfidf_vectors, feature_names



# Vectorize the column of lemmatized and stemmed tokens
tfidf_vectors, feature_names = vectorize_articles(df, 'lemmatized_and_stemmed_tokens')

# Store one TF-IDF row vector per article in a new DataFrame column
df['tfidf_vector'] = list(tfidf_vectors)

# Preview the DataFrame with the new column
df.head()






Unnamed: 0,Description,Category,cleaned_text,tokenized_text,tokenized_without_stopwords,lemmatized_and_stemmed_tokens,tfidf_vector
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...,"[israel, is, expanding, its, bombardment, of, ...","[israel, expanding, bombardment, militant, gro...","[israel, expand, bombard, milit, group, lebano...","[0.5416213553764374, 0.2523719095682392, 0.195..."
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...,"[the, unique, communal, structure, of, the, ki...","[unique, communal, structure, kibbutz, plays, ...","[uniqu, commun, structur, kibbutz, play, vital...","[0.542552107193305, 0.24344242894315962, 0.120..."
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...,"[as, the, anniversary, of, the, hamas, attack,...","[anniversary, hamas, attack, israel, approache...","[anniversari, hama, attack, israel, approach, ...","[0.40619035625656275, 0.6378998066779937, 0.0,..."
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...,"[less, than, days, after, hurricane, helene, m...","[less, days, hurricane, helene, made, landfall...","[le, day, hurrican, helen, made, landfal, flor...","[0.5591297314988245, 0.3628811391104068, 0.106..."
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...,"[the, northern, lights, can, potentially, be, ...","[northern, lights, potentially, seen, several,...","[northern, light, potenti, seen, sever, northe...","[0.47517026366304854, 0.28783118188820866, 0.0..."


### BERT Embedding Model


In [None]:
import torch
import numpy as np
import pandas as pd
from transformers import BertTokenizer, BertModel

# Function to load the BERT tokenizer and model
def load_bert_model(model_name='bert-base-uncased'):
    """Load a pretrained BERT tokenizer and model.

    Args:
        model_name: checkpoint name passed to from_pretrained for both the tokenizer
            and the model. Defaults to 'bert-base-uncased', the checkpoint this
            notebook used before the name became a parameter.

    Returns:
        tuple: (tokenizer, model).
    """
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)
    return tokenizer, model

# Function to get BERT embeddings for a list of texts
def get_bert_embeddings(text_list, tokenizer, model):
    """Return one [CLS] embedding per text.

    Args:
        text_list: iterable of strings to embed.
        tokenizer: callable that turns a string into the model's keyword inputs
            (called with return_tensors="pt", truncation at 512 tokens).
        model: callable that accepts those inputs and returns an object with a
            `last_hidden_state` tensor.

    Returns:
        numpy.ndarray: 2-D array with one row per text. For empty input the array has
        zero rows; its width is `model.config.hidden_size` when that attribute
        exists, otherwise 0.
    """
    texts = list(text_list)

    # np.vstack raises on an empty list, so return an empty matrix instead
    if not texts:
        hidden_size = getattr(getattr(model, "config", None), "hidden_size", 0)
        return np.empty((0, hidden_size), dtype=np.float32)

    # When the model reports the device it lives on, send the inputs there too
    device = getattr(model, "device", None)

    embeddings = []
    for text in texts:
        # Tokenize and encode the input text
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        if device is not None:
            inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

        # Inference only: no gradients are needed
        with torch.no_grad():
            outputs = model(**inputs)

        # Embedding of the first token ([CLS]); moved to the CPU before conversion
        # because .numpy() is not available on tensors that live on another device
        cls_embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()
        embeddings.append(cls_embedding)

    # Stack all embeddings into a single array
    return np.vstack(embeddings)



# Load BERT model and tokenizer
# NOTE(review): the name `model` is rebound to Spark models by later cells, so this cell
# must be re-run before get_bert_embeddings is called again.
tokenizer, model = load_bert_model()

# Get BERT embeddings for the raw 'Description' column (not the cleaned text)
bert_embeddings = get_bert_embeddings(df['Description'].tolist(), tokenizer, model)

# Store one embedding row per article in a new DataFrame column
df['bert_embeddings'] = list(bert_embeddings)

df.head()





tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]



model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Unnamed: 0,Description,Category,cleaned_text,tokenized_text,tokenized_without_stopwords,lemmatized_and_stemmed_tokens,tfidf_vector,bert_embeddings
0,Israel is expanding its bombardment of militan...,politics,israel is expanding its bombardment of militan...,"[israel, is, expanding, its, bombardment, of, ...","[israel, expanding, bombardment, militant, gro...","[israel, expand, bombard, milit, group, lebano...","[0.5416213553764374, 0.2523719095682392, 0.195...","[-0.75433934, 0.06642044, -0.36684126, -0.3348..."
1,The unique communal structure of the kibbutz p...,climate,the unique communal structure of the kibbutz p...,"[the, unique, communal, structure, of, the, ki...","[unique, communal, structure, kibbutz, plays, ...","[uniqu, commun, structur, kibbutz, play, vital...","[0.542552107193305, 0.24344242894315962, 0.120...","[-0.19932495, 0.25479653, -0.24113928, -0.6140..."
2,As the anniversary of the Hamas attack on Isra...,politics,as the anniversary of the hamas attack on isra...,"[as, the, anniversary, of, the, hamas, attack,...","[anniversary, hamas, attack, israel, approache...","[anniversari, hama, attack, israel, approach, ...","[0.40619035625656275, 0.6378998066779937, 0.0,...","[-0.16050641, 0.029564342, -0.35920808, -0.341..."
3,Less than 10 days after Hurricane Helene made ...,climate,less than days after hurricane helene made lan...,"[less, than, days, after, hurricane, helene, m...","[less, days, hurricane, helene, made, landfall...","[le, day, hurrican, helen, made, landfal, flor...","[0.5591297314988245, 0.3628811391104068, 0.106...","[-0.29291564, -0.47060636, 0.5671201, -0.23981..."
4,The Northern Lights can potentially be seen fr...,science,the northern lights can potentially be seen fr...,"[the, northern, lights, can, potentially, be, ...","[northern, lights, potentially, seen, several,...","[northern, light, potenti, seen, sever, northe...","[0.47517026366304854, 0.28783118188820866, 0.0...","[0.0869678, -0.14454554, 0.8187698, -0.4012647..."


# ML Pipeline

### Splitting the data into Train and Test sets

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors

# Initialize Spark Session (reuses an existing session if there is one)
spark = SparkSession.builder.appName("ML Pipeline").getOrCreate()
# Category strings, in the same row order as the embeddings
labels=df['Category'].tolist()

# Pair every BERT embedding with its label as (label, dense feature vector)
data = [(y, Vectors.dense(x)) for x, y in zip(bert_embeddings, labels)]
# Create a PySpark DataFrame with string labels and features
df_spark = spark.createDataFrame(data, ["label", "features"])

# Use StringIndexer to convert string labels to numeric labels ("label_indexed")
indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
df_spark_indexed = indexer.fit(df_spark).transform(df_spark)

# Split into training and testing sets (weights 0.8 / 0.2, fixed seed)
train_bert, test_bert = df_spark_indexed.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors

# Category strings, in the same row order as the TF-IDF vectors
labels=df['Category'].tolist()

# Pair every TF-IDF vector with its label as (label, dense feature vector)
data2 = [(y, Vectors.dense(x)) for x, y in zip(tfidf_vectors, labels)]
# Create a PySpark DataFrame with string labels and features
df_spark2 = spark.createDataFrame(data2, ["label", "features"])

# Use StringIndexer to convert string labels to numeric labels ("label_indexed").
# This indexer is fitted separately from the one used for the BERT DataFrame.
indexer = StringIndexer(inputCol="label", outputCol="label_indexed")
df_spark_indexed2 = indexer.fit(df_spark2).transform(df_spark2)

# Split into training and testing sets (weights 0.8 / 0.2, fixed seed)
train_tfidf, test_tfidf = df_spark_indexed2.randomSplit([0.8, 0.2], seed=42)

## Naive Bayes with TF-IDF Vectors

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Define function to train Naive Bayes model with TF-IDF features
def train_naive_bayes(train_tfidf):
    """Fit a multinomial Naive Bayes classifier.

    Args:
        train_tfidf: Spark DataFrame with "features" and "label_indexed" columns.

    Returns:
        The fitted Naive Bayes model.
    """
    estimator = NaiveBayes(
        modelType="multinomial",
        featuresCol="features",
        labelCol="label_indexed",
    )
    return estimator.fit(train_tfidf)

# Step 2: Define function to evaluate the Naive Bayes model
def evaluate_model(model, test_tfidf):
    """Print the accuracy of `model` on `test_tfidf`, formatted to four decimals.

    Args:
        model: fitted Spark model whose transform() adds a "prediction" column.
        test_tfidf: Spark DataFrame with "features" and "label_indexed" columns.
    """
    scored = model.transform(test_tfidf)
    accuracy = MulticlassClassificationEvaluator(
        labelCol="label_indexed",
        predictionCol="prediction",
        metricName="accuracy",
    ).evaluate(scored)
    print(f"Accuracy: {accuracy:.4f}")

# Train the Naive Bayes model on the TF-IDF training split
# NOTE(review): this rebinds `model`, which held the BERT model in an earlier cell
model = train_naive_bayes(train_tfidf)

# Print its accuracy on the TF-IDF test split
evaluate_model(model, test_tfidf)


Accuracy: 0.2500


## Random Forest with BERT

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the Random Forest model
def train_random_forest(train_bert, num_trees=100, seed=42):
    """Fit a random forest classifier.

    Args:
        train_bert: Spark DataFrame with "features" and "label_indexed" columns.
        num_trees: number of trees in the forest (default 100, the value this
            notebook used before it became a parameter).
        seed: random seed handed to the classifier so that repeated runs build the
            same forest. The default matches the seed used for the train/test split.

    Returns:
        The fitted random forest model.
    """
    rf = RandomForestClassifier(
        labelCol="label_indexed",
        featuresCol="features",
        numTrees=num_trees,
        seed=seed,
    )

    # Train the Random Forest model
    return rf.fit(train_bert)

# Step 2: Evaluate the Random Forest model
def evaluate_model(model, test_bert):
    """Print the accuracy of `model` on `test_bert`, formatted to four decimals.

    Args:
        model: fitted Spark model whose transform() adds a "prediction" column.
        test_bert: Spark DataFrame with "features" and "label_indexed" columns.
    """
    accuracy_evaluator = MulticlassClassificationEvaluator(
        metricName="accuracy",
        predictionCol="prediction",
        labelCol="label_indexed",
    )
    accuracy = accuracy_evaluator.evaluate(model.transform(test_bert))
    print(f"Accuracy: {accuracy:.4f}")

# Inputs: train_bert / test_bert, the BERT-feature splits created in the
# "Splitting the data" cells (columns `label`, `features`, `label_indexed`)

# Train the Random Forest model
model = train_random_forest(train_bert)

# Print its accuracy on the BERT test split
evaluate_model(model, test_bert)


Accuracy: 0.7083


## Random Forest With TF-IDF vectors

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the Random Forest model
def train_random_forest(train_tfidf, num_trees=100, seed=42):
    """Fit a random forest classifier.

    Args:
        train_tfidf: Spark DataFrame with "features" and "label_indexed" columns.
        num_trees: number of trees in the forest (default 100, the value this
            notebook used before it became a parameter).
        seed: random seed handed to the classifier so that repeated runs build the
            same forest. The default matches the seed used for the train/test split.

    Returns:
        The fitted random forest model.
    """
    rf = RandomForestClassifier(
        labelCol="label_indexed",
        featuresCol="features",
        numTrees=num_trees,
        seed=seed,
    )

    # Train the Random Forest model
    return rf.fit(train_tfidf)

# Step 2: Evaluate the Random Forest model
def evaluate_model(model, test_tfidf):
    """Print the accuracy of `model` on `test_tfidf`, formatted to four decimals.

    Args:
        model: fitted Spark model whose transform() adds a "prediction" column.
        test_tfidf: Spark DataFrame with "features" and "label_indexed" columns.
    """
    with_predictions = model.transform(test_tfidf)
    metric = MulticlassClassificationEvaluator(
        labelCol="label_indexed", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = metric.evaluate(with_predictions)
    print(f"Accuracy: {accuracy:.4f}")

# Inputs: train_tfidf / test_tfidf, the TF-IDF splits with indexed labels created in the
# "Splitting the data" cells

# Train the Random Forest model on the TF-IDF features
model = train_random_forest(train_tfidf)

# Print its accuracy on the TF-IDF test split
evaluate_model(model, test_tfidf)


Accuracy: 0.4583


## SVM with BERT

In [None]:
from pyspark.ml.classification import OneVsRest, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the One-vs-Rest SVM model
def train_one_vs_rest_svm(train_bert):
    """Fit a One-vs-Rest classifier built on LinearSVC (maxIter=10).

    Args:
        train_bert: Spark DataFrame with "features" and "label_indexed" columns.

    Returns:
        The fitted One-vs-Rest model.
    """
    # Base classifier wrapped by OneVsRest
    base_svm = LinearSVC(maxIter=10, featuresCol="features", labelCol="label_indexed")
    one_vs_rest = OneVsRest(
        classifier=base_svm,
        featuresCol="features",
        labelCol="label_indexed",
    )
    return one_vs_rest.fit(train_bert)

# Step 2: Evaluate the One-vs-Rest SVM model
def evaluate_model(model, test_bert):
    """Print the accuracy of `model` on `test_bert`, formatted to four decimals.

    Args:
        model: fitted Spark model whose transform() adds a "prediction" column.
        test_bert: Spark DataFrame with "features" and "label_indexed" columns.
    """
    scored = model.transform(test_bert)
    accuracy = MulticlassClassificationEvaluator(
        labelCol="label_indexed",
        predictionCol="prediction",
        metricName="accuracy",
    ).evaluate(scored)
    print(f"Accuracy: {accuracy:.4f}")

# Train the One-vs-Rest SVM model on the BERT training split
ovr_model = train_one_vs_rest_svm(train_bert)

# Print its accuracy on the BERT test split
evaluate_model(ovr_model, test_bert)


Accuracy: 0.7917


## SVM With TF-IDF vectors

In [None]:
from pyspark.ml.classification import OneVsRest, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the One-vs-Rest SVM model
def train_one_vs_rest_svm(train_tfidf, max_iter=10):
    """Fit a One-vs-Rest linear SVM on the TF-IDF training split.

    LinearSVC is used as the base classifier and wrapped in OneVsRest for
    multi-class classification over the "label_indexed" column.

    Args:
        train_tfidf: Training DataFrame providing a "features" column and a
            "label_indexed" column.
        max_iter: Iteration cap forwarded to each underlying LinearSVC.
            Defaults to 10, the value that used to be hard-coded here, so
            existing calls behave exactly as before. Raise it if the solver
            needs more iterations to converge.

    Returns:
        The fitted model produced by ``OneVsRest.fit``.
    """
    # Base learner; OneVsRest fits one of these per class.
    svm = LinearSVC(labelCol="label_indexed", featuresCol="features", maxIter=max_iter)

    # Multi-class wrapper reading the same label/feature columns as the base learner.
    ovr = OneVsRest(classifier=svm, labelCol="label_indexed", featuresCol="features")

    return ovr.fit(train_tfidf)

# Step 2: Evaluate the One-vs-Rest SVM model
def evaluate_model(model, test_tfidf):
    """Print the accuracy of ``model`` on the ``test_tfidf`` split.

    Args:
        model: Fitted model exposing ``transform``; the transformed output is
            expected to carry a "prediction" column.
        test_tfidf: Held-out DataFrame to score; expected to carry the
            "label_indexed" column.

    Returns:
        None. The accuracy is printed with four decimal places.
    """
    # Inference on the held-out rows.
    scored = model.transform(test_tfidf)

    # Evaluator settings gathered in one place, then expanded into the constructor.
    evaluator_options = {
        "labelCol": "label_indexed",
        "predictionCol": "prediction",
        "metricName": "accuracy",
    }
    accuracy = MulticlassClassificationEvaluator(**evaluator_options).evaluate(scored)

    print(f"Accuracy: {accuracy:.4f}")

# Prerequisite: train_tfidf and test_tfidf must already exist in the kernel, each with
# 'features' and 'label_indexed' columns (they are not created in this cell).

# Fit the One-vs-Rest SVM on the TF-IDF training split.
# NOTE: this rebinds the notebook-global `ovr_model` that the BERT SVM cell also assigns.
ovr_model = train_one_vs_rest_svm(train_tfidf)

# Score the fitted model on the held-out TF-IDF split; evaluate_model prints the accuracy.
evaluate_model(ovr_model, test_tfidf)


Accuracy: 0.4167


## Logistic Regression with BERT

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the Logistic Regression model
def train_logistic_regression(train_bert, max_iter=10):
    """Fit a logistic regression classifier on the BERT training split.

    Args:
        train_bert: Training DataFrame providing a "features" column and a
            "label_indexed" column.
        max_iter: Iteration cap forwarded to LogisticRegression. Defaults to
            10, the value that used to be hard-coded here, so existing calls
            behave exactly as before. Raise it if the optimizer needs more
            iterations to converge.

    Returns:
        The fitted model produced by ``LogisticRegression.fit``.
    """
    lr = LogisticRegression(labelCol="label_indexed", featuresCol="features", maxIter=max_iter)
    return lr.fit(train_bert)

# Step 2: Evaluate the Logistic Regression model
def evaluate_model(model, test_bert):
    """Report how accurately ``model`` labels the rows of ``test_bert``.

    Args:
        model: Fitted model exposing ``transform``; the transformed output is
            expected to carry a "prediction" column.
        test_bert: Held-out DataFrame to score; expected to carry the
            "label_indexed" column.

    Returns:
        None. The accuracy is printed with four decimal places.
    """
    # Inference on the held-out rows.
    with_predictions = model.transform(test_bert)

    accuracy_evaluator = MulticlassClassificationEvaluator(
        predictionCol="prediction",
        labelCol="label_indexed",
        metricName="accuracy",
    )
    accuracy = accuracy_evaluator.evaluate(with_predictions)

    print(f"Accuracy: {accuracy:.4f}")

# Prerequisite: train_bert and test_bert must already exist in the kernel, each with
# 'features' and 'label_indexed' columns (they are not created in this cell).

# Fit logistic regression on the BERT training split.
# NOTE: this rebinds the notebook-global `model` that other classifier cells also assign.
model = train_logistic_regression(train_bert)

# Score the fitted model on the held-out BERT split; evaluate_model prints the accuracy.
evaluate_model(model, test_bert)


Accuracy: 0.7917


## Logistic Regression with TF-IDF vectors

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Train the Logistic Regression model
def train_logistic_regression(train_tfidf, max_iter=10):
    """Fit a logistic regression classifier on the TF-IDF training split.

    Args:
        train_tfidf: Training DataFrame providing a "features" column and a
            "label_indexed" column.
        max_iter: Iteration cap forwarded to LogisticRegression. Defaults to
            10, the value that used to be hard-coded here, so existing calls
            behave exactly as before. Raise it if the optimizer needs more
            iterations to converge.

    Returns:
        The fitted model produced by ``LogisticRegression.fit``.
    """
    lr = LogisticRegression(labelCol="label_indexed", featuresCol="features", maxIter=max_iter)
    return lr.fit(train_tfidf)

# Step 2: Evaluate the Logistic Regression model
def evaluate_model(model, test_tfidf):
    """Transform ``test_tfidf`` with ``model`` and print the accuracy achieved.

    Args:
        model: Fitted model exposing ``transform``; the transformed output is
            expected to carry a "prediction" column.
        test_tfidf: Held-out DataFrame to score; expected to carry the
            "label_indexed" column.

    Returns:
        None. The accuracy is printed with four decimal places.
    """
    # Inference on the held-out rows.
    scored_rows = model.transform(test_tfidf)

    # Column/metric configuration kept separate from the evaluator call for readability.
    settings = dict(
        metricName="accuracy",
        predictionCol="prediction",
        labelCol="label_indexed",
    )
    accuracy = MulticlassClassificationEvaluator(**settings).evaluate(scored_rows)

    print(f"Accuracy: {accuracy:.4f}")

# Prerequisite: train_tfidf and test_tfidf must already exist in the kernel, each with
# 'features' and 'label_indexed' columns (they are not created in this cell).

# Fit logistic regression on the TF-IDF training split.
# NOTE: this rebinds the notebook-global `model` that other classifier cells also assign.
model = train_logistic_regression(train_tfidf)

# Score the fitted model on the held-out TF-IDF split; evaluate_model prints the accuracy.
evaluate_model(model, test_tfidf)


Accuracy: 0.4167
