<a href="https://colab.research.google.com/github/ZHAbotorabi/Apache-Spark/blob/main/News_Headline_Categorization_using_Spark_NLP(Classification_Clustering).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Using Apache Spark. Here's a step-by-step guide to start:

## Step 1: Install Java and PySpark
Install Java and PySpark in your Colab environment.

In [4]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
!pip install pyspark



## Step 2: Set Up Environment Variables
Set up the environment variables for Java.

In [5]:
import os

# Set the environment variable for Java
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"

## Step 3: Start a Spark Session
Now that everything is set up, create a Spark session.

In [6]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("ColabSpark") \
    .getOrCreate()

# Check the Spark version
print(spark.version)


3.5.3


## Step 4: Use Spark as a simple sample

In [7]:
# Sample data
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Show the DataFrame
df.show()

# Perform some operations
df.filter(df.Age > 30).show()

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



## Step 5: Use Spark as a real project

# Project: News Headline Categorization using Spark NLP
This complete workflow includes loading the data, preprocessing (including lowercasing, tokenization, stopword removal), feature extraction (using TF-IDF), and training a logistic regression model for classification. Finally, it evaluates the model and displays the accuracy and predictions.

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NewsHeadlinesCategorization") \
    .getOrCreate()


In [12]:
import requests
import pandas as pd

# Your NewsAPI key
API_KEY = '36d...d091fe2f34c...e5d6cc0ac5c2f87...'

# Fetch top headlines from the US
url = f"https://newsapi.org/v2/top-headlines?country=us&apiKey={API_KEY}"
response = requests.get(url)
data = response.json()

# Check if the request was successful
if data["status"] != "ok":
    print("Failed to fetch data:", data)
    exit()

# Extract relevant information (headline + manually assigned category)
headlines = []
for article in data["articles"]:
    title = article["title"]
    # Basic category assignment (you can customize this)
    if any(keyword in title.lower() for keyword in ["sport", "game", "match"]):
        category = "Sports"
    elif any(keyword in title.lower() for keyword in ["tech", "apple", "google", "ai", "software"]):
        category = "Technology"
    elif any(keyword in title.lower() for keyword in ["politics", "government", "election"]):
        category = "Politics"
    else:
        category = "Other"

    headlines.append({"headline": title, "category": category})

# Create a DataFrame and save it as a CSV file
df = pd.DataFrame(headlines)
df.to_csv("news_headlines.csv", index=False)

print("CSV file 'news_headlines.csv' created successfully!")
df.head(10)

CSV file 'news_headlines.csv' created successfully!


Unnamed: 0,headline,category
0,Stock market today: Nasdaq leads stocks lower ...,Other
1,"NFL Fantasy 2024 Start 'Em, Sit 'Em: Quarterba...",Other
2,"Four killed, 14 injured in terror attack on Tu...",Other
3,How long can you stand on one foot? The answer...,Other
4,Maps and charts: Tracking the early votes in k...,Politics
5,Apple Releases First Betas of iOS 18.2 and Mor...,Technology
6,Apple and Goldman Sachs fined millions for mis...,Technology
7,Chinese influence operation targets US down ba...,Other
8,Iranian hacker group focuses on US election we...,Politics
9,Boeing reports $6 billion quarterly loss as st...,Other


In [13]:
# Load the CSV file
data = spark.read.csv("news_headlines.csv", header=True, inferSchema=True)

# Check schema and sample data
data.printSchema()
data.show(10)


root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+----------+
|            headline|  category|
+--------------------+----------+
|Stock market toda...|     Other|
|NFL Fantasy 2024 ...|     Other|
|Four killed, 14 i...|     Other|
|How long can you ...|     Other|
|Maps and charts: ...|  Politics|
|Apple Releases Fi...|Technology|
|Apple and Goldman...|Technology|
|Chinese influence...|     Other|
|Iranian hacker gr...|  Politics|
|Boeing reports $6...|     Other|
+--------------------+----------+
only showing top 10 rows



In [14]:
# Lowercasing the headlines
data = data.withColumn("headline", F.lower(data.headline))
# Check schema and sample data
data.printSchema()
data.show(10)

root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+----------+
|            headline|  category|
+--------------------+----------+
|stock market toda...|     Other|
|nfl fantasy 2024 ...|     Other|
|four killed, 14 i...|     Other|
|how long can you ...|     Other|
|maps and charts: ...|  Politics|
|apple releases fi...|Technology|
|apple and goldman...|Technology|
|chinese influence...|     Other|
|iranian hacker gr...|  Politics|
|boeing reports $6...|     Other|
+--------------------+----------+
only showing top 10 rows



