In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, to_timestamp, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator

# Initialize Spark Session
spark = SparkSession.builder.appName("NewsArticleCategorization").getOrCreate()

# Load the dataset (adjust the file path as needed)
data = spark.read.json("alakhbar.json")
data.printSchema()
data.show(truncate=80)

# Combine text fields (title, description, content) into a single column "text"
data = data.withColumn("text", concat_ws(" ", "title", "description", "content"))

# Preprocessing: Tokenize the text using RegexTokenizer.
# Java's "\\W" is ASCII-only ([^a-zA-Z_0-9]): it treats every Arabic letter as a
# separator and produces no Arabic tokens at all. Instead, split on runs of
# characters that are not Unicode letters (\p{L}), combining marks (\p{M}, so
# diacritics such as tanween stay attached to their word), digits or underscore.
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="[^\\p{L}\\p{M}\\p{N}_]+")

# Remove stop words (for Arabic, you may need to supply a custom list of Arabic stop words)
arabic_stopwords = ["في", "من", "على", "و", "إلى", "عن", "أن", "كان", "مع", "هذا", "هذه", "لم", "لا", "ما"]  # Extend as needed
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=arabic_stopwords)

# Feature Extraction: Convert words to term frequency vectors
vectorizer = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", vocabSize=10000, minDF=2)

# Compute TF-IDF from the term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Clustering: Use KMeans with k=5 (predefined number of categories)
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=5, seed=42)

# Create a pipeline to streamline the steps
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, kmeans])

# Train the pipeline model on the entire dataset
model = pipeline.fit(data)

# Transform the dataset to get clusters
clustered_data = model.transform(data)
clustered_data.select("id", "cluster", "pub_date", "title").show(truncate=80)

# Evaluate clustering quality using Silhouette Score
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster", metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(clustered_data)
print(f"Silhouette Score: {silhouette}")

# OPTIONAL: Analyze top terms per cluster by exploring the vocabulary
# (This might involve extracting cluster centers and mapping indices to words from vectorizer.vocabulary)

# Demonstration: Classify two most recent articles based on pub_date.
# Use a new name instead of overwriting `data`, so the frame displayed above
# keeps its schema.
# NOTE(review): to_timestamp() without a format uses Spark's default parsing;
# if pub_date is in another layout the timestamp is null. Confirm the format.
data_with_ts = data.withColumn("timestamp", to_timestamp("pub_date"))
recent_articles = data_with_ts.orderBy(col("timestamp").desc()).limit(2)
recent_predictions = model.transform(recent_articles)
recent_predictions.select("id", "title", "cluster").show(truncate=80)


root
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- title: string (nullable = true)

+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+-----+-----------------------------------+-------------------------+------------------------------------------------------------------------------+
|                                                                         content|                                                                     description|   id|                               link|                 pub_date|                                                                         title|
+--------------------------------------------------------------------------------+-------------------------------------------------------------

In [None]:
# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, to_timestamp, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator

# 1. Initialize Spark Session
spark = SparkSession.builder.appName("NewsArticleCategorization").getOrCreate()

# 2. Load the dataset from a JSON file (update the file path accordingly)
data = spark.read.json("alakhbar.json")
data.printSchema()  # Verify the schema
data.show(truncate=80)  # Preview the dataset

# 3. Combine text fields (title, description, content) into one column "text"
data = data.withColumn("text", concat_ws(" ", "title", "description", "content"))

# 4. Text Preprocessing
# 4a. Tokenization: Split the combined text into words.
# Java's "\\W" is ASCII-only ([^a-zA-Z_0-9]), so it would treat every Arabic
# letter as a separator and yield no Arabic tokens. Split on runs of characters
# that are not Unicode letters, combining marks (keeps diacritics inside words),
# digits or underscore.
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="[^\\p{L}\\p{M}\\p{N}_]+")

# 4b. Remove Stop Words: Use a custom list for Arabic (you can extend this list as needed)
arabic_stopwords = ["في", "من", "على", "و", "إلى", "عن", "أن", "كان", "مع", "هذا", "هذه", "لم", "لا", "ما"]
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=arabic_stopwords)

# 5. Feature Extraction
# 5a. CountVectorizer: Transform filtered tokens into numerical term frequency vectors
vectorizer = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", vocabSize=10000, minDF=2)

# 5b. TF-IDF: Scale the term frequency vectors to emphasize important words
idf = IDF(inputCol="rawFeatures", outputCol="features")

# 6. Clustering: Using KMeans to group articles into 5 clusters (k=5)
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=5, seed=42)

# 7. Create a Pipeline to chain all steps together
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, kmeans])

# 8. Train the model on the dataset
model = pipeline.fit(data)

# 9. Transform the data to assign clusters to each article
clustered_data = model.transform(data)
clustered_data.select("id", "cluster", "pub_date", "title").show(truncate=80)

# 10. Evaluate the clustering performance using the Silhouette Score
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster",
                                metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(clustered_data)
print("Silhouette Score:", silhouette)

# 11. Demonstration: Classify two of the most recent articles
# Convert pub_date to timestamp for ordering (new name: `data` is left unchanged).
# NOTE(review): to_timestamp() without a format uses Spark's default parsing;
# confirm pub_date matches it, otherwise the timestamp is null.
data_with_ts = data.withColumn("timestamp", to_timestamp("pub_date"))
# Order articles by the most recent publication date and select two
recent_articles = data_with_ts.orderBy(col("timestamp").desc()).limit(2)
# Use the trained pipeline model to predict the cluster for these articles
recent_predictions = model.transform(recent_articles)
recent_predictions.select("id", "title", "cluster").show(truncate=80)


root
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- title: string (nullable = true)

+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+-----+-----------------------------------+-------------------------+------------------------------------------------------------------------------+
|                                                                         content|                                                                     description|   id|                               link|                 pub_date|                                                                         title|
+--------------------------------------------------------------------------------+-------------------------------------------------------------

In [None]:
from pyspark.sql.functions import concat_ws

# Classify one unseen headline with the pipeline `model` fitted in the previous cell.
new_title = "وزير الخارجية يعقد مؤتمر صحفي حول العلاقات الثنائية"

# Only the title is known; description and content are blank. The pipeline's
# first stage reads the "text" column, so build it exactly as during training.
new_article = (
    spark.createDataFrame([(new_title, "", "")], ["title", "description", "content"])
         .withColumn("text", concat_ws(" ", "title", "description", "content"))
)

# Run every fitted stage (tokenizer -> stopwords -> TF -> IDF -> KMeans)
prediction = model.transform(new_article)
prediction.select("title", "cluster").show(truncate=False)


+---------------------------------------------------+-------+
|title                                              |cluster|
+---------------------------------------------------+-------+
|وزير الخارجية يعقد مؤتمر صحفي حول العلاقات الثنائية|0      |
+---------------------------------------------------+-------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, udf, collect_list, expr
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
import re
import numpy as np

