In [0]:
pip install tweepy

In [0]:
pip install pyspark

In [0]:
pip install wordcloud

In [0]:
pip install mlflow

In [0]:
import os
import warnings

import mlflow
import pandas as pd
import pyspark
import tweepy
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col, lower, regexp_replace, split
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.regression import LabeledPoint

In [0]:
# Reuse the active Spark session if there is one, otherwise start one named 'tweets'.
spark = SparkSession.builder.appName('tweets').getOrCreate()
# SQLContext's first positional argument is a SparkContext, not a SparkSession:
# pass the session's underlying context and bind the existing session explicitly.
sqlContext = SQLContext(spark.sparkContext, spark)
# Pre-configured CSV reader: comma delimiter, double-quote quoting and escaping,
# header row, inferred schema. Nothing is read until .load(path) is called on it.
productData = (
    sqlContext.read.format("csv")
    .option("delimiter", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("header", "true")
    .option("inferSchema", "true")
)


In [0]:
# Load the raw tweet CSV. No header/schema options are set on this reader;
# later cells refer to the resulting columns as _c0 ... _c6.
df = spark.read.csv("dbfs:/FileStore/shared_uploads/sanjana.jayshekar@gmail.com/twitterdata.csv")

In [0]:
# Report (row count, column count) of the raw frame; count() runs a Spark job.
n_rows, n_cols = df.count(), len(df.columns)
print("shape", (n_rows, n_cols))

In [0]:
# Append a unique (increasing, not necessarily consecutive) "id" column to every row.
df = df.withColumn("id", monotonically_increasing_id())

In [0]:
# Preview the raw frame, now including the generated "id" column.
df.show()

In [0]:
# Preview only rows without nulls; the result is displayed, not assigned, so df is unchanged.
df.dropna().show()

In [0]:
# Inspect column names and types before dropping/renaming columns.
df.printSchema()

In [0]:
# Keep only _c1, _c6 and id; every other raw column is discarded.
df = df.drop(*("_c0", "_c2", "_c3", "_c4", "_c5"))

In [0]:
# Preview the first 5 rows after dropping the unused columns.
df.show(5)

In [0]:
 # _c1 is used as the label column from here on, under the name "target".
 df = df.withColumnRenamed("_c1", "target")

In [0]:
 # _c6 is used as the tweet body from here on, under the name "text".
 df = df.withColumnRenamed("_c6", "text")

In [0]:
# Confirm the schema is now: target, text, id.
df.printSchema()

In [0]:
def clean_text(c):
  """Build a Spark Column expression that normalises raw tweet text.

  Steps, applied in this order:
    1. lowercase the text;
    2. drop a leading "rt " retweet marker;
    3. drop http:// and https:// URLs;
    4. drop @mentions;
    5. drop every remaining character that is not a letter, digit or whitespace.

  Args:
    c: Column holding the tweet text.

  Returns:
    Column expression producing the cleaned text (evaluated lazily by Spark).
  """
  c = lower(c)
  c = regexp_replace(c, "^rt ", "")
  c = regexp_replace(c, r"https?://\S+", "")
  # Mentions must be removed BEFORE the generic punctuation strip: once '@' is
  # gone, a handle is indistinguishable from an ordinary word and would be kept.
  c = regexp_replace(c, r"@\w*", "")
  c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")

  return c

In [0]:
# Project the cleaned tweet text into its own single-column frame ("ctext").
ctext_expr = clean_text(col("text")).alias("ctext")
df_clean = df.select(ctext_expr)

In [0]:
# Carry the row id over from df instead of generating a second one.
# monotonically_increasing_id() is non-deterministic (its value depends on the
# partition layout), so two independently generated id columns are not
# guaranteed to line up row-for-row when they are later used as a join key.
df_clean = df.select(clean_text(col("text")).alias("ctext"), col("id"))

In [0]:
# Preview 20 rows of cleaned text alongside their id.
df_clean.show(20)

In [0]:
# The raw text is no longer needed on df; the cleaned version lives in df_clean.
# Note: this rebinds df, so cells above that read df["text"] cannot be re-run after it.
df = df.drop("text")

In [0]:
# Preview the cleaned-text frame again (unchanged by the drop on df above).
df_clean.show()

In [0]:
def get_uniform(df1_uniform, df2_uniform):
    """Return the first truthy argument, or None when neither one is truthy.

    Args:
        df1_uniform: preferred value.
        df2_uniform: fallback value, used only when the first is falsy.

    Returns:
        df1_uniform if truthy, else df2_uniform if truthy, else None.
    """
    return df1_uniform or df2_uniform or None

# Spark UDF wrapper around get_uniform returning a string
# (it is not referenced by any other cell visible in this notebook).
u_get_uniform = udf(get_uniform, StringType())
# Re-attach each label to its cleaned text via the shared row id, sorted by id.
df_3 = (
    df.join(df_clean, on="id", how='outer')
    .select("id", "target", "ctext")
    .orderBy(col("id"))
)

In [0]:
# Preview the joined frame (id, target, ctext).
df_3.show()

In [0]:
# (rows, columns) of the joined frame, to compare against the raw shape printed earlier.
joined_rows, joined_cols = df_3.count(), len(df_3.columns)
print((joined_rows, joined_cols))

In [0]:
# Rewrite every '4' in the target string to '1' so the label '4' becomes '1'.
relabelled_target = regexp_replace(col("target"), "4", "1")
df_3 = df_3.withColumn("target", relabelled_target)

In [0]:
# Class balance check: number of rows per target value after relabelling.
df_3.groupBy('target').count().show()

In [0]:
# 80/20 random split with a fixed seed so the split is repeatable.
train_set, test_set = df_3.randomSplit(weights=[0.8, 0.2], seed=32)

In [0]:
# Spark's built-in English stop-word list, fed to the StopWordsRemover stage of the pipeline.
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")

In [0]:
# Feature pipeline: tokenize -> remove stop words -> hashed term frequencies
# -> IDF weighting -> index the string target into a numeric "label".
tokenizer = Tokenizer(inputCol="ctext", outputCol="words")
stops = StopWordsRemover(
    inputCol="words",
    outputCol="words_clean",
    stopWords=englishStopWords,
)
hashtf = HashingTF(inputCol="words_clean", outputCol='tf', numFeatures=1 << 16)  # 65536 hash buckets
# minDocFreq=5: terms that appear in fewer than 5 documents are filtered out (sparse terms).
idf = IDF(minDocFreq=5, inputCol='tf', outputCol="features")
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")
pipeline = Pipeline(stages=[tokenizer, stops, hashtf, idf, label_stringIdx])

In [0]:
# Fit the feature pipeline on the training split only, then apply the fitted
# stages to both splits so validation rows never influence the fitted statistics.
pipelineFit = pipeline.fit(train_set)
train_df, val_df = (pipelineFit.transform(split_df) for split_df in (train_set, test_set))

In [0]:
# Preview 5 featurised training rows (pipeline output columns added to the inputs).
train_df.show(5)

In [0]:
# The models below are pyspark.ml estimators; scikit-learn autologging does not
# instrument them, so enable the pyspark.ml autologging integration instead.
import mlflow.pyspark.ml

mlflow.pyspark.ml.autolog()

In [0]:
# Logistic regression baseline: fit on the training features, score the validation split.
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
lr_Pred = lrModel.transform(val_df)

# BinaryClassificationEvaluator only computes ranking metrics (its default is
# areaUnderROC), so it cannot be used to report accuracy.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
auroc = evaluator.evaluate(lr_Pred, {evaluator.metricName: "areaUnderROC"})

# Accuracy = share of validation rows whose predicted label equals the true label.
lr_total = lr_Pred.count()
lr_correct = lr_Pred.filter(col("prediction") == col("label")).count()
accuracy = lr_correct / lr_total if lr_total else float("nan")

print ("Model Accuracy: ", accuracy)
print("Area under ROC Curve: {:.4f}".format(auroc))

In [0]:
# Naive Bayes with default settings on the same TF-IDF features.
nb = NaiveBayes()
nbModel = nb.fit(train_df)
nbpred = nbModel.transform(val_df)

# Rank on the class-1 probability. Scoring AUC on the hard 0/1 "prediction"
# column collapses the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability")
auroc = evaluator.evaluate(nbpred, {evaluator.metricName: "areaUnderROC"})

# Accuracy = share of validation rows whose predicted label equals the true label.
nb_total = nbpred.count()
nb_correct = nbpred.filter(col("prediction") == col("label")).count()
accuracy = nb_correct / nb_total if nb_total else float("nan")

print ("Model Accuracy: ", accuracy)
print("Area under ROC Curve: {:.4f}".format(auroc)) 

In [0]:
# Decision tree with default hyper-parameters; the column names are spelled out
# (they equal the defaults) to make the pipeline hand-off explicit.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtModel = dt.fit(train_df)
dtpred = dtModel.transform(val_df)


In [0]:
# Accuracy = share of validation rows whose predicted label equals the true label.
# (BinaryClassificationEvaluator yields area-under-curve metrics, never accuracy.)
dt_total = dtpred.count()
dt_correct = dtpred.filter(col("prediction") == col("label")).count()
accuracy = dt_correct / dt_total if dt_total else float("nan")
print ("Model Accuracy: ", accuracy)
# Rank on the class-1 probability: a tree's rawPrediction holds per-leaf class
# counts, whose class-1 entry alone is not monotone in the predicted probability.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability")
print("Test Area Under ROC: " + str(evaluator.evaluate(dtpred, {evaluator.metricName: "areaUnderROC"})))

# Streaming data

In [0]:
# Twitter API credentials are read from the environment so that no secret is
# stored in the notebook. Set these four variables on the cluster/kernel before
# running; a missing variable raises KeyError here rather than failing later
# during authentication.
# SECURITY: any key that has ever been committed in a notebook must be treated
# as compromised and revoked/regenerated in the Twitter developer portal.
consumer_key = os.environ["TWITTER_CONSUMER_KEY"]
consumer_secret = os.environ["TWITTER_CONSUMER_SECRET"]
access_token = os.environ["TWITTER_ACCESS_TOKEN"]
access_token_secret = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]