In [15]:
# Tokenization
tokenizer = Tokenizer(inputCol="headline", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Create a pipeline for transformations
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

# Fit the pipeline to the data
model = pipeline.fit(data)
processed_data = model.transform(data)

# Show the processed data
processed_data.select("headline", "filtered").show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|headline                                                                                                                 |filtered                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|stock market today: nasdaq leads stocks lower as wall street braces for tesla earnings - yahoo finance                   |[stock, market, today:, nasdaq, leads, stocks, lower, wall, street, braces, tesla, earnings, -, yahoo, finance]       

In [16]:
# Feature Extraction using TF-IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(processed_data)

# Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Show the features
tfidf_data.select("headline", "features").show(10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|headline                                                                                                                 |features                                                                                                                                                                                                                                                |
+-------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------

In [17]:
# Encode the labels (categories)
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="category", outputCol="label")
tfidf_data = indexer.fit(tfidf_data).transform(tfidf_data)


In [19]:
# Train-Test Split
train_data, test_data = tfidf_data.randomSplit([0.8, 0.2], seed=1234)

# Initialize Logistic Regression
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# Fit the model
lr_model = lr.fit(train_data)

# Predictions
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")

# Show some predictions
predictions.select("headline", "category", "prediction").show(10, truncate=False)

Test Accuracy: 0.00
+------------------------------------------------------------------------------------+----------+----------+
|headline                                                                            |category  |prediction|
+------------------------------------------------------------------------------------+----------+----------+
|apple and goldman sachs fined millions for misleading apple card holders - the verge|Technology|0.0       |
|apple’s reportedly slowing down vision pro production, for now - the verge          |Technology|0.0       |
+------------------------------------------------------------------------------------+----------+----------+



## News Headline Categorization using Classificatin in PySpark
This document provides a step-by-step overview of the process to categorize news headlines using Apache Spark in Google Colab.

Step 1: Initialize Spark Session
Start by initializing a Spark session. This is necessary to work with Spark DataFrames and perform data processing tasks.

Step 2: Load Data
Read the CSV file containing news headlines and their corresponding categories into a Spark DataFrame. This DataFrame serves as the main dataset for processing.

Step 3: Check Data Schema and Sample Data
Inspect the schema and display the first few rows of the DataFrame to understand its structure. This ensures that the columns are correctly identified and the data is in the expected format.

Step 4: Lowercase Headlines
Convert all headlines to lowercase to ensure uniformity. This helps reduce variations in text when analyzing or processing the data.

Step 5: Tokenization
Split the headlines into individual words (tokens). This is crucial for text analysis, as it breaks the text into manageable parts.

Step 6: Remove Stop Words
Filter out common stop words (like "and," "the," etc.) from the list of tokens. Stop words typically do not carry significant meaning and can be removed to enhance the quality of the analysis.

Step 7: Create a Pipeline
Set up a processing pipeline that includes both the tokenization and stop words removal steps. This pipeline allows for a streamlined transformation of the data.

Step 8: Fit the Pipeline
Apply the pipeline to the DataFrame to execute the transformations. This results in a new DataFrame that contains the processed text data (tokens without stop words).

Step 9: Feature Extraction using TF-IDF
Use the Term Frequency-Inverse Document Frequency (TF-IDF) method to convert the text data into numerical feature vectors. This helps quantify the importance of words in relation to the entire dataset, enabling machine learning algorithms to interpret the text.

Step 10: Show Features
Display the processed feature vectors alongside the original headlines for verification and understanding of how the text has been transformed into numerical data.

Step 11: Encode Labels
Convert the categorical labels (news categories) into numerical format using a StringIndexer. This step is essential for machine learning models, which require numerical input for both features and labels.

Step 12: Train-Test Split
Split the dataset into training and testing sets. The training set will be used to train the classification model, while the testing set will be reserved for evaluating the model’s performance.

Step 13: Model Training
Proceed to train a classification model (e.g., **Logistic Regression** and **Random Forest**) using the training data. This step is crucial for learning to categorize the headlines.

Step 14: Evaluation
After training, evaluate the model’s performance on the test data to understand its accuracy and effectiveness in categorizing news headlines.

### LogisticRegression

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark Session (if not already done)
spark = SparkSession.builder.appName("NewsCategory").getOrCreate()

# Load the CSV file
data = spark.read.csv("news_headlines.csv", header=True, inferSchema=True)

# Check schema and sample data
data.printSchema()
data.show(10)

# Lowercasing the headlines
data = data.withColumn("headline", F.lower(data.headline))

# Tokenization
tokenizer = Tokenizer(inputCol="headline", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Create a pipeline for transformations
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

# Fit the pipeline to the data
model = pipeline.fit(data)
processed_data = model.transform(data)

# Show the processed data
processed_data.select("headline", "filtered").show(10, truncate=False)

# Feature Extraction using TF-IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(processed_data)

# Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Show the features
tfidf_data.select("headline", "features").show(10, truncate=False)

# Encode the labels (categories)
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="category", outputCol="label")
tfidf_data = indexer.fit(tfidf_data).transform(tfidf_data)

# Train-Test Split
train_data, test_data = tfidf_data.randomSplit([0.8, 0.2], seed=1234)

# Initialize Logistic Regression
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# Fit the model
lr_model = lr.fit(train_data)

# Predictions
predictions = lr_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")

# Show some predictions
predictions.select("headline", "category", "prediction").show(10, truncate=False)

root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+----------+
|            headline|  category|
+--------------------+----------+
|Stock market toda...|     Other|
|NFL Fantasy 2024 ...|     Other|
|Four killed, 14 i...|     Other|
|How long can you ...|     Other|
|Maps and charts: ...|  Politics|
|Apple Releases Fi...|Technology|
|Apple and Goldman...|Technology|
|Chinese influence...|     Other|
|Iranian hacker gr...|  Politics|
|Boeing reports $6...|     Other|
+--------------------+----------+
only showing top 10 rows

+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+
|headline                                                                                                                 |filtered                            

## RandomForestClassifier

In [21]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, StringIndexer, RegexTokenizer, StopWordsRemover
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Assuming df is your DataFrame with 'headline' and 'category'
#df = df.withColumnRenamed("headline", "text").withColumnRenamed("category", "label")

# Initialize Spark Session (if not already done)
spark = SparkSession.builder.appName("NewsClassification").getOrCreate()

# Load the CSV file
data = spark.read.csv("news_headlines.csv", header=True, inferSchema=True)

# Check schema and sample data
data.printSchema()
data.show(10)

# Assuming df is your DataFrame with 'headline' and 'category'
df = data.withColumnRenamed("headline", "text").withColumnRenamed("category", "label")

# Lowercasing the headlines
data = data.withColumn("headline", F.lower(data.headline))

# Index labels, adding metadata to the label column
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")

# Tokenize the text
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Convert text to numerical features
vectorizer = CountVectorizer(inputCol="filtered", outputCol="features")

# Initialize Random Forest Classifier
rf = RandomForestClassifier(labelCol="labelIndex", featuresCol="features", numTrees=10)

# Create a pipeline
pipeline = Pipeline(stages=[indexer, tokenizer, remover, vectorizer, rf])

# Fit the model
model = pipeline.fit(df)

# Make predictions
predictions = model.transform(df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy:.2f}")

# Show some predictions
predictions.select("text", "label", "prediction", "probability").show(10, truncate=False)


root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+----------+
|            headline|  category|
+--------------------+----------+
|Stock market toda...|     Other|
|NFL Fantasy 2024 ...|     Other|
|Four killed, 14 i...|     Other|
|How long can you ...|     Other|
|Maps and charts: ...|  Politics|
|Apple Releases Fi...|Technology|
|Apple and Goldman...|Technology|
|Chinese influence...|     Other|
|Iranian hacker gr...|  Politics|
|Boeing reports $6...|     Other|
+--------------------+----------+
only showing top 10 rows

Accuracy: 0.80
+-------------------------------------------------------------------------------------------------------------------------+----------+----------+-------------------------------------------------+
|text                                                                                                                     |label     |prediction|probability                                      |
+----

# News Headline Categorization using **Clustering** in PySpark
If you want to approach the news headline categorization project using clustering instead of classification, you can use techniques like K-Means clustering. In this mode, the goal is to group similar headlines together without predefined categories. Here’s how to adapt the project for clustering using PySpark.

Overview of Clustering News Headlines
Data Preparation: Load and preprocess the data (similar to the classification approach).
Feature Extraction: Convert the text data into numerical format using techniques like TF-IDF.
Clustering: Use a clustering algorithm (e.g., K-Means) to group the headlines based on similarity.
Evaluation and Interpretation: Analyze the clusters to understand the topics represented.
Step-by-Step Implementation
Here’s a step-by-step outline of how to implement the clustering approach:

Step 1: Initialize Spark Session
Start by initializing a Spark session to work with PySpark.

Step 2: Load Data
Load the CSV file containing the news headlines into a Spark DataFrame.

Step 3: Check Data Schema and Sample Data
Inspect the schema and display the first few rows of the DataFrame to understand its structure.

Step 4: Lowercase Headlines
Convert all headlines to lowercase for consistency.

Step 5: Tokenization
Split the headlines into individual words (tokens).

Step 6: Remove Stop Words
Filter out common stop words from the list of tokens.

Step 7: Create a Pipeline for Preprocessing
Set up a processing pipeline that includes tokenization and stop words removal.

Step 8: Fit the Pipeline
Apply the pipeline to transform the data into a processed format.

Step 9: Feature Extraction using TF-IDF
Convert the processed text data into numerical feature vectors using the TF-IDF method.

Step 10: **K-Means Clustering**
Use the K-Means algorithm to cluster the headlines based on their feature vectors.
You will need to decide on the number of clusters (K) based on the nature of your data.

Step 11: Analyze Clusters
After clustering, you can analyze the resulting clusters to identify common themes or topics.
Display a few samples from each cluster to interpret the results.

Step 12: Visualization (Optional)
You can visualize the clusters using techniques like **t-SNE** or **PCA** to reduce dimensions and plot the data.

Sample Code for Clustering
Here’s a sample code structure you can follow in Google Colab using PySpark:

In [22]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans

# Step 1: Initialize Spark Session
spark = SparkSession.builder.appName("NewsClustering").getOrCreate()

# Step 2: Load Data
data = spark.read.csv("news_headlines.csv", header=True, inferSchema=True)

# Step 3: Check Data Schema
data.printSchema()
data.show(10)

# Step 4: Lowercase Headlines
data = data.withColumn("headline", F.lower(data.headline))

# Step 5: Tokenization
tokenizer = Tokenizer(inputCol="headline", outputCol="words")

# Step 6: Remove Stop Words
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Step 7: Create a Pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

# Step 8: Fit the Pipeline
model = pipeline.fit(data)
processed_data = model.transform(data)

# Step 9: Feature Extraction using TF-IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashingTF.transform(processed_data)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Step 10: K-Means Clustering
kmeans = KMeans(k=5, seed=1)  # Adjust 'k' based on your data
model = kmeans.fit(tfidf_data)

# Step 11: Make Predictions
predictions = model.transform(tfidf_data)

# Step 12: Show Cluster Results
predictions.select("headline", "prediction").show(10, truncate=False)


root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)

