# Producer Notebook

This notebook contains the code for the Producer, which creates the messages and writes them to the Kafka cluster. There are two data sources: the Binance WebSocket API (https://binance-docs.github.io/apidocs/websocket_api/en/#change-log) and the Twitter recent-search API.

## Imports

In [2]:
import json
import time
import uuid
from datetime import datetime

import requests
import websocket
from kafka import KafkaProducer, KafkaConsumer

# Twitter API v2 recent-search request: crypto-related hashtags, up to 100
# results per call, returning created_at, text and author_id for each tweet.
url = "https://api.twitter.com/2/tweets/search/recent?query=%23crypto%20OR%20%23ethereum%20OR%20%23eth%20OR%20%23btc%20OR%20%23bitcoin&max_results=100&tweet.fields=created_at,text,author_id"

# secrets.env is parsed as JSON and must contain a "bearerToken" entry.
with open("secrets.env") as secrets_file:
    secrets = json.load(secrets_file)

bearerToken = secrets['bearerToken']

## Defining functions for producing and consuming messages, defining producer and consumer

In [3]:
# Connection settings: addresses of the three brokers used as bootstrap servers
server1 = 'broker1:9093'
server2 = 'broker2:9095'
server3 = 'broker3:9097'
servers = [server1, server2, server3]

# Topic names: one for the Binance websocket stream, one for the tweets
binance_topic = "binance-ws"
twitter_topic = "twitter"

# using functions from example notebook
def connect_kafka_producer(servers):
    """Create a KafkaProducer for the given bootstrap servers.

    Args:
        servers: Bootstrap server address(es), passed to ``KafkaProducer`` as
            ``bootstrap_servers``.

    Returns:
        The connected ``KafkaProducer``, or ``None`` if creating it raised an
        ``Exception`` (the error is printed instead of propagated).

    NOTE(review): a function with the same name is defined again later in this
    cell; that later definition is the one in effect once the cell has run.
    """
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
        print("Created Producer")
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return after the try statement rather than from a ``finally`` clause:
    # a ``return`` inside ``finally`` silently discards any in-flight
    # exception, including KeyboardInterrupt and SystemExit.
    return _producer
    
def publish_message(producer_instance, topic_name, key, value):
    """Encode ``key`` and ``value`` as UTF-8 and send them to ``topic_name``.

    The producer is flushed right after the send and a confirmation line is
    printed. Any ``Exception`` raised along the way is caught and printed
    rather than propagated, so the function always returns ``None``.

    Args:
        producer_instance: Object providing ``send(topic, key=..., value=...)``
            and ``flush()``.
        topic_name: Topic the message is sent to.
        key: Message key as ``str``.
        value: Message payload as ``str``.
    """
    try:
        encoded = {
            'key': bytes(key, 'utf-8'),
            'value': bytes(value, 'utf-8'),
        }
        producer_instance.send(topic_name, **encoded)
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as error:
        print('Exception in publishing message')
        print(str(error))

def consume_xy(consumer, topic_name):
    """Read every message currently yielded by ``consumer``.

    Args:
        consumer: Iterable of messages exposing ``key`` (bytes or ``None``)
            and ``value`` attributes.
        topic_name: Not used by this function; kept so existing calls keep
            working.

    Returns:
        A list of ``(key, value)`` tuples in the order they were yielded. Keys
        are decoded from UTF-8; a message without a key yields ``None`` as its
        key instead of failing on ``None.decode``.
    """
    messages = [
        (msg.key.decode('utf-8') if msg.key is not None else None, msg.value)
        for msg in consumer
    ]
    print(f"Consumed {len(messages)} messages from Kafka Cluster")
    return messages

def connect_kafka_producer(servers):
    """Create a KafkaProducer for the given bootstrap servers.

    NOTE(review): this redefines ``connect_kafka_producer`` from earlier in
    this cell and shadows it; unlike the earlier version it prints nothing on
    success.

    Args:
        servers: Bootstrap server address(es), passed to ``KafkaProducer`` as
            ``bootstrap_servers``.

    Returns:
        The connected ``KafkaProducer``, or ``None`` if creating it raised an
        ``Exception`` (the error is printed instead of propagated).
    """
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=servers, api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return after the try statement rather than from a ``finally`` clause:
    # a ``return`` inside ``finally`` silently discards any in-flight
    # exception, including KeyboardInterrupt and SystemExit.
    return _producer

# Both consumers use identical settings and differ only in the topic they read.
_consumer_settings = dict(
    auto_offset_reset='earliest',
    bootstrap_servers=servers,
    api_version=(0, 10),
    value_deserializer=json.loads,
    consumer_timeout_ms=1000,
)

# consumer for the binance messages
binance_consumer = KafkaConsumer(binance_topic, **_consumer_settings)

# consumer for the twitter messages
twitter_consumer = KafkaConsumer(twitter_topic, **_consumer_settings)

# single producer shared by the binance and twitter cells
producer = connect_kafka_producer(servers)

## Adjusting websocket code to write data directly into Kafka Cluster.

In [6]:
# (key, parsed message) pairs recorded by on_message, compared against the consumed messages later
test_array = list()

def on_open(ws):
    """Websocket open callback: subscribe to the ticker streams.

    Prints a notice, then sends a single SUBSCRIBE request (id 1) covering all
    ticker streams over ``ws``.
    """
    print("Connection opened")

    # Ticker streams to subscribe to
    ticker_streams = [
        "btcusdt@ticker",
        "ethusdt@ticker",
        "maticusdt@ticker",
        "shibusdt@ticker",
        "solusdt@ticker",
    ]
    request = {"method": "SUBSCRIBE", "params": ticker_streams, "id": 1}
    ws.send(json.dumps(request))

def on_message(ws, message):
    """Websocket message callback: forward ``message`` to the Kafka cluster.

    Relies on the notebook globals ``start``, ``producer``, ``binance_topic``
    and ``test_array``. Each message is published under a fresh UUID key, and
    the key together with the JSON-parsed message is recorded in
    ``test_array`` for later verification.
    """
    # testing only: request a close once more than 3 seconds have elapsed
    elapsed = datetime.now() - start
    if elapsed.seconds > 3:
        ws.close()

    # publish the raw message under a random key
    key = str(uuid.uuid4())
    publish_message(producer, binance_topic, key, message)
    preview = str(message)[:50]
    print(f"{datetime.now()}: writing message to cluster: {preview}...")

    # remember what was sent so the test cell can compare it
    test_array.append((key, json.loads(message)))

def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket close callback: print a notice.

    websocket-client 1.x invokes ``on_close`` with three arguments (the app,
    the close status code and the close message). The two extra parameters are
    optional so the callback works with both the one-argument and the
    three-argument calling convention; they are not used.
    """
    print("Connection closed")

binance_start = datetime.timestamp(datetime.now())

# consume messages from previous sessions so that the verification cell, which
# reuses binance_consumer, only sees messages produced in this run
_ = consume_xy(binance_consumer, binance_topic)

if __name__ == "__main__":
    # Initialize WebSocket connection
    ws = websocket.WebSocketApp(
        "wss://stream.binance.com:9443/ws",
        on_open=on_open,
        on_message=on_message,
        on_close=on_close
    )
    # only for testing: on_message reads this global to close the connection
    # a few seconds after the start
    start = datetime.now()
    # Start WebSocket connection (blocks until the connection is closed)
    ws.run_forever()

Consumed 0 messages from Kafka Cluster
Connection opened
Message published successfully.
2023-03-07 11:36:51.269999: writing message to cluster: {"result":null,"id":1}...
Message published successfully.
2023-03-07 11:36:51.273198: writing message to cluster: {"e":"24hrTicker","E":1678189011148,"s":"SHIBUSDT"...
Message published successfully.
2023-03-07 11:36:51.498224: writing message to cluster: {"e":"24hrTicker","E":1678189011481,"s":"BTCUSDT",...
Message published successfully.
2023-03-07 11:36:51.501286: writing message to cluster: {"e":"24hrTicker","E":1678189011473,"s":"ETHUSDT",...
Message published successfully.
2023-03-07 11:36:51.780064: writing message to cluster: {"e":"24hrTicker","E":1678189011587,"s":"MATICUSDT...
Message published successfully.
2023-03-07 11:36:51.826710: writing message to cluster: {"e":"24hrTicker","E":1678189011510,"s":"SOLUSDT",...
Message published successfully.
2023-03-07 11:36:52.289921: writing message to cluster: {"e":"24hrTicker","E":167818901

### Testing produced and consumed messages

In [7]:
# retrieving messages
consumed_messages = consume_xy(binance_consumer, binance_topic)

# testing if write & read operations work as intended
# The counts must match first: otherwise the comparison below would pass
# vacuously when nothing was consumed, or fail with an IndexError when more
# messages were consumed than produced.
assert len(consumed_messages) == len(test_array), (
    f"consumed {len(consumed_messages)} messages but produced {len(test_array)}"
)
for (consumed_key, consumed_value), (produced_key, produced_value) in zip(consumed_messages, test_array):
    assert consumed_value == produced_value
    assert consumed_key == produced_key
print("Consumed messages match produced messages.")

Consumed 13 messages from Kafka Cluster
Consumed messages match produced messages.


## Twitter Producer

In [8]:
test_array = []
start, timestamp_last_request = datetime.now(), datetime.now()

twitter_start = datetime.timestamp(datetime.now())

# consume messages from other sessions so that the verification cell, which
# reuses twitter_consumer, only sees messages produced in this run
_ = consume_xy(twitter_consumer, twitter_topic)

if __name__ == "__main__":
    while (datetime.now() - start).seconds < 21:
        # time limit to stay within api regulations
        if (datetime.now() - timestamp_last_request).seconds > 10:
            # get tweets
            response = requests.get(url=url, headers={'Authorization': f"Bearer {bearerToken}"})
            payload = json.loads(response.text)
            # The response carries no 'data' field when the search has no
            # results or the API answers with an error object, so do not
            # index it unconditionally.
            tweets = payload.get('data', [])
            if not tweets:
                print(f"{datetime.now()}: response contained no tweets: {str(payload)[:100]}...")

            for tweet in tweets:
                # formatting for json serializing
                formatted_tweet = json.dumps(tweet)

                # write to cluster
                key = str(uuid.uuid4())
                publish_message(producer, twitter_topic, key, formatted_tweet)
                print(f"{datetime.now()}: writing message to cluster: {formatted_tweet[:50]}...")

                # append to test_array
                test_array.append((key, formatted_tweet))

            timestamp_last_request = datetime.now()
        else:
            # wait briefly instead of spinning at full CPU until the next request is due
            time.sleep(0.1)

Consumed 1900 messages from Kafka Cluster
Message published successfully.
2023-03-07 11:37:13.154971: writing message to cluster: {"author_id": "1451191701695909912", "text": "RT @...
Message published successfully.
2023-03-07 11:37:13.165359: writing message to cluster: {"author_id": "1602982306574123008", "text": "RT @...
Message published successfully.
2023-03-07 11:37:13.167047: writing message to cluster: {"author_id": "1598245017499291648", "text": "RT @...
Message published successfully.
2023-03-07 11:37:13.168276: writing message to cluster: {"author_id": "1505898808483491847", "text": "Pric...
Message published successfully.
2023-03-07 11:37:13.169807: writing message to cluster: {"author_id": "1514635813543239681", "text": "RT @...
Message published successfully.
2023-03-07 11:37:13.171177: writing message to cluster: {"author_id": "1592179954275348480", "text": "RT @...
Message published successfully.
2023-03-07 11:37:13.172588: writing message to cluster: {"author_id": "153

### Testing produced and consumed messages

In [9]:
# retrieve messages
consumed_messages = consume_xy(twitter_consumer, twitter_topic)

# testing if write & read operations work as intended
# The counts must match first: otherwise the comparison below would pass
# vacuously when nothing was consumed, or fail with an IndexError when more
# messages were consumed than produced.
assert len(consumed_messages) == len(test_array), (
    f"consumed {len(consumed_messages)} messages but produced {len(test_array)}"
)
for (consumed_key, consumed_value), (produced_key, produced_json) in zip(consumed_messages, test_array):
    # test_array stores the serialized tweet; the consumer deserializes with json.loads
    assert consumed_value == json.loads(produced_json)
    assert consumed_key == produced_key
print("Consumed messages match produced messages.")

Consumed 100 messages from Kafka Cluster
Consumed messages match produced messages.
