                                                               
#                Sentiment Prediction                           
#                Team: BDII                                   
#                Course: EECS6893
#                Columbia University                   

In [1]:
# Note: This part is implemented on Databricks.

# Sentiment Classifier Model Training

# Import Training Data

In [3]:
# DBFS location of the labelled training corpus (columns include
# SentimentText and Sentiment, as used by the cells below).
path = (
  "dbfs:/FileStore/tables/Sentiment_Analysis_Dataset-08265.csv"
)

In [4]:
# Load the training CSV: first row is a header, column types are inferred,
# fields are comma-separated.
csv_reader = (
  sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
)
data = csv_reader.load(path)

# cache() marks the raw frame; rows containing any null are then dropped.
data.cache()
data = data.dropna()
display(data)

In [5]:
# Show the corpus size and the number of rows per Sentiment label.
# A bare `data.count()` in the middle of a cell is not displayed by the
# notebook (only the last expression is), so the count is printed explicitly.
print("Row count: %d" % data.count())
sqlContext.registerDataFrameAsTable(data, "table1")
df2 = sqlContext.sql("SELECT Sentiment, count(*) from table1 group by Sentiment")
df2.show()

# Tokenizer

In [6]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Plain Spark Tokenizer: SentimentText -> array column `words`.
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="words")


def _token_count(words):
  # Number of tokens in one row's `words` array.
  return len(words)


countTokens = udf(_token_count, IntegerType())

tokenized = tokenizer.transform(data)
token_preview = tokenized.select("SentimentText", "words")
token_preview = token_preview.withColumn("tokens", countTokens(col("words")))
token_preview.show(truncate=True)

# Regex Tokenizer

In [7]:
from pyspark.ml.feature import RegexTokenizer

# Regex tokenizer: "\\W" is used as the split pattern, so every non-word
# character (punctuation, '/', ':', '@', ...) separates tokens.
# Alternative: pattern="\\w+" with gaps=False to match words directly.
regexTokenizer = RegexTokenizer(pattern="\\W", inputCol="SentimentText", outputCol="words")

# Same token-length UDF as in the plain-tokenizer cell.
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(data)
regex_preview = regexTokenized.select("words")
regex_preview = regex_preview.withColumn("tokens", countTokens(col("words")))
regex_preview.show(truncate=False)

# Stop Words Remover

In [8]:
from pyspark.ml.feature import StopWordsRemover

# First stop-word pass: no stopWords argument is given, so Spark's built-in
# default list is applied to the regex tokens.
remover = StopWordsRemover(outputCol="filtered_raw", inputCol="words")
removed = remover.transform(regexTokenized)
filtered_raw_view = removed.select("filtered_raw")
filtered_raw_view.show(truncate=False)

In [9]:
# Second stop-word pass: corpus-specific noise that survives the default
# list (empty strings, single digits, URL fragments, frequent filler words).
extra_stop_words = [
  "", "1", "2", "3", "4", "5", "6", "7", "8", "9",
  "m", "kaggle", "etc", "though", "man", "too", "so", "rain", "shower",
  "000", "http", "day", "quot", "com", "im", "it", "get", "bit", "see",
]
remover_adv = StopWordsRemover(inputCol="filtered_raw", outputCol="filtered", stopWords=extra_stop_words)
# dropna() returns a new DataFrame rather than modifying in place, so the
# result has to be assigned for the null filter to take effect.
removed_adv = remover_adv.transform(removed).dropna()
removed_adv.select("filtered").show(truncate=False)

In [10]:
# Render the fully filtered token lists as a notebook table.
filtered_col = "filtered"
all_words = removed_adv.select(filtered_col)
display(all_words)

# Hashing TF-IDF

In [11]:
from pyspark.ml.feature import HashingTF, IDF

# Term frequencies via the hashing trick into 10000 buckets
# (CountVectorizer is an alternative that keeps an explicit vocabulary).
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
featurizedData = hashingTF.transform(removed_adv)

# Inverse-document-frequency weights fitted on the training corpus.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# Keep only (label, features) for the classifier. dropna() returns a new
# DataFrame, so its result must be assigned to have any effect.
svms = rescaledData.selectExpr("Sentiment as label", "features").dropna()
svms.show()

