In [None]:
from kafka import KafkaConsumer

# Debug cell: print the raw value of every record arriving on the 'bigproject' topic.
# NOTE(review): the Spark stream further down subscribes to 'test-sentiments';
# confirm which topic this check is meant to watch.
# consumer_timeout_ms ends the iteration after this many milliseconds without a
# new record. Without it the loop blocks forever and "Run All" never reaches the
# cells below.
CONSUMER_IDLE_TIMEOUT_MS = 10_000

consumer = KafkaConsumer(
    'bigproject',
    bootstrap_servers='localhost:9092',
    consumer_timeout_ms=CONSUMER_IDLE_TIMEOUT_MS,
)
for message in consumer:
    print(message.value)
# Release the broker connection once the idle timeout has ended the loop.
consumer.close()

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructType, StructField
from kafka import KafkaProducer

# Start a local Spark session that uses every available core.
# spark.jars.packages downloads the Kafka source connector when the session starts.
# NOTE(review): the connector coordinates (Scala 2.12, Spark 3.0.1) presumably need
# to match the installed Spark/Scala build — confirm.
spark = SparkSession.builder \
    .appName('YouTubeCommentsProcessing') \
    .master('local[*]') \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .getOrCreate()

# Comment-cleaning function, wrapped as a Spark UDF below.
def clean_comment(comment):
    """Normalise a comment by stripping surrounding whitespace and lower-casing it.

    It is applied as a UDF to the ``comment`` column, which is declared nullable.
    ``from_json`` can also produce nulls for missing fields or unparsable
    messages. Spark hands SQL NULL to a Python UDF as ``None``. Calling
    ``.strip()`` on it would raise and fail the whole streaming query, so
    ``None`` is passed through unchanged.

    :param comment: raw comment text, or ``None``.
    :return: the cleaned text, or ``None`` when ``comment`` is ``None``.
    """
    if comment is None:
        return None
    return comment.strip().lower()

# Expose the Python cleaner to Spark as a UDF whose result column is a string.
clean_comment_udf = udf(clean_comment, returnType=StringType())

# Expected JSON payload of each Kafka message: three nullable string fields.
schema = StructType([
    StructField(field_name, StringType(), nullable=True)
    for field_name in ("source", "commentDate", "comment")
])

# Kafka source: subscribe to 'test-sentiments' and only read records published
# after the query starts ("latest"), not the topic's existing backlog.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test-sentiments")
    .option("startingOffsets", "latest")
    .load()
)

# Cast the Kafka `value` column to a string, parse it as JSON with `schema`, then
# flatten the resulting struct into top-level columns.
json_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema).alias("data"))
    .select("data.*")
)

# Add a normalised copy of the comment alongside the original columns.
cleaned_df = json_df.withColumn("cleaned_comment", clean_comment_udf(col("comment")))

