In [33]:

import os
# Extra arguments handed to PySpark at launch: `--packages` requests the
# spark-streaming-kafka-0-8 connector (Scala 2.11 build, version 2.0.2).
# This is set before any SparkSession is created in the cells below.
# NOTE(review): no Kafka source is used in the visible cells — confirm the
# package is still needed.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

In [34]:
import findspark
# findspark.init() is called before the pyspark import — presumably so that
# pyspark resolves against the local Spark installation; keep this order.
findspark.init()
from pyspark.sql import SparkSession
# Local-mode session ("local[*]"). The later
# SparkSession.builder...getOrCreate() call in the driver cell goes through
# getOrCreate() as well.
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [35]:
#Tweet preprocessing and sentiment analysis
#Import the necessary packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
import json

import pickle
import tensorflow.keras.models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding
from keras.layers.recurrent import LSTM
from tensorflow.keras.layers import Input, Dense, Embedding, SpatialDropout1D, add, concatenate
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.preprocessing.sequence import pad_sequences
#Text Preprocessing
import re
from nltk.corpus import stopwords
import numpy as np

import spacy
from spacy import displacy
from collections import Counter
import en_core_web_sm
import re
import string
from nltk import PorterStemmer
from pathlib import Path

In [36]:
# Schema of one JSON-encoded tweet record: three nullable string fields.
jsonSchema = StructType(
    [StructField(field_name, StringType(), True) for field_name in ("tweet", "user", "tweet_id")]
)

In [37]:
# Base directory of the saved model artefacts. Paths are built by plain string
# concatenation (e.g. ROOT_DIR + 'model_NER/'), so the trailing slash matters.
ROOT_DIR = "TwitterSupportML/"

In [38]:

class BinaryInference:
    """Binary complaint detector: an LSTM classifier plus its pre-fitted tokenizer.

    Both artefacts are read from ``ROOT_DIR + 'model_binaryclass/'`` when the
    object is constructed, so construction is expensive.
    """

    def __init__(self):
        self.load_models()

    def clean_text(self, txt):
        """Normalise a raw tweet into the text form fed to the tokenizer.

        Lower-cases the text, drops @mentions, punctuation, stand-alone single
        letters, "http..." tokens, English stop words and digits, and
        Porter-stems the remaining words.

        The regex steps and their order are intentionally left exactly as they
        were, because the saved tokenizer and weights are used with this
        preprocessing; only the per-token overhead was removed.

        Args:
            txt: raw tweet text (str).

        Returns:
            The cleaned, stemmed text as a single space-separated string.
        """
        from nltk.corpus import stopwords

        # Build the stop-word set and the stemmer once per call. Previously the
        # corpus was re-read and a new stemmer constructed for every token.
        stop_words = set(stopwords.words('english'))
        stemmer = PorterStemmer()

        txt = txt.lower()
        # NOTE(review): this rewrites every "nt" that follows a word character
        # ("dont" -> "donot", but also "want" -> "wanot"); it does not match "n't".
        txt = re.sub(r"(?<=\w)nt", "not", txt)
        txt = re.sub(r"(@\S+)", "", txt)  # drop @mentions (not hashtags)
        txt = re.sub(r'\W', ' ', str(txt))  # every non-word character, apostrophes included, becomes a space
        txt = txt.translate(str.maketrans('', '', string.punctuation))  # strip remaining ASCII punctuation
        txt = re.sub(r'\s+[a-zA-Z]\s+', ' ', txt)  # drop stand-alone single letters (the "s" left over from "it's")
        txt = re.sub(r'\s+', ' ', txt, flags=re.I)  # collapse runs of whitespace
        # NOTE(review): URLs were already broken apart by the \W step above, so
        # only the leading "http..." token is removed here.
        txt = re.sub(r"(http\S+|http)", "", txt)
        txt = ' '.join(stemmer.stem(word=word) for word in txt.split(" ") if word not in stop_words)
        txt = ''.join(ch for ch in txt if not ch.isdigit()).strip()  # remove digits
        return txt

    def get_model(self):
        """Build and compile the network that the saved weights are loaded into.

        Layers: Embedding(2000, 128, input_length=32) -> SpatialDropout1D(0.4)
        -> LSTM(196) -> Dense(2, softmax).
        """
        max_fatures = 2000
        embed_dim = 128
        lstm_out = 196
        input_len = 32
        # NOTE(review): LSTM is imported from the standalone `keras` package while
        # the other layers come from `tensorflow.keras` — confirm they are compatible.
        model = Sequential()
        model.add(Embedding(max_fatures, embed_dim, input_length=input_len))
        model.add(SpatialDropout1D(0.4))
        model.add(LSTM(lstm_out, dropout=0.2, recurrent_dropout=0.2))
        model.add(Dense(2, activation='softmax'))
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    def load_models(self):
        """Load the pickled tokenizer and the model weights from disk."""
        # SECURITY: pickle.load can execute arbitrary code; only load trusted artefacts.
        with open(ROOT_DIR+'model_binaryclass/tokenizerBinaryClassification.pickle', 'rb') as handle:
            self.tokenizer = pickle.load(handle)

        self.model = self.get_model()
        self.model.load_weights(ROOT_DIR+"model_binaryclass/binaryClassificationModel.h5")

    def predict_complaint(self, text):
        """Classify one tweet.

        Args:
            text: raw tweet text (str).

        Returns:
            True when class 0 ("negative", i.e. a complaint) scores highest,
            False when class 1 ("positive") does.
        """
        # Vectorise the cleaned tweet with the pre-fitted tokenizer.
        text = self.clean_text(text)
        twt = self.tokenizer.texts_to_sequences([text])
        # NOTE(review): sequences are padded to 28 although get_model() declares
        # input_length=32 — confirm which length the weights were trained with.
        twt = pad_sequences(twt, maxlen=28, dtype='int32', value=0)
        complain = self.model.predict(twt, batch_size=1, verbose=0)[0]
        predicted_class = np.argmax(complain)
        if predicted_class == 0:
            print("negative")
            return True
        elif predicted_class == 1:
            print("positive")
            return False


