In [None]:
!pip install pyspark

In [None]:
from pyspark import SparkContext,SparkConf
from pyspark.sql.functions import monotonically_increasing_id 
from nltk.stem import WordNetLemmatizer
from pyspark.ml import Pipeline
import nltk
import re
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from  pyspark.sql.functions import regexp_replace
#from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

In [None]:
# Fetch the WordNet data needed by the WordNetLemmatizer used in preprocess_sentiments.
nltk.download(info_or_id="wordnet")

In [None]:
# Build the Spark configuration and obtain a SparkContext / SparkSession from it.
# SparkContext.getOrCreate returns an already-running context unchanged, so these settings
# only take effect when a new context is created.
conf = SparkConf().setAppName("SparkStreaming")
for conf_key, conf_value in (
    ("spark.executor.memory", "4g"),
    ("spark.driver.memory", "4g"),
    ("spark.cores.max", "2"),
):
    conf.set(conf_key, conf_value)
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

In [None]:
my_schema = tp.StructType([
  tp.StructField(name= 'sentiment', dataType= tp.StringType(),  nullable= True),
  tp.StructField(name= 'ids', dataType= tp.StringType(),  nullable= True),
  tp.StructField(name= 'date', dataType= tp.StringType(),   nullable= True),
  tp.StructField(name= 'flag', dataType= tp.StringType(),   nullable= True),
  tp.StructField(name= 'user', dataType= tp.StringType(),   nullable= True),
  tp.StructField(name= 'text', dataType= tp.StringType(),   nullable= True)
])

In [None]:
my_data = spark.read.csv('twitter_data.csv',schema=my_schema,header=True)

In [None]:
# Number of rows whose raw sentiment code is "0".
my_data.filter(my_data.sentiment == "0").count()

In [None]:
# Relabel sentiment code "4" as "1" so the labels become 0/1.
# Restrict the replacement to `sentiment`: without `subset`, DataFrame.replace applies to every
# string column, so any ids/date/flag/user/text value that is exactly "4" would also become "1".
my_data = my_data.replace("4", "1", subset=["sentiment"])

In [None]:
my_data.select('sentiment').where(my_data['sentiment']=="1").count()

In [None]:
my_data.show(5)

In [None]:
from pyspark.sql.functions import rand

# A fixed seed makes the shuffle, and therefore the id-based train/test split further down,
# reproducible across kernel restarts. An unseeded rand() picks a new seed on every run.
SHUFFLE_SEED = 42
my_data = my_data.orderBy(rand(seed=SHUFFLE_SEED))
# Spark ML estimators need a numeric label column; `sentiment` is stored as a string.
my_data = my_data.withColumn("label", my_data["sentiment"].cast(tp.IntegerType()))

In [None]:
my_data.show(10)

In [None]:
my_data=my_data.select(["text","label"])

In [None]:
# Attach a row index `id`, which is used later for the train/test split.
# monotonically_increasing_id() is unique but NOT consecutive: the partition index lives in the
# upper bits. Casting it to a 32-bit int therefore kept only the per-partition row counter.
# Ids then repeated across partitions (or the cast can fail under ANSI mode), and `id < 8000`
# selected up to 8000 rows from every partition. zipWithIndex() numbers the rows 0..N-1 in the
# DataFrame's current (shuffled) order, so the ids are unique and consecutive.
indexed_schema = tp.StructType(
    my_data.schema.fields + [tp.StructField("id", tp.LongType(), nullable=False)]
)
my_data = spark.createDataFrame(
    my_data.rdd.zipWithIndex().map(lambda row_and_index: (*row_and_index[0], row_and_index[1])),
    schema=indexed_schema,
)

In [None]:
my_data.filter(my_data['id']>0).show(5)

In [None]:
my_data.count()

In [None]:
# Preprocessing text (performed by preprocess_sentiments):
# - Lowercasing: removes case-only differences without changing the meaning of the text.
# - Replace URLs (http://, https://, www.) with the token URL.
# - Replace emoticons with words so they can be used for feature extraction.
# - Replace @usernames with the token USER.
# - Replace every non-alphanumeric character with a space (digits are kept).
# - Collapse runs of 3 or more identical characters to 2 to reduce redundancy.
# - Drop short words: words of length 1 or less are mostly irrelevant.
# - Remove stop words: they add little meaning to the sentence, so they can be ignored.
# - Lemmatize: convert each word to its base form to keep the bag of words concise and limit overfitting.

