In [1]:
# -------------------- Standard Library --------------------
import os
import glob
import math
import itertools
import warnings

# Suppress warnings
warnings.filterwarnings('ignore')

# -------------------- Core Scientific Libraries --------------------
import numpy as np
import pandas as pd

# -------------------- Visualization --------------------
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image

# -------------------- Statistical Analysis --------------------
from scipy.stats import skew, shapiro, kurtosis, chi2_contingency
# -------------------- Scikit-learn Core --------------------
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.ensemble import IsolationForest

In [2]:
df = pd.read_csv(r'C:\Users\Asus\Downloads\Fraud_MLOps_Project\Data\payment_fraud.csv')
df.head()

Unnamed: 0,accountAgeDays,numItems,localTime,paymentMethod,paymentMethodAgeDays,Category,isWeekend,label
0,29,1,4.745402,paypal,28.204861,shopping,0.0,0
1,725,1,4.742303,storecredit,0.0,electronics,0.0,0
2,845,1,4.921318,creditcard,0.0,food,1.0,0
3,503,1,4.886641,creditcard,0.0,electronics,1.0,0
4,2000,1,5.040929,creditcard,0.0,shopping,0.0,0


In [3]:
# d = dtale.show(df, ignore_duplicate=False)
# d.open_browser()
# d

