<a href="https://colab.research.google.com/github/bhuguvi26/RetailStream-Comprehensive-Data-Integration-and-Analytics-Pipeline/blob/main/RetailStream_Comprehensive_Data_Integration_and_Analytics_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Single-cell Colab pipeline: GitHub -> S3 -> Spark -> Analysis -> S3
# Run in Google Colab (Python 3). Installs required packages automatically.
# IMPORTANT: Enter AWS credentials when prompted.

# -------------------------
# Install dependencies
# -------------------------
# (may take a minute)
!pip install -q boto3 gitpython findspark pyspark==3.3.2

import os, sys, time, shutil, tempfile, json, subprocess
from getpass import getpass

# -------------------------
# User inputs (confirm)
# -------------------------
print("\n--- Config: provide details when prompted ---\n")
GIT_REPO = "https://github.com/bhuguvi26/RetailStream-Comprehensive-Data-Integration-and-Analytics-Pipeline.git"
# Default repo folders to scan for CSVs (adjust if your layout differs).
# List each folder only once: a repeated entry makes Step 2 scan, upload and
# load the same CSV files twice.
REPO_CSV_FOLDERS = ["Features_data_set", "Sales_Dataset", "Store_Dataset", "Stores_data_set"]
DEFAULT_RAW_BUCKET = "retailstream-raw"
DEFAULT_PROCESSED_BUCKET = "retailstream-processed"

# SECURITY: never hard-code AWS keys in a notebook. Any key that was ever
# committed here must be considered leaked and deactivated/rotated in IAM.
# Credentials are read with hidden input; leave them blank to fall back to
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY already present in the environment.
aws_key = getpass("AWS Access Key ID (blank = use environment): ").strip()
aws_secret = getpass("AWS Secret Access Key: ").strip() if aws_key else ""
aws_region = input("AWS Region (default: ap-southeast-2): ").strip() or "ap-southeast-2"

raw_bucket = input("S3 RAW bucket name (default: {}): ".format(DEFAULT_RAW_BUCKET)).strip() or DEFAULT_RAW_BUCKET
processed_bucket = input("S3 PROCESSED bucket name (default: {}): ".format(DEFAULT_PROCESSED_BUCKET)).strip() or DEFAULT_PROCESSED_BUCKET

if aws_key:
    if not aws_secret:
        print("AWS Access Key ID given without a Secret Access Key. Exiting.")
        raise SystemExit(1)
    # set environment variables for boto3 (read back when the session is built)
    os.environ["AWS_ACCESS_KEY_ID"] = aws_key
    os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret
elif not (os.environ.get("AWS_ACCESS_KEY_ID") and os.environ.get("AWS_SECRET_ACCESS_KEY")):
    # no keys typed in and none available from the environment
    print("No AWS credentials provided and none found in environment. Exiting.")
    raise SystemExit(1)
# Export the region in both cases, not only when keys were typed in.
os.environ["AWS_DEFAULT_REGION"] = aws_region

# -------------------------
# Imports (after pip)
# -------------------------
import boto3
from botocore.exceptions import ClientError
import git
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, year, sum as _sum, avg as _avg, desc

# -------------------------
# Helper utilities
# -------------------------
# One boto3 session shared by all S3 helpers below; credentials come from the
# environment variables populated by the config section, region from aws_region.
_session_kwargs = {
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    "region_name": aws_region,
}
session = boto3.Session(**_session_kwargs)
s3_client, s3_resource = session.client("s3"), session.resource("s3")

def bucket_exists(name):
    """Return True if S3 bucket *name* exists and HeadBucket succeeds.

    Returns False only when S3 reports the bucket as missing (HTTP 404 /
    ``NoSuchBucket``). Any other ``ClientError`` is re-raised. A 403 in
    particular means the bucket exists but belongs to another account or
    access is denied. Reporting that as "missing" would only trigger a
    confusing CreateBucket failure afterwards.
    """
    try:
        s3_client.head_bucket(Bucket=name)
    except ClientError as e:
        code = str(e.response.get("Error", {}).get("Code", ""))
        if code in ("404", "NoSuchBucket", "NotFound"):
            return False
        raise
    return True

