In [17]:
import pandas as pd
from sodapy import Socrata
import json
# from datetime import datetime, timedelta
import datetime
import pymysql

In [18]:
# Socrata API client.
# Credentials are read from a local JSON config file so they never appear in the notebook.
# The path uses a forward slash: it works on Windows and POSIX alike, and avoids the
# invalid "\s" escape sequence that a backslash-separated literal would contain.
with open('config/socrata_config.json') as f:
    socrata_config = json.load(f)

AppToken = socrata_config['app_token']
UserName = socrata_config['user_name']
Password = socrata_config["password"]

# Authenticated client for the data.iowa.gov Socrata domain (30 second request timeout).
client = Socrata("data.iowa.gov",
                 AppToken,
                 username=UserName,
                 password=Password,
                 timeout=30)


In [19]:
# MySQL connection settings.
# Read from a local JSON config file so credentials never appear in the notebook.
# The path uses a forward slash: it works on Windows and POSIX alike, and avoids the
# invalid "\m" escape sequence that a backslash-separated literal would contain.
with open('config/mysql_config.json') as f:
    mysql_config = json.load(f)

host = mysql_config['hostname']
user = mysql_config['username']
password = mysql_config['password']



In [20]:
# Lookup dictionaries consumed by transform_data:
#   placeholders      -> passed to DataFrame.fillna as the per-column fill values
#   num_col_dtype_map -> column name mapped to a type tag ('int' or anything else for float)
with open('dicts/placeholders.json', 'r') as placeholders_file:
    placeholders = json.load(placeholders_file)

with open('dicts/num_col_dtype_map.json', 'r') as dtype_map_file:
    num_col_dtype_map = json.load(dtype_map_file)

