In [105]:
import csv
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType
from pyspark.sql.functions import *

In [128]:
match_paths = ['MNLI/dev_matched.tsv', 'MNLI/test_matched.tsv']
mismatch_paths = ['MNLI/dev_mismatched.tsv', 'MNLI/test_mismatched.tsv']
training_path = 'MNLI/train.tsv'
stop_words_path = 'MNLI/stopwords.txt'
remote_path = 's3://qsftw-bucket/COMP5349/dev_matched.tsv'

In [107]:
spark = SparkSession \
    .builder \
    .appName("training explore") \
    .getOrCreate()

match = spark.read.csv(match_paths,header=True,sep='\t')
mismatch = spark.read.csv(mismatch_paths,header=True,sep='\t')
match.printSchema()

root
 |-- index: string (nullable = true)
 |-- promptID: string (nullable = true)
 |-- pairID: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- sentence1_binary_parse: string (nullable = true)
 |-- sentence2_binary_parse: string (nullable = true)
 |-- sentence1_parse: string (nullable = true)
 |-- sentence2_parse: string (nullable = true)
 |-- sentence1: string (nullable = true)
 |-- sentence2: string (nullable = true)
 |-- label1: string (nullable = true)
 |-- label2: string (nullable = true)
 |-- label3: string (nullable = true)
 |-- label4: string (nullable = true)
 |-- label5: string (nullable = true)
 |-- gold_label: string (nullable = true)



In [108]:
match_sentences = match.select("sentence1", "sentence2").rdd.map(list)

mismatch_sentences = mismatch.select("sentence1", "sentence2").rdd.map(list)

In [109]:
def concatSentence(line):
    '''
    Concatenate sentence1 and sentence2 and get the words in the 
    two sentences and return as a list of words. Basic preprocessing
    including remove empty string and convert to lowercase is done.
    '''
    line1 = line[0] if line[0] != None else ""
    line2 = line[1] if line[1] != None else ""
    concat = line1 + " " + line2
    wordList = concat.lower().split(' ')
    processedWordList = []
    for word in wordList:
        if word != '':
            processedWordList.append(word.strip())   
    return processedWordList

match_words = match_sentences \
                .flatMap(concatSentence) \
                .distinct().map(lambda word:(word,1)) \
                .cache()

mismatch_words = mismatch_sentences \
                    .flatMap(concatSentence) \
                    .distinct() \
                    .map(lambda word:(word,1)) \
                    .cache()

In [110]:
common_words_count = match_words.join(mismatch_words).count()
unique_matches_count = match_words.subtractByKey(mismatch_words).count()
unique_mismatches_count = mismatch_words.subtractByKey(match_words).count()

print("Number of Common Words:\t" + str(common_words_count) + '\n' + \
     "Number of Unique Matched Words:\t" + str(unique_matches_count) + '\n' + \
     "Number of Unique Mismatched Words:\t" + str(unique_mismatches_count) + '\n')

Number of Common Words:	13000
Number of Unique Matched Words:	18592
Number of Unique Mismatched Words:	15826



In [111]:
train = spark.read.csv(training_path,header=True,sep='\t')
train_sentences = train.select("genre", "sentence1", "sentence2").rdd.map(list)

In [135]:
def makeGenreWordPair(line):
    wordList = concatSentence([line[1],line[2]])
    return [(word, line[0]) for word in wordList]

word_genre_pair = train_sentences.flatMap(makeGenreWordPair)

words_genreSet_pair = word_genre_pair \
                        .groupByKey() \
                        .mapValues(set) \
                        .cache()

num_genre_num_words_pair = words_genreSet_pair \
                            .map(lambda line: (len(line[1]), line[0])) \
                            .groupByKey() \
                            .mapValues(len) \
                            .collect()

In [136]:
def showPercentages(result):
    total_words = 0
    for pair in result:
        total_words += pair[1]
    for pair in sorted(num_genre_num_words_pair):
        print("Percentage of words in " + str(pair[0]) + 'genre(s): \t' + str(pair[1]/total_words))
showPercentages(num_genre_num_words_pair)

Percentage of words in 1genre(s): 	0.6880971408805181
Percentage of words in 2genre(s): 	0.1308849314741701
Percentage of words in 3genre(s): 	0.07043429512029295
Percentage of words in 4genre(s): 	0.049942149647530326
Percentage of words in 5genre(s): 	0.06064148287748846


In [137]:
stopwords = []
with open(stop_words_path) as f:
    stopwords = f.read().splitlines()
spark.sparkContext.broadcast(stopwords)

words_genreSet_pair_stopwords = words_genreSet_pair.filter(lambda line: line[0] not in stopwords)
num_genre_num_words_pair_stopwords = words_genreSet_pair_stopwords \
                                        .map(lambda line: (len(line[1]), line[0])) \
                                        .groupByKey() \
                                        .mapValues(len) \
                                        .collect()

showPercentages(num_genre_num_words_pair_stopwords)

Percentage of words in 1genre(s): 	0.6890217319900388
Percentage of words in 2genre(s): 	0.13106080060197434
Percentage of words in 3genre(s): 	0.07052893716892905
Percentage of words in 4genre(s): 	0.050009256549755444
Percentage of words in 5genre(s): 	0.060722966395738404


[(4, 8374), (5, 10168), (1, 115376), (2, 21946), (3, 11810)]
[(4, 8360), (5, 9995), (1, 115362), (2, 21936), (3, 11796)]
