# **Global Constants**

In [0]:
# Location of the Java 8 installation exported as JAVA_HOME during setup.
JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"

# Google Drive paths: mount point -> Drive home folder -> course dataset folder.
GDRIVE_DIR = "/content/gdrive"
GDRIVE_HOME_DIR = "{}/My Drive".format(GDRIVE_DIR)
GDRIVE_DATA_DIR = "{}/Teaching/2019-20-BDC/datasets".format(GDRIVE_HOME_DIR)

# Remote dataset, and the Drive path it is stored at (same file name as in the URL).
DATASET_URL = "https://github.com/gtolomei/big-data-computing/raw/master/datasets/all-the-news-1.csv.bz2"
GDRIVE_DATASET_FILE = "{}/{}".format(GDRIVE_DATA_DIR, DATASET_URL.rsplit("/", 1)[-1])

RANDOM_SEED = 42  # for reproducibility
MAX_K_CLUSTERS = 20  # max number of clusters (more on this later...)

# **Spark + Google Colab Setup**

## **1.** Install PySpark and related dependencies

In [0]:
# Install PySpark, PyDrive and a headless OpenJDK 8 into the notebook's VM.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Export JAVA_HOME (defined in the Global Constants cell) into this process's environment.
os.environ["JAVA_HOME"] = JAVA_HOME

## **2.** Import useful Python packages

In [0]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

## **3.** Create Spark context

In [0]:
# Spark configuration: UI served on port 4050, plus executor/driver memory limits.
conf = (
    SparkConf()
    .set("spark.ui.port", "4050")
    .set("spark.executor.memory", "4G")
    .set("spark.driver.memory", "45G")
    .set("spark.driver.maxResultSize", "10G")
)

# Create the context, or reuse the one that is already running: calling the
# SparkContext constructor a second time (e.g. when this cell is re-run) raises
# an error, whereas getOrCreate() returns the existing context. Note that
# `conf` is only applied when a new context is actually created.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# Create (or fetch) the session that wraps the context above.
spark = SparkSession.builder.getOrCreate()

## **4.** Create <code>ngrok</code> tunnel to check the Spark UI

In [0]:
# Download and unpack the ngrok client.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# Start ngrok in the background, tunnelling HTTP traffic to local port 4050
# (the value given to spark.ui.port in the Spark configuration).
get_ipython().system_raw('./ngrok http 4050 &')
# Query the tunnel list on localhost:4040 and print the first tunnel's public URL.
# NOTE(review): 4040 is presumably ngrok's own local API port, not the Spark UI port — confirm.
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

## **5.** Link Colab to our Google Drive

In [0]:
# Point Colaboratory to our Google Drive

from google.colab import drive

# Mount Drive at GDRIVE_DIR; force_remount=True asks for a fresh mount even if
# one is already present.
drive.mount(GDRIVE_DIR, force_remount=True)

## **6.** Check everything is ok

In [0]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [0]:
# List every (key, value) pair of the active Spark configuration.
# Uses the public accessor getConf() rather than the private `_conf` attribute.
sc.getConf().getAll()

# **Data Acquisition**

Download dataset file from URL directly to our Google Drive.

**NOTE:** This is just a sample of the full <code>all-the-news</code> dataset available from [Kaggle](https://www.kaggle.com/snapcrack/all-the-news); more specifically, it is one of the three files which the dataset is composed of (i.e., <code>articles1.csv</code>).

In [0]:
def get_data(dataset_url, dest, chunk_size=1024):
  """Stream the file at `dataset_url` into the local file `dest`.

  Parameters:
    dataset_url: URL of the file to download.
    dest: path of the output file (opened in binary write mode).
    chunk_size: size, in bytes, of each block requested from the response stream.

  Raises:
    requests.HTTPError: if the server does not answer with status 200. Nothing
      is written to `dest` in that case, so a failed download can no longer be
      mistaken for a successful one.
  """
  # The context manager releases the connection even if writing fails midway.
  with requests.get(dataset_url, stream=True) as response:
    if response.status_code != 200:
      raise requests.HTTPError(
          "Download of {} failed with HTTP status {:d}".format(dataset_url, response.status_code),
          response=response)
    with open(dest, "wb") as file:
      for block in response.iter_content(chunk_size=chunk_size):
        if block:  # skip empty blocks
          file.write(block)

In [0]:
# Fetch DATASET_URL and store it in the Drive dataset folder as GDRIVE_DATASET_FILE.
print("Retrieving dataset from URL: {} ...".format(DATASET_URL))
get_data(DATASET_URL, GDRIVE_DATASET_FILE)
print("Dataset successfully retrieved and stored at: {}".format(GDRIVE_DATASET_FILE))

### Read dataset file into a Spark Dataframe

In [0]:
# Read the compressed CSV from Drive: comma-separated, first row used as
# header, column types inferred by Spark.
news_df = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(GDRIVE_DATASET_FILE)
)

