__Imports__

In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from google_drive_downloader import GoogleDriveDownloader as gdd

__Download File from link given in Canvas__

This will be stored into your local...Do not add into git, file is too large to be pushed onto git master branch

So, we download locally

In [2]:
gdd.download_file_from_google_drive(file_id='0B04GJPshIjmPRnZManQwWEdTZjg',
                                    dest_path='/Users/mwoo/Downloads/trainingandtestdata.zip',
                                    unzip=True)

In [3]:
# gdd.download_file_from_google_drive(file_id='0B04GJPshIjmPRnZManQwWEdTZjg',
#                                     dest_path='/Users/swapnilbasu/Downloads/trainingandtestdata.zip',
#                                     unzip=True)

__Create spark session object (Data Processing)__

In [4]:
spark=SparkSession.builder.appName('classification_tweet').getOrCreate()

__Load in data__

In [5]:
training_data = spark.read.csv("/Users/mwoo/Downloads/training.1600000.processed.noemoticon.csv",header=False)

__Renaming columns__

In [6]:
training_data.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5']

In [7]:
training_data = training_data.toDF("target",'id','date','query','user_name','text')

In [8]:
training_data.columns

['target', 'id', 'date', 'query', 'user_name', 'text']

__Exploratory__

In [9]:
training_data.describe()

DataFrame[summary: string, target: string, id: string, date: string, query: string, user_name: string, text: string]

__Selecting the target value and text__

In [10]:
df = training_data.select('text','target')

In [11]:
df.show(5)

+--------------------+------+
|                text|target|
+--------------------+------+
|@switchfoot http:...|     0|
|is upset that he ...|     0|
|@Kenichan I dived...|     0|
|my whole body fee...|     0|
|@nationwideclass ...|     0|
+--------------------+------+
only showing top 5 rows



In [12]:
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- target: string (nullable = true)



We can see below that its an even split between positive and negative tweets

0: negative
4: positive

In [13]:
from pyspark.sql.functions import col
df.groupBy("target").count().orderBy(col("count").desc()).show()

+------+------+
|target| count|
+------+------+
|     0|800000|
|     4|800000|
+------+------+



__Model Pipeline__

In [14]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

__Regular Expression Tokenizer__

In [15]:
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

__Stop Words Download from NLTK__

In [16]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /Users/mwoo/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

__Stop Words Remover__

In [17]:
from nltk.corpus import stopwords
import string
sp = set(string.punctuation)
stop_words = set(stopwords.words('english'))
extra_words = {"http","https","amp","rt","t","c","the"}
for i in extra_words:
    stop_words.add(i) 
stop_words = list(stop_words)

In [18]:
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stop_words)

__Bag of words count__

This is a type of feature engineering

In [19]:
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

__StringIndexer__

This is where we create our new dataframe in spark

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(5)

