## Consumer Script for Loading Data from Kafka into MongoDB

In [3]:
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.admin import NewTopic
import pymongo
import threading
import json

In [4]:
# Connection settings shared by the Kafka admin client, both consumers and MongoDB.
KAFKA_BOOTSTRAP_SERVERS = ["localhost:9092"]
MONGO_HOST, MONGO_PORT = "localhost", 27017
TOPICS = ("preise", "stationen")        # topics this notebook consumes

admin = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)       # connecting to Kafka

# Create every topic that does not exist yet. Each topic is checked on its own so that
# a missing topic is still created when the other one already exists.
existing_topics = set(admin.list_topics())
missing_topics = [name for name in TOPICS if name not in existing_topics]
if missing_topics:
    admin.create_topics(
        [NewTopic(name=name, num_partitions=1, replication_factor=1) for name in missing_topics]
    )

# NOTE(review): no group_id / auto_offset_reset is passed, so kafka-python's defaults apply
# (start from the latest offset, no committed offsets). Messages produced while this notebook
# is not running are therefore not consumed — confirm that is intended.

# creating preise consumer
preisConsumer = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
preisConsumer.subscribe(["preise"])

# creating stationen consumer
tankeConsumer = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
tankeConsumer.subscribe(["stationen"])

# connecting to MongoDB collections
client = pymongo.MongoClient(MONGO_HOST, MONGO_PORT)
db = client.tanken
preise = db.preise
stationen = db.stationen

In [5]:

# function that gets executed for every consumer with parameters:
# consumer: the consumer to listen on
# collection: the reference to the MongoDB Collection in which the data should be inserted

def listenToConsumer(consumer, collection):
    print("started ",collection.name," listener.")
    for msg in consumer:                                 # loop gets executed for every new message 'msg' in the Queue
        item = json.loads(msg.value.decode("utf-8"))     # Decode message and convert to JSON-format
        print(collection.name, " msg: ",str(item))
        if collection==preise:                           # Inserting the message into the right collection
            collection.insert_one(item)
        elif collection==stationen:
            collection.update_one({"uuid":item["uuid"]}, {"$set": item}, upsert=True)
    print("finished consumer")


In [6]:
#starting Threads for each consumer so that they are both listening at the same time
threading.Thread(target=listenToConsumer, args=(preisConsumer, preise)).start()
threading.Thread(target=listenToConsumer, args=(tankeConsumer, stationen)).start()

started  preise  listener.
started  stationen  listener.
