In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark_conf = SparkConf()\
        .setAppName("Cloud Computing Assignment")

sc=SparkContext.getOrCreate(spark_conf) 

ssc = StreamingContext(sc, 3)

#You can change the input path pointing to your own HDFS
#If spark is able to read hadoop configuration, you can use relative path

spark = SparkSession \
    .builder \
    .appName("Cloud Computing Assignment") \
    .getOrCreate()

test_matched = 's3://comp5349-auda0496/assignment/test_matched.tsv'
test_mismatched = 's3://comp5349-auda0496/assignment/test_mismatched.tsv'
dev_matched = 's3://comp5349-auda0496/assignment/dev_matched.tsv'
dev_mismatched = 's3://comp5349-auda0496/assignment/dev_mismatched.tsv'
stop_words_file = 's3://comp5349-auda0496/assignment/stop_words.txt'
stop_words = sc.textFile(stop_words_file).map(lambda word: str(word)).collect()
trainFile = 's3://comp5349-auda0496/assignment/train.tsv'


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
21,application_1589937331213_0022,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
import csv
import nltk

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

def extractSentence(record):    
    row = record.split('\t')

    if len(row) < 10 : 
        return []    
    else: #normal record
        firstSentence = row[8]
        secondSentence = row[9]    
        return [firstSentence]+[secondSentence]

def extractWord(record):
    words = word_tokenize(record)
    extractedWords = [(word.lower(),1) for word in words if word.isalpha()]
    return extractedWords

def tagRow(row, tag):    
    return (row[0], tag)

def getWordsTagged(devFile, testFile, matchType):
    devData = sc.textFile(devFile)
    devHeader = devData.first() #get headers from first row
    devData = devData.filter(lambda row : row != devHeader) #get data from other rows

    testData = sc.textFile(testFile)
    testHeader = testData.first() #get headers from first row
    testData = testData.filter(lambda row : row != testHeader) #get data from other rows

    data = devData.union(testData)

    sentence = data.flatMap(extractSentence)
    words = sentence.flatMap(extractWord).distinct() #get unique mismatched words
    wordsTagged = words.map(lambda row: tagRow(row,matchType))
    return wordsTagged

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
mismatchedWordsTagged = getWordsTagged(dev_mismatched,test_mismatched, "mis")
matchedWordsTagged = getWordsTagged(dev_matched,test_matched, "mat")
wordsTagged = matchedWordsTagged.fullOuterJoin(mismatchedWordsTagged)
commonWords = wordsTagged.filter(lambda x: x[1][0]!=None and x[1][1]!=None)
mismatchedUniqueWords = wordsTagged.filter(lambda x: x[1][0]==None and x[1][1]!=None)
matchedUniqueWords = wordsTagged.filter(lambda x: x[1][0]!=None and x[1][1]==None)

print("Number of common words: " +str(commonWords.count())) 
print("Number of unique matched words: " +str(matchedUniqueWords.count())) 
print("Number of unique mismatched words: " +str(mismatchedUniqueWords.count())) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of common words: 9079
Number of unique matched words: 8970
Number of unique mismatched words: 6414

In [4]:
def extractSentenceWithGenre(record):    
    row = record.split('\t')
    if len(row) < 10 :
        return []
    genre = row[3]
    firstSentence = row[8]
    secondSentence = row[9]
    return [(genre,firstSentence)]+[(genre,secondSentence)]
    
def extractWordWithGenre(sentenceGenrePair, isStopwordsRemoved):    
    genre = sentenceGenrePair[0]
    sentence = sentenceGenrePair[1]
    wordsRawData = word_tokenize(sentence)
    words = [word.lower() for word in wordsRawData if word.isalpha()] 
    if isStopwordsRemoved:
        words = [word for word in words if not word in stop_words] 
    return [(word, genre) for word in words]

def mergeGenreWords(accumulatedPair, currentGenreWord):
    ratingTotal, ratingCount = accumulatedPair
    ratingTotal += currentGenreWord
    ratingCount = ratingCount + 1
    return (ratingTotal, ratingCount)