In [39]:
# Complaint Category

class MulticlassComplainInference:
    """Eight-way complaint categoriser: an LSTM classifier plus its tokenizer.

    Both artefacts are read from ``ROOT_DIR + 'model_multiclass/'`` when the
    object is constructed, so construction is expensive.
    """

    # The maximum number of words to be used (most frequent).
    MAX_NB_WORDS = 50000
    # Max number of words in each complaint; shared by the model and the padding.
    MAX_SEQUENCE_LENGTH = 250
    # Embedding vector size.
    EMBEDDING_DIM = 100
    # Category names, indexed by the model's output position.
    LABELS = ['Reschedule and Refund', 'Baggage Issue', 'Phone and Online Booking', 'Extra Charges',
              'Delay and Customer Service', 'Seating Preferences', 'Reservation Issue', 'Customer Experience']

    def __init__(self):
        self.load_models()

    def get_model(self):
        """Build and compile the network that the saved weights are loaded into.

        Layers: Embedding(50000, 100, input_length=250) -> SpatialDropout1D(0.2)
        -> LSTM(100) -> Dense(8, softmax).
        """
        # NOTE(review): LSTM is imported from the standalone `keras` package while
        # the other layers come from `tensorflow.keras` — confirm they are compatible.
        model = Sequential()
        model.add(Embedding(self.MAX_NB_WORDS, self.EMBEDDING_DIM, input_length=self.MAX_SEQUENCE_LENGTH))
        model.add(SpatialDropout1D(0.2))
        model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
        model.add(Dense(8, activation='softmax'))
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        return model

    def load_models(self):
        """Load the pickled tokenizer and the model weights from disk."""
        # SECURITY: pickle.load can execute arbitrary code; only load trusted artefacts.
        with open(ROOT_DIR+'model_multiclass/tokenizerMulticlassComplaintClassification.pickle', 'rb') as handle:
            self.tokenizer = pickle.load(handle)

        self.model = self.get_model()
        self.model.load_weights(ROOT_DIR+"model_multiclass/multiclassComplaintClassifier.h5")

    def clean_text(self, txt):
        """Normalise a raw tweet into the text form fed to the tokenizer.

        Lower-cases the text, drops @mentions, punctuation, stand-alone single
        letters, "http..." tokens, English stop words and digits, and
        Porter-stems the remaining words.

        The regex steps and their order are intentionally left exactly as they
        were, because the saved tokenizer and weights are used with this
        preprocessing; only the per-token overhead was removed.

        Args:
            txt: raw tweet text (str).

        Returns:
            The cleaned, stemmed text as a single space-separated string.
        """
        from nltk.corpus import stopwords

        # Build the stop-word set and the stemmer once per call. Previously the
        # corpus was re-read and a new stemmer constructed for every token.
        stop_words = set(stopwords.words('english'))
        stemmer = PorterStemmer()

        txt = txt.lower()
        # NOTE(review): this rewrites every "nt" that follows a word character
        # ("dont" -> "donot", but also "want" -> "wanot"); it does not match "n't".
        txt = re.sub(r"(?<=\w)nt", "not", txt)
        txt = re.sub(r"(@\S+)", "", txt)  # drop @mentions (not hashtags)
        txt = re.sub(r'\W', ' ', str(txt))  # every non-word character, apostrophes included, becomes a space
        txt = txt.translate(str.maketrans('', '', string.punctuation))  # strip remaining ASCII punctuation
        txt = re.sub(r'\s+[a-zA-Z]\s+', ' ', txt)  # drop stand-alone single letters (the "s" left over from "it's")
        txt = re.sub(r'\s+', ' ', txt, flags=re.I)  # collapse runs of whitespace
        # NOTE(review): URLs were already broken apart by the \W step above, so
        # only the leading "http..." token is removed here.
        txt = re.sub(r"(http\S+|http)", "", txt)
        txt = ' '.join(stemmer.stem(word=word) for word in txt.split(" ") if word not in stop_words)
        txt = ''.join(ch for ch in txt if not ch.isdigit()).strip()  # remove digits
        return txt

    def predict_complaint_type(self, text):
        """Return the category label with the highest predicted score.

        Args:
            text: raw tweet text (str).

        Returns:
            One of the strings in ``LABELS``.
        """
        new_tweet = self.clean_text(text)
        seq = self.tokenizer.texts_to_sequences([new_tweet])
        padded = pad_sequences(seq, maxlen=self.MAX_SEQUENCE_LENGTH)
        pred = self.model.predict(padded)
        label = self.LABELS[np.argmax(pred)]
        print(pred, label)
        return label

