In [4]:
!pip install -q pandas sqlalchemy openpyxl
print("✅ Installed: pandas, sqlalchemy, openpyxl")

✅ Installed: pandas, sqlalchemy, openpyxl


In [6]:
import os
import shutil
import time
import logging
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text
from google.colab import files as gfiles
from IPython.display import display

WATCH_FOLDER = "data"                # drop zone scanned for incoming files
PROCESSED_FOLDER = "processed"       # files ingested successfully (or already recorded)
FAILED_FOLDER = "failed"             # files whose ingestion raised an error
DB_FILE = "sales.db"                 # SQLite database file
TABLE_NAME = "sales"                 # destination table for sales rows
PROCESSED_TABLE = "processed_files"  # bookkeeping: filenames already ingested
POLL_SECONDS = 5                     # NOTE(review): no polling loop in the visible cells uses this yet
SUPPORTED_EXTENSIONS = (".csv", ".xlsx", ".xls")  # matched case-insensitively by the scanner

# force=True: in Colab/Jupyter the root logger typically already has a handler,
# which turns a plain basicConfig() into a silent no-op. Without it, the
# "Processing <file>" INFO lines never show up in the cell output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    force=True,
)



In [7]:
# Make sure the input, success and failure folders exist (no error if they already do).
_required_folders = (WATCH_FOLDER, PROCESSED_FOLDER, FAILED_FOLDER)
for _path in _required_folders:
    os.makedirs(_path, exist_ok=True)
print("✅ Folders ready:", *_required_folders)



✅ Folders ready: data processed failed


In [8]:
# Colab saves uploads into the current directory; move each one into the
# watch folder so the scanner picks it up.
# NOTE: ingestion is tracked by filename, so re-uploading a name that was
# already processed will be skipped rather than re-ingested.
print("⬆️ Upload your sales.csv file...")
uploaded = gfiles.upload()
for fname in uploaded:
    dest = os.path.join(WATCH_FOLDER, fname)
    # os.replace overwrites an existing dest in a single step (no remove + rename race).
    os.replace(fname, dest)
    print(f"✅ Moved {fname} → {dest}")
    if not fname.lower().endswith(SUPPORTED_EXTENSIONS):
        print(f"⚠️ {fname} is not one of {SUPPORTED_EXTENSIONS}; the scanner will ignore it")
print("Files in data/:", os.listdir(WATCH_FOLDER))


⬆️ Upload your sales.csv file...


Saving sales.csv to sales.csv
✅ Moved sales.csv → data/sales.csv
Files in data/: ['sales.csv']


In [10]:
# Helper Functions for DB & File Processing

def create_db_engine(db_file=DB_FILE):
    """Build and return a SQLAlchemy engine pointing at the SQLite file ``db_file``.

    The engine is created with ``future=True``; no connection is opened here.
    """
    sqlite_url = "sqlite:///{}".format(db_file)
    return create_engine(sqlite_url, future=True)

def init_db(engine):
    """Ensure the PROCESSED_TABLE bookkeeping table exists.

    Uses ``CREATE TABLE IF NOT EXISTS`` inside a transaction, so calling it
    again on an initialised database changes nothing. Columns: ``filename``
    (TEXT PRIMARY KEY) and ``processed_at`` (TEXT).
    """
    create_stmt = text(f"""
            CREATE TABLE IF NOT EXISTS {PROCESSED_TABLE} (
                filename TEXT PRIMARY KEY,
                processed_at TEXT
            )
        """)
    with engine.begin() as conn:
        conn.execute(create_stmt)

def has_been_processed(engine, filename):
    """Return True if ``filename`` already has a row in PROCESSED_TABLE, else False."""
    lookup = text(f"SELECT 1 FROM {PROCESSED_TABLE} WHERE filename = :fn")
    with engine.connect() as conn:
        first_row = conn.execute(lookup, {"fn": filename}).first()
    return first_row is not None

