In [1]:
ls

 Volume in drive C is Windows
 Volume Serial Number is 2283-99E3

 Directory of C:\Users\Mohan\retail_pipeline

03-12-2025  11:49    <DIR>          .
03-12-2025  10:43    <DIR>          ..
03-12-2025  11:49    <DIR>          .ipynb_checkpoints
03-12-2025  10:43    <DIR>          data
03-12-2025  10:43    <DIR>          docs
03-12-2025  10:43    <DIR>          notebooks
03-12-2025  11:49    <DIR>          scripts
03-12-2025  10:43    <DIR>          sql
03-12-2025  11:49               337 Untitled.ipynb
               1 File(s)            337 bytes
               8 Dir(s)  428,943,478,784 bytes free


In [2]:
cd  C:\Users\Mohan\retail_pipeline

C:\Users\Mohan\retail_pipeline


In [3]:
ls


 Volume in drive C is Windows
 Volume Serial Number is 2283-99E3

 Directory of C:\Users\Mohan\retail_pipeline

03-12-2025  11:49    <DIR>          .
03-12-2025  10:43    <DIR>          ..
03-12-2025  11:49    <DIR>          .ipynb_checkpoints
03-12-2025  10:43    <DIR>          data
03-12-2025  10:43    <DIR>          docs
03-12-2025  10:43    <DIR>          notebooks
03-12-2025  11:49    <DIR>          scripts
03-12-2025  10:43    <DIR>          sql
03-12-2025  11:49               337 Untitled.ipynb
               1 File(s)            337 bytes
               8 Dir(s)  428,943,253,504 bytes free


In [7]:
# notebooks/etl_retail.ipynb (cells)
# Cell 1: imports & connection
import os
from urllib.parse import quote_plus

import pandas as pd
import pyodbc
from dotenv import load_dotenv
from sqlalchemy import create_engine

# optionally store credentials in .env
# load_dotenv()

# SQL Server target. Windows trusted authentication is used, so no user name
# or password appears in the connection string.
server = r"BANDIASHOK"   # named-instance example: r"localhost\SQLEXPRESS"
database = "RetailAnalytics"

# Build a plain ODBC connection string and pass it, URL-encoded, through the
# odbc_connect parameter. The earlier URL appended ODBC-style
# ";Trusted_Connection=yes;" after the driver name, but SQLAlchemy URL query
# parameters are separated by "&", so that text was not parsed as its own
# option. odbc_connect hands the string to the driver unchanged and also
# tolerates a "\" in a named-instance server value.
odbc_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    f"SERVER={server};"
    f"DATABASE={database};"
    "Trusted_Connection=yes;"
)
conn_str = f"mssql+pyodbc:///?odbc_connect={quote_plus(odbc_str)}"

# Engine shared by the staging loads, procedure calls and validation query.
engine = create_engine(conn_str)

# Cell 2: load the three source extracts (paths are relative to the kernel's
# working directory); only sales carries a date column that needs parsing.
df_customers = pd.read_csv("data/customers.csv")
df_products = pd.read_csv("data/products.csv")
df_sales = pd.read_csv("data/sales.csv", parse_dates=["sale_date"])

# Preview the first rows of each frame, in load order.
display(df_customers.head(), df_products.head(), df_sales.head())

# Cell 3: simple cleaning (examples)
# - Trim surrounding whitespace in every text (object-dtype) column of the
#   customer and product frames.
for df in [df_customers, df_products]:
    str_cols = df.select_dtypes(["object"]).columns
    for c in str_cols:
        # Keep missing cells missing: casting the whole column with
        # astype(str) would turn NaN into the literal text "nan", which would
        # then be written to staging as if it were real data. where() keeps
        # the original value wherever the cell is null and takes the stripped
        # text everywhere else.
        df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())

