## Import useful Python packages

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

## Check everything is ok

In [0]:
spark

In [0]:
# List every (key, value) pair of the active Spark configuration.
# Use the public accessor instead of reaching into the private `_conf` attribute.
sc.getConf().getAll()

# **Data Acquisition**

**NOTE:** This is the <code>abc-news</code> dataset available from [Kaggle](https://www.kaggle.com/therohk/million-headlines), which contains more than **1 million** news headlines published over a period of eighteen years by the reputable Australian news source ABC ([Australian Broadcasting Corporation](http://www.abc.net.au)).

### Download the dataset to the local driver node's ```/tmp``` folder using ```wget```

In [0]:
%sh wget -P /tmp https://github.com/gtolomei/big-data-computing/raw/master/datasets/abc-news.csv.bz2

In [0]:
%fs ls file:/tmp/

### Move the file from local driver node's file system to DBFS

In [0]:
dbutils.fs.mv("file:/tmp/abc-news.csv.bz2", "dbfs:/bdc-2020-21/datasets/abc-news.csv.bz2")

In [0]:
%fs ls /bdc-2020-21/datasets/

### Read dataset file into a Spark Dataframe

In [0]:
# Read the bzip2-compressed CSV from DBFS: the first line is the header,
# fields are comma-separated and column types are inferred from the data.
news_df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load("dbfs:/bdc-2020-21/datasets/abc-news.csv.bz2")
)

### Check the shape of the loaded dataset, i.e., number of rows and columns

In [0]:
print("The shape of the dataset is {:d} rows by {:d} columns".format(news_df.count(), len(news_df.columns)))

### Print out the schema of the loaded dataset

In [0]:
news_df.printSchema()

### Display the first 5 rows of the dataset

In [0]:
news_df.show(5, truncate=False)

### Count the number of duplicated news (if any)

In [0]:
# Compute each count once: every `count()` triggers a full Spark job,
# so the total is reused instead of being recomputed for the second placeholder.
total_news = news_df.count()
unique_news = news_df.dropDuplicates(['headline_text']).count()
print("The total number of duplicated news are {:d} out of {:d}".
      format(total_news - unique_news, total_news))

### Display the top-10 most duplicated news

In [0]:
news_df.groupby(["headline_text"]).count().sort("count", ascending=False).show(10)

### Remove duplicate news

In [0]:
news_df = news_df.dropDuplicates(["headline_text"])

In [0]:
print("The total number of unique news is: {:d}".format(news_df.count()))

### Check for any missing value (i.e., <code>NULL</code>) along <code>headline_text</code> column

In [0]:
news_df.where(col("headline_text").isNull()).count()
# Alternatively, using filter:
# news_df.filter(news_df.headline_text.isNull()).count()

### Show the corresponding NULL entry/ies

In [0]:
news_df.where(col("headline_text").isNull()).show()

# **Data Preprocessing**

In this example, we are working with text data and our ultimate goal is to cluster news into groups of coherent "topics" using one of the clustering algorithms we know (e.g., K-means). This is a specific task of a more general area, which is referred to as _natural language processing_ (NLP).

As **preliminary** steps of any NLP task, at least the following pipeline must be executed first:

- Text cleaning:
 - Case normalization (<code>lower</code>) -> convert all text to lower case;
 - Filter out _leading_ and _trailing_ whitespaces (<code>trim</code>);
 - Filter out punctuation symbols (<code>regexp_replace</code>);
 - Filter out any internal extra whitespace resulting from the step above (<code>regexp_replace</code> + <code>trim</code>).
- Tokenization (<code>Tokenizer</code>): splitting raw text into a list of individual _tokens_ (i.e., words), typically using whitespace as delimiter 
- Stopwords removal (<code>StopWordsRemover</code>): removing so-called _stopwords_, namely words that do not contribute to the deeper meaning of the document like "the", "a", "me", etc.
- Stemming (<code>SnowballStemmer</code>): reducing each word to its root or base. For example "fishing", "fished", "fisher" all reduce to the stem "fish".

