# Q2 Part 2: K-Means clustering of sentences using Universal Sentence Encoder (USE) embeddings

In [1]:
# standard library
import csv
import string

# numerics, TensorFlow / TF-Hub (sentence encoder), plotting
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import matplotlib.pyplot as plt

# libraries for tokenization and punctuation / stop-word removal
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

# Spark session, SQL functions (sentence concatenation) and schema types
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, concat_ws, lit
from pyspark.sql.types import StructType, StructField, StringType

# Spark ML functionality (feature vectors, clustering, TF-IDF)
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import HashingTF, IDF, Tokenizer



# starting (or reusing) the spark session
spark = SparkSession \
    .builder \
    .appName("Assignment Q2 Solution") \
    .getOrCreate()

# Tab-separated training file with a header row; only the first N_ROWS rows are used
# to keep the (expensive) sentence encoding tractable.
data_path = "s3://akum44880/assignmen/train.tsv"
N_ROWS = 10000
df = spark.read.csv(data_path, header=True, sep='\t').limit(N_ROWS)

# selecting only the required columns
data_df = df.select(['genre', 'sentence1', 'sentence2'])

# Join both sentences into one text column. concat() returns NULL as soon as either
# sentence is NULL, whereas concat_ws() skips NULL inputs, so every row keeps its real
# (possibly partial) text instead of ending up as NULL / the literal string "None".
df = data_df.select(['genre', concat_ws(" ", data_df.sentence1, data_df.sentence2).alias('joined')])

# RDD of plain strings, one per row and in the same order as `df`. No row is dropped,
# which keeps the texts one-to-one with the genres of `df`.
data_rdd = df.rdd.map(lambda row: row['joined'])

# Google Universal Sentence Encoder (USE) embedding function, applied per RDD partition.

def review_embed(rev_text_partition,
                 module_url="https://tfhub.dev/google/universal-sentence-encoder/2"):
    """Embed every text of one partition with the Universal Sentence Encoder.

    Intended for ``rdd.mapPartitions``: ``rev_text_partition`` is the partition's
    iterator of strings. The TF-Hub module is built inside a fresh ``tf.Graph`` on
    each call. If the same Python worker process handles several partitions, modules
    and variables therefore do not pile up in one shared default graph.

    Args:
        rev_text_partition: iterable of strings to embed.
        module_url: TF1 ``hub.Module`` handle. Another option is
            "https://tfhub.dev/google/universal-sentence-encoder-large/3".

    Returns:
        The array produced by ``session.run`` with one embedding row per input text,
        in input order. Returns an empty list for an empty partition.
    """
    # mapPartitions supplies a one-shot iterator (generator style); TensorFlow needs a list.
    texts = list(rev_text_partition)
    if not texts:
        # Nothing to encode: skip loading the TF-Hub module for empty partitions.
        return []
    with tf.Graph().as_default():
        embed = hub.Module(module_url)
        embeddings_op = embed(texts)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            return session.run(embeddings_op)

# Embed (genre, text) pairs in a single pass so every genre stays attached to its own
# vector. The previous approach re-paired them with a positional id join between two
# separate evaluations of an unordered `limit()`. Spark does not guarantee that such a
# limit returns the same rows in the same order twice.
def _embed_with_genre(rows_partition):
    """Embed one partition of (genre, text) pairs into (genre, DenseVector) pairs, in input order."""
    rows = list(rows_partition)
    if not rows:
        return []
    embeddings = review_embed([text for _, text in rows])
    return [(genre, Vectors.dense(vector)) for (genre, _), vector in zip(rows, embeddings)]

genre_text_rdd = df.rdd.map(
    lambda row: (row['genre'], row['joined'] if row['joined'] is not None else ""))
data_embedding = genre_text_rdd.mapPartitions(_embed_with_genre)

# Explicit schema: a single vector column (no 512 `_n` columns + VectorAssembler) and
# no extra Spark job just to infer the schema from the first row.
embedding_schema = StructType([
    StructField("genre", StringType(), True),
    StructField("features", VectorUDT(), False),
])
# Cache the embedded rows. KMeans fitting, transform and every later action reuse them
# instead of re-running the TensorFlow encoder (and the unordered limit) each time.
data_embedding_vectors = spark.createDataFrame(data_embedding, embedding_schema).cache()