def create_bucket_if_missing(name, region):
    """Create S3 bucket *name* in *region* unless ``bucket_exists`` reports it.

    For ``us-east-1`` the bucket is created without a
    ``CreateBucketConfiguration``. For every other region, *region* is passed
    as the ``LocationConstraint``. A ``ClientError`` from ``create_bucket`` is
    printed and re-raised.
    """
    if bucket_exists(name):
        print("Bucket exists:", name)
        return
    print("Creating bucket:", name)
    request = {"Bucket": name}
    if region != "us-east-1":
        request["CreateBucketConfiguration"] = {'LocationConstraint': region}
    try:
        s3_client.create_bucket(**request)
    except ClientError as e:
        print("Failed to create bucket:", e)
        raise
    print("Created bucket:", name)

def upload_file(local_path, bucket, key):
    """Upload the local file *local_path* to ``s3://<bucket>/<key>`` and log it."""
    s3_client.upload_file(Filename=local_path, Bucket=bucket, Key=key)
    destination = "s3://{}/{}".format(bucket, key)
    print("Uploaded to " + destination)

def download_file(bucket, key, local_path):
    """Download ``s3://<bucket>/<key>`` to *local_path* (silent on success)."""
    s3_client.download_file(Bucket=bucket, Key=key, Filename=local_path)

# -------------------------
# Step 1: Clone repo locally
# -------------------------
# Reuse an existing checkout when present (best-effort pull); otherwise make a
# shallow clone. A failed clone aborts the pipeline.
WORKDIR = "/content/retail_repo"
if os.path.exists(WORKDIR):
    print("Repo already present at", WORKDIR, "— pulling latest changes if possible.")
    try:
        r = git.Repo(WORKDIR)
        r.remote().pull()
    except Exception as e:
        # keep going with whatever copy is already on disk
        print("git pull failed (continuing with existing copy):", e)
else:
    print("Cloning repo:", GIT_REPO)
    try:
        git.Repo.clone_from(GIT_REPO, WORKDIR, depth=1)
        print("Cloned to", WORKDIR)
    except Exception as e:
        print("git clone failed:", e)
        print("If cloning fails due to TLS/network, upload files manually to Colab then re-run.")
        raise SystemExit(1)

# -------------------------
# Step 2: Find CSV files in repo folders
# -------------------------
# Builds files_to_upload: a list of (local_path, s3_key) pairs, where s3_key is
# "<folder prefix>/<path relative to that folder>".
print("\nScanning repository for CSV files under:", REPO_CSV_FOLDERS)
files_to_upload = []
seen_keys = set()
# dict.fromkeys drops repeated folder names while keeping their order, so a
# folder listed twice is not scanned (and later uploaded/loaded) twice.
for folder in dict.fromkeys(REPO_CSV_FOLDERS):
    path = os.path.join(WORKDIR, folder)
    if not os.path.exists(path):
        # try lowercase variant of the folder name
        path_alt = os.path.join(WORKDIR, folder.lower())
        if os.path.exists(path_alt):
            path = path_alt
        else:
            # skip missing paths
            print("  (warning) path not found:", folder)
            continue
    # The S3 prefix depends only on the folder, so compute it once per folder.
    prefix = os.path.basename(folder).lower().replace(" ", "_")
    for root, _dirs, files in os.walk(path):
        for f in files:
            if not f.lower().endswith(".csv"):
                continue
            local = os.path.join(root, f)
            # Use the path relative to the folder (not just the filename) so
            # same-named files in different sub-directories get distinct keys.
            rel = os.path.relpath(local, path).replace(os.sep, "/")
            key = "{}/{}".format(prefix, rel)
            if key in seen_keys:
                print("  (warning) duplicate S3 key skipped:", key, "<-", local)
                continue
            seen_keys.add(key)
            files_to_upload.append((local, key))

if not files_to_upload:
    print("No CSV files found in repo paths. Exiting.")
    raise SystemExit(1)

print("Found {} CSV files to upload.".format(len(files_to_upload)))
for local, key in files_to_upload[:50]:
    print(" -", local, "->", key)

# -------------------------
# Step 3: Ensure S3 buckets exist (create them if needed)
# -------------------------
print("\nEnsuring S3 buckets exist (RAW and PROCESSED)")
for _bucket_name in (raw_bucket, processed_bucket):
    create_bucket_if_missing(_bucket_name, aws_region)

# -------------------------
# Step 4: Upload CSVs to RAW bucket
# -------------------------
# uploaded_keys collects only the keys whose upload succeeded; failures are
# logged and skipped so one bad file does not stop the pipeline.
print("\nUploading CSVs to s3://{}/".format(raw_bucket))
uploaded_keys = []
for src_path, s3_key in files_to_upload:
    try:
        upload_file(src_path, raw_bucket, s3_key)
    except Exception as err:
        print("Upload failed for", src_path, ":", err)
    else:
        uploaded_keys.append(s3_key)