# Initialize Spark Session.
# NOTE(review): spark.driver.memory only applies when this call starts a new
# session/JVM. Earlier cells already call getOrCreate(), in which case the
# running session is returned, presumably without the 4g setting. Verify.
spark = (
    SparkSession.builder
    .appName("ArabicNewsTopicModeling")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# 1. Data Loading (replace with the path to your dataset)
data = spark.read.json("alakhbar.json")
print("Dataset Schema:")
data.printSchema()
print("Sample Data:")
data.show(5, truncate=80)

# 2. Text Preprocessing for Arabic
# 2.1 Join title, description and content (space-separated) into "text"
text_parts = [col(name) for name in ("title", "description", "content")]
data = data.withColumn("text", concat_ws(" ", *text_parts))

# 2.2 Arabic Text Normalization and Cleaning
def normalize_arabic(text):
    """Return a normalized, Arabic-only copy of ``text`` (``None`` -> "").

    Steps, in order:
    1. unify Alef variants (أ, إ, آ -> ا) and Alef Maqsura (ى -> ي);
    2. delete diacritics U+064B..U+065F and tatweel U+0640;
    3. replace every character outside the Arabic block U+0600..U+06FF that is
       not whitespace with a space;
    4. collapse whitespace runs to one space and trim both ends.
    """
    if text is None:
        return ""

    for variant, canonical in (("أ", "ا"), ("إ", "ا"), ("آ", "ا"), ("ى", "ي")):
        text = text.replace(variant, canonical)

    stripped = re.sub(r'[\u064B-\u065F\u0640]', '', text)
    arabic_only = re.sub(r'[^\u0600-\u06FF\s]', ' ', stripped)
    return re.sub(r'\s+', ' ', arabic_only).strip()

normalize_udf = udf(normalize_arabic, StringType())
data = data.withColumn("norm_text", normalize_udf(col("text")))

# 2.3 Tokenization - split on runs of anything that is not a Unicode letter,
# digit or underscore (diacritics were already stripped by normalize_arabic)
tokenizer = RegexTokenizer(inputCol="norm_text", outputCol="words", pattern="[^\\p{L}\\p{N}_]+", gaps=True)
tokenized_data = tokenizer.transform(data)

# 2.4 Remove Arabic stopwords
# Extended list of Arabic stopwords, written in their usual surface forms
arabic_stopwords_raw = [
    "في", "من", "على", "إلى", "عن", "مع", "هذا", "هذه", "أن", "و", "ب", "ل", "الى",
    "إن", "ثم", "لكن", "أو", "كان", "كانت", "يكون", "وكان", "وكانت", "هو", "هي",
    "كل", "بعض", "تم", "لا", "لم", "لن", "ما", "هناك", "كما", "قال", "قالت", "يقول",
    "عندما", "عند", "حيث", "منذ", "خلال", "بعد", "قبل", "حتى", "اذا", "إذا", "بين",
    "منها", "منه", "له", "لها", "التي", "الذي", "فيه", "فيها", "عنه", "عنها"
]
# The tokens come from norm_text, where أ/إ/آ became ا and ى became ي, so the
# stopwords must be normalized the same way. Otherwise e.g. "على" (-> "علي"),
# "أن"/"إن" (-> "ان") or "حتى" (-> "حتي") never match and survive filtering.
# dict.fromkeys drops duplicates that normalization creates, keeping order.
arabic_stopwords = list(dict.fromkeys(normalize_arabic(w) for w in arabic_stopwords_raw))
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=arabic_stopwords)
filtered_data = stopwords_remover.transform(tokenized_data)

# 2.5 Apply stemming with a simple UDF approach
def simple_arabic_stem(word_list):
    """Light-stem each token by stripping at most one prefix and one suffix.

    ``None`` yields ``[]``. Tokens of 3 characters or fewer are returned
    unchanged. Otherwise:
    * prefix: drop a leading "ال" when the token is longer than 4 characters,
      else drop a single leading و / ف / ب / ي / ت;
    * suffix: if more than 4 characters remain, drop one two-letter suffix
      (ون ين ات ان تي يه ية ته ها هم هن نا) or, failing that, one one-letter
      suffix (ه ة ت).
    """
    if word_list is None:
        return []

    single_prefixes = ("و", "ف", "ب", "ي", "ت")
    two_letter_suffixes = ("ون", "ين", "ات", "ان", "تي", "يه", "ية", "ته", "ها", "هم", "هن", "نا")
    one_letter_suffixes = ("ه", "ة", "ت")

    def _stem(token):
        # Short tokens are too ambiguous to strip safely
        if len(token) <= 3:
            return token
        if len(token) > 4 and token.startswith("ال"):
            token = token[2:]
        elif token.startswith(single_prefixes):
            token = token[1:]
        if len(token) > 4:
            if token.endswith(two_letter_suffixes):
                token = token[:-2]
            elif token.endswith(one_letter_suffixes):
                token = token[:-1]
        return token

    return [_stem(word) for word in word_list]

stem_udf = udf(simple_arabic_stem, ArrayType(StringType()))
stemmed_data = filtered_data.withColumn("stemmed", stem_udf(col("filtered")))

# 3. Feature Extraction for LDA: document-term count matrix over stemmed tokens
cv = CountVectorizer(
    inputCol="stemmed",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
    minTF=2,
)
cv_model = cv.fit(stemmed_data)
vectorized_data = cv_model.transform(stemmed_data)

# 4. Build and Train LDA Model with 5 topics (one per target category).
# NOTE: per the Spark docs, learningDecay only affects the "online" optimizer;
# it has no effect with optimizer='em'.
lda = LDA(k=5, maxIter=50, featuresCol="features", seed=42, optimizer='em', learningDecay=0.5)
lda_model = lda.fit(vectorized_data)

# 5. Topic Extraction and Interpretation: map term indices back to vocabulary words
vocab = cv_model.vocabulary

print("Top terms for each topic:")
topics = lda_model.describeTopics(maxTermsPerTopic=20)


def _indices_to_terms(row):
    """Turn a describeTopics row into (topic id, list of vocabulary words)."""
    return (row.topic, [vocab[i] for i in row.termIndices])


topics_with_terms = topics.rdd.map(_indices_to_terms).toDF(["topic", "top_terms"])
topics_with_terms.show(truncate=False)

# 6. Automatic Topic Labeling
# Define category keywords (expanded with more terms per category).
# Keywords are written in surface form; assign_category below also derives their
# normalized+stemmed forms, because topic terms come from the stemmed vocabulary.
# Multi-word entries such as "راس مال" can never equal a single token.
category_keywords = {
    "Politics": [
        "رئيس", "وزير", "حكومة", "انتخابات", "سياسة", "برلمان", "دولة", "قرار", "سفير", "خارجية",
        "رئاسة", "مجلس", "سلطة", "دبلوماسية", "سياسي", "زيارة", "قمة", "اجتماع", "وفد", "سفارة",
        "دستور", "تصريح", "عهد", "امير", "ملك", "سلطان", "امن", "رئاسي", "وزارة", "حزب", "نائب"
    ],
    "Economy": [
        "اقتصاد", "سوق", "مال", "شركة", "بنك", "استثمار", "مليون", "دولار", "تجارة", "ميزانية",
        "تمويل", "اسهم", "مصرف", "انتاج", "سعر", "اسعار", "صناعة", "نفط", "ثروة", "معدن",
        "ذهب", "منجم", "مشروع", "تنمية", "سندات", "صندوق", "راس مال", "تضخم", "عملة", "بورصة"
    ],
    "Sports": [
        "مباراة", "لاعب", "فريق", "كرة", "منتخب", "بطولة", "دوري", "هدف", "مدرب", "فوز",
        "قدم", "سلة", "يد", "طائرة", "سباق", "ملعب", "استاد", "تدريب", "مونديال", "اولمبياد",
        "كاس", "جولة", "رياضي", "نادي", "مهاجم", "مدافع", "حارس", "مرمى", "تسجيل", "احراز"
    ],
    "Disasters": [
        "امطار", "سيول", "فيضان", "عاصفة", "اعصار", "جفاف", "حريق", "كارثة", "ضحايا", "سد",
        "انهيار", "غرق", "طوفان", "تضرر", "خسائر", "منكوبة", "اضرار", "انقاذ", "اغاثة", "طوارئ",
        "مياه", "محاصر", "حادث", "وفاة", "مصاب", "اصابة", "كسر", "تحطم", "مطر", "رياح"
    ],
    "Social": [
        "مجتمع", "صحة", "تعليم", "مدرسة", "طلاب", "مستشفى", "مرض", "دواء", "جامعة", "عائلة",
        "طفل", "اطفال", "نساء", "رجال", "شباب", "كبار", "خدمات", "تطوع", "حملة", "توعية",
        "فعالية", "مبادرة", "مهرجان", "احتفال", "منظمة", "جمعية", "هيئة", "مؤسسة", "عامة", "ثقافة"
    ]
}