+--------------------+------+--------------------+--------------------+--------------------+-----+
|                text|target|               words|            filtered|            features|label|
+--------------------+------+--------------------+--------------------+--------------------+-----+
|@switchfoot http:...|     0|[switchfoot, http...|[switchfoot, twit...|(10000,[1,10,16,6...|  0.0|
|is upset that he ...|     0|[is, upset, that,...|[upset, update, f...|(10000,[6,70,172,...|  0.0|
|@Kenichan I dived...|     0|[kenichan, i, div...|[kenichan, dived,...|(10000,[4,213,251...|  0.0|
|my whole body fee...|     0|[my, whole, body,...|[whole, body, fee...|(10000,[3,325,374...|  0.0|
|@nationwideclass ...|     0|[nationwideclass,...|[nationwideclass,...|(10000,[20,486],[...|  0.0|
+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [21]:
dataset = dataset.select('text','features','label')

__Set seed for reproducibility__

In [22]:
# (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
# print("Training Dataset Count: " + str(trainingData.count()))
# print("Test Dataset Count: " + str(testData.count()))

In [23]:
model_df = dataset.select('features','label')
model_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(10000,[1,10,16,6...|  0.0|
|(10000,[6,70,172,...|  0.0|
|(10000,[4,213,251...|  0.0|
|(10000,[3,325,374...|  0.0|
|(10000,[20,486],[...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [24]:
# trainingData = dataset
# print("Training Dataset Count: " + str(trainingData.count()))
# # print("Test Dataset Count: " + str(testData.count()))

In [42]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0).fit(model_df)
lr.save("Users/mwoo/lr_model_1")
# lrModel = lr.fit(trainingData)
#predictions = lrModel.transform(testData)
#predictions.filter(predictions['prediction'] == 0) \
#     .select("text","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

In [None]:
# predictions.printSchema()

In [None]:
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
# evaluator.evaluate(predictions)

__Format Test Data__

In [29]:
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream

ACCESS_TOKEN = "1458842253779161088-QFeO6udaAdHR4VARxaDza1w4LUlooE"
ACCESS_TOKEN_SECRET = "tC7IJDbl5T97Zvu3kE8sdGnmZWC2qxOrkdOv90YkdzIVO"
API_KEY = "KLP5ct26qaVo0KjAgP8O4j4y5"
API_KEY_SECRET = "AbxH3913WIPG0FHIwvVRomul92RWvuOdxRo2ecXR6H0Qgibo29"

auth = tweepy.OAuthHandler(API_KEY, API_KEY_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

api = tweepy.API(auth)

In [30]:
tweet_list = list()
# Subclass Stream to print IDs of Tweets received
class IDPrinter(tweepy.Stream):
    
    def on_status(self, status):
        tweet_list.append(status.text)
        #print(tweet_list)
        print(status.text)
        if len(tweet_list) == 10:
            Stream.disconnect(self)
# Initialize instance of the subclass
printer = IDPrinter(
  API_KEY, API_KEY_SECRET,
  ACCESS_TOKEN, ACCESS_TOKEN_SECRET
)

# Filter realtime Tweets by keyword
# printer.filter(track=["Spiderman"])
printer.sample(languages=['en'])

Stream connection closed by Twitter


RT @Madelei05642209: Nice weekend 

More links in bio https://t.co/EicnkmON8z
@itstylersays @valueandtime lemons &gt; rugs
RT @bvanhoovan: SCRAPPY is skinny, has lost over 20 pounds and you can see her spine... https://t.co/1ebmu0sh7e
@hndrxhours Preciate it
RT @EricaSimonee: u gon make me turn up on youuuuu
RT @THOUXANBANKY: who likes big tiddies? https://t.co/eoqX0jQOll
@TheRealForno Bailey Zappe is a borderline submarine for how under the radar he's been flying
@WORLDMUSICAWARD @VGlobalUnion @BTS_twt You too whipped for him huh?!? Omg this man is driving the world insane. 😭
@DrewLawDesign It i think it’s same accounts i report.
RT @luminecity: paimon jokes about being the goddess of protection but is she REALLY joking? the traveler has gone through multiple life or…


In [31]:
df_2 = pd.DataFrame(np.array(tweet_list))

In [32]:
df_2.columns = ['text']

In [33]:
df_2.to_csv("test.csv",index=False)

In [34]:
df_2 = spark.read.csv('test.csv',header=True)

In [35]:
df_2.show()

+--------------------+
|                text|
+--------------------+
|RT @Madelei056422...|
|More links in bio...|
|@itstylersays @va...|
|RT @bvanhoovan: S...|
|@hndrxhours Preci...|
|RT @EricaSimonee:...|
|RT @THOUXANBANKY:...|
|@TheRealForno Bai...|
|@WORLDMUSICAWARD ...|
|@DrewLawDesign It...|
|RT @luminecity: p...|
+--------------------+



In [36]:
dataset_1 = pipelineFit.transform(df_2)
dataset_1.show()

+--------------------+--------------------+--------------------+--------------------+
|                text|               words|            filtered|            features|
+--------------------+--------------------+--------------------+--------------------+
|RT @Madelei056422...|[rt, madelei05642...|[madelei05642209,...|(10000,[60,79],[1...|
|More links in bio...|[more, links, in,...|[links, bio, co, ...|(10000,[1122,1739...|
|@itstylersays @va...|[itstylersays, va...|[itstylersays, va...| (10000,[270],[1.0])|
|RT @bvanhoovan: S...|[rt, bvanhoovan, ...|[bvanhoovan, scra...|(10000,[20,154,54...|
|@hndrxhours Preci...|[hndrxhours, prec...|[hndrxhours, prec...|       (10000,[],[])|
|RT @EricaSimonee:...|[rt, ericasimonee...|[ericasimonee, u,...|(10000,[14,54,594...|
|RT @THOUXANBANKY:...|[rt, thouxanbanky...|[thouxanbanky, li...|(10000,[157,1122,...|
|@TheRealForno Bai...|[therealforno, ba...|[therealforno, ba...|(10000,[1461,6071...|
|@WORLDMUSICAWARD ...|[worldmusicaward,...|[worldmusic

In [39]:
dataset_1 = dataset_1.select('features')

In [43]:
#lr = LogisticRegression.load("Users/mwoo/lr_model_1")

Py4JJavaError: An error occurred while calling o847.load.
: java.lang.NoSuchMethodException: org.apache.spark.ml.classification.LogisticRegressionModel.<init>(java.lang.String)
	at java.base/java.lang.Class.getConstructor0(Class.java:3427)
	at java.base/java.lang.Class.getConstructor(Class.java:2165)
	at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:468)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:832)


In [44]:
model_predictions = lr.transform(dataset_1)

In [45]:
model_predictions.show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(10000,[60,79],[1...|[-0.4281486250091...|[0.39456850936686...|       1.0|
|(10000,[1122,1739...|[-0.1648060284513...|[0.45889149655844...|       1.0|
| (10000,[270],[1.0])|[-0.0885826444741...|[0.47786880873173...|       1.0|
|(10000,[20,154,54...|[0.21449568197955...|[0.55341926613738...|       0.0|
|       (10000,[],[])|[-0.0928704763649...|[0.47679905404779...|       1.0|
|(10000,[14,54,594...|[0.12539457939191...|[0.53130763268025...|       0.0|
|(10000,[157,1122,...|[-0.3916937325633...|[0.40330963564820...|       1.0|
|(10000,[1461,6071...|[-0.0827977819452...|[0.47931237178605...|       1.0|
|(10000,[118,140,2...|[-0.1839235009762...|[0.45414830730536...|       1.0|
|(10000,[27,1493,2...|[0.18043210896929...|[0.54498604721907...|       0.0|
|(10000,[19,