### Check the shape of the loaded dataset, i.e., number of rows and columns

In [0]:
# Rows come from count(); the number of columns is the length of the column list.
print("The shape of the dataset is {:d} rows by {:d} columns".format(news_df.count(), len(news_df.columns)))

### Print out the schema of the loaded dataset

In [0]:
# Column names and types of the loaded dataframe (types were inferred at load time).
news_df.printSchema()

### Display the first 5 rows of the dataset

In [0]:
# truncate=False prints the full cell values instead of shortened strings.
news_df.show(5, truncate=False)

### Count the number of duplicated news (if any)

In [0]:
# Count the full dataset once and reuse the value: every count() call is a
# separate Spark job over the data.
n_total_news = news_df.count()
# Rows that remain when only one row per distinct `content` value is kept.
n_unique_news = news_df.dropDuplicates(['content']).count()
print("The total number of duplicated news are {:d} out of {:d}".
      format(n_total_news - n_unique_news, n_total_news))

### Display the top-10 most duplicated news

In [0]:
# Group rows by identical `content`, count each group, and print the 10 largest groups.
news_df.groupby(["content"]).count().sort("count", ascending=False).show(10)

### Remove duplicate news

In [0]:
# Keep a single row per distinct `content` value; the result replaces news_df.
news_df = news_df.dropDuplicates(["content"])

In [0]:
# Row count of news_df after de-duplication on `content`.
print("The total number of unique news is: {:d}".format(news_df.count()))

### Check for any missing value (i.e., <code>NULL</code>) along <code>content</code> column

In [0]:
# Number of rows whose `content` is NULL.
news_df.where(col("content").isNull()).count()
# Alternatively, using filter:
# news_df.filter(news_df.content.isNull()).count()

### Show the corresponding NULL entry/ies

In [0]:
# Print the rows whose `content` is NULL.
news_df.where(col("content").isNull()).show()

### Remove <code>NULL</code> entry/ies

In [0]:
# Drop rows with a NULL `content`; NULLs in other columns are not considered.
news_df = news_df.na.drop(subset=["content"])

# **Data Preprocessing**

In this example, we are working with text data and our ultimate goal is to cluster news into groups of coherent "topics" using one of the clustering algorithms we know (e.g., K-means). This is a specific task of a more general area, which is referred to as _natural language processing_ (NLP).

As **preliminary** steps of any NLP task, at least the following pipeline must be executed first:

- Text cleaning:
 - Case normalization (<code>lower</code>) -> convert all text to lower case;
 - Filter out _leading_ and _trailing_ whitespaces (<code>trim</code>);
 - Filter out punctuation symbols (<code>regexp_replace</code>);
 - Filter out any internal extra whitespace resulting from the step above (<code>regexp_replace</code> + <code>trim</code>).
- Tokenization (<code>Tokenizer</code>): splitting raw text into a list of individual _tokens_ (i.e., words), typically using whitespace as delimiter 
- Stopwords removal (<code>StopWordsRemover</code>): removing so-called _stopwords_, namely words that do not contribute to the deeper meaning of the document like "the", "a", "me", etc.
- Stemming (<code>SnowballStemmer</code>): reducing each word to its root or base. For example "fishing", "fished", "fisher" all reduce to the stem "fish".

