# Project - Apache Spark & Elasticsearch

##### Students:
* Lilia IZRI      (DS)
* Yacine MOKHTARI (DS)
* Alexandre COMBEAU (DS)

##### Report
**TODO:** add a link to the report here.


In [2]:
# !pip install textblob
!pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-8.2.0-py3-none-any.whl (378 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m378.6/378.6 KB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hCollecting elastic-transport<9,>=8
  Downloading elastic_transport-8.1.2-py3-none-any.whl (59 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.3/59.3 KB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.1.2 elasticsearch-8.2.0


In [1]:
# import necessary packages
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.streaming import StreamingContext
from textblob import TextBlob

# For ML
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

In [2]:
# Create (or reuse) the SparkContext, then a StreamingContext that groups the input into
# BATCH_INTERVAL-second batches. getOrCreate() makes this cell safe to re-run: calling
# SparkContext() a second time in the same kernel raises "Cannot run multiple SparkContexts at once".
BATCH_INTERVAL = 10  # seconds
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, BATCH_INTERVAL)
spark = SparkSession(sc)
# Checkpoint directory: Spark requires checkpointing for windowed operations that use an
# inverse reduce function (reduceByKeyAndWindow with invFunc, used further down).
ssc.checkpoint("file:///tmp/spark")

## I. Process & Analyze input data (tweets)
### 1. Create the DStream that receives the tweets

In [3]:
# DStream over a plain TCP socket: each text line written to 127.0.0.1:5553
# (presumably by the companion tweet-sender notebook) becomes one element of the stream.
socket_stream = ssc.socketTextStream(hostname="127.0.0.1", port=5553)

### 2. Process data and tag with sentiment 

In [4]:
# Sentiment tagger based on TextBlob's polarity score.
def sentiment(text):
    """Classify `text` by the sign of its TextBlob polarity.

    Returns:
        1 if the polarity is positive, -1 if it is negative, and 0 otherwise (neutral).
        Subjectivity is not taken into account.
    """
    score = TextBlob(text).polarity
    # Booleans subtract as integers: 1 - 0 -> 1, 0 - 1 -> -1, 0 - 0 -> 0.
    return int(score > 0) - int(score < 0)

Here, we only take the polarity into account and deliberately ignore the subjectivity. ;)

In [5]:
# Split each raw line on the sender's field separator and tag it with its sentiment.
# Each element of `tweets` has the form
#   (user, text, date, latitude, longitude, hashtags, sentiment, id)
# where every field except `sentiment` ({-1, 0, 1}) still carries its "label: " prefix.

FIELD_SEPARATOR = ' ###:field:### '
# fields[0] (whatever precedes the first separator) is discarded; fields[1..7] are kept.
N_FIELDS = 8


def parse_tweet(fields):
    """Reorder the split fields of one tweet and insert its sentiment.

    Args:
        fields: list obtained by splitting a raw line on FIELD_SEPARATOR
            (at least N_FIELDS items; fields[0] is dropped).

    Returns:
        (user, text, date, lat, lon, hashtags, sentiment, id); the sentiment is computed on
        the tweet text with its "tweet: " label removed.
    """
    user, text, date, lat, lon, hashtags, tweet_id = fields[1:N_FIELDS]
    return (user, text, date, lat, lon, hashtags, sentiment(text[len('tweet: '):]), tweet_id)


tweets = (socket_stream
          .map(lambda line: line.split(FIELD_SEPARATOR))
          # A truncated/malformed line would otherwise raise IndexError and fail the whole batch.
          .filter(lambda fields: len(fields) >= N_FIELDS)
          .map(parse_tweet))

Each element of the `tweets` DStream has the following form:
``('user: userX', 'tweet: @userY blablablbabla', 'date: Thu May 05 00:16:06 +0000 2022', 'lat: 44.933143', 'lon: 7.540121', 'hashtags: #SaveTheWorld', 0, "1237288393929")``

### 3. ML: cluster tweets by sentiment and location

In [6]:
# Clustering inputs. Both streams are derived from the same `tweets` DStream: the model is
# updated with every batch and then assigns those same tweets to clusters (there is no
# separate held-out test set).


def tweet_features(tweet):
    """Return the [sentiment, latitude, longitude] vector of one parsed tweet.

    tweet[6] is the sentiment in {-1, 0, 1}; tweet[3] and tweet[4] are the coordinate
    fields still carrying their "lat: " / "lon: " labels.
    """
    return Vectors.dense([
        float(tweet[6]),
        float(tweet[3][len("lat: "):]),
        float(tweet[4][len("lon: "):]),
    ])


training_data = tweets.map(tweet_features)
# Same features, labelled with the tweet's sentiment so predictions can be read against it.
testing_data = tweets.map(lambda tweet: LabeledPoint(float(tweet[6]), tweet_features(tweet)))

# Streaming k-means starting from k random centres.
# NOTE(review): features are not scaled — latitude/longitude span much larger ranges than the
# sentiment (-1..1), so distances (and therefore clusters) are likely dominated by location.
k = 3
dimension = 3   # length of the vector returned by tweet_features
weights = 0.0   # initial weight given to each random centre (setRandomCenters' `weight` argument)
seed = 21

# decayFactor=1.0: past batches are never forgotten when the centres are updated.
model = StreamingKMeans(k=k, decayFactor=1.0).setRandomCenters(dimension, weights, seed)

# Update the cluster centres with every new batch.
model.trainOn(training_data)

# Emit (sentiment label, cluster index) pairs for each tweet.
result = model.predictOnValues(testing_data.map(lambda lp: (lp.label, lp.features)))
result.pprint()

In [7]:
# `result` holds (label, cluster_index) pairs; emit one (cluster_index, 1) pair per prediction.
predictions = result.map(lambda x: (x[1], 1))

# Sliding count of tweets per cluster over the last WINDOW_DURATION seconds, refreshed every
# SLIDE_DURATION seconds (both must be multiples of the 10-second batch interval).
# With an inverse function, Spark subtracts the batches that leave the window instead of
# recomputing it, which keeps keys whose count has dropped to 0 in the state; filterFunc
# discards them so empty clusters stop being reported.
WINDOW_DURATION = 30
SLIDE_DURATION = 10
size_clusters = predictions.reduceByKeyAndWindow(
    lambda x, y: x + y,
    lambda x, y: x - y,
    WINDOW_DURATION,
    SLIDE_DURATION,
    filterFunc=lambda kv: kv[1] > 0,
)
size_clusters.pprint()

## II. Elasticsearch indexing

In [8]:
import os

from elasticsearch import Elasticsearch

# Elasticsearch endpoint. Defaults to a node published on port 9200 of the Docker host
# (the port you expose when launching Elasticsearch in Docker); override with the
# ES_HOST / ES_PORT environment variables instead of editing the notebook.
es_host = os.environ.get("ES_HOST", "http://host.docker.internal")
port = os.environ.get("ES_PORT", "9200")
es = Elasticsearch(f"{es_host}:{port}")

In [9]:
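### FIXME: this does not work yet. `foreachRDD` hands saveES a whole RDD, not a single tweet,
### so `tweet[7]` fails (an RDD cannot be indexed like a tuple). Iterate inside the RDD instead,
### e.g. `tweets.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))`, and create the
### Elasticsearch client inside `save_partition`: that code runs on the executors, so the
### driver-side `es` client would otherwise have to be pickled and shipped there
### (NOTE(review): the client is likely not picklable — confirm).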



# def saveES(tweet):
#     es.index(index="sentiment_test1",
#         id=int(tweet[7][4:]),
#         document={"user": tweet[0],
#                   "text": tweet[1],
#                   "date": tweet[2],
#                   "lat":  tweet[3],
#                   "lon":  tweet[4],
#                   "hashtags": tweet[5],
#                   "sentiment": tweet[6]})


# tweets.foreachRDD(lambda rdd: saveES(rdd))

In [10]:
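#### TODO: this sink needs a streaming DataFrame (Structured Streaming), not the DStream used above,
#### and the elasticsearch-spark connector ("org.elasticsearch.spark.sql") on the Spark classpath.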



# query = df.writeStream \
# .outputMode("append") \
# .queryName("writing_to_es") \
# .format("org.elasticsearch.spark.sql") \
# .option("checkpointLocation", "/tmp/") \
# .option("es.resource", "index/type") \
# .option("es.nodes", "http://host.docker.internal") \
# .start()

# query.awaitTermination()

In [11]:
# Start the streaming computation. start() does not block: the output operations registered
# above (the pprint calls) print their results for every batch. Let it run for a couple of
# minutes to collect enough tweets, then call ssc.stop() to end it.
ssc.start()

-------------------------------------------
Time: 2022-05-05 02:59:40
-------------------------------------------

-------------------------------------------
Time: 2022-05-05 02:59:40
-------------------------------------------

-------------------------------------------
Time: 2022-05-05 02:59:50
-------------------------------------------
(0.0, 1)
(0.0, 1)
(0.0, 1)
(0.0, 2)
(0.0, 1)
(0.0, 1)
(0.0, 1)

-------------------------------------------
Time: 2022-05-05 02:59:50
-------------------------------------------
(1, 6)
(2, 1)



In [3]:
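## Oddly, indexing a single document works when run directly on the driver (below), but the same
## call crashes when wired into the stream — the stream version passes an RDD, not a single tweet.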


# a = "id: 234774849593"

# es.index(index="hello",
#                 id=int(a[4:]),
#                 document={"user": "lili",
#                           "text": "22"})