In [4]:
class EDA:
    def __init__(self, df, report_path="eda_results", outlier_method="iqr", threshold=1.5):
        """
        outlier_method: 'iqr' or 'zscore'
        threshold: For IQR -> multiplier (1.5), for Z-Score -> sigma (3)
        """
        self.df = df.copy()  # Read-only
        self.report_path = report_path
        self.outlier_method = outlier_method
        self.threshold = threshold
        self.target_col = None

        # Folder structure
        folders = [
            "missing_data", "cardinality", "numerical_analysis", "categorical_analysis",
            "relationships", "outliers", "distribution_shape", "target_analysis",
            "advanced_relationships"
        ]
        for folder in folders:
            os.makedirs(os.path.join(report_path, folder), exist_ok=True)

    # ----------------- Helper Functions -----------------
    def save_text(self, folder, filename, content):
        with open(os.path.join(self.report_path, folder, filename), "w") as f:
            f.write(content)

    def set_target(self, target_name=None):
        if target_name is None:
            target_name = input("Enter target column name (or leave blank if none): ").strip()
        if target_name and target_name in self.df.columns:
            self.target_col = target_name
            print(f"[Info] Target column set to: {self.target_col}")
        else:
            self.target_col = None
            print("[Info] No target column set")

    # ----------------- 1. Missing Data Analysis -----------------
    def advanced_missing_data(self):
        missing = self.df.isnull().sum()
        missing_percent = (missing / len(self.df)) * 100
        summary = pd.DataFrame({"Missing Count": missing, "Missing %": missing_percent})
        summary = summary[summary["Missing Count"] > 0].sort_values(by="Missing %", ascending=False)
        self.save_text("missing_data", "missing_summary.txt", str(summary))

        # Heatmap
        plt.figure(figsize=(10, 6))
        sns.heatmap(self.df.isnull(), cbar=False)
        plt.title("Missing Data Heatmap")
        plt.savefig(os.path.join(self.report_path, "missing_data", "missing_heatmap.png"))
        plt.close()

        # Missing correlation (binary mask correlation)
        missing_corr = self.df.isnull().astype(int).corr()
        self.save_text("missing_data", "missing_correlation.txt", str(missing_corr))

        print("[Saved] Missing data summary, heatmap & correlation")

    # ----------------- 2. Cardinality Analysis -----------------
    def cardinality_analysis(self):
        cat_cols = self.df.select_dtypes(exclude=np.number).columns
        summary = {}
        high_cardinality = []
        for col in cat_cols:
            count = self.df[col].nunique()
            summary[col] = count
            if count > 50:  # Arbitrary threshold
                high_cardinality.append(col)
        text = f"Cardinality per categorical feature:\n{summary}\n\nHigh cardinality features (>50 unique): {high_cardinality}"
        self.save_text("cardinality", "cardinality_summary.txt", text)
        print("[Saved] Cardinality summary with high-cardinality flags")

    # ----------------- 3. Numerical Feature Relationships -----------------
    def numerical_relationships(self):
        num_cols = self.df.select_dtypes(include=np.number).columns
        if len(num_cols) <= 10 and len(num_cols) > 1:
            sns.pairplot(self.df[num_cols].dropna())
            plt.savefig(os.path.join(self.report_path, "relationships", "pairplot.png"))
            plt.close()
            print("[Saved] Pairplot for numerical features")

        # Boxplot of numeric vs categorical target
        if self.target_col and self.df[self.target_col].dtype == 'object':
            for col in num_cols:
                plt.figure(figsize=(6, 4))
                sns.boxplot(x=self.df[self.target_col], y=self.df[col])
                plt.title(f"{col} vs {self.target_col} (Boxplot)")
                plt.savefig(os.path.join(self.report_path, "relationships", f"{col}_vs_target_box.png"))
                plt.close()

        # Scatterplot vs continuous target
        if self.target_col and self.df[self.target_col].dtype != 'object':
            for col in num_cols:
                if col == self.target_col: continue
                plt.figure(figsize=(6, 4))
                sns.scatterplot(x=self.df[col], y=self.df[self.target_col])
                plt.title(f"{col} vs {self.target_col}")
                plt.savefig(os.path.join(self.report_path, "relationships", f"{col}_vs_target.png"))
                plt.close()

    # ----------------- 4. Categorical Feature Relationships -----------------
    def categorical_relationships(self):
        cat_cols = self.df.select_dtypes(exclude=np.number).columns
        chi_summary = []
        for col1, col2 in itertools.combinations(cat_cols, 2):
            table = pd.crosstab(self.df[col1], self.df[col2])
            chi2, p, _, _ = chi2_contingency(table)
            chi_summary.append(f"{col1} vs {col2} -> p={p:.4f}")
        self.save_text("relationships", "categorical_chi2.txt", "\n".join(chi_summary))

        # Cramér’s V
        cramer_results = []
        for col1, col2 in itertools.combinations(cat_cols, 2):
            table = pd.crosstab(self.df[col1], self.df[col2])
            chi2 = chi2_contingency(table)[0]
            n = table.sum().sum()
            phi2 = chi2 / n
            r, k = table.shape
            phi2corr = max(0, phi2 - ((k-1)*(r-1))/(n-1))
            rcorr = r - ((r-1)**2)/(n-1)
            kcorr = k - ((k-1)**2)/(n-1)
            cramer_v = np.sqrt(phi2corr / min((kcorr-1), (rcorr-1)))
            cramer_results.append(f"{col1} vs {col2} -> Cramér's V={cramer_v:.4f}")
        self.save_text("relationships", "categorical_cramers_v.txt", "\n".join(cramer_results))
        print("[Saved] Chi-square and Cramér's V results for categorical features")

    # ----------------- 5. Feature Importance (Mutual Information) -----------------
    def feature_importance(self):
        if self.target_col:
            X = self.df.drop(columns=[self.target_col]).select_dtypes(include=np.number).fillna(0)
            y = self.df[self.target_col]
            if y.dtype == 'object':
                y = y.astype('category').cat.codes
                scores = mutual_info_classif(X, y)
            else:
                scores = mutual_info_regression(X, y)
            mi_scores = pd.Series(scores, index=X.columns).sort_values(ascending=False)
            self.save_text("relationships", "mutual_information.txt", str(mi_scores))
            print("[Saved] Mutual Information scores")

    # ----------------- 6. Target Advanced Analysis -----------------
    def target_advanced(self):
        if self.target_col:
            # Class imbalance
            if self.df[self.target_col].dtype == 'object':
                counts = self.df[self.target_col].value_counts(normalize=True) * 100
                self.save_text("target_analysis", "class_imbalance.txt", str(counts))
            # Target mean per categorical
            cat_cols = self.df.select_dtypes(exclude=np.number).columns
            for col in cat_cols:
                if col != self.target_col:
                    mean_vals = self.df.groupby(col)[self.target_col].mean(numeric_only=True)
                    self.save_text("target_analysis", f"{col}_target_mean.txt", str(mean_vals))
            print("[Saved] Target distribution & mean analysis per category")

    # ----------------- 7. Correlation Beyond Pearson -----------------
    def advanced_correlations(self):
        num_df = self.df.select_dtypes(include=np.number)
        if num_df.shape[1] > 1:
            # Compute correlations
            spearman_corr = num_df.corr(method='spearman')
            kendall_corr = num_df.corr(method='kendall')

            # Save correlation matrices
            self.save_text("advanced_relationships", "spearman_corr.txt", str(spearman_corr))
            self.save_text("advanced_relationships", "kendall_corr.txt", str(kendall_corr))

            # Clean correlation matrix for clustering (replace NaN with 0)
            corr_matrix = num_df.corr().fillna(0)

            try:
                sns.clustermap(corr_matrix, annot=True, cmap='coolwarm')
                plt.savefig(os.path.join(self.report_path, "advanced_relationships", "clustered_corr_heatmap.png"))
                plt.close()
                print("[Saved] Spearman, Kendall correlations & clustered heatmap")
            except ValueError:
                print("[Warning] Could not generate clustered heatmap (NaNs/Infs in data even after fill).")
        else:
            print("[Info] Not enough numerical columns for advanced correlation analysis.")

    # ----------------- 8. Outlier Profiling -----------------
    def outlier_profiling(self):
        num_cols = self.df.select_dtypes(include=np.number).columns
        results = []
        for col in num_cols:
            Q1, Q3 = self.df[col].quantile([0.25, 0.75])
            IQR = Q3 - Q1
            lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
            outliers = self.df[(self.df[col] < lower) | (self.df[col] > upper)]
            results.append(f"{col}: {len(outliers)} outliers ({(len(outliers)/len(self.df))*100:.2f}%)")
            # Boxplot
            plt.figure(figsize=(6, 4))
            sns.boxplot(x=self.df[col])
            plt.title(f"Boxplot: {col}")
            plt.savefig(os.path.join(self.report_path, "outliers", f"box_{col}.png"))
            plt.close()

        # Multivariate outliers using Isolation Forest
        iso = IsolationForest(contamination=0.05)
        preds = iso.fit_predict(self.df[num_cols].fillna(0))
        outlier_count = np.sum(preds == -1)
        results.append(f"Multivariate Outliers (IsolationForest): {outlier_count}")

        self.save_text("outliers", "outlier_profile.txt", "\n".join(results))
        print("[Saved] Outlier profiles & multivariate outliers")

    # ----------------- 9. Distribution Shape -----------------
    def distribution_shape(self):
        num_cols = self.df.select_dtypes(include=np.number).columns
        results = []
        for col in num_cols:
            data = self.df[col].dropna()
            skew_val = skew(data)
            kurt_val = kurtosis(data)
            stat, p = shapiro(data.sample(min(5000, len(data))))
            results.append(f"{col}: Skew={skew_val:.2f}, Kurtosis={kurt_val:.2f}, Shapiro p={p:.4f}")
            # Histogram
            sns.histplot(data, kde=True)
            plt.title(f"Distribution: {col}")
            plt.savefig(os.path.join(self.report_path, "distribution_shape", f"dist_{col}.png"))
            plt.close()
        self.save_text("distribution_shape", "distribution_stats.txt", "\n".join(results))
        print("[Saved] Skewness, kurtosis & normality plots")

    # ----------------- 10. Dataset Integrity -----------------
    def integrity_checks(self):
        results = []
        # Constant features
        constant_cols = [col for col in self.df.columns if self.df[col].nunique() == 1]
        if constant_cols:
            results.append(f"Constant columns: {constant_cols}")

        # Highly correlated pairs
        num_df = self.df.select_dtypes(include=np.number)
        corr_matrix = num_df.corr().abs()
        high_corr = np.where((corr_matrix > 0.9) & (corr_matrix < 1))
        high_corr_pairs = [(num_df.columns[i], num_df.columns[j]) for i, j in zip(*high_corr)]
        if high_corr_pairs:
            results.append(f"Highly correlated pairs: {high_corr_pairs}")

        # Duplicate features
        duplicate_features = []
        for col1, col2 in itertools.combinations(self.df.columns, 2):
            if self.df[col1].equals(self.df[col2]):
                duplicate_features.append((col1, col2))
        if duplicate_features:
            results.append(f"Duplicate features: {duplicate_features}")

        self.save_text("relationships", "integrity_checks.txt", "\n".join(results) if results else "No issues found")
        print("[Saved] Integrity checks")

    # ----------------- Numerical & Categorical Plots -----------------
    def numerical_analysis(self):
        num_cols = self.df.select_dtypes(include=np.number).columns
        for col in num_cols:
            plt.figure(figsize=(8, 4))
            sns.histplot(self.df[col].dropna(), kde=True)
            plt.title(f"Distribution of {col}")
            plt.savefig(os.path.join(self.report_path, "numerical_analysis", f"hist_{col}.png"))
            plt.close()

            plt.figure(figsize=(6, 4))
            sns.boxplot(x=self.df[col].dropna())
            plt.title(f"Boxplot of {col}")
            plt.savefig(os.path.join(self.report_path, "numerical_analysis", f"box_{col}.png"))
            plt.close()
        print("[Saved] Numerical feature plots")

    def categorical_analysis(self):
        cat_cols = self.df.select_dtypes(exclude=np.number).columns
        for col in cat_cols:
            plt.figure(figsize=(8, 4))
            sns.countplot(y=self.df[col], order=self.df[col].value_counts().index)
            plt.title(f"Count Plot of {col}")
            plt.savefig(os.path.join(self.report_path, "categorical_analysis", f"count_{col}.png"))
            plt.close()
        print("[Saved] Categorical feature plots")
    
    def duplicate_rows_detail(self):
        duplicates = self.df[self.df.duplicated()]
        if not duplicates.empty:
            duplicates.to_csv(os.path.join(self.report_path, "relationships", "duplicate_rows.csv"), index=False)
            print(f"[Saved] Detailed duplicate rows ({len(duplicates)} rows)")
        else:
            print("[Info] No duplicate rows found")
    
    def data_imbalance_heatmap(self):
        if self.target_col and self.df[self.target_col].dtype == 'object':
            counts = self.df[self.target_col].value_counts(normalize=True) * 100
            plt.figure(figsize=(6, 4))
            sns.heatmap(counts.to_frame(), annot=True, fmt=".2f", cmap="coolwarm")
            plt.title(f"Class Imbalance Heatmap for {self.target_col}")
            plt.savefig(os.path.join(self.report_path, "target_analysis", "imbalance_heatmap.png"))
            plt.close()
            print("[Saved] Target imbalance heatmap")
        else:
            print("[Info] Target imbalance heatmap skipped (target not categorical)")

    def scaling_transformation_suggestions(self):
        num_cols = self.df.select_dtypes(include=np.number).columns
        suggestions = []

        for col in num_cols:
            data = self.df[col].dropna()
            if len(data) == 0: 
                continue

            # Skewness check
            skew_val = skew(data)
            suggestion = f"{col}: Skew={skew_val:.2f} -> "
            if abs(skew_val) > 1:
                suggestion += "Highly skewed; consider Log or Box-Cox transform"
            elif abs(skew_val) > 0.5:
                suggestion += "Moderately skewed; consider mild transform"
            else:
                suggestion += "Fairly symmetric"

            # Scaling suggestion
            if (data.max() - data.min()) > 1000:
                suggestion += " | Suggest MinMax Scaling (large range)"
            else:
                suggestion += " | Suggest Standard Scaling (normal range)"

            suggestions.append(suggestion)

        self.save_text("numerical_analysis", "scaling_transformation_suggestions.txt", "\n".join(suggestions))
        print("[Saved] Scaling and transformation suggestions")

    # ----------------- Run All -----------------
    def run_all(self, target_name=None):
        print("===== Starting EDA =====")
        self.set_target(target_name)
        self.advanced_missing_data()
        self.cardinality_analysis()
        self.numerical_analysis()
        self.categorical_analysis()
        self.numerical_relationships()
        self.categorical_relationships()
        self.feature_importance()
        self.target_advanced()
        self.advanced_correlations()
        self.outlier_profiling()
        self.distribution_shape()
        self.integrity_checks()
        self.duplicate_rows_detail()
        self.data_imbalance_heatmap()
        self.scaling_transformation_suggestions()

        summary = "EDA Completed.\nCheck folders for:\n"
        summary += "\n".join([
            "- missing_data", "- cardinality", "- numerical_analysis",
            "- categorical_analysis", "- relationships", "- outliers",
            "- distribution_shape", "- target_analysis", "- advanced_relationships"
        ])
        self.save_text("", "summary.txt", summary)
        print(f"\nEDA completed! Full report at '{self.report_path}'")