In [21]:
# Function for extracting data via Socrata API
def extract_data(client, f_year, batch_size, offset, columns=None):
    """Fetch one page of sales rows for a single fiscal year from dataset "m3tr-qhgy".

    The fiscal year ``f_year`` starts on July 1 of ``f_year - 1``. The query window
    is ``[start of fiscal year, min(July 1 of f_year, first day of the current month))``,
    so consecutive fiscal years never overlap and the current, still-incomplete
    month is never included.

    Args:
        client: Object exposing ``get(dataset_id, **soql_kwargs)`` (e.g. a sodapy ``Socrata``).
        f_year: Fiscal year to extract (int).
        batch_size: Maximum number of rows to request (SoQL ``limit``).
        offset: Number of rows to skip (SoQL ``offset``); used for paging.
        columns: Optional SoQL ``select`` string. Defaults to the module-level
            ``col_selected`` when omitted.

    Returns:
        Whatever ``client.get`` returns for the page (for sodapy, a list of row dicts;
        empty once the offset is past the last matching row).
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S.%f'
    start_date = f"{f_year - 1}-07-01T00:00:00.000"

    today = datetime.datetime.now()
    first_of_current_month = datetime.datetime(today.year, today.month, 1)
    fiscal_year_end = datetime.datetime(f_year, 7, 1)
    # Cap the window at the end of this fiscal year; without the cap every fiscal
    # year's query would run up to the current month and re-fetch later years' rows.
    end_date = min(fiscal_year_end, first_of_current_month).strftime(timestamp_format)

    if columns is None:
        columns = col_selected

    # An explicit order keeps limit/offset paging stable between requests.
    return client.get("m3tr-qhgy",
                      select=columns,
                      where=f"(LOWER(name) LIKE '%hy-vee%' OR name LIKE '%WALL TO WALL WINE AND SPIRITS%') AND date >= '{start_date}' AND date < '{end_date}'",
                      order=":id",
                      limit=batch_size,
                      offset=offset)

In [22]:
# Function for transforming data
def transform_data(df):
    """Clean a raw sales DataFrame and return the rows that are valid for loading.

    Steps, in order:
      1. Fill missing values using the module-level ``placeholders`` mapping.
      2. Normalise ``date`` to a ``YYYY-MM-DD`` string.
      3. Cast every column listed in the module-level ``num_col_dtype_map`` to
         int (tag ``'int'``, via float first) or to float (any other tag).
      4. Keep only rows whose ``state_bottle_cost``, ``state_bottle_retail`` and
         ``sale_bottles`` are all strictly positive.

    The input frame is never modified. An empty frame (for example the result of
    an API call that returned no rows, which has no columns at all) is returned
    as an empty copy instead of raising ``KeyError``.

    Args:
        df: Raw records as a pandas DataFrame.

    Returns:
        A new pandas DataFrame containing the cleaned, filtered rows.
    """
    if df.empty:
        return df.copy()

    # Non-inplace fillna yields a fresh frame, so the caller's data stays untouched.
    cleaned = df.fillna(placeholders)
    cleaned['date'] = pd.to_datetime(cleaned['date']).dt.strftime('%Y-%m-%d')

    for col, col_type in num_col_dtype_map.items():
        as_float = cleaned[col].astype(float)
        # Values go through float first, so strings such as "3.0" can still become ints.
        cleaned[col] = as_float.astype(int) if col_type == 'int' else as_float

    is_valid = (
        (cleaned['state_bottle_cost'] > 0)
        & (cleaned['state_bottle_retail'] > 0)
        & (cleaned['sale_bottles'] > 0)
    )
    return cleaned[is_valid]

In [23]:
# Function for loading data to a MySQL database
def load_data(conn, cursor, df, batch_size, sql_insert_query):
    """Insert the rows of ``df`` one at a time, committing after every batch.

    Rows rejected by MySQL as duplicates are reported with ``print`` and skipped.
    Any other error rolls back the batch in progress and is re-raised, so a
    half-written batch is never left pending on the connection.

    Args:
        conn: Database connection exposing ``commit()`` and ``rollback()``.
        cursor: Cursor exposing ``execute(query, params)``.
        df: pandas DataFrame whose column order matches the placeholders in
            ``sql_insert_query``.
        batch_size: Number of rows executed between commits.
        sql_insert_query: Parameterized INSERT statement.

    Raises:
        pymysql.err.IntegrityError: For integrity violations other than duplicates.
        Exception: Anything else raised by ``cursor.execute`` (after rollback).
    """
    mysql_duplicate_entry = 1062  # MySQL error code ER_DUP_ENTRY

    rows = list(df.itertuples(index=False, name=None))
    for batch_start in range(0, len(rows), batch_size):
        try:
            for row in rows[batch_start:batch_start + batch_size]:
                try:
                    cursor.execute(sql_insert_query, row)
                except pymysql.err.IntegrityError as e:
                    # Prefer the numeric error code; the message text is kept as a
                    # fallback so everything skipped before is still skipped.
                    is_duplicate = (
                        (bool(e.args) and e.args[0] == mysql_duplicate_entry)
                        or 'Duplicate entry' in str(e)
                    )
                    if not is_duplicate:
                        raise
                    # Log the error and skip the duplicated row
                    print(f"Duplicate entry skipped: {row}")
        except Exception:
            conn.rollback()
            raise
        conn.commit()

In [24]:
# Parameterized INSERT into the `sales` table, passed to load_data as `sql_insert_query`.
# It names 18 columns and supplies one %s placeholder per column; the placeholders are
# filled positionally from each row tuple, so the column order here must match the
# column order of the DataFrame being loaded.
load_sql = """
INSERT INTO sales (
    invoice_line_no, date, store, name, address, city, zipcode, county, category, category_name, vendor_no, vendor_name, itemno, im_desc, bottle_volume_ml, state_bottle_cost, state_bottle_retail, sale_bottles
) 
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

In [25]:
# Comma-separated column list sent as the `select` argument of every Socrata query.
# The names and their order are the same as the INSERT column list in load_sql.
col_selected = 'invoice_line_no, date, store, name, address, city, zipcode, county, category, category_name, vendor_no, vendor_name, itemno, im_desc, bottle_volume_ml, state_bottle_cost, state_bottle_retail, sale_bottles'