# k-means clustering on the encoded dataset; a fixed seed makes the clustering reproducible.
NUM_CLUSTERS = 5
KMEANS_SEED = 42
kmeans = KMeans(featuresCol='features', k=NUM_CLUSTERS, seed=KMEANS_SEED)
model = kmeans.fit(data_embedding_vectors)
predictions = model.transform(data_embedding_vectors)

from pyspark.sql.functions import monotonically_increasing_id

# The genre travelled with its embedding, so predictions already hold the true pairing.
final_predicted = predictions.select("genre", "prediction")
result = final_predicted.groupBy("genre", "prediction").count().orderBy("prediction", "count")


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1589978725637_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

KeyboardInterrupt: 

In [None]:
# Number of clusters taken from the fitted model, so this stays in sync with KMeans' k.
num_clusters = len(model.clusterCenters())

# One driver round-trip instead of one Spark job per cluster; largest counts first.
result_rows = result.orderBy('count', ascending=False).collect()

# list_of_labels[c] holds the (genre, prediction, count) rows of predicted cluster c,
# ordered by count descending, i.e. the most frequent actual genre first.
list_of_labels = [
    [row for row in result_rows if row['prediction'] == cluster]
    for cluster in range(num_clusters)
]

final_predicted.cache()

In [None]:
#if this class is already assigned to some other predicted label, we map the next available actual class to the label

def cmax(z):
    for i in z:
        maxg = i['genre']
        if(flag[maxg]==0):
            flag[maxg]=1
            return maxg
        
# Whether each actual genre has already been assigned to a predicted cluster (0 = free, 1 = taken).
flag={'fiction':0,'slate':0,'travel':0,'telephone':0,'government':0}

# Maps each actual genre to the index of the predicted cluster it was assigned to.
enabler={'fiction':0,'slate':0,'travel':0,'telephone':0,'government':0}

# mapped_clusters[c] is the genre name given to predicted cluster c.
mapped_clusters=[]

# Greedy assignment: clusters in index order each take their most frequent still-free genre.
for cluster_index, cluster_rows in enumerate(list_of_labels):
    genre = cmax(cluster_rows)
    if genre is None:
        # No genre left for this cluster: keep its numeric id so the replace below stays all-string.
        genre = str(cluster_index)
    else:
        enabler[genre] = cluster_index
    mapped_clusters.append(genre)

# replace the numerical cluster labels with the mapped categorical (genre) labels
y = final_predicted.withColumn('prediction', final_predicted.prediction.cast('string'))
y = y.na.replace([str(c) for c in range(len(mapped_clusters))], mapped_clusters, 'prediction')


In [None]:
# import libraries for visualization and confusion matrix
import sklearn
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Collect (actual genre, mapped predicted genre) pairs; the explicit select fixes column order.
x = np.array(y.select('genre', 'prediction').collect())
# confusion_matrix takes (y_true, y_pred): rows = actual genre, columns = predicted label.
# labels=mapped_clusters pins the row/column order to the heatmap tick labels; without it
# sklearn sorts the labels alphabetically and the ticks would mislabel the cells.
cf_matrix = confusion_matrix(x[:, 0], x[:, 1], labels=mapped_clusters)

# custom confusion matrix expressed as row-wise percentages

def my_confusion_matrix(array):
    """Convert a matrix of counts into row-wise percentages rounded to 2 decimals.

    Args:
        array: 2-D matrix of counts (numpy array or list of lists).

    Returns:
        List of lists where each entry is ``count / row_total * 100`` rounded to 2
        decimals. Rows whose total is 0 give 0.0 everywhere instead of NaN from a
        division by zero.
    """
    percentages = []
    for row in array:
        total = sum(row)  # computed once per row, not once per cell
        if total == 0:
            percentages.append([0.0 for _ in row])
        else:
            percentages.append([round((value / total) * 100, 2) for value in row])
    return percentages

cd = my_confusion_matrix(cf_matrix)

# visualize the final confusion matrix
fig, ax = plt.subplots(figsize=(7,7))
sns.heatmap(cd, annot=True, fmt='.2f', cmap='Blues', xticklabels=mapped_clusters, yticklabels=mapped_clusters)
plt.ylabel('ACTUAL LABELS')
plt.xlabel('PREDICTED LABELS')
plt.show(block=False)

%matplot plt