In [28]:
from pyspark.sql import SparkSession
import os

MAX_MEMORY = "5g"

spark = SparkSession.builder \
                    .appName('multi_class_text_classifiter')\
                    .config("spark.executor.memory", MAX_MEMORY) \
                    .config("spark.driver.memory", MAX_MEMORY) \
                    .getOrCreate()
spark

In [29]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("value", StringType(), True)
])
total_df = spark.createDataFrame([], schema)

for file_name in os.listdir("../data"):
    df = spark.read.option("header", "true").text('../data/' + file_name)
    total_df = total_df.union(df)

In [30]:
total_df.count()

3297

In [31]:
total_df.show()

+--------------------+
|               value|
+--------------------+
|    ### abstract ###|
|AIMX	we test in t...|
|OWNX	we manipulat...|
|OWNX	our analysis...|
|OWNX	the magnitud...|
|OWNX	we conclude ...|
|### introduction ###|
|MISC	lured by tem...|
|MISC	self-control...|
|MISC	for example,...|
|MISC	the student ...|
|MISC	and, similar...|
|MISC	perhaps less...|
|MISC	this concept...|
|MISC	that individ...|
|MISC	for example,...|
|MISC	nonetheless,...|
|MISC	that is, pro...|
|MISC	self-control...|
|MISC	a multitude ...|
+--------------------+
only showing top 20 rows



In [32]:
import re

def process_line(x):
    line = x['value']
    parts = re.split("\s+",line,1)
    sub_parts = re.split('--', parts[0])
    parts_1 = ''
    if len(sub_parts) > 1:
       parts_1 = sub_parts[1] + ' ' + parts[1]
    else:
       parts_1 = parts[1]
    return ([sub_parts[0],parts_1])

In [33]:
input_rdd = total_df.rdd \
                    .filter(lambda x : x['value'] not in ['### introduction ###','### abstract ###']) \
                    .map(lambda x : process_line(x))

input_df = input_rdd.toDF()
input_df.show()

+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|AIMX|we test in the co...|
|OWNX|we manipulated th...|
|OWNX|our analysis reve...|
|OWNX|the magnitude of ...|
|OWNX|we conclude that ...|
|MISC|lured by temptati...|
|MISC|self-control fail...|
|MISC|for example, the ...|
|MISC|the student may f...|
|MISC|and, similarly, t...|
|MISC|perhaps less intu...|
|MISC|this conceptualiz...|
|MISC|that individuals ...|
|MISC|for example, many...|
|MISC|nonetheless, one ...|
|MISC|that is, pro-soci...|
|MISC|self-control-our ...|
|MISC|a multitude of co...|
|MISC|typically, and in...|
|MISC|willpower, then, ...|
+----+--------------------+
only showing top 20 rows



In [34]:
input_df.groupBy('_1').count().show()

+----+-----+
|  _1|count|
+----+-----+
|BASE|   61|
|OWNX|  867|
|CONT|  170|
|MISC| 1825|
|AIMX|  194|
+----+-----+



In [35]:
import gensim.parsing.preprocessing as gsp
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from gensim import utils


filters = [
           gsp.strip_tags, 
           gsp.strip_punctuation,
           gsp.strip_multiple_whitespaces,
           gsp.strip_numeric,
           gsp.remove_stopwords, 
           gsp.strip_short, 
           gsp.stem_text
          ]

def clean_text(x):
    s = x[1]
    s = s.lower()
    s = utils.to_unicode(s)
    for f in filters:
        s = f(s)
    return (x[0],s)

In [36]:
input_rdd.take(1)[0][1]

u'we test in the context of a dictator game the proposition that individuals may experience a self-control conflict between the temptation to act selfishly and the better judgment to act pro-socially'

In [37]:
clean_text(input_rdd.take(1)[0])[1]

u'test context dictat game proposit individu experi self control conflict temptat act selfishli better judgment act pro social'

In [38]:
cleaned_rdd = input_rdd.map(lambda x : clean_text(x))

In [39]:
cleaned_df = cleaned_rdd.toDF()
cleaned_df.show()

+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|AIMX|test context dict...|
|OWNX|manipul likelihoo...|
|OWNX|analysi reveal po...|
|OWNX|magnitud effect e...|
|OWNX|conclud subtl cue...|
|MISC|lure temptat indi...|
|MISC|self control fail...|
|MISC|exampl dieter fac...|
|MISC|student feel conf...|
|MISC|similarli fashion...|
|MISC|intuit importantl...|
|MISC|conceptu help rec...|
|MISC|individu care sel...|
|MISC|exampl individu v...|
|MISC|nonetheless imagi...|
|MISC|pro social prefer...|
|MISC|self control capa...|
|MISC|multitud conceptu...|
|MISC|typic line classi...|
|MISC|willpow repres co...|
+----+--------------------+
only showing top 20 rows



In [41]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="_2", outputCol="tokens")
w2v = Word2Vec(vectorSize=300, minCount=0, inputCol="tokens", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[tokenizer,w2v])
doc2vec_model = doc2vec_pipeline.fit(cleaned_df)
doc2vecs_df = doc2vec_model.transform(cleaned_df)

In [49]:
doc2vecs_df.show()

+----+--------------------+--------------------+--------------------+
|  _1|                  _2|              tokens|            features|
+----+--------------------+--------------------+--------------------+
|AIMX|test context dict...|[test, context, d...|[-0.0369722979828...|
|OWNX|manipul likelihoo...|[manipul, likelih...|[-0.0605062282910...|
|OWNX|analysi reveal po...|[analysi, reveal,...|[-0.0351625842740...|
|OWNX|magnitud effect e...|[magnitud, effect...|[0.00155935803195...|
|OWNX|conclud subtl cue...|[conclud, subtl, ...|[-0.0070894029567...|
|MISC|lure temptat indi...|[lure, temptat, i...|[-0.0176912855919...|
|MISC|self control fail...|[self, control, f...|[-0.0023453514066...|
|MISC|exampl dieter fac...|[exampl, dieter, ...|[-0.0113371225265...|
|MISC|student feel conf...|[student, feel, c...|[-0.0150255131185...|
|MISC|similarli fashion...|[similarli, fashi...|[-0.0130583480304...|
|MISC|intuit importantl...|[intuit, importan...|[-0.0272456038722...|
|MISC|conceptu help 

In [51]:
w2v_train_df, w2v_test_df = doc2vecs_df.randomSplit([0.8, 0.2])

In [58]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

si = StringIndexer(inputCol="_1", outputCol="label")
rf_classifier = RandomForestClassifier(labelCol="label", featuresCol="features")

rf_classifier_pipeline = Pipeline(stages=[si,rf_classifier])
rf_predictions = rf_classifier_pipeline.fit(w2v_train_df).transform(w2v_test_df)

rf_model_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [59]:
accuracy = rf_model_evaluator.evaluate(rf_predictions)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.641141


In [56]:
from pyspark.ml.classification import LogisticRegression

lr_classifier = LogisticRegression(family="multinomial")

lr_classifier_pipeline = Pipeline(stages=[si,lr_classifier])
lr_predictions = lr_classifier_pipeline.fit(w2v_train_df).transform(w2v_test_df)

lr_model_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [57]:
accuracy = lr_model_evaluator.evaluate(lr_predictions)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.624625