# UDF to assign category based on keyword overlap with topic terms
def assign_category(top_terms):
    """Label a topic with the category whose keywords best overlap its terms.

    ``top_terms`` are entries of ``cv_model.vocabulary``. The CountVectorizer is
    fit on the "stemmed" column, so these are normalized, stemmed tokens
    (e.g. "حكوم", "عليم"). The keyword lists hold surface forms
    ("حكومة", "تعليم"), which would almost never match directly. Each category
    is therefore matched against its raw keywords plus their normalized+stemmed
    forms (produced with the same normalize_arabic / simple_arabic_stem helpers).

    Returns the best-scoring category (the first in ``category_keywords`` order
    on ties), or "Unknown" when ``top_terms`` is empty/None or nothing matches.
    """
    if not top_terms:
        return "Unknown"

    scores = {}
    for category, keywords in category_keywords.items():
        keyword_forms = set(keywords)
        keyword_forms.update(simple_arabic_stem([normalize_arabic(k) for k in keywords]))
        scores[category] = sum(1 for term in top_terms if term in keyword_forms)

    # max() keeps the first maximal item, preserving dict-order tie-breaking
    best_category, best_score = max(scores.items(), key=lambda item: item[1])
    return best_category if best_score > 0 else "Unknown"

# Register the topic-labeling UDF
assign_category_udf = udf(assign_category, StringType())

# Attach a category label to every topic row
labeled_topics = topics_with_terms.withColumn("category", assign_category_udf(col("top_terms")))
print("Topics with assigned categories:")
labeled_topics.show(truncate=False)

# Bring the small topic -> category table to the driver as a plain dict
topic_to_category_df = labeled_topics.select("topic", "category")
topic_to_category = dict(
    (row["topic"], row["category"]) for row in topic_to_category_df.collect()
)
print("Topic to Category Mapping:", topic_to_category)

# 7. Per-document topic distributions for the whole dataset
topics_data = lda_model.transform(vectorized_data)

# 8. Function to extract dominant topic
def get_dominant_topic(topic_distribution):
    """Return, as a plain int, the index of the largest weight in ``topic_distribution``.

    Ties resolve to the lowest index (``numpy.argmax`` semantics).
    """
    best_index = np.argmax(topic_distribution)
    return int(best_index)

# Register UDF. get_dominant_topic returns a Python int, but the column is
# declared StringType. Convert explicitly instead of relying on Spark to coerce
# the returned int into a string. The values are the same ("0", "1", ...).
get_dominant_topic_udf = udf(lambda dist: str(get_dominant_topic(dist)), StringType())

# Get dominant topic for each document
categorized_data = topics_data.withColumn("dominant_topic", get_dominant_topic_udf(col("topicDistribution")))

# 9. Map dominant topic to category
def map_topic_to_category(topic_id):
    """Translate a dominant-topic id (string or int) into its category label.

    Looks the id up in the driver-side ``topic_to_category`` dict; ids without
    an entry map to "Unknown".
    """
    key = int(topic_id)
    return topic_to_category.get(key, "Unknown")

# Register the topic -> category UDF
map_topic_udf = udf(map_topic_to_category, StringType())

# Final category for each document, taken from its dominant topic
final_data = categorized_data.withColumn(
    "category", map_topic_udf(col("dominant_topic"))
)

# 10. Show results
print("Sample of categorized articles:")
final_data.select("id", "title", "dominant_topic", "category").show(n=10, truncate=80)

# 11. Article count per category, most frequent first
category_distribution = (
    final_data
    .groupBy("category")
    .count()
    .orderBy(col("count").desc())
)
print("Distribution of articles by category:")
category_distribution.show()

# 12. Demonstration on two most recent articles
# Convert pub_date to timestamp and select the two most recent articles.
# NOTE(review): to_timestamp() without a format uses Spark's default parsing;
# if pub_date is in another layout the timestamp is null. Confirm the format.
from pyspark.sql.functions import to_timestamp
data_with_timestamp = data.withColumn("timestamp", to_timestamp(col("pub_date")))
recent_articles = data_with_timestamp.orderBy(col("timestamp").desc()).limit(2)

# Use the existing pipeline stages to process these articles
recent_tokenized = tokenizer.transform(recent_articles)
recent_filtered = stopwords_remover.transform(recent_tokenized)
recent_stemmed = recent_filtered.withColumn("stemmed", stem_udf(col("filtered")))
recent_vectorized = cv_model.transform(recent_stemmed)
recent_topics = lda_model.transform(recent_vectorized)

# Get dominant topic and category
recent_categorized = recent_topics \
    .withColumn("dominant_topic", get_dominant_topic_udf(col("topicDistribution"))) \
    .withColumn("category", map_topic_udf(col("dominant_topic")))

# Show the results for the recent articles
print("Categorization of the two most recent articles:")
recent_categorized.select("id", "title", "dominant_topic", "category", "pub_date").show(truncate=80)

# Save the models for future use. Plain save() raises when the target path
# already exists, which breaks re-running the notebook; write().overwrite()
# replaces the previous output instead.
lda_model.write().overwrite().save("arabic_news_lda_model")
cv_model.write().overwrite().save("arabic_news_cv_model")
print("Models saved successfully.")

spark.stop()

Dataset Schema:
root
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- title: string (nullable = true)

Sample Data:
+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+-----+-----------------------------------+-------------------------+------------------------------------------------------------------------+
|                                                                         content|                                                                     description|   id|                               link|                 pub_date|                                                                   title|
+--------------------------------------------------------------------------------+--------------------------------------------

In [None]:
from typing import List, Dict, Any
import re
import numpy as np
import unittest

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import concat_ws, col, udf, to_timestamp
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA

# --- Module-level UDF helpers ---

def normalize_arabic(text: str) -> str:
    """Normalize Arabic text and drop everything that is not Arabic or whitespace.

    Falsy input (None or "") yields "". Otherwise: أ/إ/آ -> ا and ى -> ي;
    diacritics U+064B..U+065F and tatweel U+0640 are deleted; any non-whitespace
    character outside U+0600..U+06FF becomes a space; whitespace runs collapse
    to a single space and the result is trimmed.
    """
    if not text:
        return ""
    for source, target in (("أ", "ا"), ("إ", "ا"), ("آ", "ا"), ("ى", "ي")):
        text = text.replace(source, target)
    without_marks = re.sub(r'[\u064B-\u065F\u0640]', '', text)
    arabic_only = re.sub(r'[^\u0600-\u06FF\s]', ' ', without_marks)
    return re.sub(r'\s+', ' ', arabic_only).strip()

