# Challenge: Sentiment Analysis System Using a Big Data Solution

## I. Web scraping and data ingestion to a MongoDB cluster:

In [3]:
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
# Scrape the reader comments of one Hespress article and store them in MongoDB.
import os  # local import: only needed to read the MongoDB URI from the environment

# Define the URL to scrape
url="https://www.hespress.com/%d8%b1%d8%a6%d9%8a%d8%b3-%d9%86%d8%a7%d8%af%d9%8a-%d8%a7%d9%84%d9%88%d8%af%d8%a7%d8%af-%d9%8a%d8%ad%d8%a7%d9%88%d9%84-%d8%a5%d8%a8%d9%82%d8%a7%d8%a1-%d8%b9%d8%b7%d9%8a%d8%a9-%d8%a7%d9%84%d9%84%d9%87-1121462.html/"

# Make a GET request to the website; bound the wait and fail loudly on an
# HTTP error status instead of silently parsing an error page.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(response.text, "html.parser")

# Extract the comment containers from the HTML
commentaires = soup.find_all("div", class_="commentaires")

# Connect to the MongoDB instance.
# The connection string (which embeds the username and password) is read from
# the MONGODB_URI environment variable so that credentials never live in the
# notebook; a missing variable raises KeyError.
# NOTE(review): the password that was previously committed here should be rotated.
client = MongoClient(os.environ["MONGODB_URI"])

# Get the comments collection
comments_collection = client.test.comments

comments_to_insert = []
# Walk every <li> of every <ul> inside each comment container and pull the
# author (from "comment-head") and the body (from "comment-text").
# NOTE(review): find_all() searches recursively, so if reply lists are nested
# <ul> elements the same <li> may be visited more than once - verify against
# the page markup.
for comment in commentaires:
    for ul in comment.find_all('ul'):
        for li in ul.find_all('li'):
            div_container = li.find('div', {'class': 'comment-body'})
            if div_container is None:
                # <li> that is not a comment (no comment-body wrapper)
                continue
            div1 = div_container.find('div', {'class': 'comment-head'})
            div2 = div_container.find('div', {'class': 'comment-text'})
            if div1 is None or div2 is None:
                continue
            # The author is taken from the third line of the head block;
            # fall back to the stripped head text when the head has fewer lines.
            head_lines = div1.text.split("\n")
            author = head_lines[2] if len(head_lines) > 2 else div1.text.strip()
            comment_obj = {"author": author, "text": div2.text}
            comments_to_insert.append(comment_obj)

# insert all comments; insert_many() rejects an empty list, so skip it then
insert_result = (
    comments_collection.insert_many(comments_to_insert)
    if comments_to_insert
    else None
)
insert_result


<pymongo.results.InsertManyResult at 0x1bc5dfd3790>

Step 2. Data ingestion into a Kafka topic.

In [None]:
from confluent_kafka import Producer, Consumer
import json # saving format 

# Publish every scraped comment to the Kafka topic 'exam', then read the topic
# back and print each message.
from confluent_kafka import KafkaError  # needed for the _PARTITION_EOF check below

# setting the Kafka producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# extracting the text of the two div elements
# (`commentaires` is the list of comment containers built by the scraping cell)
for comment in commentaires:
    for ul in comment.find_all('ul'):
        for li in ul.find_all('li'):
            div_container = li.find('div', {'class': 'comment-body'})
            if div_container is None:
                # <li> that is not a comment
                continue
            div1 = div_container.find('div', {'class': 'comment-head'})
            div2 = div_container.find('div', {'class': 'comment-text'})
            if div1 is None or div2 is None:
                continue
            # Author is the third line of the head block; fall back to the
            # stripped head text when the head has fewer lines.
            head_lines = div1.text.split("\n")
            author = head_lines[2] if len(head_lines) > 2 else div1.text.strip()
            comment_obj = {"author": author, "text": div2.text}
            comment_json = json.dumps(comment_obj)
            producer.produce('exam', value=bytes(comment_json, 'utf-8'))
            # Serve delivery callbacks without blocking on every message
            producer.poll(0)
            print(f'Published message: {comment_json}')

# Block once, after the loop, until every queued message has been delivered
producer.flush()


# setting the Kafka consumer
conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

consumer.subscribe(['exam'])

# Stop after this many consecutive empty polls (1 second each) so the cell
# terminates and the notebook can continue with "Run All".
MAX_IDLE_POLLS = 30
idle_polls = 0

try:
    while idle_polls < MAX_IDLE_POLLS:
        msg = consumer.poll(1.0)
        if msg is None:
            idle_polls += 1
            continue
        idle_polls = 0
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Reached end of topic {} [{}] at offset {}'.format(
                    msg.topic(), msg.partition(), msg.offset()))
            else:
                print('Error occured: {}'.format(msg.error()))
        else:
            comment = json.loads(msg.value())
            print(comment)
except KeyboardInterrupt:
    # Manual interruption of the cell is a normal way to stop consuming
    pass
finally:
    # Always leave the consumer group cleanly, even on error or interrupt
    consumer.close()



Step 3. Real-time sentiment analysis using the 'TextBlob' library (built on NLTK).

In [None]:
from textblob import TextBlob

# Score the scraped comments with TextBlob, then score comments as they arrive
# on the Kafka topic 'exam'.
# NOTE(review): TextBlob's default analyzer is presumably English-oriented,
# while the scraped article appears to be Arabic - confirm the scores are
# meaningful for this data.
import json
from confluent_kafka import Consumer, KafkaError

