In [1]:
#!pip install pip install beautifulsoup4
#!pip install nltk

In [68]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import re
from pyspark.ml import Transformer, Pipeline
from pyspark.sql.functions import udf
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.ml.feature import NGram, Tokenizer, CountVectorizer, SQLTransformer
import pandas as pd
import pprint
pp = pprint.PrettyPrinter(indent=4)
pd.set_option('display.max_colwidth', 1000)
spark = SparkSession.builder.getOrCreate()

In [81]:
schema = StructType([
   StructField("sentiment", IntegerType(), False),
   StructField("id", IntegerType(), False),
   StructField("date", StringType(), False),
   StructField("query_string", StringType(), False),
   StructField("user", StringType(), False),
   StructField("text", StringType(), False),
])
path = "gs://sentiment-twitter-analys/data/trainingandtestdata/training.1600000.processed.noemoticon.csv"
df = spark.read.format("csv").load(path, schema=schema)

In [82]:
df = df.select(['sentiment', 'text']).limit(100000)

### PreProcessing

In [6]:
pat1 = r'@[A-Za-z0-9_]+'
pat2 = r'https?://[^ ]+'
combined_pat = r'|'.join((pat1, pat2))
www_pat = r'www.[^ ]+'
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')



In [7]:

class PreprocessingTokenizer(Transformer):

    pat1 = r'@[A-Za-z0-9_]+'
    pat2 = r'https?://[^ ]+'
    combined_pat = r'|'.join((pat1, pat2))
    www_pat = r'www.[^ ]+'
    negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                    "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                    "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                    "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                    "mustn't":"must not"}
    neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')
    tok = WordPunctTokenizer()
    
    def __init__(self, inputCol=None, outputCol=None):
        super(PreprocessingTokenizer, self).__init__()
        self.inputCol=inputCol
        self.outputCol=outputCol
        
       

    def _transform(self, dataset):
        def f(text):
            soup = BeautifulSoup(text, 'lxml')
            souped = soup.get_text()
            try:
                bom_removed = souped.decode("utf-8-sig").replace(u"\ufffd", "?")
            except:
                bom_removed = souped
            stripped = re.sub(self.combined_pat, '', bom_removed)
            stripped = re.sub(self.www_pat, '', stripped)
            lower_case = stripped.lower()
            neg_handled = self.neg_pattern.sub(lambda x: self.negations_dic[x.group()], lower_case)
            letters_only = re.sub("[^a-zA-Z]", " ", neg_handled)
            words = [x for x  in self.tok.tokenize(letters_only) if len(x) > 1]
            return (" ".join(words)).strip()

        return dataset.withColumn(self.outputCol, udf(f, StringType())(dataset[self.inputCol]))

In [8]:
pt = PreprocessingTokenizer(inputCol='text', outputCol='text_clean')

### Feature engineering

In [83]:
tk = Tokenizer(inputCol='text', outputCol='words')
ng1 = NGram(n=1, inputCol='words', outputCol='1_gr_words')
ng2 = NGram(n=2, inputCol='words', outputCol='2_gr_words')
ng3 = NGram(n=3, inputCol='words', outputCol='3_gr_words')
statement = """
SELECT
    *, concat(1_gr_words, 2_gr_words, 3_gr_words) c_words
FROM
    __THIS__
"""
sql = SQLTransformer(statement=statement)
cv = CountVectorizer(inputCol='c_words', vocabSize=80000, outputCol='features')

### Building model

In [84]:
df_train, df_test = df.randomSplit([0.8, 0.2], seed=100500)

In [85]:
df_train.count(), df_test.count()

(79872, 20128)

In [86]:
lr = LogisticRegression(featuresCol='features', labelCol='sentiment', maxIter=5000)
pipeline_model = Pipeline(stages=[pt, tk, ng1, ng2, ng3, sql, cv, lr]).fit(df_train)

### Evaluating model

In [87]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [88]:
ev = MulticlassClassificationEvaluator(labelCol='sentiment',metricName="accuracy", predictionCol='prediction')

In [89]:
df_predict = pipeline_model.transform(df_test).cache()

In [90]:
ev.evaluate(df_predict)

1.0

In [92]:
df_predict.select('sentiment').where('sentiment==0').count()

0

### Store model(pipeline)