In [0]:
# OAuth 1 user-context authentication: app (consumer) key pair plus user access token pair.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True asks Tweepy to wait when the rate limit is hit instead of raising.
api = tweepy.API(auth,wait_on_rate_limit=True)

In [0]:
# Top 150 English tweets on "iphone 12" (the saved outputs were taken on November 4th, 2020).
text_query = 'iphone 12'
# Named tweet_count rather than count so that pyspark.sql.functions.count,
# imported at the top of the notebook, is not overwritten.
tweet_count = 150
# Tweepy 4 renamed API.search to API.search_tweets; use whichever this install provides.
search_method = getattr(api, "search_tweets", None) or api.search
try:
    # Lazily paginated search; nothing is requested until the cursor is iterated.
    tweets = tweepy.Cursor(search_method, q=text_query, lang='en').items(tweet_count)
    # Pull (id, text) out of every returned status object.
    tweets_list = [[tweet.id, tweet.text] for tweet in tweets]

    # Name the columns here so that an empty result still yields a well-formed
    # two-column frame for the cells that follow.
    tweets_df = pd.DataFrame(tweets_list, columns=["id", "text"])

except Exception as e:
    # Report, then re-raise: later cells need tweets_df, so continuing after a
    # failed fetch would only surface as a confusing NameError further down.
    print('failed on_status,',str(e))
    raise

