In [1]:
import tensorflow as tf
import tensorflow_hub as hub

import numpy as np

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1589977629721_0003,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def extractSentence(record):
    try:
        split_record = record.strip().split('\t')
        genre = split_record[3]
        sentences = []
        if (len(split_record[8])>0):
            sentences.append(split_record[8])
        if (len(split_record[9])>0):
            sentences.append(split_record[9])
        if(len(sentences)>0):
            return [(genre,sentence) for sentence in sentences]
        else:
            return ()
    except:
        return()

def google_embed(text_partition):
    embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
    record = [row for row in text_partition] #(genre,sentence)
    genre = [row[0] for row in record]
    sentence = [row[1] for row in record]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        sentence_embeddings = session.run(embed(sentence))
    return [[genre[i]] + sentence_embeddings[i].tolist() for i in range(len(record))]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Load data and remove the header
train = sc.textFile('s3://barnettgao/Assignment/train.tsv', 16)
header = train.first()
data_noheader = train.filter(lambda x: x != header)
sentence_genre = data_noheader.flatMap(extractSentence).cache()
sentence_genre.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

785404

In [4]:
# Encode the sentence
sentence_embedding = sentence_genre.mapPartitions(google_embed).repartition(16).cache()
genre_sentence_df = spark.createDataFrame(sentence_embedding, schema = ['genre'])
genre_sentence_df.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+---------------

