
# Analyzing trending hashtags on Twitter

In [None]:
!apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are only kept on archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!pip install -q findspark pyspark
# This cell can take 2 to 3 minutes to run

Reading package lists... Done
ERROR: THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS FILE. If you have updated the package versions, please update the hashes. Otherwise, examine the package contents carefully; someone may have tampered with them.
    pyspark from https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz#sha256=5e25ebb18756e9715f4d26848cc7e558035025da74b4fc325a0ebc05ff538e65:
        Expected sha256 5e25ebb18756e9715f4d26848cc7e558035025da74b4fc325a0ebc05ff538e65
             Got        a02c5df9515b97a56f0adaeeb1453b1451f46fef852d59ba251f59121f518bfc

In [None]:
!pip install findspark pyspark

Collecting findspark
  Using cached https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=c22b95934794b101208740d49c5460b1e8587e9675b7122bd562f2ae478aca80
  Stored in directory: /root/.cache/pip/wheels/40/1b/

## Retrieving Tweets in real time

### Authentication

With version 2 of the Twitter API, authentication can be done with an **access token** (called a *bearer token*), sent in the `Authorization` header of each request.

In [None]:
import json
import requests
import re

# Enter your access token here
BEARER_TOKEN = ""
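Rather than writing the token in the notebook, it can be read from an environment variable. A minimal sketch, where the variable name `TWITTER_BEARER_TOKEN` and the `build_headers` helper are assumptions (they are not part of the original notebook); the header format is the same `Bearer <token>` used in the requests of this notebook.

```python
import os


def build_headers(token=None):
    """Build the Authorization header for the Twitter API v2 (bearer token)."""
    # Hypothetical environment variable name, used only when no token is passed
    if token is None:
        token = os.environ.get("TWITTER_BEARER_TOKEN", "")
    if not token:
        raise ValueError("No bearer token provided")
    return {"Authorization": f"Bearer {token}"}


print(build_headers("my-token"))  # {'Authorization': 'Bearer my-token'}
```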

Let's run a quick test: if the token is valid, opening the stream returns the HTTP status code 200.

In [None]:
URL = "https://api.twitter.com/2/tweets/sample/stream?tweet.fields=lang"
HEADERS = {"Authorization": "Bearer {}".format(BEARER_TOKEN)}

with requests.request("GET", URL, headers=HEADERS, stream=True) as response:
    print(response.status_code)

200


In [None]:
tweets = []

n_iter = 0

with requests.request("GET", URL, headers=HEADERS, stream=True) as response:
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)

            if "data" not in json_response:  # Skip messages that carry no tweet (e.g. errors)
                continue

            if json_response["data"]["lang"] != "en":  # Only keep tweets in English
                continue

            # We want at least one hashtag
            if len(re.findall(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])", json_response["data"]["text"])) == 0:
                continue

            tweets.append(json_response)
            n_iter += 1
            if n_iter >= 10:  # Only collect 10 tweets
                response.close()  # Close the connection
                break
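The filtering above is mixed with the network code. As a sketch, it can be moved into a generator that consumes any iterable of raw lines, which makes it testable on made-up data; `english_tweets_with_hashtags` is a hypothetical name. Applied to the stream, it would be used as `itertools.islice(english_tweets_with_hashtags(response.iter_lines()), 10)` inside the same `with` block.

```python
import itertools
import json
import re

HASHTAG_RE = re.compile(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])")


def english_tweets_with_hashtags(raw_lines):
    """Yield parsed tweets that are in English and contain at least one hashtag."""
    for raw in raw_lines:
        if not raw:  # Empty lines (e.g. keep-alive signals) are skipped
            continue
        payload = json.loads(raw)  # json.loads accepts bytes as well as str
        data = payload.get("data")
        if data is None:  # Messages without a "data" field (e.g. errors) are skipped
            continue
        if data.get("lang") == "en" and HASHTAG_RE.search(data.get("text", "")):
            yield payload


# Made-up raw lines, shaped like the ones returned by response.iter_lines()
fake_lines = [
    b'{"data": {"id": "1", "lang": "en", "text": "hello #spark"}}',
    b"",
    b'{"data": {"id": "2", "lang": "fr", "text": "bonjour #spark"}}',
    b'{"data": {"id": "3", "lang": "en", "text": "no hashtag"}}',
    b'{"data": {"id": "4", "lang": "en", "text": "#streaming rocks"}}',
]
first_ten = list(itertools.islice(english_tweets_with_hashtags(fake_lines), 10))
print([t["data"]["id"] for t in first_ten])  # ['1', '4']
```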

In [None]:
tweets

[{'data': {'id': '1402227415582081026',
   'lang': 'en',
   'text': 'RT @tttyongie: "do you think I\'m cute?" #TAEYONG https://t.co/aogWrY0bua'}},
 {'data': {'id': '1402227419780718597',
   'lang': 'en',
   'text': 'RT @KBSWorldTV: [KBS WORLD Indonesian] #ENHYPEN | Behind Photos🌟\nMore photos on KBS WORLD Facebook \n▶https://t.co/XCV26DXWX6\n\n#Heeseung #Ja…'}},
 {'data': {'id': '1402227419793301504',
   'lang': 'en',
   'text': "@br_cath Thanks for joining the party, we'll see you at Square Enix Presents on Sunday at 8:15pm BST! 🥳 \n \nWe'll send you a reminder pre-show so you don't miss out, but you can also opt out by replying #stop. https://t.co/hHFpnsN9xS"}},
 {'data': {'id': '1402227423974854657',
   'lang': 'en',
   'text': 'Video footage of the Hpakant’s Tamakhan guerrilla strikes continuing this morning to defy military rules. #WhatsHappeningInMyanmar #RejectCoupRejectASEAN #June8Coup https://t.co/LJxdgh5Pit'}},
 {'data': {'id': '1402227423970725892',
   'lang': 'en',
   'text

Each tweet contains three fields inside `data`: `id`, the unique identifier of the tweet (not of its author), `lang`, the detected language (requested with the `tweet.fields=lang` parameter), and `text`, the content of the tweet. This content is, of course, what we are interested in.

Looking at these few tweets, we can see that the texts need some processing before they can be used: in particular, the hashtags have to be extracted from them.

In [None]:
import re

def process_tweet(tweet):
    text = tweet["data"]["text"]
    return re.findall(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])", text)

for tweet in tweets:
    hashtags = process_tweet(tweet)
    if len(hashtags) > 0:
        print(hashtags)

['TAEYONG']
['ENHYPEN', 'Heeseung', 'Ja']
['stop']
['WhatsHappeningInMyanmar', 'RejectCoupRejectASEAN', 'June8Coup']
['nctdiamond']
['MaheshBabu']
['EzishaTheProcess']
['EURO2020']
['premierleague', 'PL', 'epl', 'premierleaguefoxsports', 'football', 'FootballDaily', 'UKGovernment', 'bbcfootball', 'NUFCTakeover', 'skynews', 'BBCnews', 'fifa', 'Euro2021', '10downingstreet', 'chroniclenufc', 'DailyMail', 'talksport', 'nufc', 'ESPN', 'politicalnews', 'itvnews']
['JasminiansPrideJasmin']
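The regular expression is worth checking on a few edge cases. A small sketch on made-up strings: the pattern requires at least two characters after `#`, and the captured tag must end with an ASCII letter or digit, so a trailing underscore or a final accented letter is cut off.

```python
import re

# Same pattern as in process_tweet()
HASHTAG_PATTERN = re.compile(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])")

# Made-up inputs, each mapped to the hashtags findall() should return
cases = {
    "Breaking news #June8Coup #WhatsHappeningInMyanmar": ["June8Coup", "WhatsHappeningInMyanmar"],
    "A one-letter tag like #a is ignored": [],
    "A trailing underscore #foo_ is trimmed": ["foo"],
    "A final accented letter #café is cut off": ["caf"],
    "No hashtag here": [],
}

for text, expected in cases.items():
    found = HASHTAG_PATTERN.findall(text)
    print(f"{text!r} -> {found}")
    assert found == expected, (text, found)
```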


In [None]:
n_iter = 0

with requests.request("GET", URL, headers=HEADERS, stream=True) as response:
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            if "data" not in json_response:  # Skip messages that carry no tweet (e.g. errors)
                continue
            hashtags = process_tweet(json_response)

            if len(hashtags) > 0 and json_response["data"]["lang"] == "en":
                print(hashtags)
                n_iter += 1
                if n_iter >= 30:  # Stop after 30 English tweets containing hashtags
                    response.close()
                    break

['IDOs', 'CTBot']
['ActiveTeachersForce']
['TumhareRobertJija']
['tuesdaymotivations']
['SeethaRAMaRajuCHARAN', 'RRR', 'Siddha']
['enoughvit']
['CryptoSlate']
['Cyberpunk2077', 'XboxSeriesX', 'VirtualPhotography']
['GodMorningTuesday']
['Fastly']
['23JuneUKwideProtest']
['sharinhallbarrelhorses', 'shproseriestack', 'whatareyoutrainingin', 'shproseries', 'tack', 'saddles', 'saddlepads', 'bits', 'sharinhall']
['FORL1S']
['internetdown', 'InternetShutdown']
['0X1LOVESONG1stWin', 'txt']
['STARPLAY']
['PeopleDefenceForce', 'RejectCoupRejectASEAN', 'June8Coup', 'WhatsHappeningInMyanmar']
['MyfoodieJDlivexPatrick']
['THEBOYZ']
['beetle15k']
['June7Coup', 'WhatsHappeningInMyanmar']
['WeTV']
['AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin', 'AlbumEXOdariMimin']
['PremiosMTVMIAW', 'MTVLAKPOPTWICE']
['RrahulSudhir', '

In [None]:
from datetime import datetime

with requests.request("GET", URL, headers=HEADERS, stream=True) as response:
    headers = response.headers
    print("Call limit:", headers["x-rate-limit-limit"])
    print("Remaining calls:", headers["x-rate-limit-remaining"])
    print("Reset time:", datetime.fromtimestamp(int(headers["x-rate-limit-reset"])))

Call limit: 50
Remaining calls: 43
Reset time: 2021-06-08 07:33:42
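The `x-rate-limit-reset` header is a Unix timestamp in seconds, which is why it goes through `datetime.fromtimestamp`. Since the quota is small (50 calls here), it can be worth checking it before reconnecting. A minimal sketch, assuming the header names shown above; `seconds_until_reset` and `can_reconnect` are hypothetical helpers and the reset timestamp in the example is arbitrary.

```python
import time


def seconds_until_reset(headers, now=None):
    """Seconds left before the rate-limit window resets (0 if already past)."""
    now = time.time() if now is None else now
    reset_at = int(headers["x-rate-limit-reset"])  # Unix timestamp, in seconds
    return max(0, reset_at - int(now))


def can_reconnect(headers):
    """True if at least one call remains in the current window."""
    return int(headers["x-rate-limit-remaining"]) > 0


# Header values are strings, as in response.headers; the reset timestamp is arbitrary
example = {
    "x-rate-limit-limit": "50",
    "x-rate-limit-remaining": "43",
    "x-rate-limit-reset": "1623137622",
}
print(seconds_until_reset(example, now=1623137000))  # 622
print(can_reconnect(example))  # True
```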


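## Creating the TCP server

The tweets are relayed to Spark through a small TCP server: a Python script reads the Twitter stream and forwards each matching tweet, one per line, on local port 9009, where Spark Streaming listens with `socketTextStream`.
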
<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/real_time_processing/img/ssc_twitter1.png" />

In [None]:
%%writefile /tmp/ssc_twitter_tcp_server.py
import sys
import socket
import requests
import json
import re

# TODO: enter your access token here
BEARER_TOKEN = ""
URL = "https://api.twitter.com/2/tweets/sample/stream?tweet.fields=lang"
HEADERS = {"Authorization": "Bearer {}".format(BEARER_TOKEN)}

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Allow quick restarts on the same port
sock.bind(("127.0.0.1", 9009))
sock.listen()

response = requests.request("GET", URL, headers=HEADERS, stream=True)
if response.status_code != 200:
    print("Unable to connect to stream:")
    print(response.content)
    sys.exit(1)

while True:
    (clientsocket, address) = sock.accept()
    with clientsocket:
        print('Connection from', address)
        while True:
            try:
                if not response:
                    break
                for response_line in response.iter_lines():
                    if response_line:
                        content = json.loads(response_line.decode("utf-8"))
                        if "data" not in content:  # Skip messages that carry no tweet (e.g. errors)
                            continue
                        if content["data"]["lang"] != "en":  # Only keep tweets in English
                            continue

                        # We want at least one hashtag
                        if len(re.findall(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])", content["data"]["text"])) == 0:
                            continue

                        # One tweet per line: socketTextStream treats each line as a separate record
                        text = content["data"]["text"].replace("\r", " ").replace("\n", " ")
                        clientsocket.sendall((text + "\n").encode("utf-8"))
            except (BrokenPipeError, ConnectionResetError):
                print("Client", address, "disconnected.")
                sys.exit()
            except Exception as e:
                print("Unknown error:", e)
                sys.exit(1)

Writing /tmp/ssc_twitter_tcp_server.py
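On the Spark side, `socketTextStream` treats each newline-terminated line received on the socket as one record. A self-contained local sketch of that protocol, without Twitter or Spark: a thread plays the server and a client reads the lines back. `serve_lines` and `read_lines` are hypothetical helpers, and the port is chosen by the OS rather than fixed to 9009.

```python
import socket
import threading


def serve_lines(server_sock, lines):
    """Accept a single client and send each line as UTF-8 text ending with a newline."""
    conn, _ = server_sock.accept()
    with conn:
        for line in lines:
            # Newlines inside a record are replaced so that each record stays on one line
            record = line.replace("\r", " ").replace("\n", " ") + "\n"
            conn.sendall(record.encode("utf-8"))


def read_lines(host, port):
    """Connect as a client and return the records received, split on newlines."""
    with socket.create_connection((host, port), timeout=5) as client:
        with client.makefile("r", encoding="utf-8") as stream:
            return [line.rstrip("\n") for line in stream]


server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("127.0.0.1", 0))  # Port 0: let the OS pick a free port
server_sock.listen()
port = server_sock.getsockname()[1]

sample = ["first tweet #spark", "second tweet\nspanning two lines #streaming"]
thread = threading.Thread(target=serve_lines, args=(server_sock, sample))
thread.start()
received = read_lines("127.0.0.1", port)
thread.join()
server_sock.close()
print(received)  # ['first tweet #spark', 'second tweet spanning two lines #streaming']
```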


## Spark application

In [None]:
import os
import time
import findspark

# Tell findspark where Java and Spark are installed
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"

findspark.init()  # Adds the PySpark libraries found in SPARK_HOME to sys.path

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
import os
import subprocess
import socket
import signal

def start_tcp_server():
    stest = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result_of_check = stest.connect_ex(("127.0.0.1", 9009))

    # connect_ex() returns 0 if a process is already listening on port 9009.
    # If the server is already running, it accepts this test connection as a client.
    server = None
    if result_of_check != 0:
        # setsid starts the server in a new session and process group, so it can be stopped as a whole
        server = subprocess.Popen("python /tmp/ssc_twitter_tcp_server.py", shell=True, preexec_fn=os.setsid)
    else:
        print("Port 9009 is already in use: stop the server before starting it again.")

    stest.close()
    return server  # Popen handle, or None if the port was already taken

start_tcp_server()
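The cell above imports `signal`, and the message asks to stop the server before restarting it. Since the server is launched with `preexec_fn=os.setsid`, one way to stop it, assuming you have a reference to the server's `Popen` object, is to send `SIGTERM` to its whole process group (the shell and the Python script). A POSIX-only sketch; `stop_process_group` is a hypothetical helper, and a `sleep` command stands in for the server.

```python
import os
import signal
import subprocess


def stop_process_group(process, timeout=5):
    """Send SIGTERM to the process group of a Popen started with preexec_fn=os.setsid."""
    if process is None:
        return None  # Nothing was started
    if process.poll() is None:  # Still running
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        except ProcessLookupError:
            pass  # The process exited in the meantime
        process.wait(timeout=timeout)
    return process.returncode


# Stand-in for the TCP server: a long "sleep" started the same way as the server
proc = subprocess.Popen("sleep 30", shell=True, preexec_fn=os.setsid)
print(stop_process_group(proc))
```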

In [None]:
import re
import time

def get_hashtags(tweet):
    hashtags = re.findall(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])", tweet)
    hashtags = [(x.lower(), 1) for x in hashtags]
    return hashtags

def process_tweets():
    start_tcp_server()
    time.sleep(1)
    
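    sc = SparkContext(master="local[*]")  # Create a local SparkContext using all available cores
    ssc = StreamingContext(sc, 1)  # Batch interval: 1 second

    tweets = ssc.socketTextStream("127.0.0.1", 9009)  # One record per line received on the socket
    hashtags = tweets.flatMap(get_hashtags)  # (hashtag, 1) pairs

    hashtags.pprint()  # Print the first elements of each batch

    ssc.start()
    time.sleep(3)  # Let the stream run for about 3 seconds
    ssc.stop()  # By default, also stops the SparkContext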
    
process_tweets()

-------------------------------------------
Time: 2021-06-08 11:41:28
-------------------------------------------

-------------------------------------------
Time: 2021-06-08 11:41:29
-------------------------------------------
('myfoodiejdlivexpatrick', 1)
('kai', 1)
('superfanfriday', 1)
('zeroplastic', 1)
('projectparadise', 1)
('xiaozhan', 1)
('xiaozhanxusmile', 1)

-------------------------------------------
Time: 2021-06-08 11:41:30
-------------------------------------------
('ahgasiste', 1)
('freepalestine', 1)
('freepalastin', 1)
('whatshappeninginmyanmar', 1)
('june8coup', 1)
('premiosmtvmiaw', 1)
('mtvlafandomarmy', 1)

-------------------------------------------
Time: 2021-06-08 11:41:31
-------------------------------------------
('troll', 1)
('rejectcouprejectasean', 1)
('june8coup', 1)
('whatshappeninginmyanmar', 1)
('imscudetto', 1)
('iminter', 1)
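`get_hashtags` emits `(hashtag, 1)` pairs so that they can be summed by key; the application above only prints them. As a sketch of that next step, here is, in plain Python and without Spark, what `flatMap(get_hashtags)` followed by `reduceByKey(lambda x, y: x + y)` would compute for one micro-batch. `reduce_by_key` is a hypothetical stand-in for the Spark operation, and the input lines are made up.

```python
import re


def get_hashtags(tweet):
    """Same logic as the notebook's get_hashtags(): (lowercased hashtag, 1) pairs."""
    hashtags = re.findall(r"#(\w*[0-9a-zA-Z]+\w*[0-9a-zA-Z])", tweet)
    return [(x.lower(), 1) for x in hashtags]


def reduce_by_key(pairs, func):
    """Plain-Python equivalent of reduceByKey(func) on a list of (key, value) pairs."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# Made-up lines standing in for one 1-second micro-batch received on the socket
batch = ["#June8Coup #WhatsHappeningInMyanmar", "RT @someone: #june8coup", "#EURO2020 tonight"]

pairs = [pair for line in batch for pair in get_hashtags(line)]  # What flatMap(get_hashtags) yields
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {'june8coup': 2, 'whatshappeninginmyanmar': 1, 'euro2020': 1}
```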



### Window functions in streaming

In [None]:
def process_tweets_windowed():
    start_tcp_server()
    time.sleep(1)
    
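    sc = SparkContext(master="local[*]")  # Create a local SparkContext
    sc.setCheckpointDir('/tmp/spark/checkpoints')  # Needed by windowed reductions with an inverse function
    ssc = StreamingContext(sc, 1)

    tweets = ssc.socketTextStream("127.0.0.1", 9009)
    hashtags = tweets.flatMap(get_hashtags)
    reduced = hashtags \
        .reduceByKeyAndWindow(
            lambda x, y: x + y,  # Add the counts of the batch entering the window
            lambda x, y: x - y,  # Inverse function: subtract the counts of the batch leaving it
            30,  # Window length, in seconds
            1,  # Slide interval, in seconds
            filterFunc=lambda kv: kv[1] > 0  # Drop hashtags whose count fell back to 0
        ) \
        .transform(
            lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)  # Most frequent hashtags first
        )

    reduced.pprint()

    ssc.start()
    time.sleep(60)  # Let the stream run for about one minute
    ssc.stop()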
    
process_tweets_windowed()

Port 9009 is already in use: stop the server before starting it again.
-------------------------------------------
Time: 2021-06-08 11:48:45
-------------------------------------------
('2021btsfesta', 2)
('mtvlakpoptwice', 2)
('rejectcouprejectasean', 2)
('fridaylivestream', 2)
('premiosmtvmiaw', 2)
('100asianheartthrobs2021', 2)
('june8coup', 2)
('whatshappeninginmyanmar', 2)
('standwithstudents', 1)
('fullyloaded', 1)
...



ValueError: ignored
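With an inverse function, Spark does not recount the whole 30-second window at every 1-second slide: it adds the counts of the batch entering the window and subtracts those of the batch leaving it, so `invFunc` has to undo `func`. A plain-Python sketch of that incremental update, without Spark: the window is expressed in batches, the input batches are made up, and `sliding_window_counts` is a hypothetical name.

```python
from collections import Counter, deque


def sliding_window_counts(batches, window):
    """Yield, after each batch, the hashtag counts over the last `window` batches.

    Counts are updated incrementally: the new batch is added (the role of func)
    and the batch leaving the window is subtracted (the role of invFunc).
    """
    counts = Counter()
    history = deque()
    for batch in batches:
        new = Counter(batch)
        counts.update(new)  # func: x + y
        history.append(new)
        if len(history) > window:
            counts.subtract(history.popleft())  # invFunc: x - y
        # filterFunc: keep only hashtags still present in the window
        yield {tag: n for tag, n in counts.items() if n > 0}


# Made-up micro-batches of hashtags, with a window of 2 batches
batches = [["june8coup", "euro2020"], ["june8coup"], ["nufc"], ["june8coup"]]
for result in sliding_window_counts(batches, window=2):
    print(result)
```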