In [0]:
def clean_text(df, column_name="content"):
    """
    Apply a standard NLP preprocessing pipeline to the raw text stored in `column_name`.

    Steps (a 10-row preview is printed after each one):
      - Text cleaning: lower-casing, trimming, removal of every character that is
        neither an ASCII letter nor whitespace, collapsing of whitespace runs
      - Tokenization
      - Stopwords removal
      - Stemming (English Snowball stemmer)

    parameters:
      df: dataframe containing an `id` column and the text column `column_name`
      column_name: name of the column holding the raw text
    returns: a cached dataframe with the columns `id`, `column_name` (cleaned text),
      `tokens`, `terms` and `terms_stemmed`; the other columns of `df` are not kept

    """
    from pyspark.sql.functions import udf, col, lower, trim, regexp_replace
    from pyspark.sql.types import ArrayType, StringType
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer

    # Text preprocessing pipeline
    print("***** Text Preprocessing Pipeline *****\n")

    # 1. Text cleaning
    print("# 1. Text Cleaning\n")
    # 1.a Case normalization
    print("1.a Case normalization:")
    lower_case_news_df = df.select("id", lower(col(column_name)).alias(column_name))
    lower_case_news_df.show(10)
    # 1.b Trimming
    print("1.b Trimming:")
    trimmed_news_df = lower_case_news_df.select("id", trim(col(column_name)).alias(column_name))
    trimmed_news_df.show(10)
    # 1.c Filter out punctuation symbols (everything except ASCII letters and whitespace)
    print("1.c Filter out punctuation:")
    no_punct_news_df = trimmed_news_df.select("id", (regexp_replace(col(column_name), "[^a-zA-Z\\s]", "")).alias(column_name))
    no_punct_news_df.show(10)
    # 1.d Filter out any internal extra whitespace.
    # Step 1.c keeps every whitespace character (tabs and newlines included), so
    # the whole class `\s+` is collapsed here, not just runs of spaces: the
    # tokenizer below splits on single whitespace characters and would
    # otherwise emit empty tokens for leftover runs.
    print("1.d Filter out extra whitespaces:")
    cleaned_news_df = no_punct_news_df.select("id", trim(regexp_replace(col(column_name), "\\s+", " ")).alias(column_name))
    cleaned_news_df.show(10)

    # 2. Tokenization (split text into tokens)
    print("# 2. Tokenization:")
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    tokens_df = tokenizer.transform(cleaned_news_df).cache()
    tokens_df.show(10)

    # 3. Stopwords removal
    print("# 3. Stopwords removal:")
    stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="terms")
    terms_df = stopwords_remover.transform(tokens_df).cache()
    terms_df.show(10)

    # 4. Stemming (Snowball stemmer), applied token by token through a Python UDF
    print("# 4. Stemming:")
    stemmer = SnowballStemmer(language="english")
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms"))
    terms_stemmed_df.show(10)

    return terms_stemmed_df.cache()

In [0]:
# Run the preprocessing pipeline on the news `content` (prints a preview after each step).
clean_news_df = clean_text(news_df)

# **Feature Engineering**

Machine learning techniques cannot work directly on text data; in fact, words must be first converted into some numerical representation which machine learning algorithms can make use of. This process is often known as _embedding_ or _vectorization_.

In terms of vectorization, it is important to remember that it isn't merely turning a single word into a single number. While words can be transformed into numbers, an entire document can be translated into a vector. Moreover, vectors derived from text data are usually high-dimensional. This is because each dimension of the feature space will correspond to a word, and the language in the documents may have thousands of words.

## TF-IDF
In information retrieval, **tf-idf** - short for term frequency-inverse document frequency - is a numerical statistic that is intended to reflect how important a word is to a document in a collection or corpus.

The tf-idf value increases proportionally to the number of times a word appears in the document and is offset by the frequency of the word in the corpus, which helps to adjust for the fact that some words appear more frequently in general.

In [0]:
def extract_tfidf_features(df, column_name="terms_stemmed", vocab_size=2000, min_df=10):
    """
    Convert the token lists in `column_name` into term frequency-inverse document
    frequency (TF-IDF) vectors.

    parameters:
      df: dataframe containing the column `column_name` (an array of terms per row)
      column_name: name of the column holding the terms
      vocab_size: value passed to CountVectorizer as `vocabSize`
      min_df: value passed to CountVectorizer as `minDF`
    returns: a cached dataframe made of the columns of `df` plus `tf_features`
      (term counts) and `features` (TF-IDF vectors)

    """

    # Importing the feature transformation classes for doing TF-IDF
    from pyspark.ml.feature import CountVectorizer, IDF
    from pyspark.ml import Pipeline

    # Stage 1: term-frequency vectors over a vocabulary learned from `df`.
    # (HashingTF with `numFeatures` is an alternative that needs no fitting.)
    cv = CountVectorizer(inputCol=column_name, outputCol="tf_features", vocabSize=vocab_size, minDF=min_df)
    # Stage 2: re-weight the term frequencies by inverse document frequency.
    idf = IDF(inputCol="tf_features", outputCol="features")

    # Fit both stages on `df`, then apply the fitted pipeline to the same data.
    pipeline = Pipeline(stages=[cv, idf])
    features = pipeline.fit(df)
    tf_idf_features_df = features.transform(df).cache()

    return tf_idf_features_df

In [0]:
# Build TF-IDF vectors from the function's default input column (`terms_stemmed`).
tf_idf_df = extract_tfidf_features(clean_news_df)

In [0]:
# Preview the first 10 TF-IDF vectors without truncating them.
tf_idf_df.select(col("features")).show(10, truncate=False)

### Check and remove any possible zero-length vector

In [0]:
@udf("long")
def num_nonzeros(v):
    """Spark UDF (return type declared as "long"): number of non-zero entries of vector `v`."""
    nonzero_count = v.numNonzeros()
    return nonzero_count

#### Check if there is any zero-length vector



In [0]:
# Count the rows whose `features` vector has no non-zero entry.
print("Total n. of zero-length vectors: {:d}".
      format(tf_idf_df.where(num_nonzeros("features") == 0).count()))

#### Remove zero-length vector(s)


