In [1]:
!pip install psycopg2 --break-system-packages

Defaulting to user installation because normal site-packages is not writeable


# Consumer for soh streamline

This notebook creates a Kafka consumer subscribed to `soh_topic`. Each message is parsed, normalized, and inserted into the `soh` table of the data warehouse.

In [2]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import pandas as pd
import sys
sys.path.append('../../libraries')
import utils

from json import loads
from dotenv import load_dotenv
import os

import psycopg2
from psycopg2 import sql

### Defining the connection to the data warehouse

In [3]:
# Keyword arguments for psycopg2.connect(), each read from an environment
# variable. A variable that is not set yields None for that entry.
# NOTE(review): load_dotenv is imported above but never called in this
# notebook, so these values must already be present in the process
# environment when the kernel starts -- confirm.
db_params = {
    connect_kwarg: os.getenv(env_var)
    for connect_kwarg, env_var in (
        ('dbname', 'DB_NAME'),
        ('user', 'DB_USER'),
        ('password', 'DB_PASSWORD'),
        ('host', 'DB_HOST'),
        ('port', 'DB_PORT'),
    )
}

In [4]:
def insert_row(soh_record):
    """
    Insert soh row to datawarehouse with postgres connection

    A new connection is opened from `db_params` for every call. After the
    INSERT is committed, the row is read back by (site_code, sku, date) and
    the outcome is printed. Any error is printed instead of being raised, so
    a failed insert never stops the caller.

    Params:
    soh_record (dict): Row record of soh, processed before. The keys
        'site_code', 'sku', 'quantity' and 'date' are read from it.
    """
    # Bind both handles up front: if psycopg2.connect() itself fails, the
    # except/finally clauses below must not trip over unbound names and
    # hide the original error.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()

        insert_query = """
            INSERT INTO soh (site_code, sku, quantity, date) 
            VALUES (%s, %s, %s, %s)
        """

        cursor.execute(insert_query, (
            soh_record['site_code'],
            soh_record['sku'],
            soh_record['quantity'],
            soh_record['date']
        ))

        conn.commit()

        # Read the row back to confirm that it was stored.
        verify_query = """
            SELECT * FROM soh 
            WHERE site_code = %s AND sku = %s AND date = %s
        """
        cursor.execute(verify_query, (
            soh_record['site_code'],
            soh_record['sku'],
            soh_record['date']
        ))

        inserted_record = cursor.fetchone()

        if inserted_record:
            print(f"Inserted record successfully: {inserted_record}")
        else:
            print("Record not found after insertion.")
    except Exception as e:
        print(e)
        # Only roll back when a connection was actually established.
        if conn is not None:
            conn.rollback()
    finally:
        # Release only what was actually opened.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

### Kafka Consumer for `soh_topic`

In [5]:
# Topic this notebook reads from.
topic = 'soh_topic'

# Auto-commit is switched off here; consume_messages() commits each offset
# explicitly after the message has been handled. The broker address comes
# from the KAFKA_SERVER environment variable.
consumer = Consumer({
    'bootstrap.servers': os.getenv('KAFKA_SERVER'),
    'group.id': 'soh_consumer_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe([topic])

In [6]:
def parse_date(date):
    """Function to parse the date format to SQL storage

    Params:
    date: A single value accepted by `pd.to_datetime` (for example a date
        string, or 'today'). None / NaN / NaT are treated as missing.

    Returns:
    str: The date formatted as 'YYYY-MM-DD'. A missing value falls back to
        today's date, formatted the same way.

    Any exception raised by `pd.to_datetime` for an unparseable value is
    propagated to the caller.
    """
    parsed = pd.to_datetime(date)
    if parsed is None or pd.isna(parsed):
        # Fall back to the current date; it is formatted below like every
        # other value so callers always receive a 'YYYY-MM-DD' string.
        parsed = pd.to_datetime('today')
    return parsed.strftime('%Y-%m-%d')

In [7]:
def pipeline(data):
    """
    Normalize one soh record in place and return it.

    - Anything that is not a dict yields a new empty dict.
    - A missing 'date' key defaults to 'today'; the value is then passed
      through parse_date.
    - A missing or None 'quantity' becomes 0; the value is floored at 0.
    - 'site_code' is set to the upper-cased value of 'site_code' and/or
      'store' (when both are present and not None, 'store' wins because
      it is applied last).

    Params:
    data (dict): Decoded soh record; it is modified in place.
    """
    if not isinstance(data, dict):
        return {}

    data.setdefault('date', 'today')
    if data.get('quantity') is None:
        data['quantity'] = 0

    data['date'] = parse_date(data['date'])

    # Both keys feed 'site_code'; they are applied in this order.
    for source_key in ('site_code', 'store'):
        raw_code = data.get(source_key)
        if raw_code is not None:
            data['site_code'] = raw_code.upper()

    # Negative quantities are clamped to zero.
    data['quantity'] = max(data['quantity'], 0)

    return data

In [8]:
def process_message(msg):
    """
    Function to process Kafka messages

    The payload is decoded as UTF-8 JSON, normalized with `pipeline`, and
    handed to `insert_row` when 'sku', 'date', 'quantity' and 'site_code'
    are all present and not None. Records that fail that check are reported
    as skipped instead of processed. Any exception is printed, never raised.

    Params:
    msg (cimpl.Message): Message to process
    """
    try:
        payload = msg.value()
        if payload is None:
            # Nothing to decode; report it explicitly rather than through a
            # generic AttributeError text.
            print("Skipped message with empty payload.")
            return

        soh_record = pipeline(loads(payload.decode('utf-8')))

        cols = ['sku', 'date', 'quantity', 'site_code']

        if all(soh_record.get(key) is not None for key in cols):
            insert_row(soh_record)
            print(f"Processed message: {soh_record}")
        else:
            # The record was not inserted, so do not claim it was processed.
            print(f"Skipped message with missing or null fields: {soh_record}")
    except Exception as e:
        print(e)

In [9]:
def consume_messages():
    """
    Poll the subscribed consumer in an endless loop and process each message.

    Every message without an error is passed to `process_message` and its
    offset is then committed synchronously. End-of-partition events are only
    logged. The loop stops on KeyboardInterrupt; the module-level `consumer`
    is closed on the way out in every case.

    Raises:
    KafkaException: for any message error other than end-of-partition.
    """
    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue  # no message

            error = msg.error()
            if not error:
                process_message(msg)

                # Auto-commit is disabled, so the offset is committed here.
                consumer.commit(message=msg, asynchronous=False)
                continue

            if error.code() == KafkaError._PARTITION_EOF:
                # partition() and offset() are methods; call them so the log
                # shows the values rather than bound-method reprs.
                print(f"End of partition reached {msg.partition()}, offset {msg.offset()}")
            else:
                raise KafkaException(error)
    except KeyboardInterrupt:
        print("Consuming interrupted.")
    finally:
        consumer.close()

In [10]:
# Start the polling loop. This call blocks; it ends when the kernel is
# interrupted (KeyboardInterrupt) or when a Kafka error is raised.
consume_messages()

Consuming interrupted.
