# First Spark Streaming Example
_____

# Twitter Example
Set up the credentials for a Twitter app at https://apps.twitter.com/
    
Install python-twitter, a Python library that connects your Python code to your Twitter developer account.

Begin by running the TweetRead.py file. Make sure to add your own IP address and your credential keys.

In [None]:
import findspark

In [None]:
# Point findspark at the local Spark installation.
# The path below is machine-specific; change it to reflect your own setup.
findspark.init('/Users/kevinblum/Apache-Spark/spark-3.1.2-bin-hadoop3.2')

In [None]:
# May cause deprecation warnings, safe to ignore, they aren't errors
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml import PipelineModel
from pyspark.ml.feature import CountVectorizer

In [None]:
# Create (or reuse) the SparkSession that drives the structured-streaming query.
spark = (
    SparkSession.builder
    .appName("Twitter Streaming App")
    .getOrCreate()
)

# Read lines from the local TCP socket 127.0.0.1:5556
# (presumably fed by the TweetRead.py script — verify before running).
tweet_df = (
    spark.readStream
    .format("socket")
    .option("host", "127.0.0.1")
    .option("port", 5556)
    .load()
)

# Cast the incoming `value` column explicitly to STRING.
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")

In [None]:
# Rename the "value" column to "tweet".
tweet_df_string = tweet_df_string.withColumnRenamed("value" , "tweet")

In [None]:
# Start a streaming query named "tweetquery" that appends incoming tweets as
# CSV files under the target path; progress is tracked in "checkpoints".
# The output mode is set with outputMode(); an option called "format" does not
# configure it.
writeTweet = (
    tweet_df_string.writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "/Users/kevinblum/BigDataProj/SparkTwitterStream/tweets.csv")
    .option("checkpointLocation", "checkpoints")
    .queryName("tweetquery")
    .start()
)


In [None]:
import time
# Let the streaming query run for 60 seconds, then stop it.
time.sleep(60) 
writeTweet.stop()

In [None]:
import pandas as pd

In [None]:
import os
import glob
import pandas as pd
# Switch the working directory to the streaming output folder so that relative
# glob patterns evaluated afterwards match the files written there.
os.chdir("/Users/kevinblum/BigDataProj/SparkTwitterStream/tweets.csv")

In [None]:
# File extension of the part files to collect from the current directory.
extension = 'csv'
# glob.glob() already returns a list, so no wrapping comprehension is needed.
all_filenames = glob.glob(f'*.{extension}')


In [None]:
# Load every non-empty CSV part file and stack the results into one DataFrame.
frames = []
for i in all_filenames:
    csv_path = f"/Users/kevinblum/BigDataProj/SparkTwitterStream/tweets.csv/{i}"
    if os.stat(csv_path).st_size == 0:
        continue  # Empty part files carry no data.
    try:
        d = pd.read_csv(csv_path, engine="python")
    except Exception as exc:
        # Best effort: skip unreadable files, but report them instead of
        # silently swallowing the error.
        print(f"Skipping {csv_path}: {exc}")
        continue
    # Transpose and reset the index so the parsed header values end up as rows
    # in an "index" column.
    dt = d.T.reset_index()
    frames.append(dt)

# DataFrame.append was removed in pandas 2.0; collect the pieces and
# concatenate once. pd.concat() rejects an empty list, hence the fallback.
dataframe = pd.concat(frames) if frames else pd.DataFrame()




In [None]:
# Name the text column "Tweet", stringify every cell, drop all non-ASCII
# characters, and renumber the rows from zero.
dataframe = (
    dataframe.rename(columns={"index": "Tweet"})
    .astype(str)
    .apply(lambda col: col.str.encode('ascii', 'ignore').str.decode('ascii'))
    .reset_index(drop=True)
)

In [None]:

# Drop rows whose tweet text is empty, a lone ".", or missing.
# `series != None` compares element-wise and is always True, so it filters
# nothing; notna() is the pandas test for missing values.
tweet_text = dataframe['Tweet']
keep = ~tweet_text.isin(["", "."]) & tweet_text.notna()
dataframe = dataframe[keep]

dataframe = dataframe.reset_index(drop=True)


In [None]:
# Consecutive integer ids starting at 100, one per row of the Tweet column.
range1 = range(100, dataframe['Tweet'].size + 100)
list1 = [*range1]


In [None]:
# Wrap the generated ids in a one-column DataFrame named "tweet_id".
dfg = pd.DataFrame(list1 , columns = ['tweet_id'])

In [None]:
# Convert the ids to strings; selecting the single column rebinds dfg to a Series.
dfg = dfg['tweet_id'].astype('str')

In [None]:
# Placeholder polarity of 0 for every row of the Tweet column.
list_polarity = [0] * dataframe['Tweet'].size
    

In [None]:
# Wrap the placeholder polarities in a one-column DataFrame named "polarity".
dfp = pd.DataFrame(list_polarity , columns = ['polarity'])