def mergeCombiners(accumulatedPair1, accumulatedPair2):
    ratingTotal1, ratingCount1 = accumulatedPair1
    ratingTotal2, ratingCount2 = accumulatedPair2
    return (ratingTotal1+ratingTotal2, ratingCount1+ratingCount2)

def mapWordGenreCount(wordGenrePair):
    return (wordGenrePair[0], wordGenrePair[1][1])

def wordGenrePercentageCount(isStopwordsRemoved, trainSentence):
    trainWords = trainSentence.flatMap(lambda x: extractWordWithGenre(x, isStopwordsRemoved)).distinct()
    wordsCount = trainWords.aggregateByKey(([],0), mergeGenreWords, mergeCombiners, 1).map(mapWordGenreCount)
    count = []
    
    for genre in range(1,6):
        count.append(wordsCount.filter(lambda x: x[1] == genre).count())
    print("without stopwords in:" if isStopwordsRemoved else "with stopwords in:")
    totalWordCount = 0
    genreCount = 0
    for i in count:
        totalWordCount = totalWordCount + i
    for i in count:
        genreCount = genreCount + 1
        print(str(genreCount) + " genre : " +str(i/totalWordCount*100))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
trainData = sc.textFile(trainFile)
trainHeader = trainData.first() #extract header
trainData = trainData.filter(lambda row : row != trainHeader)   #filter out header
trainSentence = trainData.flatMap(extractSentenceWithGenre)

print("Percentage of words appearing")
wordGenrePercentageCount(True, trainSentence) #Stopwords Removed
wordGenrePercentageCount(False, trainSentence) #Stopwords Added

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Percentage of words appearing
without stopwords in:
1 genre : 56.29497220244497
2 genre : 15.351521669974098
3 genre : 9.50873312832321
4 genre : 7.654552891097207
5 genre : 11.190220108160513
with stopwords in:
1 genre : 56.1781109717205
2 genre : 15.32171520986684
3 genre : 9.493508260153263
4 genre : 7.649521621499071
5 genre : 11.357143936760327

In [6]:
for (id, rdd) in sc._jsc.getPersistentRDDs().items(): rdd.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from pyspark.sql import SparkSession
from pyspark.mllib.clustering import KMeans, KMeansModel #https://spark.apache.org/docs/2.2.0/mllib-clustering.html
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.feature import HashingTF, IDF

def review_embed(rev_text_partition):
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    embed = hub.Module(module_url)
    # mapPartition would supply element inside a partition using generator stype
    # this does not fit tensorflow stype
    rev_text_list = [text for text in rev_text_partition]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        message_embeddings = session.run(embed(rev_text_list))
    return message_embeddings

def sequencer(accumulatedPair, currentItem):
    return accumulatedlist + [currentItem]

def combiner(accumulatedPair1, accumulatedPair2):
    return accumulatedPair1 + accumulatedPair2

def recurringCheck(x, clusterMapped):
    for i in clusterMapped:
        if x == i[1]:
            return i[0]
        
def kMeanCluster(embeddedWords, trainSentence):
    clusters = KMeans.train(embeddedWords, 5, maxIterations= 1)

    prediction = clusters.predict(embeddedWords)
    result = prediction.zip(trainSentence).map(lambda x:(x[0],x[1][0]))

    sum = (lambda x, y : x + y)

    aCount = result.map(lambda x : (x, 1)).aggregateByKey(0, sum, sum, 1)

    accuracy = sorted(aCount.collect(), key=lambda x: - x[1])

    clusterName = []
    genre = []

    for i in accuracy:
        if i[0][0] in clusterName or i[0][1] in genre:
            continue
        clusterName = clusterName + [i[0][0]]
        genre = genre + [i[0][1]]

    clusterZip = zip(cluster_name, genre_name)
    clusterMapped = set(clusterZip)
    
    aCount.aggregateByKey
    
    KMeanPredictions = aCount.map(lambda x: (x[0][0], (recurringCheck(x[0][1],clusterMapped), x[1])))
    result = KMeanPredictions.aggregateByKey([], sequencer, combiner, 1).map(lambda x: (x[0], dict(x[1])))
    return result

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
train_file = 's3://comp5349-auda0496/assignment/train.tsv'

