# Project - Apache Spark & Elasticsearch
## 2nd Notebook : Spark Streaming
##### Students:
* Lilia IZRI      (DS)
* Yacine MOKHTARI (DS)
* Alexandre COMBEAU (DS)

##### Report
[TODO: ADD A LINK HERE]


In [1]:
# import necessary packages
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext



# For ML
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

# From our utils.py file
from utils import sentiment, tweetToJSON
import textblob

In [2]:
import elasticsearch
elasticsearch.__version__

(7, 17, 3)

## I. Process & Analyze input data (tweets)
### 1. Create our Dstream that receives data

In [3]:
# Initialize the SparkContext and the StreamingContext with a 10-second batch interval
sc = SparkContext()
ssc = StreamingContext(sc, 10)
spark = SparkSession(sc)
ssc.checkpoint("file:///tmp/spark")    # Checkpoint directory (needed by windowed operations that use an inverse function)

In [4]:
# Create a DStream of text lines from a TCP (socket) source (the tweets we receive)
socket_stream = ssc.socketTextStream("127.0.0.1", 5567)

### 2. Process data and tag with sentiment 

Here, we only take the polarity into account and choose to ignore the subjectivity ;)
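
To illustrate what keeping only the polarity can look like, here is a minimal sketch. It assumes that the `sentiment` helper in `utils.py` turns a polarity score (for example TextBlob's `sentiment.polarity`, a float between -1 and 1) into a label in {-1, 0, 1}. The name `polarity_to_label` and the `threshold` parameter are hypothetical and are not taken from our code.

```python
def polarity_to_label(polarity, threshold=0.0):
    """Map a polarity score in [-1.0, 1.0] to a label in {-1, 0, 1}.

    Hypothetical helper: the real `sentiment` function in utils.py may differ.
    """
    if polarity > threshold:
        return 1      # positive tweet
    if polarity < -threshold:
        return -1     # negative tweet
    return 0          # neutral tweet (or too close to zero to decide)


# Example with three made-up polarity scores
print([polarity_to_label(p) for p in (0.8, 0.0, -0.35)])
```

A non-zero `threshold` would widen the neutral band, which may help if many tweets have a polarity very close to zero.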

In [5]:
# We split the received tweet into its fields and tag the data with the sentiment of the tweet,
#   so the 'tweets' DStream below holds tuples of the form (user, text, date, latitude, longitude, hashtags, sentiment, tweet_id)
def mapSplit(tweet):
    """
    A function that takes a tweet  (the one we sent from the other iPython file),
    splits it into its different fields and adds the sentiment field {-1, 0, 1}
    """
    return (tweet[1],    tweet[2],  tweet[3],    tweet[4],    tweet[5],    tweet[6],   "sentiment: "+ str(sentiment(tweet[2][6:])), tweet[7])
             #user        #text       #date      #latitude   #longitude    #Hashtags    #sentiment(= {-1,0,1})   #Id

In [6]:
# from elasticsearch import Elasticsearch
# es = Elasticsearch("http://host.docker.internal:9200") # Same port when running elasticsearch with docker

In [7]:
#### 1. Process the received tweets: split each line on the same delimiter we used when sending them into the socket (' ###:field:### ')
tweets_split = socket_stream.map(lambda tweet: tweet.split(' ###:field:### '))
tweets = tweets_split.map(mapSplit)
#### 2. tweets is a DStream whose RDDs contain the tweets represented as tuples
# tweets.pprint() # uncomment this line to see the tweets in the tuple format

#### 3. json_list_per_stream contains each tweet tuple converted to a string in JSON/dict format
json_list_per_stream = tweets.map(tweetToJSON)
json_list_per_stream.pprint() # print the tweets in the JSON format

Form of each element of the tweets DStream:
``('user: userX', 'tweet: @userY blablablbabla', 'date: Thu May 05 00:16:06 +0000 2022', 'lat: 44.933143', 'lon: 7.540121', 'hashtags: #SaveTheWorld', 'sentiment: 0', "1237288393929")``
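
The clustering step needs numeric features, so the `name: value` prefixes produced by `mapSplit` have to be removed first. The sketch below shows one way to do this without fixed slice offsets such as `[5:]` or `[11:]`. The helpers `strip_prefix` and `to_features` and the sample tuple are hypothetical and only serve as an illustration.

```python
def strip_prefix(field):
    """Return the value part of a 'name: value' field (text after the first ': ')."""
    _name, _sep, value = field.partition(": ")
    return value


def to_features(tweet):
    """Build the [sentiment, latitude, longitude] feature list from a tweet tuple."""
    return [
        float(strip_prefix(tweet[6])),   # sentiment
        float(strip_prefix(tweet[3])),   # latitude
        float(strip_prefix(tweet[4])),   # longitude
    ]


# Made-up tuple with the same field layout as the output of mapSplit
sample = (
    "user: userX",
    "tweet: @userY hello",
    "date: Thu May 05 00:16:06 +0000 2022",
    "lat: 44.933143",
    "lon: 7.540121",
    "hashtags: #SaveTheWorld",
    "sentiment: 0",
)

print(to_features(sample))
```

Splitting on the first `': '` keeps working if a field name changes length, whereas a fixed offset would silently cut the value in the wrong place.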

### 3. ML : Cluster tweets according to their sentiment and location

In [8]:
# We create a training set and a test set (both are built from the same tweets DStream)
training_data =  tweets.map(lambda tweet: Vectors.dense([float(tweet[6][11:]), float(tweet[3][5:]), float(tweet[4][5:])]))
testing_data  =  tweets.map(lambda tweet: LabeledPoint(float(tweet[6][11:]), Vectors.dense([float(tweet[6][11:]), float(tweet[3][5:]), float(tweet[4][5:])])))

# training_data.pprint()
# We create a model with random clusters and specify the number of clusters to find
k = 3
dimension = 3
weights = 0.0
seed = 21

# init
model = StreamingKMeans(k=k, decayFactor=0.5).setRandomCenters(dimension, weights, seed)

# Train the model
model.trainOn(training_data)  

# Predict
result = model.predictOnValues(testing_data.map(lambda lp: (lp.label, lp.features)))
result.pprint()
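
To make the role of `decayFactor` and of the initial weight more concrete, here is a small pure-Python sketch of the center update rule described in the Spark MLlib documentation for streaming k-means: the old center, weighted by its discounted weight, is averaged with the mean of the new points assigned to it. This is an illustration written under that assumption, not Spark's actual implementation, and the function name `update_center` is hypothetical.

```python
def update_center(center, weight, batch_mean, batch_count, decay_factor):
    """Blend an old cluster center with the mean of the newly assigned points.

    center       -- current center (list of floats)
    weight       -- weight accumulated by the center so far
    batch_mean   -- mean of the points assigned to this center in the new batch
    batch_count  -- number of points assigned to this center in the new batch
    decay_factor -- 1.0 keeps all history, 0.0 forgets everything before the batch
    """
    discounted = weight * decay_factor
    total = discounted + batch_count
    if total == 0:
        return list(center)   # no history and no new points: keep the center
    return [(c * discounted + x * batch_count) / total
            for c, x in zip(center, batch_mean)]


# With an initial weight of 0.0, the random center is replaced by the first batch mean
print(update_center([0.3, -0.1, 1.2], 0.0, [1.0, 44.9, 7.5], 4, 0.5))

# With some history, decay_factor=0.5 halves the influence of the old data
print(update_center([0.0, 0.0, 0.0], 4.0, [1.0, 1.0, 1.0], 2, 0.5))
```

Under this rule, the random initial centers only matter until a cluster receives its first points, since their weight is 0.0.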

In [9]:
# We keep the predictions of each tweet (the index of the cluster), and we create (indexCluster, 1) pairs
predictions   = result.map(lambda x: (x[1], 1))

# We reduce by key over a 30-second window sliding every 10 seconds to get the number of elements assigned to each cluster
size_clusters = predictions.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
size_clusters.pprint()
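
The two lambdas passed to `reduceByKeyAndWindow` let the counts be updated incrementally: the first one adds the batch entering the window, the second one subtracts the batch leaving it. The sketch below reproduces this idea in plain Python, outside of Spark. The class `WindowedCounts` and the example batches are hypothetical.

```python
from collections import Counter, deque


class WindowedCounts:
    """Keep per-key counts over the last `window_batches` batches, updated incrementally."""

    def __init__(self, window_batches):
        self.window_batches = window_batches
        self.batches = deque()
        self.totals = Counter()

    def push(self, batch_counts):
        # "Reduce" step: add the counts of the batch entering the window (x + y)
        self.batches.append(batch_counts)
        for key, n in batch_counts.items():
            self.totals[key] += n
        # "Inverse reduce" step: subtract the batch leaving the window (x - y)
        if len(self.batches) > self.window_batches:
            for key, n in self.batches.popleft().items():
                self.totals[key] -= n
        # Keys whose count dropped to zero are hidden for readability
        return {k: v for k, v in self.totals.items() if v != 0}


# A 30-second window with 10-second batches spans 3 batches
window = WindowedCounts(window_batches=3)
for batch in ({0: 2, 1: 1}, {1: 3}, {2: 1}, {0: 1}):
    print(window.push(batch))
```

Because the previous totals must be remembered between batches, this kind of windowed reduction relies on the checkpoint directory configured at the start of the notebook.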

In [10]:
# json_list_per_stream.saveAsTextFiles("tmp/")

## Setting up the environment for Elasticsearch

In [11]:
# from elasticsearch import Elasticsearch


# ###
# def sendToIndex(json_tweet):
#     es = Elasticsearch("http://host.docker.internal:9200") # Same port when running elasticsearch with docker
    
#     json_tweet.cache().count()
#     es.index(index="test",
#              id=1, 
#              document=json_tweet)
    
#     return json_tweet

    
    
# lel = json_list_per_stream.foreachRDD(sendToIndex)
# lel.pprint()
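
A note on the attempt above: `foreachRDD` passes a whole RDD (one per batch) to the function, not a single tweet, and it returns nothing, so its result cannot be printed with `pprint()`. A fixed `id=1` would also overwrite the same document on every call. The sketch below only covers the pure-Python part, turning JSON-formatted tweet strings into one indexing action per tweet, in the dictionary shape (`_index`, `_id`, `_source`) accepted by `elasticsearch.helpers.bulk`. The function name `to_bulk_actions`, the index name `tweets`, and the sample record are hypothetical.

```python
import json


def to_bulk_actions(json_tweets, index_name):
    """Turn JSON-formatted tweet strings into one indexing action per tweet."""
    actions = []
    for raw in json_tweets:
        # Values in our JSON output start with a space, so we strip them
        doc = {key: value.strip() for key, value in json.loads(raw).items()}
        actions.append({
            "_index": index_name,
            "_id": doc["id"],      # one document per tweet id, no overwriting
            "_source": doc,
        })
    return actions


# Made-up record shaped like the JSON printed by this notebook
sample = '{"user": " userX", "tweet": " hello", "sentiment": " 0", "id": " 471919111"}'
print(to_bulk_actions([sample], "tweets"))
```

In the stream, one possible (untested) wiring would be `json_list_per_stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))`, where a hypothetical `send_partition` creates its own `Elasticsearch` client on the worker and passes `to_bulk_actions(...)` to `elasticsearch.helpers.bulk`.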

In [12]:
# Start streaming and wait a couple of minutes to collect enough tweets
ssc.start()

-------------------------------------------
Time: 2022-05-06 11:52:00
-------------------------------------------
{
   "user": " tarolime",
   "tweet": " RT @_POPPYTHEPUPPY: Tant d'\u00e9nergie et de mauvaise foi pour d\u00e9fendre Johnny Depp sous pr\u00e9texte que vous le simpiez en \u00e9tant ado c'est trop g\u2026",
   "date": " Fri May 06 11:51:53 +0000 2022",
   "lat": " 44.933143",
   "lon": " 7.540121",
   "hashtags": " ",
   "sentiment": " 0",
   "id": " 471919111"
}

-------------------------------------------
Time: 2022-05-06 11:52:00
-------------------------------------------
(0.0, 1)

-------------------------------------------
Time: 2022-05-06 11:52:00
-------------------------------------------
(1, 1)

-------------------------------------------
Time: 2022-05-06 11:52:10
-------------------------------------------
{
   "user": " Boot_Shrew",
   "tweet": " RT @CathFawr: Bad angle mid-work profile photo, usually in a car or high vis  A nice photo of someone else's car in 

In [13]:
print("Clusters coordinates: " + str(model.latestModel().clusterCenters))

Clusters coordinates: [[-0.05196425 -0.11119605  1.0417968 ]
 [-1.25673929  0.74538768 -1.71105376]
 [-0.20586438 -0.23457129  1.12814404]]


In [None]:
## Oddly, this works fine on its own... but it crashes when run on a stream


# a = "id: 234774849593"

# es.index(index="hello",
#                 id=int(a[4:]),
#                 document={"user": "lili",
#                           "text": "22"})