In [0]:
%pyspark
# Install the third-party packages this notebook relies on into the interpreter environment.
# NOTE(review): versions are not pinned — consider pinning them for reproducible runs.
!pip install wordcloud nltk numpy

In [1]:
%pyspark
# from pyspark import SparkFiles
%matplotlib inline
import matplotlib.pyplot as plt
from pyspark.sql.functions import size, avg, col, length, isnull, udf, explode
import string
import re
from  pyspark.ml.feature import Tokenizer
import nltk
from nltk.corpus import stopwords
from pyspark.sql import DataFrame
from PIL.Image import Image
from wordcloud import WordCloud

# Fixed random seed, passed to randomSplit so the train/test split is reproducible.
SEED = 176

In [2]:
%pyspark
# from pyspark import SparkFiles
# url = "https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv"

# sc.addFile(url)
# df = spark.read.csv("file://"+SparkFiles.get("wikipedia.csv"), header = True, quote='\"', escape='\"')
# df = df.drop("_c0")
# df = df.withColumnRenamed("categoria","category")
# df.printSchema()
# df.show()

In [3]:
%pyspark


# url = "https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv"

# sc.addFile(url)
# spark_df = spark.read.csv("file://"+SparkFiles.get("wikipedia.csv"), header = True, quote='\"', escape='\"')
# Read the Wikipedia CSV from the local data directory, drop the `_c0` column
# (presumably a leftover row index — verify against the file) and expose the
# Italian "categoria" header under its English name.
# NOTE(review): if article bodies can contain embedded newlines, this read may
# need multiLine=True to avoid splitting one record over several rows — confirm.
spark_df = (
    spark.read.csv("/data/wikipedia.csv", header=True, quote='\"', escape='\"')
    .drop("_c0")
    .withColumnRenamed("categoria", "category")
)
spark_df.printSchema()
spark_df.show()

# _ = spark.sql("DROP TABLE IF EXISTS wikipedia")
# spark_df.write.saveAsTable("wikipedia")
# Register the DataFrame under a name the %sql paragraphs can query.
spark_df.createOrReplaceTempView("wikipedia")

In [4]:
%sql
-- Sanity check: preview the first 10 rows of the `wikipedia` temp view
SELECT * FROM wikipedia LIMIT 10

# EDA - Exploratory data analysis
The EDA aims to understand the features of the Wikipedia articles in relation to their category.

First of all, let's check whether there are missing values in the `documents` and `summary` columns.

In [7]:
%pyspark
# Count the records where each text column is NULL.
missing_documents = spark_df.where(col("documents").isNull()).count()
missing_summary = spark_df.where(col("summary").isNull()).count()

# Records where both columns are NULL at once: if this matches the two counts
# above, the very same records are missing both fields.
missing_both = spark_df.where(col("summary").isNull() & col("documents").isNull()).count()

print("Number of missing values for documents column:")
print(missing_documents)
print("Number of missing values for summary column:")
print(missing_summary)
print(missing_both)


There are some missing values, and the affected records have both `summary` and `documents` missing.
Since these records carry no information, they can be dropped safely.


In [9]:
%pyspark

# Keep only the records that actually have an article body.
spark_df = spark_df.where(col("documents").isNotNull())

# Sanity check: no NULL documents should remain, so this prints 0.
remaining_null_documents = spark_df.where(col("documents").isNull()).count()
print(remaining_null_documents)

# Re-register the temp view so the %sql paragraphs see the filtered data.
spark_df.createOrReplaceTempView("wikipedia")

## Articles counts for each category

In [11]:
%sql
-- Number of articles in each category
SELECT
  category,
  COUNT(*) AS count
FROM wikipedia
GROUP BY category


In [12]:
%pyspark
# Per-category article counts, collected to the driver for plotting.
category_counts_query = "SELECT w.category, COUNT(*) AS count FROM wikipedia w GROUP BY w.category"
result_set = spark.sql(category_counts_query).collect()

categories = [row["category"] for row in result_set]
# Index by key: Row is a tuple subclass, so `row.count` would be the tuple method.
counts = [row["count"] for row in result_set]

# Bar chart of the absolute frequency of each category.
fig, ax = plt.subplots()
ax.bar(categories, counts)
ax.set_xlabel("Category")
ax.set_ylabel("Frequency")
ax.set_title("Category absolute frequency distribution")
plt.xticks(rotation=45)
plt.show()

#TODO improve the chart

The dataset is fairly balanced across categories, although the `politics` category is slightly more represented than the others.

## Average words count in each article
Articles are stored in `documents` column

documents -> string split [tokenizer] -> length of each split -> mean of these lengths

In [16]:
%pyspark

