In [1]:
import sqlite3
import pandas as pd

def extract_from_sql(db_path, query):
    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query(query, conn)
    conn.close()
    return df

if __name__ == "__main__":
    df = extract_from_sql("test.db", "SELECT * FROM sales_table;")
    df.to_csv("data/raw/sql_output.csv", index=False)
    print("SQL extraction completed â†’ data/raw/sql_output.csv")

SQL extraction completed â†’ data/raw/sql_output.csv


In [5]:
#Data cleaning
import pandas as pd
import os

def clean_data(input_path, output_path):
    df = pd.read_csv(input_path)

    # Standardize column names
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

    # Handle missing values
    df = df.fillna({
        col: df[col].median() if df[col].dtype != "object" else "Unknown"
        for col in df.columns
    })

    # Convert date columns
    for col in df.columns:
        if "date" in col:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    df.to_csv(output_path, index=False)
    print(f"Cleaned data saved â†’ {output_path}")

if __name__ == "__main__":
    clean_data("data/raw/sample_raw.csv", "data/cleaned/cleaned_data.csv")

Cleaned data saved â†’ data/cleaned/cleaned_data.csv


In [3]:
import pandas as pd

def generate_insights(df):
    insights = {}

    insights["row_count"] = len(df)
    insights["column_count"] = len(df.columns)
    insights["missing_values"] = df.isna().sum().to_dict()
    insights["numeric_summary"] = df.describe().to_dict()

    if "sales" in df.columns:
        insights["total_sales"] = df["sales"].sum()
        insights["max_sales"] = df["sales"].max()
        insights["min_sales"] = df["sales"].min()

    return insights

if __name__ == "__main__":
    df = pd.read_csv("data/cleaned/cleaned_data.csv")
    insights = generate_insights(df)

    pd.DataFrame.from_dict(insights, orient="index").to_csv("data/cleaned/insights.csv")
    print("Insights generated â†’ data/cleaned/insights.csv")

Insights generated â†’ data/cleaned/insights.csv


In [6]:
#SQL Runner
import os
import sqlite3
import pandas as pd


# ---------------------------------------------------------
# STEP 1 â€” EXTRACT DATA
# ---------------------------------------------------------
def extract_from_sql(db_path, query, output_path):
    print("ðŸ”¹ Extracting data from SQL...")
    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query(query, conn)
    conn.close()

    df.to_csv(output_path, index=False)
    print(f"   â†’ Extracted SQL data saved to: {output_path}")
    return df


def extract_from_csv(raw_csv_path):
    print("ðŸ”¹ Reading raw CSV file...")
    df = pd.read_csv(raw_csv_path)
    print(f"   â†’ Loaded CSV from: {raw_csv_path}")
    return df


# ---------------------------------------------------------
# STEP 2 â€” CLEAN DATA
# ---------------------------------------------------------
def clean_data(df, output_path):
    print("ðŸ”¹ Cleaning data...")

    # Standardize column names
    df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

    # Fill missing values
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].fillna("Unknown")
        else:
            df[col] = df[col].fillna(df[col].median())

    # Convert date columns
    for col in df.columns:
        if "date" in col:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    df.to_csv(output_path, index=False)
    print(f"   â†’ Cleaned data saved to: {output_path}")
    return df


# ---------------------------------------------------------
# STEP 3 â€” ANALYZE DATA (EDA)
# ---------------------------------------------------------
def generate_insights(df, output_path):
    print("ðŸ”¹ Generating insights...")

    insights = {}
    insights["row_count"] = len(df)
    insights["column_count"] = len(df.columns)
    insights["missing_values"] = df.isna().sum().to_dict()
    insights["numeric_summary"] = df.describe().to_dict()

    if "sales" in df.columns:
        insights["total_sales"] = df["sales"].sum()
        insights["max_sales"] = df["sales"].max()
        insights["min_sales"] = df["sales"].min()

    # Save insights as CSV
    pd.DataFrame.from_dict(
        {k: str(v) for k, v in insights.items()},
        orient="index",
        columns=["value"]
    ).to_csv(output_path)

    print(f"   â†’ Insights saved to: {output_path}")
    return insights


# ---------------------------------------------------------
# MASTER RUN FUNCTION
# ---------------------------------------------------------
def run_pipeline():
    print("\nðŸš€ RUNNING DATA ANALYST PIPELINE\n")

    # Ensure directory structure
    os.makedirs("data/raw", exist_ok=True)
    os.makedirs("data/cleaned", exist_ok=True)

    # Step 1 â€” Extract SQL
    extract_from_sql(
        db_path="test.db",
        query="SELECT * FROM sales_table;",
        output_path="data/raw/sql_output.csv"
    )

    # Step 1b â€” Extract RAW CSV
    raw_df = extract_from_csv("data/raw/sample_raw.csv")

    # Step 2 â€” Clean
    cleaned_df = clean_data(
        raw_df,
        output_path="data/cleaned/cleaned_data.csv"
    )

    # Step 3 â€” Insights
    generate_insights(
        cleaned_df,
        output_path="data/cleaned/insights.csv"
    )

    print("\nðŸŽ‰ Pipeline completed successfully!\n")


if __name__ == "__main__":
    run_pipeline()



ðŸš€ RUNNING DATA ANALYST PIPELINE

ðŸ”¹ Extracting data from SQL...
   â†’ Extracted SQL data saved to: data/raw/sql_output.csv
ðŸ”¹ Reading raw CSV file...
   â†’ Loaded CSV from: data/raw/sample_raw.csv
ðŸ”¹ Cleaning data...
   â†’ Cleaned data saved to: data/cleaned/cleaned_data.csv
ðŸ”¹ Generating insights...
   â†’ Insights saved to: data/cleaned/insights.csv

ðŸŽ‰ Pipeline completed successfully!