In [0]:
def clean_text(df, column_name="headline_text", verbose=True):
    """ 
    This function takes the raw text data and applies a standard NLP preprocessing pipeline consisting of the following steps:
      - Text cleaning: lower-casing, trimming, removal of every character that is neither
        a letter nor a whitespace (digits are therefore dropped together with punctuation),
        and collapsing of any run of whitespace into a single space
      - Tokenization
      - Stopwords removal (`StopWordsRemover` with its default settings)
      - Stemming (Snowball stemmer, English)

    parameters:
      df: dataframe containing the text column to preprocess
      column_name: name of the text column (default: `headline_text`)
      verbose: if True (default), print a header and show the first 10 rows after each step;
               set it to False to skip those previews (each `show` runs a Spark job)

    returns: a cached dataframe with the columns `column_name` (cleaned text), `tokens`, `terms`
             and `terms_stemmed`. Other columns of `df` are NOT carried over, because the first
             step selects `column_name` only.

    """
    from pyspark.sql.functions import udf, col, lower, trim, regexp_replace
    # Imported locally (like everything else here) rather than relying on a notebook-wide star import
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer # BE SURE NLTK IS INSTALLED ON THE CLUSTER USING THE "LIBRARIES" TAB IN THE MENU

    def _report(message, frame=None):
        # Print a step header and, when a dataframe is given, preview its first 10 rows
        if not verbose:
            return
        print(message)
        if frame is not None:
            frame.show(10)

    # Text preprocessing pipeline
    _report("***** Text Preprocessing Pipeline *****\n")

    # 1. Text cleaning
    _report("# 1. Text Cleaning\n")
    # 1.a Case normalization
    lower_case_news_df = df.select(lower(col(column_name)).alias(column_name))
    _report("1.a Case normalization:", lower_case_news_df)
    # 1.b Trimming
    trimmed_news_df = lower_case_news_df.select(trim(col(column_name)).alias(column_name))
    _report("1.b Trimming:", trimmed_news_df)
    # 1.c Filter out punctuation symbols (anything that is not a letter or a whitespace)
    no_punct_news_df = trimmed_news_df.select((regexp_replace(col(column_name), "[^a-zA-Z\\s]", "")).alias(column_name))
    _report("1.c Filter out punctuation:", no_punct_news_df)
    # 1.d Filter out any internal extra whitespace.
    # Step 1.c keeps every whitespace character (`\s`), not just blanks, so runs of tabs/newlines
    # must be collapsed as well; matching only " +" would let them reach the tokenizer untouched.
    cleaned_news_df = no_punct_news_df.select(trim(regexp_replace(col(column_name), "\\s+", " ")).alias(column_name))
    _report("1.d Filter out extra whitespaces:", cleaned_news_df)

    # 2. Tokenization (split text into tokens)
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    tokens_df = tokenizer.transform(cleaned_news_df).cache()
    _report("# 2. Tokenization:", tokens_df)

    # 3. Stopwords removal
    stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="terms")
    terms_df = stopwords_remover.transform(tokens_df).cache()
    _report("# 3. Stopwords removal:", terms_df)

    # 4. Stemming (Snowball stemmer)
    stemmer = SnowballStemmer(language="english")
    # The UDF stems every token of the `terms` array and returns an array of strings
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms")).cache()
    _report("# 4. Stemming:", terms_stemmed_df)
    
    return terms_stemmed_df

In [0]:
clean_news_df = clean_text(news_df)

# **Feature Engineering**

Machine learning techniques cannot work directly on text data; in fact, words must be first converted into some numerical representation that machine learning algorithms can make use of. This process is often known as _vectorization_.

In terms of vectorization, it is important to remember that it isn't merely turning a single word into a single number. While words can be transformed into numbers, an entire document can be translated into a vector. Moreover, vectors derived from text data are usually high-dimensional. This is because each dimension of the feature space will correspond to a word, and the language in the documents may have thousands of words.

## TF-IDF
In information retrieval, **tf-idf** - short for term frequency-inverse document frequency - is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus.