In [0]:
# Label the two columns built from [tweet.id, tweet.text] in the fetch cell.
tweets_df.columns = ["id", "text"]

In [0]:
# Show full tweet text without truncation. None means "no limit"; the old -1
# sentinel is deprecated and rejected by newer pandas releases.
pd.set_option('display.max_colwidth', None)
tweets_df.head(5)

Unnamed: 0,id,text
0,1324073295373869062,"RT @Nikspearsjk: ME having a CONVO with BAE \n\nME: Babe here’s the IPHONE 12 I \n promised you, it’s 3 MONTHS…"
1,1324073293540954112,iPhone 12 Pro Durability Test - Is 'Ceramic Shield' Scratchproof?! https://t.co/puf5W2KNp0 via @YouTube
2,1324073262419238914,"RT @iPLUGNGBACKUP: The cosmetic issue on the iPhone 12 does not seem to be isolated, I am worried it will spread to the Nigerian market on…"
3,1324073250553552906,I had a old iPhone and now I’m rocking my new iPhone 12 come on I been on a red 8 through that time period I’m on a… https://t.co/FhlmI44eco
4,1324073247986667523,RT @thomasjhenrylaw: 📱 IPHONE 12 GIVEAWAY! 📱\n \nWe're excited to be giving away the new iPhone 12 with 5G speed. Follow the steps below to…


