In [None]:
import json, pika, sys, os

In [1]:
# Define a function to load JSON data from a file
def loadJson(filename):
    # Open the specified file in read mode, specifying utf-8 encoding
    with open(filename, 'r', encoding='utf-8') as file:
        # Load the JSON data from the file into a Python dictionary
        data = json.load(file)
    # Return the loaded data
    return data

In [None]:
def isValidJson(myStr):
    try:
        json.loads(myStr)
        return True
    except ValueError:
        return False

def isValidDict(myStr):
    # Check if string starts and ends with curly braces
    if myStr.startswith('{') and myStr.endswith('}'):
        try:
            # Attempt to evaluate the string as a dictionary
            if isValidJson(myStr):
                return True
            else:
                return False
        except (SyntaxError, NameError):
            return False
    else:
        return False

def filterNonJsonValues(inputJson):
    dictWithNonJsonValues = {}
    for key, value in inputJson.items():
        if isinstance(value, str):
            if not isValidDict(value):
                dictWithNonJsonValues[key] = value
        else:
            dictWithNonJsonValues[key] = value
    return dictWithNonJsonValues

In [None]:
def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

def produceKafkaMessages(producer, topic, data, threshold):
    for index, (key, value) in enumerate(data, start=1):
        # Convert data to JSON
        jsonData = json.dumps(value, indent=4)

        # Produce the message to Kafka
        producer.produce(topic, key=key, value=jsonData, callback=acked)
        producer.flush()

        if index % threshold == 0:
            break

In [None]:
def consumeAndProcessKafkaMessages(consumer, topic, serviceTopicMapping, batchSize, timeout):
    consumer.subscribe([topic])

    try:
        msgCount = 0
        minCommitCount = 10

        while True:
            messages = consumer.consume(batchSize, timeout=timeout)
            if messages is None:
                continue

            if not messages:
                # No messages received
                continue

            for msg in messages:
                if msg.error():
                    print(f"Error consuming message: {msg.error()}")
                else:
                    # Parse the received message
                    value = msg.value().decode('utf-8')
                    jsonValue = json.loads(value)

                    print(f" [x] Received {value}")
                    serviceName = jsonValue['SERVICENAME']
                    try:
                        rabbitMqQueueToSend = serviceTopicMapping[serviceName]
                    except KeyError:
                        # Handle the case where 'SERVICENAME' is not found in jsonValue
                        print(f"{serviceName} not found in rules.")
                        rabbitMqQueueToSend = serviceTopicMapping['Default']  # or some default value

                    body = json.dumps(filterNonJsonValues(jsonValue), indent=4)
                    rabbitMqProducer(body=body, queueName=rabbitMqQueueToSend)
                    #print(f"{jsonValue['SERVICENAME']}")

                    msgCount += 1
                    #Commit the message offset
                    if msgCount % minCommitCount == 0:
                        consumer.commit(asynchronous=False)

    except KeyboardInterrupt:
        pass
    finally:
        # Close the consumer gracefully
        consumer.close()

In [None]:
def rabbitMqProducer(body, queueName='sample', host='127.0.0.1', port=5672, username='guest', password='guest', virtual_host='/', erase_on_connect=True):
    # If you want to have a more secure SSL authentication, use ExternalCredentials object instead
    credentials = pika.PlainCredentials(username=username, password=password, erase_on_connect=erase_on_connect)
    parameters = pika.ConnectionParameters(host=host, port=port, virtual_host=virtual_host, credentials=credentials)

    # We are using BlockingConnection adapter to start a session. It uses a procedural approach to using Pika and has most of the asynchronous expectations removed
    connection = pika.BlockingConnection(parameters)
    # A channel provides a wrapper for interacting with RabbitMQ
    channel = connection.channel()

    # Check for a queue and create it, if necessary
    channel.queue_declare(queue=queueName)

    # For the sake of simplicity, we are not declaring an exchange, so the subsequent publish call will be sent to a Default exchange that is predeclared by the broker
    channel.basic_publish(exchange='', routing_key=queueName, body=body)
    print(f" [x] Sent {body}")

    # Safely disconnect from RabbitMQ
    connection.close()

In [None]:
def rabbitMqConsumer(queue_name='sample', host='127.0.0.1', port=5672, username='guest', password='guest', virtual_host='/',):
    credentials = pika.PlainCredentials(username, password)
    parameters = pika.ConnectionParameters(host=host, port=port, virtual_host=virtual_host, credentials=credentials)
    
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    
    # Since RabbitMQ works asynchronously, every time you receive a message, a callback function is called. We will simply print the message body to the terminal 
    def callback(ch, method, properties, body):
        print(f" [x] Received {body.decode('utf-8')}")
        #print(f" [x] Received {type(body.decode('utf-8'))}")
        #print(' [*] Waiting for messages. To exit press CTRL+C')

    # Consume a message from a queue. The auto_ack option simplifies our example, as we do not need to send back an acknowledgement query to RabbitMQ which we would normally want in production
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    try:
        # Start listening for messages to consume
        channel.start_consuming()
    except KeyboardInterrupt:
        print("Interrupted")
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)