# Part 1: sentiment of the comments already scraped into `commentaires`
for comment in commentaires:
    for ul in comment.find_all('ul'):
        for li in ul.find_all('li'):
            div_container = li.find('div', {'class': 'comment-body'})
            if div_container is None:
                # <li> that is not a comment
                continue
            div2 = div_container.find('div', {'class': 'comment-text'})
            if div2 is None:
                continue
            comment_text = div2.text
            blob = TextBlob(comment_text)
            sentiment = blob.sentiment
            print(sentiment)

# Part 2: sentiment of the messages streamed through Kafka.
# A dedicated consumer with its own group id is created here so this cell does
# not depend on the state (closed, or offsets already committed) of a consumer
# created by another cell.
consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'sentiment-group',
                     'auto.offset.reset': 'earliest'})
consumer.subscribe(['exam'])

# Stop after this many consecutive empty polls (1 second each) so the cell ends
MAX_IDLE_POLLS = 30
idle_polls = 0

try:
    while idle_polls < MAX_IDLE_POLLS:
        msg = consumer.poll(1.0)
        if msg is None:
            idle_polls += 1
            continue
        idle_polls = 0
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Reached end of topic {} [{}] at offset {}'.format(
                    msg.topic(), msg.partition(), msg.offset()))
            else:
                print('Error occured: {}'.format(msg.error()))
        else:
            comment = json.loads(msg.value())
            print(comment)
            # Score the streamed comment body; messages without a "text"
            # field are scored as an empty string.
            print(TextBlob(comment.get("text", "")).sentiment)
except KeyboardInterrupt:
    # Manual interruption of the cell is a normal way to stop consuming
    pass
finally:
    # close the consumer after the loop, even on error or interrupt
    consumer.close()


## III. Apache Spark.

In [7]:
import os
import sys
# Point both the PySpark workers and the PySpark driver at the interpreter
# that is running this notebook.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

<li>Batch processing </li>

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Process the comments JSON file in fixed-size batches, grouping the comment
# texts of each batch by author and printing them.

# Create Spark session
spark = SparkSession.builder.appName("BatchProcessing").getOrCreate()

try:
    # Load data from the local JSON file into a DataFrame
    # NOTE(review): absolute, machine-specific path - consider a configurable location.
    df = spark.read.option("multiline","true").json(r"C:\Users\21263\Downloads\comments.json")

    # Specify the batch size for processing
    batch_size = 82

    # Attach a positional index to every row so batches can be sliced by
    # position; cached because every batch filters this same RDD.
    indexed_rows = df.rdd.zipWithIndex().cache()

    # Get the number of records
    num_records = indexed_rows.count()
    print(num_records)
    # Calculate the number of batches to process (ceiling division)
    num_batches = (num_records + batch_size - 1) // batch_size
    print(num_batches)

    # Loop through the batches and process the data
    for i in range(num_batches):
        # Get the start (inclusive) and end (exclusive) index for the current batch
        start_index = i * batch_size
        end_index = min(start_index + batch_size, num_records)

        # Keep only the rows whose index falls inside the current batch.
        # The bounds are bound as lambda defaults so each batch filters on
        # its own range.
        batch_rdd = indexed_rows\
            .filter(lambda pair, lo=start_index, hi=end_index: lo <= pair[1] < hi)\
            .map(lambda pair: pair[0])\
            .coalesce(1)

        # Perform batch processing on the data: group comment texts by author
        result = batch_rdd\
            .map(lambda x: (x.author, x.text))\
            .groupByKey()\
            .mapValues(list)\
            .collect()

        # Show result
        for author, text in result:
            print(f"Author: {author}\nText: {text}\n")
finally:
    # Stop Spark session, even if loading or processing failed
    spark.stop()



<li>Sentiment analysis using the library 'TextBlob' by batch processing</li>

In [11]:
from pyspark.sql import SparkSession
from textblob import TextBlob

# Compute, per author, the mean TextBlob polarity of that author's comments
# and print every author whose mean is non-zero.

# Obtain (or reuse) the Spark session for this job
spark = (
    SparkSession.builder
    .appName("BatchProcessing")
    .getOrCreate()
)

# Read the multi-line JSON export of the comments into a DataFrame
# NOTE(review): absolute, machine-specific path - consider a configurable location.
json_reader = spark.read.option("multiline","true")
df = json_reader.json(r"C:\Users\21263\Downloads\comments.json")

# Build (author, text) pairs, gather all texts per author, and bring the
# grouped result back to the driver
author_text_pairs = df.rdd.map(lambda row: (row.author, row.text))
result = author_text_pairs.groupByKey().mapValues(list).collect()

# Score each author's comments with TextBlob and report the average polarity
for author, texts in result:
    sentiment_scores = []
    for body in texts:
        sentiment_scores.append(TextBlob(body).sentiment.polarity)
    average_sentiment = sum(sentiment_scores) / len(sentiment_scores)
    if average_sentiment == 0:
        # Authors with an exactly neutral average are not reported
        continue
    print(f"Author: {author}\nAverage sentiment: {average_sentiment}\n")

# Release the Spark session
spark.stop()


Author: bernoussi 
Average sentiment: 0.03333333333333333

Author: lahcen 
Average sentiment: 0.16666666666666669

Author: Oujdi 
Average sentiment: 0.25

Author: Ahmed faras 
Average sentiment: 0.2

