Test MySQL

In [None]:
import sys
import os
import json
import logging
from kafka import KafkaConsumer, KafkaProducer
from json import loads
import random
import datetime
import mysql.connector
import boto3
from logging.handlers import RotatingFileHandler

In [None]:
# Parameterised INSERT statements, one per target table.
_CUSTOMER_QUERY = '''
    INSERT INTO customer (StoreNumber,StoreName,Address,City,CountyName,State,ZipCode)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    '''
_VENDOR_QUERY = '''
    INSERT INTO vendor (VendorNumber,VendorName)
    VALUES (%s,%s)
    '''
_CATEGORY_QUERY = '''
    INSERT INTO category (CategoryNumber,CategoryName)
    VALUES (%s,%s)
    '''
# The original statement ended its VALUES list with a trailing comma, which is
# a SQL syntax error; it has been removed.
_PRODUCT_QUERY = '''
    INSERT INTO product (ItemNumber,CategoryNumber,ItemDescription,BottleVolumeML,
    Pack,BottleCost,BottleRetail)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    '''
_SALES_QUERY = '''
    INSERT INTO sales (Invoice,StoreNumber,VendorNumber,SaleDate,SaleDollars,
    ItemNumber,VolumeSoldLiters,BottlesSold)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
    '''


def _insert_ignoring_duplicates(mysql_conn, mysql_cursor, query, params):
    """Insert one row and commit, tolerating an integrity violation.

    The same customer/vendor/category/product is expected to appear on many
    sale records, so a rejected insert (typically a duplicate key) must not
    stop the remaining inserts for the record. Any other database error
    propagates to the caller.
    """
    try:
        mysql_cursor.execute(query, params)
        mysql_conn.commit()
    except mysql.connector.IntegrityError as e:
        mysql_conn.rollback()
        logger.debug("Row %r not inserted: %s", params, e)


def _process_sale_record(data, mysql_conn, mysql_cursor):
    """Parse one decoded sale record, store it in MySQL and publish the invoice.

    Args:
        data: Mapping produced by the consumer's JSON deserializer.
        mysql_conn: Open MySQL connection used for commit/rollback.
        mysql_cursor: Cursor created from ``mysql_conn``.

    Records whose key fields are empty/zero are logged and skipped. Conversion
    errors (for example ``int('')``) and unexpected database errors are raised
    to the caller, which logs them and moves on to the next message.
    """
    # Customer
    store_number = int(data.get('StoreNumber', ''))
    if not store_number:
        logger.error("StoreNumber is null or invalid. Skipping insertion.")
        return

    store_name = data.get('StoreName', '')
    address = data.get('Address', '')
    city = data.get('City', '')
    county = data.get('County', '')
    state = data.get('State', '')
    zip_code = int(data.get('ZipCode', ''))

    # Vendor (int(float(...)) so that values such as '255.0' are accepted)
    vendor_number = int(float(data.get('VendorNumber', '')))
    if not vendor_number:
        logger.error("VendorNumber is null or invalid. Skipping insertion.")
        return

    vendor_name = data.get('VendorName', '')

    # Category
    category = int(float(data.get('Category', '')))
    if not category:
        logger.error("Category is null or invalid. Skipping insertion.")
        return

    category_name = data.get('CategoryName', '')

    # Product
    item_number = int(data.get('ItemNumber', ''))
    if not item_number:
        logger.error("ItemNumber is null or invalid. Skipping insertion.")
        return

    item_description = data.get('ItemDescription', '')
    pack = int(data.get('Pack', ''))
    volume = int(data.get('BottleVolumeML', ''))
    cost = float(data.get('BottleCost', '').replace('$', ''))
    retail = float(data.get('BottleRetail', '').replace('$', ''))

    # Sales
    invoice = data.get('Invoice', '')
    if not invoice:
        logger.error("Invoice is null or invalid. Skipping insertion.")
        return

    date_string = data.get('Date', '')
    if not date_string:
        logger.error("Date is null or invalid. Skipping insertion.")
        return

    sales_date = datetime.datetime.strptime(date_string, '%m/%d/%Y').date()
    amount_sold = int(data.get('BottlesSold', ''))
    total_liters = float(data.get('VolumeSoldLiters', ''))
    sales = float(data.get('SaleDollars', '').replace('$', ''))

    # MySQL
    # Each table gets its own insert/commit because of the simplistic invoicing
    # application script; in a real-world scenario each would likely be a truly
    # separate process. A duplicate that fails to insert must not prevent the
    # remaining inserts for this record from completing.
    dimension_rows = (
        (_CUSTOMER_QUERY,
         (store_number, store_name, address, city, county, state, zip_code)),
        (_VENDOR_QUERY, (vendor_number, vendor_name)),
        (_CATEGORY_QUERY, (category, category_name)),
        (_PRODUCT_QUERY,
         (item_number, category, item_description, volume, pack, cost, retail)),
    )
    for query, params in dimension_rows:
        _insert_ignoring_duplicates(mysql_conn, mysql_cursor, query, params)

    try:
        sales_data = (invoice, store_number, vendor_number, sales_date, sales,
                      item_number, total_liters, amount_sold)
        mysql_cursor.execute(_SALES_QUERY, sales_data)
        mysql_conn.commit()

        # Topic should post after MySQL processing to ensure data is in the database.
        # NOTE(review): 'SaleDate' is a datetime.date; confirm the producer's
        # value serializer can encode it.
        invoices_info = {
            'Invoice': invoice,
            'SaleDate': sales_date,
            'saleDollars': sales
        }

        invoice_producer.send(INVOICES_TOPIC, value=invoices_info)
        invoice_producer.flush()
    except Exception as e:
        # logger.exception records the traceback; print() has no exc_info kwarg.
        logger.exception("Error processing message: %s", e)