@udf
def remove_punctuation_and_numbers(sentence:str) -> str:
    """
    Perform a raw text cleaning: lowercase the text, replace punctuation with
    spaces, remove digit-only words and normalise whitespace.

    Words made only of digits and punctuation are removed too, like doi numbers,
    example: 10.1080/13501780801913298

    Parameters:
    - sentence (str):
        raw text; may be None when the underlying column value is NULL.

    Return (str):
        the cleaned text with words separated by single spaces and no leading
        or trailing whitespace, or None when the input is None.
    """
    # Spark hands SQL NULL to a Python UDF as None: propagate it instead of
    # failing with AttributeError on `.lower()`.
    if sentence is None:
        return None

    # lowercase
    sentence = sentence.lower()

    # replace every ASCII punctuation character with a space in a single pass
    punctuation_to_space = str.maketrans(string.punctuation, " " * len(string.punctuation))
    sentence = sentence.translate(punctuation_to_space)

    # remove digit-only words
    sentence = re.sub(r"\b\d+\b","", sentence)

    # Collapse every whitespace run (spaces, tabs, newlines) to one space and
    # trim the ends, so that a whitespace tokenizer does not emit empty tokens.
    sentence = " ".join(sentence.split())

    # the line below is meant for stopword removal (later)
    #clean_sentence = " ".join([word for word in sentence.split() if word not in string.punctuation])

    return sentence

In [17]:
%pyspark
# #check
example = "this is a doi number: 10.1080/13501780801913298 bla bla another number: 1256 eee"
print(example)
# Calling a @udf-decorated function with a plain string builds a Spark Column
# expression (the string is read as a column name) instead of running the
# Python code, so call the wrapped Python function through the wrapper's
# `func` attribute to actually see the cleaned text.
print(remove_punctuation_and_numbers.func(example))

Version of the cleaning function that also removes stopwords

In [19]:
%pyspark

# Download the NLTK stopword corpus.
nltk.download("stopwords")

# English stopwords; referenced by the clean_text UDF.
stopwords_en: list[str] = stopwords.words("english")


In [20]:
%pyspark

@udf
def clean_text(sentence:str) -> str:
    """
    Perform text cleaning: lowercase the text, replace punctuation with spaces,
    remove stopwords and digit-only words, and normalise whitespace.

    Words made only of digits and punctuation are removed too, like doi numbers,
    example: 10.1080/13501780801913298

    Parameters:
    - sentence (str):
        raw text; may be None when the underlying column value is NULL.

    Return (str):
        the cleaned, lowercased text with words separated by single spaces and
        no leading or trailing whitespace, or None when the input is None.
    """
    # Spark hands SQL NULL to a Python UDF as None: propagate it instead of
    # failing with AttributeError on the first string method call.
    if sentence is None:
        return None

    # The NLTK stopword list is lowercase, so fold case first; otherwise
    # capitalised stopwords ("The", "And", ...) slip through the filter.
    # This also matches what remove_punctuation_and_numbers does.
    sentence = sentence.lower()

    # replace every ASCII punctuation character with a space in a single pass
    punctuation_to_space = str.maketrans(string.punctuation, " " * len(string.punctuation))
    sentence = sentence.translate(punctuation_to_space)

    # stopwords (set membership instead of scanning the list for every word)
    stopword_set = set(stopwords_en)
    sentence = " ".join(word for word in sentence.split() if word not in stopword_set)

    # remove digit-only words
    sentence = re.sub(r"\b\d+\b","", sentence)

    # Collapse the gaps left by the removed words and trim the ends, so that a
    # whitespace tokenizer does not emit empty tokens.
    return " ".join(sentence.split())

In [21]:
%pyspark
# spark_df_clean = spark_df.withColumn("documents_clean", remove_punctuation_and_numbers("documents"))
# Add a "documents_clean" column holding the clean_text UDF applied to "documents".
spark_df_clean = spark_df.withColumn("documents_clean", clean_text("documents"))

In [22]:
%pyspark
# Preview the DataFrame together with its new documents_clean column.
spark_df_clean.show()

## TODO: handle the cases below (articles whose cleaned text is almost empty)

In [24]:
%pyspark
# Number of articles whose cleaned text is shorter than 20 characters.
short_documents = spark_df_clean.where(length(col("documents_clean")) < 20).select("documents")
short_documents.count()

In [25]:
%pyspark

# Split the cleaned text into a "words" column.
tokenizer = Tokenizer(outputCol="words", inputCol="documents_clean")
df_words = tokenizer.transform(spark_df_clean)

#check
df_words.show(n=5)