+--------------------+----------+
|            headline|  category|
+--------------------+----------+
|Stock market toda...|     Other|
|NFL Fantasy 2024 ...|     Other|
|Four killed, 14 i...|     Other|
|How long can you ...|     Other|
|Maps and charts: ...|  Politics|
|Apple Releases Fi...|Technology|
|Apple and Goldman...|Technology|
|Chinese influence...|     Other|
|Iranian hacker gr...|  Politics|
|Boeing reports $6...|     Other|
+--------------------+----------+
only showing top 10 rows

+-------------------------------------------------------------------------------------------------------------------------+----------+
|headline                                                                                                                 |prediction|
+-------------------------------------------------------------------------------------------------------------------------+----------+
|stock 

In [24]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns
"""
features_array=processed_data.select('features').collect()
tsne = TSNE(n_components=2, perplexity=5, random_state=42)  # Adjust perplexity
tsne_result = tsne.fit_transform(features_array)
"""

"\nfeatures_array=processed_data.select('features').collect()\ntsne = TSNE(n_components=2, perplexity=5, random_state=42)  # Adjust perplexity\ntsne_result = tsne.fit_transform(features_array)\n"

In [None]:
print(features_array.shape)

(20, 10000)


In [2]:
from sklearn.decomposition import PCA
"""
# Reduce to 50 dimensions with PCA
pca = PCA(n_components=50, random_state=42)
features_reduced = pca.fit_transform(features_array)

# Now apply t-SNE on the reduced features
tsne = TSNE(n_components=2, perplexity=5, random_state=42)
tsne_result = tsne.fit_transform(features_reduced)
"""

