# Fetching Tweets, Creating Kafka Topics, and Sending Twitter Data to the Topics

In [1]:
import json
import tweepy
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, KafkaException
import pickle

# Kafka Configuration
# Broker address shared by the producer below and by the admin client
# built inside create_kafka_topics.
servers = "localhost:9092"
conf = {
    'bootstrap.servers': servers,
    # librdkafka partitioner: CRC32 hash of the key; empty/NULL keys are
    # assigned a random partition.
    'partitioner': 'consistent_random',
}
producer = Producer(conf)

def create_kafka_topics(topics, num_partitions=3, replication_factor=1):
    """Create the given Kafka topics on the broker at ``servers``.

    ``AdminClient.create_topics`` is asynchronous: it returns a dict mapping
    each topic name to a future, and a broker-side failure (e.g. the topic
    already exists) only surfaces when that future's ``result()`` is called.
    Each future is therefore resolved here, so failures are reported per
    topic instead of being hidden behind a blanket success message.

    Args:
        topics: Iterable of topic names to create.
        num_partitions: Partition count for every new topic.
        replication_factor: Replication factor for every new topic.
    """
    admin_client = AdminClient({'bootstrap.servers': servers})
    new_topics = [
        NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)
        for topic in topics
    ]
    try:
        futures = admin_client.create_topics(new_topics, request_timeout=15.0)
    except KafkaException as e:
        # Local failure before any request was dispatched.
        print(f"Failed to create topics: {e}")
        return

    created = []
    for topic, future in futures.items():
        try:
            future.result()  # Blocks until the broker answers for this topic.
        except KafkaException as e:
            print(f"Failed to create topic {topic}: {e}")
        else:
            created.append(topic)

    if created:
        print(f"Topics {', '.join(created)} created successfully.")

def send_message(data: dict, name_topic: str, id: str) -> None:
    """Serialize *data* as UTF-8 JSON and produce it to *name_topic*.

    The message key is the first two characters of the topic name,
    upper-cased, followed by *id* (e.g. ``"NA" + id`` for topic ``"NATO"``).
    The call blocks on ``producer.flush()`` until the message is delivered or
    fails. Delivery failures, and any messages still queued after the flush,
    are reported rather than silently dropped.

    Args:
        data: JSON-serializable payload.
        name_topic: Destination Kafka topic.
        id: Identifier appended to the key prefix. The name shadows the
            builtin but is kept because callers pass it by keyword.
    """
    def _report_delivery(err, msg):
        # Delivery callback, served during flush(); err is None on success.
        if err is not None:
            print(f"Delivery to topic {name_topic} failed: {err}")

    bytes_data = json.dumps(data).encode('utf-8')
    key = f"{name_topic[:2].upper()}{id}".encode('utf-8')
    producer.produce(topic=name_topic, value=bytes_data, key=key, on_delivery=_report_delivery)
    # flush() returns how many messages are still waiting in the queue.
    remaining = producer.flush()
    if remaining:
        print(f"{remaining} message(s) still undelivered for topic {name_topic}")

# Tweepy API Initialization
def get_twitter_api(bearer_token):
    """Return a ``tweepy.Client`` authenticated with the given bearer token."""
    return tweepy.Client(bearer_token=bearer_token)

def fetch_tweets(api, query, max_results=10, limit=5):
    """Search recent tweets matching *query* and return them as plain dicts.

    Args:
        api: Client exposing ``search_recent_tweets(query=..., max_results=...)``
            (a ``tweepy.Client`` in this notebook).
        query: Search query string.
        max_results: Passed straight through to ``search_recent_tweets``
            (the Twitter v2 recent-search endpoint accepts 10-100).
        limit: Keep at most this many of the returned tweets. ``None`` keeps
            all of them. The default of 5 matches the previous hard-coded
            behaviour.

    Returns:
        A list of ``{'id': ..., 'text': ...}`` dicts. The list is empty when
        the search returned no data.
    """
    response = api.search_recent_tweets(query=query, max_results=max_results)
    if not response.data:
        return []
    # Slicing with limit=None returns the whole list.
    return [{'id': tweet.id, 'text': tweet.text} for tweet in response.data[:limit]]

