# Retail Sales Forecasting Pipeline
### Oluchi Obinna

## Overview:
A simple pipeline to load, clean, and transform retail sales data, then save it for analysis or AI modeling.

## Steps:

**Load Data**: Upload from Google Drive or Kaggle. ZIP files are automatically extracted.

**Clean Data**: Remove duplicates, fill missing values.

**Feature Engineering**: Create new features like year, month, weekday, and numeric ratios.

**Save Data**: Processed data is saved to `data/processed/` and `data/output/` (relative to the notebook's working directory).

**AI Model (Optional)**: Train a simple Linear Regression to test ML readiness.


## How to Run:

**Mount Google Drive**: `drive.mount('/content/drive')`

**Set the path to your dataset**: `path = "/content/drive/MyDrive/archive (3).zip"`

**Run the pipeline cells in order**: ingest → clean → feature engineering → save → optional model.

## Notes:
- Logging tracks each step.

- Easy to add more features or ML models.

- Folder structure keeps data organized.


# Part 1 – Data Ingestion

In [1]:
# Install needed packages
!pip install pandas pyarrow scikit-learn

import pandas as pd
import numpy as np
import os
import logging



In [2]:
# Set up folders
os.makedirs("data/raw", exist_ok=True)
os.makedirs("data/processed", exist_ok=True)
os.makedirs("data/output", exist_ok=True)

# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

In [3]:
from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


In [4]:
zip_path = "/content/drive/MyDrive/archive (3).zip"

In [5]:
# --- FUNCTION: ingest_data ---
def ingest_data(path):
    """
    Loads the retail dataset and returns a pandas DataFrame.
    path: path to your uploaded CSV or ZIP file
    """
    logging.info("Loading dataset...")
    if path.lower().endswith(".zip"):
        import zipfile
        with zipfile.ZipFile(path, "r") as zip_ref:
            zip_ref.extractall("data/raw")  # extract into data/raw
            logging.info("Extracted ZIP contents to data/raw/")
            # Pick the first CSV listed in this archive (sorted for a stable choice),
            # not whatever CSV files already happen to be in data/raw
            csv_files = sorted(n for n in zip_ref.namelist() if n.lower().endswith(".csv"))
        if not csv_files:
            raise FileNotFoundError("No CSV found inside the ZIP file!")
        df = pd.read_csv(os.path.join("data/raw", csv_files[0]))
    else:
        df = pd.read_csv(path)
    logging.info(f"Loaded {df.shape[0]} rows and {df.shape[1]} columns")
    return df

# --- REPLACE this path with your actual Drive file path ---
path = "/content/drive/MyDrive/archive (3).zip"

# --- Run ingestion ---
df_raw = ingest_data(path)
df_raw.head()




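|   | data | venda | estoque | preco |
|---|------|-------|---------|-------|
| 0 | 2014-01-01 | 0 | 4972 | 1.29 |
| 1 | 2014-01-02 | 70 | 4902 | 1.29 |
| 2 | 2014-01-03 | 59 | 4843 | 1.29 |
| 3 | 2014-01-04 | 93 | 4750 | 1.29 |
| 4 | 2014-01-05 | 96 | 4654 | 1.29 |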


# Part 2 – Data Cleaning + Feature Engineering

In [6]:
# ---- FUNCTION: clean_data ----
def clean_data(df):
    logging.info("Cleaning data...")

    # Drop duplicates
    df = df.drop_duplicates()

    # Fill numeric NaNs with mean
    for col in df.select_dtypes(include=np.number):
        df[col] = df[col].fillna(df[col].mean())

    # Fill object NaNs with 'Unknown'
    for col in df.select_dtypes(include="object"):
        df[col] = df[col].fillna("Unknown")

    logging.info("Data cleaned.")
    return df


# ---- FUNCTION: feature_engineering ----
def feature_engineering(df):
    logging.info("Adding features...")

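    # If there’s a date column, create time parts.
    # Note: only column names containing "date" are matched. The sample dataset's
    # date column is named "data", so no time parts are added in the run shown here.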
    date_cols = [c for c in df.columns if "date" in c.lower()]
    if date_cols:
        col = date_cols[0]
        df[col] = pd.to_datetime(df[col], errors="coerce")
        df["year"] = df[col].dt.year
        df["month"] = df[col].dt.month
        df["day"] = df[col].dt.day
        df["weekday"] = df[col].dt.weekday

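    # Example derived numeric feature: first numeric column / second numeric column
    # (venda / estoque for the sample data); 1e-5 guards against division by zero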
    num_cols = df.select_dtypes(include=np.number).columns
    if len(num_cols) >= 2:
        df["feature_ratio"] = df[num_cols[0]] / (df[num_cols[1]] + 1e-5)

    logging.info("Features added.")
    return df


df_clean = clean_data(df_raw)
df_feat = feature_engineering(df_clean)
df_feat.head()

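|   | data | venda | estoque | preco | feature_ratio |
|---|------|-------|---------|-------|---------------|
| 0 | 2014-01-01 | 0 | 4972 | 1.29 | 0.0 |
| 1 | 2014-01-02 | 70 | 4902 | 1.29 | 0.01428 |
| 2 | 2014-01-03 | 59 | 4843 | 1.29 | 0.012183 |
| 3 | 2014-01-04 | 93 | 4750 | 1.29 | 0.019579 |
| 4 | 2014-01-05 | 96 | 4654 | 1.29 | 0.020627 |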
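
The time-part features (year, month, day, weekday) can be illustrated on the first dates of the sample data. This is a minimal standard-library sketch, not the notebook's pandas code, and the helper name `date_parts` is hypothetical. It follows the same weekday convention as pandas `.dt.weekday`, where Monday is 0.

```python
from datetime import date


def date_parts(iso_day: str) -> dict:
    """Split an ISO date string (YYYY-MM-DD) into calendar features."""
    d = date.fromisoformat(iso_day)
    return {
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "weekday": d.weekday(),  # Monday = 0 ... Sunday = 6
    }


# First three dates from the sample data shown above
for day in ["2014-01-01", "2014-01-02", "2014-01-03"]:
    print(day, date_parts(day))
```

In the pandas pipeline, the equivalent step would need the date column to be identified correctly first, for example by passing its name explicitly.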


# Part 3 – Save Cleaned Data

In [7]:
# Save to CSV
clean_path = "data/processed/retail_clean.csv"
df_feat.to_csv(clean_path, index=False)
logging.info(f"Saved processed data to {clean_path}")

# Preview
df_feat.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 937 entries, 0 to 936
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   data           937 non-null    object 
 1   venda          937 non-null    int64  
 2   estoque        937 non-null    int64  
 3   preco          937 non-null    float64
 4   feature_ratio  937 non-null    float64
dtypes: float64(2), int64(2), object(1)
memory usage: 36.7+ KB


# Part 4 – Workflow (Pipeline)

In [8]:
# ---- PART 4: WORKFLOW ORCHESTRATION ----
!pip install prefect

from prefect import flow, task

@task
def ingest_task(path):
    return ingest_data(path)

@task
def clean_task(df):
    return clean_data(df)

@task
def feature_task(df):
    return feature_engineering(df)

@task
def save_task(df, filename="data/output/final_retail_data.csv"):
    df.to_csv(filename, index=False)
    print(f"✅ Saved cleaned data to {filename}")

@flow(name="Retail Sales Forecasting Pipeline")
def retail_pipeline_flow(path):
    df = ingest_task(path)
    df = clean_task(df)
    df = feature_task(df)
    save_task(df)

# Run the flow
path = "/content/drive/MyDrive/archive (3).zip"  # update this path
retail_pipeline_flow(path)


INFO:prefect:Starting temporary server on http://127.0.0.1:8166
See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server.
INFO:prefect.flow_runs:Beginning flow run 'astonishing-gharial' for flow 'Retail Sales Forecasting Pipeline'
INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.task_runs:Finished in state Completed()
INFO:prefect.flow_runs:Finished in state Completed()


✅ Saved cleaned data to data/output/final_retail_data.csv


# Part 5 – AI Data Readiness (Simple Model)

In [9]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Just pick numeric columns for demo
num_cols = df_feat.select_dtypes(include=np.number).columns
X = df_feat[num_cols].drop(num_cols[-1], axis=1, errors="ignore")
y = df_feat[num_cols[-1]]  # last numeric col as target (for demo); here that is feature_ratio, not venda

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = LinearRegression()
model.fit(X_train, y_train)
preds = model.predict(X_test)

mse = mean_squared_error(y_test, preds)
print("Model trained! MSE:", mse)

Model trained! MSE: 3625197360449.6875
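
Two pieces of arithmetic help when reading this number. The sketch below recomputes `feature_ratio` for one sample row with the same formula as `feature_engineering`, shows what the formula returns when the denominator is zero, and spells out how MSE is calculated. The zero-stock input is a made-up value for illustration, not a row taken from the dataset, and the helper names are hypothetical.

```python
def feature_ratio(numerator: float, denominator: float, eps: float = 1e-5) -> float:
    """Same formula as feature_engineering: a / (b + eps)."""
    return numerator / (denominator + eps)


def mse(y_true, y_pred) -> float:
    """Mean squared error: the average of the squared differences."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Sample row 1 from the preview: venda=70, estoque=4902
print(round(feature_ratio(70, 4902), 6))

# Hypothetical row with zero stock: the ratio is scaled up by roughly 1 / eps
print(feature_ratio(70, 0))

# Tiny MSE example with two predictions
print(mse([0.0, 2.0], [1.0, 4.0]))
```

Because MSE squares each error, a few very large target values would be enough to dominate the result. Whether that explains the MSE above depends on the data and is not verified here.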


1. What makes this pipeline “enterprise-grade”?

This pipeline follows the structure that larger organizations use for data work. Each step (loading, cleaning, feature engineering, saving) is a separate function, so each part is easy to understand and maintain. Logging records what happens at each stage, and data is written to separate folders (raw, processed, and output) so one stage does not overwrite another's files. Together these choices make runs repeatable and easier to debug, and they leave room for the pipeline to grow.

2. Which best practices were applied?

Several best practices were used:

- **Modular functions**: Each task (like cleaning or feature engineering) is its own function, so the code is reusable and easy to update.

- **Logging**: The logging system records what happens in each step, which helps catch problems quickly.

- **Folder structure**: Data is organized into folders for raw, processed, and output files to keep things clean and traceable.

- **Config files (not yet applied)**: Paths are currently set directly in the notebook cells. Moving paths and settings into a configuration file would make the code adjustable without editing the main script.
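
A configuration file for the folder paths could look like the following. This is a minimal sketch and is not part of the notebook: the names `PipelineConfig`, `load_config`, and `pipeline_config.json` are hypothetical, and JSON is only one possible format.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class PipelineConfig:
    """Folder paths used by the pipeline (hypothetical structure)."""
    raw_dir: str
    processed_dir: str
    output_dir: str


def load_config(config_path: str) -> PipelineConfig:
    """Read a JSON file and turn it into a PipelineConfig."""
    with open(config_path, encoding="utf-8") as fh:
        return PipelineConfig(**json.load(fh))


# Demo: write a sample config to a temporary folder, then load it back
sample = {
    "raw_dir": "data/raw",
    "processed_dir": "data/processed",
    "output_dir": "data/output",
}
with tempfile.TemporaryDirectory() as tmp:
    cfg_file = Path(tmp) / "pipeline_config.json"
    cfg_file.write_text(json.dumps(sample), encoding="utf-8")
    config = load_config(str(cfg_file))

print(config.processed_dir)
```

With this in place, the pipeline functions could take `config.raw_dir` and the other fields as arguments instead of repeating the folder names in each cell.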

3. How orchestration improves maintainability

Orchestration tools like Prefect or Airflow manage and run the pipeline steps in order. Instead of running each part by hand, you define the steps once and the orchestrator runs them in the right sequence. If a step fails, the run log shows which one, as in the Prefect output above where each task reports its final state. This makes the pipeline easier to fix, monitor, and automate, with less manual work and fewer mistakes.
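
The core idea of running steps in sequence and reporting where a failure happened can be sketched in plain Python. This is a toy illustration only: it is not Prefect's or Airflow's API, the function `run_steps` is hypothetical, and the lambdas stand in for the real cleaning and feature functions.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")


def run_steps(steps, data):
    """Run (name, function) pairs in order and report which step fails."""
    for name, func in steps:
        logging.info("Starting step: %s", name)
        try:
            data = func(data)
        except Exception as exc:
            logging.error("Step %r failed: %s", name, exc)
            raise RuntimeError(f"Pipeline stopped at step '{name}'") from exc
        logging.info("Finished step: %s", name)
    return data


# Toy steps standing in for the real pipeline functions
steps = [
    ("clean", lambda rows: [r for r in rows if r is not None]),
    ("feature", lambda rows: [r * 2 for r in rows]),
]
result = run_steps(steps, [1, None, 3])
print(result)
```

A real orchestrator adds much more on top of this, such as scheduling, retries, and a run history, which is why the notebook uses Prefect rather than a hand-written loop.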