In [1]:
import time
import json
import re
import string
import numpy as np
import pandas as pd
import nltk

from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import confusion_matrix
from sklearn import metrics
from sklearn.metrics import roc_curve, auc
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk import word_tokenize  

from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark import SQLContext
from pyspark.mllib.feature import Normalizer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel,LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from vaderSentiment import vaderSentiment
from pyspark.ml.feature import NGram

In [2]:
# Session entry point (DataFrame API), connected to the master at 192.168.2.147
a = 8
spark_session = (
    SparkSession.builder
    .appName("test4")
    .master("spark://192.168.2.147:7077")
    .config("spark.cores.max", a)
    .config("spark.executor.memory", "6G")
    .config("spark.executor.cores", 4)
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", False)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Older entry points: the SparkContext (RDD API) and a SQLContext built on it
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")
sql_context = SQLContext(spark_context)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/22 21:40:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
# Read the test set (JSON) and the training set (CSV) from HDFS into DataFrames.
# Only the JSON read is covered by the printed timing.
test_json_path = "hdfs://192.168.2.147:9000/test1/RC_2012-04"
train_csv_path = "hdfs://192.168.2.147:9000/test1/training.1600000.processed.noemoticon.csv"

t = time.time()
t0 = time.time()
test_data = sql_context.read.json(test_json_path)
print("Time to load test data set: ", time.time()-t0)
train_data = sql_context.read.csv(train_csv_path)
# To inspect the inferred schemas: train_data.printSchema() / test_data.printSchema()

                                                                                

Time to load test data set:  83.18953895568848


## Data Preprocessing

In [9]:
# Remove hyperlinks, @mentions, punctuation, digits and non [a-z0-9_.] words
def remove_features(data):
    """Normalise raw text into lowercase words separated by single spaces.

    Steps, in order: lowercase the text; replace hyperlinks, @mentions,
    punctuation characters and digit runs with a space; then keep only the
    whitespace-separated words made up entirely of ``a-z``, ``0-9``, ``_``
    or ``.`` characters.

    Args:
        data: The text to clean, or ``None`` (what a Spark UDF receives for a
            null column value).

    Returns:
        The cleaned text. An empty string is returned for ``None`` input or
        when no word survives the filtering.
    """
    # A null cell would otherwise crash the UDF on .lower(); return an empty
    # string so downstream string-only steps keep working.
    if data is None:
        return ''

    # Raw strings keep the patterns identical to the previous ones while
    # avoiding invalid-escape-sequence warnings for "\w", "\." and "\d".
    url_re = re.compile(r'https?://(www.)?\w+\.\w+(/\w+)*/?')
    mention_re = re.compile(r'@(\w+)')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")

    data = data.lower()
    # Order matters: URLs and mentions must be stripped before punctuation
    # removal destroys the "://" and "@" markers they are recognised by.
    for pattern in (url_re, mention_re, punc_re, num_re):
        data = pattern.sub(' ', data)

    # split() drops repeated whitespace, so joining the surviving words
    # yields exactly one space between them.
    return ' '.join(word for word in data.split() if alpha_num_re.match(word))

# Remove stop words
# Inline stop-word list (used instead of nltk's stopwords corpus). It is built
# once as a frozenset so every membership test is a constant-time lookup and
# the list is not rebuilt on each call.
_STOP_WORDS = frozenset([
    'rt', 're', 'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your',
    'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers',
    'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what',
    'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were',
    'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a',
    'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by',
    'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after',
    'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under',
    'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all',
    'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not',
    'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don',
    'should', 'now',
])


def remove_stops(data):
    """Drop stop words from a whitespace-separated string.

    The comparison is case-sensitive and the stop words are lowercase, so the
    input is expected to be lowercased already.

    Args:
        data: The text to filter, or ``None`` (what a Spark UDF receives for a
            null column value).

    Returns:
        The remaining words joined by single spaces. An empty string is
        returned for ``None`` input or when every word is a stop word.
    """
    # Guard against null cells, which would otherwise fail on .split().
    if data is None:
        return ''
    return ' '.join(word for word in data.split() if word not in _STOP_WORDS)

# Expose the two cleaning functions to Spark as string-returning UDFs
remove_stops_udf = udf(remove_stops, StringType())
remove_features_udf = udf(remove_features, StringType())

# Preprocess train data: clean column _c5, then cast column _c0 to a double label
t1 = time.time()
new_train_data = (
    train_data
    .withColumn('remove_features_text', remove_features_udf('_c5'))
    .withColumn('remove_stops_text', remove_stops_udf('remove_features_text'))
)
new_train_data = new_train_data.withColumn('label', new_train_data['_c0'].cast(DoubleType()))

# Preprocess test data: drop '[deleted]' bodies, clean the rest, keep two columns
new_test_data = (
    test_data
    .filter(test_data.body != '[deleted]')
    .withColumn('remove_features_text', remove_features_udf('body'))
    .withColumn('remove_stops_text', remove_stops_udf('remove_features_text'))
    .select('body', 'remove_stops_text')
)
print('time for data preprocess:', time.time()-t1)

# Preview the first rows of each prepared DataFrame
new_train_data.select('_c5', 'remove_stops_text', 'label').show(5)
new_test_data.show(5)

time for data preprocess: 0.19132685661315918
+--------------------+--------------------+-----+
|                 _c5|   remove_stops_text|label|
+--------------------+--------------------+-----+
|@switchfoot http:...|awww bummer shoul...|  0.0|
|is upset that he ...|upset update face...|  0.0|
|@Kenichan I dived...|dived many times ...|  0.0|
|my whole body fee...|whole body feels ...|  0.0|
|@nationwideclass ...|  behaving m mad see|  0.0|
+--------------------+--------------------+-----+
only showing top 5 rows

+--------------------+--------------------+
|                body|   remove_stops_text|
+--------------------+--------------------+
|I think an equall...|think equally rea...|
|[Hi.](http://meme...|hi yo dawg xzibit...|
|yeah,im going to ...|yeah im going kee...|
|[Here you go] (ht...|                  go|
|[Is this better?]...| better com dm m png|
+--------------------+--------------------+
only showing top 5 rows



## Logistic Regression Model

In [10]:
# Feature stages: split the cleaned text into words, hash them into
# term-frequency vectors, then rescale those vectors with IDF
tokenizer = Tokenizer(inputCol='remove_stops_text', outputCol='words')
wordsDataFrame = tokenizer.transform(new_train_data)
hashingTF = HashingTF(inputCol='words', outputCol='hashing')
idf = IDF(inputCol='hashing', outputCol='features')

# Fit the whole pipeline (features + logistic regression) on the training data
t2 = time.time()
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline_stages = [tokenizer, hashingTF, idf, lr]
pipeline = Pipeline(stages=pipeline_stages)
model = pipeline.fit(new_train_data)
training_seconds = time.time()-t2
print('time for model training:', training_seconds)

# Apply the fitted pipeline to the test data to predict sentiment
t3 = time.time()
prediction = model.transform(new_test_data)
testing_seconds = time.time()-t3
print('time for testing:', testing_seconds)
prediction.select('body', 'prediction').show(10)

                                                                                

time for model training: 122.59346675872803
time for testing: 0.17111730575561523
+--------------------+----------+
|                body|prediction|
+--------------------+----------+
|I think an equall...|       4.0|
|[Hi.](http://meme...|       4.0|
|yeah,im going to ...|       4.0|
|[Here you go] (ht...|       4.0|
|[Is this better?]...|       4.0|
|Thanks! I tweaked...|       4.0|
|Noooooooooooooooo...|       4.0|
|Once in a while I...|       0.0|
|Mudkipz. I liek t...|       4.0|
|You know the Arby...|       0.0|
+--------------------+----------+
only showing top 10 rows



In [11]:
# Keep only the original text and its predicted label for the JSON export,
# then check the resulting schema and row count
output_columns = ['body', 'prediction']
output_df = prediction.select(output_columns)
output_df.printSchema()
output_df.count()

root
 |-- body: string (nullable = true)
 |-- prediction: double (nullable = false)



                                                                                

17559861

In [None]:
# Collapse the result into a single partition, then write it out as JSON
# under the "output3/test1204" path
single_partition_df = output_df.repartition(1)
single_partition_df.write.json("output3/test1204")



In [None]:
# Stop the SparkContext; Spark jobs can no longer be submitted from this
# notebook once this has run.
spark_context.stop()

[Stage 0:>                                                         (0 + 0) / 13]

**Import the exported JSON result files into the MongoDB `reddit` database (one collection per file), from a shell:**

mongoimport --db=reddit --collection=result200512 --file=test0512.json<br>
mongoimport --db=reddit --collection=result201001 --file=test1001.json<br>
mongoimport --db=reddit --collection=result201011 --file=test1011.json<br>
mongoimport --db=reddit --collection=result201204 --file=test1204.json