In [26]:
%pyspark
# Mean number of tokens per article, aggregated over the whole DataFrame.
df_words.select(avg(size(col("words"))).alias("average_words_count")).show()

## Length of longest and shortest articles for each category


In [28]:
%sql
-- Smallest and largest LENGTH(documents) within each category
SELECT 
    category,
    MIN(LENGTH(documents)) AS min_len,
    MAX(LENGTH(documents)) AS max_len
FROM wikipedia
GROUP BY category


In [29]:
%sql
-- WARNING
-- SELECT * FROM wikipedia WHERE LENGTH(documents) = 7

## Word cloud for each category
Compute the count of each word within a category, then select the N most frequent words.

In [31]:
%pyspark
def generate_wordcloud_by_category(df_words: DataFrame, category:str, N: int = 1000) -> Image:
    """
    Generates a word cloud representation for the given category.

    Parameters:
    - df_words (pyspark.sql.DataFrame):
        Spark DataFrame with a 'category' column and a 'words' column holding
        an array of strings (the words of each record).
    - category (str):
        a value from the 'category' column, used to filter the records.
    - N (int) default 1000:
        number of words to include in the word cloud. In other terms, the N most
        frequent words are used to build the word cloud.

    Return (PIL.Image.Image):
        Pillow Image generated by wordcloud.to_image()
    """

    tmp_words = (
        df_words
        # Filter on the category BEFORE exploding: the original projected the
        # column away first and only worked thanks to the analyzer re-adding it.
        # A Column comparison (instead of an interpolated SQL string) also keeps
        # category labels containing quotes from breaking the predicate.
        .where(col("category") == category)
        .select(explode("words").alias("words_explode"))
        # Drop empty-string tokens so they are not counted as words
        # (presumably produced by consecutive or leading whitespace — confirm).
        .where(col("words_explode") != "")
        .groupBy("words_explode")
        .count()
        .orderBy("count", ascending=False)
    )

    # uncomment for debugging
    # tmp_words.show()

    # Each Row is a (word, count) pair; keep only the N most frequent ones.
    terms_freq: dict[str, int] = {word: freq for word, freq in tmp_words.head(N)}

    wordcloud = WordCloud(width=800,height=400,colormap='cool',max_words=N)
    # gist_rainbow
    wordcloud.generate_from_frequencies(terms_freq)
    # wordcloud.to_file(f"/data/{category}.png")
    return wordcloud.to_image()

In [32]:
%pyspark

#check: render the word cloud for a single category
generate_wordcloud_by_category(df_words,'medicine')

In [33]:
%pyspark
# Distinct category labels present in the tokenised DataFrame.
distinct_category_rows = df_words.select("category").distinct().collect()
categories = [row["category"] for row in distinct_category_rows]

In [34]:
%pyspark
# for category in categories:
#     plt.figure(figsize=(12,7))
#     generate_wordcloud_by_category(df_words,category=category).show()

In [35]:
%pyspark
# One figure per category, showing its word cloud with the category as title.
for category in categories:
    fig, ax = plt.subplots(figsize=(12, 7))
    cloud_image = generate_wordcloud_by_category(df_words, category=category)
    ax.imshow(cloud_image, interpolation="bilinear")
    ax.axis('off')
    ax.set_title(category)
    plt.show()

# Machine learning
Use columns `summary` and `documents` to predict the `category` target.

The idea is to make a new column as the concatenation of `summary` and `documents` -> clean this column (with `clean_text` function used before) -> tokenizer -> CountVectorizer (Bag of words) -> standard scaler -> classifier. 

A Naive Bayes classifier is well suited to the BoW approach, but keep in mind that this is a multiclass problem (check the Spark MLlib documentation for the Naive Bayes model).

Alternative:
Use a multilayer perceptron; one idea is to make it inside spark itself (there is this model in MLlib), or one can think about tensorflow, to customize the model.

NB: If tensorflow is used, it will be run only on one node (it should be ok in this case). With tensorflow one could evaluate an Embedding and RNN approach too...

In [38]:

%pyspark
from pyspark.sql.functions import concat_ws

# Drop the title and build the model input column "text" by joining the
# summary and the article body with a single space.
df_classification = (
    spark_df
    .drop("title")
    .withColumn("text", concat_ws(" ", col("summary"), col("documents")))
)

#check
df_classification.show()


clean_text -> tokenizer -> CountVectorizer -> NaiveBayes (multinomial)

NB: Naive Bayes classifiers are multiclass by definition

see also: https://scikit-learn.org/stable/modules/naive_bayes.html

In [40]:
%pyspark
from pyspark.sql import Column
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from typing import Union

