# News Topic modeling


## I- Modules import

In [None]:
from math import ceil, log2

import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, NaiveBayes
from pyspark.ml.clustering import LDA
from pyspark.ml.evaluation import ClusteringEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

## II- Spark context and session creation

In [None]:
# Build the session against the master at spark://node15:7077; getOrCreate()
# hands back an already-existing session instead of creating a second one.
session_builder = SparkSession.builder.master("spark://node15:7077")
session_builder = session_builder.appName("NewsTopicModeling")
spark = session_builder.getOrCreate()
spark

## III- Dataframe preparing

### 1. Load the data

In [None]:
# Load the pre-processed news dataset.
# Parquet files carry their own schema, so the CSV-only `header` / `inferSchema`
# reader options that used to be passed here had no effect and are dropped.
df = spark.read.parquet("input/news.parquet")

Py4JJavaError: An error occurred while calling o264.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:834)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:63)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:497)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:132)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:563)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
#spark.stop()

### 2. Partition and cache the dataframe

In [None]:
df.rdd.getNumPartitions()

9

In [None]:
# Spread the rows over 4 * 20 = 80 partitions and keep the result cached.
# NOTE(review): presumably "4 partitions per core x 20 cores" - confirm against the cluster size.
num_partitions = 4 * 20
df = df.repartition(num_partitions)
df = df.cache()

In [None]:
df.rdd.getNumPartitions()



80

### 3. Preview the data

In [None]:
df.count()

                                                                                

1716608

In [None]:
df.show()

+--------------+--------------------+
|category_label|description_filtered|
+--------------+--------------------+
|           9.0|best baby toddler...|
|          11.0| summer luxury italy|
|          10.0|crop top might ac...|
|           9.0|parent reality le...|
|          11.0|best redness past...|
|          10.0|90 hair relaxer c...|
|           9.0|toddler kernel pu...|
|          11.0|london olympiad d...|
|          11.0|resort encourage ...|
|           9.0|challenge present...|
|          10.0|brooke carapace w...|
|           9.0|allhallows eve co...|
|          10.0|ashlee wallis war...|
|          10.0|fashion flashback...|
|          11.0|hidden mickey spo...|
|           9.0|trooper help deli...|
|          11.0|chef architect di...|
|           9.0|pricey mom prenat...|
|          10.0|toothpaste dry pi...|
|          11.0|work home rabbi d...|
+--------------+--------------------+
only showing top 20 rows



In [None]:
df.printSchema()

root
 |-- category_label: double (nullable = true)
 |-- description_filtered: string (nullable = true)



### 4. Convert filtered descriptions to arrays

In [None]:
# Tokenise: turn the space-separated description string into an array of words,
# overwriting the original column under the same name.
tokens_col = split(col('description_filtered'), ' ')
df = df.withColumn('description_filtered', tokens_col)

# Display the rows without truncating the word arrays
df.show(truncate=False)

