<h3>Installing Java</h3>

In [0]:
# Install the headless OpenJDK 8 package quietly (-qq, stdout discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
# Point JAVA_HOME at the Java 8 directory and put its bin/ first on PATH.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# Select the Java 8 binary as the system-wide `java`, then print the active version.
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

<h3>Installing the Spark NLP and PySpark libraries</h3>

In [0]:
# Install pinned versions of Spark NLP and PySpark.
# NOTE(review): `!pip` runs in a shell and may target a different environment
# than the running kernel; `%pip install ...` is the usual notebook form — confirm
# it is available on the target platform before switching.
!pip install spark-nlp==2.6.0
!pip install pyspark==2.4.4

In [0]:
# Wildcard imports from Spark NLP; presumably the annotator classes used in the
# cells below (DocumentAssembler, Tokenizer, BertEmbeddings, ...) come from here.
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd
import numpy as np
# NOTE(review): wildcard import on top of `import numpy as np`; it floods the
# notebook namespace and hides where names come from — consider removing.
from numpy import *
import sparknlp
# `spark` is the session object the later cells use (spark.read, spark.createDataFrame).
spark = sparknlp.start()
import os
import tweepy as tw
# Bound after the numpy wildcard, so from here on `array` is array.array.
from array import array
import time

In [0]:
# Report the versions of the two libraries this notebook runs on.
nlp_version = sparknlp.version()
print("Spark NLP version", nlp_version)
spark_version = spark.version
print("Apache Spark version:", spark_version)

<h3>Reading the Twitter data file</h3>

In [0]:
# Load the training CSV; its first row supplies the column names.
train_csv_path = '/FileStore/Data/cleaned_200k.csv'
trainDataset = spark.read.csv(train_csv_path, header=True)

# Preview the first 10 rows without truncating long values.
trainDataset.show(n=10, truncate=False)

In [0]:
from pyspark.sql.functions import col

# Number of rows per category, largest group first.
category_counts = trainDataset.groupBy("category").count()
category_counts.orderBy(col("count").desc()).show()

<h4>Pre-processing: creating the tokenizer, normalizer and stop-words cleaner</h4>

In [0]:
# Each annotator reads the column(s) produced by the previous one:
# description -> document -> token -> normalized -> cleanTokens.
document_assembler = (
    DocumentAssembler()
    .setInputCol("description")
    .setOutputCol("document")
)

tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
)

# Stop-word matching is configured as case-insensitive.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalized")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

<h3>Creating BERT embeddings</h3>

In [0]:
# Token-level embeddings from the pretrained 'bert_base_cased' English model,
# computed over the cleaned tokens.
# NOTE(review): a *cased* model is loaded but setCaseSensitive(False) is applied —
# confirm that ignoring case is intended here.
bert_embeddings = (
    BertEmbeddings.pretrained('bert_base_cased', 'en')
    .setInputCols(["document", 'cleanTokens'])
    .setOutputCol("bert")
    .setCaseSensitive(False)
)

