<a href="https://colab.research.google.com/github/alfredqbit/NU-DDS-8515/blob/main/sepulvedaadds_8515_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Assignment 6: ANOVA and MANOVA on a Marketing Dataset

Dataset: `digital_marketing_campaign_dataset.csv`
Source: Kaggle / Opendatabay (Digital Marketing Conversion Dataset)

Goals:
 - One-way ANOVA: PreviousPurchases ~ CampaignChannel
 - Two-way ANOVA: ClickThroughRate ~ CampaignType * IncomeSegment
 - MANOVA: (ClickThroughRate, TimeOnSite) ~ CampaignChannel

In [None]:
import os
import zipfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

from scipy import stats
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.multivariate.manova import MANOVA
from statsmodels.graphics.factorplots import interaction_plot

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Use seaborn's "whitegrid" theme for every figure drawn below.
sns.set(style="whitegrid")
# NOTE(review): this silences *all* warning categories for the whole session,
# including numerical RuntimeWarnings raised inside numpy/scipy computations;
# consider narrowing the filter to the specific warnings that are noisy.
warnings.filterwarnings('ignore')

# Directory where the pipeline functions save their figures (it is the default
# value of their `fig_dir` parameter); created up front so savefig() succeeds.
FIG_DIR = "figures"
os.makedirs(FIG_DIR, exist_ok=True)


In [None]:
import subprocess
import sys

# Fallback installer: try to import statsmodels and, if it is missing, install
# it with pip into the interpreter that runs this kernel (sys.executable),
# then import it again.
# NOTE(review): the first code cell already imports statsmodels.api, so on a
# fresh "Restart & Run All" a missing package fails there before this cell
# runs; this guard would need to be the first cell to be effective.
try:
    import statsmodels
except ImportError:
    print("statsmodels not found, installing...")
    # check_call raises CalledProcessError if pip exits with a non-zero status.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    print("statsmodels installed.")
    import statsmodels


In [None]:
import pandas as pd

# Load the dataset from the sample_data directory
def load_ad_dataset() -> pd.DataFrame:
  file_path = 'sample_data/Dataset_Ads.csv'
  print(f"Loading dataset from {file_path}...")

  try:
      df_ads = pd.read_csv(file_path)
      print("Dataset loaded successfully.")
      display(df_ads.head())
      display(df_ads.decribe())
      display(df_ads.info())
      return df_ads
  except FileNotFoundError:
    print(f"Error: File not found at {file_path}")

 # Data loading and preprocessing pipeline