In [None]:
# Attach the tweet_id values to the tweets (join aligns on the row index).
dataframe = dataframe.join(dfg)

In [None]:
# Attach the polarity column to the tweets (join aligns on the row index).
dataframe = dataframe.join(dfp)

In [None]:
dataframe

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame.
sparkDF = spark.createDataFrame(dataframe)

In [None]:
sparkDF.show(5)

In [None]:
# Persist the Spark DataFrame as CSV with a header row.
sparkDF.write.option("header",True).csv("/Users/kevinblum/BigDataProj/sparkDF")

In [None]:
import nltk

In [None]:
# Download only the NLTK resources this notebook uses. Calling nltk.download()
# without arguments opens the interactive downloader instead.
#   punkt / punkt_tab -> word_tokenize (newer NLTK releases look for punkt_tab)
#   wordnet / omw-1.4 -> WordNetLemmatizer
#   words             -> nltk.corpus.words
for resource in ("punkt", "punkt_tab", "wordnet", "omw-1.4", "words"):
    nltk.download(resource)

In [None]:
from nltk.corpus.reader.wordnet import *
from pyspark.sql.functions import udf
from sklearn.feature_extraction.text import TfidfTransformer
from nltk.tokenize import word_tokenize
from pyspark.sql.types import StringType,DoubleType,IntegerType
import pyspark.sql.functions as F
# Shared lemmatizer, and a vocabulary set built from NLTK's `words` corpus.
wn = nltk.WordNetLemmatizer()
worddict = set(nltk.corpus.words.words())


def preprocessing(text):
    """Reduce a tweet to a space-separated set of known dictionary words.

    The text is lower-cased, stripped and tokenized; each token is lemmatized
    as a noun, then as a verb, then as an adjective. Only lemmas contained in
    ``worddict`` are kept, and duplicates are removed.

    Args:
        text: Raw tweet text, or ``None`` for a missing value.

    Returns:
        The surviving lemmas joined by single spaces in sorted order, or an
        empty string when ``text`` is ``None`` or no lemma survives.
    """
    if text is None:
        # Missing values would otherwise fail on .lower() with AttributeError.
        return ''
    lemmas = set()
    for token in word_tokenize(text.lower().strip()):
        lemma = wn.lemmatize(token, NOUN)
        lemma = wn.lemmatize(lemma, VERB)
        lemma = wn.lemmatize(lemma, ADJ)
        lemmas.add(lemma)
    # Sort so the result does not depend on set iteration order.
    return ' '.join(sorted(lemmas & worddict))


# Wrap preprocessing() as a Spark UDF returning strings, then store the
# cleaned tweet text in a new "text" column.
brand_udf=udf(preprocessing,StringType())
sparkDF=sparkDF.withColumn('text',brand_udf(sparkDF['Tweet']))


In [None]:
from pyspark.ml.feature import  Tokenizer

In [None]:
# Tokenizer that reads the "text" column and writes its tokens to "words".
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [None]:
# Add the "words" column produced by the tokenizer.
sparkDF=tokenizer.transform(sparkDF)

In [None]:
from pyspark.ml.feature import CountVectorizer
# Term-count vectorizer: reads "words" and writes the counts to "rawFeatures".
count = CountVectorizer (inputCol="words", outputCol="rawFeatures")

In [None]:
# Fit the count vectorizer on the tokenized tweets.
model1=count.fit(sparkDF)

In [None]:
# Add the "rawFeatures" column using the fitted vectorizer model.
sparkDF=model1.transform(sparkDF)

In [None]:
from pyspark.ml.feature import  IDF
# IDF estimator: reads "rawFeatures" and writes the rescaled vectors to "features".
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [None]:
# Fit the IDF model on the data, then add the "features" column.
idfModel = idf.fit(sparkDF)
sparkDF = idfModel.transform(sparkDF)

In [None]:

# Redistribute the rows across 10 partitions.
sparkDF=sparkDF.repartition(10)

In [None]:
sparkDF.printSchema()

In [None]:
# Cast the "polarity" column to an integer type.
sparkDF = sparkDF.withColumn('polarity', sparkDF['polarity'].cast(IntegerType()))

In [None]:
sparkDF

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel
# Load a previously saved random-forest classification model from disk.
rf_model = RandomForestClassificationModel.load("/Users/kevinblum/BigDataProj/SparkTwitterStream/models/rf_classifier")

In [None]:
sparkDF

In [None]:
# Run the loaded model on the selected text, feature and polarity columns.
rf_predictions=rf_model.transform(sparkDF.select(['text','words','rawFeatures','features','polarity']))

In [None]:
rf_predictions.select(['text']).show(10)

In [None]:
import pyspark.sql.functions as F

# NOTE(review): F.length("polarity") only builds a Column expression, so this
# prints the Column object rather than a computed length — confirm intent.
print(F.length("polarity"))