def main():
    """Fetch tweets per tag, publish them to Kafka, and return them by topic.

    Reads ``config1.json``, which supplies ``bearer_token``, a shared
    ``query`` suffix, and one search term per tag. Each tag's tweets are
    fetched once, sent to the matching Kafka topic, and collected in the
    returned dict. The previous version re-queried the API a second time
    and then discarded the result.

    Returns:
        dict mapping each Kafka topic name to the list of tweet dicts sent
        to it.
    """
    with open('config1.json', 'r', encoding='utf-8') as config_file:
        config = json.load(config_file)

    api = get_twitter_api(config.get('bearer_token'))

    # Kafka topic name -> config key holding that topic's search term.
    topic_terms = {
        'NATO': 'nato',
        'Biden': 'biden',
        'Putin': 'putin',
        'Zelensky': 'zelensky',
        'NoFlyZone': 'noflyzone',
    }
    topics = list(topic_terms)
    create_kafka_topics(topics)

    base_query = config.get('query')
    tweets_by_topic = {}
    for topic, term_key in topic_terms.items():
        query = f"{config.get(term_key)} {base_query}"
        tweets = fetch_tweets(api, query)
        for tweet in tweets:
            send_message(tweet, name_topic=topic, id=str(tweet['id']))
        tweets_by_topic[topic] = tweets
    return tweets_by_topic


# Entry point: run the fetch-and-publish pipeline. Inside a Jupyter kernel
# __name__ is '__main__' too, so this also runs when the cell is executed.
if __name__ == '__main__':
    main()


Topics NATO, Biden, Putin, Zelensky, NoFlyZone created successfully.


# Checking Whether the Kafka Topics Have Received Messages

In [2]:
from confluent_kafka import Consumer, KafkaException

def consume_messages(topic, bootstrap_servers='localhost:9092', group_id='my_consumer_group'):
    """Print every message on *topic* until interrupted or a real error occurs.

    Starts from the earliest offset for a consumer group with no committed
    offsets (``auto.offset.reset='earliest'``). The loop runs until a
    ``KeyboardInterrupt`` (interrupting the notebook kernel) or a non-EOF
    consumer error. The consumer is always closed on exit.

    Args:
        topic: Kafka topic to subscribe to.
        bootstrap_servers: Broker address list.
        group_id: Consumer group id.
    """
    from confluent_kafka import KafkaError

    conf = {'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'auto.offset.reset': 'earliest'}

    consumer = Consumer(conf)
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            err = msg.error()
            if err is not None:
                # Reaching the end of a partition is informational, not a failure.
                # The old check compared the code to the KafkaException class and
                # never matched.
                if err.code() == KafkaError._PARTITION_EOF:
                    continue
                print(err)
                break
            value = msg.value()
            if value is None:
                # Null-value (tombstone) record: nothing to decode.
                continue
            print('Received message: {}'.format(value.decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

# Print every message on the NATO topic; interrupt the kernel to stop.
consume_messages(topic='NATO')


Received message: {"id": 1724979134294639012, "text": "Here for those that argue \"Palestinians were celebrating 7th of October\".\n\n\"Gaza you are a wh*re\".\n\nThis been happening for 75yrs btw, constant, routinely humiliations.\n\n#Gaza #Palestine #Israel #Iran #Yemen #Russia #US #NATO #UN #BRICS #Hamas #Hezbollah\nhttps://t.co/XsJ9EaghDP"}
Received message: {"id": 1724983738000347307, "text": "How crazy it is to watch Israelis celebrating \"winning\" a conflict against unarmed woman and children? Surreal.\n\n#Gaza #Palestine #Israel #Iran #Yemen #Russia #US #NATO #UN #BRICS #Hamas #Hezbollah"}
Received message: {"id": 1724979134294639012, "text": "Here for those that argue \"Palestinians were celebrating 7th of October\".\n\n\"Gaza you are a wh*re\".\n\nThis been happening for 75yrs btw, constant, routinely humiliations.\n\n#Gaza #Palestine #Israel #Iran #Yemen #Russia #US #NATO #UN #BRICS #Hamas #Hezbollah\nhttps://t.co/XsJ9EaghDP"}
Received message: {"id": 1724978719742115972, "