In [None]:
def load_raw_digital_marketing_dataset(
    source: str = "auto",
    local_path: str | None = None,
    download_dir: str = "data"
) -> pd.DataFrame:
    """
    Load `digital_marketing_campaign_dataset.csv` from:
      - a local file,
      - the Kaggle dataset
        `rabieelkharoua/predict-conversion-in-digital-marketing-dataset`,
      - or (optionally) a public raw URL.

    Parameters
    ----------
    source : {'auto', 'local', 'kaggle', 'github'}
        'auto'  : try local_path, then Kaggle path (if on Kaggle),
                  then GitHub raw mirror.
        'local' : require local_path to be a valid file.
        'kaggle': use Kaggle CLI/API to download.
        'github': load from a public raw URL mirror of the same CSV.
    local_path : str or None
        Path to a local CSV file, if available.
    download_dir : str
        Directory where Kaggle downloads / extracted files are stored.

    Returns
    -------
    df : pandas.DataFrame
    """
    filename = "digital_marketing_campaign_dataset.csv"

    # --- 1. AUTO mode: try local, then Kaggle input path, then GitHub raw ---
    if source == "auto":
        # (a) explicit local path
        if local_path is not None and os.path.exists(local_path):
            print(f"Loading dataset from local path: {local_path}")
            return pd.read_csv(local_path)

        # (b) Kaggle built-in path (when running inside Kaggle notebooks)
        kaggle_input = "/kaggle/input/predict-conversion-in-digital-marketing-dataset/digital_marketing_campaign_dataset.csv"
        if os.path.exists(kaggle_input):
            print(f"Loading dataset from Kaggle input path: {kaggle_input}")
            return pd.read_csv(kaggle_input)

        # (c) GitHub raw mirror as a last resort (same CSV mirrored on GitHub)
        github_raw_url = (
            "https://raw.githubusercontent.com/"
            "Elakkiya-U/Digital-marketing-campaign/"
            "main/Digital_Marketing_Campaign_Dataset.csv"
        )
        print(f"Loading dataset from GitHub raw URL: {github_raw_url}")
        return pd.read_csv(github_raw_url)

    # --- 2. LOCAL mode (explicit) ---
    if source == "local":
        if local_path is None:
            raise ValueError("source='local' requires a valid local_path.")
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"Local file not found: {local_path}")
        print(f"Loading dataset from local file: {local_path}")
        return pd.read_csv(local_path)

    # --- 3. KAGGLE mode (non-Kaggle environment, using Kaggle API) ---
    if source == "kaggle":
        # You must have:
        #   pip install kaggle
        #   KAGGLE_USERNAME and KAGGLE_KEY set in your environment,
        #   or kaggle.json configured in ~/.kaggle/
        try:
            from kaggle.api.kaggle_api_extended import KaggleApi
        except ImportError as e:
            raise ImportError(
                "Kaggle API not installed. Run `pip install kaggle` first."
            ) from e

        os.makedirs(download_dir, exist_ok=True)

        dataset_slug = "rabieelkharoua/predict-conversion-in-digital-marketing-dataset"
        print(f"Downloading '{filename}' from Kaggle dataset: {dataset_slug}")

        api = KaggleApi()
        api.authenticate()

        api.dataset_download_file(
            dataset_slug,
            file_name=filename,
            path=download_dir,
            force=True
        )

        zip_path = os.path.join(download_dir, filename + ".zip")
        if os.path.exists(zip_path):
            print(f"Extracting {zip_path} ...")
            with zipfile.ZipFile(zip_path, "r") as zf:
                zf.extractall(download_dir)
        csv_path = os.path.join(download_dir, filename)
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"Could not find extracted CSV at {csv_path}")
        print(f"Loading dataset from extracted CSV: {csv_path}")
        return pd.read_csv(csv_path)

    # --- 4. GitHub mode (explicit raw HTTP) ---
    if source == "github":
        github_raw_url = (
            "https://raw.githubusercontent.com/"
            "Elakkiya-U/Digital-marketing-campaign/"
            "main/Digital_Marketing_Campaign_Dataset.csv"
        )
        print(f"Loading dataset from GitHub raw URL: {github_raw_url}")
        return pd.read_csv(github_raw_url)

    # If we got here, source was invalid.
    raise ValueError(f"Unknown source '{source}'. Use 'auto', 'local', 'kaggle', or 'github'.")

# Example call:
# df = load_raw_digital_marketing_dataset(source="local", local_path="digital_marketing_campaign_dataset.csv")

# One-way ANOVA pipeline: PreviousPurchases ~ CampaignChannel

The two-way ANOVA further below also needs an `IncomeSegment` factor: we create
three income-based segments (Low, Medium, High) via tertiles of `Income`.