# validate types
# Values that cannot be parsed as numbers become NaN and are then replaced by 0.
df_products['price'] = pd.to_numeric(df_products['price'], errors='coerce').fillna(0)
df_sales['quantity'] = pd.to_numeric(df_sales['quantity'], errors='coerce').fillna(0).astype(int)

# Cell 4: write each cleaned frame to its staging table. if_exists="replace"
# drops and recreates the table on every run; the DataFrame index is not stored.
for staging_table, staging_frame in (
    ("stg_customers", df_customers),
    ("stg_products", df_products),
    ("stg_sales", df_sales),
):
    staging_frame.to_sql(staging_table, con=engine, if_exists="replace", index=False)

print("Staging tables loaded.")

# Cell 5: call stored procedures to transform and load DWH
from sqlalchemy import text  # imported here so this step runs on its own

# engine.begin() wraps both calls in one transaction: it commits when the
# block exits normally and rolls back if either procedure raises.
# The SQL is wrapped in text() because SQLAlchemy 2.x no longer accepts a
# bare string in Connection.execute().
with engine.begin() as conn:
    conn.execute(text("EXEC sp_load_dim_customer;"))
    conn.execute(text("EXEC sp_load_fact_sales;"))

print("DWH load completed.")

# Cell 6: sanity-check the warehouse load by joining the fact table to its
# customer and product dimensions and showing up to ten rows.
df = pd.read_sql(
    sql=(
        "SELECT TOP 10 f.sale_id, c.full_name, p.product_name, f.quantity, f.total_amount "
        "FROM fact_sales f "
        "JOIN dim_customer c ON f.customer_sk=c.customer_sk "
        "JOIN dim_product p ON f.product_sk=p.product_sk"
    ),
    con=engine,
)
display(df)

FileNotFoundError: [Errno 2] No such file or directory: 'data/customers.csv'

In [6]:
# Print the kernel's current working directory, i.e. the folder that relative
# paths such as "data/customers.csv" are resolved against.
import os
print(os.getcwd())


C:\Users\Mohan\retail_pipeline


In [9]:
# Cell 1: imports & setup (updated)
import os
from pathlib import Path
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text

# Relative paths below resolve against this directory
# (expected: C:\Users\Mohan\retail_pipeline).
print("CWD:", os.getcwd())

# Folder holding the CSV extracts; created if it does not exist yet.
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

# Target SQL Server instance and database (change the server name if needed).
server = r"BANDIASHOK"   # named-instance example: r"localhost\SQLEXPRESS"
database = "RetailAnalytics"

# Assemble the ODBC connection string as "KEY=value;" pairs and pass it
# URL-encoded via odbc_connect, which sidesteps driver-name encoding issues
# in the SQLAlchemy URL.
odbc_str = "".join(
    f"{key}={value};"
    for key, value in (
        ("DRIVER", "{ODBC Driver 17 for SQL Server}"),
        ("SERVER", server),
        ("DATABASE", database),
        ("Trusted_Connection", "yes"),
    )
)
conn_str = "mssql+pyodbc:///?odbc_connect=" + quote_plus(odbc_str)
print("Using connection string (preview):", conn_str[:120], "...")
engine = create_engine(conn_str)

# Cell 2: make sure the three input CSVs exist. Any file that is missing is
# written with a small three-row sample; files already present are untouched.
sample_customers = DATA_DIR / "customers.csv"
sample_products = DATA_DIR / "products.csv"
sample_sales = DATA_DIR / "sales.csv"

# (target path, sample content) pairs, processed in this order.
for sample_path, sample_text in (
    (
        sample_customers,
        "customer_id,full_name,location\n"
        "1,Ravi,Bangalore\n"
        "2,Anita,Hyderabad\n"
        "3,John,Chennai\n",
    ),
    (
        sample_products,
        "product_id,product_name,category,price\n"
        "101,Laptop,Electronics,50000\n"
        "102,Mobile,Electronics,20000\n"
        "103,Shirt,Fashion,1500\n",
    ),
    (
        sample_sales,
        "sale_id,customer_id,product_id,quantity,sale_date\n"
        "1,1,101,1,2023-01-01\n"
        "2,2,102,2,2023-01-05\n"
        "3,3,103,3,2023-01-10\n",
    ),
):
    if sample_path.exists():
        continue
    sample_path.write_text(sample_text)
    print("Created sample", sample_path)

