In [14]:
import os  
import pandas as pd  
import numpy as np  
from datetime import datetime, timedelta  
from fredapi import Fred 
import logging  
import smtplib  

In [15]:
# --- Pipeline configuration -------------------------------------------------
# FRED API key, read from the environment so the secret never lives in the
# notebook. FRED_API_KEY is the variable name fredapi itself falls back to.
API_KEY = os.getenv("FRED_API_KEY")
# Calendar year for which weekly sales rows are generated.
START_YEAR = 2024
# Number of synthetic products in the catalog.
NUM_PRODUCTS = 50
# Destination of the merged sales + economic dataset.
OUTPUT_PATH = "merged_data.csv"
# Addresses intended to receive data-integrity alerts.
EMAIL_RECIPIENTS = ["amitdubeypa@gmail.com"]

In [None]:
pip install fredapi

In [16]:
def generate_week_starts(year):
    """Return every Sunday of ``year`` as a list of ``datetime`` objects.

    Args:
        year: Calendar year (anything that formats as a four-digit year).

    Returns:
        list[datetime.datetime]: The Sundays between January 1st and
        December 31st of ``year`` (inclusive), in chronological order.
    """
    first_day = f"{year}-01-01"
    last_day = f"{year}-12-31"
    # "W-SUN" is pandas' weekly frequency anchored on Sundays.
    sundays = pd.date_range(start=first_day, end=last_day, freq="W-SUN")
    return sundays.to_pydatetime().tolist()

In [17]:
def create_product_catalog(n):
    """Build a catalog of ``n`` placeholder products.

    Args:
        n: Number of products to create.

    Returns:
        pandas.DataFrame: One row per product with columns ``'Product ID'``
        (1..n) and ``'Product Name'`` (``"Product_<id>"``).
    """
    product_ids = range(1, n + 1)
    product_names = [f"Product_{pid}" for pid in product_ids]
    return pd.DataFrame({
        'Product ID': product_ids,
        'Product Name': product_names,
    })


In [18]:
def generate_sales_data(week_starts, catalog, seed=None):
    """Generate synthetic weekly sales rows for every product in ``catalog``.

    Each (week, product) pair gets normally distributed demand scaled by a
    sinusoidal seasonal factor, a random price in [10, 12) and a random
    discount of 0%, 10% or 20% (probabilities 0.7 / 0.2 / 0.1).

    Args:
        week_starts: Sequence of week start dates (must support ``len``).
        catalog: DataFrame with ``'Product ID'`` and ``'Product Name'`` columns.
        seed: Optional integer seed. When given, the data is reproducible and
            NumPy's global random state is left untouched. When ``None``
            (default) values are drawn from the global ``np.random`` state.

    Returns:
        pandas.DataFrame: One row per week/product with the columns
        ``'Week Start Date'``, ``'Product ID'``, ``'Product Name'``,
        ``'Units Sold'``, ``'Price ($)'``, ``'Discount Percentage'``,
        ``'Revenue ($)'`` and ``'Region'``. The columns are present even when
        there are no rows.
    """
    columns = [
        'Week Start Date', 'Product ID', 'Product Name', 'Units Sold',
        'Price ($)', 'Discount Percentage', 'Revenue ($)', 'Region',
    ]
    # RandomState exposes the same randn/rand/choice API as the np.random
    # module, so seeded and unseeded runs share one code path.
    rng = np.random if seed is None else np.random.RandomState(seed)

    n_weeks = len(week_starts)
    # Materialise the product pairs once instead of re-running iterrows()
    # for every week.
    products = list(zip(catalog['Product ID'], catalog['Product Name']))

    records = []
    for week_idx, week_start in enumerate(week_starts):
        # Seasonality: base units fluctuate sinusoidally over the year
        seasonal_factor = 1 + 0.3 * np.sin(2 * np.pi * week_idx / n_weeks)
        for product_id, product_name in products:
            base_units = 100 + 20 * rng.randn()
            # Round to whole units *before* computing revenue so that the
            # stored revenue always equals units x price x (1 - discount).
            units_sold = int(round(max(0, base_units * seasonal_factor)))
            price = round(10 + 2 * rng.rand(), 2)
            discount = rng.choice([0, 0.1, 0.2], p=[0.7, 0.2, 0.1])
            revenue = units_sold * price * (1 - discount)
            records.append({
                'Week Start Date': week_start,
                'Product ID': product_id,
                'Product Name': product_name,
                'Units Sold': units_sold,
                'Price ($)': price,
                'Discount Percentage': discount * 100,
                'Revenue ($)': round(revenue, 2),
                'Region': 'USA'
            })
    return pd.DataFrame(records, columns=columns)