In [27]:
# Run the ETL: extract from Socrata, transform, and load into both the
# STG_HYVEE and INT_HYVEE databases.
#
# The whole procedure runs inside try/finally so that cursors and connections
# are released even when extraction, transformation or loading fails.
conn_stg = cursor_stg = conn_int = cursor_int = None
try:
    # Establish connections to both STG_HYVEE and INT_HYVEE databases
    conn_stg = pymysql.connect(host=host, user=user, password=password, db='STG_HYVEE')
    cursor_stg = conn_stg.cursor()

    conn_int = pymysql.connect(host=host, user=user, password=password, db='INT_HYVEE')
    cursor_int = conn_int.cursor()

    # Start ETL procedure
    cursor_stg.execute("SELECT MAX(date) FROM sales")
    latest_date_result = cursor_stg.fetchone()

    today = datetime.datetime.now()

    batch_size = 10000

    # If records exist in the database, load the data following the most recent entry
    if latest_date_result and latest_date_result[0]:
        latest_date = latest_date_result[0]
        # The lower bound is inclusive, so rows dated on the latest loaded day are
        # fetched again; load_data skips the ones that already exist.
        start_date = latest_date.strftime('%Y-%m-%dT%H:%M:%S.%f')
        print("Start Date:", start_date)

        end_date = datetime.datetime(today.year, today.month, 1).strftime('%Y-%m-%dT%H:%M:%S.%f')
        print("End Date:", end_date)

        # Page through the result set; a single request is capped at batch_size
        # rows, so anything beyond the first page would otherwise be dropped.
        offset = 0
        while True:
            results = client.get("m3tr-qhgy",
                                 select=col_selected,
                                 where=f"(LOWER(name) LIKE '%hy-vee%' OR name LIKE '%WALL TO WALL WINE AND SPIRITS%') AND date >= '{start_date}' AND date < '{end_date}'",
                                 order=":id",  # explicit order keeps limit/offset paging stable
                                 limit=batch_size,
                                 offset=offset)

            # No rows (nothing new, or past the last page): stop before transforming.
            if not results:
                break

            offset += len(results)
            df = pd.DataFrame.from_records(results)
            df_transformed = transform_data(df)

            load_data(conn_stg, cursor_stg, df_transformed, batch_size, load_sql)
            load_data(conn_int, cursor_int, df_transformed, batch_size, load_sql)

    # If the database contains no records, then load data starting from three fiscal years ago
    else:
        # Determine FY based on the current month (FY starts in July)
        if today.month < 7:
            current_f_year = today.year
        else:
            current_f_year = today.year + 1

        start_f_year = current_f_year - 3

        for f_year in range(start_f_year, current_f_year + 1):
            offset = 0

            while True:
                results = extract_data(client, f_year, batch_size, offset)

                # An empty page means this fiscal year has been fully read.
                if not results:
                    break

                offset += len(results)
                df = pd.DataFrame.from_records(results)
                df_transformed = transform_data(df)

                load_data(conn_stg, cursor_stg, df_transformed, batch_size, load_sql)
                load_data(conn_int, cursor_int, df_transformed, batch_size, load_sql)
finally:
    # Close in the same order as before: staging cursor/connection, then integration.
    for db_handle in (cursor_stg, conn_stg, cursor_int, conn_int):
        if db_handle is not None:
            db_handle.close()



Start Date: 2023-12-30T00:00:00.000000
End Date: 2024-02-01T00:00:00.000000
Duplicate entry skipped: ('INV-65808500001', '2023-12-30', 6208, 'HY-VEE FAST AND FRESH / GRIMES', '1401 1ST STREET', 'GRIMES', 50111, 'POLK', 1011200, 'STRAIGHT BOURBON WHISKIES', 65, 'JIM BEAM BRANDS', 19061, 'JIM BEAM MINI', 50, 7.0, 10.5, 3)
Duplicate entry skipped: ('INV-65807200063', '2023-12-30', 2712, 'HY-VEE / DECORAH', '915 SHORT ST #107', 'DECORAH', 52101, 'WINNESHIEK', 1011100, 'BLENDED WHISKIES', 240, 'WILLIAM GRANT & SONS INC', 5844, 'MONKEY SHOULDER', 1750, 35.5, 53.25, 6)
Duplicate entry skipped: ('INV-65800400030', '2023-12-30', 2507, 'HY-VEE FOOD STORE #1 (1042) / BURLINGTON', '939 ANGULAR', 'BURLINGTON', 52601, 'DES MOINES', 1081400, 'AMERICAN SCHNAPPS', 434, 'LUXCO INC', 80578, 'ARROW PEPPERMINT SCHNAPPS', 1750, 7.5, 11.25, 6)
Duplicate entry skipped: ('INV-65799200065', '2023-12-30', 2506, 'HY-VEE #2 (1044) / BURLINGTON', '3140 AGENCY', 'BURLINGTON', 52601, 'DES MOINES', 1011100, 'BLENDED W