In [1]:
import pyspark 
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

In [3]:
MAX_MEMORY = "5g"

spark = SparkSession.builder \
                    .appName('multi_class_text_classifiter')\
                    .config("spark.executor.memory", MAX_MEMORY) \
                    .config("spark.driver.memory", MAX_MEMORY) \
                    .getOrCreate()

In [4]:
df = spark.read.csv('./train.csv',inferSchema=True,header=True)

In [17]:
df.show()

+--------------------+--------------------+------+
|                 qid|       question_text|target|
+--------------------+--------------------+------+
|00002165364db923c7e6|How did Quebec na...|     0|
|000032939017120e6e44|Do you have an ad...|     0|
|0000412ca6e4628ce2cf|Why does velocity...|     0|
|000042bf85aa498cd78e|How did Otto von ...|     0|
|0000455dfa3e01eae3af|Can I convert mon...|     0|
|00004f9a462a357c33be|Is Gaza slowly be...|     0|
|00005059a06ee19e11ad|Why does Quora au...|     0|
|0000559f875832745e2e|Is it crazy if I ...|     0|
|00005bd3426b2d0c8305|Is there such a t...|     0|
|00006e6928c5df60eacb|Is it just me or ...|     0|
|000075f67dd595c3deb5|What can you say ...|     0|
|000076f3b42776c692de|How were the Calg...|     0|
|000089792b3fc8026741|What is the dumbe...|     0|
|000092a90bcfbfe8cd88|Can we use our ex...|     0|
|000095680e41a9a6f6e3|I am 30, living a...|     0|
|0000a89942e3143e333a|What do you know ...|     0|
|0000b8e1279eaa0a7062|How diffi

In [None]:
df.printSchema()

In [None]:
df.columns

In [None]:
print((df.count(), len(df.columns)))

In [None]:
df.describe('target').show()

In [None]:
df.filter(df.target==1).show()

In [None]:
df.filter(df.target==1).count()

In [None]:
df.orderBy(df.target).show()

In [5]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, Tokenizer

In [18]:
filtered_df = df.filter((df.target == 1) | (df.target == 0))

In [19]:
filtered_df = filtered_df.select('question_text','target')

In [22]:
filtered_df.show()

+--------------------+------+
|       question_text|target|
+--------------------+------+
|How did Quebec na...|     0|
|Do you have an ad...|     0|
|Why does velocity...|     0|
|How did Otto von ...|     0|
|Can I convert mon...|     0|
|Is Gaza slowly be...|     0|
|Why does Quora au...|     0|
|Is it crazy if I ...|     0|
|Is there such a t...|     0|
|Is it just me or ...|     0|
|What can you say ...|     0|
|How were the Calg...|     0|
|What is the dumbe...|     0|
|Can we use our ex...|     0|
|I am 30, living a...|     0|
|What do you know ...|     0|
|How difficult is ...|     0|
|Have you licked t...|     0|
|Do you think Amaz...|     0|
|How many baronies...|     0|
+--------------------+------+
only showing top 20 rows



In [23]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

stopwordsRemover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
tokenizer = Tokenizer(inputCol="question_text", outputCol="tokens")
w2v = Word2Vec(vectorSize=100, minCount=0, inputCol="filtered_tokens", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[tokenizer,stopwordsRemover,w2v])
doc2vec_model = doc2vec_pipeline.fit(filtered_df)
doc2vecs_df = doc2vec_model.transform(filtered_df)

In [24]:
doc2vecs_df.show()

+--------------------+------+--------------------+--------------------+--------------------+
|       question_text|target|              tokens|     filtered_tokens|            features|
+--------------------+------+--------------------+--------------------+--------------------+
|How did Quebec na...|     0|[how, did, quebec...|[quebec, national...|[-0.0455056603532...|
|Do you have an ad...|     0|[do, you, have, a...|[adopted, dog,, e...|[0.00572819014390...|
|Why does velocity...|     0|[why, does, veloc...|[velocity, affect...|[-0.0184132257210...|
|How did Otto von ...|     0|[how, did, otto, ...|[otto, von, gueri...|[0.05853901430964...|
|Can I convert mon...|     0|[can, i, convert,...|[convert, montra,...|[-0.0443321327074...|
|Is Gaza slowly be...|     0|[is, gaza, slowly...|[gaza, slowly, be...|[0.02233039102117...|
|Why does Quora au...|     0|[why, does, quora...|[quora, automatic...|[0.06008154375012...|
|Is it crazy if I ...|     0|[is, it, crazy, i...|[crazy, wash, wip...

In [25]:
w2v_train_df, w2v_test_df = doc2vecs_df.randomSplit([0.8, 0.2])

In [26]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

In [28]:
si = StringIndexer(inputCol="target", outputCol="label")
lr_classifier = LogisticRegression(family="binomial")

lr_classifier_pipeline = Pipeline(stages=[si,lr_classifier])
lr_predictions = lr_classifier_pipeline.fit(w2v_train_df).transform(w2v_test_df)

In [30]:
lr_model_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")

In [31]:
accuracy = lr_model_evaluator.evaluate(lr_predictions)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.598611