# Training Naive Bayes Model

In [12]:
from pyspark.ml.classification import NaiveBayes,NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils

# 85% / 15% train/test split; the fixed seed (12) makes it reproducible.
splits = svms.randomSplit([0.85, 0.15], 12)
train, test = splits

# Multinomial Naive Bayes with add-one (Laplace) smoothing.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)
model = nb.fit(train)

# Accuracy on the held-out split.
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionAndLabels)
print("Accuracy: " + str(accuracy))

# Sentiment Prediction

# Modularized class - tweet_sentiment

In [14]:
class tweet_sentiment():
  """Score one file of tweets with an already-trained Naive Bayes model.

  Typical call order (as in the processing cells of this notebook):
    data_loading() -> regex_tokenize() -> stop_words_remove(stop=...)
    -> word2vector() -> nb_model(model) -> nb_predicting() -> result()

  Uses the notebook-global ``sqlContext`` for reading files and running SQL.

  Attributes:
    path: location of the header-less CSV file to score.
    dataframe: loaded rows with nulls dropped (set by data_loading).
    regexTokenized: adds a ``words`` column (set by regex_tokenize).
    stop_removed: adds ``filtered_raw`` after default stop-word removal.
    tweet_removed_adv: adds ``filtered`` after custom stop-word removal.
    tweet_svms: ``features``-only DataFrame (set by word2vector).
    model: classifier set via nb_model().
    tweet_result: output of ``model.transform`` (set by nb_predicting).
    positive / negative: 0 until result() runs; afterwards one-row
      DataFrames holding the number of rows predicted 1 / 0.
  """

  def __init__(self, path_x):
    self.dataframe = None
    self.regexTokenized = None
    self.stop_removed = None
    self.tweet_removed_adv = None
    self.tweet_svms = None
    self.model = None
    # Initialised here so every attribute exists before nb_predicting runs.
    self.tweet_result = None
    self.positive = 0
    self.negative = 0
    self.path = path_x

  def data_loading(self):
    """Read ``self.path`` as a header-less CSV and return it without null rows.

    The result is also stored in ``self.dataframe``.
    """
    tweet_data = (
      sqlContext.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .load(self.path)
    )
    tweet_data.cache()
    tweet_data = tweet_data.dropna()
    self.dataframe = tweet_data
    return tweet_data

  def nb_model(self, nb_model):
    """Set the fitted classifier used by nb_predicting()."""
    self.model = nb_model

  def row_count(self):
    """Return the number of rows in the loaded DataFrame."""
    return self.dataframe.count()

  def regex_tokenize(self):
    """Split column ``_c2`` on non-word characters into a ``words`` column.

    ``_c2`` is Spark's name for the third column of a header-less CSV;
    presumably it holds the tweet text (verify against the input files).
    """
    from pyspark.ml.feature import RegexTokenizer

    tweet_regexTokenizer = RegexTokenizer(inputCol="_c2", outputCol="words", pattern="\\W")
    self.regexTokenized = tweet_regexTokenizer.transform(self.dataframe)
    return self.regexTokenized

  def regex_tokenize_show(self, truncation=True):
    """Show the ``words`` column produced by regex_tokenize()."""
    self.regexTokenized.select("words").show(truncate=truncation)

  def stop_words_remove(self, stop=None):
    """Remove default stop words, then the caller's extra ``stop`` words.

    Args:
      stop: iterable of additional stop words (default: none). ``None``
        replaces the former mutable ``[]`` default.

    Returns:
      The DataFrame with a ``filtered`` column and null rows dropped (also
      stored in ``self.tweet_removed_adv``).
    """
    from pyspark.ml.feature import StopWordsRemover

    # Never pass None to stopWords: an explicit None would be stored as the
    # parameter value instead of an empty list.
    extra_words = [] if stop is None else list(stop)

    tweet_remover = StopWordsRemover(inputCol="words", outputCol="filtered_raw")
    tweet_removed = tweet_remover.transform(self.regexTokenized)
    self.stop_removed = tweet_removed

    tweet_remover_adv = StopWordsRemover(inputCol="filtered_raw", outputCol="filtered", stopWords=extra_words)
    self.tweet_removed_adv = tweet_remover_adv.transform(tweet_removed).dropna()
    return self.tweet_removed_adv

  def word2vector(self, feature_num=10000, idf_model=None):
    """Turn the filtered tokens into TF-IDF ``features`` vectors.

    Args:
      feature_num: number of hashing buckets. This should match the
        ``numFeatures`` used when the classifier was trained (10000 in the
        training cells).
      idf_model: optional already-fitted IDF model, e.g. the one fitted on
        the training data. When None (the default), a new IDF is fitted on
        the tweets being scored, so the weighting differs from the weighting
        the classifier was trained on.

    Returns:
      The ``features``-only DataFrame, also stored in ``self.tweet_svms``.
    """
    from pyspark.ml.feature import HashingTF, IDF

    tweet_hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=feature_num)
    tweet_featurizedData = tweet_hashingTF.transform(self.tweet_removed_adv)

    if idf_model is None:
      idf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tweet_featurizedData)
    tweet_rescaledData = idf_model.transform(tweet_featurizedData)

    self.tweet_svms = tweet_rescaledData.selectExpr("features").dropna()
    return self.tweet_svms

  def nb_predicting(self):
    """Apply the classifier to ``self.tweet_svms``; store the output in ``self.tweet_result``.

    Raises:
      ValueError: if no model was set via nb_model(), or word2vector() has not run.
    """
    if self.model is None:
      raise ValueError("No model set; call nb_model() first.")
    if self.tweet_svms is None:
      raise ValueError("No features available; call word2vector() first.")
    self.tweet_result = self.model.transform(self.tweet_svms)

  def nb_predicting_show(self):
    """Show the prediction next to each feature vector."""
    self.tweet_result.select("prediction", "features").show(truncate=True)

  def result(self):
    """Count the rows predicted positive (1) and negative (0).

    Registers ``self.tweet_result`` as temp table "table1" (replacing any
    earlier table of that name). It then stores one-row DataFrames in
    ``self.positive`` / ``self.negative``; read a count with
    ``.head(1)[0][0]``.

    Raises:
      ValueError: if nb_predicting() has not run.
    """
    if self.tweet_result is None:
      raise ValueError("No predictions available; call nb_predicting() first.")
    sqlContext.registerDataFrameAsTable(self.tweet_result, "table1")
    # No GROUP BY: a global aggregate always yields exactly one row (0 when
    # nothing matches). With GROUP BY, an empty match yields zero rows and
    # head(1)[0][0] raises IndexError.
    self.positive = sqlContext.sql("SELECT count(*) from table1 where prediction=1")
    self.negative = sqlContext.sql("SELECT count(*) from table1 where prediction=0")
  