In [None]:
def run_oneway_anova_purchases(df: pd.DataFrame, fig_dir: str = FIG_DIR):
    """
    Run one-way ANOVA: PreviousPurchases ~ CampaignChannel,
    check assumptions, and export diagnostic plots.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns 'PreviousPurchases' and 'CampaignChannel'.
        Rows with missing values in either column are dropped.
    fig_dir : str
        Directory where the PNG figures are written (created if missing).

    Returns
    -------
    model : fitted statsmodels OLS results
    anova_table : pandas.DataFrame
        Type-II ANOVA table for the fitted model.

    Raises
    ------
    ValueError
        If one of the required columns is missing.
    """
    if "PreviousPurchases" not in df.columns or "CampaignChannel" not in df.columns:
        raise ValueError("Required columns 'PreviousPurchases' or 'CampaignChannel' missing.")

    # A caller-supplied fig_dir may not exist yet; savefig() would fail otherwise.
    os.makedirs(fig_dir, exist_ok=True)

    # Drop rows with missing values relevant to this analysis
    df_1way = df.dropna(subset=["PreviousPurchases", "CampaignChannel"])

    # Fit ANOVA model
    model = ols("PreviousPurchases ~ C(CampaignChannel)", data=df_1way).fit()
    anova_table = sm.stats.anova_lm(model, typ=2)
    print("One-way ANOVA: PreviousPurchases ~ CampaignChannel")
    display(anova_table)

    resid = model.resid
    fitted = model.fittedvalues

    # Levene's test for homogeneity of variance
    groups = [g["PreviousPurchases"].values for _, g in df_1way.groupby("CampaignChannel")]
    lev_stat, lev_p = stats.levene(*groups)
    print(f"\nLevene's test: stat={lev_stat:.3f}, p={lev_p:.3f}")

    # Shapiro-Wilk for normality of residuals, with sampling for large N
    sample_size_limit = 5000
    if len(resid) > sample_size_limit:
        sampled_resid = resid.sample(n=sample_size_limit, random_state=42)
        print(f"Shapiro-Wilk (residuals, sampled N={sample_size_limit}):")
        sh_stat, sh_p = stats.shapiro(sampled_resid)
    else:
        print("Shapiro-Wilk (residuals): ")
        sh_stat, sh_p = stats.shapiro(resid)
    print(f"  stat={sh_stat:.3f}, p={sh_p:.3f}")

    # Q-Q plot for residuals.
    # The residuals are on the scale of the response, not standardized, so the
    # reference line must be scaled to the sample (line="s"); a 45-degree line
    # would only be a valid reference for residuals with unit variance.
    sm.qqplot(resid, line="s")
    plt.title("One-way ANOVA Residuals Q-Q Plot")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "oneway_resid_qq.png"), dpi=300)
    plt.show()

    # Residuals vs fitted
    plt.figure()
    plt.scatter(fitted, resid, alpha=0.6)
    plt.axhline(0, color="gray", linewidth=0.8)
    plt.xlabel("Fitted values")
    plt.ylabel("Residuals")
    plt.title("One-way ANOVA: Residuals vs Fitted")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "oneway_resid_vs_fitted.png"), dpi=300)
    plt.show()

    # Boxplot of PreviousPurchases by CampaignChannel
    plt.figure(figsize=(8, 5))
    sns.boxplot(x="CampaignChannel", y="PreviousPurchases", data=df_1way)
    plt.title("Previous Purchases by Campaign Channel")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "box_purchases_by_channel.png"), dpi=300)
    plt.show()

    return model, anova_table

# Two-way ANOVA pipeline: ClickThroughRate ~ CampaignType * IncomeSegment