# Emoticon -> word map; preprocess_sentiments replaces each emoticon with "EMOJ" + word.
# NOTE(review): preprocess_sentiments lowercases the tweet before looking these up, so keys that
# contain uppercase letters (':-E', ':P', ':O', ':X', ':-D', 'O.o', 'O:-)', 'O*-)', '(:-D')
# cannot match as written.
emojis={':)': 'smile', ':-)': 'smile', ';d': 'wink', ':-E': 'vampire', ':(': 'sad', 
          ':-(': 'sad', ':-<': 'sad', ':P': 'raspberry', ':O': 'surprised',
          ':-@': 'shocked', ':@': 'shocked',':-$': 'confused', ':\\': 'annoyed', 
          ':#': 'mute', ':X': 'mute', ':^)': 'smile', ':-&': 'confused', '$_$': 'greedy',
          '@@': 'eyeroll', ':-!': 'confused', ':-D': 'smile', ':-0': 'yell', 'O.o': 'confused',
          '<(-_-)>': 'robot', 'd[-_-]b': 'dj', ":'-)": 'sadsmile', ';)': 'wink', 
          ';-)': 'wink', 'O:-)': 'angel','O*-)': 'angel','(:-D': 'gossip', '=^.^=': 'cat'}

## List of English stopwords (written without apostrophes, e.g. "shes", "youre");
## preprocess_sentiments drops any word found in it.
stopwordlist = ['a', 'about', 'above', 'after', 'again', 'ain', 'all', 'am', 'an',
             'and','any','are', 'as', 'at', 'be', 'because', 'been', 'before',
             'being', 'below', 'between','both', 'by', 'can', 'd', 'did', 'do',
             'does', 'doing', 'down', 'during', 'each','few', 'for', 'from', 
             'further', 'had', 'has', 'have', 'having', 'he', 'her', 'here',
             'hers', 'herself', 'him', 'himself', 'his', 'how', 'i', 'if', 'in',
             'into','is', 'it', 'its', 'itself', 'just', 'll', 'm', 'ma',
             'me', 'more', 'most','my', 'myself', 'now', 'o', 'of', 'on', 'once',
             'only', 'or', 'other', 'our', 'ours','ourselves', 'out', 'own', 're',
             's', 'same', 'she', "shes", 'should', "shouldve",'so', 'some', 'such',
             't', 'than', 'that', "thatll", 'the', 'their', 'theirs', 'them',
             'themselves', 'then', 'there', 'these', 'they', 'this', 'those', 
             'through', 'to', 'too','under', 'until', 'up', 've', 'very', 'was',
             'we', 'were', 'what', 'when', 'where','which','while', 'who', 'whom',
             'why', 'will', 'with', 'won', 'y', 'you', "youd","youll", "youre",
             "youve", 'your', 'yours', 'yourself', 'yourselves']

In [None]:
def preprocess_sentiments(tweet):
    """Clean one raw tweet into a space-separated string of lemmatized words.

    Steps, in order:
      1. lowercase the text;
      2. replace URLs (http://, https://, www.) with " URL";
      3. replace each key of the module-level ``emojis`` dict with "EMOJ" + its word;
      4. replace @mentions with " USER";
      5. turn every character outside [a-zA-Z0-9] into a space;
      6. collapse runs of 3+ identical characters down to 2;
      7. keep words longer than one character that are not in ``stopwordlist``,
         lemmatizing each with nltk's WordNetLemmatizer.

    Args:
        tweet: raw tweet text; a null cell of the Spark column arrives as ``None``.

    Returns:
        The kept words, each followed by one space (a non-empty result therefore ends
        with a trailing space), or "" when ``tweet`` is not a string (e.g. ``None``).
    """
    # A null `text` value used to fall through to re.sub(), which raises TypeError on None
    # and aborts the whole Spark job; treat it as an empty tweet instead.
    if not isinstance(tweet, str):
        return ""

    lemmatizer = WordNetLemmatizer()
    url_pattern = r"((https://)[^ ]*|(http://)[^ ]*|(www\.)[^ ]*)"
    user_pattern = r"@[^\s]+"  # raw string: "\s" is an invalid escape in a plain literal
    alpha_pattern = r"[^a-zA-Z0-9]"
    sequence_pattern = r"(.)\1\1+"
    sequence_replace_pattern = r"\1\1"

    tweet = tweet.lower()
    tweet = re.sub(url_pattern, " URL", tweet)
    # NOTE(review): the text is already lowercase here, so emoji keys containing uppercase
    # letters (':P', ':-D', 'O.o', ...) can never match; decide whether to compare them
    # case-insensitively.
    for emoji, word in emojis.items():
        tweet = tweet.replace(emoji, "EMOJ" + word)
    tweet = re.sub(user_pattern, " USER", tweet)
    tweet = re.sub(alpha_pattern, " ", tweet)
    tweet = re.sub(sequence_pattern, sequence_replace_pattern, tweet)

    kept_words = [
        lemmatizer.lemmatize(word)
        for word in tweet.split(" ")
        if len(word) > 1 and word not in stopwordlist
    ]
    return "".join(word + " " for word in kept_words)