In [19]:
def fetch_economic_data(start_date, end_date, api_key):
    """Fetch FRED gas-price and CPI series aligned to Sunday week starts.

    The result has exactly one row for every Sunday in
    ``[start_date, end_date]``:

    * ``'Gas Price'`` is the mean of the ``GASREGW`` observations falling in
      the week *beginning* on that Sunday (Sunday through Saturday).
    * ``'CPI'`` is the most recent ``CPIAUCSL`` observation dated on or
      before that Sunday (forward fill of the monthly series).

    Weeks for which FRED returned no usable observation keep ``NaN`` so that
    downstream integrity checks can flag them.

    Args:
        start_date: First week start (anything ``pd.Timestamp`` accepts).
        end_date: Last week start (anything ``pd.Timestamp`` accepts).
        api_key: FRED API key passed to ``Fred``.

    Returns:
        pandas.DataFrame: Columns ``'Week Start Date'``, ``'Gas Price'``
        and ``'CPI'``.
    """
    fred = Fred(api_key=api_key)

    first_week = pd.Timestamp(start_date).normalize()
    last_week = pd.Timestamp(end_date).normalize()
    week_index = pd.date_range(first_week, last_week, freq='W-SUN')

    # Widen the request window beyond the requested weeks:
    #  * 31 days back so the monthly CPI observation in force during the
    #    first week is included (otherwise the leading weeks are NaN);
    #  * 6 days forward so observations inside the final week are included.
    fetch_start = first_week - pd.Timedelta(days=31)
    fetch_end = last_week + pd.Timedelta(days=6)

    gas = fred.get_series('GASREGW', observation_start=fetch_start, observation_end=fetch_end)
    cpi = fred.get_series('CPIAUCSL', observation_start=fetch_start, observation_end=fetch_end)

    # Weekly resampling defaults to right-closed/right-labelled bins, which
    # would stamp each value with the week's *end*. Label and close on the
    # left so the key really is the week start used by the sales data.
    gas_weekly = (
        gas.resample('W-SUN', label='left', closed='left')
        .mean()
        .reindex(week_index)
    )
    # For each week start, take the latest CPI observation at or before it.
    cpi_weekly = cpi.dropna().sort_index().reindex(week_index, method='ffill')

    econ = pd.DataFrame({'Gas Price': gas_weekly, 'CPI': cpi_weekly}, index=week_index)
    # Name the index explicitly instead of relying on the implicit 'index'
    # column name produced by reset_index().
    return econ.rename_axis('Week Start Date').reset_index()

In [20]:
def run_integrity_checks(df):
    """Validate the merged sales/economic frame and describe any problems.

    Three checks are run, in this order:

    1. no missing values in the sales columns;
    2. no ``'Units Sold'`` value more than three standard deviations away
       from the column mean;
    3. no missing values in the economic columns.

    Args:
        df: DataFrame containing the sales columns (``'Units Sold'``,
            ``'Price ($)'``, ``'Discount Percentage'``, ``'Revenue ($)'``)
            and the economic columns (``'Gas Price'``, ``'CPI'``).

    Returns:
        list[str]: One message per failed check; empty when all pass.

    Raises:
        KeyError: If any of the expected columns is absent from ``df``.
    """
    sales_fields = ['Units Sold', 'Price ($)', 'Discount Percentage', 'Revenue ($)']
    econ_fields = ['Gas Price', 'CPI']
    problems = []

    # Check 1: gaps in the sales measurements.
    if df[sales_fields].isna().to_numpy().any():
        problems.append("Missing values in sales fields.")

    # Check 2: outliers, judged by |z-score| > 3 over the whole column.
    units = df['Units Sold']
    z_scores = (units - units.mean()) / units.std()
    if z_scores.abs().gt(3).any():
        problems.append("Spikes/dips in Units Sold.")

    # Check 3: gaps in the economic indicators.
    if df[econ_fields].isna().to_numpy().any():
        problems.append("Missing economic data post-merge.")

    return problems

In [24]:
def send_alert(subject, message):
    """Record a pipeline alert in ``data_pipeline1.log``.

    The alert is written at ERROR level through a dedicated
    ``'data_pipeline'`` logger, so it is recorded even when the root logger
    was already configured elsewhere (in which case ``logging.basicConfig``
    would silently do nothing). Only this logger's handlers are flushed;
    other logging handlers in the process are left alone.

    Email delivery is not implemented: alerts are logged only.

    Args:
        subject: Short alert title.
        message: Alert details.
    """
    logger = logging.getLogger('data_pipeline')
    # Attach the file handler only once; re-running the cell or sending
    # several alerts must not duplicate log lines.
    if not logger.handlers:
        handler = logging.FileHandler('data_pipeline1.log')
        handler.setLevel(logging.ERROR)
        handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
        logger.addHandler(handler)

    logger.error("%s: %s", subject, message)

    # Make the alert visible on disk immediately without shutting down the
    # whole logging system.
    for handler in logger.handlers:
        handler.flush()

In [22]:
def refresh_and_merge():
    """Run the full pipeline and write the merged dataset to ``OUTPUT_PATH``.

    Steps:
      1. Generate synthetic weekly sales for ``START_YEAR`` and
         ``NUM_PRODUCTS`` products.
      2. Fetch the economic indicators for the same span of weeks, using the
         configured ``API_KEY`` rather than a key embedded in the code.
      3. Left-join the indicators onto the sales rows by ``'Week Start Date'``.
      4. Run the integrity checks and raise an alert if any fail (the data
         is still saved so it can be inspected).
      5. Save the merged frame as CSV.
    """
    # Generate or load sales data
    week_starts = generate_week_starts(START_YEAR)
    catalog = create_product_catalog(NUM_PRODUCTS)
    sales_df = generate_sales_data(week_starts, catalog)

    # Fetch economic indicators
    econ_df = fetch_economic_data(
        start_date=min(week_starts),
        end_date=max(week_starts),
        api_key=API_KEY,
    )

    # Merge datasets on Week Start Date. Many sales rows map to one economic
    # row per week; validate that so duplicated weeks in the economic data
    # cannot silently multiply sales rows.
    merged = pd.merge(
        sales_df,
        econ_df,
        on='Week Start Date',
        how='left',
        validate='many_to_one',
    )

    # Run integrity checks
    errors = run_integrity_checks(merged)
    if errors:
        send_alert("Data Integrity Issues", "; ".join(errors))

    # Save final dataset
    merged.to_csv(OUTPUT_PATH, index=False)
    print(f"Merged data saved to {OUTPUT_PATH}")

In [23]:
# Run the pipeline end to end: generate sales, fetch economic data, merge,
# check integrity and write the CSV to OUTPUT_PATH.
refresh_and_merge()

Merged data saved to merged_data.csv