The tf-idf value increases proportionally to the number of times a word appears in the document and is offset by the frequency of the word in the corpus, which helps to adjust for the fact that some words appear more frequently in general.

In [0]:
RANDOM_SEED = 42 # used below to run the actual K-means clustering
VOCAB_SIZE = 1000 # number of words to be retained as vocabulary
MIN_DOC_FREQ = 10 # minimum number of documents a word has to appear in to be included in the vocabulary
N_GRAMS = 2 # number of n-grams (if needed)
N_FEATURES = 200 # default embedding vector size (if HashingTF or, later, Word2Vec are used)

In [0]:
def extract_tfidf_features(df, column_name="terms_stemmed", vocab_size=None, min_doc_freq=None):
    """ 
    This function takes the tokenized text data and converts it into a term frequency-inverse document frequency vector

    The transformation is a two-stage pipeline: a `CountVectorizer` producing the
    `tf_features` column, followed by an `IDF` stage producing the `features` column.

    parameters:
      df: dataframe containing the `column_name` column
      column_name: name of the column holding the list of terms of each document
      vocab_size: passed to `CountVectorizer` as `vocabSize`;
                  when None, the notebook-level `VOCAB_SIZE` constant is used
      min_doc_freq: passed to `CountVectorizer` as `minDF`;
                    when None, the notebook-level `MIN_DOC_FREQ` constant is used

    returns: cached dataframe with the additional `tf_features` and `features` (tf-idf) columns

    """

    # Importing the feature transformation classes for doing TF-IDF 
    from pyspark.ml.feature import CountVectorizer, IDF
    from pyspark.ml import Pipeline

    # Resolve the defaults at call time, so that editing the notebook constants
    # takes effect without having to re-run this definition cell
    if vocab_size is None:
        vocab_size = VOCAB_SIZE
    if min_doc_freq is None:
        min_doc_freq = MIN_DOC_FREQ

    # Term frequency vector for each document
    cv = CountVectorizer(inputCol=column_name, outputCol="tf_features", vocabSize=vocab_size, minDF=min_doc_freq)
    # Inverse document frequency applied on top of the TF data
    idf = IDF(inputCol="tf_features", outputCol="features")

    # To experiment with n-grams or feature hashing, prepend an `NGram` stage
    # and/or replace `cv` with a `HashingTF` stage in the list below
    pipeline = Pipeline(stages=[cv, idf])
    pipeline_model = pipeline.fit(df)
    tf_idf_features_df = pipeline_model.transform(df).cache()

    return tf_idf_features_df

In [0]:
features = extract_tfidf_features(clean_news_df)

In [0]:
features.select(col("features")).show(10, truncate=False)

In [0]:
clean_news_df.show(5, truncate=False)

### Check and remove any possible zero-length vector

In [0]:
@udf("long")
def num_nonzeros(v):
    """Return the number of non-zero entries of `v`, as reported by its `numNonzeros()` method."""
    nonzero_count = v.numNonzeros()
    return nonzero_count

#### Check if there is any zero-length vector

In [0]:
print("Total n. of zero-length vectors: {:d}".
      format(features.where(num_nonzeros("features") == 0).count()))

#### Remove zero-length vector(s)

In [0]:
features = features.where(num_nonzeros("features") > 0)

#### Double-check there is no more zero-length vector

In [0]:
print("Total n. of zero-length vectors (after removal): {:d}".
      format(features.where(num_nonzeros("features") == 0).count()))

# **K-means Clustering**

In [0]:
N_CLUSTERS = 10 # number of output clusters (K)
DISTANCE_MEASURE = "euclidean" # alternatively, "cosine"
MAX_ITERATIONS = 100 # maximum number of iterations of K-means EM algorithm
TOLERANCE = 0.000001 # tolerance between consecutive centroid updates (i.e., another stopping criterion)

### Function used for running K-means

