## Set up environment

In [None]:
!pip install confluent-kafka

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install "pymongo[srv]"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Import Libraries

In [None]:
from confluent_kafka import Producer, Consumer, KafkaError
import uuid
import threading
import logging
import sys
import json

In [None]:
from pymongo import UpdateOne
import pymongo

In [None]:
import pandas as pd
import numpy as np
from numpy import empty

## Setting up Kafka Consumer

In [None]:
# Configure the root logger so that INFO and above is also written to kafka_consumer.log.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s', 
                              '%m-%d-%Y %H:%M:%S')

file_handler = logging.FileHandler('kafka_consumer.log')
# The handler accepts DEBUG, but the logger level above (INFO) still filters first.
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

# Re-running this cell would otherwise attach one more FileHandler for the same file to the
# root logger each time, and every record would be written to the file once per run.
for _existing in list(logger.handlers):
    if isinstance(_existing, logging.FileHandler) and _existing.baseFilename == file_handler.baseFilename:
        logger.removeHandler(_existing)
        _existing.close()

logger.addHandler(file_handler)

In [None]:
# Create consumer
def kafka_consumer_config(group_id=None, auto_offset_reset='latest'):
  """Build the confluent-kafka Consumer configuration for the CloudKarafka cluster.

  Connection settings are read from environment variables so that no secret is stored in
  the notebook (e.g. set them with ``%env KAFKA_SASL_PASSWORD=...`` before running):
    - KAFKA_BOOTSTRAP_SERVERS (defaults to the original three dory brokers)
    - KAFKA_SASL_USERNAME (defaults to the original user)
    - KAFKA_SASL_PASSWORD (required)
  NOTE(review): the password that used to be hard-coded here was committed in plain text
  and should be rotated.

  Args:
    group_id: consumer group id. When None, a fresh uuid1-based id is generated, so each
      call starts a brand-new consumer group (the original behaviour).
    auto_offset_reset: value for 'auto.offset.reset'; 'latest' by default.

  Returns:
    dict that can be passed to ``confluent_kafka.Consumer``.

  Raises:
    RuntimeError: if KAFKA_SASL_PASSWORD is not set.
  """
  import os

  password = os.environ.get('KAFKA_SASL_PASSWORD')
  if not password:
    raise RuntimeError('Set the KAFKA_SASL_PASSWORD environment variable before creating the Kafka consumer')

  return {
    'bootstrap.servers': os.environ.get(
      'KAFKA_BOOTSTRAP_SERVERS',
      'dory-01.srvs.cloudkafka.com:9094,dory-02.srvs.cloudkafka.com:9094,dory-03.srvs.cloudkafka.com:9094'),
    'sasl.mechanism': 'SCRAM-SHA-256',
    'security.protocol': 'SASL_SSL',
    'sasl.username': os.environ.get('KAFKA_SASL_USERNAME', 'guud457o'),
    'sasl.password': password,
    # A new group per call means no committed offsets are reused across invocations.
    'group.id': group_id if group_id is not None else str(uuid.uuid1()),
    'auto.offset.reset': auto_offset_reset,
  }


def get_consumer(topic='guud457o-counted-data', group_id=None, auto_offset_reset='latest'):
  """Create a Consumer subscribed to ``topic``.

  Args:
    topic: topic to subscribe to (defaults to the counted-data topic).
    group_id: see kafka_consumer_config; None creates a new consumer group.
    auto_offset_reset: see kafka_consumer_config.

  Returns:
    a subscribed ``confluent_kafka.Consumer``.

  Raises:
    RuntimeError: if the Kafka password is not configured in the environment.
  """
  c = Consumer(kafka_consumer_config(group_id=group_id, auto_offset_reset=auto_offset_reset))
  c.subscribe([topic])
  return c

In [None]:
def prepare_kafka_message(key=None, value=None, headers=None):
  """Decode one raw Kafka message into a dict ready to be written to MongoDB.

  The decoded key is stored as the document's ``_id``. The UTF-8 JSON payload becomes the
  document body. The value of the first header (or None when ``headers`` is None) is stored
  under ``headers``.

  Args:
    key: message key as bytes.
    value: message payload as UTF-8 encoded JSON bytes.
    headers: list of (name, value-bytes) tuples, or None.

  Returns:
    the decoded dict, or None if any step fails (the failure is logged with whatever
    key/value had been decoded so far).
  """
  try:
    # key/value are rebound step by step on purpose: the error log shows how far decoding got.
    key = key.decode('utf-8')
    value = value.decode('utf-8')
    value = json.loads(value)
    value.update({'_id': key})
    value.update({'headers': None if headers is None else headers[0][1].decode()})
  except Exception as e:
    logger.error(f"Kafka message error: {e} message detail -> key: {key} - value: {value}")
    return None
  return value