normalize_udf = udf(f=normalize_arabic, returnType=StringType())  # Spark wrapper around normalize_arabic

def simple_arabic_stem(word_list: List[str]) -> List[str]:
    """Light-stem every token; a falsy ``word_list`` yields ``[]``.

    Tokens of 3 characters or fewer are untouched. Longer tokens lose a leading
    "ال" (only when longer than 4) or else one leading و/ف/ب/ي/ت. Then, if more
    than 4 characters remain, they lose one two-letter suffix (ون ين ات ان تي يه
    ية ته ها هم هن نا) in preference to one one-letter suffix (ه ة ت).
    """
    stems: List[str] = []
    if not word_list:
        return stems
    for token in word_list:
        if len(token) > 3:
            if len(token) > 4 and token[:2] == "ال":
                token = token[2:]
            elif token[0] in "وفبيت":
                token = token[1:]
            if len(token) > 4:
                if token[-2:] in ("ون", "ين", "ات", "ان", "تي", "يه", "ية", "ته", "ها", "هم", "هن", "نا"):
                    token = token[:-2]
                elif token[-1] in "هةت":
                    token = token[:-1]
        stems.append(token)
    return stems

stem_udf = udf(simple_arabic_stem, ArrayType(StringType()))

def assign_category_udf_factory(category_keywords: Dict[str,List[str]]):
    """Build a StringType Spark UDF that labels a list of topic terms with a category.

    In this module the CountVectorizer is fit on the normalized + stemmed
    tokens, so topic terms are stems (e.g. "حكوم"). ``category_keywords``,
    however, holds surface forms (e.g. "حكومة"), which would rarely match
    directly. Each category is therefore matched against its raw keywords *and*
    their normalize_arabic + simple_arabic_stem forms. These lookup sets are
    built once here, on the driver, and captured by the UDF.

    The UDF returns the category with the most matching terms (the first in
    ``category_keywords`` order on ties), or "Unknown" when the term list is
    empty/null, nothing matches, or ``category_keywords`` is empty.
    """
    keyword_forms = {
        category: set(keywords) | set(simple_arabic_stem([normalize_arabic(k) for k in keywords]))
        for category, keywords in category_keywords.items()
    }

    def assign_category(top_terms: List[str]) -> str:
        # Guard both inputs: max() over an empty score dict would raise.
        if not top_terms or not keyword_forms:
            return "Unknown"
        scores = {
            cat: sum(1 for term in top_terms if term in forms)
            for cat, forms in keyword_forms.items()
        }
        best, best_score = max(scores.items(), key=lambda x: x[1])
        return best if best_score > 0 else "Unknown"

    return udf(assign_category, StringType())

def get_dominant_topic(dist: List[float]) -> str:
    """Return the index of the highest-weight topic as a decimal string (ties -> lowest index)."""
    top = int(np.argmax(dist))
    return f"{top}"

get_dominant_topic_udf = udf(f=get_dominant_topic, returnType=StringType())  # Spark wrapper; yields "0", "1", ...

# --- Main class ---