def mark_processed(engine, filename):
    """Record ``filename`` in PROCESSED_TABLE with the current UTC timestamp.

    Uses INSERT OR REPLACE on the ``filename`` primary key, so marking the
    same file again only refreshes its ``processed_at`` value.

    The timestamp is stored as a naive-UTC ISO-8601 string
    (e.g. ``2025-08-08T08:31:31.372154``). This is the same format earlier
    runs wrote, so existing rows stay comparable.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time
    # and drop tzinfo so the stored string format does not change.
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with engine.begin() as conn:
        conn.execute(
            # Name the columns so the insert does not depend on the table's column order.
            text(
                f"INSERT OR REPLACE INTO {PROCESSED_TABLE} (filename, processed_at) "
                "VALUES (:fn, :ts)"
            ),
            {"fn": filename, "ts": ts},
        )

def read_file_to_df(filepath):
    """Read a CSV or Excel file into a pandas DataFrame.

    CSV files are decoded as UTF-8 first. If that fails with a decoding error,
    they are re-read as Latin-1, which accepts any byte sequence. Excel files
    (.xlsx/.xls) are read with ``pd.read_excel``.

    Args:
        filepath: Path to the file. Its extension (case-insensitive) selects the reader.

    Returns:
        pandas.DataFrame with the file's contents.

    Raises:
        ValueError: if the extension is not .csv, .xlsx or .xls.
        Any other pandas/OS error (malformed CSV, missing file, corrupt workbook)
        propagates unchanged.
    """
    ext = os.path.splitext(filepath)[1].lower()
    if ext == ".csv":
        try:
            return pd.read_csv(filepath, encoding="utf-8")
        except UnicodeDecodeError:
            # Fall back only on decoding problems. Parse errors, missing files
            # and interrupts must surface instead of triggering a pointless retry.
            return pd.read_csv(filepath, encoding="latin1")
    if ext in (".xlsx", ".xls"):
        # NOTE: pandas needs openpyxl for .xlsx and xlrd for legacy .xls.
        return pd.read_excel(filepath)
    raise ValueError(f"Unsupported file: {ext}")

def deduplicate_dataframe(df, engine, unique_col="ORDERNUMBER"):
    """
    Remove rows from ``df`` whose ``unique_col`` value already exists in TABLE_NAME.

    Ids are compared as strings (``astype(str)`` on both sides), so an integer
    read back from SQLite matches the same id parsed from a new file.

    Args:
        df: Incoming rows (pandas DataFrame).
        engine: SQLAlchemy engine for the target database.
        unique_col: Column treated as the row identity. Defaults to ORDERNUMBER.

    Returns:
        ``df`` itself when it has no ``unique_col`` column. Otherwise, a copy
        with already-stored ids removed. All rows are kept when TABLE_NAME
        does not exist yet (first load).

    Raises:
        Database errors other than "table missing" propagate to the caller,
        e.g. when TABLE_NAME exists but has no ``unique_col`` column.
    """
    from sqlalchemy import inspect as sa_inspect

    if unique_col not in df.columns:
        return df
    # Check for the table explicitly, so that only the expected first-load case
    # means "no existing ids". Real SQL errors are not silently swallowed.
    if not sa_inspect(engine).has_table(TABLE_NAME):
        return df.copy()
    existing = pd.read_sql(f"SELECT {unique_col} FROM {TABLE_NAME}", con=engine)
    # NOTE(review): if the stored column contains NULLs, pandas reads it as float,
    # so ids stringify as "10107.0" and won't match "10107". Confirm ids are never NULL.
    existing_ids = set(existing[unique_col].dropna().astype(str))
    mask_new = ~df[unique_col].astype(str).isin(existing_ids)
    return df[mask_new].copy()

def process_file(engine, filepath):
    """
    Ingest a single CSV/Excel file into TABLE_NAME.

    Steps:
    - If the filename is already recorded in PROCESSED_TABLE, move the file to
      processed/ without re-reading it. Tracking is by basename only, so a
      re-uploaded file with the same name is not re-ingested.
    - Read the file into a DataFrame and strip whitespace from column names.
    - Drop rows whose ORDERNUMBER already exists in the sales table.
    - Append the remaining rows, record the filename, and move the file to processed/.

    Any exception is logged and the file is moved to failed/. Nothing is
    raised, so one bad file does not stop a scan.
    """
    filename = os.path.basename(filepath)
    logging.info(f"Processing {filename}")
    try:
        if has_been_processed(engine, filename):
            logging.info(f"Skipping {filename}: already recorded in {PROCESSED_TABLE}")
            shutil.move(filepath, os.path.join(PROCESSED_FOLDER, filename))
            return
        df = read_file_to_df(filepath)
        # str() first: Excel headers can be numbers or dates, which have no .strip().
        df.columns = [str(c).strip() for c in df.columns]
        # NOTE(review): ORDERNUMBER looks like an order id shared by several order
        # lines (the data also has ORDERLINENUMBER). If a later file adds lines to
        # an existing order, those lines are dropped here. Consider a composite key.
        df_to_insert = deduplicate_dataframe(df, engine, "ORDERNUMBER")
        with engine.begin() as conn:
            df_to_insert.to_sql(TABLE_NAME, conn, if_exists="append", index=False)
        # The rows and the PROCESSED_TABLE marker are committed in separate
        # transactions. If the marker write fails, the file lands in failed/ even
        # though its rows were inserted; a retry is filtered by the dedup step above.
        mark_processed(engine, filename)
        logging.info(f"Inserted {len(df_to_insert)} of {len(df)} rows from {filename}")
        shutil.move(filepath, os.path.join(PROCESSED_FOLDER, filename))
    except Exception as e:
        logging.exception(f"Failed {filename}: {e}")
        try:
            shutil.move(filepath, os.path.join(FAILED_FOLDER, filename))
        except OSError:
            # E.g. the file vanished mid-processing. Don't let the failure move
            # itself abort the remaining files in the scan.
            logging.exception(f"Could not move {filename} to {FAILED_FOLDER}")

def scan_and_process_once(db_engine=None):
    """Process every supported file currently in WATCH_FOLDER, in filename order.

    Args:
        db_engine: SQLAlchemy engine to write to. When omitted, falls back to
            the notebook-level ``engine`` global, looked up at call time, so
            existing no-argument calls keep working.
    """
    if db_engine is None:
        db_engine = engine  # notebook global created in the processing cell
    for fname in sorted(os.listdir(WATCH_FOLDER)):
        path = os.path.join(WATCH_FOLDER, fname)
        # isfile(): skip sub-directories whose names happen to end in ".csv" etc.
        if fname.lower().endswith(SUPPORTED_EXTENSIONS) and os.path.isfile(path):
            process_file(db_engine, path)


In [11]:



# Create the DB engine, make sure the processed_files tracking table exists,
# then ingest every supported file currently in data/.
engine = create_db_engine(DB_FILE)
init_db(engine)
scan_and_process_once()

# process_file() logs failures and moves the file to failed/ instead of raising.
# Inspect failed/ rather than printing an unconditional success message.
failed_files = sorted(os.listdir(FAILED_FOLDER))
if failed_files:
    print(f"⚠️ Processing finished, but {FAILED_FOLDER}/ contains: {failed_files}")
else:
    print("✅ Processing complete")


✅ Processing complete


In [12]:

# Preview the ingested data (first 5 rows) and the file-tracking table.
previews = (
    ("=== Sales table preview ===", f"SELECT * FROM {TABLE_NAME} LIMIT 5"),
    ("=== Processed files table ===", f"SELECT * FROM {PROCESSED_TABLE}"),
)
for heading, query in previews:
    print(heading)
    display(pd.read_sql(query, con=engine))


=== Sales table preview ===


Unnamed: 0,ORDERNUMBER,QUANTITYORDERED,PRICEEACH,ORDERLINENUMBER,SALES,ORDERDATE,STATUS,QTR_ID,MONTH_ID,YEAR_ID,...,ADDRESSLINE1,ADDRESSLINE2,CITY,STATE,POSTALCODE,COUNTRY,TERRITORY,CONTACTLASTNAME,CONTACTFIRSTNAME,DEALSIZE
0,10107,30,95.7,2,2871.0,2/24/2003 0:00,Shipped,1,2,2003,...,897 Long Airport Avenue,,NYC,NY,10022.0,USA,,Yu,Kwai,Small
1,10121,34,81.35,5,2765.9,5/7/2003 0:00,Shipped,2,5,2003,...,59 rue de l'Abbaye,,Reims,,51100.0,France,EMEA,Henriot,Paul,Small
2,10134,41,94.74,2,3884.34,7/1/2003 0:00,Shipped,3,7,2003,...,27 rue du Colonel Pierre Avia,,Paris,,75508.0,France,EMEA,Da Cunha,Daniel,Medium
3,10145,45,83.26,6,3746.7,8/25/2003 0:00,Shipped,3,8,2003,...,78934 Hillside Dr.,,Pasadena,CA,90003.0,USA,,Young,Julie,Medium
4,10159,49,100.0,14,5205.27,10/10/2003 0:00,Shipped,4,10,2003,...,7734 Strong St.,,San Francisco,CA,,USA,,Brown,Julie,Medium


=== Processed files table ===


Unnamed: 0,filename,processed_at
0,sales.csv,2025-08-08T08:31:31.372154


In [13]:

# Send the SQLite database file (DB_FILE, i.e. sales.db) to the browser as a
# download, for submission or backup.
db_path = DB_FILE
gfiles.download(db_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>