# Brand - Coca-cola

# Loading Data - Coca-Cola

In [17]:
# Coca-Cola tweet files: 22 files in one DBFS folder named "<N>_<half>-<id>.txt",
# two files (halves 1 and 2) for each N = 1..11, in order.
coke_dir = "dbfs:/FileStore/tables/53m9hj5w1507394714906/"
coke_file_names = [
  "1_1-d7876.txt", "1_2-b8a26.txt",
  "2_1-6a057.txt", "2_2-08a2a.txt",
  "3_1-3ead6.txt", "3_2-80939.txt",
  "4_1-fa2a9.txt", "4_2-a3648.txt",
  "5_1-377eb.txt", "5_2-dce00.txt",
  "6_1-486e2.txt", "6_2-45e97.txt",
  "7_1-b307e.txt", "7_2-74996.txt",
  "8_1-b4602.txt", "8_2-dc3bd.txt",
  "9_1-9fbd5.txt", "9_2-53f30.txt",
  "10_1-d72c3.txt", "10_2-953fe.txt",
  "11_1-10e51.txt", "11_2-f6fb2.txt",
]
path_list = [coke_dir + file_name for file_name in coke_file_names]
path_account = len(path_list)
print("file account=", path_account)

In [18]:
# Extra stop words for the Coca-Cola tweets: brand-name variants, URL parts
# and off-topic product words.
# NOTE(review): tokens come from RegexTokenizer(pattern="\\W"), which splits
# on non-word characters, so entries containing '-', ':', '/' or '.'
# ('Coca-Cola', 'http://www', 'https://www', 'ebay.to/2isSz6e') can never
# match a token. StopWordsRemover's caseSensitive defaults to False per the
# Spark docs, so the case variants are likely redundant. Kept unchanged.
stopWordslist = [
  'http', 'Coca-Cola', 'Cola', 'Coca', 'Coke', 'cola', 'CocaCola', 'https',
  'coca', 'Soft', 'Plush', 'Stuffed', 'Animal', 'Teddy', 'http://www', 'via',
  'YouTube', 'ebay.to/2isSz6e', 'https://www', 'COLA', 'COCA', 'video',
]

