The following cell creates the Kaggle configuration folder and writes the Kaggle credentials file into it (you need to replace the placeholders with your own credentials).

In [None]:
import json
import os

os.makedirs('/root/.kaggle', exist_ok=True)

# Replace the placeholder values with your own Kaggle username and API key.
kaggle_credentials = {"username": "Kaggle username", "key": "Kaggle API Key"}

# json.dump always produces valid JSON, even if a value contains quotes or
# backslashes (hand-built strings would not).
with open('/root/.kaggle/kaggle.json', 'w', encoding='utf-8') as f:
    json.dump(kaggle_credentials, f)

# The Kaggle CLI warns when the key file is readable by other users, so
# restrict it to the owner only.
os.chmod('/root/.kaggle/kaggle.json', 0o600)


Next, we download the dataset from Kaggle.

In [None]:
# Download the dataset archive from Kaggle with the CLI; this requires the
# kaggle.json credentials file to be in place.
!kaggle datasets download -d jonasbecker98/286k-topic-clustered-news-articles

Unzipping the dataset

In [None]:
!unzip 286k-topic-clustered-news-articles.zip

Installing Spark (PySpark)

In [None]:
# Install PySpark, the Python API for Spark, into the notebook environment.
# NOTE(review): the version is unpinned, so this installs the latest release;
# confirm that the Java version installed for Spark is one that release supports.
!pip install pyspark

Installing Java, which Spark needs because it runs on the JVM

In [None]:
import os       #importing os to set environment variable
def install_java():
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      #install openjdk
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
  !java -version       #check java version
install_java()

Since the dataset is already clustered, we drop all the columns that the previous K-Means clustering added, so that we can cluster the articles again from scratch.

In [None]:
import os
import json

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
import numpy as np
from pyspark.ml.evaluation import ClusteringEvaluator

# Module-level buffer holding the raw article records (one dict per article).
# data_preprocessing() refills it on every call.
json_array = []


def _collect_json_records(dataset_dir, records):
    """Append the articles of every ``<dataset_dir>/<dir>/<subdir>/*.json`` file to ``records``.

    Each file must contain a top-level ``"data"`` list of article objects.
    Directories are visited in sorted order, so the resulting record order (and
    therefore the DataFrame row order) does not depend on the filesystem.
    """
    for main_dir in sorted(os.listdir(dataset_dir)):
        main_path = os.path.join(dataset_dir, main_dir)
        if not os.path.isdir(main_path):
            continue
        for sub_dir in sorted(os.listdir(main_path)):
            sub_path = os.path.join(main_path, sub_dir)
            if not os.path.isdir(sub_path):
                continue
            for file_name in sorted(os.listdir(sub_path)):
                if not file_name.endswith(".json"):
                    continue
                # JSON text is UTF-8; don't depend on the platform default encoding.
                with open(os.path.join(sub_path, file_name), 'r', encoding='utf-8') as f:
                    records.extend(json.load(f)['data'])


def data_preprocessing(dataset_dir="/content/clustered_json"):
    """Load the news articles and turn their ``maintext`` into TF-IDF vectors.

    Reads every article record from the dataset and builds a Spark DataFrame
    that keeps only the original article columns, dropping those produced by
    the dataset's earlier clustering. It then adds:

    * ``words``: the lower-cased tokens of ``maintext``.
    * ``filtered_words``: those tokens with stop words removed.
    * ``raw_features``: term counts.
    * ``input``: TF-IDF weights.
    * ``features``: the vector column consumed by K-Means.

    Args:
        dataset_dir: Root of the extracted dataset. Articles are read from
            ``<dataset_dir>/<dir>/<subdir>/*.json``.

    Returns:
        A Spark DataFrame with the columns above. Rows whose ``maintext`` is
        null are dropped.

    Raises:
        ValueError: If no article records are found under ``dataset_dir``.
    """
    # Refill the buffer instead of appending to it, so re-running the cell
    # does not duplicate every article.
    json_array.clear()
    _collect_json_records(dataset_dir, json_array)
    if not json_array:
        raise ValueError(f"No article records found under {dataset_dir!r}")

    spark = SparkSession.builder.appName('MyApp') \
        .config('spark.ui.port', '4050') \
        .config("spark.driver.memory", "12g") \
        .getOrCreate()

    # spark.read.json expects an RDD of JSON *strings*. Given dicts, PySpark
    # calls str() on them, which yields Python repr (None/True/False) rather
    # than JSON, and such records silently end up as corrupt rows.
    json_lines = spark.sparkContext.parallelize([json.dumps(record) for record in json_array])
    df = spark.read.json(json_lines)

    # Keep only the original article fields so the data can be re-clustered.
    keep_cols = ['maintext', 'date_download', 'date_modify', 'date_publish',
                 'description', 'language', 'year_month']
    new_df = df.select([col(c) for c in keep_cols])
    new_df = new_df.withColumn('maintext', new_df['maintext'].cast(StringType()))
    new_df = new_df.filter(col("maintext").isNotNull())

    # Tokenize the text (Tokenizer lower-cases and splits on whitespace).
    tokenizer = Tokenizer(inputCol="maintext", outputCol="words")
    new_df = tokenizer.transform(new_df)

    # Remove stop words.
    # NOTE(review): this uses StopWordsRemover's default stop-word list even
    # though the data has a `language` column; confirm all articles share
    # that language.
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    new_df = remover.transform(new_df)

    # Defensive: drop rows whose filtered token list is null.
    new_df = new_df.filter(new_df.filtered_words.isNotNull())

    # Term frequencies over the vocabulary learned from this DataFrame.
    cv_model = CountVectorizer(inputCol="filtered_words", outputCol="raw_features").fit(new_df)
    new_df = cv_model.transform(new_df)

    # Re-weight term frequencies by inverse document frequency.
    idf_model = IDF(inputCol="raw_features", outputCol="input").fit(new_df)
    new_df = idf_model.transform(new_df)

    # Expose the TF-IDF vector under the `features` name that K-Means reads.
    vec_assembler = VectorAssembler(inputCols=['input'], outputCol='features')
    return vec_assembler.transform(new_df)