In [None]:
def convert_kafka_list(messages):
  """Turn a batch of confluent-kafka messages into a list of decoded message dicts.

  Error events are logged and skipped. Messages that prepare_kafka_message could not decode
  (it returns None for them and logs the reason) are skipped too, so every element of the
  returned list is a dict.

  Args:
    messages: iterable of confluent-kafka Message objects, e.g. from Consumer.consume().

  Returns:
    list of dicts produced by prepare_kafka_message (possibly empty).
  """
  list_of_messages = []
  for msg in messages:
    err = msg.error()
    if err:
      if err.code() == KafkaError._PARTITION_EOF:
        # End of partition is an informational event, not a failure.
        logger.info('%s [%d] reached end at offset %d', msg.topic(), msg.partition(), msg.offset())
      else:
        logger.error("Kafka error: {}".format(err))
      continue

    body = prepare_kafka_message(msg.key(), msg.value(), msg.headers())
    # A None entry would make pd.DataFrame(list_of_messages) fail or build a malformed frame.
    if body is not None:
      list_of_messages.append(body)

  return list_of_messages

## Setting up Mongo Connection

In [None]:
# Read the MongoDB connection string from the environment instead of hard-coding credentials
# (e.g. `%env MONGO_URI=mongodb+srv://user:password@host/admin` in a preceding cell).
# NOTE(review): the user/password previously hard-coded here were committed in plain text
# and should be rotated.
import os

MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    raise RuntimeError("Set the MONGO_URI environment variable to the MongoDB connection string")
client = pymongo.MongoClient(MONGO_URI)

In [None]:
# Handle on the "bootcamp" database; the helper functions look databases up by name themselves.
# To inspect it interactively: db.list_collection_names()
db = client["bootcamp"]

In [None]:
def getDataFromMongo(db_name, collection, query):
  """Run ``query`` against ``db_name.collection`` and return the matches as a DataFrame.

  Documents are flattened with ``pd.json_normalize`` (nested fields become dotted column
  names). Columns only exist for fields present in at least one returned document.
  """
  documents = list(client.get_database(db_name)[collection].find(query))
  return pd.json_normalize(documents)

In [None]:
def write_mongo_bulk_upsert(db_name, collection, df):
  """Upsert counted stock items by ``_id`` and then refresh the discrepancy report.

  Every row is ``$set`` onto the document with the same ``_id``, and that document's
  ``Retail_CCQTY`` is incremented by 1 per row. Afterwards discrepancy(df) recomputes the
  discrepancy for the SKUs in ``df``.

  Args:
    db_name: MongoDB database name.
    collection: collection holding the counted items.
    df: DataFrame of decoded messages; must have an ``_id`` column (and the
      ``Retail_Product_SKU`` column that discrepancy() queries on).
  """
  if df.empty:
    # bulk_write() rejects an empty list of operations, and there is nothing to count.
    logger.info("COUNTED STOCK ITEM SAVED: nothing to write")
    return

  target = client.get_database(db_name)[collection]

  upserts = []
  for record in df.to_dict(orient="records"):
    # MongoDB rejects $set and $inc on the same path, so the counter is never part of $set.
    fields = {k: v for k, v in record.items() if k != "Retail_CCQTY"}
    upserts.append(UpdateOne({'_id': record['_id']},
                             {"$set": fields, "$inc": {"Retail_CCQTY": 1}},
                             upsert=True))
  response = target.bulk_write(upserts)

  discrepancy(df)

  logger.info("COUNTED STOCK ITEM SAVED: matched=%d modified=%d upserted=%d",
              response.matched_count, response.modified_count, response.upserted_count)

In [None]:
def drop_collection(db_name, collection):
  """Drop ``collection`` from database ``db_name`` (used to reset state while testing)."""
  target = client.get_database(db_name)["%s" % (collection)]
  target.drop()


## Discrepancy

In [None]:
def write_mongo_bulk_upsert_discrepancy(db_name, collection, df):
  """Upsert one discrepancy document per row, keyed by ``Retail_Product_SKU``.

  Args:
    db_name: MongoDB database name.
    collection: collection holding the discrepancy report.
    df: discrepancy DataFrame; must have a ``Retail_Product_SKU`` column, which becomes
      the document ``_id``. All columns are ``$set`` on the document.
  """
  if df.empty:
    # bulk_write() rejects an empty list of operations.
    logger.info("DISCREPANCY SAVED: nothing to write")
    return

  target = client.get_database(db_name)[collection]

  upserts = [UpdateOne({'_id': record['Retail_Product_SKU']}, {"$set": record}, upsert=True)
             for record in df.to_dict(orient="records")]
  response = target.bulk_write(upserts)

  logger.info("DISCREPANCY SAVED: matched=%d modified=%d upserted=%d",
              response.matched_count, response.modified_count, response.upserted_count)