trainData = sc.textFile(trainFile)
trainHeader = trainData.first() #extract header
trainData = trainData.filter(lambda row : row != trainHeader)   #filter out header
trainSentence = trainData.flatMap(extractSentenceWithGenre)

cleanText = trainSentence.map(lambda row: str(row[1])).filter(lambda data: data is not None).cache()
reviewEmbedding = cleanText.mapPartitions(review_embed).cache()
trainDataUSE = reviewEmbedding.map(lambda x: [float(a) for a in x.tolist()])

trainData.unpersist()
cleanText.unpersist()
reviewEmbedding.unpersist()

accuracyKmeansUSE = kMeanCluster(trainDataUSE, trainSentence)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
trainDataUSE.unpersist()
#Kmeans_USE_prediction.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PythonRDD[107] at RDD at PythonRDD.scala:53

In [10]:
for (id, rdd) in sc._jsc.getPersistentRDDs().items(): rdd.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
train_file = 's3://comp5349-auda0496/assignment/train.tsv'

trainData = sc.textFile(trainFile)
trainHeader = trainData.first() #extract header
trainData = trainData.filter(lambda row : row != trainHeader)   #filter out header
trainSentence = trainData.flatMap(extractSentenceWithGenre)

cleanText = trainSentence.map(lambda row: str(row[1])).filter(lambda data: data is not None).cache()

hashingTF = HashingTF()
tf = hashingTF.transform(cleanText)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