class ArabicNewsTopicModeler:
    """Arabic news topic modeling: preprocessing, LDA, and keyword-based topic labels.

    Flow (see run_topic_modeling): read JSON -> join title/description/content
    into ``text`` -> normalize into ``norm_text`` -> tokenize, drop stopwords and
    light-stem into ``stemmed`` -> CountVectorizer + LDA -> label each topic with
    a category by keyword overlap -> give every document the category of its
    highest-probability topic.
    """

    def __init__(self, app_name: str = "ArabicNewsTopicModeling", memory: str = "4g"):
        """Create or reuse a SparkSession and build the category keyword table.

        :param app_name: Spark application name.
        :param memory: value for ``spark.driver.memory``. NOTE(review):
            getOrCreate() returns an already-running session as-is, so this
            presumably only takes effect for the first session in the process.
        """
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.driver.memory", memory) \
            .getOrCreate()
        # Category -> surface-form keywords used to label LDA topics
        self.category_keywords = {
            "Politics": ["رئيس","وزير","حكومة","انتخابات","سياسة","برلمان","دولة","قرار","سفير","خارجية","رئاسة","مجلس","سلطة"],
            "Economy":  ["اقتصاد","سوق","مال","شركة","بنك","استثمار","مليون","دولار","تجارة","ميزانية","تمويل","اسهم","مصرف"],
            "Sports":   ["مباراة","لاعب","فريق","كرة","منتخب","بطولة","دوري","هدف","مدرب","فوز","قدم","سلة","يد"],
            "Disasters":["امطار","سيول","فيضان","عاصفة","اعصار","جفاف","حريق","كارثة","ضحايا","سد","انهيار","غرق","طوفان"],
            "Social":   ["مجتمع","صحة","تعليم","مدرسة","طلاب","مستشفى","مرض","دواء","جامعة","عائلة","طفل","اطفال","نساء"]
        }
        self.assign_category_udf = assign_category_udf_factory(self.category_keywords)

    def preprocess_data(self, input_path: str) -> DataFrame:
        """Read articles from JSON and add the ``text`` and ``norm_text`` columns.

        :param input_path: path accepted by ``spark.read.json`` (file or directory)
        :return: the input columns plus ``text`` (title, description and content
            joined by spaces) and ``norm_text`` (normalize_arabic applied to text)
        """
        df = self.spark.read.json(input_path)
        df = df.withColumn("text", concat_ws(" ", col("title"), col("description"), col("content")))
        return df.withColumn("norm_text", normalize_udf(col("text")))

    def tokenize_and_filter(self, df: DataFrame) -> DataFrame:
        """Tokenize ``norm_text``, remove stopwords and add a light-stemmed column.

        :param df: DataFrame with a ``norm_text`` column (see preprocess_data)
        :return: ``df`` plus ``words``, ``filtered`` and ``stemmed`` columns
        """
        tokenizer = RegexTokenizer(inputCol="norm_text", outputCol="words", pattern="[^\\p{L}\\p{N}_]+")
        df = tokenizer.transform(df)
        raw_stopwords = ["في","من","على","إلى","عن","مع","هذا","هذه","أن","و","ب","ل","الى","إن","ثم","لكن","أو","كان","كانت"]
        # norm_text is already normalized (أ/إ/آ -> ا, ى -> ي), so the stopwords
        # must be normalized too. Otherwise "على" (-> "علي"), "أن"/"إن" (-> "ان")
        # and "أو" (-> "او") never match a token. dict.fromkeys dedupes in order.
        stopwords = list(dict.fromkeys(normalize_arabic(w) for w in raw_stopwords))
        remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopwords)
        df = remover.transform(df)
        return df.withColumn("stemmed", stem_udf(col("filtered")))

    def extract_topics(self, df: DataFrame, num_topics: int = 5,
                       min_df: float = 5, min_tf: float = 2, max_iter: int = 50):
        """Fit a CountVectorizer and an EM-optimized LDA model on the ``stemmed`` column.

        :param df: DataFrame with a ``stemmed`` array<string> column
        :param num_topics: number of LDA topics (k)
        :param min_df: CountVectorizer ``minDF``: documents (count, or fraction
            when < 1) a term must appear in to enter the vocabulary
        :param min_tf: CountVectorizer ``minTF``: per-document count (or
            fraction) below which a term is ignored in that document
        :param max_iter: LDA ``maxIter``
        :return: ``(lda_model, cv_model, vectorized_df)``, where
            ``vectorized_df`` is ``df`` plus a ``features`` column
        """
        cv = CountVectorizer(inputCol="stemmed", outputCol="features", vocabSize=10000,
                             minDF=min_df, minTF=min_tf)
        cv_model = cv.fit(df)
        vect = cv_model.transform(df)
        # learningDecay is only used by the "online" optimizer (Spark docs);
        # kept as in the original configuration.
        lda = LDA(k=num_topics, maxIter=max_iter, featuresCol="features", seed=42,
                  optimizer='em', learningDecay=0.5)
        lda_model = lda.fit(vect)
        return lda_model, cv_model, vect

    def label_topics(self, lda_model, cv_model) -> DataFrame:
        """Return one row per topic: ``topic``, its 20 top ``top_terms`` and a ``category``.

        Term indices from ``describeTopics`` are mapped back to words through
        ``cv_model.vocabulary``; the category comes from ``assign_category_udf``.
        """
        vocab = cv_model.vocabulary
        topics = lda_model.describeTopics(maxTermsPerTopic=20).collect()
        rows = []
        for t in topics:
            terms = [vocab[idx] for idx in t.termIndices]
            rows.append((t.topic, terms))
        topics_df = self.spark.createDataFrame(rows, ["topic","top_terms"])
        return topics_df.withColumn("category", self.assign_category_udf(col("top_terms")))

    def categorize_documents(self, df_with_dist: DataFrame, topic_to_cat: Dict[int,str]) -> DataFrame:
        """Add ``dominant_topic`` (string id) and ``category`` columns.

        :param df_with_dist: LDA-transformed DataFrame with ``topicDistribution``
        :param topic_to_cat: topic id -> category; missing ids map to "Unknown"
        """
        df2 = df_with_dist.withColumn("dominant_topic", get_dominant_topic_udf(col("topicDistribution")))
        def map_topic(tid): return topic_to_cat.get(int(tid), "Unknown")
        map_udf = udf(map_topic, StringType())
        return df2.withColumn("category", map_udf(col("dominant_topic")))

    def run_topic_modeling(self, input_path: str) -> None:
        """
        Full pipeline for Arabic news topic modeling: preprocess, fit, label,
        categorize, report, and save both models (overwriting earlier output).

        :param input_path: Path to input JSON file
        """
        # Preprocess data
        preprocessed_data = self.preprocess_data(input_path)

        # Tokenize and filter
        stemmed_data = self.tokenize_and_filter(preprocessed_data)

        # Extract topics
        lda_model, cv_model, vectorized_data = self.extract_topics(stemmed_data)

        # Label topics
        labeled_topics = self.label_topics(lda_model, cv_model)
        labeled_topics.show(truncate=False)

        # Create topic to category mapping
        topic_to_category = {
            row["topic"]: row["category"]
            for row in labeled_topics.collect()
        }
        print("Topic to Category Mapping:", topic_to_category)

        # Categorize documents (categorize_documents takes the distribution
        # frame and the mapping only; passing lda_model raised a TypeError)
        final_data = self.categorize_documents(
            lda_model.transform(vectorized_data),
            topic_to_category
        )

        # Show results
        print("Sample of categorized articles:")
        final_data.select("id", "title", "dominant_topic", "category").show(10, truncate=80)

        # Analyze category distribution
        category_distribution = final_data.groupBy("category").count().orderBy("count", ascending=False)
        print("Distribution of articles by category:")
        category_distribution.show()

        # Optional: Demonstrate on recent articles
        self._analyze_recent_articles(
            preprocessed_data,
            lda_model,
            cv_model,
            topic_to_category
        )

        # Save models; overwrite() so a re-run does not fail on existing paths
        lda_model.write().overwrite().save("arabic_news_lda_model")
        cv_model.write().overwrite().save("arabic_news_cv_model")
        print("Models saved successfully.")

    def _analyze_recent_articles(self,
                                 data: DataFrame,
                                 lda_model: Any,
                                 cv_model: Any,
                                 topic_to_category: Dict[int, str]) -> None:
        """
        Categorize and show the two most recent articles by ``pub_date``.

        NOTE(review): to_timestamp() without a format uses Spark's default
        parsing; confirm pub_date matches it, otherwise the timestamp is null.

        :param data: DataFrame from preprocess_data (must contain norm_text)
        :param lda_model: Trained LDA model
        :param cv_model: Trained CountVectorizer model
        :param topic_to_category: Mapping of topic IDs to categories
        """
        # Add timestamp column and select recent articles
        data_with_timestamp = data.withColumn("timestamp", to_timestamp(col("pub_date")))
        recent_articles = data_with_timestamp.orderBy(col("timestamp").desc()).limit(2)

        # Reuse the existing preprocessing pipeline
        recent_tokenized = self.tokenize_and_filter(recent_articles)
        recent_vectorized = cv_model.transform(recent_tokenized)
        recent_topics = lda_model.transform(recent_vectorized)

        # Categorize recent articles
        recent_categorized = self.categorize_documents(
            recent_topics,
            topic_to_category
        )

        # Show results for recent articles
        print("Categorization of the two most recent articles:")
        recent_categorized.select("id", "title", "dominant_topic", "category", "pub_date").show(truncate=80)