+--------------+-------------------------------------------------------------------------------------------------------------------+
|category_label|description_filtered                                                                                               |
+--------------+-------------------------------------------------------------------------------------------------------------------+
|9.0           |[best, baby, toddler, product, year]                                                                               |
|11.0          |[summer, luxury, italy]                                                                                            |
|10.0          |[crop, top, might, actually]                                                                                       |
|9.0           |[parent, reality, leave, tyke]                                                                                     |
|11.0          |[best, redness, pasta, sauce, italian, love, life, ve

## IV- Feature Engineering


### 1. Explode the filtered descriptions to get the words

In [None]:
# One output row per word of every description.
# The alias has to be attached to the exploded *column*: calling .alias() on the
# DataFrame (as before) only names the DataFrame and leaves the column called "col".
exploded_df = df.select(explode(df.description_filtered).alias('words'))
exploded_df.show()

+--------+
|     col|
+--------+
|    best|
|    baby|
| toddler|
| product|
|    year|
|  summer|
|  luxury|
|   italy|
|    crop|
|     top|
|   might|
|actually|
|  parent|
| reality|
|   leave|
|    tyke|
|    best|
| redness|
|   pasta|
|   sauce|
+--------+
only showing top 20 rows



In [None]:
#df=df.unpersist()

### 2. Get unique words in the filtered_description

In [None]:
unique_words=exploded_df.distinct()

### 3. Cache and show the unique words dataframe

In [None]:
unique_words=unique_words.cache()
unique_words.show()



+------------+
|         col|
+------------+
|     melodic|
|   traveling|
|         art|
|       oscar|
|      outfit|
|      travel|
|       mammy|
|       inner|
|        pant|
|      online|
|       still|
|     jewelry|
|accumulation|
|        hope|
|      bazaar|
|      voyage|
|    everyday|
|     blossom|
|     embrace|
|        clog|
+------------+
only showing top 20 rows



                                                                                

### 4. Get the vocabulary size

In [None]:
vocabulary_size=unique_words.count()
vocabulary_size

128622

### 5. Define the CountVectorizer and IDF stages

In [None]:
# Term-count stage: bag-of-words counts over the tokenised descriptions.
# vocabSize is capped at the number of distinct words counted above; minDF=3.0
# keeps only terms that occur in at least 3 documents.
vectorizer = CountVectorizer(
    inputCol="description_filtered",
    outputCol="raw_features",
    vocabSize=vocabulary_size,
    minDF=3.0,
)

# IDF stage: rescales the raw counts and writes them to the "features" column.
idf = IDF(inputCol="raw_features", outputCol="features")

## V- Models set up, training and evaluation

### 1. Set up LDA model

In [None]:
# LDA estimator reading the vectors from the "features" column.
# Neither k nor maxIter is passed, so the library defaults apply; the seed is
# fixed so that repeated runs are comparable.
lda = LDA(seed=0, featuresCol="features")
lda

LDA_e18db5c90121

### 2. Set up pipelines

We will set up a pipeline that applies the following transformations before fitting the LDA model:

- CountVectorizer
- IDF

In [None]:
# Chain the two feature stages and the LDA estimator into a single pipeline so
# that one fit() call trains all three in order.
# (All imports live in the import cell at the top of the notebook.)
pipeline = Pipeline(stages=[vectorizer, idf, lda])
pipeline

Pipeline_369afe3f2f73

### 3. Split the data

First of all, let us split the data into a training set and a test set: 80% for training and 20% for testing.

In [None]:
# Random 80% / 20% split; the fixed seed keeps the split identical across runs.
split_weights = [0.8, 0.2]
train_set, test_set = df.randomSplit(split_weights, seed=0)

### 4. Create a function for model training

Let us create a function which takes as argument a model that it trains and then returns the trained model.

In [None]:
def train_model(model, data=None):
    """Fit ``model`` and return the fitted result.

    Parameters
    ----------
    model
        Any object exposing ``fit(dataset)`` (an estimator or a pipeline).
    data : optional
        Dataset handed to ``model.fit``. When omitted, the notebook-level
        ``train_set`` is used; it is looked up at call time, which matches
        the behaviour of the original single-argument form.

    Returns
    -------
    Whatever ``model.fit`` returns.
    """
    if data is None:
        # Fall back to the global training split defined earlier in the notebook.
        data = train_set
    return model.fit(data)

In [None]:
fitted_model=train_model(pipeline)
fitted_model

24/06/06 13:51:49 WARN DAGScheduler: Broadcasting large task binary with size 1984.0 KiB
24/06/06 13:51:51 WARN DAGScheduler: Broadcasting large task binary with size 1984.0 KiB
24/06/06 13:51:52 WARN DAGScheduler: Broadcasting large task binary with size 2000.9 KiB
24/06/06 13:51:53 WARN DAGScheduler: Broadcasting large task binary with size 2004.0 KiB
24/06/06 13:51:57 WARN DAGScheduler: Broadcasting large task binary with size 2005.1 KiB
24/06/06 13:51:58 WARN DAGScheduler: Broadcasting large task binary with size 2000.9 KiB
24/06/06 13:51:59 WARN DAGScheduler: Broadcasting large task binary with size 2004.0 KiB
24/06/06 13:52:01 WARN DAGScheduler: Broadcasting large task binary with size 2005.1 KiB
24/06/06 13:52:02 WARN DAGScheduler: Broadcasting large task binary with size 2000.9 KiB
24/06/06 13:52:02 WARN DAGScheduler: Broadcasting large task binary with size 2004.0 KiB
24/06/06 13:52:04 WARN DAGScheduler: Broadcasting large task binary with size 2005.1 KiB
24/06/06 13:52:07 WAR

PipelineModel_01bccd6396f8

### 5. Visualize the topics

In [None]:
# Stage 0 of the fitted pipeline is the fitted CountVectorizer; its `vocabulary`
# list is what the topic cells below use to turn term indices back into words.
fitted_vectirizer=fitted_model.stages[0]
vocabulary= fitted_vectirizer.vocabulary
len(vocabulary)

73411

In [None]:
vocabulary[:10]

['new', 'photo', 'state', 'trump', 'day', 'nt', 'say', 'woman', 'get', 'make']

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[7, 137, 2, 318, ...|[0.00404272871093...|
|    1|[3, 36, 107, 4, 1...|[0.00444673252088...|
|    2|[30, 16, 0, 57, 1...|[0.00510976670347...|
|    3|[4, 1, 11, 26, 34...|[0.00456010441787...|
|    4|[27, 3, 0, 2, 47,...|[0.00347962927032...|
|    5|[35, 0, 187, 5, 1...|[0.00399826687904...|
|    6|[100, 0, 123, 1, ...|[0.00358280357609...|
|    7|[230, 328, 222, 2...|[0.00373880100877...|
|    8|[0, 8, 1, 22, 56,...|[0.00302276662198...|
|    9|[21, 93, 5, 26, 5...|[0.00814480394485...|
+-----+--------------------+--------------------+



Py4JJavaError: An error occurred while calling o252.javaToPython.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:834)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at org.apache.spark.sql.SparkSession.$anonfun$leafNodeDefaultParallelism$1(SparkSession.scala:906)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.leafNodeDefaultParallelism(SparkSession.scala:906)
	at org.apache.spark.sql.execution.LocalTableScanExec.rdd$lzycompute(LocalTableScanExec.scala:53)
	at org.apache.spark.sql.execution.LocalTableScanExec.rdd(LocalTableScanExec.scala:48)
	at org.apache.spark.sql.execution.LocalTableScanExec.doExecute(LocalTableScanExec.scala:60)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:207)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:206)
	at org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:4139)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
# Top-weighted terms of every topic, taken from the LDA stage (last pipeline stage).
topics = fitted_model.stages[-1].describeTopics()
topics.show()

# `topics` holds one row per topic, so it is collected to the driver and the
# term indices are translated to words locally. The previous version ran an RDD
# job for this, which serialised the whole `vocabulary` list into the task closure.
topics_words = [
    [vocabulary[term_idx] for term_idx in row['termIndices']]
    for row in topics.collect()
]

# Print the words of each topic, one per line, between separator rows.
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)

### Get topics distributions

In [None]:
# The LDA model is the last stage of the fitted pipeline.
lda_model = fitted_model.stages[-1]

# Run both splits through the whole fitted pipeline.
train_set_transformed = fitted_model.transform(train_set)
test_set_transformed = fitted_model.transform(test_set)

# Keep only the tokens and the per-document topic distribution.
topic_columns = ["description_filtered", "topicDistribution"]
train_topic_distributions = train_set_transformed.select(*topic_columns)
test_topic_distributions = test_set_transformed.select(*topic_columns)

In [None]:
# Show the topic distributions for the training set
train_topic_distributions.show(truncate=False)

# Show the topic distributions for the test set
test_topic_distributions.show(truncate=False)

### 5. Define a function to evaluate the model

The function takes a fitted model as a parameter, evaluates it on the train and test splits, and then returns the train and test performance. The metric used is the log perplexity returned by the LDA model (lower is better).

In [None]:
# Function to evaluate an LDA model through its log-perplexity bound
def evaluate_model(fitted_model, data_transformed=None):
    """Compute the log perplexity of a fitted LDA model on the train and test data.

    Parameters
    ----------
    fitted_model
        Object exposing ``logPerplexity(dataset)``. A fitted pipeline is also
        accepted: if the object has a ``stages`` attribute, its last stage is
        evaluated.
    data_transformed : sequence of two datasets, optional
        ``[train, test]`` datasets passed to ``logPerplexity``. When omitted,
        the notebook-level ``train_set_transformed`` / ``test_set_transformed``
        are used.

    Returns
    -------
    tuple
        ``(train_lp, test_lp)`` as returned by ``logPerplexity``.
    """
    if data_transformed is None:
        # Resolved at call time: as a default argument this list was evaluated
        # when the function was *defined*, raising NameError if the transformed
        # sets did not exist yet and freezing stale data otherwise.
        data_transformed = [train_set_transformed, test_set_transformed]
    train_data, test_data = data_transformed[0], data_transformed[1]

    # logPerplexity is looked up on the last stage when a fitted pipeline is given.
    lda_model = fitted_model.stages[-1] if hasattr(fitted_model, "stages") else fitted_model

    print('Evaluating the model on training set')
    train_lp = lda_model.logPerplexity(train_data)

    print('Evaluating the model on test set')
    test_lp = lda_model.logPerplexity(test_data)

    print("The upper bound on perplexity for train set: " + str(train_lp))
    print("The upper bound on perplexity for test set: " + str(test_lp))
    return train_lp, test_lp

NameError: name 'train_set_transformed' is not defined

In [None]:
train_lp,test_lp=evaluate_model(fitted_model.stages[-1])
train_lp,test_lp

In [None]:
# TODO(review): this cell contained only a bare `def`, which is a SyntaxError and
# stops a Restart & Run All. Write the intended function here or delete the cell.

In [None]:
# Sweep the number of topics and record the train / test log perplexity for each k.
results={}
num_topics_range=[20, 25, 30, 35, 40, 45, 50]

for num_topics in num_topics_range:
    print('LDA for k={}'.format(num_topics))
    # Create LDA - k must be passed explicitly, otherwise every iteration fits
    # the same default-k model and the sweep compares nothing.
    lda = LDA(k=num_topics, featuresCol="features", seed=0)
    # Create pipeline for LDA
    pipeline = Pipeline(stages=[vectorizer, idf, lda])
    print('Model training')
    # Train the model
    fitted_model=train_model(pipeline)
    print('Done')

    train_set_transformed = fitted_model.transform(train_set)
    test_set_transformed = fitted_model.transform(test_set)
    # logPerplexity is a method of the LDA model (last stage), not of the fitted pipeline.
    train_lp,test_lp=evaluate_model(fitted_model.stages[-1],data_transformed=[train_set_transformed,test_set_transformed])
    # The original assignment was left unfinished; store both scores keyed by k.
    results[num_topics] = {"train_log_perplexity": train_lp, "test_log_perplexity": test_lp}



### 6. Create a function which takes pipelines and train the models, evaluate them and then return the results

We remark that
- Naive Bayes
- Logistic regression

We can then conclude that
- The two models achieve good performance on both the training and test sets.
- The Logistic regression models outperforms the Naive Bayes model

In the next section, we will tune the hyperparameters of the Logistic regression model to find its best parameters.

## VI- Logistic regression hyperparameters tuning

### 1. Pipeline creation

In [None]:
# Define parameter grids for Logistic regression grid search
reg_values = np.logspace(-4, 4, num=100)
# NOTE(review): l1_ratios is computed but not part of the grid below - add
# `.addGrid(lr.elasticNetParam, l1_ratios)` if elastic-net mixing should be tuned
# (this multiplies the number of fits by 10).
l1_ratios = np.linspace(0, 1, num=10)

# NOTE(review): `hashingTF` and `lr` were used in this cell without being created
# anywhere in the notebook. The column names below mirror the CountVectorizer / IDF
# stages and the evaluator's label column - confirm the intended settings.
hashingTF = HashingTF(inputCol="description_filtered", outputCol="raw_features")
lr = LogisticRegression(featuresCol="features", labelCol="category_label")

# Start from a fresh builder: the previous code called addGrid on `paramGrid_lr`
# before that name had been assigned.
paramGrid_lr = ParamGridBuilder().addGrid(lr.regParam, reg_values).build()

# Create Cross-validation for Logistic Regression
cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr,
                        evaluator=MulticlassClassificationEvaluator(labelCol="category_label", predictionCol="prediction", metricName="accuracy"),
                        numFolds=3, parallelism=1)


# Create pipeline for Logistic Regression
pipeline_lr = Pipeline(stages=[hashingTF, idf, cv_lr])

pipeline_lr

### 2. Hyperparameters tuning

In [None]:
# NOTE(review): `train_and_evaluate_models` is not defined in any cell of this
# notebook - confirm where it comes from before running this cell.
# `results` is indexed with 'fitted_model' in the next section, so the call
# presumably returns a dict-like object - verify.
results=train_and_evaluate_models(model_pipelines=[pipeline_lr],model_names=["Logistic Regression"])
results

### 3. Get the best parameters

In [None]:
# Fitted pipeline stored under the 'fitted_model' key of the tuning results
fitted_model = results['fitted_model']

# The cross-validation stage is the last pipeline stage; keep its best model
best_model = fitted_model.stages[-1].bestModel

# Print every parameter of the best model as "  name: value"
print("Best parameters for Logistic regression:")

best_params = best_model.extractParamMap()
for param, value in best_params.items():
     print(f"  {param.name}: {value}")

### 4. Save the best model

In [None]:
best_model.save('output/news_categorization_model')

24/06/04 20:03:43 WARN TaskSetManager: Stage 216 contains a task of very large size (33450 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

## VII- Summary

In this notebook, we studied two models for our news categorization task: Naive Bayes and Logistic regression.

 Our study reveals that the Logistic regression was the one with best performance.

 Then we tuned the Logistic regression hyperparameters using grid search and saved the best model we found.

 The next step of our work will be to ...

In [None]:
#df.unpersist()