# Print each micro-batch to stdout; "append" emits only the rows that are new
# since the previous batch.
query = (
    cleaned_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# Blocks this cell (and the kernel) until the streaming query stops or fails.
query.awaitTermination()


In [None]:
# Spark session for the word-count job on a standalone cluster.
# A standalone master accepts connections on port 7077 by default. Port 4040 is a
# driver's web UI, not a master endpoint, so 'spark://localhost:4040' cannot work.
# NOTE: getOrCreate() returns the session already active in this kernel (e.g. the
# local[*] one created earlier), in which case master and other static settings
# do not take effect. Call spark.stop() first if a cluster session is wanted.
spark = (SparkSession
         .builder
         .master('spark://localhost:7077')  # standalone Spark master
         .appName('word-count')
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
         .getOrCreate())


In [None]:
import joblib

# Path to the serialized sentiment model.
# NOTE(review): "costumer" looks like a typo (consumer/customer?), but it must
# match the directory on disk, so it is kept as-is — confirm.
# joblib.load unpickles the file, which can execute arbitrary code: only load
# model files that come from a trusted source.
# NOTE(review): the scikit-learn version is pinned to 1.2.2 elsewhere in this
# notebook, presumably to match the version that saved this model — confirm.
model_path = 'costumer/models/SAMD.joblib'
sentiment_model = joblib.load(model_path)


def test_model(comment):
    """Return the model's prediction for a single comment.

    The comment is passed as a one-element batch, so the result is whatever
    ``sentiment_model.predict`` returns for one sample (presumably a
    one-element array — confirm against the model).
    """
    prediction = sentiment_model.predict([comment])
    return prediction


# Smoke-test the model on a few sample comments.
# Every literal needs its trailing comma. Without one, Python concatenates
# adjacent string literals and silently merges two samples into one.
comments = [
    "this comment is positif",
    "this comment is negatif",
    "This video is good",
]

for comment in comments:
    print(f"Comment: {comment} -> Sentiment: {test_model(comment)}")


In [8]:
import facebook_scraper as fs
import pandas as pd
from facebook_scraper import exceptions  # Import specific exceptions

In [9]:
import facebook_scraper as fs
import pandas as pd
from facebook_scraper import exceptions  # Import specific exceptions

class FacebookScraper:
    """Fetch the comments of a single Facebook post as a pandas DataFrame.

    Thin wrapper around ``fs.get_posts`` (``facebook_scraper``).
    """

    def __init__(self, max_comments=10):
        # Maximum number of comments requested from facebook_scraper per post.
        self.MAX_COMMENTS = max_comments

    @staticmethod
    def _scraper_error_types():
        """Return the exception classes defined in ``facebook_scraper.exceptions``.

        ``exceptions`` is a module. Naming a module inside an ``except`` tuple
        raises ``TypeError`` as soon as any exception occurs. Collecting the
        module's exception classes keeps the original intent (handle the
        scraper's own errors) without hard-coding their names.
        """
        return tuple(
            obj for obj in vars(exceptions).values()
            if isinstance(obj, type) and issubclass(obj, Exception)
        )

    def getPostData(self, post_url):
        """Fetch up to ``MAX_COMMENTS`` comments of one post as a flat DataFrame.

        :param post_url: post URL. The last path segment, without its query
            string, is used as the post id.
        :return: one row per comment, with nested fields flattened using ``_``.
            Returns ``None`` when the post has no comments, when no post is
            returned, or when scraping fails with ``ValueError``, ``IndexError``
            or a ``facebook_scraper`` exception.
        """
        handled_errors = (ValueError, IndexError) + self._scraper_error_types()
        try:
            post_id = post_url.split("/")[-1].split("?")[0]  # Extract post ID
            print(post_id)

            posts = fs.get_posts(
                post_urls=[post_id],
                options={"comments": self.MAX_COMMENTS, "progress": True},
            )
            # A default for next(): an empty generator means "no post", not a crash.
            post = next(posts, None)
            if post is None:
                print(f"Post not found: {post_id}")
                return None

            # 'comments_full' may be missing or empty when no comments were scraped.
            comments = post.get('comments_full', [])
            if not comments:
                print(f"No comments found for post: {post_id}")
                return None  # None signals "no comments"
            return pd.json_normalize(comments, sep='_')

        except handled_errors as e:
            print(f"Error retrieving post data: {post_url} - {e}")
            return None  # None signals failure




In [17]:
# %pip installs into the kernel's own environment and does not rely on automagic.
# NOTE(review): pin the version that was tested (twscrape==X.Y.Z) so reruns are reproducible.
%pip install twscrape

Note: you may need to restart the kernel to use updated packages.


In [18]:
import asyncio
import twscrape
from twscrape import API, gather
import pandas as pd

class TwitterScraper:
    """Search tweets with twscrape and return them as a pandas DataFrame."""

    # Column order of the DataFrame returned by gather_tweets.
    COLUMNS = ['ID', 'Username', 'Content', 'Date']

    def __init__(self, username=None, password=None, email=None, email_password=None):
        """Create the twscrape API client and resolve the account credentials.

        Credentials are never hard-coded in the notebook. Each argument falls
        back to an environment variable: TWITTER_USERNAME, TWITTER_PASSWORD,
        TWITTER_EMAIL, TWITTER_EMAIL_PASSWORD. The email password is optional.
        """
        import os

        self.api = API()
        self._username = username or os.environ.get("TWITTER_USERNAME")
        self._password = password or os.environ.get("TWITTER_PASSWORD")
        self._email = email or os.environ.get("TWITTER_EMAIL")
        self._email_password = email_password or os.environ.get("TWITTER_EMAIL_PASSWORD", "")

    async def gather_tweets(self, query="morocco", limit=20):
        """Log in, run a search and return the matching tweets.

        :param query: twscrape search query.
        :param limit: maximum number of tweets to fetch.
        :return: DataFrame with columns ``COLUMNS`` (possibly empty, but the
            columns are always present).
        :raises RuntimeError: when username, password or email is missing.
        """
        if not (self._username and self._password and self._email):
            raise RuntimeError(
                "Missing Twitter credentials: pass them to TwitterScraper(...) or set "
                "TWITTER_USERNAME, TWITTER_PASSWORD and TWITTER_EMAIL."
            )
        await self.api.pool.add_account(
            self._username, self._password, self._email, self._email_password
        )
        await self.api.pool.login_all()

        tweets = await gather(self.api.search(query, limit=limit))

        rows = [
            {
                'ID': tweet.id,
                'Username': tweet.user.username,
                'Content': tweet.rawContent,
                'Date': tweet.date,
            }
            for tweet in tweets
        ]
        # columns= keeps the schema even when the search returns nothing.
        return pd.DataFrame(rows, columns=self.COLUMNS)

async def display_tweets():
    """Fetch tweets for the default query and print them one at a time."""
    tweets = await TwitterScraper().gather_tweets()

    # (printed label, DataFrame column) pairs, in display order.
    fields = (
        ("Tweet ID", "ID"),
        ("Username", "Username"),
        ("Content", "Content"),
        ("Date", "Date"),
    )
    for _, tweet in tweets.iterrows():
        for label, column in fields:
            print(f"{label}: {tweet[column]}")
        # print("\n") emits two newlines: a blank line plus print's own line end.
        print("\n")

# Jupyter/IPython already runs an asyncio event loop, and asyncio.run() refuses to
# start inside one, so the coroutine is scheduled on that loop instead. In a plain
# script no loop is running, so asyncio.run() drives it to completion.
# get_running_loop() replaces get_event_loop(), which is deprecated when no loop
# is running.
try:
    asyncio.get_running_loop()
except RuntimeError:
    asyncio.run(display_tweets())
else:
    def _report_display_error(task):
        """Print a failure of the background task instead of losing it silently."""
        if not task.cancelled() and task.exception() is not None:
            print(f"display_tweets failed: {task.exception()!r}")

    # The event loop keeps only weak references to tasks. Holding this strong
    # reference stops the task from being garbage-collected mid-run.
    display_tweets_task = asyncio.create_task(display_tweets())
    display_tweets_task.add_done_callback(_report_display_error)

In [19]:
import pandas as pd

# Sample value in the "<weekday> <day> <month> <year> - <HH:MM>" layout.
data = {'date': ["Saturday 25 May 2024 - 23:34"]}
df = pd.DataFrame(data)

# Parse with the matching strptime pattern, then render it back as
# "YYYY-MM-DD HH:MM:SS" text. %A and %B match locale-dependent day and month
# names; this sample uses English names.
df['date'] = (
    pd.to_datetime(df['date'], format="%A %d %B %Y - %H:%M")
    .dt.strftime("%Y-%m-%d %H:%M:%S")
)

print(df)


                  date
0  2024-05-25 23:34:00


In [2]:
# Pinned version, presumably to match the scikit-learn that saved
# costumer/models/SAMD.joblib — NOTE(review): confirm.
# Run this from a freshly restarted kernel, before anything imports sklearn
# (including joblib.load of the model). On Windows a running kernel keeps
# sklearn's DLLs loaded, which most likely caused the
# "[WinError 5] ... msvcp140.dll" failure when pip tried to replace them.
%pip install scikit-learn==1.2.2


Collecting scikit-learn==1.2.2
  Downloading scikit_learn-1.2.2-cp311-cp311-win_amd64.whl (8.3 MB)
                                              0.0/8.3 MB ? eta -:--:--
                                              0.0/8.3 MB 991.0 kB/s eta 0:00:09
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.1/8.3 MB 1.7 MB/s eta 0:00:05
                                              0.

ERROR: Could not install packages due to an OSError: [WinError 5] Accès refusé: 'C:\\Users\\sejja\\anaconda3\\Lib\\site-packages\\~klearn\\.libs\\msvcp140.dll'
Consider using the `--user` option or check the permissions.

