# Sentiment Analysis using Forbes

#### WHAT IS KAFKA?

Apache Kafka is an open-source distributed event streaming platform. Event streaming is the practice of
capturing data in real time from event sources like databases, sensors, mobile devices, cloud services, and
software applications in the form of streams of events; storing these event streams durably for later
retrieval; manipulating, processing, and reacting to the event streams in real time as well as
retrospectively; and routing the event streams to different destination technologies as needed. Event
streaming thus ensures a continuous flow and interpretation of data so that the right information is at the
right place, at the right time.

#### EVENTS

An event records the fact that "something happened" in the world or in your business. It is also called a
record or a message in the documentation. When you read or write data to Kafka, you do this in the form of
events. Conceptually, an event has a key, a value, a timestamp, and optional metadata headers. Here is an
example event:

- Event key: "Alice"
- Event value: "Made a payment of $200 to Bob"
- Event timestamp: "Jun. 25, 2020 at 2:06 p.m."
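As a rough illustration, the event above could be modelled in Python as follows. The `Event` class and its `to_bytes` helper are hypothetical names, not part of any Kafka client, and the UTC time zone is an assumption since the example gives none.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Event:
    """Hypothetical container mirroring the conceptual parts of a Kafka event."""
    key: str
    value: str
    timestamp: datetime
    headers: dict = field(default_factory=dict)  # optional metadata

    def to_bytes(self) -> bytes:
        """Serialize the event to UTF-8 JSON, one possible wire format."""
        payload = {
            "key": self.key,
            "value": self.value,
            "timestamp": self.timestamp.isoformat(),
            "headers": self.headers,
        }
        return json.dumps(payload).encode("utf-8")


# The example event from the text (time zone assumed to be UTC)
event = Event(
    key="Alice",
    value="Made a payment of $200 to Bob",
    timestamp=datetime(2020, 6, 25, 14, 6, tzinfo=timezone.utc),
)
```

Calling `event.to_bytes()` yields a byte string that a producer could send as the event value.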

#### PRODUCERS / CONSUMERS

Producers are those client applications that publish (write) events to Kafka, and consumers are those that
subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and
agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for.
For example, producers never need to wait for consumers.
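The decoupling can be pictured with a toy in-memory log. This is only a sketch of the idea, not how Kafka is implemented: `InMemoryTopic` and `OffsetConsumer` are made-up names, and the model ignores partitions, brokers, and networking.

```python
class InMemoryTopic:
    """Toy append-only log standing in for a Kafka topic (single partition)."""

    def __init__(self):
        self._log = []

    def append(self, event):
        """Producer side: store the event and return its offset immediately."""
        self._log.append(event)
        return len(self._log) - 1

    def read_from(self, offset):
        """Consumer side: return every event stored at or after `offset`."""
        return self._log[offset:]


class OffsetConsumer:
    """Toy consumer that remembers how far it has read."""

    def __init__(self, topic):
        self.topic = topic
        self.offset = 0

    def poll(self):
        """Return the events not yet seen by this consumer and advance its offset."""
        events = self.topic.read_from(self.offset)
        self.offset += len(events)
        return events


topic = InMemoryTopic()
# The producer writes before any consumer exists and never waits for one
for text in ["event-1", "event-2", "event-3"]:
    topic.append(text)

fast = OffsetConsumer(topic)
late = OffsetConsumer(topic)
first_batch = fast.poll()    # reads all three events
second_batch = fast.poll()   # nothing new yet
late_batch = late.poll()     # an independent consumer still sees all three
```

Because each consumer tracks its own offset, adding or removing consumers does not affect the producer.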

#### TOPICS

Events are organized and durably stored in topics. Topics in Kafka are always multi-producer and
multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero,
one, or many consumers that subscribe to these events. Events in a topic can be read as often as needed:
unlike in traditional messaging systems, they are not deleted after consumption. Instead, you define how
long Kafka should retain your events through a per-topic configuration setting, after which old events are
discarded.
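Time-based retention can be pictured with a toy model. Kafka's per-topic setting for this is `retention.ms`. The function and variable names below are illustrative only, and the 7-day value is an assumption. Real Kafka also deletes whole log segments rather than individual events.

```python
from datetime import datetime, timedelta, timezone


def apply_retention(events, now, retention):
    """Keep only the (timestamp, value) pairs younger than the retention period."""
    cutoff = now - retention
    return [(ts, value) for ts, value in events if ts >= cutoff]


now = datetime(2024, 1, 8, tzinfo=timezone.utc)
events = [
    (now - timedelta(days=10), "old article"),
    (now - timedelta(days=3), "recent article"),
    (now - timedelta(hours=1), "fresh article"),
]

# Assumed retention of 7 days for the illustration
kept = apply_retention(events, now, retention=timedelta(days=7))

# Reading does not consume: the retained events can be read as often as needed
first_read = [value for _, value in kept]
second_read = [value for _, value in kept]
```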

![Illustrations](image_kafka2.png)

#### PROJECT SUMMARY

The goal of this project is to build an end-to-end machine learning pipeline that will:
- extract articles on specific topics from forbes.com and publish them in real time using Apache Kafka
- transform them by classifying their sentiment with your trained model
- load the data into a data warehouse using PostgreSQL
- monitor the results for each topic on a real-time dashboard using PowerBI

Each part can be started independently.

In [1]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import queue
import time

import pandas as pd
import warnings
import nltk
warnings.filterwarnings("ignore")
import numpy as np
import matplotlib.pyplot as plt 
from confluent_kafka import Producer, Consumer, KafkaError
from nltk.sentiment import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')
import xgboost as xgb
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from io import StringIO
from xgboost import plot_importance
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, confusion_matrix, roc_curve, auc
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.orm import sessionmaker
import json
from datetime import datetime
import psycopg2
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from bs4 import BeautifulSoup


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /Users/macbookpro/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


#### EXTRACT

First, install Kafka (and ZooKeeper, which manages it) on your system. At least 4 GB of RAM is needed.
You can follow this tutorial: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-18-04

You will scrape forbes.com. Try to keep your code generic so that it handles multiple themes and publishes them to the Kafka server using topics. You can monitor several keywords for one theme.

In [2]:
def get_data(query):
    """Scrape the Forbes search results for `query` and return them as a dict of lists."""
    driver = webdriver.Chrome()
    driver.get(f"https://www.forbes.com/search/?q={query}")

    # Wait until the "More articles" button is clickable
    more_articles_button = WebDriverWait(driver, 30).until(
        EC.element_to_be_clickable((By.CLASS_NAME, 'search-more'))
    )

    # Keep clicking "More articles" until the button is no longer available
    while more_articles_button:
        try:
            more_articles_button.click()
            more_articles_button = WebDriverWait(driver, 30).until(
                EC.element_to_be_clickable((By.CLASS_NAME, 'search-more'))
            )
        except Exception:
            # Timeout or stale element: assume every article has been loaded
            break

    # Grab the page content once all the articles are loaded
    soup = BeautifulSoup(driver.page_source, 'html.parser')

    all_elt = soup.find_all("article", class_="stream-item et-promoblock-removeable-item et-promoblock-star-item")
    titles = []
    dates = []
    url_articles = []
    contents = []

    # Extract the content of each article element
    for elt in all_elt:
        title = elt.find("a", class_="stream-item__title").text

        # The image link may be missing, so check before reading its href
        url_article_elt = elt.find("a", class_="stream-item__image ratio16x9")

        if url_article_elt:
            url_article = url_article_elt["href"]
        else:
            url_article = "Nan"

        content = elt.find("div", class_="stream-item__description").text
        date = elt.find("div", class_="stream-item__date").text

        # Append the extracted values to the lists
        titles.append(title)
        dates.append(date)
        contents.append(content)
        url_articles.append(url_article)

    # Close the browser once scraping is done
    driver.quit()

    # Return a dictionary holding the lists
    return {"Titles": titles, "Dates": dates, "Contents": contents, "URLs": url_articles}

In [None]:
def produce_to_kafka(data_dict, kafka_bootstrap_servers='localhost:9092', kafka_topic='Forbes'):
    """Publish each scraped article to the Kafka topic as a JSON message."""
    # Kafka producer configuration
    kafka_producer_config = {
        'bootstrap.servers': kafka_bootstrap_servers,
        # Other settings can be added here as needed
    }

    producer = Producer(kafka_producer_config)

    try:
        # Walk through the dictionary and publish each record to Kafka
        for i, record in enumerate(zip(data_dict["Titles"], data_dict["Dates"], data_dict["Contents"], data_dict["URLs"])):
            title, date, content, url = record

            # Build a dictionary representing one record
            record_dict = {
                'Title': title,
                'Date': date,
                'Content': content,
                'URL': url
            }

            # Convert the dictionary to a JSON string
            json_message = json.dumps(record_dict)

            # Publish the message to the Kafka topic
            producer.produce(kafka_topic, key=str(i), value=json_message)
            # Serve delivery callbacks so the local queue does not fill up
            producer.poll(0)

        # Wait until all messages have been sent
        producer.flush()

        print(f"Data successfully published to topic {kafka_topic}")

    except Exception as e:
        print(f"Error while publishing to Kafka: {e}")

# Call get_data with the query "Finance"
query = "Finance"
data_dict = get_data(query)

# Call produce_to_kafka with the resulting dictionary
produce_to_kafka(data_dict)


%3|1709208997.683|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1709208998.684|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)

In [None]:
def consume_from_kafka(kafka_bootstrap_servers='localhost:9092', kafka_topic='Forbes'):
    """Read messages from the Kafka topic and print them until interrupted."""
    consumer_config = {
        'bootstrap.servers': kafka_bootstrap_servers,
        'group.id': 'Jos_Gnon_Chim',  # Use a new consumer group if you need to re-read the topic
        'auto.offset.reset': 'earliest'
    }

    consumer = Consumer(consumer_config)
    consumer.subscribe([kafka_topic])

    try:
        while True:
            # The timeout is expressed in seconds
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(msg.error())
                    break

            # Decode the JSON message
            json_message = msg.value().decode('utf-8')
            print("Message received:")
            print(json_message)

    except KeyboardInterrupt:
        pass

    finally:
        consumer.close()

# Call the function to consume from Kafka
consume_from_kafka()
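The producer and consumer above exchange each article as UTF-8 encoded JSON. The round trip can be checked without a broker, as in the minimal sketch below. The helper names `encode_record` and `decode_record` are hypothetical and the sample article is made up.

```python
import json


def encode_record(title, date, content, url):
    """Build the UTF-8 JSON payload for one article (same field names as the producer)."""
    record = {"Title": title, "Date": date, "Content": content, "URL": url}
    return json.dumps(record).encode("utf-8")


def decode_record(raw_bytes):
    """Turn the bytes returned by a consumer into a dictionary."""
    return json.loads(raw_bytes.decode("utf-8"))


# Made-up article used only for this illustration
raw = encode_record("Sample title", "Jan 1, 2024", "Sample description", "Nan")
article = decode_record(raw)
```

Decoding into a dictionary rather than printing the raw string makes fields such as `article["Content"]` directly available to the classifier in the next step.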

#### TRANSFORM

First, train a sentiment analysis model on the IMDB dataset using scikit-learn (or XGBoost).

Dataset: imdb.csv

Then, using Python, create a Kafka consumer that applies your trained model to classify each article read from Kafka.

In [None]:
def read_csv_with_encoding(file_path, encodings=('utf-8', 'latin-1', 'ISO-8859-1')):
    """Read a CSV file, trying each encoding in turn until one decodes it."""
    with open(file_path, 'rb') as f:
        content = f.read()
    for encoding in encodings:
        try:
            decoded_content = content.decode(encoding)
        except UnicodeDecodeError:
            continue
        return pd.read_csv(StringIO(decoded_content), dtype=str)
    # UnicodeDecodeError cannot be built from a single message, so raise a ValueError instead
    raise ValueError(f"Unable to decode the file {file_path} with the provided encodings.")

# Example usage
file_path = "./imdb.csv"
data = read_csv_with_encoding(file_path)


In [None]:
def data_categorization(data):
    """
    Take the original data and return a categorized copy of it.
    Classes are encoded as integers.
    """
    new_data = data.copy()

    # Lowercase the 'label' column and map the classes to integers
    print(f"Data type before conversion: {new_data['label'].dtype}")
    new_data['label'] = new_data['label'].str.lower().map({"neg": 0, "pos": 1, "unsup": 2})

    # Make sure the 'review' column holds strings
    new_data["review"] = new_data["review"].astype(str)

    # Drop the samples labelled 'unsup'
    new_data = new_data[new_data['label'] != 2]

    # Show information about the data after the conversion
    # (info() prints its report itself and returns None)
    print("Data information after conversion:")
    new_data.info()

    return new_data


In [None]:
def get_split_data(data, label_column="label", feature_columns="review"):
    """
    Take the original data, categorize it, and vectorize the text with TF-IDF.
    Return the split data (training set and test set).
    """
    # Preprocess the data
    new_data = data_categorization(data)

    # Vectorize the text with TF-IDF
    tfidf_vectorizer = TfidfVectorizer()
    X_tfidf = tfidf_vectorizer.fit_transform(new_data[feature_columns])

    # Build the feature DataFrame on the same index as new_data so that rows stay aligned
    # with their labels (note: toarray() is memory-hungry on a large corpus)
    X = pd.DataFrame(
        X_tfidf.toarray(),
        columns=tfidf_vectorizer.get_feature_names_out(),
        index=new_data.index
    )
    # Keep the labels separate: a TF-IDF term named like the label column would otherwise collide
    y = new_data[label_column]

    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    return X_train, X_test, y_train, y_test
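To make the vectorization step less of a black box, here is a small pure-Python sketch of the weighting that `TfidfVectorizer` applies with its default settings: smoothed IDF, `idf(t) = ln((1 + n) / (1 + df(t))) + 1`, followed by L2 normalization of each document vector. The whitespace tokenization is a simplification of scikit-learn's tokenizer, and the function name and sample reviews are made up.

```python
import math
from collections import Counter


def tfidf(documents):
    """Return one {term: weight} dict per document (smoothed IDF, L2-normalized)."""
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)

    # Document frequency: number of documents containing each term
    df = Counter(term for tokens in tokenized for term in set(tokens))
    idf = {term: math.log((1 + n_docs) / (1 + count)) + 1 for term, count in df.items()}

    vectors = []
    for tokens in tokenized:
        counts = Counter(tokens)
        weights = {term: tf * idf[term] for term, tf in counts.items()}
        # Guard against empty documents, whose norm would be zero
        norm = math.sqrt(sum(w * w for w in weights.values())) or 1.0
        vectors.append({term: w / norm for term, w in weights.items()})
    return vectors


# Made-up reviews used only for this illustration
reviews = ["great movie great cast", "boring movie", "great fun"]
vectors = tfidf(reviews)
```

In the second review, "boring" receives a higher weight than "movie" because it appears in fewer documents.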

In [None]:
X_train,X_test,y_train,y_test = get_split_data(data)
X_train,X_test,y_train,y_test

In [None]:
def get_model(X_train, y_train):
    """
    Take X_train and y_train and return an XGBoost model trained on them.
    """
    model = XGBClassifier()
    model.fit(X_train, y_train)
    return model

In [None]:
model_set = get_model(X_train, y_train)

In [None]:
def data_standardization(X_train, X_test):
    """Take X_train and X_test and return their standardized versions."""
    scaler = StandardScaler()
    # Fit the scaler on the training set only, then apply it to both sets
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    return X_train_scaled, X_test_scaled


In [None]:
X_train_scaled,X_test_scaled = data_standardization(X_train,X_test)

In [None]:
def plot_performance(model, X_test, y_test):
    """Take a model, X_test and y_test, and display the performance indicators for this model."""

    y_pred = model.predict(X_test)
    precision = precision_score(y_test, y_pred, average='macro')
    recall = recall_score(y_test, y_pred, average='macro')
    cm = confusion_matrix(y_test, y_pred)

    # The ROC curve is built from the predicted probability of the positive class (column 1),
    # not from the hard predictions
    y_probs = model.predict_proba(X_test)
    fpr, tpr, _ = roc_curve(y_test, y_probs[:, 1])
    roc_auc = auc(fpr, tpr)

    # Plot the ROC curve with Matplotlib
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='orange', lw=2, label='ROC curve (area = {:.2f})'.format(roc_auc))
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend()
    plt.show()

    print('\nPerformance Indicators')
    print("==========================================")
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'Confusion Matrix:\n{cm}')

In [None]:
plot_performance(model_set, X_test, y_test)
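For readers unfamiliar with the `average='macro'` option used above, the sketch below recomputes macro-averaged precision and recall by hand: each class gets its own score, and the scores are averaged with equal weight. The function name and the labels are made up for the illustration.

```python
def macro_precision_recall(y_true, y_pred):
    """Compute macro-averaged precision and recall from two label sequences."""
    labels = sorted(set(y_true) | set(y_pred))
    precisions, recalls = [], []
    for label in labels:
        pairs = list(zip(y_true, y_pred))
        tp = sum(1 for t, p in pairs if t == label and p == label)
        fp = sum(1 for t, p in pairs if t != label and p == label)
        fn = sum(1 for t, p in pairs if t == label and p != label)
        # A class with no predictions (or no samples) scores 0 here
        precisions.append(tp / (tp + fp) if tp + fp else 0.0)
        recalls.append(tp / (tp + fn) if tp + fn else 0.0)
    return sum(precisions) / len(labels), sum(recalls) / len(labels)


# Made-up labels: 0 = negative, 1 = positive
y_true = [0, 0, 0, 1, 1, 1, 1, 1]
y_pred = [0, 0, 1, 1, 1, 1, 0, 0]
precision, recall = macro_precision_recall(y_true, y_pred)
```

Here class 0 has precision 2/4 and recall 2/3, class 1 has precision 3/4 and recall 3/5, so the macro averages are 0.625 and about 0.633.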

#### LOAD

Create another Kafka consumer that exports the articles and their labels to a PostgreSQL database.

In [None]:
Base = declarative_base()

class KafkaMessage(Base):
    """One consumed Kafka message, stored with its topic and JSON payload."""
    __tablename__ = 'kafka_messages'

    id = Column(Integer, primary_key=True)
    topic = Column(String)
    value = Column(JSON)
    timestamp = Column(DateTime, default=datetime.utcnow)

def connect_to_postgresql(user, password, host, port, database):
    """Open a raw psycopg2 connection and a SQLAlchemy engine on the same database."""
    try:
        conn = psycopg2.connect(user=user, password=password, host=host, port=port, database=database)
        engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}')
        return conn, engine
    except Exception as e:
        print(f"Error while connecting to the PostgreSQL database: {e}")
        raise

def init_db(engine):
    """Create the tables if needed and return a session bound to the engine."""
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    return Session()

def receive_events(consumer, topic):
    """Subscribe to `topic` and return the first message payload as text, or None."""
    consumer.subscribe([topic])  # subscribe() expects a list of topic names
    msg = consumer.poll(10.0)    # wait up to 10 seconds
    if msg is None or msg.error():
        return None
    return msg.value().decode("utf-8")

def consume_and_save_to_db(consumer, session, topic='Forbes'):
    """Poll `topic` until interrupted and store each JSON message in PostgreSQL."""
    consumer.subscribe([topic])
    try:
        while True:
            # A confluent_kafka Consumer is not iterable: messages are fetched with poll()
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(msg.error())
                continue
            try:
                json_value = json.loads(msg.value().decode('utf-8'))
                session.add(KafkaMessage(topic=msg.topic(), value=json_value))
                session.commit()
            except Exception as e:
                print(f"Error while handling the message: {e}")
                session.rollback()
    except KeyboardInterrupt:
        pass

In [None]:
producer_config = {
    'bootstrap.servers': 'localhost:9092'
}

consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'wiada',
    'auto.offset.reset': 'earliest'
}

producer = Producer(producer_config)
consumer = Consumer(consumer_config)
consumer.subscribe(['Forbes'])

# Example usage
conn = None  # defined up front so the finally block can test it safely
try:
    db_config = {
        'user': 'postgres',
        'password': 'lola1205',
        'host': 'localhost',
        'port': '5432',
        'database': 'postgres',
    }

    conn, engine = connect_to_postgresql(**db_config)

    db_session = init_db(engine)

    # Consume the messages and save them to the database
    consume_and_save_to_db(consumer, db_session)

finally:
    # Close the connections once they are no longer needed
    consumer.close()
    if conn:
        conn.close()
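The `kafka_messages` table above stores the raw message but has no column for the predicted label that this step is meant to export. One possible layout is sketched below. To keep the example runnable without a database server it uses Python's built-in `sqlite3` as a stand-in for PostgreSQL. The `label` column, the `save_message` helper, and the sample rows are all assumptions made for the illustration.

```python
import json
import sqlite3
from datetime import datetime, timezone

# In-memory SQLite database as a stand-in for PostgreSQL (assumption for this sketch)
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE kafka_messages (
        id INTEGER PRIMARY KEY,
        topic TEXT,
        value TEXT,       -- JSON payload stored as text
        label INTEGER,    -- predicted sentiment: 0 = negative, 1 = positive
        timestamp TEXT
    )
    """
)


def save_message(conn, topic, payload, label):
    """Insert one classified article; the label column is a hypothetical addition."""
    conn.execute(
        "INSERT INTO kafka_messages (topic, value, label, timestamp) VALUES (?, ?, ?, ?)",
        (topic, json.dumps(payload), label, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


# Made-up messages used only for this illustration
save_message(conn, "Forbes", {"Title": "Sample title A"}, 1)
save_message(conn, "Forbes", {"Title": "Sample title B"}, 0)

# Count the articles per topic and label, the kind of query a dashboard could run
rows = conn.execute(
    "SELECT topic, label, COUNT(*) FROM kafka_messages GROUP BY topic, label ORDER BY label"
).fetchall()
```

With PostgreSQL the same idea would translate to an extra `Column(Integer)` on the `KafkaMessage` model, filled with the classifier's prediction before each commit.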

#### REAL-TIME DASHBOARD

Create one PowerBI dashboard connected to Kafka to monitor, in real time, the number of articles arriving
per topic.

Create another PowerBI dashboard connected to the PostgreSQL database to monitor the results of your classifier.