In [40]:
class NameEntities:
    """Named-entity extraction using two spaCy pipelines.

    ``nlp`` is the stock ``en_core_web_sm`` pipeline; ``nlp_updated`` is the
    pipeline saved under ``ROOT_DIR + 'model_NER/'``. Both are loaded when the
    object is constructed.
    """

    def __init__(self):
        self.nlp = en_core_web_sm.load()
        # Load the saved pipeline from disk.
        output_dir = Path(ROOT_DIR+'model_NER/')
        print("Loading from", output_dir)
        self.nlp_updated = spacy.load(output_dir)

    def clean_text(self, txt):
        """Prepare a raw tweet for the NER pipelines.

        Splits camelCase tokens into separate words, replaces every non-word
        character with a space, drops stand-alone single letters and "http..."
        tokens, and collapses whitespace. Case is preserved; nothing is stemmed
        and no stop words are removed.

        Args:
            txt: raw tweet text (str). Empty strings and repeated spaces are fine.

        Returns:
            The cleaned text.
        """
        # split(" ") yields empty tokens for repeated, leading or trailing spaces;
        # camel_case_split maps those to "" instead of raising.
        txt = " ".join([self.camel_case_split(t) for t in txt.split(" ")])
        # NOTE(review): this rewrites every "nt" that follows a word character
        # ("dont" -> "donot", but also "Atlanta" -> "Atlanota"); it does not match "n't".
        txt = re.sub(r"(?<=\w)nt", "not", txt)
        txt = re.sub(r'\W', ' ', str(txt))  # every non-word character, apostrophes included, becomes a space
        txt = txt.translate(str.maketrans('', '', string.punctuation))  # strip remaining ASCII punctuation
        txt = re.sub(r'\s+[a-zA-Z]\s+', ' ', txt)  # drop stand-alone single letters (the "s" left over from "it's")
        txt = re.sub(r'\s+', ' ', txt, flags=re.I)  # collapse runs of whitespace
        # NOTE(review): URLs were already broken apart by the \W step above, so
        # only the leading "http..." token is removed here.
        txt = re.sub(r"(http\S+|http)", "", txt)
        return txt

    def camel_case_split(self, str):
        """Insert a space at every lower-case -> upper-case boundary.

        ``"camelCaseSplit"`` becomes ``"camel Case Split"``; an all-caps run such
        as ``"ABC"`` is left untouched.

        Args:
            str: a single token (the parameter name shadows the builtin and is
                kept only for backward compatibility). May be empty.

        Returns:
            The token with spaces inserted, or ``""`` for an empty token.
        """
        # An empty token used to raise IndexError on str[0].
        if not str:
            return ""

        words = [[str[0]]]

        for c in str[1:]:
            if words[-1][-1].islower() and c.isupper():
                words.append(list(c))
            else:
                words[-1].append(c)

        return " ".join([''.join(word) for word in words])

    def get_Entities(self, text):
        """Run both pipelines over the cleaned text and collect their entities.

        Args:
            text: raw tweet text (str).

        Returns:
            A list of ``(entity_text, entity_label)`` tuples: the entities found
            by ``nlp_updated`` first, followed by those found by ``nlp``.
            Duplicates are not removed.
        """
        text = self.clean_text(text)
        doc = self.nlp_updated(text)
        labels = [(X.text, X.label_) for X in doc.ents]

        doc = self.nlp(text)
        labels_norm = [(X.text, X.label_) for X in doc.ents]
        labels.extend(labels_norm)

        return labels