print("Uploaded {} objects to s3://{}".format(len(uploaded_keys), raw_bucket))

# -------------------------
# Step 5: Download uploaded files locally into /content/data_for_spark
# (This ensures Spark can read them reliably in Colab)
# -------------------------
LOCAL_DATA_DIR = "/content/data_for_spark"
# Start from an empty directory so files from an earlier run are not loaded.
if os.path.exists(LOCAL_DATA_DIR):
    shutil.rmtree(LOCAL_DATA_DIR)
os.makedirs(LOCAL_DATA_DIR)

print("\nDownloading uploaded objects from S3 to local folder for Spark ingestion...")
for key in uploaded_keys:
    # Flatten the S3 key into a single filename ("/" -> "__"); Step 6 maps the
    # "__" back to "/" to recover the key.
    fname = key.replace("/", "__")
    local_path = os.path.join(LOCAL_DATA_DIR, fname)
    try:
        download_file(raw_bucket, key, local_path)
    except Exception as e:
        # one placeholder per argument so the error itself is included
        print("Failed to download s3://{}/{} -> {}: {}".format(raw_bucket, key, local_path, e))

print("Local data directory for Spark:", LOCAL_DATA_DIR)
print("Files:", len(os.listdir(LOCAL_DATA_DIR)))

# -------------------------
# Step 6: Start Spark (local) and read CSVs
# -------------------------
print("\nStarting local SparkSession (this may take ~20 seconds)...")
# Configure findspark / environment (Colab)
findspark.init()

# Build (or reuse) the local Spark session.
spark_builder = (
    SparkSession.builder
    .appName("RetailStreamColab")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.session.timeZone", "UTC")
)
spark = spark_builder.getOrCreate()

print("Spark version:", spark.version)

# Load every *.csv in LOCAL_DATA_DIR (sorted by filename) into `dfs`, keyed by
# the original S3 key recovered by mapping "__" back to "/".
from pyspark.sql import DataFrame
dfs = {}
csv_names = [n for n in sorted(os.listdir(LOCAL_DATA_DIR)) if n.lower().endswith(".csv")]
for csv_name in csv_names:
    csv_path = os.path.join(LOCAL_DATA_DIR, csv_name)
    try:
        frame = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(csv_path)
        )
        source_key = csv_name.replace("__", "/")
        dfs[source_key] = frame
        print("Loaded:", source_key, "rows:", frame.count(), "columns:", len(frame.columns))
    except Exception as e:
        print("Failed to load", csv_path, ":", e)

if not dfs:
    print("No DataFrames loaded. Exiting.")
    raise SystemExit(1)

# -------------------------
# Step 7: Inspect DataFrames and map likely datasets
# -------------------------
print("\nAvailable DataFrames and sample columns:")
for k, df in dfs.items():
    print("==", k, "==")
    print("  columns:", df.columns)
    # show small sample; a failed preview is reported but does not stop the run
    try:
        df.show(3)
    except Exception as e:
        print("  (preview failed:", e, ")")

# Identify sales / stores / features by filename hints. A dataset may be split
# over many CSV files (e.g. sales_dataset/Sales_data_set_<n>.csv), so every
# matching file is collected and the parts are unioned. Keeping only the last
# match would run the whole analysis on a single shard.
sales_parts, stores_parts, features_parts = [], [], []
for key, df in dfs.items():
    kl = key.lower()
    if "sale" in kl:       # substring also covers "sales" / "sales_sample"
        sales_parts.append(df)
    if "store" in kl:      # also covers "stores"
        stores_parts.append(df)
    if "feature" in kl:    # also covers "features"
        features_parts.append(df)


def _union_parts(parts):
    """Union DataFrames by column name; columns missing in a part become NULL.

    Returns None for an empty list.
    """
    if not parts:
        return None
    combined = parts[0]
    for extra in parts[1:]:
        combined = combined.unionByName(extra, allowMissingColumns=True)
    return combined


sales_df = _union_parts(sales_parts)
stores_df = _union_parts(stores_parts)
features_df = _union_parts(features_parts)

# fallback: if not detected explicitly, pick the largest one as sales
if sales_df is None and dfs:
    # choose DF with highest row count
    sales_df = max(dfs.values(), key=lambda d: d.count())

