## Mode 1:Anomaly Detection

In [1]:
import pandas as pd
import numpy as np
from scipy import stats

In [2]:
def load_data(file_path):
    try:
        if file_path.endswith(".csv"):
            df = pd.read_csv(file_path, encoding="utf-8")
        elif file_path.endswith((".xls", ".xlsx")):
            df = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Use CSV or Excel.")
    except UnicodeDecodeError:

        df = pd.read_csv(file_path, encoding="ISO-8859-1")
    return df

In [3]:
def data_profile(df):
    profile = pd.DataFrame({
        "Column": df.columns,
        "DataType": df.dtypes.astype(str),
        "NonNullCount": df.notnull().sum(),
        "MissingCount": df.isnull().sum(),
        "MissingPercent": round(df.isnull().mean() * 100, 2),
        "UniqueValues": df.nunique()
    })
    return profile

In [4]:
def numeric_summary(df):
    return df.describe().T.assign(skew=df.skew(), kurtosis=df.kurtosis())

In [5]:
def detect_invalid(df):
    invalid_rows = pd.DataFrame()
    for col in df.select_dtypes(include=np.number).columns:
        invalids = df[df[col] < 0]
        if not invalids.empty:
            invalid_rows = pd.concat([invalid_rows, invalids])
    return invalid_rows if not invalid_rows.empty else pd.DataFrame(columns=df.columns)

In [6]:
def detect_outliers(df):
    summary = []
    n_rows = len(df)

    for col in df.select_dtypes(include=np.number).columns:
        series = df[col].dropna()
        if series.empty:
            continue

        z_scores = np.abs(stats.zscore(series))
        z_outliers = (z_scores > 3).sum()
        if z_outliers > 0:
            summary.append({
                "Column": col,
                "Method": "Z-Score",
                "Outlier Count": int(z_outliers),
                "Percent of Total": round((z_outliers / n_rows) * 100, 2)
            })

        Q1, Q3 = series.quantile([0.25, 0.75])
        IQR = Q3 - Q1
        iqr_outliers = ((series < Q1 - 1.5*IQR) | (series > Q3 + 1.5*IQR)).sum()
        if iqr_outliers > 0:
            summary.append({
                "Column": col,
                "Method": "IQR",
                "Outlier Count": int(iqr_outliers),
                "Percent of Total": round((iqr_outliers / n_rows) * 100, 2)
            })

    return pd.DataFrame(summary)

In [7]:
def detect_duplicates(df):
    dups = df[df.duplicated()]
    return dups if not dups.empty else pd.DataFrame(columns=df.columns)

In [8]:
def correlation_analysis(df, threshold=0.9):
    numeric_df = df.select_dtypes(include=np.number)
    if numeric_df.empty:
        return pd.DataFrame(columns=["Column 1", "Column 2", "Correlation", "Flagged"])

    corr_matrix = numeric_df.corr()

    records = []
    for i in range(len(corr_matrix.columns)):
        for j in range(i+1, len(corr_matrix.columns)):
            col1, col2 = corr_matrix.columns[i], corr_matrix.columns[j]
            corr_val = corr_matrix.iloc[i, j]
            records.append({
                "Column 1": col1,
                "Column 2": col2,
                "Correlation": round(corr_val, 3),
                "Flagged": "Yes" if abs(corr_val) >= threshold else "No"
            })

    return pd.DataFrame(records)

In [9]:
def low_variance_columns(df, threshold=0.01):
    summary = []
    for col in df.select_dtypes(include=np.number).columns:
        variance = df[col].var()
        if pd.isna(variance):
            continue
        unique_ratio = df[col].nunique() / len(df)
        summary.append({
            "Column": col,
            "Variance": round(variance, 4),
            "Unique Ratio": round(unique_ratio, 4),
            "Flagged": "Yes" if unique_ratio < threshold else "No"
        })
    return pd.DataFrame(summary)

In [10]:
def skewness_analysis(df, threshold=1.0):
    summary = []
    for col in df.select_dtypes(include=np.number).columns:
        skew = df[col].skew(skipna=True)
        if pd.isna(skew):
            continue
        summary.append({
            "Column": col,
            "Skewness": round(skew, 3),
            "Flagged": "Yes" if abs(skew) > threshold else "No"
        })
    return pd.DataFrame(summary)

## Mode 2: Cleaning Anomalies