In [None]:
def compute_discrepancy(df_expected, df_counted):
  """Join expected stock (SOH) with counted stock (CC) on SKU and derive discrepancy metrics.

  Args:
    df_expected: expected-stock rows (as read from ``expected_CRO``). ``Retail_Product_SKU``
      and ``Retail_SOHQTY`` drive the comparison. The other product columns are carried
      along and filled with NaN when absent.
    df_counted: counted-stock rows (as read from ``counted_CRO``) with
      ``Retail_Product_SKU`` and ``Retail_CCQTY``.

  Returns:
    Outer merge of both sides on ``Retail_Product_SKU`` (with the ``_merge`` indicator),
    with missing quantities set to 0, plus these columns:
    Diff (CC - SOH), Unders, Overs, Match, SKUSide, SKUAccuracy, ItemAccuracy and
    UnitLevelAccuracy. Divisions by a zero SOH that give +/-inf are reported as 0, while
    0/0 stays NaN.
  """
  selected_columns = ['Retail_Product_Color', 'Retail_Product_Level1', 'Retail_Product_Level1Name',
                      'Retail_Product_Level2Name', 'Retail_Product_Level3Name', 'Retail_Product_Level4Name',
                      'Retail_Product_Name', 'Retail_Product_SKU', 'Retail_Product_Size',
                      'Retail_Product_Style', 'Retail_SOHQTY']
  # reindex instead of [] selection: pd.json_normalize only creates columns that occur in at
  # least one document. An empty result, or a batch whose documents all lack e.g.
  # Retail_Product_Color, would otherwise raise KeyError and stop the consumer loop.
  df_a = df_expected.reindex(columns=selected_columns)
  df_b = df_counted.reindex(columns=["Retail_Product_SKU", "Retail_CCQTY"])

  df_discrepancy = pd.merge(df_a, df_b, how='outer', on='Retail_Product_SKU', indicator=True)

  # A SKU missing on one side counts as quantity 0 on that side.
  df_discrepancy['Retail_CCQTY'] = df_discrepancy['Retail_CCQTY'].fillna(0).astype(int)
  df_discrepancy['Retail_SOHQTY'] = df_discrepancy['Retail_SOHQTY'].fillna(0).astype(int)
  cc = df_discrepancy['Retail_CCQTY']
  soh = df_discrepancy['Retail_SOHQTY']

  # Diff > 0: more counted than expected; Diff < 0: fewer counted than expected.
  diff = cc - soh
  df_discrepancy["Diff"] = diff
  df_discrepancy["Unders"] = (-diff).clip(lower=0)   # |Diff| where Diff < 0, else 0
  df_discrepancy["Overs"] = diff.clip(lower=0)        # Diff where Diff > 0, else 0
  df_discrepancy["Match"] = (diff == 0).astype(int)   # 1 when counted equals expected

  # Which side holds stock; left NaN when both quantities are 0.
  df_discrepancy.loc[(cc > 0) & (soh == 0), 'SKUSide'] = 'CC Only'
  df_discrepancy.loc[(soh > 0) & (cc == 0), 'SKUSide'] = 'SOH Only'
  df_discrepancy.loc[(soh > 0) & (cc > 0), 'SKUSide'] = 'SOH & CC'

  # Accuracy ratios against SOH; x/0 with x != 0 gives +/-inf, which is reported as 0.
  infinities = [np.inf, -np.inf]
  df_discrepancy['SKUAccuracy'] = (df_discrepancy['Match'] / soh).replace(infinities, 0)
  df_discrepancy['ItemAccuracy'] = (cc / soh).replace(infinities, 0)
  df_discrepancy['UnitLevelAccuracy'] = (
    (soh - df_discrepancy['Unders'] - df_discrepancy['Overs']) / soh
  ).replace(infinities, 0)

  return df_discrepancy


def discrepancy(df_counted):
  """Recompute and store the discrepancy report for the SKUs present in ``df_counted``.

  Reads the expected and counted documents for those SKUs from the ``bootcamp`` database,
  computes the metrics with compute_discrepancy and upserts them into ``discrepancy_CRO``.

  Args:
    df_counted: the batch just written to ``counted_CRO``. Only its ``Retail_Product_SKU``
      column is used; the counted quantities are re-read from MongoDB.

  Returns:
    the discrepancy DataFrame that was written.
  """
  query = {'Retail_Product_SKU': {"$in": df_counted['Retail_Product_SKU'].to_list()}}
  df_expected = getDataFromMongo(db_name="bootcamp", collection="expected_CRO", query=query)
  df_counted_all = getDataFromMongo(db_name="bootcamp", collection="counted_CRO", query=query)

  df_discrepancy = compute_discrepancy(df_expected, df_counted_all)
  logger.info("DISCREPANCY CALCULATION COMPLETED")

  write_mongo_bulk_upsert_discrepancy(db_name="bootcamp", collection="discrepancy_CRO", df=df_discrepancy)
  return df_discrepancy