In [19]:
# One fresh {'positive': 0, 'negative': 0} counter per Coca-Cola file,
# keyed by the file's index in path_list.
result_dict = {i: {'positive': 0, 'negative': 0} for i in range(path_account)}
# Function-call form of print works under both Python 2 and Python 3;
# the statement form is a SyntaxError in Python 3.
print(result_dict)

In [20]:
# Smoke test: load the first Coca-Cola file and fetch up to 20 rows.
first_file = path_list[0]
test = tweet_sentiment(first_file)
test_df = test.data_loading()
test_df.take(20)

# Processing - Coca-Cola

In [21]:
# Score every Coca-Cola file with the trained Naive Bayes `model` and record
# how many rows were predicted positive (1) and negative (0) per file.
# Note: `test` here rebinds the name used earlier for the held-out split.


def _first_count(count_df):
  """Return the value in the first row of a count(*) DataFrame, or 0 if it has no rows."""
  rows = count_df.head(1)
  return int(rows[0][0]) if rows else 0


for now_file, file_path in enumerate(path_list):
  test = tweet_sentiment(file_path)
  test_df = test.data_loading()
  test.regex_tokenize()
  test.stop_words_remove(stop=stopWordslist)
  test.word2vector(feature_num=10000)
  test.nb_model(model)
  test.nb_predicting()
  test.result()
  # An empty grouped count (no rows of one class) would otherwise raise IndexError.
  result_dict[now_file] = {'positive': _first_count(test.positive),
                           'negative': _first_count(test.negative)}

# Result and Statistics

In [22]:
# Per-file prediction counts for Coca-Cola, keyed by index into path_list.
print(result_dict)

In [23]:
# Collapse per-file counts into per-month totals. Files are consumed in
# consecutive pairs (indices 2k and 2k+1, i.e. "<N>_1" and "<N>_2" in
# path_list); a trailing unpaired file forms its own group.
pos_res_in_month = []
neg_res_in_month = []
for first_idx in range(0, path_account, 2):
  pair = [result_dict[j] for j in (first_idx, first_idx + 1) if j < path_account]
  pos_res_in_month.append(sum(counts['positive'] for counts in pair))
  neg_res_in_month.append(sum(counts['negative'] for counts in pair))
print("positive:", pos_res_in_month)
print("negative", neg_res_in_month)

In [24]:
# Positive-to-negative ratio for each aggregated month, as a string with
# three decimal places ("%.3f"; "%3f" would only set a minimum width of 3).
# Iterates over however many months were aggregated rather than a fixed 11.
# A month with no negatives reports "inf" (or "nan" if it has no tweets at
# all) instead of raising ZeroDivisionError.
ratio_res = []
for positives, negatives in zip(pos_res_in_month, neg_res_in_month):
  if negatives:
    ratio = float(positives) / float(negatives)
  elif positives:
    ratio = float("inf")
  else:
    ratio = float("nan")
  ratio_res.append("%.3f" % ratio)
print(ratio_res)

# Brand - Pepsi

In [27]:
# Pepsi tweet files: 22 files in one DBFS folder named "<N>_<half>-<id>.txt",
# two files (halves 1 and 2) for each N = 1..11, in order.
pepsi_dir = "dbfs:/FileStore/tables/0h7dq7vp1507485401798/"
pepsi_file_names = [
  "1_1-d7876.txt", "1_2-b8a26.txt",
  "2_1-6a057.txt", "2_2-08a2a.txt",
  "3_1-3ead6.txt", "3_2-80939.txt",
  "4_1-fa2a9.txt", "4_2-a3648.txt",
  "5_1-377eb.txt", "5_2-dce00.txt",
  "6_1-486e2.txt", "6_2-45e97.txt",
  "7_1-b307e.txt", "7_2-74996.txt",
  "8_1-b4602.txt", "8_2-dc3bd.txt",
  "9_1-9fbd5.txt", "9_2-53f30.txt",
  "10_1-d72c3.txt", "10_2-953fe.txt",
  "11_1-10e51.txt", "11_2-f6fb2.txt",
]
path_list2 = [pepsi_dir + file_name for file_name in pepsi_file_names]
path_account2 = len(path_list2)
print("file account=", path_account2)