In [41]:
# Lazily-built BinaryInference shared by every check_complain call in this
# Python process. It stays None until the first non-null tweet is scored.
# Caveat: if this function is called on the driver before being wrapped in a
# UDF, the loaded instance is serialised together with the UDF.
_binary_inference = None


def check_complain(text):
    """Spark-UDF helper: flag whether a tweet is a complaint.

    The model used to be rebuilt and its weights reloaded from disk for every
    row; it is now created once per process and reused.

    Args:
        text: raw tweet text, or None for a null column value.

    Returns:
        ``"True"`` or ``"False"`` (the stringified result of
        ``BinaryInference.predict_complaint``), or None when ``text`` is None.
    """
    global _binary_inference
    if text is None:
        # Null tweets (e.g. rows whose JSON failed to parse) stay null.
        return None
    if _binary_inference is None:
        _binary_inference = BinaryInference()
    return str(_binary_inference.predict_complaint(text))

In [42]:
# Lazily-built MulticlassComplainInference shared by every check_complain_type
# call in this Python process. It stays None until the first non-null tweet.
# Caveat: if this function is called on the driver before being wrapped in a
# UDF, the loaded instance is serialised together with the UDF.
_multiclass_inference = None


def check_complain_type(text):
    """Spark-UDF helper: name the complaint category of a tweet.

    The model used to be rebuilt and its weights reloaded from disk for every
    row; it is now created once per process and reused.

    Args:
        text: raw tweet text, or None for a null column value.

    Returns:
        The category label returned by
        ``MulticlassComplainInference.predict_complaint_type``, or None when
        ``text`` is None.
    """
    global _multiclass_inference
    if text is None:
        # Null tweets (e.g. rows whose JSON failed to parse) stay null.
        return None
    if _multiclass_inference is None:
        _multiclass_inference = MulticlassComplainInference()
    return str(_multiclass_inference.predict_complaint_type(text))

In [43]:
# Lazily-built NameEntities shared by every get_name_entities call in this
# Python process. It stays None until the first non-null tweet is processed.
# Caveat: if this function is called on the driver before being wrapped in a
# UDF, the loaded instance is serialised together with the UDF.
_name_entities = None