In [11]:
def handle_missing(df, threshold=0.4):
    df_clean = df.copy()

    missing_fraction = df_clean.isnull().mean()
    cols_to_drop = missing_fraction[missing_fraction > threshold].index
    df_clean.drop(columns=cols_to_drop, inplace=True)

    for col in df_clean.select_dtypes(include=np.number).columns:
        df_clean[col].fillna(df_clean[col].median(), inplace=True)

    for col in df_clean.select_dtypes(include=["object", "category"]).columns:
        if not df_clean[col].mode().empty:
            df_clean[col].fillna(df_clean[col].mode()[0], inplace=True)
        else:
            df_clean[col].fillna("Unknown", inplace=True)

    return df_clean

In [12]:
def handle_outliers(df):
    df_clean = df.copy()
    for col in df_clean.select_dtypes(include=np.number).columns:
        Q1, Q3 = df_clean[col].quantile([0.25, 0.75])
        IQR = Q3 - Q1
        lower, upper = Q1 - 1.5*IQR, Q3 + 1.5*IQR
        df_clean[col] = np.where(df_clean[col] < lower, lower,
                                 np.where(df_clean[col] > upper, upper, df_clean[col]))
    return df_clean

## Before vs After

In [13]:
def cleaning_comparison(df, df_clean):
    summary = []

    for col in df.select_dtypes(include=np.number).columns:
        # Missing values
        missing_before = df[col].isna().sum()
        missing_after = df_clean[col].isna().sum()

        # Outliers before
        series = df[col].dropna()
        outliers_before = 0
        if not series.empty:
            z_scores = np.abs(stats.zscore(series))
            outliers_before = (z_scores > 3).sum()

        # Outliers after
        series_clean = df_clean[col].dropna()
        outliers_after = 0
        if not series_clean.empty:
            z_scores_clean = np.abs(stats.zscore(series_clean))
            outliers_after = (z_scores_clean > 3).sum()

        summary.append({
            "Column": col,
            "Missing Before": missing_before,
            "Missing After": missing_after,
            "Outliers Before": int(outliers_before),
            "Outliers After": int(outliers_after)
        })

    return pd.DataFrame(summary)

## Output

In [14]:
def generate_report(df, output_file="data_quality_report.xlsx", clean=False):
    profile = data_profile(df)
    numeric_stats = numeric_summary(df.select_dtypes(include=np.number))
    invalid_entries = detect_invalid(df)
    outliers = detect_outliers(df)
    duplicates = detect_duplicates(df)
    corr_analysis = correlation_analysis(df)
    low_var = low_variance_columns(df)
    skewness = skewness_analysis(df)

    with pd.ExcelWriter(output_file) as writer:
        profile.to_excel(writer, sheet_name="Data Profile", index=False)
        numeric_stats.to_excel(writer, sheet_name="Numeric Summary")
        if not invalid_entries.empty:
            invalid_entries.to_excel(writer, sheet_name="Invalid Entries", index=False)
        if not outliers.empty:
            outliers.to_excel(writer, sheet_name="Outlier Summary", index=False)
        if not duplicates.empty:
            duplicates.to_excel(writer, sheet_name="Duplicates", index=False)
        if not corr_analysis.empty:
            corr_analysis.to_excel(writer, sheet_name="Correlation Analysis", index=False)
        if not low_var.empty:
            low_var.to_excel(writer, sheet_name="Low Variance Columns", index=False)
        if not skewness.empty:
            skewness.to_excel(writer, sheet_name="Skewness Analysis", index=False)

    if clean:
        df_clean = handle_missing(df)
        df_clean = handle_outliers(df_clean)
        df_clean.to_csv("cleaned_dataset.csv", index=False)
        comparison = cleaning_comparison(df, df_clean)
        with pd.ExcelWriter(output_file, mode="a", engine="openpyxl") as writer:
            comparison.to_excel(writer, sheet_name="Before vs After", index=False)

In [15]:
if __name__ == "__main__":
    file_path = "Sample - Superstore.csv"
    df = load_data(file_path)

    mode = input("Run in cleaning mode? (y/n): ").strip().lower()
    clean_flag = True if mode == "y" else False

    generate_report(df, clean=clean_flag)

Run in cleaning mode? (y/n):  y


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_clean[col].fillna(df_clean[col].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_clean[col].fillna(df_clean[col].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on whi