class CustomTextCleaner(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """
    Pipeline stage that cleans a text column: lowercases it, replaces
    punctuation with spaces, removes English stopwords and digit-only words,
    and normalises whitespace.

    Parameters:
    - inputCol (str | Column | None):
        name of (or Column expression for) the text to clean.
    - outputCol (str | Column | None):
        name of the column that receives the cleaned text; it is passed to
        DataFrame.withColumn as the column name.

    NOTE(review): inputCol / outputCol and the stopword list are plain
    attributes, not pyspark.ml Params, so the DefaultParamsWritable /
    DefaultParamsReadable mixins presumably do not round-trip them on
    save/load — confirm before persisting a pipeline that contains this stage.
    """

    def __init__(self, inputCol: Union[str,Column,None] = None, outputCol: Union[str,Column,None] = None):
        # Initialise the pyspark.ml base classes before adding our own state,
        # so the machinery they rely on (e.g. when a stage is copied) is set up.
        super().__init__()
        self.inputCol=inputCol
        self.outputCol=outputCol

        nltk.download("stopwords")
        self._stopwords_en: list[str] = stopwords.words("english")

    def _transform(self, dataset: DataFrame) -> DataFrame:
        """
        Return `dataset` with an extra `outputCol` column holding the cleaned
        version of `inputCol`.
        """
        # Bind the stopwords to a local so the UDF closure captures only this
        # set rather than `self` (which would drag the whole transformer into
        # the serialised function). A set also gives O(1) membership tests.
        stopword_set = frozenset(self._stopwords_en)

        @udf
        def clean_text(sentence:str) -> str:
            """
            Clean one text value: lowercase, strip punctuation, remove
            stopwords and digit-only words, normalise whitespace.

            Words made only of digits and punctuation are removed too, like
            doi numbers, example: 10.1080/13501780801913298
            """
            # SQL NULL arrives as None: propagate it instead of raising.
            if sentence is None:
                return None

            # The NLTK stopword list is lowercase, so fold case first;
            # otherwise capitalised stopwords ("The", "And", ...) survive.
            sentence = sentence.lower()

            # replace every ASCII punctuation character with a space
            punctuation_to_space = str.maketrans(string.punctuation, " " * len(string.punctuation))
            sentence = sentence.translate(punctuation_to_space)

            # stopwords
            sentence = " ".join(word for word in sentence.split() if word not in stopword_set)

            # remove digit-only words
            sentence = re.sub(r"\b\d+\b","", sentence)

            # collapse the gaps left by removed words and trim the ends
            return " ".join(sentence.split())

        # The signature also allows a Column for inputCol; indexing the
        # DataFrame with a Column would be treated as a filter, so only look
        # the column up by name when a name was given.
        if isinstance(self.inputCol, str):
            input_column = dataset[self.inputCol]
        else:
            input_column = self.inputCol
        return dataset.withColumn(self.outputCol, clean_text(input_column))
        
        
        

In [41]:
%pyspark
# Random train/test split with weights 0.8 / 0.2, seeded with SEED.
train, test = df_classification.randomSplit([0.8, 0.2], seed=SEED)

In [42]:
%pyspark
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline, PipelineModel

# Text preparation stages: clean the raw text, then split it into tokens.
text_cleaner_stage = CustomTextCleaner(inputCol="text", outputCol="text_clean")
token_stage = Tokenizer(inputCol="text_clean", outputCol="tokens")

# Bag of words; words that appear in more than 95% of documents are ignored.
bow_stage = CountVectorizer(inputCol="tokens", outputCol="vector", maxDF=0.95)

# Encode the string label as a numeric index for the classifier.
label_stage = StringIndexer(inputCol="category", outputCol="category_idx")

# Multinomial Naive Bayes (Laplace smoothing by default).
classifier_stage = NaiveBayes(featuresCol="vector", labelCol="category_idx", modelType="multinomial")

pipeline: Pipeline = Pipeline(
    stages=[text_cleaner_stage, token_stage, bow_stage, label_stage, classifier_stage]
)

# Fit every stage on the training split only.
model: PipelineModel = pipeline.fit(train)

In [43]:
%pyspark
# Apply the fitted pipeline to both splits; the results are evaluated in the next cells.
prediction_train = model.transform(train)
prediction_test = model.transform(test)

In [44]:
%pyspark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy evaluator comparing the indexed label with the model prediction.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="category_idx",
    predictionCol="prediction",
)

train_accuracy = evaluator.evaluate(prediction_train)
print(f"Accuracy in train set: {train_accuracy}")


In [45]:
%pyspark
# Accuracy on the held-out split, using the evaluator defined for the train set.
test_accuracy = evaluator.evaluate(prediction_test)
print(f"Accuracy in test set: {test_accuracy}")