print("\nIdentified DataFrames:")
print(" sales_df:", "Yes" if sales_df is not None else "No")
print(" stores_df:", "Yes" if stores_df is not None else "No")
print(" features_df:", "Yes" if features_df is not None else "No")

# -------------------------
# Step 8: Basic cleaning helpers
# -------------------------
from pyspark.sql.functions import to_date, regexp_replace

def safe_column(df, candidates):
    """Return the first column of *df* that matches one of *candidates*.

    Matching is case-insensitive and done in two passes. First, every candidate
    is tried as an exact name. Only if none matches, every candidate is tried
    as a substring of a column name. Within each pass, candidates are tried in
    order, and for each candidate the columns in ``df.columns`` order. Returns
    the column's original spelling, or None when nothing matches.
    """
    lowered = [(name, name.lower()) for name in df.columns]
    wanted = [cand.lower() for cand in candidates]

    for needle in wanted:
        exact = next((name for name, low in lowered if low == needle), None)
        if exact is not None:
            return exact

    for needle in wanted:
        partial = next((name for name, low in lowered if needle in low), None)
        if partial is not None:
            return partial

    return None

# Normalize stores_df: ensure it has 'store' or 'store_id' and a 'city' column if possible.
# None of the calculations below join stores_df, so this step only reports
# which key / city columns were detected (stores_df itself is not modified).
if stores_df is not None:
    stores_key_col = safe_column(stores_df, ["store", "store_id"])
    stores_city_col = safe_column(stores_df, ["city"])
    print("stores_df store key column:", stores_key_col or "(not found)")
    print("stores_df city column:", stores_city_col or "(not found)")

# Ensure date parsing for sales_df if it has a date-like column.
# The column keeps its name but is replaced by a DATE-typed version.
date_col = None
if sales_df is not None:
    date_col = safe_column(sales_df, ["date", "sale_date", "transaction_date"])
    if date_col:
        try:
            parsed_date = to_date(col(date_col))
            # to_date() yields NULL (it does not raise) for values in a format it
            # cannot parse, so count them to make a wrong date format visible.
            unparsed_dates = sales_df.filter(col(date_col).isNotNull() & parsed_date.isNull()).count()
            sales_df = sales_df.withColumn("__date", parsed_date)
            sales_df = sales_df.drop(date_col).withColumnRenamed("__date", date_col)
            print("Parsed date column:", date_col)
            if unparsed_dates:
                print("  (warning) {} non-null values in '{}' could not be parsed as dates "
                      "and are now NULL.".format(unparsed_dates, date_col))
        except Exception as e:
            print("Date parse failed:", e)

# Standardize the numeric sales/price column to DOUBLE.
price_col = safe_column(sales_df, ["price", "amount", "sales", "sale_amount", "total"])
if price_col:
    try:
        if dict(sales_df.dtypes)[price_col] == "string":
            # Text values: strip currency symbols / thousands separators first,
            # e.g. "$1,234.50" -> "1234.50".
            cleaned = regexp_replace(col(price_col), "[^0-9.\\-]", "")
        else:
            # Already numeric: cast directly. Going through a string would break
            # scientific notation (Spark renders large doubles as e.g. "2.5E7",
            # which becomes "2.57" once the "E" is stripped).
            cleaned = col(price_col)
        sales_df = sales_df.withColumn(price_col, cleaned.cast("double"))
        print("Normalized price column:", price_col)
    except Exception as e:
        print("Price normalization failed:", e)

# Pick the column that identifies a store (or, failing that, its location).
store_col = safe_column(sales_df, ["store", "store_id", "city", "location"])
if not store_col:
    print("No obvious store column in sales data. Some aggregations may be skipped.")
else:
    print("Using store column:", store_col)

# -------------------------
# Step 9: Business-case calculations (best-effort)
# -------------------------
print("\n--- Business Calculations (best-effort) ---\n")

# name -> Spark DataFrame; everything stored here is written out and uploaded later
results = {}

# 1) Total sales per calendar month (month 1-12; all years pooled together)
if not (date_col and price_col):
    print("Skipping monthly aggregation (missing date or price).")
else:
    try:
        with_month = sales_df.withColumn("month", month(col(date_col)))
        monthly = (
            with_month.groupBy("month")
            .agg(_sum(col(price_col)).alias("total_sales"))
            .orderBy("month")
        )
        print("Monthly aggregation (month, total_sales):")
        monthly.show(12)
        results["monthly_sales"] = monthly
    except Exception as e:
        print("Monthly aggregation failed:", e)