def get_name_entities(text):
    """Spark-UDF helper: extract named entities from a tweet as a JSON object.

    Both spaCy pipelines used to be loaded from disk for every row; the
    ``NameEntities`` instance is now created once per process and reused.

    Args:
        text: raw tweet text, or None for a null column value.

    Returns:
        A JSON string mapping entity text to entity label, or None when ``text``
        is None. Because the pairs go through ``dict``, an entity text reported
        more than once keeps only its last label.
    """
    global _name_entities
    if text is None:
        # Null tweets (e.g. rows whose JSON failed to parse) stay null.
        return None
    if _name_entities is None:
        _name_entities = NameEntities()
    return json.dumps(dict(_name_entities.get_Entities(text)))

In [51]:
import requests

def reply_to_tweet(tweetId, complain, complain_type, entities, user):
    """Report the ticket id created for a tweet (currently a stub).

    The HTTP call to the ticketing service is disabled, so none of the
    arguments are used and a fixed response is returned.

    Args:
        tweetId: id of the tweet being answered.
        complain: complaint flag computed for the tweet.
        complain_type: complaint category computed for the tweet.
        entities: JSON string of extracted entities.
        user: author of the tweet.

    Returns:
        The ticket id as a string when the (stubbed) response says the tweet
        was replied to, otherwise an empty string.
    """
    # Ticketing API endpoint; unused while the request below is disabled.
    URL = "http://localhost:5000"

    # Disabled live call, kept for reference:
    #     PARAMS = {'tweetId':tweetId, "complain" : complain, "complain_type" : complain_type, "entities" : entities, "user" : user}
    #     r = requests.post(url = URL, data = PARAMS)
    #     data = r.json()

    # Canned response standing in for the service's JSON reply.
    data = {"replied" : True, "tickedId" : 12345}
    return str(data["tickedId"]) if data['replied'] else ""

In [52]:
def label_data(words):
    """Append the model-derived columns to a DataFrame of tweets.

    Args:
        words: DataFrame with at least ``tweet``, ``tweet_id`` and ``user`` columns.

    Returns:
        The DataFrame extended with four string columns, added in this order:
        ``complain``, ``complain_type``, ``entities`` and ``tickedId``.
    """
    # Wrap each Python helper as a string-returning Spark UDF.
    complaint_flag = udf(check_complain, StringType())
    complaint_category = udf(check_complain_type, StringType())
    entity_json = udf(get_name_entities, StringType())
    ticket_reply = udf(reply_to_tweet, StringType())

    tweet = col("tweet")
    return (
        words
        .withColumn("complain", complaint_flag(tweet))
        .withColumn("complain_type", complaint_category(tweet))
        .withColumn("entities", entity_json(tweet))
        # The reply UDF consumes the three columns computed above.
        .withColumn(
            "tickedId",
            ticket_reply(col("tweet_id"), col("complain"), col("complain_type"), col("entities"), col("user")),
        )
    )

In [54]:
if __name__ == "__main__":
    # Obtain the session through getOrCreate().
    spark = SparkSession.builder.appName('SampleTWEETOperation').getOrCreate()

    # Read the text file; the `value` column is used below as the JSON payload.
    dfFromTxt = spark.read.text("my_data.txt")

    # Parse each line as JSON against jsonSchema and flatten the struct into
    # top-level columns.
    lines = (
        dfFromTxt
        .selectExpr("CAST(value AS STRING) as json")
        .withColumn("jsonData", from_json(col("json"), jsonSchema))
        .select("jsonData.*")
    )

    # Inspect the parsed tweets: truncated view, schema, then full-width view.
    lines.show()
    lines.printSchema()
    lines.show(truncate=False)

    # Add the complaint flag, category, entities and ticket id, then display.
    lines = label_data(lines)
    lines.show()
    
    

+--------------------+---------+-------------------+
|               tweet|     user|           tweet_id|
+--------------------+---------+-------------------+
|RT @Bob_in_NYorks...|perrysmum|1407792704029868041|
+--------------------+---------+-------------------+

root
 |-- tweet: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet_id: string (nullable = true)

+--------------------------------------------------------------------------------------------------------------------------------------------+---------+-------------------+
|tweet                                                                                                                                       |user     |tweet_id           |
+--------------------------------------------------------------------------------------------------------------------------------------------+---------+-------------------+
|RT @Bob_in_NYorks: It’s been said many times, but I’ll repeat. Train drivers and airline pilots

In [2]:
# spark.stop()  # PySpark's SparkSession is shut down with stop(); it has no close() method