In [0]:
%python
import subprocess
import sys

# Third-party packages required by the next cell. Versions are not pinned;
# pin them (e.g. 'kafka-python==X.Y.Z') if reproducible installs matter.
packages_to_install = ['kafka-python', 'ntscraper']

# Invoke pip through the interpreter that is running this notebook so the
# packages are installed into the same environment the next cell imports from
# (a bare `pip` on PATH may belong to a different Python). check_call raises
# CalledProcessError on a non-zero pip exit instead of silently continuing.
subprocess.check_call([sys.executable, '-m', 'pip', 'install', *packages_to_install])


In [1]:
%python
import json
from kafka import KafkaProducer
from ntscraper import Nitter

# --- Kafka producer settings (consumed by main() below) ---------------------
KAFKA_BOOTSTRAP_SERVERS = ['kafka-broker:29092']
KAFKA_TOPIC_NAME = 'sentiment_analysis'
# Keyword arguments unpacked into KafkaProducer(...); shares the list above.
KAFKA_PRODUCER_CONFIG = dict(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

# --- Scrape parameters -------------------------------------------------------
# Every term is searched near every location in `continents`.
# NOTE: despite its name, `continents` holds country names; each one is passed
# to the scraper as `near=`.
terms = ["genocide", "gaza", "world"]
continents = ["France", "Morocco", "Austria", "Belgium", "Italy"]

def get_twitter_data(terms, continents, number=100, language='en', scraper=None):
    """Scrape tweets for every (term, location) pair and flatten them into records.

    Args:
        terms: Search terms; each is queried with ``mode='term'``.
        continents: Location names passed to the scraper as ``near=``. Despite
            the parameter name, the notebook passes country names here.
        number: Maximum number of tweets requested per (term, location) query.
        language: Language filter forwarded to ``get_tweets``.
        scraper: Optional object exposing a ``get_tweets`` method with the same
            call signature as ``ntscraper.Nitter.get_tweets``. When omitted a
            new ``Nitter(0)`` is created, exactly as before.

    Returns:
        list[dict]: one record per scraped tweet with keys ``text``, ``date``,
        ``likes``, ``is_retweet``, ``retweets`` and ``country`` (in that order).
        No de-duplication is done, so a tweet matching several queries appears
        several times.
    """
    if scraper is None:
        # The positional 0 is presumably ntscraper's log level — confirm.
        scraper = Nitter(0)

    twitter_data = []
    for term in terms:
        for country in continents:
            result = scraper.get_tweets(
                term, mode='term', language=language, number=number, near=country
            )
            # Tolerate a None result or a missing 'tweets' key instead of
            # raising TypeError/KeyError and losing every query scraped so far.
            for tweet in (result or {}).get('tweets') or []:
                twitter_data.append({
                    'text': tweet['text'],
                    'date': tweet['date'],
                    'likes': tweet['stats']['likes'],
                    'is_retweet': tweet['is-retweet'],
                    'retweets': tweet['stats']['retweets'],
                    'country': country,  # the location this query was run for
                })

    return twitter_data

def main(terms, countries=None):
    """Scrape tweets for ``terms`` and publish each one as a JSON message to Kafka.

    Args:
        terms: Search terms forwarded to ``get_twitter_data``.
        countries: Locations to search near. Defaults to the module-level
            ``continents`` list when omitted (the original behavior).

    Errors are printed rather than raised, as before. The producer is always
    closed, so re-running this cell does not leak producer connections and
    background threads.
    """
    if countries is None:
        countries = continents

    producer = None
    try:
        # Created before scraping so an unreachable broker fails fast.
        producer = KafkaProducer(**KAFKA_PRODUCER_CONFIG)

        twitter_data = get_twitter_data(terms, countries)

        futures = []
        for tweet in twitter_data:
            json_data = json.dumps(tweet)
            print(json_data)  # echo each record in the cell output
            futures.append(producer.send(KAFKA_TOPIC_NAME, json_data.encode('utf-8')))

        # send() only enqueues records; flush() blocks until every buffered
        # record has completed (acknowledged or failed).
        producer.flush()
        failed = sum(1 for future in futures if future.failed())
        if failed:
            print(f'Error: {failed} of {len(futures)} messages failed to send')

    except Exception as e:
        # Deliberately best-effort: report and keep the notebook running.
        print(f'Error: {e}')
    finally:
        if producer is not None:
            producer.close()

# Run the pipeline: scrape every (term, location) pair, then publish to Kafka.
main(terms=terms)