# 2) Weekly top-performing store.
# Weeks are keyed by their start date (date_trunc("week") -> Monday). weekofyear()
# alone would pool the same week number from different years into one group.
# "week" (ISO week number) is kept as an informational column.
from pyspark.sql.functions import weekofyear, date_trunc, to_date
if date_col and price_col and store_col:
    try:
        weekly = (
            sales_df
            .withColumn("week_start", to_date(date_trunc("week", col(date_col))))
            .withColumn("week", weekofyear(col(date_col)))
            .groupBy("week_start", "week", col(store_col))
            .agg(_sum(col(price_col)).alias("week_sales"))
        )
        from pyspark.sql.window import Window
        from pyspark.sql.functions import row_number
        # rank stores inside each calendar week and keep the best one
        w = Window.partitionBy("week_start").orderBy(desc("week_sales"))
        weekly_top = (
            weekly.withColumn("rn", row_number().over(w))
            .filter(col("rn") == 1)
            .drop("rn")
            .orderBy("week_start")
        )
        print("Weekly top-performing store (week_start, week, store, week_sales):")
        weekly_top.show(10)
        results["weekly_top_store"] = weekly_top
    except Exception as e:
        print("Weekly top-performing store failed:", e)
else:
    print("Skipping weekly top store (missing date/price/store).")

# 3) Leap year worst store: total sales per store over leap years only, lowest first.
# Gregorian rule: divisible by 4, except century years not divisible by 400
# (a plain "% 4 == 0" would wrongly count e.g. 1900 and 2100 as leap years).
if date_col and price_col and store_col:
    try:
        sale_year = year(col(date_col))
        is_leap_year = ((sale_year % 4 == 0) & (sale_year % 100 != 0)) | (sale_year % 400 == 0)
        leap = sales_df.filter(is_leap_year) \
                       .groupBy(col(store_col)).agg(_sum(col(price_col)).alias("leap_sales")) \
                       .orderBy("leap_sales")
        print("Stores ordered by leap-year sales (lowest first):")
        leap.show(10)
        results["leap_worst"] = leap
    except Exception as e:
        print("Leap-year analysis failed:", e)
else:
    print("Skipping leap-year analysis (missing date/price/store).")

# 4) Department performance (by category/product) if available.
# "dept" is added so a column named e.g. "Dept" is also recognised. Because
# safe_column checks exact names before substrings, an exact "dept" column
# takes precedence over a substring match of an earlier candidate.
cat_col = safe_column(sales_df, ["category", "department", "product_category", "dept"])
if cat_col and price_col:
    try:
        dept = sales_df.groupBy(cat_col).agg(_sum(col(price_col)).alias("total_sales")).orderBy(desc("total_sales"))
        print("Department performance by total sales:")
        dept.show(10)
        results["dept_performance"] = dept
    except Exception as e:
        print("Department performance failed:", e)
else:
    print("Skipping department performance (no category or price found).")

# 5) Example: average customer visits in type B stores during April
# This requires columns like 'visits', 'store_type', and date. We'll attempt best-effort:
visits_col = safe_column(sales_df, ["visits", "customer_visits", "footfall"])
store_type_col = safe_column(sales_df, ["store_type", "type"])
if date_col and visits_col and store_type_col:
    try:
        from pyspark.sql.functions import upper, trim
        # Exact, case/whitespace-insensitive match on "B". A regex such as
        # rlike("(?i)B") would also match any value merely containing a "b".
        is_type_b = upper(trim(col(store_type_col).cast("string"))) == "B"
        april_avg = sales_df.withColumn("month", month(col(date_col))) \
            .filter((col("month") == 4) & is_type_b) \
            .groupBy(col(store_type_col)) \
            .agg(_avg(col(visits_col)).alias("avg_visits"))
        print("Average visits for type B stores during April:")
        april_avg.show()
        results["april_visits_typeB"] = april_avg
    except Exception as e:
        print("April visits calc failed:", e)
else:
    print("Skipping April visits (missing visits, store_type or date).")

