In [1]:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from operator import add
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import StopWordsRemover
from gensim.models import KeyedVectors

In [2]:
spark = SparkSession\
        .builder\
        .appName("QuoraInsincere")\
        .getOrCreate()
# conf = SparkConf().setMaster("local").setAppName("sample")
# sc = SparkContext(conf=conf)
# conf = spark.conf
# sc = SparkContext(conf)
sc = spark.sparkContext

In [3]:
corpus = spark.read.option("header","true").option("inferSchema","true").csv( "/home/akash/project/train.csv")

In [4]:
data = corpus.toDF("qid","question_text","target")

In [5]:
data.show()

+--------------------+--------------------+------+
|                 qid|       question_text|target|
+--------------------+--------------------+------+
|00002165364db923c7e6|How did Quebec na...|     0|
|000032939017120e6e44|Do you have an ad...|     0|
|0000412ca6e4628ce2cf|Why does velocity...|     0|
|000042bf85aa498cd78e|How did Otto von ...|     0|
|0000455dfa3e01eae3af|Can I convert mon...|     0|
|00004f9a462a357c33be|Is Gaza slowly be...|     0|
|00005059a06ee19e11ad|Why does Quora au...|     0|
|0000559f875832745e2e|Is it crazy if I ...|     0|
|00005bd3426b2d0c8305|Is there such a t...|     0|
|00006e6928c5df60eacb|Is it just me or ...|     0|
|000075f67dd595c3deb5|What can you say ...|     0|
|000076f3b42776c692de|How were the Calg...|     0|
|000089792b3fc8026741|What is the dumbe...|     0|
|000092a90bcfbfe8cd88|Can we use our ex...|     0|
|000095680e41a9a6f6e3|I am 30, living a...|     0|
|0000a89942e3143e333a|What do you know ...|     0|
|0000b8e1279eaa0a7062|How diffi

In [6]:
from pyspark.sql.functions import *
data = data.withColumn('question_text', ltrim(data.question_text))
data = data.withColumn('question_text', rtrim(data.question_text))
#data.show()
import re
#data=data.withColumn('question_text',commaRep(data.question_text))
data=data.withColumn('question_text', regexp_replace('question_text', ',', ''))
data.count()

1306140

In [7]:
data = data.where(data['question_text'] != "")
data.count()

1306126

In [8]:
tokenizer = Tokenizer(inputCol="question_text", outputCol="words")
tokenized = tokenizer.transform(data)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
cleanData = remover.transform(tokenized)

In [9]:
#cleanData = cleanData.limit(2000)
cleanData.show()