In [0]:
from pyspark.sql.types import *

# Explicit schema for the pandas -> Spark conversion: 64-bit tweet id and
# tweet text, both nullable.
mySchema = StructType(
    [
        StructField("id", LongType(), True),
        StructField("text", StringType(), True),
    ]
)

In [0]:
# Convert the pandas frame to a Spark DataFrame using the explicit schema.
# Note: this rebinds `tweets`, which previously held the Tweepy cursor iterator.
tweets = spark.createDataFrame(tweets_df,schema=mySchema)

In [0]:
# Apply the same text normalisation used for training; only "ctext" is kept.
live_ctext = clean_text(col("text")).alias("ctext")
clean_tweets = tweets.select(live_ctext)

In [0]:
# Featurise the fetched tweets with the pipeline fitted on the training split.
# NOTE(review): pipelineFit includes the StringIndexer stage on "target", a
# column clean_tweets does not have — confirm this Spark version skips the
# stage for a missing input column rather than failing.
Tweets_pipeline= pipelineFit.transform(clean_tweets)

In [0]:
# Score the featurised tweets with the fitted logistic-regression model.
tweet_pred = lrModel.transform(Tweets_pipeline)


In [0]:
# Preview the first 45 scored tweets.
tweet_pred.show(45)

In [0]:
# Rich display of the fetched tweets (id, text) before cleaning.
display(tweets)

id,text
1324073295373869062,"RT @Nikspearsjk: ME having a CONVO with BAE ME: Babe here’s the IPHONE 12 I promised you, it’s 3 MONTHS…"
1324073293540954112,iPhone 12 Pro Durability Test - Is 'Ceramic Shield' Scratchproof?! https://t.co/puf5W2KNp0 via @YouTube
1324073262419238914,"RT @iPLUGNGBACKUP: The cosmetic issue on the iPhone 12 does not seem to be isolated, I am worried it will spread to the Nigerian market on…"
1324073250553552906,I had a old iPhone and now I’m rocking my new iPhone 12 come on I been on a red 8 through that time period I’m on a… https://t.co/FhlmI44eco
1324073247986667523,RT @thomasjhenrylaw: 📱 IPHONE 12 GIVEAWAY! 📱  We're excited to be giving away the new iPhone 12 with 5G speed. Follow the steps below to…
1324073228034220033,RT @haekjoyce: Hyuk would be getting a new iPhone 12 Pro 512GB for one of his 2020 birthday presents this year by 光芒赫宰ShineHyuk! ✨ Other b…
1324073197633966080,RT @Akburkz: Iphone 12 Promo shoot with @mohframes 🔥🤝 https://t.co/OPMPDs4Q6V
1324073194051915776,Preorder the iPhone 12 Mini and iPhone 12 Pro Max beginning 8:00AM on November 6! https://t.co/8I1lqnCmAE
1324073192466583554,"RT @Nikspearsjk: ME having a CONVO with BAE ME: Babe here’s the IPHONE 12 I promised you, it’s 3 MONTHS…"
1324073190419697664,iPhone 12 Pro vs Galaxy Note 20 Ultra Speed Test: Which Phone Is the Fastest? https://t.co/iKFBiAh5jJ https://t.co/2mQlrho8mQ