In [0]:
def k_means(dataset, 
            n_clusters, 
            distance_measure=DISTANCE_MEASURE, 
            max_iter=MAX_ITERATIONS, 
            tol=TOLERANCE,
            features_col="features", 
            prediction_col="cluster", 
            random_seed=RANDOM_SEED):
  """
  Fit a K-means model on `dataset` and assign every row to a cluster.

  parameters:
    dataset: dataframe holding the feature vectors in `features_col`
    n_clusters: number of clusters (K)
    distance_measure: distance measure handed to `KMeans`
    max_iter: maximum number of iterations
    tol: convergence tolerance
    features_col: name of the input column with the feature vectors
    prediction_col: name of the output column receiving the cluster assignment
    random_seed: seed handed to `KMeans`

  returns: the tuple (fitted model, cached dataframe with the `prediction_col` column added)
  """
  from pyspark.ml.clustering import KMeans

  # Same text as a single multi-line template, spelled out line by line
  banner = (
      "Training K-means clustering using the following parameters: \n"
      "  - K (n. of clusters) = {:d}\n"
      "  - max_iter (max n. of iterations) = {:d}\n"
      "  - distance measure = {:s}\n"
      "  - random seed = {:d}\n"
      "  "
  )
  print(banner.format(n_clusters, max_iter, distance_measure, random_seed))

  # Estimator settings: `k-means||` initialization with 5 init steps
  estimator_params = dict(
      featuresCol=features_col,
      predictionCol=prediction_col,
      k=n_clusters,
      initMode="k-means||",
      initSteps=5,
      tol=tol,
      maxIter=max_iter,
      seed=random_seed,
      distanceMeasure=distance_measure,
  )
  fitted_model = KMeans(**estimator_params).fit(dataset)

  # Attach the cluster assignment to every input row
  assignments_df = fitted_model.transform(dataset).cache()

  return fitted_model, assignments_df

### Run K-means by calling the function above

In [0]:
model, clusters_df = k_means(features, N_CLUSTERS, max_iter=MAX_ITERATIONS, distance_measure=DISTANCE_MEASURE)

### Function used to evaluate obtained clusters (Silhouette Coefficient)

In [0]:
def evaluate_k_means(clusters, 
                     metric_name="silhouette", 
                     distance_measure="squaredEuclidean", # cosine
                     prediction_col="cluster",
                     features_col="features"
                     ):
  """
  Score a clustering with Spark's `ClusteringEvaluator`.

  parameters:
    clusters: dataframe containing the feature vectors and the cluster assignments
    metric_name: evaluation metric (default: `silhouette`)
    distance_measure: distance measure used by the evaluator (`squaredEuclidean` or `cosine`)
    prediction_col: name of the column holding the cluster assignment
    features_col: name of the column holding the feature vectors; mirrors the
                  `features_col` argument of `k_means`, so that clusterings built on a
                  non-default feature column can be evaluated too

  returns: the value of the requested metric
  """
  from pyspark.ml.evaluation import ClusteringEvaluator
  
  # Evaluate clustering by computing Silhouette score
  evaluator = ClusteringEvaluator(metricName=metric_name,
                                  distanceMeasure=distance_measure, 
                                  predictionCol=prediction_col,
                                  featuresCol=features_col
                                  )

  return evaluator.evaluate(clusters)

### Evaluate clustering (Silhouette Coefficient)

We can evaluate the clustering we just ran using K=10 clusters.

In [0]:
evaluate_k_means(clusters_df, distance_measure="squaredEuclidean")

In [0]:
clusters_df.show(5)

In [0]:
clusters_df.groupBy("cluster").count().sort("cluster").show()

In [0]:
# Get the sorted list of cluster IDs that actually occur in the predictions
clusters = sorted([x[0] for x in clusters_df.select("cluster").distinct().collect()])
print("Cluster IDs: [{:s}]".format(", ".join([str(c) for c in clusters])))

# Create a filtered DataFrame for each cluster ID in a list comprehension
cluster_list = [clusters_df.where(clusters_df.cluster == x) for x in clusters]