In [0]:
# Keep only the rows whose `features` vector has at least one non-zero entry.
tf_idf_df = tf_idf_df.where(num_nonzeros("features") > 0)

#### Double-check there is no more zero-length vector

In [0]:
# Sanity check on the filtered dataframe: this count is expected to be 0.
print("Total n. of zero-length vectors (after removal): {:d}".
      format(tf_idf_df.where(num_nonzeros("features") == 0).count()))

# **K-means Clustering**

### Function used for running K-means

In [0]:
def k_means(dataset,
            n_clusters,
            distance_measure="euclidean",
            max_iter=20,
            features_col="features",
            prediction_col="cluster",
            random_seed=RANDOM_SEED):
  """Fit a K-means model on `dataset` and assign every row to a cluster.

  Parameters:
    dataset: dataframe holding the feature vectors in `features_col`.
    n_clusters: number of clusters (K).
    distance_measure: value passed to KMeans as `distanceMeasure`.
    max_iter: maximum number of iterations.
    features_col: name of the input vector column.
    prediction_col: name of the output column receiving the cluster ID.
    random_seed: seed passed to KMeans.

  Returns:
    (model, clusters_df): the fitted model and the cached dataframe obtained by
    applying it to `dataset`.
  """
  from pyspark.ml.clustering import KMeans

  summary_template = (
      "Training K-means clustering using the following parameters: \n"
      "  - K (n. of clusters) = {:d}\n"
      "  - max_iter (max n. of iterations) = {:d}\n"
      "  - distance measure = {:s}\n"
      "  - random seed = {:d}\n"
      "  "
  )
  print(summary_template.format(n_clusters, max_iter, distance_measure, random_seed))

  # Estimator settings; initialisation mode, init steps and tolerance are fixed here.
  estimator_params = dict(
      featuresCol=features_col,
      predictionCol=prediction_col,
      k=n_clusters,
      initMode="k-means||",
      initSteps=5,
      tol=1e-6,
      maxIter=max_iter,
      seed=random_seed,
      distanceMeasure=distance_measure,
  )
  fitted_model = KMeans(**estimator_params).fit(dataset)

  # Label every row with its cluster and cache the result for later reuse.
  labelled_df = fitted_model.transform(dataset).cache()

  return fitted_model, labelled_df

### Function used to evaluate obtained clusters

In [0]:
def evaluate_k_means(clusters,
                     metric_name="silhouette",
                     distance_measure="squaredEuclidean", # cosine
                     prediction_col="cluster"
                     ):
  """Score a clustering with Spark's ClusteringEvaluator.

  Parameters:
    clusters: dataframe containing the cluster assignments in `prediction_col`.
    metric_name: value passed to the evaluator as `metricName`.
    distance_measure: value passed to the evaluator as `distanceMeasure`.
    prediction_col: name of the column holding the cluster IDs.

  Returns:
    The value computed by the evaluator on `clusters`.
  """
  from pyspark.ml.evaluation import ClusteringEvaluator

  # The features column is not set, so the evaluator's default is used.
  scorer = ClusteringEvaluator(
      metricName=metric_name,
      distanceMeasure=distance_measure,
      predictionCol=prediction_col,
  )
  score = scorer.evaluate(clusters)
  return score

### Run K-means by calling the function above

In [0]:
# Fit K-means with K=10 and at most 100 iterations; keep both the model and the clustered dataframe.
model, clusters_df = k_means(tf_idf_df, 10, max_iter=100, distance_measure="euclidean") # Alternatively, distance_measure="cosine"

In [0]:
# Score the clustering with the function's default metric ("silhouette").
evaluate_k_means(clusters_df, distance_measure="squaredEuclidean") # Alternatively, distance_measure="cosine"

In [0]:
# Preview 5 rows of the clustered dataframe (includes the `cluster` column).
clusters_df.show(5)

In [0]:
# Number of rows per cluster, ordered by cluster ID.
clusters_df.groupBy("cluster").count().sort("cluster").show()

In [0]:
# Get unique values in the grouping column
clusters = sorted([x[0] for x in clusters_df.select("cluster").distinct().collect()])
print("Cluster IDs: [{:s}]".format(", ".join([str(c) for c in clusters])))

# Create a filtered DataFrame for each group in a list comprehension
cluster_list = [clusters_df.where(clusters_df.cluster == x) for x in clusters]

# Show the results.
# Pair each filtered DataFrame with its actual cluster ID: a positional index
# would mislabel the output whenever the IDs found in the data are not exactly
# 0, 1, ..., n-1 (e.g. when a cluster received no rows).
for x_id, x in zip(clusters, cluster_list):
  print("Showing the first 10 records of cluster ID #{:d}".format(x_id))
  x.select(["cluster", "id", "content"]).show(10, truncate=False)