accuracyKmeansTFIDF = kMeanCluster(tfidf, trainSentence)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
accuracyKmeansUSE.take(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[(1, {1: 134997, 3: 58651, 2: 12423, 4: 96431, 0: 10820}), (3, {1: 1122, 2: 47149, 0: 3122, 4: 9401, 3: 15626}), (0, {0: 126663, 3: 13756, 1: 11015, 2: 2599, 4: 8639}), (2, {0: 3567, 2: 78221, 3: 51152, 1: 3997, 4: 12449}), (4, {2: 14308, 1: 3565, 4: 39776, 0: 10528, 3: 15427})]

In [25]:
accuracyKmeansTFIDF.take(100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[(1, {3: 90936, 4: 104378, None: 123934, 1: 135117, 0: 86914}), (2, {1: 9, 4: 12, 0: 163}), (0, {None: 42091, 1: 15568, 3: 45047, 0: 51843, 4: 39725}), (4, {None: 54, 4: 136, 3: 1670, 1: 3305, 0: 44}), (3, {4: 10361, 0: 15736, 1: 697, 3: 17047, None: 617})]

In [20]:
kmeansDf = spark.createDataFrame(accuracyKmeansUSE).toDF("predicted_value", "count")

kmeansMatrix = kmeansDf.withColumn("0", kmeansDf["count"][0])\
.withColumn("1", kmeansDf["count"][1])\
.withColumn("2", kmeansDf["count"][2])\
.withColumn("3", kmeansDf["count"][3])\
.withColumn("4", kmeansDf["count"][4])\
.drop("count").sort("predicted_value")



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
kmeansMatrix.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+------+------+-----+-----+-----+
|predicted_value|     0|     1|    2|    3|    4|
+---------------+------+------+-----+-----+-----+
|              0|126663| 11015| 2599|13756| 8639|
|              1| 10820|134997|12423|58651|96431|
|              2|  3567|  3997|78221|51152|12449|
|              3|  3122|  1122|47149|15626| 9401|
|              4| 10528|  3565|14308|15427|39776|
+---------------+------+------+-----+-----+-----+

In [22]:
kmeansAddMatrix = kmeansMatrix\
.withColumn("total", kmeansMatrix["0"]+kmeansMatrix["1"]+kmeansMatrix["2"]+kmeansMatrix["3"]+kmeansMatrix["4"])\

kmeansAddMatrix\
.withColumn("0_norm", (kmeansAddMatrix["0"]*100/kmeansAddMatrix["total"]))\
.withColumn("1_norm", (kmeansAddMatrix["1"]*100/kmeansAddMatrix["total"]))\
.withColumn("2_norm", (kmeansAddMatrix["2"]*100/kmeansAddMatrix["total"]))\
.withColumn("3_norm", (kmeansAddMatrix["3"]*100/kmeansAddMatrix["total"]))\
.withColumn("4_norm", (kmeansAddMatrix["4"]*100/kmeansAddMatrix["total"]))\
.drop("0","1","2","3","4","total").sort("predicted_value").show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+------------------+------------------+------------------+------------------+------------------+
|predicted_value|            0_norm|            1_norm|            2_norm|            3_norm|            4_norm|
+---------------+------------------+------------------+------------------+------------------+------------------+
|              0| 77.86404544113307| 6.771294383790695|1.5976935182453034| 8.456280121963214|  5.31068653486771|
|              1|3.4533163965505134| 43.08570735537243|3.9649306464276366| 18.71908132847358| 30.77696427317584|
|              2| 2.387773954721326| 2.675618866560454| 52.36166709062429| 34.24149518696531|  8.33344490112862|
|              3| 4.085317979586495|1.4682020413504318| 61.69719968594609|20.447526825438366|12.301753467678617|
|              4|12.592698913927563| 4.264150040667911| 17.11401368355581| 18.45246638916798| 47.57667097268073|
+---------------+------------------+------------------+------------------+------------------+---

In [27]:
kmeansDf = spark.createDataFrame(accuracyKmeansTFIDF).toDF("predicted_value", "count")

kmeansMatrix = kmeansDf.withColumn("0", kmeansDf["label_count"][0])\
.withColumn("1", kmeansDf["count"][1])\
.withColumn("2", kmeansDf["count"][2])\
.withColumn("3", kmeansDf["count"][3])\
.withColumn("4", kmeansDf["count"][4])\
.drop("count").sort("predicted_value")

kmeansMatrix.show()

kmeansAddMatrix = kmeansMatrix\
.withColumn("total", kmeansMatrix["0"]+kmeansMatrix["1"]+kmeansMatrix["2"]+kmeansMatrix["3"]+kmeansMatrix["4"])\

kmeansAddMatrix\
.withColumn("0_norm", (kmeansAddMatrix["0"]*100/kmeansAddMatrix["total"]))\
.withColumn("1_norm", (kmeansAddMatrix["1"]*100/kmeansAddMatrix["total"]))\
.withColumn("2_norm", (kmeansAddMatrix["2"]*100/kmeansAddMatrix["total"]))\
.withColumn("3_norm", (kmeansAddMatrix["3"]*100/kmeansAddMatrix["total"]))\
.withColumn("4_norm", (kmeansAddMatrix["4"]*100/kmeansAddMatrix["total"]))\
.drop("0","1","2","3","4","total").sort("predicted_value").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+------+------+------+------+------+
|predicted_value|     0|     1|     2|     3|     4|
+---------------+------+------+------+------+------+
|              0|138992|136785|104405|112086|111977|
|              1|   987| 22654|   475|   837|   470|
|              2|  5585|  3733| 16063| 13056| 14111|
|              3|  6909|  3291| 13975| 13842|  6968|
|              4|  2223|   233| 19782| 14791| 21174|
+---------------+------+------+------+------+------+

+---------------+------------------+-------------------+------------------+------------------+------------------+
|predicted_value|            0_norm|             1_norm|            2_norm|            3_norm|            4_norm|
+---------------+------------------+-------------------+------------------+------------------+------------------+
|              0| 23.00259000901952|  22.63734081374277|17.278587327987818|18.549760444852666|18.531721404397224|
|              1|3.8823112929237307|  89.10828777091609|1.86838689