<h1>Hardware and Software for Big Data Mod. B</h1>
<h2>Giacomo Matrone</h2>     
<h3>P37000011</h3>
<h2>Christian Riccio</h2>    
<h3>P37000002</h3>

# Abstract
Twitter è un social media attraverso il quale le persone condividono i loro pensieri e le loro emozioni circa i trend del momento. In un periodo delicato quale quello dell'epidemia da coronavirus che dilaga nel mondo, e che sta toccando tutti personalmente, i social media in generale, e twitter nello specifico, sono più frequentati del solito. La circolazione libera delle informazioni porta inoltre a dover fronteggiare buone, cattive e fake news. In aggiunta a ciò, estrapolare il sentimento generale delle persone circa una problematica importante, quale quella del coronavirus, può aiutare a capire in che direzione si sta muovendo la coscineza di tutti nei confronti del virus.

# Introduzione
Il lavoro condotto riguarda l'implementazione tecnica di una motore di sentiment analysis, near real-time, utilizzando i tweet degli utenti di Twitter. L'integrazione di Spark e AWS Kinesis, ci ha permesso di condurre l'analisi della serie storica dei tweet riguardanti il coronavirus, filtrandoli per hastag (#COVID-19) e regione di provenienza. L'architettura che abbiamo attualmente usato in tale fase è rappresentata in figura e commentata di seguito:

![img](https://miro.medium.com/max/1400/1*sccUyqc0VR_JBhwNGkfNbg.jpeg)

- E' stata creata un'istanza su AWS EC2 (piano free, visto che siamo **#barboni**), seguendo gli steps indicati qui sotto:
  - E' stato registrato un account su AWS;
  - E' stata creata un'istanza EC2;
  - E' stata creata una chiave SSH tramite RSA;
  - E' stato usato Putty al fine di poter connettere l'istanza EC2 con il computer di casa;
  - E' stato usato uno script python contenuto in questo [git](https://github.com/joking-clock/twitter-capture-python) al fine di poter prelevare i tweet da twitter per poi indirizzarli verso Kinesis;
- E' stata creata un'istanza su AWS Kinesis che preleva i dati tramite uno script python, li renderizza come stream, per poi, infine, trasmetterli su Databricks, dove verranno resi in spark sql.
  
Inoltre, tecnicamente il processo si esegue nell'ordine definito qui di seguito: 

- Il Kinesis Data Stream monitora il flusso di dati fornendoci delle metriche dettagliate sulla corretta ingestione dei dati;
- Tramite il notebook di Databricks è stata effettuata la connesisone tra Spark Structured Streaming e il flusso di dati proveniente da Kinesis;
- L'applicazione scritta, ci ha permesso di costruire un motore di sentiment analysis usando: SparkSQL per eseguire query sui dati collezionati in stream e TextBlob (modulo di python) per strutturare la sentiment dei tweet;
- Infine, tramite il modulo seaborn di python, abbiamo ottenuto un plot del conteggio dei tweet in relazione alla propria sentiment.


In maniera del tutto generale, l'implementazione di un modello di gestione dei Big Data segue il workflow, ben descritto, nell'immagine di seguito:
![image](https://docs.microsoft.com/it-it/azure/architecture/data-guide/big-data/images/real-time-pipeline.png)

Le architetture per la gestione dei Big Data includono i seguenti componenti:

- Sorgente dati (es. DBs relazionali, dati in tempo reali quali logs di sensori e etc.);
- Sistema di archiviazione dati;
- Elaborazione batch: la quantità dimensioni considerevoli, una soluzione per Big Data deve spesso elaborare i file di dati mediante processi batch con esecuzione prolungata per filtrare, aggregare e preparare in altro modo i dati per l'analisi;
- Inserimento di messaggi in tempo reale: se i dati sono di provenienza real-time, l'architettura deve includere un modo per acquisire e archiviare i messaggi in tempo reale per l'elaborazione del flusso;
- Elaborazione del flusso: tale fase si conduce dopo avere acquisito i dati in tempo reale, filtrandoli, aggregandoli e preparandoli per l'analisi;
- Archivio dati analitici: in cui dopo aver preparato i dati con le fasi precedenti si possono elaborare i dati in un formato strutturato, sui quali si possono eseguire query. Numerose soluzioni per Big Data preparano i dati per l'analisi e quindi servono i dati elaborati in un formato strutturato su cui è possibile eseguire query con strumenti analitici;
- Analisi e creazione di report: si forniscono informazioni dettagliate sui dati tramite strumenti di analisi e report.

# Cattura dei tweet
Twitter fornisce una API per la cattura dei tweet chiama Twitter Labs, con la quale vengono fornite delle credenziali segrete necessarie ad accedere e procedere con il compito preposto. In python il modulo Tweepy da la possibilità di catturare i tweet, real-timr, filtrandoli per key-words. Contestualmente, fornendo le credenziali di AWS ci si collega con Kinesis Data Stream, tramite una macchina remota EC2, che fornisce il necessario per memorizzare, processare e analizzare i dati.

## Setup dello streaming e sentiment process
Si è rovveduto a:

- Definire la spark session;
- Definire lo spark streaming database;
- Controllare il processo di data collection;
- Lanciare query di prova;
- Visualizzare i dati;
- Implementare il motore di sentiment analysis;
- Visualizzare i risultati.

## Definiamo la SparkSession
Apache Spark segue l’architettura master slave, dove il master è rappresentato dal driver e gli slaves dagli esecutori. Le applicazioni Spark vengono eseguite come serie indipendenti di processi su un cluster, coordinate dall’oggetto SparkContext o SparkSession nel programma principale (chiamato programma del driver). In particolare, per essere eseguito su un cluster, SparkSession può connettersi a diversi tipi di gestori di cluster o Cluster Manager (es.YARN), che allocano risorse tra le applicazioni. Una volta connesso, Spark acquisisce gli esecutori sui nodi del cluster, che sono processi che eseguono calcoli e archiviano i dati per l’applicazione. Successivamente, invia il codice dell’applicazione agli esecutori. Infine, SparkSession invia le attività agli esecutori per l’esecuzione. Gli esecutori sono responsabili dell’esecuzione effettiva del lavoro assegnatogli dal driver. Ciò significa che ogni esecutore è responsabile solo di due cose: eseguire il codice assegnatogli dal driver e riportare lo stato del calcolo, su quell’esecutore, al nodo del driver. Questa modalità di esecuzione, in cui il driver è uno degli esecutori nel cluster, viene detta Cluster Mode. Procediamo quindi all'import dei pacchetti necessari.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder\
                    .master("local")\
                    .appName("Structured Streaming")\
                    .getOrCreate()

L'entità centrale di Spark è **l'RDD** (Resilient Distribuited Dataset). L'RDD è una collezione di elementi partizionata tra i nodi computazionali (nel caso di Databricks Community Edition sono disponibili 2 nodi computazionali), ai fini di **introdurre un parallelismo**. L'architettura Master-Slave, unitamente al concetto di RDD, hanno reso Spark a tutti gli effetti fault-tollerant.

## Da Kinesis Data Stream a Spark Streaming DataFrame

Tutte le funzionalità di Apache Spark dipendono da Spark Core. Poggiati su Spark Core, troviamo:

- Spark Streaming: che consente l’elaborazione di flussi live di dati real time
- SparkSQL: che consente di lavorare con dati strutturati, eseguendo query sugli stessi. Di seguito è riportato il codice per definire la struttura dello stream di dati:

In [8]:
from pyspark.sql.types import StructType, StringType
pythonSchema = StructType() \
          .add("id", StringType(), True) \
          .add("tweet", StringType(), True) \
          .add("ts", StringType(), True)

Creiamo ora il flusso di dati da **kines a Spark Streaming** fornendo alcuni parametri di collegamento, unitamente allo schema della struttura dati di cui sopra:

In [10]:
awsAccessKeyId = "****************" # valore access key
awsSecretKey = "********************"   # valore secret key
kinesisStreamName = "***********"  # nome kinesis stream
kinesisRegion = "***********"
kinesisDF = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", kinesisStreamName)\
  .option("region", kinesisRegion) \
  .option("initialPosition", "LATEST") \
  .option("format", "json") \
  .option("awsAccessKey", awsAccessKeyId)\
  .option("awsSecretKey", awsSecretKey) \
  .option("inferSchema", "true") \
  .load()

## Injection dei dati

Con lo script python per la cattura dei tweet in esecuzione sulla Virtual Machine e su Kinesis Data Stream di AWS, possiamo vedere come i dati provenienti da Kinesis, entrano in Spark Streaming:

In [12]:
df = kinesisDF \
  .writeStream \
  .format("memory") \
  .outputMode("append") \
  .queryName("tweets")  \
  .start()

`df` ci permette di eseguire query sullo streming dati e con `.format('memory')` ci assicuriamo che l'output del flusso sia salvato in memoria come tabella avente lo schema definito sopra. Per sapere se i dati sono disponibili e di conseguenza iniziare ad esplorarli, si è proceduto come di seguito, eseguendo anche una prima query:

In [14]:
df.status

## Data Visualization

In [16]:
import pandas as pd

In [17]:
tweets = spark.sql("select cast(data as string) from tweets")

## Visualizziamo e valutiamo i dati:

In [19]:
tweets.show(5,truncate=True)

### User defined function

Al fine di poter caricare un dataframe pandas, usiamo una user defined funcion che agisca sui tweet collezionati a livello di stream. Questo ci consente di creare un datamodel più regolare, al fine, poi, di caricare il tutto in un formato correttamente interpretabile da Pandas.

In [21]:
from pyspark.sql.functions import col
import json
from pyspark.sql.functions import UserDefinedFunction
def parse_tweet(text):
    data = json.loads(text)
    id = data[0]['id']
    ts = data[0]['ts']
    tweet = data[0]['tweet'] 
    return (id, ts, tweet)
    
# Define your function
getID = UserDefinedFunction(lambda x: parse_tweet(x)[0], StringType())
getTs = UserDefinedFunction(lambda x: parse_tweet(x)[1], StringType())
getTweet = UserDefinedFunction(lambda x: parse_tweet(x)[2], StringType())
# Apply the UDF using withColumn
tweets = (tweets.withColumn('id', getID(col("data")))
               .withColumn('ts', getTs(col("data")))
               .withColumn('tweet', getTweet(col("data")))
         )

In [22]:
tweets.show(5,truncate=True)

In [23]:
!pip install textblob

## Sentiment

La sentiment analisi e parte del Natural Language Processing e che ha los scopo di classificare e interpretare il sentiment di un trend dal testo. Il modulo textblob di python fornisce di base un modello precaricato necessario per ottemperare al nostro goal. L'idea di base è quella di classificare la polarità di un dato tweet in:

- neutrale;
- positivo;
- negativo.

attribuendo a ciascuno di essi un valore compreso tra [-1,1]

In [25]:
import textblob
def get_sentiment(text):
    from textblob import TextBlob
    tweet = TextBlob(text)
    if tweet.sentiment.polarity < 0:
      sentiment = "negative"
    elif tweet.sentiment.polarity == 0:
        sentiment = "neutral"
    else:
        sentiment = "positive"
    return sentiment

In [26]:
pandas_tweets = tweets.toPandas()

Tramite il metodo `map` di pandas series, possiamo poi ottenere una nuova colonna che ci dia il sentiment rilevato in relazione ai singoli threshold definiti sopra.

In [28]:
pandas_tweets['polarity'] = pandas_tweets.tweet.map(lambda x: get_sentiment(x))

In [29]:
pandas_tweets.head()

Unnamed: 0,data,id,ts,tweet,polarity
0,"[{""id"": ""1269223863780589568"", ""tweet"": ""b'RT ...",1269223863780589568,Sat Jun 06 11:05:24 +0000 2020,b'RT @WHO: Media briefing on #COVID19 with @Dr...,neutral
1,"[{""id"": ""1269223864984383488"", ""tweet"": ""b'Cap...",1269223864984383488,Sat Jun 06 11:05:25 +0000 2020,b'Cappuccino e cornetto? Cappuccino e brioche?...,neutral
2,"[{""id"": ""1269223865630236679"", ""tweet"": ""b'RT ...",1269223865630236679,Sat Jun 06 11:05:25 +0000 2020,b'RT @WHO: WHO updated guidance on the use of ...,positive
3,"[{""id"": ""1269223867274399744"", ""tweet"": ""b'RT ...",1269223867274399744,Sat Jun 06 11:05:25 +0000 2020,b'RT @BerndPulverer: Calling on @TheLancet &am...,neutral
4,"[{""id"": ""1269223867484119041"", ""tweet"": ""b'RT ...",1269223867484119041,Sat Jun 06 11:05:25 +0000 2020,b'RT @BeachMilk: Trump mentioned U.V. light \x...,positive


In [30]:
import seaborn as sns
import matplotlib.pyplot as plt

### Visualizzazione del sentiment

Al fine di poter avere un'idea del sentiment medio, andiamo poi a creare un grafico a barre in relazione al sentiment rilevato nei tweet. Una possibile applicazione di questo risultato consisterebbe nel poter studiare, in un secondo momento, l'andamento della sentiment durante i periodi di lock down.

In [32]:
fig, ax = plt.subplots(figsize=(10,10))
sns.countplot(pandas_tweets.polarity,ax = ax)
plt.title("Tweets sentiment at %s"%(pandas_tweets.ts.max()))
plt.show()

In [33]:
! pip install TwitterAPI

## Batch analysis

Per analizzare l'andamento dei tweet nel tempo, prepariamo una batch analysis con i dati di twitter storici, presi in quattro giorni consecutivi nei mesi di Febbraio, Marzo ed Aprile.
L'analisi mostra l'andamento nel tempo del sentiment notato.

In [35]:
from TwitterAPI import TwitterAPI, TwitterPager
import csv
# SEARCH_TERM = 'pizza'

# api = TwitterAPI(<consumer key>, 
#                  <consumer secret>,
#                  <access token key>,
#                  <access token secret>)

# pager = TwitterPager(api, 'search/tweets', {'q': SEARCH_TERM})

# for item in pager.get_iterator():
#     print(item['text'] if 'text' in item else item)

In [36]:
import csv
import pandas as pd
import sys
from textblob import TextBlob
import re
import datetime
####input your credentials here
consumer_key = '76gdbmQlVhj0JZE4UpoVzovJ1'
consumer_secret = 'YfPKo66ptcI96d1xU1rpXJT8cYMRpCcYFZQxBq9cudCPcHMUMo'
access_token = '1218643738026815488-zgfH6FM4w4DlxZdq9Uazvu3hCq4hNf'
access_token_secret = '5wwuolifLJrJm6EqZSJtyIT45HD8iUDF4HNc72FUzR4HC'

In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split 

spark = SparkSession.builder\
                    .master("local")\
                    .appName("Structured Streaming")\
                    .getOrCreate()

In [38]:
SEARCH_TERM = ['#coronavirus']
PRODUCT = 'fullarchive'
LABEL = 'DBMS2'

api = TwitterAPI(consumer_key, 
             consumer_secret, 
             access_token, 
             access_token_secret)
hist_tweets=[]
for i in ["202002220000","202002230000","202002240000","202002250000","202002260000","202003220000","202003230000","202003240000","202003250000","202003260000","202004220000","202004230000","202004240000","202004250000","202004260000"]:
  r = api.request('tweets/search/%s/:%s' % (PRODUCT, LABEL), 
              {'query':SEARCH_TERM,
              'toDate':i,
              }
              )
  if r.status_code != 200:
    raise Exception("error on API %s"%(r.status_code))

  for item in r:

    payload={'id':str(item['id']),'tweet':str(item['text']),'created_at':str(item['created_at'])}
    if 'next' not in json:
        break
    hist_tweets.append(payload)
    next = json['next']
  

In [39]:
hist_data = pd.DataFrame(hist_tweets)

created_at,id,tweet,polarity
Wed Mar 25 23:59:54 +0000 2020,1242964458361716737,Está partiendo desde Miami un avión de @Helidosa con destino a China que traerá la ayuda anunciada para combatir el… https://t.co/FOYoEl7nJ6,neutral
Wed Mar 25 23:59:54 +0000 2020,1242964457854287872,RT @fatourgente: Governo dos Estados Unidos orienta que seus cidadãos deixem o Brasil imediatamente #coronavírus https://t.co/thYkPHWHsn,neutral
Wed Mar 25 23:59:54 +0000 2020,1242964457787207681,RT @SenTedCruz: The Chinese Communist Party did everything it could to keep the origin & spread of #coronavirus a secret. Now that thousand…,negative
Wed Mar 25 23:59:54 +0000 2020,1242964457611018241,#Creatividad en caricaturas. [📹] Esta es la forma más divertida para hacerle frente a la contingencia mundial del… https://t.co/8t42I1GOgn,neutral
Wed Mar 25 23:59:54 +0000 2020,1242964457292267521,RT @arepandro: Cuando salgan las cachiporreras del Gobierno corrupto de Moreno @ramirogarciaf @martharoldos etc a querer endilgar la inoper…,neutral


In [41]:
hist_data['polarity'] = hist_data.tweet.map(lambda x: get_sentiment(x))

In [42]:
feb_data = hist_data[:500]
march_data = hist_data[500:1000]
april_data = hist_data[1000:]

In [43]:
fig, ax = plt.subplots(figsize=(10,10))
sns.countplot(feb_data.polarity,ax = ax)
plt.title("Tweets sentiment at February")
plt.show()

In [44]:
fig, ax = plt.subplots(figsize=(10,10))
sns.countplot(march_data.polarity,ax = ax)
plt.title("Tweets sentiment at March")
plt.show()

In [45]:
fig, ax = plt.subplots(figsize=(10,10))
sns.countplot(april_data.polarity,ax = ax)
plt.title("Tweets sentiment at April")
plt.show()

# Conclusioni
La costruzione di un motore di near real-time sentiment analysis dei tweet degli utenti di Twitter ci ha permesso di dedurre che la magigor parte dei twet postati presenta un sentimento neutrale nei conforni del virus e dell'andamento dello stesso in generale nel tempo, portando alla luce aspetti che potrebbero interessare la salute pubblica. In particolare un sentiment neutrale ci può far considerare che le persone non sembrano essere state interessate da quanto ci sta toccando. Non di meno, questo lavoro ha voluto rappresentare una prima mesa in pratica di una pipeline di gestione e analisi di un sistema di Big Data, combinando le potenzialità offerte dai servizi in cloud di Amazon AWS e l'elasticità garantita da Apache Spark in fatto di elaborazione e analisi su un grande volume di dati, quale nel caso in easame, i tweet di una fetta di utenti di Twitter. Ovviamente, le considerazioni sino ad ora portate, sono relative e risentono dell'esiguità del campione considerato, per vincoli del piano free di twitter. Il sentiment appare comunque altamente altalenante.