In [None]:
preprocess_udf = udf(preprocess_sentiments,tp.StringType())
my_data=my_data.withColumn("text",preprocess_udf("text"))

In [None]:
my_data.show(5)

In [None]:
train_data=my_data.filter(my_data['id']<8000)
test_data=my_data.filter(my_data['id']>=8000)

In [None]:
train_data.count()

In [None]:
test_data.count()

In [None]:
nltk.download('wordnet')

In [None]:
train_data.show(5)

In [None]:
train_data.select('label').where(train_data['label']==1).count()

In [None]:
# Stages of the ML pipeline: tokenize on non-word characters -> remove Spark's default English
# stop words -> Word2Vec document vectors (size 100) -> logistic regression on `label`.
stage = RegexTokenizer(pattern="\\W", inputCol="text", outputCol="tokens")
stage_1 = StopWordsRemover(inputCol="tokens", outputCol="filtered_text")
stage_2 = Word2Vec(vectorSize=100, inputCol="filtered_text", outputCol="vector")
lr = LogisticRegression(
    featuresCol="vector",
    labelCol="label",
    maxIter=10,
    regParam=0.001,
)
pipeline1 = Pipeline().setStages([stage, stage_1, stage_2, lr])
model = pipeline1.fit(train_data)

In [None]:
# Mount Google Drive before writing to it. Saving under /content/gdrive first would create a
# plain local directory there, and Colab refuses to mount onto a non-empty mount point.
from google.colab import drive
drive.mount('/content/gdrive')
# overwrite() makes this cell re-runnable; a plain save() raises if the path already exists.
model.write().overwrite().save("/content/gdrive/My Drive/mlmodel1")

In [None]:
test = spark.createDataFrame([
   (1,"I love you"),
   (2,"I am so happy"),
   (5,"I hate good mood"),
], ["id", "text"])


In [None]:
train_data.select('label').where(train_data['label']>0).show(5)

In [None]:
prediction1=model.transform(test_data)
prediction1.select("prediction")

In [None]:
prediction1.show(10)

In [None]:
# Score the training rows so the training-set accuracy can be computed below.
prediction1 = model.transform(dataset=train_data)
pred = prediction1.select(f.col("prediction"))

In [None]:
# Predicted class of every training row as a plain Python list. The old form had unbalanced
# brackets (SyntaxError), and `pred[i]` on a DataFrame selects its i-th *column*, not a row.
pred_lis = [row.prediction for row in pred.collect()]

In [None]:
# Collect the true label of every training row (a list of Rows with a single `label` field).
true = train_data.select(f.col("label")).collect()

In [None]:
# True labels as a plain list. This was previously built from `pred` (the predictions), so the
# accuracy below compared predictions with themselves.
# NOTE(review): assumes `true` and `pred` were collected in the same row order; both come from
# separate actions over train_data.
true_lis = [row.label for row in true]

In [None]:
# Training-set accuracy: the fraction of rows whose predicted class equals the true label.
# Fail with a clear message instead of an IndexError/ZeroDivisionError on bad input.
if len(true_lis) != len(pred_lis):
    raise ValueError(
        f"label/prediction count mismatch: {len(true_lis)} labels vs {len(pred_lis)} predictions"
    )
if not true_lis:
    raise ValueError("no rows to score: true_lis is empty")
right = sum(1 for label, predicted in zip(true_lis, pred_lis) if label == predicted)
print(right / len(true_lis))


In [None]:
prediction = model.transform(test)

In [None]:
 prediction.select( "text", "prediction").collect()

In [None]:
# Mount Google Drive at /content/gdrive so the model can be saved under "My Drive".
from google.colab import drive
drive.mount(mountpoint="/content/gdrive")

In [None]:
model.save(F"/content/gdrive/My Drive/mlmod")

In [None]:
model.save(F"/content/gdrive/My Drive//mlmodel")

In [None]:
# NOTE(review): this IDF stage is not part of pipeline1, and nothing visible here produces a
# "rawFeatures" column (a HashingTF stage would be needed first); it looks like a leftover.
stage_3 = IDF(outputCol="features", inputCol="rawFeatures")