+--------------------+--------------------+------+--------------------+--------------------+
|                 qid|       question_text|target|               words|            filtered|
+--------------------+--------------------+------+--------------------+--------------------+
|00002165364db923c7e6|How did Quebec na...|     0|[how, did, quebec...|[quebec, national...|
|000032939017120e6e44|Do you have an ad...|     0|[do, you, have, a...|[adopted, dog, en...|
|0000412ca6e4628ce2cf|Why does velocity...|     0|[why, does, veloc...|[velocity, affect...|
|000042bf85aa498cd78e|How did Otto von ...|     0|[how, did, otto, ...|[otto, von, gueri...|
|0000455dfa3e01eae3af|Can I convert mon...|     0|[can, i, convert,...|[convert, montra,...|
|00004f9a462a357c33be|Is Gaza slowly be...|     0|[is, gaza, slowly...|[gaza, slowly, be...|
|00005059a06ee19e11ad|Why does Quora au...|     0|[why, does, quora...|[quora, automatic...|
|0000559f875832745e2e|Is it crazy if I ...|     0|[is, it, crazy, i...

In [10]:
word2Vec = Word2Vec(vectorSize=100, minCount=4, inputCol="filtered", outputCol="result")
model = word2Vec.fit(cleanData)

result = model.transform(cleanData)

In [11]:
result.show()

+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|                 qid|       question_text|target|               words|            filtered|              result|
+--------------------+--------------------+------+--------------------+--------------------+--------------------+
|00002165364db923c7e6|How did Quebec na...|     0|[how, did, quebec...|[quebec, national...|[6.35995451981822...|
|000032939017120e6e44|Do you have an ad...|     0|[do, you, have, a...|[adopted, dog, en...|[2.19369035524626...|
|0000412ca6e4628ce2cf|Why does velocity...|     0|[why, does, veloc...|[velocity, affect...|[-3.4805833794442...|
|000042bf85aa498cd78e|How did Otto von ...|     0|[how, did, otto, ...|[otto, von, gueri...|[-3.7509443548818...|
|0000455dfa3e01eae3af|Can I convert mon...|     0|[can, i, convert,...|[convert, montra,...|[-9.5511236577294...|
|00004f9a462a357c33be|Is Gaza slowly be...|     0|[is, gaza, slowly...|[gaza, slowly, be

In [12]:
from pyspark.ml.feature import IDF, CountVectorizer

cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures")

cvmodel = cv.fit(result)

featurizedData = cvmodel.transform(result)
featurizedData.show(truncate=False)

+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("filtered", "features").show()

+--------------------+--------------------+
|            filtered|            features|
+--------------------+--------------------+
|[quebec, national...|(6759,[49,1496,18...|
|[adopted, dog, en...|(6759,[2,311,762,...|
|[velocity, affect...|(6759,[155,246,47...|
|[otto, von, gueri...|(6759,[40,3036,39...|
|[convert, montra,...|(6759,[224,327,57...|
|[gaza, slowly, be...|(6759,[491,1544,1...|
|[quora, automatic...|(6759,[36,559,118...|
|[crazy, wash, wip...|(6759,[890,1170,1...|
|[thing, dressing,...|(6759,[80,113,105...|
|[ever, phase, whe...|(6759,[1,2,9,10,2...|
|    [say, feminism?]|(6759,[167,3425],...|
|[calgary, flames,...|(6759,[1614,1865,...|
|[dumbest, yet, po...|(6759,[66,262,687...|
|[use, external, h...|(6759,[11,98,115,...|
|[30, living, home...|(6759,[35,205,287...|
|[know, bram, fisc...|(6759,[12,1930,27...|
|[difficult, find,...|(6759,[4,18,19,20...|
|[licked, skin, co...|(6759,[161,2480,6...|
|[think, amazon, a...|(6759,[6,109,217,...|
|[many, baronies, ...|(6759,[16,

In [21]:
from pyspark.ml.clustering import LDA
def train_LDA(dataset):
    num_topics = 20
    max_iterations = 100
    lda = LDA(k=num_topics, maxIter=max_iterations)
    model = lda.fit(dataset.select("filtered", "features", "result"))
    return model

In [22]:
#sincere = rescaledData.where(rescaledData["target"]==0)
topicModel = train_LDA(rescaledData)
topics = topicModel.describeTopics(1)
print("The topics described by their top-weighted terms :")
topics.show()

The topics described by their top-weighted terms :
+-----+-----------+--------------------+
|topic|termIndices|         termWeights|
+-----+-----------+--------------------+
|    0|       [10]|[0.02442143334678...|
|    1|        [3]|[0.02491455100207...|
|    2|        [4]|[0.03016236964390...|
|    3|       [31]|[0.02435691264999...|
|    4|       [15]|[0.02918693572376...|
|    5|      [245]|[0.01375003573680...|
|    6|       [30]|[0.02437284779045...|
|    7|        [0]|[0.03999776056016...|
|    8|        [7]|[0.0261999422240193]|
|    9|       [36]|[0.02149752960641...|
|   10|       [16]|[0.02179336000139...|
|   11|       [46]|[0.02106224911791...|
|   12|        [9]|[0.02454414443971...|
|   13|       [80]|[0.01435135603188...|
|   14|       [14]|[0.01675849855782...|
|   15|       [84]|[0.01690374978509...|
|   16|       [18]|[0.02666852243379...|
|   17|        [2]|[0.02784112616147...|
|   18|       [11]|[0.02174986196568...|
|   19|       [92]|[0.01843938168400...|
+-----

In [None]:
# insincere = rescaledData.where(rescaledData["target"]==1)
# modelInsincere = train_LDA(insincere)
# topicInsincere = modelInsincere.describeTopics(5)
# print("The topics described by their top-weighted terms for sincere questions:")
# topicInsincere.show()

In [23]:
from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(vocabulary):
    def indices_to_terms(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(indices_to_terms, ArrayType(StringType()))

In [24]:
topics.withColumn("topics_words", indices_to_terms(cvmodel.vocabulary)("termIndices")).show(truncate=False)

+-----+-----------+----------------------+------------+
|topic|termIndices|termWeights           |topics_words|
+-----+-----------+----------------------+------------+
|0    |[10]       |[0.024421433346785724]|[way]       |
|1    |[3]        |[0.024914551002078466]|[like]      |
|2    |[4]        |[0.03016236964390237] |[good]      |
|3    |[31]       |[0.02435691264999293] |[feel]      |
|4    |[15]       |[0.02918693572376691] |[it?]       |
|5    |[245]      |[0.013750035736803659]|[normal]    |
|6    |[30]       |[0.024372847790453338]|[start]     |
|7    |[0]        |[0.03999776056016788] |[best]      |
|8    |[7]        |[0.0261999422240193]  |[make]      |
|9    |[36]       |[0.02149752960641526] |[quora]     |
|10   |[16]       |[0.021793360001394698]|[many]      |
|11   |[46]       |[0.02106224911791196] |[indian]    |
|12   |[9]        |[0.024544144439714127]|[ever]      |
|13   |[80]       |[0.014351356031880317]|[different] |
|14   |[14]       |[0.01675849855782779] |[want]

In [None]:
#topicInsincere.withColumn("topics_words", indices_to_terms(cvmodel.vocabulary)("termIndices")).show(truncate=False)

In [25]:
transformedSincere = topicModel.transform(rescaledData)

In [None]:
#transformedInsincere =  modelInsincere.transform(transformed.where(rescaledData["target"]==1))

In [26]:
transformedSincere.show(truncate = False)

+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------