In [5]:
eda = EDA(df, report_path="Advanced_EDA_Results", outlier_method="zscore", threshold=3)
eda.run_all(target_name="label") 

===== Starting EDA =====
[Info] Target column set to: label
[Saved] Missing data summary, heatmap & correlation
[Saved] Cardinality summary with high-cardinality flags
[Saved] Numerical feature plots
[Saved] Categorical feature plots
[Saved] Pairplot for numerical features
[Saved] Chi-square and Cramér's V results for categorical features
[Saved] Mutual Information scores
[Saved] Target distribution & mean analysis per category
[Saved] Spearman, Kendall correlations & clustered heatmap
[Saved] Outlier profiles & multivariate outliers
[Saved] Skewness, kurtosis & normality plots
[Saved] Integrity checks
[Saved] Detailed duplicate rows (3033 rows)
[Info] Target imbalance heatmap skipped (target not categorical)
[Saved] Scaling and transformation suggestions

EDA completed! Full report at 'Advanced_EDA_Results'


In [6]:
def create_eda_collage(report_path="eda_results", output_file="eda_summary_collage.png", cols=3, image_size=(600, 400)):
    """
    Combine all .png images from EDA folders into one collage
    """
    # Find all images recursively
    image_paths = glob.glob(os.path.join(report_path, "**", "*.png"), recursive=True)
    
    if not image_paths:
        print("No images found to combine.")
        return

    # Resize all images to uniform size
    images = []
    for path in image_paths:
        img = Image.open(path).convert("RGB")
        img = img.resize(image_size)  # Resize for uniformity
        images.append(img)

    # Determine grid size
    total_images = len(images)
    rows = math.ceil(total_images / cols)
    width = cols * image_size[0]
    height = rows * image_size[1]

    # Create blank canvas
    collage = Image.new("RGB", (width, height), color=(255, 255, 255))

    # Paste images in grid
    for idx, img in enumerate(images):
        x = (idx % cols) * image_size[0]
        y = (idx // cols) * image_size[1]
        collage.paste(img, (x, y))

    # Save collage
    collage.save(os.path.join(report_path, output_file))
    print(f"[Saved] Collage created at: {os.path.join(report_path, output_file)}")


In [7]:
create_eda_collage(report_path="Advanced_EDA_Results", cols=4, image_size=(500, 350))


[Saved] Collage created at: Advanced_EDA_Results\eda_summary_collage.png


In [8]:
def create_text_summary(report_path="eda_results", output_file="eda_text_summary.txt"):
    """
    Combine all EDA text results into one summary file
    """
    text_paths = glob.glob(os.path.join(report_path, "**", "*.txt"), recursive=True)
    summary_content = []

    for path in sorted(text_paths):
        # Skip the final summary itself to avoid duplication
        if output_file in path:
            continue
        relative_path = os.path.relpath(path, report_path)
        summary_content.append(f"\n===== {relative_path} =====\n")
        with open(path, "r") as f:
            summary_content.append(f.read())

    with open(os.path.join(report_path, output_file), "w") as f:
        f.write("\n".join(summary_content))

    print(f"[Saved] Combined text summary at: {os.path.join(report_path, output_file)}")


In [9]:
create_text_summary(report_path="Advanced_EDA_Results", output_file="eda_text_summary.txt")

[Saved] Combined text summary at: Advanced_EDA_Results\eda_text_summary.txt