# Cell 3: load the three extracts from DATA_DIR; only sales has a date column
# that needs parsing.
df_customers = pd.read_csv(DATA_DIR.joinpath("customers.csv"))
df_products = pd.read_csv(DATA_DIR.joinpath("products.csv"))
df_sales = pd.read_csv(DATA_DIR.joinpath("sales.csv"), parse_dates=["sale_date"])

# Preview the first rows of each frame, in load order.
display(df_customers.head(), df_products.head(), df_sales.head())

# Cell 4: simple cleaning (examples)
# Trim surrounding whitespace in every text (object-dtype) column of the
# customer and product frames.
for df in [df_customers, df_products]:
    str_cols = df.select_dtypes(["object"]).columns
    for c in str_cols:
        # Keep missing cells missing: casting the whole column with
        # astype(str) would turn NaN into the literal text "nan", which would
        # then be written to staging as if it were real data. where() keeps
        # the original value wherever the cell is null and takes the stripped
        # text everywhere else.
        df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())

# Coerce numeric columns; values that cannot be parsed become NaN and are
# then replaced by 0.
df_products['price'] = pd.to_numeric(df_products['price'], errors='coerce').fillna(0)
df_sales['quantity'] = pd.to_numeric(df_sales['quantity'], errors='coerce').fillna(0).astype(int)

# Cell 5: write each cleaned frame to its staging table. if_exists="replace"
# drops and recreates the table on every run; the DataFrame index is not stored.
# NOTE: to_sql goes through SQLAlchemy; for larger loads consider
# fast_executemany via a raw pyodbc connection.
for staging_table, staging_frame in (
    ("stg_customers", df_customers),
    ("stg_products", df_products),
    ("stg_sales", df_sales),
):
    staging_frame.to_sql(staging_table, con=engine, if_exists="replace", index=False)

print("Staging tables loaded.")

# Cell 6: Ensure stored procedures exist (create safe stubs if missing),
# then call them. Adjust procedure names to match your DB objects.

# T-SQL batch that creates a placeholder procedure for each name that does
# not exist yet. Procedures that already exist are left untouched, because
# every CREATE is guarded by an OBJECT_ID(...) IS NULL check.
check_and_create_stubs_sql = """
-- create minimal stubs if missing (so notebook can run end-to-end)
IF OBJECT_ID('dbo.sp_load_dim_customer', 'P') IS NULL
BEGIN
    EXEC('CREATE PROCEDURE dbo.sp_load_dim_customer AS BEGIN SET NOCOUNT ON; SELECT ''stub:sp_load_dim_customer'' as info; END;')
END;

IF OBJECT_ID('dbo.sp_load_dim_date', 'P') IS NULL
BEGIN
    EXEC('CREATE PROCEDURE dbo.sp_load_dim_date AS BEGIN SET NOCOUNT ON; SELECT ''stub:sp_load_dim_date'' as info; END;')
END;

IF OBJECT_ID('dbo.sp_load_fact_sales', 'P') IS NULL
BEGIN
    EXEC('CREATE PROCEDURE dbo.sp_load_fact_sales AS BEGIN SET NOCOUNT ON; SELECT ''stub:sp_load_fact_sales'' as info; END;')
END;
"""

# engine.begin() runs everything below in one transaction: it commits when
# the block exits normally and rolls back if any statement raises.
with engine.begin() as conn:
    # Actually run the stub-creation batch. Previously the SQL above was only
    # assigned to a variable and never executed, so a database without these
    # procedures would fail on the EXEC calls below. text() only treats
    # ":name" as a bind parameter when the colon is not preceded by a word
    # character, so the 'stub:...' literals pass through unchanged.
    conn.execute(text(check_and_create_stubs_sql))

    # execute stored procedures
    # NOTE(review): a stub is also created for dbo.sp_load_dim_date, but that
    # procedure is not called here; confirm whether it should be.
    conn.execute(text("EXEC dbo.sp_load_dim_customer;"))
    conn.execute(text("EXEC dbo.sp_load_fact_sales;"))


