In [1]:
from pyspark import SparkFiles
import twitter
import os
from config import *
import pandas as pd
import numpy as np


# initialize api instance
twitter_api = twitter.Api(consumer_key=consumer_key,
                         consumer_secret=consumer_secret,
                         access_token_key=access_token_key,
                         access_token_secret=access_token_secret)

#print(consumer_key,consumer_secret,access_token_key,access_token_secret)

# test authentication
print(twitter_api.VerifyCredentials())

{"created_at": "Wed Jul 20 02:34:27 +0000 2016", "description": "I am Al and I am passionate about technology.", "followers_count": 5, "friends_count": 52, "id": 755591669194514432, "id_str": "755591669194514432", "location": "Tustin, CA", "name": "Al", "profile_background_color": "000000", "profile_background_image_url": "http://abs.twimg.com/images/themes/theme1/bg.png", "profile_background_image_url_https": "https://abs.twimg.com/images/themes/theme1/bg.png", "profile_banner_url": "https://pbs.twimg.com/profile_banners/755591669194514432/1469041661", "profile_image_url": "http://pbs.twimg.com/profile_images/755843006062694401/2JikSpJq_normal.jpg", "profile_image_url_https": "https://pbs.twimg.com/profile_images/755843006062694401/2JikSpJq_normal.jpg", "profile_link_color": "1B95E0", "profile_sidebar_border_color": "000000", "profile_sidebar_fill_color": "000000", "profile_text_color": "000000", "screen_name": "alknowstech", "status": {"created_at": "Mon Jul 31 21:38:46 +0000 2017", 

In [2]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Get the default configurations
spark.sparkContext._conf.getAll()

# Update the default configurations
conf = spark.sparkContext._conf.setAll([("spark.executor.heartbeatInterval", "300000"),
                                       ("spark.network.timeout", "400000"),('spark.executor.memory', '6g'),
                                        ('spark.app.name', 'Spark Updated Conf'),
                                        ('spark.executor.cores', '6', ('spark.cores.max', '6'), 
                                        ('spark.driver.memory','6g')])

# Stop the current Spark Session
spark.sparkContext.stop()

# Create a Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()


In [None]:
traindata140 = os.path.join(".","trainingandtestdata_sentiment140","training.csv")

train_df = pd.read_csv(traindata140, header=None, usecols=[0,5], names=['polarity of the tweet', 'text'], 
                       encoding="ISO-8859-1")
train_df.head()
# (0 = negative, 2 = neutral, 4 = positive)

In [2]:
# Test Data
testdata140 = os.path.join(".","trainingandtestdata_sentiment140","test.csv")

test_df = pd.read_csv(testdata140, header=None, usecols=[0,5], names=['polarity of the tweet', 'text'], 
                       encoding="ISO-8859-1")
test_df.head()
# (0 = negative, 2 = neutral, 4 = positive)

conditions_test = [
    (test_df['polarity of the tweet'] == 0),
    (test_df['polarity of the tweet'] == 2),
    (test_df['polarity of the tweet'] == 4)]
choices_test = ['negative', 'neutral', 'positive']
test_df['class'] = np.select(conditions_test, choices_test)
test_df.head()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# convert pandas DF to spark Df
spark_df_test = sqlContext.createDataFrame(test_df.iloc[:,1:3])
spark_df_test.show()

+--------------------+--------+
|                text|   class|
+--------------------+--------+
|@stellargirl I lo...|positive|
|Reading my kindle...|positive|
|Ok, first assesme...|positive|
|@kenburbary You'l...|positive|
|@mikefish  Fair e...|positive|
|@richardebaker no...|positive|
|Fuck this economy...|negative|
|Jquery is my new ...|positive|
|       Loves twitter|positive|
|how can you not l...|positive|
|Check this video ...| neutral|
|@Karoli I firmly ...|negative|
|House Corresponde...|positive|
|Watchin Espn..Jus...|positive|
|dear nike, stop w...|negative|
|#lebron best athl...|positive|
|I was talking to ...|negative|
|i love lebron. ht...|positive|
|@ludajuice Lebron...|negative|
|@Pmillzz lebron I...|positive|
+--------------------+--------+
only showing top 20 rows



In [None]:
conditions = [
    (train_df['polarity of the tweet'] == 0),
    (train_df['polarity of the tweet'] == 2),
    (train_df['polarity of the tweet'] == 4)]
choices = ['negative', 'neutral', 'positive']
train_df['class'] = np.select(conditions, choices)
train_df.head()

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# convert pandas DF to spark Df
spark_df = sqlContext.createDataFrame(train_df.iloc[:,1:3])
spark_df.show()

In [3]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = spark_df_test.withColumn('length', length(spark_df_test['text']))
data_df.show()

+--------------------+--------+------+
|                text|   class|length|
+--------------------+--------+------+
|@stellargirl I lo...|positive|   111|
|Reading my kindle...|positive|    58|
|Ok, first assesme...|positive|    58|
|@kenburbary You'l...|positive|   140|
|@mikefish  Fair e...|positive|    75|
|@richardebaker no...|positive|    67|
|Fuck this economy...|negative|    61|
|Jquery is my new ...|positive|    29|
|       Loves twitter|positive|    13|
|how can you not l...|positive|    57|
|Check this video ...| neutral|   101|
|@Karoli I firmly ...|negative|   140|
|House Corresponde...|positive|   106|
|Watchin Espn..Jus...|positive|    98|
|dear nike, stop w...|negative|    95|
|#lebron best athl...|positive|   135|
|I was talking to ...|negative|   136|
|i love lebron. ht...|positive|    34|
|@ludajuice Lebron...|negative|    74|
|@Pmillzz lebron I...|positive|    27|
+--------------------+--------+------+
only showing top 20 rows



### Feature Transformations


In [4]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [6]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [7]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [8]:
cleaned.show(1)

+--------------------+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|   class|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|@stellargirl I lo...|positive|   111|  0.0|[@stellargirl, i,...|[@stellargirl, lo...|(262144,[7877,158...|(262144,[7877,158...|(262145,[7877,158...|
+--------------------+--------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



In [9]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[7877,158...|
|  0.0|(262145,[13342,15...|
|  0.0|(262145,[9639,821...|
|  0.0|(262145,[7877,158...|
|  0.0|(262145,[14837,24...|
|  0.0|(262145,[7877,158...|
|  1.0|(262145,[16474,21...|
|  0.0|(262145,[15889,29...|
|  0.0|(262145,[203466,2...|
|  0.0|(262145,[36073,91...|
|  2.0|(262145,[57341,10...|
|  1.0|(262145,[9287,225...|
|  0.0|(262145,[5381,255...|
|  0.0|(262145,[25570,29...|
|  1.0|(262145,[9639,158...|
|  0.0|(262145,[9616,963...|
|  1.0|(262145,[2437,538...|
|  0.0|(262145,[11850,24...|
|  1.0|(262145,[15889,36...|
|  0.0|(262145,[15889,42...|
+-----+--------------------+
only showing top 20 rows



In [10]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
# training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(cleaned)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 50803)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most re

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50776)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-1b40f3396547>", line 7, in <module>
    predictor = nb.fit(cleaned)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.nam

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50776)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-1b40f3396547>", line 7, in <module>
    predictor = nb.fit(cleaned)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.nam

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50776)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-1b40f3396547>", line 7, in <module>
    predictor = nb.fit(cleaned)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/Cellar/apache-spark/2.4.4/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.nam

Py4JError: An error occurred while calling o262.fit

In [None]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

In [None]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)