class ArabicNewsTopicModelingTest(unittest.TestCase):
    """
    Test suite for Arabic News Topic Modeling Pipeline
    """

    @classmethod
    def setUpClass(cls):
        """
        Create one modeler/SparkSession and write a small JSON fixture to a temp dir.

        extract_topics uses CountVectorizer(minDF=5, minTF=2) by default, so the
        fixture needs terms that occur in at least 5 documents and more than
        once per document. With only two distinct articles, no term can reach
        minDF=5, so the vocabulary would be empty. Each of the two articles is
        therefore repeated 5 times, with the same sentence in title,
        description and content.
        """
        import os
        import tempfile

        cls.modeler = ArabicNewsTopicModeler()
        cls.spark = cls.modeler.spark

        politics = "رئيس الحكومة يلتقي وزير الخارجية لبحث العلاقات الثنائية"
        sports = "منتخب كرة القدم يحقق فوزا تاريخيا في تصفيات كاس العالم"
        rows = []
        for i in range(10):
            sentence = politics if i % 2 == 0 else sports
            rows.append((str(i + 1), sentence, sentence, sentence, f"2024-01-{i + 1:02d}"))
        df = cls.spark.createDataFrame(rows, ["id", "title", "description", "content", "pub_date"])

        cls.temp_dir = tempfile.mkdtemp(prefix="arabic_news_test_")
        cls.temp_json_path = os.path.join(cls.temp_dir, "articles.json")
        df.write.json(cls.temp_json_path, mode="overwrite")

    @classmethod
    def tearDownClass(cls):
        """
        Remove the temporary fixture directory and stop the SparkSession.
        """
        import shutil
        shutil.rmtree(cls.temp_dir, ignore_errors=True)
        cls.modeler.spark.stop()

    def test_text_normalization(self):
        """
        Test the module-level normalize_arabic helper.
        """
        test_cases = [
            ("أحمد", "احمد"),  # Test Alef variations
            ("مُحَمَّد", "محمد"),  # Test diacritics removal
            ("سَلَام", "سلام"),  # Test various diacritics
        ]

        for input_text, expected in test_cases:
            normalized = normalize_arabic(input_text)
            self.assertEqual(normalized, expected,
                             f"Failed to normalize: {input_text}")

    def test_arabic_stemming(self):
        """
        Test the module-level simple_arabic_stem helper.
        """
        test_cases = [
            (["المدرسة"], ["مدرس"]),  # "ال" prefix and "ة" suffix removed
            (["يكتبون"], ["كتب"]),  # "ي" prefix and "ون" suffix removed
            (["كتاب"], ["كتاب"]),  # 4-letter word without affixes is unchanged
        ]

        for input_words, expected in test_cases:
            stemmed = simple_arabic_stem(input_words)
            self.assertEqual(stemmed, expected,
                             f"Failed to stem: {input_words}")

    def test_category_assignment(self):
        """
        Test topic category assignment through the modeler's Spark UDF.
        """
        test_cases = [
            (["رئيس", "وزير", "حكومة"], "Politics"),
            (["مباراة", "لاعب", "كرة"], "Sports"),
            (["اقتصاد", "سوق", "مال"], "Economy"),
            ([], "Unknown")
        ]

        for terms, expected_category in test_cases:
            df = self.spark.createDataFrame([(terms,)], "top_terms array<string>")
            category = df.select(
                self.modeler.assign_category_udf(col("top_terms")).alias("category")
            ).first()["category"]
            self.assertEqual(category, expected_category,
                             f"Failed to categorize: {terms}")

    def test_pipeline_end_to_end(self):
        """
        Test the full topic modeling pipeline. Exceptions propagate so unittest
        reports the full traceback.
        """
        preprocessed_data = self.modeler.preprocess_data(self.temp_json_path)
        stemmed_data = self.modeler.tokenize_and_filter(preprocessed_data)
        lda_model, cv_model, vectorized_data = self.modeler.extract_topics(stemmed_data)
        labeled_topics = self.modeler.label_topics(lda_model, cv_model)

        topic_to_category = {
            row["topic"]: row["category"]
            for row in labeled_topics.collect()
        }

        final_data = self.modeler.categorize_documents(
            lda_model.transform(vectorized_data),
            topic_to_category
        )

        self.assertIsNotNone(final_data)
        self.assertEqual(final_data.count(), preprocessed_data.count())

        categories = {row["category"] for row in final_data.select("category").distinct().collect()}
        self.assertTrue(len(categories) > 0)
        allowed = set(self.modeler.category_keywords) | {"Unknown"}
        self.assertTrue(categories <= allowed, f"Unexpected categories: {categories - allowed}")

def run_pipeline_validation(input_path: str):
    """
    Validate the topic modeling pipeline with detailed logging.

    Runs each ArabicNewsTopicModeler stage once, printing progress, the topic
    labels and the final category distribution. Failures are reported rather
    than raised: the exception message and its full traceback are printed and
    the function returns normally. The modeler's Spark session is stopped on
    exit in every case.

    :param input_path: Path to the input JSON file
    """
    import traceback

    print("Starting Pipeline Validation...")

    # Initialize the topic modeler
    modeler = ArabicNewsTopicModeler()

    try:
        # Preprocess data
        print("1. Preprocessing Data...")
        preprocessed_data = modeler.preprocess_data(input_path)
        print(f"   Total documents: {preprocessed_data.count()}")

        # Tokenize and filter
        print("2. Tokenizing and Filtering...")
        stemmed_data = modeler.tokenize_and_filter(preprocessed_data)
        print(f"   Processed documents: {stemmed_data.count()}")

        # Extract topics
        print("3. Extracting Topics...")
        lda_model, cv_model, vectorized_data = modeler.extract_topics(stemmed_data)

        # Label topics
        print("4. Labeling Topics...")
        labeled_topics = modeler.label_topics(lda_model, cv_model)
        print("   Topic Labels:")
        labeled_topics.show(truncate=False)

        # Create topic to category mapping
        topic_to_category = {
            row["topic"]: row["category"]
            for row in labeled_topics.collect()
        }
        print("   Topic to Category Mapping:", topic_to_category)

        # Categorize documents
        print("5. Categorizing Documents...")
        # NOTE(review): this notebook's captured output shows
        # "categorize_documents() takes 3 positional arguments but 4 were given";
        # confirm this three-argument call matches the method's current signature.
        final_data = modeler.categorize_documents(
            lda_model.transform(vectorized_data),
            lda_model,
            topic_to_category
        )

        # Show category distribution
        print("6. Category Distribution:")
        category_distribution = final_data.groupBy("category").count().orderBy("count", ascending=False)
        category_distribution.show()

        print("Pipeline Validation Completed Successfully!")

    except Exception as e:
        print(f"Pipeline Validation Failed: {e}")
        # The message alone does not say which stage or line failed; keep the
        # traceback visible while still not aborting the caller.
        traceback.print_exc()

    finally:
        # Stop Spark session
        modeler.spark.stop()

def main(input_path: str = "alakhbar.json"):
    """
    Demonstrate the Arabic News Topic Modeling pipeline: run the unit tests,
    then the step-by-step pipeline validation, then the full topic modeling.

    :param input_path: Path to the input JSON file (defaults to "alakhbar.json").
    """
    # Option 1: Run unit tests.
    # Inside Jupyter, sys.argv holds the kernel launcher's arguments, which
    # unittest.main() would otherwise try to load as test names (the
    # "module '__main__' has no attribute '/root/'" error in this notebook's
    # output). An explicit argv with only a program name runs every test found
    # in __main__; exit=False keeps the kernel alive afterwards.
    print("Running Unit Tests...")
    unittest.main(argv=["first-arg-is-ignored"], exit=False)

    # Option 2: Validate pipeline with sample data
    print("\nValidating Pipeline...")
    run_pipeline_validation(input_path)

    # Option 3: Full topic modeling
    print("\nRunning Full Topic Modeling...")
    modeler = ArabicNewsTopicModeler()
    modeler.run_topic_modeling(input_path)

if __name__ == "__main__":
    main()


E
ERROR: /root/ (unittest.loader._FailedTest./root/)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/root/'

----------------------------------------------------------------------
Ran 1 test in 0.031s

FAILED (errors=1)


Running Unit Tests...

Validating Pipeline...
Starting Pipeline Validation...
1. Preprocessing Data...
   Total documents: 7800
2. Tokenizing and Filtering...
   Processed documents: 7800
3. Extracting Topics...
4. Labeling Topics...
   Topic Labels:
+-----+-------------------------------------------------------------------------------------------------------------------------+---------+
|topic|top_terms                                                                                                                |category |
+-----+-------------------------------------------------------------------------------------------------------------------------+---------+
|0    |[اخبار, رئيس, نواكشوط, ان, يوم, مالي, علي, الي, اتحاد, قال, اعلن, خلال, يان, مجموع, التي, امن, لجنة, بلاد, دول, افريقيا] |Politics |
|1    |[علي, ان, الي, مم, ما, لا, التي, الله, الذي, ذلك, او, كل, ولا, لم, ولد, حزب, انه, محمد, هو, بين]                         |Unknown  |
|2    |[ان, علي, الي, ولد, التي, شركة, انه, ما, ا

TypeError: ArabicNewsTopicModeler.categorize_documents() takes 3 positional arguments but 4 were given

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, udf, collect_list, expr
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA
import re
import numpy as np

# Spark session for the standalone pipeline. getOrCreate() returns an existing
# session if one is already running in this kernel.
# NOTE(review): an earlier cell also builds a session; if it is still alive,
# "spark.driver.memory" here will likely not take effect — confirm.
spark = (
    SparkSession.builder
    .appName("ArabicNewsTopicModeling")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)


# Load the raw articles and show their schema plus a short preview.
data = spark.read.json("alakhbar.json")
print("Structure of the loaded dataset:")
data.printSchema()
print("Preview of the first 5 records:")
data.show(5, truncate=80)


# Single free-text column: title, description and content joined by spaces.
data = data.withColumn(
    "text",
    concat_ws(" ", *(col(name) for name in ("title", "description", "content"))),
)

# Hamza-carrying alef forms fold to bare alef; alef maqsura folds to yeh.
_ARABIC_CHAR_MAP = str.maketrans({"أ": "ا", "إ": "ا", "آ": "ا", "ى": "ي"})
# Marks to delete: small high signs (U+0610-U+061A), harakat/tanween/shadda/sukun
# (U+064B-U+065F), the superscript "dagger" alef (U+0670) and tatweel (U+0640).
# The combining marks are neither \p{L} nor \p{N}, so any left behind would make
# the downstream RegexTokenizer split a word in two at that mark.
_ARABIC_DIACRITICS_RE = re.compile(r'[\u0610-\u061A\u064B-\u065F\u0670\u0640]')
# Everything outside the Arabic block that is not whitespace becomes a space.
_NON_ARABIC_RE = re.compile(r'[^\u0600-\u06FF\s]')
_WHITESPACE_RE = re.compile(r'\s+')


def normalize_arabic(text):
    """Normalize Arabic text by removing diacritics, normalizing forms, and keeping only Arabic characters.

    :param text: raw string, or None.
    :return: normalized string with single spaces between words; "" for None.
    """
    if text is None:
        return ""

    text = text.translate(_ARABIC_CHAR_MAP)
    text = _ARABIC_DIACRITICS_RE.sub('', text)
    text = _NON_ARABIC_RE.sub(' ', text)
    return _WHITESPACE_RE.sub(' ', text).strip()

normalize_udf = udf(normalize_arabic, StringType())
data = data.withColumn("norm_text", normalize_udf(col("text")))

# Split on any run of characters that is not a letter, digit or underscore.
tokenizer = RegexTokenizer(inputCol="norm_text", outputCol="words", pattern="[^\\p{L}\\p{N}_]+", gaps=True)
tokenized_data = tokenizer.transform(data)


# Stop words as commonly written (with hamza / alef maqsura).
arabic_stopwords = [
    "في", "من", "على", "إلى", "عن", "مع", "هذا", "هذه", "أن", "و", "ب", "ل", "الى",
    "إن", "ثم", "لكن", "أو", "كان", "كانت", "يكون", "وكان", "وكانت", "هو", "هي",
    "كل", "بعض", "تم", "لا", "لم", "لن", "ما", "هناك", "كما", "قال", "قالت", "يقول",
    "عندما", "عند", "حيث", "منذ", "خلال", "بعد", "قبل", "حتى", "اذا", "إذا", "بين",
    "منها", "منه", "له", "لها", "التي", "الذي", "فيه", "فيها", "عنه", "عنها"
]
# The tokens being filtered come from norm_text, so the stop words must go
# through the same normalization. Otherwise entries like "على" or "إلى" can never
# match their normalized tokens ("علي", "الي"). Duplicates that collapse together
# (e.g. "إلى"/"الى") are dropped while keeping order.
# Side effect: the normalized "علي" also removes the name Ali (علي).
normalized_stopwords = list(dict.fromkeys(normalize_arabic(word) for word in arabic_stopwords))
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=normalized_stopwords)
filtered_data = stopwords_remover.transform(tokenized_data)

def simple_arabic_stem(word_list):
    """Light rule-based Arabic stemmer applied to every token of a list.

    Tokens of three characters or fewer are kept as-is. Longer tokens lose
    the article "ال" (only when longer than four characters) or one leading
    clitic letter (و ف ب ي ت). If still longer than four characters, they then
    lose one common two-letter suffix or a trailing ه / ة / ت.

    :param word_list: list of tokens, or None.
    :return: list of stemmed tokens in the same order; [] for None.
    """
    if word_list is None:
        return []

    def _stem_one(token):
        if len(token) <= 3:
            return token

        # Prefix stage: definite article first, otherwise a single clitic letter.
        if len(token) > 4 and token.startswith("ال"):
            token = token[2:]
        elif token[0] in "وفبيت":
            token = token[1:]

        # Suffix stage only applies to tokens still longer than four characters.
        if len(token) <= 4:
            return token
        if token.endswith(("ون", "ين", "ات", "ان", "تي", "يه", "ية", "ته", "ها", "هم", "هن", "نا")):
            return token[:-2]
        if token[-1] in "هةت":
            return token[:-1]
        return token

    return [_stem_one(word) for word in word_list]

stem_udf = udf(simple_arabic_stem, ArrayType(StringType()))
stemmed_data = filtered_data.withColumn("stemmed", stem_udf(col("filtered")))


# Term-count features over the stemmed tokens. The vocabulary is capped at 10000
# terms that occur in at least 5 documents. Within a document, a term counted
# fewer than 2 times is ignored (minTF).
cv = CountVectorizer(inputCol="stemmed", outputCol="features", vocabSize=10000, minDF=5, minTF=2)
cv_model = cv.fit(stemmed_data)
vectorized_data = cv_model.transform(stemmed_data)


# 5 topics fitted with the EM optimizer. learningDecay is only used by the
# "online" optimizer, so it is not set here (it previously had no effect).
lda = LDA(k=5, maxIter=50, featuresCol="features", seed=42, optimizer='em')
lda_model = lda.fit(vectorized_data)


vocab = cv_model.vocabulary

print("Most representative keywords for each identified topic:")
topics = lda_model.describeTopics(maxTermsPerTopic=20)
# Translate each topic's term indices back into vocabulary strings.
topics_with_terms = topics.rdd.map(
    lambda topic: (
        topic.topic,
        [vocab[idx] for idx in topic.termIndices]
    )
).toDF(["topic", "top_terms"])
topics_with_terms.show(truncate=False)