In [None]:
def K_Means(dataframe, k=5, seed=1):
    """Cluster ``dataframe`` with K-Means and attach each row's cluster id.

    The model is trained on the ``features`` column alone, then applied to
    the full DataFrame, so every original column is kept in the result.

    Args:
        dataframe: Spark DataFrame containing a ``features`` vector column.
        k: Number of clusters.
        seed: Random seed for K-Means, for reproducible runs.

    Returns:
        ``dataframe`` with an extra ``kmeans_cluster_id`` column.
    """
    estimator = KMeans(k=k, seed=seed)
    fitted = estimator.fit(dataframe.select('features'))
    predictions = fitted.transform(dataframe)
    return predictions.withColumnRenamed('prediction', 'kmeans_cluster_id')


In [None]:
# Build the TF-IDF feature DataFrame that the clustering cells operate on.
df = data_preprocessing()

There are two methods for finding the optimal K (Elbow and Silhouette). In practice, using one of them is enough; choose whichever suits your dataset.

In [None]:
def elbow_method(dataframe, k_range, num_partitions=10, seed=1):
    """Pick the number of clusters with the elbow heuristic.

    Fits one K-Means model per candidate k and records its training cost
    (sum of squared distances to the nearest centroid). It returns the k where
    the cost curve bends most sharply, i.e. where the discrete second
    difference of the costs is largest.

    Args:
        dataframe: Spark DataFrame with a ``features`` vector column.
        k_range: Candidate cluster counts in increasing order, e.g.
            ``range(2, 10)``. At least three values are needed to measure
            curvature.
        num_partitions: Number of partitions for the features before fitting.
        seed: K-Means random seed, for reproducible runs.

    Returns:
        The selected k, which is an element of ``k_range``.

    Raises:
        ValueError: If ``k_range`` has fewer than three values.
    """
    k_values = list(k_range)
    if len(k_values) < 3:
        raise ValueError("elbow_method needs at least three candidate k values")

    # Every candidate k re-reads the same data. Keep only the column K-Means
    # needs and cache it, rather than shuffling the full text DataFrame and
    # recomputing the preprocessing pipeline for each fit.
    features = dataframe.select('features').repartition(num_partitions).cache()
    try:
        costs = []
        for k in k_values:
            model = KMeans().setK(k).setSeed(seed).fit(features)
            costs.append(model.summary.trainingCost)
    finally:
        features.unpersist()

    # Costs shrink as k grows. The elbow is where that shrinkage slows down
    # most abruptly, i.e. the largest second difference. Second difference i
    # is centred on costs[i + 1].
    acceleration = np.diff(costs, n=2)
    return k_values[int(acceleration.argmax()) + 1]


elbow_optimal_k = elbow_method(df, range(2, 10))
print(f"Elbow Optimal: {elbow_optimal_k}")

In [None]:
def silhouette_method(dataframe, k_range, num_partitions=10, seed=1):
    """Pick the number of clusters that maximises the silhouette score.

    Fits one K-Means model per candidate k, scores its clustering with
    ``ClusteringEvaluator`` (silhouette by default) and returns the best k.

    Args:
        dataframe: Spark DataFrame with a ``features`` vector column.
        k_range: Candidate cluster counts, e.g. ``range(2, 10)``.
        num_partitions: Number of partitions for the features before fitting.
        seed: K-Means random seed, for reproducible runs.

    Returns:
        The k from ``k_range`` with the highest score. On ties, the first
        such k is returned.

    Raises:
        ValueError: If ``k_range`` is empty.
    """
    k_values = list(k_range)
    if not k_values:
        raise ValueError("silhouette_method needs at least one candidate k value")

    # Only `features` (plus the model's `prediction`) is needed for fitting and
    # scoring. Cache it once instead of recomputing the pipeline for every k.
    features = dataframe.select('features').repartition(num_partitions).cache()
    # The evaluator does not depend on k, so build it once.
    evaluator = ClusteringEvaluator()
    try:
        scores = []
        for k in k_values:
            model = KMeans().setK(k).setSeed(seed).fit(features)
            scores.append(evaluator.evaluate(model.transform(features)))
    finally:
        features.unpersist()

    # max() keeps the first maximal pair on ties.
    best_k, _ = max(zip(k_values, scores), key=lambda pair: pair[1])
    return best_k


silhouette_optimal_k = silhouette_method(df, range(2, 10))
print(f"Silhouette Optimal: {silhouette_optimal_k}")

In [None]:
# Cluster the articles using the k chosen by the silhouette analysis.
pred = K_Means(df, k=silhouette_optimal_k)
# DataFrame.show() prints the rows itself and returns None; wrapping it in
# print() would emit a stray "None" line.
pred.show(5)

In [None]:
# Collect the distinct cluster ids in ascending order. distinct() alone
# returns them in arbitrary order, making the printed array non-deterministic.
unique_values = [
    row['kmeans_cluster_id']
    for row in pred.select('kmeans_cluster_id').distinct().orderBy('kmeans_cluster_id').collect()
]
print(f"Clusters Array: {unique_values}")