In [1]:
# Initializing Spark
# Executes the local bootstrap script inside this notebook's namespace.
# NOTE(review): later cells use `sc` without defining it, so presumably this
# script is what creates the SparkContext -- confirm against setupPySpark.py.
# The source is compiled with its real filename so that tracebacks raised
# inside the script point at setupPySpark.py instead of "<string>".
with open("setupPySpark.py", "r") as setup_file:
    exec(compile(setup_file.read(), "setupPySpark.py", "exec"))


In [2]:
# Spark session
# Wraps the already-running SparkContext `sc` in a SparkSession.
# NOTE(review): `sc` is not assigned anywhere in this notebook; presumably it
# comes from the setup script executed in the first cell.
from pyspark.sql.session import SparkSession

spark = SparkSession(sparkContext=sc)

In [3]:
# SQL context
# Built on the same SparkContext `sc` as the session above.
# NOTE(review): `sqlContext` is not referenced by any cell visible in this
# notebook; verify whether an imported helper still needs it.
from pyspark.sql import SQLContext

sqlContext = SQLContext(sparkContext=sc)

In [4]:
# Importing required functions
from util import read_file, read_folder, get_character_similarity
from pandas import Series, DataFrame
from util_spark import remove_stopwords_spark, detect_language_spark, flatten_list_of_tokens, spell_correct_tokens_spark, get_semantic_similarity_spark
from tokenization_spark import tokenize_sentence_nltk_spark
from pyspark.sql.functions import col
from pos_tagging_spark import run_treetagger_pos_tag_spark
from modeling_spark import run_word2vec_model_pyspark
from json import load

In [5]:
# Reading input file(s) using python's default libraries
# "in_file.cfg" is a JSON document that names the input path, how to parse it
# (in_type), whether it is a single file or a folder (file_folder), and which
# columns hold the text (column) and the labels (label).
with open("in_file.cfg") as config_file:
    # Context manager so the config handle is closed right after parsing.
    config = load(config_file)
patterns_file = config["patterns_file"]
file_folder = config["file_folder"]
label = config["label"]
column = config["column"]
in_type = config["in_type"]
in_file = config["in_file"]
if file_folder == "file":
    strings = read_file(in_file, in_type = in_type)
    if in_type == "text":
        # NOTE(review): tokenize_sentence_nltk is not imported in the visible
        # cells (only tokenize_sentence_nltk_spark is); unless the setup script
        # defines it, this branch raises NameError -- confirm the intended helper.
        strings = tokenize_sentence_nltk(strings)
        strings = DataFrame(strings)[0]
    elif in_type == "html_chat":
        # For html_chat, read_file's result is indexed positionally:
        # [0] the table of conversations, [1] meta data, [2] timestamp.
        timestamp = strings[2]
        meta_data = strings[1]
        strings = strings[0]
        # The meta data's "Comment" entry is used as the label column.
        strings[label] = meta_data["Comment"]
        labels = strings[label]
        strings = strings[column]
    else:
        # `labels` is only set when the configured label column is present.
        if label in strings.columns:
            labels = strings[label]
        strings = strings[column]
else:
    strings = read_folder(in_file, in_type = in_type)
    # One regex per non-empty line of the patterns file, wrapped in ".*" on
    # both sides.  The line terminator is stripped so it does not end up inside
    # the regex, and blank lines are skipped because they would otherwise turn
    # into the match-everything pattern ".*.*".
    with open(patterns_file, 'r') as patterns_handle:
        pattern_lines = [line.rstrip("\r\n") for line in patterns_handle]
    patterns = Series([".*" + x + ".*" for x in pattern_lines if x])

In [6]:
# Appending conversation messages together and creating the Spark data frame
# Each entry of the 'conversation' column is collapsed into one string by
# joining its "Message" items with ". ".
try:
    strings['conversation'] = strings['conversation'].apply(lambda x: ". ".join(x["Message"]))
except Exception:
    # Best effort: inputs without a 'conversation' column of message
    # collections are passed through unchanged.  Catching Exception (rather
    # than a bare except) keeps KeyboardInterrupt/SystemExit from being
    # silently swallowed.
    pass
sentenceDataFrame = spark.createDataFrame(strings)

In [7]:
# Split every conversation into its list of sentences
sentenceDataFrame = tokenize_sentence_nltk_spark(
    df=sentenceDataFrame,
    in_col="conversation",
)

In [8]:
# Language identification and filtering
# The detected language is written to the "language" column; only rows
# whose value is "en" are kept.
sentenceDataFrame = detect_language_spark(
    df=sentenceDataFrame,
    in_col="conversation",
    out_col="language",
)
sentenceDataFrame = sentenceDataFrame.filter(col("language") == "en")

In [9]:
# POS tagging and lemmatization using TreeTagger
# Reads the "conversation" column and writes the tagger output to "pos".
sentenceDataFrame = run_treetagger_pos_tag_spark(
    df=sentenceDataFrame,
    in_col="conversation",
    out_col="pos",
    get_lemma=True,
)