In [5]:
# Project data point in a five dimensional space
assembler = VectorAssembler(inputCols=genre_sentence_df.columns[1:513], outputCol='features')
genre_features_vectors = assembler.transform(genre_sentence_df).select(['genre','features'])
genre_features_vectors.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|     genre|            features|
+----------+--------------------+
|government|[-0.0131281083449...|
+----------+--------------------+
only showing top 1 row

In [6]:
# Inspect the PCA projected vectors
pca = PCA(k=5, inputCol='features', outputCol='pca')
model = pca.fit(genre_features_vectors)
pca_result = model.transform(genre_features_vectors).select('genre','pca')
pca_result.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|     genre|                 pca|
+----------+--------------------+
|government|[0.04403965246054...|
+----------+--------------------+
only showing top 1 row

In [7]:
# Kmeans
kmeans = KMeans(featuresCol='pca',k=5).setSeed(5)
model=kmeans.fit(pca_result)
predictions = model.transform(pca_result)
prediction_genre = predictions.select("prediction","genre").cache()
prediction_genre.show(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|prediction|     genre|
+----------+----------+
|         0|government|
+----------+----------+
only showing top 1 row

In [8]:
# Process prediction data
prediction_count = prediction_genre.groupBy("genre","prediction").count().orderBy("genre","prediction")
prediction_list = [(row['prediction']) for row in prediction_count.collect()]
genre_list = [(row['genre']) for row in prediction_count.collect()]
count_list = [(row['count']) for row in prediction_count.collect()]
genre_prediction_count = [] 

for i in range(len(prediction_list)):
    temp = (genre_list[i],prediction_list[i],count_list[i])
    genre_prediction_count.append(temp)

print(genre_prediction_count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('fiction', 0, 8453), ('fiction', 1, 80290), ('fiction', 2, 34176), ('fiction', 3, 8785), ('fiction', 4, 22992), ('government', 0, 128287), ('government', 1, 6664), ('government', 2, 3828), ('government', 3, 4508), ('government', 4, 11413), ('slate', 0, 39994), ('slate', 1, 24711), ('slate', 2, 20477), ('slate', 3, 11058), ('slate', 4, 58372), ('telephone', 0, 19027), ('telephone', 1, 15545), ('telephone', 2, 117507), ('telephone', 3, 5449), ('telephone', 4, 9168), ('travel', 0, 14658), ('travel', 1, 9075), ('travel', 2, 5151), ('travel', 3, 117356), ('travel', 4, 8460)]

In [11]:
# Calculate the percentage 

fiction_sum= 0
governement_sum = 0
slate_sum = 0
tele_sum= 0
travel_sum= 0

for i in range(len(genre_prediction_count)):
    genre = genre_prediction_count[i][0]
    count = genre_prediction_count[i][2]
    if(genre == 'fiction'):
        fiction_sum +=count
    if(genre == 'government'):
        governement_sum +=count
    if(genre == 'slate'):
        slate_sum +=count
    if(genre == 'telephone'):
        tele_sum +=count
    if(genre == 'travel'):
        travel_sum +=count
        
genre_prediction_percent = []
for i in range(len(genre_prediction_count)):
    genre = genre_prediction_count[i][0]
    if(genre == 'fiction'):
        percentage = round(genre_prediction_count[i][2]/fiction_sum*100,2)
        genre_prediction_percent.append((genre_prediction_count[i][0],genre_prediction_count[i][1],percentage))
    if(genre == 'government'):
        percentage = round(genre_prediction_count[i][2]/governement_sum*100,2)
        genre_prediction_percent.append((genre_prediction_count[i][0],genre_prediction_count[i][1],percentage))
    if(genre == 'slate'):
        percentage = round(genre_prediction_count[i][2]/slate_sum*100,2)
        genre_prediction_percent.append((genre_prediction_count[i][0],genre_prediction_count[i][1],percentage))
    if(genre == 'telephone'):
        percentage = round(genre_prediction_count[i][2]/tele_sum*100,2)
        genre_prediction_percent.append((genre_prediction_count[i][0],genre_prediction_count[i][1],percentage))
    if(genre == 'travel'):
        percentage = round(genre_prediction_count[i][2]/travel_sum*100,2)
        genre_prediction_percent.append((genre_prediction_count[i][0],genre_prediction_count[i][1],percentage))
        
print(genre_prediction_percent)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('fiction', 0, 5.46), ('fiction', 1, 51.9), ('fiction', 2, 22.09), ('fiction', 3, 5.68), ('fiction', 4, 14.86), ('government', 0, 82.93), ('government', 1, 4.31), ('government', 2, 2.47), ('government', 3, 2.91), ('government', 4, 7.38), ('slate', 0, 25.87), ('slate', 1, 15.98), ('slate', 2, 13.24), ('slate', 3, 7.15), ('slate', 4, 37.75), ('telephone', 0, 11.41), ('telephone', 1, 9.33), ('telephone', 2, 70.49), ('telephone', 3, 3.27), ('telephone', 4, 5.5), ('travel', 0, 9.48), ('travel', 1, 5.87), ('travel', 2, 3.33), ('travel', 3, 75.86), ('travel', 4, 5.47)]

In [12]:
# Print the matrix
matrix = np.array([[0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0],[0, 0, 0, 0, 0, 0],[0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0],[0, 0, 0, 0, 0, 0]],dtype=object)
for i in range(0,5):
    matrix[0][i+1] = genre_list[i] #set x-axis true lable
    matrix[i+1][0] = prediction_list[i*5] #set y-axis predict lable
    
# match the records to table
iterate = 0
for i in range (1,6):
    for j in range(1,6):
        matrix[i][j] = str(genre_prediction_percent[(i-1)*5+j-1][2])+"%"
    iterate +=1
for i in range (1,6):
    x = matrix[1][i]
    
print(matrix)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[[0 'fiction' 'fiction' 'fiction' 'fiction' 'fiction']
 [0 '5.46%' '51.9%' '22.09%' '5.68%' '14.86%']
 [0 '82.93%' '4.31%' '2.47%' '2.91%' '7.38%']
 [0 '25.87%' '15.98%' '13.24%' '7.15%' '37.75%']
 [0 '11.41%' '9.33%' '70.49%' '3.27%' '5.5%']
 [0 '9.48%' '5.87%' '3.33%' '75.86%' '5.47%']]