'\n# Reduce to 50 dimensions with PCA\npca = PCA(n_components=50, random_state=42)\nfeatures_reduced = pca.fit_transform(features_array)\n\n# Now apply t-SNE on the reduced features\ntsne = TSNE(n_components=2, perplexity=5, random_state=42)\ntsne_result = tsne.fit_transform(features_reduced)\n'

In [1]:
from sklearn.manifold import TSNE
"""
tsne = TSNE(n_components=2, random_state=42)
tsne_result = tsne.fit_transform(features_array)
pandas_df['tsne1'] = tsne_result[:, 0]
pandas_df['tsne2'] = tsne_result[:, 1]

plt.figure(figsize=(10, 7))
sns.scatterplot(data=pandas_df, x='tsne1', y='tsne2', hue='prediction', palette='viridis', s=50)
plt.title("K-Means Clusters Visualized with t-SNE")
plt.show()
"""

'\ntsne = TSNE(n_components=2, random_state=42)\ntsne_result = tsne.fit_transform(features_array)\npandas_df[\'tsne1\'] = tsne_result[:, 0]\npandas_df[\'tsne2\'] = tsne_result[:, 1]\n\nplt.figure(figsize=(10, 7))\nsns.scatterplot(data=pandas_df, x=\'tsne1\', y=\'tsne2\', hue=\'prediction\', palette=\'viridis\', s=50)\nplt.title("K-Means Clusters Visualized with t-SNE")\nplt.show()\n'