In [10]:
# Merge two consecutive words when (a) both are misspelled on their own and
# (b) the merged word is spelled correctly
sentenceDataFrame = spell_correct_tokens_spark(
    df=sentenceDataFrame,
    in_col="pos",
)

In [11]:
# Flatten the token lists in "pos", then train word2vec on them
sentenceDataFrame = flatten_list_of_tokens(sentenceDataFrame, in_col="pos")
# Returns the fitted model together with the data frame; the vectors are
# written to the "result" column (vector size 100).
model, sentenceDataFrame = run_word2vec_model_pyspark(
    sentenceDataFrame,
    in_col="pos",
    vec_size=100,
    in_type="tokens",
    out_col="result",
)

In [12]:
# Collecting document vectors in a list
# collect() pulls every row of the "result" column to the driver; the list is
# built in a single pass so the accumulated list is not re-copied per row.
doc_vecs = [row['result'] for row in sentenceDataFrame.select('result').collect()]

In [13]:
# Semantic similarity derived from the trained word2vec model.
# NOTE(review): the preview below suggests a square word-by-word table with
# 1.0 on the diagonal -- confirm against get_semantic_similarity_spark.
sim1 = get_semantic_similarity_spark(model)

In [14]:
# Preview the first rows of the semantic similarity table
sim1.head()

Unnamed: 0,lover,rate,assert,termination,irs,california,e-meetings,scenario,nbsp,gardener,...,capacity,physical,brilliant,never,those,administrative,d,nickname,cash,only
lover,1.0,-0.029827,0.346149,0.097563,0.116361,-0.334425,0.290318,-0.33366,-0.088317,-0.263155,...,-0.366859,-0.018899,-0.409934,-0.338293,0.235658,0.303156,0.327642,-0.019418,-0.397134,-0.043701
rate,-0.029827,1.0,0.577931,0.594535,-0.389364,0.634342,0.620144,0.358991,-0.205235,0.361067,...,0.485562,0.542137,0.237939,0.545577,0.113771,0.350214,0.219478,0.638139,0.298379,0.782942
assert,0.346149,0.577931,1.0,0.611674,-0.146607,0.318166,0.699524,0.192764,-0.448236,0.161652,...,0.239511,0.553953,-0.026611,0.258411,0.443083,0.518845,0.543767,0.468013,0.075075,0.615667
termination,0.097563,0.594535,0.611674,1.0,-0.421173,0.31814,0.517693,0.148874,-0.601911,-0.0382,...,0.289914,0.383086,-0.010234,0.320089,0.070759,0.150894,0.198482,0.574032,0.178088,0.6713
irs,0.116361,-0.389364,-0.146607,-0.421173,1.0,-0.395529,-0.234021,0.038187,-0.222186,0.151884,...,-0.13445,-0.177418,0.041391,-0.249011,0.587008,0.26064,0.560561,-0.522951,-0.229981,-0.611751


In [15]:
# Character-level similarity between the labels in sim1.columns, using the
# 'ratio' metric.
# NOTE(review): the metric names look like fuzzywuzzy scorers -- confirm in util.
ratio = get_character_similarity(
    sim1.columns,
    ratio_type='ratio',
)

In [16]:
# Character-level similarity between the labels in sim1.columns, using the
# 'partial_ratio' metric.
partial_ratio = get_character_similarity(
    sim1.columns,
    ratio_type='partial_ratio',
)

In [17]:
# Character-level similarity between the labels in sim1.columns, using the
# 'token_sort_ratio' metric.
token_sort_ratio = get_character_similarity(
    sim1.columns,
    ratio_type='token_sort_ratio',
)

In [18]:
# Character-level similarity between the labels in sim1.columns, using the
# 'token_set_ratio' metric.
token_set_ratio = get_character_similarity(
    sim1.columns,
    ratio_type='token_set_ratio',
)

In [19]:
# Put the semantic table in the same label order as the character-level
# tables: first pick the columns, then the rows, both by ratio.columns, so the
# element-wise arithmetic in the next cell lines up.
sim1 = sim1[ratio.columns].loc[ratio.columns]

In [20]:
# Blend semantic and character-level similarity, then turn it into a distance.
# Share given to the word2vec (semantic) similarity; the character-level blend
# receives the remaining (1 - semantic_weight).
semantic_weight = 0.5
# Shares of the individual character-level metrics inside that blend.
ratio_weight = 0.4
partial_ratio_weight = 0.4
token_sort_ratio_weight = 0.1
# token_set_ratio takes whatever is left so the four shares add up to 1.
token_set_ratio_weight = 1 - ratio_weight - partial_ratio_weight - token_sort_ratio_weight
character_similarity = (
    ratio_weight * ratio
    + partial_ratio_weight * partial_ratio
    + token_sort_ratio_weight * token_sort_ratio
    + token_set_ratio_weight * token_set_ratio
)
# 1 - similarity: larger values mean less similar.
sim = 1 - (semantic_weight * sim1 + character_similarity * (1 - semantic_weight))