In [None]:
def run_twoway_anova_ctr(df: pd.DataFrame, fig_dir: str = FIG_DIR):
    """
    Run two-way ANOVA:
      ClickThroughRate ~ CampaignType * IncomeSegment
    Check assumptions and export interaction and diagnostic plots.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'ClickThroughRate', 'CampaignType' and 'IncomeSegment'.
        Rows with missing values in any of them are dropped.
    fig_dir : str
        Directory where the PNG figures are written (created if missing).

    Returns
    -------
    model : fitted statsmodels OLS results
    anova_table : pandas.DataFrame
        Type-II ANOVA table including the interaction term.

    Raises
    ------
    ValueError
        If a required column is missing.
    """
    required = ["ClickThroughRate", "CampaignType", "IncomeSegment"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns for two-way ANOVA: {missing}")

    # A caller-supplied fig_dir may not exist yet; savefig() would fail otherwise.
    os.makedirs(fig_dir, exist_ok=True)

    df_2way = df.dropna(subset=required)

    # Fit two-way ANOVA model with interaction
    formula = "ClickThroughRate ~ C(CampaignType) * C(IncomeSegment)"
    model = ols(formula, data=df_2way).fit()
    anova_table = sm.stats.anova_lm(model, typ=2)

    print("Two-way ANOVA: ClickThroughRate ~ CampaignType * IncomeSegment")
    display(anova_table)

    resid = model.resid

    # Shapiro-Wilk test for residuals, with sampling for large N
    sample_size_limit = 5000
    if len(resid) > sample_size_limit:
        sampled_resid = resid.sample(n=sample_size_limit, random_state=42)
        print(f"\nShapiro-Wilk (two-way residuals, sampled N={sample_size_limit}):")
        sh_stat, sh_p = stats.shapiro(sampled_resid)
    else:
        print("\nShapiro-Wilk (two-way residuals):")
        sh_stat, sh_p = stats.shapiro(resid)
    print(f"  stat={sh_stat:.3f}, p={sh_p:.3f}")

    # Levene's test across CampaignType x IncomeSegment cells.
    # Empty cells are skipped so that an unobserved factor combination cannot
    # feed a zero-length sample into the test.
    cells = [
        g["ClickThroughRate"].values
        for _, g in df_2way.groupby(["CampaignType", "IncomeSegment"], observed=False)
        if len(g) > 0
    ]
    lev_stat, lev_p = stats.levene(*cells)
    print(f"Levene's test (two-way): stat={lev_stat:.3f}, p={lev_p:.3f}")

    # Interaction plot.
    # interaction_plot() opens a new figure of its own unless it is handed an
    # Axes, so the sized Axes is passed explicitly; otherwise the figsize is
    # ignored and an empty extra figure is left behind.
    fig, ax = plt.subplots(figsize=(8, 5))
    interaction_plot(
        df_2way["CampaignType"],
        df_2way["IncomeSegment"],
        df_2way["ClickThroughRate"],
        markers=["o", "s", "D"],
        ms=6,
        ax=ax
    )
    ax.set_ylabel("Mean ClickThroughRate")
    ax.set_title("Interaction: CampaignType x IncomeSegment on CTR")
    fig.tight_layout()
    fig.savefig(os.path.join(fig_dir, "interaction_ctr_adtype_segment.png"), dpi=300)
    plt.show()

    # Q-Q plot for residuals.
    # The residuals are not standardized, so the reference line is scaled to
    # the sample (line="s") rather than drawn at 45 degrees.
    sm.qqplot(resid, line="s")
    plt.title("Two-way ANOVA Residuals Q-Q Plot (CTR)")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "twoway_ctr_qqplot.png"), dpi=300)
    plt.show()

    # Residuals vs fitted
    plt.figure()
    plt.scatter(model.fittedvalues, resid, alpha=0.5)
    plt.axhline(0, color="gray", linewidth=0.8)
    plt.xlabel("Fitted values")
    plt.ylabel("Residuals")
    plt.title("Two-way ANOVA: CTR Residuals vs Fitted")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "twoway_ctr_resid_fitted.png"), dpi=300)
    plt.show()

    return model, anova_table

MANOVA pipeline: (ClickThroughRate, TimeOnSite) ~ CampaignChannel