# 6) Fuel price analysis: average fuel price per store and calendar week.
# Requires a fuel-related column and a date column. Without a date there is no
# week to group by; the old code then called withColumn("week", None), which raises.
fuel_col = safe_column(sales_df, ["fuel_price", "fuel", "petrol_price"])
if fuel_col and store_col and date_col:
    try:
        from pyspark.sql.functions import weekofyear, date_trunc, to_date
        # key weeks by their start date so equal week numbers of different years stay apart
        fuel_weekly = (
            sales_df
            .withColumn("week_start", to_date(date_trunc("week", col(date_col))))
            .withColumn("week", weekofyear(col(date_col)))
            .groupBy("week_start", "week", col(store_col))
            .agg(_avg(col(fuel_col)).alias("avg_fuel"))
        )
        print("Fuel price weekly (sample):")
        fuel_weekly.show(5)
        results["fuel_weekly"] = fuel_weekly
    except Exception as e:
        print("Fuel price calc failed:", e)
else:
    print("Skipping fuel price analysis (missing fuel, store or date column).")

# 7) Save results to local CSV files and upload them to the processed S3 bucket.
# Every run starts with an empty output directory (it is zipped at the end).
OUT_DIR = "/content/processed_outputs"
out_dir_present = os.path.exists(OUT_DIR)
if out_dir_present:
    shutil.rmtree(OUT_DIR)
os.makedirs(OUT_DIR)

def save_df_to_csv(spark_df, filename):
    """Write *spark_df* as one CSV file at ``OUT_DIR/<filename>.csv``.

    Spark writes a directory of part files. The frame is coalesced to a single
    partition, the resulting ``part-*.csv`` is moved to the final path, and
    Spark's temporary directory is removed in every case, so leftovers never
    end up in the ZIP of OUT_DIR.

    Returns the final CSV path, or None if writing failed or no part file was
    produced (the reason is printed, not raised).
    """
    local_tmp = os.path.join(OUT_DIR, filename)
    try:
        # coalesce to 1 partition -> exactly one part file for a neat single CSV
        spark_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(local_tmp)
        part = next(
            (
                os.path.join(local_tmp, f)
                for f in sorted(os.listdir(local_tmp))
                if f.startswith("part-") and f.endswith(".csv")
            ),
            None,
        )
        if part is None:
            print("No CSV part found for", filename)
            return None
        final = os.path.join(OUT_DIR, filename + ".csv")
        shutil.move(part, final)
        return final
    except Exception as e:
        print("Failed to save", filename, ":", e)
        return None
    finally:
        # drop Spark's output directory (_SUCCESS, .crc files, any stray parts)
        shutil.rmtree(local_tmp, ignore_errors=True)

print("\nWriting analysis outputs and uploading to s3://{}/".format(processed_bucket))
create_bucket_if_missing(processed_bucket, aws_region)

# Each result becomes OUT_DIR/<name>.csv and is uploaded under analysis_outputs/.
# A failure for one result is logged and does not stop the others.
uploaded_outputs = []
for result_name, result_df in results.items():
    try:
        csv_path = save_df_to_csv(result_df, result_name)
        if not (csv_path and os.path.exists(csv_path)):
            continue
        target_key = "analysis_outputs/" + os.path.basename(csv_path)
        upload_file(csv_path, processed_bucket, target_key)
        uploaded_outputs.append(target_key)
    except Exception as e:
        print("Failed to write/upload result", result_name, ":", e)

print("Uploaded outputs:", uploaded_outputs)

# Final status: print a run summary, then zip the outputs for download.
summary = (
    ("\n--- Pipeline finished ---",),
    ("RAW bucket:", raw_bucket),
    ("PROCESSED bucket:", processed_bucket),
    ("Uploaded {} data files and {} output objects.".format(len(uploaded_keys), len(uploaded_outputs)),),
    ("Local data dir for Spark:", LOCAL_DATA_DIR),
    ("Processed outputs dir:", OUT_DIR),
    ("\nYou can now inspect results in S3 or download processed CSVs from s3://{}/analysis_outputs/".format(processed_bucket),),
)
for summary_line in summary:
    print(*summary_line)

# Bundle everything in OUT_DIR into a ZIP that can be downloaded from Colab.
shutil.make_archive("/content/processed_outputs_zip", "zip", OUT_DIR)
print("\nDownload the results ZIP: /content/processed_outputs_zip.zip")


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Loaded: sales_dataset/Sales_data_set_305.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_306.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_307.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_308.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_309.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_31.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_310.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_311.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_312.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_313.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_314.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_315.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_316.csv rows: 1000 columns: 5
Loaded: sales_dataset/Sales_data_set_317.csv rows: 1000 columns: 