In [None]:
sparkDF

In [None]:
# Copy of the predictions with the "prediction" column cast to string.
rd_predict = rf_predictions.withColumn("prediction" , rf_predictions["prediction"].cast(StringType()))

In [None]:
rf_predictions.select(['rawFeatures']).show(10)

In [None]:
# NOTE(review): this sets `nullable` on the schema object returned to Python;
# it presumably does not change the underlying Spark DataFrame — confirm.
rd_predict.schema['prediction'].nullable = True

In [None]:
# Reuse the SparkContext that already backs the SparkSession. Constructing a
# second SparkContext in the same process raises an error, whereas
# getOrCreate() is safe to re-run.
sc = SparkContext.getOrCreate()

In [None]:
# Streaming context with a batch interval of 10 (seconds), plus a SQLContext
# built on the same SparkContext.
ssc = StreamingContext(sc, 10 )
sqlContext = SQLContext(sc)

In [None]:
# DStream of text lines read from the TCP socket at 127.0.0.1:5556.
socket_stream = ssc.socketTextStream("127.0.0.1", 5556)

In [None]:
# Windowed view of the stream with a window duration of 20 (seconds).
lines = socket_stream.window( 20 )

In [None]:
def process(rdd):
    """Count identical lines in one micro-batch using Spark SQL.

    Args:
        rdd: The batch RDD handed over by ``foreachRDD`` (presumably an RDD
            of strings coming from the socket stream — verify against caller).

    Returns:
        A DataFrame with the columns ``word`` and ``total``, or ``None`` when
        the batch is empty or processing fails. ``foreachRDD`` itself ignores
        the value returned by its callback.
    """
    if rdd.isEmpty():
        # A schema cannot be inferred from an empty RDD, so skip the batch.
        return None

    # Imported locally because Row is not among the notebook's visible imports.
    from pyspark.sql import Row

    try:
        # Get (or lazily create) the SparkSession matching this RDD's context.
        spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        return wordCountsDataFrame
    except Exception as exc:
        # Best effort: keep the stream alive, but report the failure instead
        # of silently swallowing it.
        print(f"process(): batch skipped: {exc}")
        return None

# Register process() to run on every batch RDD of the windowed stream.
lines.foreachRDD(process)

In [None]:
def process(rdd):
    """Convert the batch RDD to a DataFrame; the result is discarded.

    NOTE(review): this rebinds the name ``process`` and the DataFrame returned
    by ``toDF()`` is never used — confirm intent.
    """
    rdd.toDF()
    
lines.foreachRDD(process)

In [None]:
print(lines)

In [None]:
from collections import namedtuple

# Record type for one hashtag and its frequency. The field names become the
# DataFrame column names, so they must match the columns that the plotting
# cell selects from the "tweets" view ("tag" and "count").
fields = ["tag", "count"]
Tweet = namedtuple('Tweet', fields)

In [None]:
def _register_top_hashtags(rdd):
    """Publish the ten most frequent hashtags of a batch as the "tweets" view."""
    if rdd.isEmpty():
        # toDF() cannot infer a schema from an empty RDD; keep the previous view.
        return
    # createOrReplaceTempView replaces the deprecated registerTempTable.
    rdd.toDF().sort(desc("count")).limit(10).createOrReplaceTempView("tweets")


# Parentheses allow the chained calls to span multiple lines.
(
    lines.flatMap(lambda text: text.split(" "))  # Split each line into words
    .filter(lambda word: word.startswith("#"))  # Keep hashtags only
    .map(lambda word: (word.lower(), 1))  # Lower-case the tag, pair it with 1
    .reduceByKey(lambda a, b: a + b)  # Sum the occurrences per tag
    .map(lambda rec: Tweet(rec[0], rec[1]))  # Wrap each pair in a Tweet record
    .foreachRDD(_register_top_hashtags)  # Refresh the "tweets" view per batch
)

__________
### Run the TweetRead.py file at this point
__________

In [None]:
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# Only works for Jupyter Notebooks!
%matplotlib inline 

In [None]:
ssc.start()

In [None]:
print(lines)

In [None]:
# Redraw the hashtag bar chart ten times, three seconds apart.
# A throwaway loop variable is used so the name `count` (bound earlier in the
# notebook to the CountVectorizer) is not overwritten by a loop counter.
for _ in range(10):
    time.sleep(3)
    top_10_tweets = sqlContext.sql('Select tag, count from tweets')
    top_10_df = top_10_tweets.toPandas()
    # Replace the previous chart instead of stacking outputs in the cell.
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="tag", data=top_10_df)
    plt.show()

In [None]:
# Stop the streaming context (by default this also stops the underlying SparkContext).
ssc.stop()

In [None]:
# NOTE(review): in the visible notebook `wordCountsDataFrame` is only assigned
# inside process(), so this cell presumably raises NameError — confirm intent.
wordCountsDataFrame