Includes:
 - MANOVA via statsmodels
 - Assumption checks (Shapiro, Box's M)
 - Canonical discriminant visualization via sklearn Pipeline (StandardScaler + LDA)

Conditionally execute univariate ANOVAs and diagnostics when the MANOVA result (Wilks' lambda) is significant (p < 0.05).



In [None]:
from numpy.linalg import det, inv
from scipy.stats import chi2

def box_m_test(df_y: pd.DataFrame, group_labels: np.ndarray):
    """
    Box's M test for equality of covariance matrices across groups.

    Parameters
    ----------
    df_y : pandas.DataFrame
        Dependent variables, one column per variable (N rows, p columns).
    group_labels : array-like of length N
        Group membership of each row of ``df_y``.

    Returns
    -------
    M : float
        Box's M statistic.
    chi2_approx : float
        Chi-square approximation ``M * (1 - C)``.
    df_val : float
        Degrees of freedom ``(k - 1) * p * (p + 1) / 2`` for k groups.
    p_value : float
        Upper-tail chi-square probability of ``chi2_approx``.

    Raises
    ------
    ValueError
        If fewer than two distinct groups are present.

    Notes
    -----
    Each group needs more than p observations for its sample covariance
    matrix to be non-singular; otherwise the statistic is not finite.
    """
    y = df_y.values
    labels = np.asarray(group_labels)
    groups = np.unique(labels)
    k = len(groups)
    N, p = y.shape

    if k < 2:
        # The correction factor divides by (k - 1); fail with a clear message
        # instead of an incidental ZeroDivisionError.
        raise ValueError("Box's M test requires at least two groups.")

    ns = np.empty(k)
    log_dets = np.empty(k)
    Sp = np.zeros((p, p))
    for i, g in enumerate(groups):
        Yg = y[labels == g]
        n_g = Yg.shape[0]
        # atleast_2d keeps the single-variable case (p == 1) a 1x1 matrix.
        cov_g = np.atleast_2d(np.cov(Yg, rowvar=False))
        ns[i] = n_g
        # slogdet returns log|det| directly, avoiding overflow/underflow of
        # the determinant itself before the logarithm is taken.
        log_dets[i] = np.linalg.slogdet(cov_g)[1]
        Sp += (n_g - 1) * cov_g

    # Pooled covariance
    Sp /= (N - k)

    M = (N - k) * np.linalg.slogdet(Sp)[1] - np.sum((ns - 1) * log_dets)

    # Correction factor
    C = ((2 * p**2 + 3 * p - 1) /
         (6 * (p + 1) * (k - 1))) * \
        (np.sum(1 / (ns - 1)) - 1 / (N - k))

    chi2_approx = M * (1 - C)
    df_val = (k - 1) * p * (p + 1) / 2
    # Survival function instead of 1 - cdf: for large statistics the cdf
    # rounds to 1.0 and the difference would collapse to exactly 0.
    p_value = chi2.sf(chi2_approx, df_val)

    return M, chi2_approx, df_val, p_value


def run_manova_and_canonical(df: pd.DataFrame, fig_dir: str = FIG_DIR):
    """
    Run MANOVA:
      (ClickThroughRate, TimeOnSite) ~ CampaignChannel
    + assumption checks
    + conditional univariate ANOVAs (if MANOVA significant)
    + canonical discriminant visualization via sklearn Pipeline (StandardScaler + LDA).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'ClickThroughRate', 'TimeOnSite' and 'CampaignChannel'.
        Rows with missing values in any of them are dropped.
    fig_dir : str
        Directory where the canonical score plot is written (created if missing).

    Returns
    -------
    manova : statsmodels MANOVA model
    can_df : pandas.DataFrame
        Canonical scores ('Can1', 'Can2') plus the 'CampaignChannel' label.

    Raises
    ------
    ValueError
        If a required column is missing.
    """
    required = ["ClickThroughRate", "TimeOnSite", "CampaignChannel"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns for MANOVA: {missing}")

    # A caller-supplied fig_dir may not exist yet; savefig() would fail otherwise.
    os.makedirs(fig_dir, exist_ok=True)

    df_manova = df.dropna(subset=required)

    # MANOVA using statsmodels
    formula = "ClickThroughRate + TimeOnSite ~ C(CampaignChannel)"
    manova = MANOVA.from_formula(formula, data=df_manova)

    # Capture MANOVA results
    mv_results = manova.mv_test()
    print("MANOVA results (CTR, TimeOnSite ~ CampaignChannel):")
    print(mv_results)

    # Assumption checks: univariate Shapiro-Wilk for each DV, with sampling for large N
    sample_size_limit = 5000
    for dv in ["ClickThroughRate", "TimeOnSite"]:
        data_for_shapiro = df_manova[dv]
        if len(data_for_shapiro) > sample_size_limit:
            sampled_data = data_for_shapiro.sample(n=sample_size_limit, random_state=42)
            print(f"\nShapiro-Wilk for {dv}: (applied to a random sample of {sample_size_limit} due to large N)")
            stat, pval = stats.shapiro(sampled_data)
        else:
            print(f"\nShapiro-Wilk for {dv}:")
            stat, pval = stats.shapiro(data_for_shapiro)
        print(f"  stat={stat:.3f}, p={pval:.3f}")

    # Box's M test for equality of covariance matrices
    Y = df_manova[["ClickThroughRate", "TimeOnSite"]]
    groups = df_manova["CampaignChannel"].values
    M, chi2_val, df_box, p_box = box_m_test(Y, groups)
    print(f"\nBox's M test: M={M:.3f}, chi2={chi2_val:.3f}, df={df_box:.0f}, p={p_box:.3f}")

    # Check MANOVA significance (Wilks' lambda)
    # The key in results is typically the term name 'C(CampaignChannel)'
    term_name = 'C(CampaignChannel)'
    if term_name in mv_results.results:
        stats_df = mv_results.results[term_name]['stat']
        # Extract Wilks' lambda p-value
        # The row index is "Wilks' lambda" and column is "Pr > F"
        wilks_p = stats_df.loc["Wilks' lambda", "Pr > F"]
        print(f"\nWilks' Lambda p-value: {wilks_p:.4g}")

        if wilks_p < 0.05:
            print("MANOVA result is significant (p < 0.05). Proceeding to univariate ANOVAs.")
            # Follow-up univariate ANOVAs run only after a significant omnibus
            # test, so the loop belongs inside this branch.
            for dv in ["ClickThroughRate", "TimeOnSite"]:
                print(f"\nFollowup Univariate ANOVA: {dv} ~ CampaignChannel")
                model_dv = ols(f"{dv} ~ C(CampaignChannel)", data=df_manova).fit()
                anova_dv = sm.stats.anova_lm(model_dv, typ=2)
                display(anova_dv)
        else:
            print("\nMANOVA result is not significant (p >= 0.05).")
    else:
        print(f"\nWarning: Term '{term_name}' not found in MANOVA results. Cannot check significance automatically.")

    # Canonical discriminant scores via sklearn Pipeline: StandardScaler + LDA
    # NOTE(review): n_components=2 presumably requires at least three distinct
    # CampaignChannel levels (two DVs are available) - confirm against the
    # sklearn LinearDiscriminantAnalysis documentation.
    lda_pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("lda", LinearDiscriminantAnalysis(n_components=2))
    ])

    Y_numeric = Y.values
    scores = lda_pipeline.fit_transform(Y_numeric, groups)

    can_df = pd.DataFrame(scores, columns=["Can1", "Can2"])
    can_df["CampaignChannel"] = groups

    plt.figure(figsize=(7, 5))
    sns.scatterplot(
        x="Can1", y="Can2",
        hue="CampaignChannel",
        data=can_df,
        s=60, alpha=0.8
    )
    plt.axhline(0, color="gray", linewidth=0.8)
    plt.axvline(0, color="gray", linewidth=0.8)
    plt.title("Canonical Score Plot: CTR & TimeOnSite by CampaignChannel")
    plt.tight_layout()
    plt.savefig(os.path.join(fig_dir, "manova_canonical_scatter.png"), dpi=300)
    plt.show()

    return manova, can_df

# Run the full analysis pipeline

In [None]:
# Load the real dataset (auto mode: local file, Kaggle input path, or GitHub raw URL).
df = load_raw_digital_marketing_dataset()
# df = load_ad_dataset()

# Create IncomeSegment for two-way ANOVA: tertiles of Income labelled Low/Medium/High.
# NOTE(review): with three fixed labels, qcut presumably still raises if
# duplicate bin edges are dropped and fewer than three bins remain - confirm.
df['IncomeSegment'] = pd.qcut(
    df['Income'],
    q=3,
    labels=['Low', 'Medium', 'High'],
    duplicates='drop'
)

print("Dataset preview:")
display(df.describe())
display(df.head())
# DataFrame.info() prints its own report and returns None, so it is called
# directly rather than through display().
df.info()

print("\nRunning one-way ANOVA pipeline...")
model_1way, anova_1way = run_oneway_anova_purchases(df)

print("\nRunning two-way ANOVA pipeline...")
model_2way, anova_2way = run_twoway_anova_ctr(df)

print("\nRunning MANOVA + canonical discriminant pipeline + followup ANOVA")
manova_res, canonical_scores_df = run_manova_and_canonical(df)

print("\nPipeline complete. Plots saved in the 'figures' subdirectory for downloading into full LaTeX report.")

Sanity check for the modeling strategy:
Generate a synthetic dataset with mild-to-moderate significant effects and run the full analysis pipeline.
Check for consistent results.

In [None]:
def generate_synthetic_marketing_data(n_samples=3000, seed=42):
    np.random.seed(seed)

    # 1. Generate Categorical Features
    channels = ['Social Media', 'Email', 'PPC', 'Referral', 'SEO']
    types = ['Awareness', 'Consideration', 'Conversion', 'Retention']
    segments = ['Low', 'Medium', 'High']

    data = {
        'CampaignChannel': np.random.choice(channels, n_samples),
        'CampaignType': np.random.choice(types, n_samples),
        '_TrueSegment': np.random.choice(segments, n_samples) # Helper for income generation
    }
    df = pd.DataFrame(data)

    # 2. Generate Income based on Segment
    def get_income(seg):
        if seg == 'Low': return np.random.randint(20000, 45000)
        if seg == 'Medium': return np.random.randint(45001, 80000)
        return np.random.randint(80001, 150000)

    df['Income'] = df['_TrueSegment'].apply(get_income)

    # 3. Inject Effects for One-way ANOVA: PreviousPurchases ~ CampaignChannel
    # Moderate: Max diff ~1.5, Sigma=2.5
    pp_means = {'Social Media': 2.5, 'Email': 4.0, 'PPC': 2.8, 'Referral': 3.5, 'SEO': 3.0}
    df['PreviousPurchases'] = df['CampaignChannel'].map(pp_means) + np.random.normal(0, 2.5, n_samples)
    df['PreviousPurchases'] = df['PreviousPurchases'].clip(lower=0).astype(int)

    # 4. Inject Effects for MANOVA: TimeOnSite ~ CampaignChannel
    # Moderate: Max diff ~25, Sigma=20
    tos_means = {'Social Media': 50.0, 'Email': 75.0, 'PPC': 45.0, 'Referral': 65.0, 'SEO': 55.0}
    df['TimeOnSite'] = df['CampaignChannel'].map(tos_means) + np.random.normal(0, 20.0, n_samples)
    df['TimeOnSite'] = df['TimeOnSite'].clip(lower=0)

    # 5. Inject Effects for Two-way ANOVA & MANOVA:
    # ClickThroughRate ~ CampaignType * IncomeSegment + CampaignChannel

    base_ctr = 0.10

    # Moderate Channel effects (for MANOVA)
    channel_eff = {'Social Media': 0.00, 'Email': 0.03, 'PPC': 0.01, 'Referral': 0.02, 'SEO': 0.005}

    # Moderate Type effects
    type_eff = {'Awareness': 0.00, 'Consideration': 0.02, 'Conversion': 0.04, 'Retention': 0.01}

    # Moderate Segment effects
    seg_eff = {'Low': 0.00, 'Medium': 0.02, 'High': 0.03}

    ctr_vec = base_ctr + \
              df['CampaignChannel'].map(channel_eff) + \
              df['CampaignType'].map(type_eff) + \
              df['_TrueSegment'].map(seg_eff)

    # Moderate Interaction Effects
    interaction_noise = np.random.normal(0, 0.04, n_samples)

    def get_interaction(row):
        if row['CampaignType'] == 'Conversion' and row['_TrueSegment'] == 'High':
            return 0.04 # Moderate boost
        if row['CampaignType'] == 'Awareness' and row['_TrueSegment'] == 'Low':
            return -0.02 # Moderate penalty
        return 0.0

    df['ClickThroughRate'] = ctr_vec + df.apply(get_interaction, axis=1) + interaction_noise
    df['ClickThroughRate'] = df['ClickThroughRate'].clip(0, 1)

    # Cleanup
    df.drop(columns=['_TrueSegment'], inplace=True)

    return df

# Generate the dataset
print("Generating synthetic dataset with moderately significant effects (p < 0.05)...")
df_synthetic = generate_synthetic_marketing_data()

# Preprocess: derive the IncomeSegment factor from Income tertiles.
df_synthetic['IncomeSegment'] = pd.qcut(
    df_synthetic['Income'],
    q=3,
    labels=['Low', 'Medium', 'High'],
    duplicates='drop'
)

print("Synthetic dataset preview:")
display(df_synthetic.head())

# Run the Analysis Pipelines.
# Results are bound to names so that the last call does not dump a raw tuple
# repr as the cell output.
print("\n--- 1. Running One-way ANOVA (PreviousPurchases ~ CampaignChannel) ---")
synthetic_model_1way, synthetic_anova_1way = run_oneway_anova_purchases(df_synthetic)

print("\n--- 2. Running Two-way ANOVA (CTR ~ CampaignType * IncomeSegment) ---")
synthetic_model_2way, synthetic_anova_2way = run_twoway_anova_ctr(df_synthetic)

print("\n--- 3. Running MANOVA ((CTR, TimeOnSite) ~ CampaignChannel) ---")
synthetic_manova, synthetic_canonical_scores = run_manova_and_canonical(df_synthetic)