In [28]:
# Extra stop words for the Pepsi tweets: brand names (including competitor
# mentions), URL parts and common filler words.
# NOTE(review): 'https://www' contains ':' and '/', which RegexTokenizer with
# pattern "\\W" treats as separators, so that entry can never match a token.
stopWordslist2 = [
  'Pepsi', 'pepsi', 'http', 'https', 'like', 'Coke',
  'can', 'drink', 'coke', 'PEPSI', 'get', 'https://www',
  'YouTube', 'via', 'need', 'cola', 'video', 'time',
]

In [29]:
# One fresh {'positive': 0, 'negative': 0} counter per Pepsi file,
# keyed by the file's index in path_list2.
result_dict2 = {i: {'positive': 0, 'negative': 0} for i in range(path_account2)}
# Function-call form of print works under both Python 2 and Python 3;
# the statement form is a SyntaxError in Python 3.
print(result_dict2)

In [30]:
# Smoke test: load the first Pepsi file and fetch up to 20 rows.
first_file2 = path_list2[0]
test = tweet_sentiment(first_file2)
test_df = test.data_loading()
test_df.take(20)

In [31]:
# Score every Pepsi file with the trained Naive Bayes `model` and record
# how many rows were predicted positive (1) and negative (0) per file.
# Note: `test` here rebinds the name used earlier for the held-out split.


def _first_count(count_df):
  """Return the value in the first row of a count(*) DataFrame, or 0 if it has no rows."""
  rows = count_df.head(1)
  return int(rows[0][0]) if rows else 0


for now_file, file_path in enumerate(path_list2):
  test = tweet_sentiment(file_path)
  test_df = test.data_loading()
  test.regex_tokenize()
  test.stop_words_remove(stop=stopWordslist2)
  test.word2vector(feature_num=10000)
  test.nb_model(model)
  test.nb_predicting()
  test.result()
  # An empty grouped count (no rows of one class) would otherwise raise IndexError.
  result_dict2[now_file] = {'positive': _first_count(test.positive),
                            'negative': _first_count(test.negative)}

In [32]:
# Per-file prediction counts for Pepsi, keyed by index into path_list2.
print(result_dict2)

In [33]:
# Collapse the Pepsi per-file counts into per-month totals. Files are
# consumed in consecutive pairs (indices 2k and 2k+1, i.e. "<N>_1" and
# "<N>_2" in path_list2).
# The loop is bounded by path_account2 (the Pepsi file count), not by the
# Coca-Cola path_account, so the two brands' file lists may differ in length.
pos_res_in_month2 = []
neg_res_in_month2 = []
for sample_no in range(path_account2):
  counts = result_dict2[sample_no]
  if sample_no % 2 == 0:
    pos_res_in_month2.append(counts['positive'])
    neg_res_in_month2.append(counts['negative'])
  else:
    pos_res_in_month2[-1] += counts['positive']
    neg_res_in_month2[-1] += counts['negative']
print("positive:", pos_res_in_month2)
print("negative", neg_res_in_month2)

In [34]:
# Pepsi positive-to-negative ratio for each aggregated month, as a string
# with three decimal places ("%.3f"; "%3f" would only set a minimum width).
# Iterates over however many months were aggregated rather than a fixed 11.
# A month with no negatives reports "inf" (or "nan" if it has no tweets at
# all) instead of raising ZeroDivisionError.
ratio_res2 = []
for positives, negatives in zip(pos_res_in_month2, neg_res_in_month2):
  if negatives:
    ratio = float(positives) / float(negatives)
  elif positives:
    ratio = float("inf")
  else:
    ratio = float("nan")
  ratio_res2.append("%.3f" % ratio)
print(ratio_res2)