In [4]:
from pyspark.context import  SparkContext
from pyspark.sql.functions import col, lower
from pyspark.sql import SQLContext
import re
sc = SparkContext('local','test1')
sql = SQLContext(sc)

In [227]:
from pyspark.sql.functions import lit
dems_df =  sql.read.text("dems.txt")
gop_df = sql.read.text("gop.txt")

In [228]:
corpus_df = dems_df.select("value", lit(1).alias("label")).union(gop_df.select("value", lit(0).alias("label")))

In [229]:
corpus_df.select("*").limit(20).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|This week @senate...|    1|
|Health care profe...|    1|
|RT @SeemaNanda: G...|    1|
|Republicans keep ...|    1|
|RT @SpeakerPelosi...|    1|
|While the preside...|    1|
|You are not alone...|    1|
|RT @DNCWarRoom: W...|    1|
|RT @DNCWarRoom: T...|    1|
|RT @DNCWarRoom: T...|    1|
|LISTEN. TO. HEALT...|    1|
|RT @SeemaNanda: B...|    1|
|This is a HUGE wi...|    1|
|RT @SenSherrodBro...|    1|
|RT @WisDems: Make...|    1|
|RT @DemConvention...|    1|
|Abortion is healt...|    1|
|RT @RepLucyMcBath...|    1|
|Get counted. Get ...|    1|
+--------------------+-----+



In [230]:
from pyspark.sql.functions import udf,lower,col,trim
from pyspark.sql.types import FloatType,StringType,IntegerType
def clean_text(text):
    text=re.sub(r'@[A-Za-z0-9]+','',text).strip() #remove mentions
    text=re.sub(r'#','',text).strip() #removing #symbol
    text=re.sub(r'RT[\s]+','',text).strip()
    text=re.sub(r'[?|$|.|!|;|:|&|"|,|"|"|*|-|(|)]','',text).strip()
    text=re.sub(r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)+',"",text).strip()
    return text.strip()

In [231]:
def remove_emoji(string):
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           u"\U00002702-\U000027B0"
                           u"\U000024C2-\U0001F251"
                           "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', string).strip()

In [232]:
#value
clean_udf_str=udf(lambda z: clean_text(z), StringType())
corpus_df=corpus_df.select("label",clean_udf_str("value").alias("value"))
emoji_udf_str=udf(lambda z: remove_emoji(z), StringType())
corpus_df=corpus_df.select("label",emoji_udf_str('value').alias('value'))
corpus_df=corpus_df.select(trim(lower(col('value'))).alias("value"),"label")

In [233]:
#After Preprocessing
corpus_df.select("*").limit(20).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|this week  said w...|    1|
|health care profe...|    1|
|good to see  sign...|    1|
|republicans keep ...|    1|
|the congress has ...|    1|
|while the preside...|    1|
|you are not alone...|    1|
|well this is conc...|    1|
|trump “in the end...|    1|
|trump proposed hu...|    1|
|listen to health ...|    1|
|breaking we  alon...|    1|
|this is a huge wi...|    1|
|update this is th...|    1|
|make sure your vo...|    1|
|in light of the u...|    1|
|abortion is healt...|    1|
|why does completi...|    1|
|get counted get c...|    1|
+--------------------+-----+



In [234]:
train_df, test_df = corpus_df.randomSplit([0.75, 0.25])

In [235]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="value", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")
cleaning_pipeline = Pipeline(stages = [tokenizer,stop_words_remover,vectorizer])
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)
cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)

In [236]:
cleaned_training_df.show()

+--------------------+-----+--------------------+--------------------+--------------------+
|               value|label|               words|       words_cleaned|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|                    |    1|                  []|                  []|   (58092,[0],[1.0])|
|'emergency' - an ...|    1|['emergency', -, ...|['emergency', -, ...|(58092,[18,23,46,...|
|'read the transcr...|    1|['read, the, tran...|['read, transcrip...|(58092,[0,2,3,180...|
|'s actions on dac...|    1|['s, actions, on,...|['s, actions, dac...|(58092,[1,204,247...|
|'s discriminatory...|    1|['s, discriminato...|['s, discriminato...|(58092,[0,4,203,2...|
|'s leadership is ...|    1|['s, leadership, ...|['s, leadership, ...|(58092,[0,41,81,1...|
|'s role in our na...|    1|['s, role, in, ou...|['s, role, nation...|(58092,[0,51,69,7...|
|'s visit to pa is...|    1|['s, visit, to, p...|['s, visit, pa, r...|(58092,[24

In [237]:
from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol = "label")

In [238]:
naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)

In [239]:
predictions_df.select("features","label","prediction").limit(20).show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|   (58092,[0],[1.0])|    1|       0.0|
|(58092,[0,12,24,1...|    1|       0.0|
|(58092,[0,119,179...|    1|       1.0|
|(58092,[18,109,12...|    1|       0.0|
|(58092,[10,11,12,...|    1|       1.0|
|(58092,[5,9,10,11...|    1|       1.0|
|(58092,[4,20,26,2...|    1|       1.0|
|(58092,[0,167,194...|    1|       1.0|
|(58092,[8,12,15,2...|    1|       1.0|
|(58092,[0,1,20,29...|    1|       0.0|
|(58092,[0,13,14,4...|    1|       1.0|
|(58092,[0,38,138,...|    1|       1.0|
|(58092,[4,6,9,22,...|    1|       1.0|
|(58092,[0,2,4,31,...|    1|       1.0|
|(58092,[5,7,20,24...|    1|       1.0|
|(58092,[0,38,55,9...|    1|       1.0|
|(58092,[7,25,44,5...|    1|       1.0|
|(58092,[10,11,43,...|    1|       1.0|
|(58092,[0,23,82,1...|    1|       1.0|
|(58092,[4,38,112,...|    1|       1.0|
+--------------------+-----+----------+



In [240]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
eval = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction', metricName = 'accuracy')
eval.evaluate(predictions_df)

0.8804404824331411

In [241]:
cleaned_training_df.select("features").show(4)

+--------------------+
|            features|
+--------------------+
|   (58092,[0],[1.0])|
|(58092,[18,23,46,...|
|(58092,[0,2,3,180...|
|(58092,[1,204,247...|
+--------------------+
only showing top 4 rows