# Show the first 10 records of every cluster.
# Each DataFrame is paired with its real cluster ID: the position in the list
# would be the wrong label whenever the IDs found are not exactly 0..n-1.
for cluster_id, cluster_df in zip(clusters, cluster_list):
  print("Showing the first 10 records of cluster ID #{:d}".format(cluster_id))
  cluster_df.select(["cluster", "headline_text"]).show(10, truncate=True)

# **Feature Learning**

Instead of using TF-IDF, we can _learn_ how to extract features using [Word2Vec](https://code.google.com/p/word2vec/).

Word2Vec computes distributed vector representations of words. The main advantage of distributed representations is that similar words are close in the vector space, which makes generalization to novel patterns easier and model estimation more robust. Distributed vector representations have been shown to be useful in many natural language processing applications such as named entity recognition, disambiguation, parsing, tagging and machine translation.

In [0]:
EMBEDDING_SIZE = 150 # size of embedding Word2Vec vectors

In [0]:
def extract_w2v_features(df, column_name="terms"):
  """
  Learn Word2Vec embeddings from `column_name` and embed every row of `df`.

  The estimator uses the notebook-level `EMBEDDING_SIZE` as vector size,
  `RANDOM_SEED` as seed and a minimum token count of 5.

  parameters:
    df: dataframe containing the `column_name` column
    column_name: name of the column holding the list of terms of each document

  returns: the tuple (fitted Word2Vec model, cached dataframe with the `features` column added)
  """
  from pyspark.ml.feature import Word2Vec

  estimator = Word2Vec(
      vectorSize=EMBEDDING_SIZE,
      minCount=5,
      inputCol=column_name,
      outputCol="features",
      seed=RANDOM_SEED,
  )
  w2v_model = estimator.fit(df)

  return w2v_model, w2v_model.transform(df).cache()

In [0]:
model, w2v_features = extract_w2v_features(clean_news_df)

In [0]:
w2v_features.show(truncate=False)

In [0]:
vecs = model.getVectors()
syms = model.findSynonyms("doctor", 2)
syms.show()

### Find the "best" value of K

We can use the "elbow method" to find the best value of K.

In [0]:
K_MIN = 2
K_MAX = 18
STEP = 2

In [0]:
def elbow_method(data, k_min=K_MIN, k_max=K_MAX, step=STEP, max_iter=MAX_ITERATIONS, distance_measure=DISTANCE_MEASURE):
  """
  Run K-means for a range of K values and collect the training cost of each model.

  parameters:
    data: dataframe with the feature vectors, as expected by `k_means`
    k_min: smallest K to try
    k_max: largest K to try (inclusive, when reachable from `k_min` in steps of `step`)
    step: increment between consecutive K values
    max_iter: maximum number of K-means iterations, forwarded to `k_means`
    distance_measure: distance measure, forwarded to `k_means`

  returns: pandas DataFrame with one row per K and the columns `K` and `SSE`
           (`SSE` is the model's `summary.trainingCost`)
  """
  results = []
  # `k_max + 1` makes the upper bound inclusive: a bare `range(k_min, k_max, step)`
  # would silently skip K = k_max
  for k in range(k_min, k_max + 1, step):
    # Only the fitted model is needed here; the clustered dataframe is discarded
    model, _ = k_means(data, k, max_iter=max_iter, distance_measure=distance_measure)
    results.append([k, model.summary.trainingCost])

  return pd.DataFrame(results, columns = ['K', 'SSE'])

In [0]:
# Get results from elbow method
elbow_results = elbow_method(w2v_features)

### Plot the results of "elbow method"

In [0]:
from matplotlib.ticker import MaxNLocator

def plot_elbow(results_df):
  """
  Plot the elbow curve (SSE as a function of K).

  parameter: pandas DataFrame with the columns `K` and `SSE`
  """
  figure, axis = plt.subplots(1, 1, figsize=(8, 6))
  sns.lineplot(data=results_df, x="K", y="SSE", ax=axis)
  axis.set(xlabel='K (#clusters)', ylabel='SSE')
  # K is a count of clusters: keep the x ticks on integer positions
  axis.xaxis.set_major_locator(MaxNLocator(integer=True))
  plt.show()

In [0]:
plot_elbow(elbow_results)