def mysql_consumer():
    """Consume sale records from the 'mysql' Kafka topic and load them into MySQL.

    Connects to the local 'oaken' database and the Kafka broker at
    localhost:9092, then handles each message with ``_process_sale_record``.
    A failure on one message is logged and the loop continues with the next.
    Both connections are closed when the loop ends or is interrupted.

    Relies on the module-level names ``logger``, ``invoice_producer`` and
    ``INVOICES_TOPIC``, which must be defined before this function is called.
    """
    mysql_conn = mysql.connector.connect(
        host='localhost',
        user='mysql',
        password='mysql',
        database='oaken'
    )
    try:
        mysql_cursor = mysql_conn.cursor()

        # Passing the topic to the constructor already subscribes to it, so no
        # separate subscribe() call is needed.
        consumer = KafkaConsumer(
            'mysql',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',  # Start consuming from the earliest offset
            enable_auto_commit=True,       # Automatically commit offsets
            group_id='my_consumer_group',  # Specify a consumer group
            value_deserializer=lambda x: loads(x.decode('utf-8')))
        try:
            # Poll for messages
            for message in consumer:
                try:
                    _process_sale_record(message.value, mysql_conn, mysql_cursor)
                except Exception as e:
                    # Per-message boundary: log with traceback and keep consuming.
                    logger.exception("Error in Kafka consumer: %s", e)
        finally:
            consumer.close()
    finally:
        # Runs even if the consumer could not be created, so the connection
        # is never leaked.
        mysql_conn.close()

In [None]:
# Start the consumer.
# NOTE(review): the message loop inside mysql_consumer has no visible exit
# condition, so this cell presumably runs until the kernel is interrupted.
mysql_consumer()

In [None]:
import pandas as pd

# Preview the raw data: load only the first ten rows of the CSV into `df`.
df = pd.read_csv(filepath_or_buffer="data/Iowa_Liquor_Sales.csv", nrows=10)

print(df)

In [None]:
# List the column labels of `df` (the DataFrame loaded in an earlier cell).
print(df.columns)

In [None]:
# sample output
'''
{'Invoice/Item Number': 'S29198800001', 'Date': '11/20/2015', 'Store Number': '2191', 'Store Name': 'Keokuk Spirits', 
'Address': '1013 MAIN', 'City': 'KEOKUK', 'Zip Code': '52632', 'Store Location': '1013 MAIN\nKEOKUK 52632\n(40.39978, -91.387531)', 
'County Number': '56', 'County': 'Lee', 'Category': '', 'Category Name': '', 'Vendor Number': '255', 'Vendor Name': 'Wilson Daniels Ltd.', 
'Item Number': '297', 'Item Description': 'Templeton Rye w/Flask', 'Pack': '6', 'Bottle Volume (ml)': '750', 'State Bottle Cost': '$18.09', 
'State Bottle Retail': '$27.14', 'Bottles Sold': '6', 'Sale (Dollars)': '$162.84', 'Volume Sold (Liters)': '4.50', 'Volume Sold (Gallons)': '1.19'}
'''

In [None]:
# `csv` is not imported anywhere else in the file, so import it here.
import csv
from itertools import islice

# Stream the CSV in chunks of up to 1000 rows and print every row.
# newline='' lets the csv module handle line endings itself, which matters
# for quoted fields that contain embedded newlines.
with open("../data/Iowa_Liquor_Sales.csv", 'r', newline='') as file:
    reader = csv.DictReader(file)
    while True:
        # islice pulls at most 1000 rows; an empty list means the file is exhausted.
        chunk = list(islice(reader, 1000))
        if not chunk:
            break
        for row in chunk:
            print(row)

In [None]:
# money conversion test
data = {'SaleDollars': '$12.34'}

# Drop the currency symbol first, then convert the remaining text to a float.
raw_sale = data.get('SaleDollars', '')
sales = float(raw_sale.replace('$', ''))

sales

In [None]:
# datetime test
# Import the module, not `from datetime import datetime`: rebinding the name
# `datetime` to the class would break code that relies on the file-level
# `import datetime` and calls `datetime.datetime.strptime(...)`.
import datetime

date_data = {"Date":"11/09/2015"}

date_string = date_data.get('Date', '')
# Parse the US-style month/day/year string and keep only the date part.
invoice_date = datetime.datetime.strptime(date_string,'%m/%d/%Y').date()

print(invoice_date)