print("DWH load (procedure calls) completed.")

# Cell 7: sanity-check the warehouse load. The LEFT JOINs keep fact rows even
# when a customer or product dimension row is missing; up to ten rows are shown.
df = pd.read_sql(
    sql=" ".join(
        [
            "SELECT TOP 10 f.sale_id, c.full_name, p.product_name, f.quantity, f.total_amount",
            "FROM dbo.fact_sales f",
            "LEFT JOIN dbo.dim_customer c ON f.customer_sk=c.customer_sk",
            "LEFT JOIN dbo.dim_product p ON f.product_sk=p.product_sk",
        ]
    ),
    con=engine,
)
display(df)


CWD: C:\Users\Mohan\retail_pipeline
Using connection string (preview): mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BODBC+Driver+17+for+SQL+Server%7D%3BSERVER%3DBANDIASHOK%3BDATABASE%3DRetailAnal ...


Unnamed: 0,customer_id,full_name,location
0,1,Ravi,Bangalore
1,2,Anita,Hyderabad
2,3,John,Chennai


Unnamed: 0,product_id,product_name,category,price
0,101,Laptop,Electronics,50000
1,102,Mobile,Electronics,20000
2,103,Shirt,Fashion,1500


Unnamed: 0,sale_id,customer_id,product_id,quantity,sale_date
0,1,1,101,1,2023-01-01
1,2,2,102,2,2023-01-05
2,3,3,103,3,2023-01-10


Staging tables loaded.
DWH load (procedure calls) completed.


Unnamed: 0,sale_id,full_name,product_name,quantity,total_amount
0,1,Ravi,Laptop,1,50000.0
1,2,Anita,Mobile,2,40000.0
2,3,John,Shirt,3,4500.0


In [11]:
# Turn the project folder into a Git repository (creates the .git directory).
!git init


Initialized empty Git repository in C:/Users/Mohan/retail_pipeline/.git/


In [12]:
pwd


'C:\\Users\\Mohan\\retail_pipeline'

In [13]:
# Stage every file under the current directory. No .gitignore appears in the
# directory listing, so this also stages .ipynb_checkpoints/ (the commit
# output lists Untitled-checkpoint.ipynb among the created files).
!git add .




In [14]:
# Record the staged files as the repository's first (root) commit.
!git commit -m "initial upload"


[master (root-commit) 490f918] initial upload
 5 files changed, 663 insertions(+)
 create mode 100644 .ipynb_checkpoints/Untitled-checkpoint.ipynb
 create mode 100644 Untitled.ipynb
 create mode 100644 data/customers.csv
 create mode 100644 data/products.csv
 create mode 100644 data/sales.csv


In [15]:
# Register the GitHub repository as the remote named "origin".
!git remote add origin https://github.com/Bandi-Ashok/retail_etl_project.git


In [16]:
# Rename the current branch to "main", then push it to origin and set it as
# the upstream branch.
# NOTE: the recorded run was rejected ("fetch first") because the remote
# already contained commits that are missing locally; git's own hint is to
# integrate them with `git pull` before pushing again.
!git branch -M main
!git push -u origin main


To https://github.com/Bandi-Ashok/retail_etl_project.git
 ! [rejected]        main -> main (fetch first)
error: failed to push some refs to 'https://github.com/Bandi-Ashok/retail_etl_project.git'
hint: Updates were rejected because the remote contains work that you do not
hint: have locally. This is usually caused by another repository pushing to
hint: the same ref. If you want to integrate the remote changes, use
hint: 'git pull' before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.