# Hand-written keyword lists used to label discovered topics.
# NOTE(review): these are raw surface forms. Topic top_terms come from the
# CountVectorizer vocabulary built on the normalized + stemmed "stemmed" column,
# so any comparison should apply the same normalization/stemming to these words.
# Also, "راس مال" contains a space and therefore can never equal a single token,
# and entries ending in "ى" (e.g. "مستشفى", "مرمى") differ from their normalized form.
category_keywords = {
    "Politics": [
        "رئيس", "وزير", "حكومة", "انتخابات", "سياسة", "برلمان", "دولة", "قرار", "سفير", "خارجية",
        "رئاسة", "مجلس", "سلطة", "دبلوماسية", "سياسي", "زيارة", "قمة", "اجتماع", "وفد", "سفارة",
        "دستور", "تصريح", "عهد", "امير", "ملك", "سلطان", "امن", "رئاسي", "وزارة", "حزب", "نائب"
    ],
    "Economy": [
        "اقتصاد", "سوق", "مال", "شركة", "بنك", "استثمار", "مليون", "دولار", "تجارة", "ميزانية",
        "تمويل", "اسهم", "مصرف", "انتاج", "سعر", "اسعار", "صناعة", "نفط", "ثروة", "معدن",
        "ذهب", "منجم", "مشروع", "تنمية", "سندات", "صندوق", "راس مال", "تضخم", "عملة", "بورصة"
    ],
    "Sports": [
        "مباراة", "لاعب", "فريق", "كرة", "منتخب", "بطولة", "دوري", "هدف", "مدرب", "فوز",
        "قدم", "سلة", "يد", "طائرة", "سباق", "ملعب", "استاد", "تدريب", "مونديال", "اولمبياد",
        "كاس", "جولة", "رياضي", "نادي", "مهاجم", "مدافع", "حارس", "مرمى", "تسجيل", "احراز"
    ],
    "Disasters": [
        "امطار", "سيول", "فيضان", "عاصفة", "اعصار", "جفاف", "حريق", "كارثة", "ضحايا", "سد",
        "انهيار", "غرق", "طوفان", "تضرر", "خسائر", "منكوبة", "اضرار", "انقاذ", "اغاثة", "طوارئ",
        "مياه", "محاصر", "حادث", "وفاة", "مصاب", "اصابة", "كسر", "تحطم", "مطر", "رياح"
    ],
    "Social": [
        "مجتمع", "صحة", "تعليم", "مدرسة", "طلاب", "مستشفى", "مرض", "دواء", "جامعة", "عائلة",
        "طفل", "اطفال", "نساء", "رجال", "شباب", "كبار", "خدمات", "تطوع", "حملة", "توعية",
        "فعالية", "مبادرة", "مهرجان", "احتفال", "منظمة", "جمعية", "هيئة", "مؤسسة", "عامة", "ثقافة"
    ]
}

def assign_category(top_terms):
    """Label a topic with the category whose keywords overlap its top terms most.

    top_terms are CountVectorizer vocabulary entries, i.e. tokens that already
    went through normalize_arabic and simple_arabic_stem. The entries in
    category_keywords are raw surface forms. Each category is therefore matched
    against its raw keywords plus their normalized and stemmed forms; otherwise
    words like "وزير" (stemmed to "زير") or "مباراة" (stemmed to "مبارا") could
    never match.

    :param top_terms: list of terms for one topic; may be None or empty.
    :return: the best-scoring category (the first one in category_keywords
             order on ties), or "Unknown" if nothing matches.
    """
    if not top_terms:
        return "Unknown"

    scores = {}
    for category, keywords in category_keywords.items():
        # Multi-word entries (e.g. "راس مال") contribute each of their words.
        # Recomputed per call: only one call per topic row, and the lists are short.
        keyword_words = [word for keyword in keywords for word in normalize_arabic(keyword).split()]
        match_set = set(keywords) | set(simple_arabic_stem(keyword_words))
        scores[category] = sum(1 for term in top_terms if term in match_set)

    best_category, best_score = max(scores.items(), key=lambda item: item[1])
    return best_category if best_score > 0 else "Unknown"

assign_category_udf = udf(assign_category, StringType())

# Attach a category label to each topic row and display the result.
labeled_topics = topics_with_terms.withColumn("category", assign_category_udf(col("top_terms")))
print("Semantic categories assigned to each discovered topic:")
labeled_topics.show(truncate=False)

# Each collected Row is a (topic, category) pair, so dict() turns the small
# table directly into a driver-side lookup.
topic_to_category_df = labeled_topics.select("topic", "category")
topic_to_category = dict(topic_to_category_df.collect())
print("Dictionary mapping topic numbers to their content categories:", topic_to_category)

# Per-document topic mixtures ("topicDistribution" column).
topics_data = lda_model.transform(vectorized_data)

def get_dominant_topic(topic_distribution):
    """Return, as a plain Python int, the position of the largest weight.

    Ties resolve to the first maximal position (numpy.argmax semantics).
    """
    best_index = np.argmax(topic_distribution)
    return int(best_index)

# get_dominant_topic returns a Python int, so declare the UDF as IntegerType.
# Declaring StringType made the column depend on PySpark coercing that int into
# a string; the downstream topic->category lookup accepts either form.
from pyspark.sql.types import IntegerType

get_dominant_topic_udf = udf(get_dominant_topic, IntegerType())

categorized_data = topics_data.withColumn("dominant_topic", get_dominant_topic_udf(col("topicDistribution")))


def map_topic_to_category(topic_id):
    """Translate a dominant-topic id into its category label.

    topic_id may be an int or a numeric string; it is coerced with int().
    Ids missing from topic_to_category map to "Unknown".
    """
    key = int(topic_id)
    return topic_to_category[key] if key in topic_to_category else "Unknown"


map_topic_udf = udf(map_topic_to_category, StringType())


# Final per-article label derived from the dominant topic.
final_data = categorized_data.withColumn("category", map_topic_udf(col("dominant_topic")))


print("Example articles with their automatically detected categories:")
final_data.select("id", "title", "dominant_topic", "category").show(10, truncate=80)


# Number of articles per category, largest first.
category_distribution = (
    final_data
    .groupBy("category")
    .count()
    .orderBy("count", ascending=False)
)
print("Summary of article count in each thematic category:")
category_distribution.show()


from pyspark.sql.functions import to_timestamp
# Parse pub_date with Spark's default timestamp parsing and keep the 2 newest rows.
# NOTE(review): unparseable pub_date values likely become null (sorted last for
# a descending order) — confirm the pub_date string format.
data_with_timestamp = data.withColumn("timestamp", to_timestamp(col("pub_date")))
recent_articles = data_with_timestamp.orderBy(col("timestamp").desc()).limit(2)

# Re-apply the already fitted stages, in the same order, to the recent rows.
recent_tokenized = tokenizer.transform(recent_articles)
recent_filtered = stopwords_remover.transform(recent_tokenized)
recent_stemmed = recent_filtered.withColumn("stemmed", stem_udf(col("filtered")))
recent_vectorized = cv_model.transform(recent_stemmed)
recent_topics = lda_model.transform(recent_vectorized)

recent_categorized = (
    recent_topics
    .withColumn("dominant_topic", get_dominant_topic_udf(col("topicDistribution")))
    .withColumn("category", map_topic_udf(col("dominant_topic")))
)

print("Topic classification for most recently published articles:")
recent_categorized.select("id", "title", "dominant_topic", "category", "pub_date").show(truncate=80)

# Persist the fitted models. Plain save() raises if the target path already
# exists, which breaks re-running the notebook; write().overwrite() replaces it.
lda_model.write().overwrite().save("arabic_news_lda_model")
cv_model.write().overwrite().save("arabic_news_cv_model")
print("LDA model and CountVectorizer successfully persisted to disk.")

spark.stop()

Structure of the loaded dataset:
root
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- title: string (nullable = true)

Preview of the first 5 records:
+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+-----+-----------------------------------+-------------------------+------------------------------------------------------------------------+
|                                                                         content|                                                                     description|   id|                               link|                 pub_date|                                                                   title|
+--------------------------------------------------------------------------------+--------