## Consume data and save it to MongoDB

Only messages whose first Kafka header value is `CRO` are saved to MongoDB.

In [None]:
# Consume counted-stock messages until interrupted and save the CRO ones to MongoDB.
# The consumer is created outside the try: if creation fails, `c` would not exist and the
# `finally` clause would hide the real error behind a NameError.
c = get_consumer()
try:
  while True:
    raw_messages = c.consume(num_messages=500, timeout=6)
    if not raw_messages:
      continue

    # Keep only successfully decoded CRO messages. Filtering before building the frame means
    # a batch of only errors/undecodable messages cannot raise KeyError on "headers". It also
    # means fields from non-CRO messages do not become NaN columns on the CRO rows.
    cro_records = [r for r in convert_kafka_list(raw_messages)
                   if r is not None and r.get("headers") == "CRO"]

    if cro_records:
      df = pd.DataFrame(cro_records)
      write_mongo_bulk_upsert(db_name="bootcamp", collection="counted_CRO", df=df)
    else:
      logger.info("No CRO messages in this batch")

    c.commit(asynchronous=True)

except KeyboardInterrupt:
  pass

finally:
  # Leave group and commit final offsets
  c.close()

INFO:root:DISCREPANCY CALCULATION COMPLETED
INFO:root:DISCREPANCY SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4cd171e90>
INFO:root:COUNTED STOCK ITEM SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4cab88ed0>
INFO:root:DISCREPANCY CALCULATION COMPLETED
INFO:root:DISCREPANCY SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4c60ffa10>
INFO:root:COUNTED STOCK ITEM SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4c60f02d0>
INFO:root:DISCREPANCY CALCULATION COMPLETED
INFO:root:DISCREPANCY SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4cab8abd0>
INFO:root:COUNTED STOCK ITEM SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4c606c6d0>
INFO:root:DISCREPANCY CALCULATION COMPLETED
INFO:root:DISCREPANCY SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4c5f0d710>
INFO:root:COUNTED STOCK ITEM SAVED: <pymongo.results.BulkWriteResult object at 0x7fa4cd015d50>
INFO:root:DISCREPANCY CALCULATION COMPLETED
INFO:root:DISCREPANCY SAVED: <pymongo.results.Bu

In [None]:
# Delete Collection -- for testing
# drop_collection(db_name="bootcamp", collection="discrepancy_CRO")
# drop_collection(db_name="bootcamp", collection="counted_CRO")

In [None]:
# Load the whole discrepancy report; swap in the commented filter to restrict it to Level 1 "H".
query = {}
# query = {'Retail_Product_Level1':{"$eq":"H"}}
df_A = getDataFromMongo("bootcamp", "discrepancy_CRO", query)
df_A.shape

(3817, 22)

In [None]:
# Preview the first rows of the discrepancy report.
df_A.head(n=5)

Unnamed: 0,_id,Diff,ItemAccuracy,Match,Overs,Retail_CCQTY,Retail_Product_Color,Retail_Product_Level1,Retail_Product_Level1Name,Retail_Product_Level2Name,...,Retail_Product_Name,Retail_Product_SKU,Retail_Product_Size,Retail_Product_Style,Retail_SOHQTY,SKUAccuracy,SKUSide,Unders,UnitLevelAccuracy,_merge
0,9735510000,1,2.0,0,1,2,,H,ELETRO-ELETRONICOS,HZ,...,RELOGIO FEM MONDAINE PRATA KIT COLAR ANA,9735510000,,KIT FEMININO PRATA,1,0.0,SOH & CC,0,0.0,both
1,9639750000,3,2.0,0,3,6,,H,ELETRO-ELETRONICOS,HB,...,NOKIA C30 BRANCO 64GB NK043,9639750000,,OUTROS,3,0.0,SOH & CC,0,0.0,both
2,9216280000,1,2.0,0,1,2,,H,ELETRO-ELETRONICOS,HZ,...,RELOGIO LINCE FEMININO DOURADO P4 C PULS,9216280000,,KIT FEMININO DOURADO,1,0.0,SOH & CC,0,0.0,both
3,8687780000,3,2.0,0,3,6,,W,BELEZA,WB,...,JEQUITI SENSI SABONETE HIDRATANTE EM BAR,8687780000,,BARRA,3,0.0,SOH & CC,0,0.0,both
4,9392380001,2,1.666667,0,2,5,JEANS,C,VESTUARIO,CY,...,BERMUDA APP JEANS PS SLO INK CICLISTA BL,9392380001,46.0,BOTTOM CURTO JEANS PLUS,3,0.0,SOH & CC,0,0.333333,both