# Pool the token embeddings of each document with the "AVERAGE" strategy.
embeddingsSentence = (
    SentenceEmbeddings()
    .setInputCols(["document", "bert"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
)

# Expose the sentence embeddings as vectors in "finished_sentence_embeddings",
# keeping the intermediate annotation columns.
embeddings_finisher = (
    EmbeddingsFinisher()
    .setInputCols(["sentence_embeddings"])
    .setOutputCols(["finished_sentence_embeddings"])
    .setOutputAsVector(True)
    .setCleanAnnotations(False)
)

<h3>Building the pipeline</h3>

In [0]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer

# Index the string "category" column into the "label" column.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# Stages run in list order; each one consumes columns written by earlier stages.
bert_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    bert_embeddings,
    embeddingsSentence,
    embeddings_finisher,
    label_stringIdx,
]
nlp_pipeline_bert = Pipeline(stages=bert_stages)

<h3>Fitting the BERT pipeline on the training data</h3>

In [0]:
# Fit the pipeline on the training set, then run the fitted model over the same rows.
nlp_model_bert = nlp_pipeline_bert.fit(dataset=trainDataset)
processed_bert = nlp_model_bert.transform(dataset=trainDataset)

<h3>Extracting feature vectors from the sentence embeddings</h3>

In [0]:
from pyspark.sql.functions import explode, length, udf


@udf(returnType="long")
def num_nonzeros(v):
    """Return ``v.numNonzeros()``, the count of non-zero entries of vector ``v``."""
    return v.numNonzeros()


# One output row per element of finished_sentence_embeddings, stored in "features".
processed_bert = processed_bert.withColumn(
    "features", explode(processed_bert["finished_sentence_embeddings"])
)
# Keep only rows whose feature vector has exactly 768 non-zero entries.
labelled_features = processed_bert.select('description', 'features', 'label')
processed_bert_2 = labelled_features.filter(num_nonzeros("features") == 768)
processed_bert_2.show(5)

<h3>Splitting the data</h3>

In [0]:
# 75/25 random split of the (features, label) columns; seed fixed at 100.
model_input = processed_bert_2.select('features', 'label')
trainingData, testData = model_input.randomSplit([0.75, 0.25], seed=100)

<h3>Creating the logistic regression model</h3>

In [0]:
from pyspark.ml.classification import LogisticRegression

# Hyper-parameters gathered in one place, then passed to the estimator.
lr_settings = dict(maxIter=20, regParam=0.3, elasticNetParam=0)
lr = LogisticRegression(**lr_settings)

# Fit on the training split.
lrModel = lr.fit(trainingData)

<h3>Generating predictions</h3>

In [0]:
# Score the held-out split with the fitted model.
predictions = lrModel.transform(testData)

# Show 10 rows ordered by the "probability" column (descending), values cut at 30 characters.
ranked_predictions = (
    predictions
    .select("features", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
)
ranked_predictions.show(n=10, truncate=30)

<h3>Evaluating the accuracy of the model</h3>

In [0]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Collect the prediction columns into a pandas frame for scikit-learn.
df = predictions.select("probability", "label", "prediction").toPandas()

# Per-class report first, then overall accuracy.
y_true, y_pred = df.label, df.prediction
print(classification_report(y_true, y_pred))
print(accuracy_score(y_true, y_pred))

<h3>Saving the model</h3>

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds an attempt to save lrModel into a temporary directory.
'''
#lrModel.write().save('dbfs:/saveModel')
import tempfile
path = tempfile.mkdtemp()
lrModel.save(sc, path)
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds a variant that writes lrModel to /tmp/sa with overwrite enabled.
'''
%py
basePath = "/tmp/sa"
#model.save(basePath + "/lrModel")

# You may also specify "overwrite" just as when saving Datasets and DataFrames:
#lrModel.write().save(basePath)
lrModel.write().overwrite().save(basePath)
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds shell commands that recreate /tmp/mleap_python_model_export.
'''
%sh 
rm -rf /tmp/mleap_python_model_export
mkdir /tmp/mleap_python_model_export'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# NOTE(review): the bundle path inside uses /tmp/mleap_python_model_download, whereas
# the other disabled MLeap cells use /tmp/mleap_python_model_export — reconcile the
# two before re-enabling.
'''
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

lrModel.serializeToBundle("jar:file:/tmp/mleap_python_model_download/sentiment_analysis_pipeline-json.zip", predictions)
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# Even inside the string, the cleanup command is itself commented out.
'''
%sh 
#rm -rf /tmp/mleap_python_model_export
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds dbutils calls that copy the exported bundle to dbfs:/FileStore and list
# the export directory.
'''
#dbutils.fs.rm("/FileStore/tables", True)
dbutils.fs.cp("file:/tmp/mleap_python_model_export/sentiment_analysis_pipeline-json.zip", "dbfs:/FileStore/sentiment_analysis_pipeline-json.zip")
display(dbutils.fs.ls("file:/tmp/mleap_python_model_export"))
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds an attempt to load the exported bundle back as a PipelineModel.
'''
from pyspark.ml import PipelineModel
deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:/tmp/mleap_python_model_export/sentiment_analysis_pipeline-json.zip")
'''

In [0]:
# Disabled cell: the whole body is a bare string literal, so nothing inside it runs.
# It holds a preview of testData plus commented-out loading of a sample dataset.
'''
# test_df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/test").select("text", "topic")
# test_df.cache()
# display(test_df)
testData.show(5)
'''

<h3>Streaming Twitter data</h3>

<h4>Setting the developer access keys and opening the connection</h4>

In [0]:
# Twitter API credentials.
# They are read from environment variables so that real secrets are never typed
# into (and saved with) the notebook. Set these before starting the kernel:
#   TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET,
#   TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET
# Each value falls back to '' when its variable is unset, which matches the
# blank placeholders this cell used to contain.
consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', '')
consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', '')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', '')
access_token_secret = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET', '')

In [0]:
# Build the OAuth handler from the app keys, attach the user token, then create
# the API client with wait_on_rate_limit switched on.
auth = tw.OAuthHandler(consumer_key=consumer_key, consumer_secret=consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tw.API(auth, wait_on_rate_limit=True)

<h4>Filtering tweets for COVID vaccine data</h4>

In [0]:
# Query text and the date window handed to the tweet search in the next cell.
search_words, date_since, date_until = (
    "FDA+covid+vaccine -filter:retweets",
    "2020-12-11",
    "2020-12-12",
)

<h4>Collecting tweets</h4>

In [0]:
# Search arguments handed to tw.Cursor together with api.search.
search_params = dict(
    q=search_words,
    tweet_mode='extended',
    geocode="42.3601,-71.0589,5mi",
    lang="en",
    since=date_since,
    until=date_until,
)
# Request at most 5000 items from the cursor.
tweets = tw.Cursor(api.search, **search_params).items(5000)
tweets

In [0]:
# One [screen name, profile location, full text] record per collected tweet.
users_locs = [
    [status.user.screen_name, status.user.location, status.full_text]
    for status in tweets
]
dataset = pd.DataFrame(users_locs, columns=['user', "location", "description"])
dataset.head()

Unnamed: 0,user,location,description
0,ethicalpsycholo,"Boston, MA",Is Trump deliberately politicizing vaccine app...
1,BU_Law,"Boston, MA",Confused about the FDA approval process for fo...
2,ragoninstitute,"Cambridge, MA",“The process lasts in the body for about 36 ho...
3,CecilWebsterMD,Boston,Looking forward to FDA officials not being ant...
4,BenjaminConteh9,"Medford, MA","Would you buy:\n1. A product that is so good, ..."


In [0]:
# Number of collected tweets (row count of the frame).
dataset.shape[0]

<h4>Cleaning the live Twitter data</h4>

In [0]:
import re

# Patterns are raw strings so the backslashes reach the regex engine unchanged
# and Python does not warn about invalid escape sequences.
MENTION_RE = re.compile(r'(@\w+)')  # @ followed by word characters, i.e. usernames
URL_RE = re.compile(r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*')  # hyperlinks
WHITESPACE_RE = re.compile(r'\s+')  # any run of whitespace


def clean_tweet(text):
    """Strip @mentions and hyperlinks from a tweet and normalise its whitespace.

    Args:
        text: the tweet text as a ``str``.

    Returns:
        The text with usernames and links removed, every whitespace run
        collapsed to a single space, and leading/trailing whitespace trimmed.
    """
    text = MENTION_RE.sub('', text)
    text = URL_RE.sub('', text)
    text = WHITESPACE_RE.sub(' ', text)
    return text.strip()


corpus = []  # not used in this cell; kept in case cells outside this view rely on it
# Assign the whole column in one step. The previous per-row form
# `dataset['description'][i] = text` is chained indexing, which pandas flags with
# SettingWithCopyWarning and which may write to a temporary copy instead of `dataset`.
dataset['description'] = dataset['description'].map(clean_tweet)
print('COMPLETED')

In [0]:
# Turn the pandas frame into a Spark DataFrame and preview 5 rows.
spark_df = spark.createDataFrame(data=dataset)
spark_df.show(n=5)

<h4>Encoding the tweets with BERT embeddings</h4>

In [0]:

# Run the already-fitted BERT pipeline over the collected tweets.
processed_bert_new_tweet = nlp_model_bert.transform(dataset=spark_df)

In [0]:
# One output row per element of finished_sentence_embeddings, stored in "features".
processed_bert_new_tweet = processed_bert_new_tweet.withColumn(
    "features", explode(processed_bert_new_tweet["finished_sentence_embeddings"])
)
# Keep only rows whose feature vector has exactly 768 non-zero entries.
tweet_features = processed_bert_new_tweet.select('description', 'features')
processed_bert_new_tweet_2 = tweet_features.filter(num_nonzeros("features") == 768)
processed_bert_new_tweet_2.show(5)

In [0]:
# Score the collected tweets with the fitted logistic regression model.
predictions_new_tweet = lrModel.transform(dataset=processed_bert_new_tweet_2)

In [0]:
# Preview three scored tweets with their probability vectors and predicted class.
shown_columns = ["description", "features", "probability", "prediction"]
predictions_new_tweet.select(*shown_columns).show(n=3)

In [0]:
# Collect only the predicted class column into a pandas frame.
prediction_only = predictions_new_tweet.select("prediction")
pandas_df = prediction_only.toPandas()
pandas_df.head()

Unnamed: 0,prediction
0,0.0
1,1.0
2,0.0
3,0.0
4,0.0


In [0]:
# Translate the numeric prediction into a readable class name.
# NOTE(review): this assumes index 0.0 means negative and 1.0 means positive —
# confirm against the label order assigned by the StringIndexer stage.
class_names = {0.0: 'negative', 1.0: 'positive'}
pandas_df['CLASS'] = pandas_df['prediction'].map(class_names)

# Number of tweets per class, used as the wedge sizes of the pie chart.
pie_chart = pandas_df.groupby(['CLASS']).size()

In [0]:
%matplotlib inline
import matplotlib.pyplot as plt

plt.pie(pie_chart, labels=['Negative', 'Positive'], colors=['red', 'green'])
image=plt.show()
display(image)