In [1]:
import json
import nbformat
import ast

# ---- STEP 1: Extract Code from Jupyter Notebook ---- #
def extract_code_from_notebook(notebook_path, output_script_path="extracted_script.py"):
    """Dump the code cells of a Jupyter notebook into a single script file.

    Args:
        notebook_path: Path of the ``.ipynb`` file to read (opened as UTF-8).
        output_script_path: Path of the script file to write (UTF-8).

    Returns:
        The text written to ``output_script_path``: the source of every
        code cell, in notebook order, separated by one blank line.
    """
    with open(notebook_path, "r", encoding="utf-8") as nb_file:
        nb = nbformat.read(nb_file, as_version=4)

    # Only code cells are kept; every other cell type is skipped.
    sources = []
    for cell in nb.cells:
        if cell["cell_type"] == "code":
            sources.append(cell["source"])
    script_text = "\n\n".join(sources)

    with open(output_script_path, "w", encoding="utf-8") as script_file:
        script_file.write(script_text)

    print(f"Code extracted and saved to {output_script_path}")
    return script_text

# ---- STEP 2: Static Analysis for Data Leakage ---- #
class LeakageAnalyzer(ast.NodeVisitor):
    """Heuristic AST visitor that flags call patterns associated with data leakage.

    Detection is purely name based: a call is matched on the trailing name of
    the callable, whether it is written as an attribute (``pd.concat(...)``,
    ``scaler.fit_transform(...)``) or as a bare imported name
    (``train_test_split(...)``).

    Attributes:
        transformed_vars: Names assigned from ``fit_transform`` / ``fit_resample``.
        split_vars: Names passed positionally to ``train_test_split``.
        test_vars: Names passed positionally to ``score``.
        leakage_warnings: Mapping of warning category to a list of messages.

    NOTE: the analysis does not compare line numbers, so the "Confirmed"
    message only means the same name was both transformed and split
    somewhere in the analysed code.
    """

    def __init__(self):
        self.transformed_vars = set()
        self.split_vars = set()
        self.test_vars = set()
        self.leakage_warnings = {
            "Preprocessing Leakage": [],
            "Overlap Leakage": [],
            "Multi-Test Leakage": [],
        }

    @staticmethod
    def _called_name(call):
        """Return the trailing name of the callable in *call*, or ``None``."""
        func = call.func
        if isinstance(func, ast.Attribute):
            return func.attr
        if isinstance(func, ast.Name):
            # Covers ``from module import name`` followed by ``name(...)``.
            return func.id
        return None

    @staticmethod
    def _assigned_names(targets):
        """Collect plain variable names from assignment targets.

        Tuple/list targets such as ``X_res, y_res = ...`` contribute each of
        their plain-name elements; other target kinds (attributes,
        subscripts, starred or nested elements) are ignored.
        """
        names = []
        for target in targets:
            if isinstance(target, (ast.Tuple, ast.List)):
                elements = target.elts
            else:
                elements = [target]
            names.extend(elt.id for elt in elements if isinstance(elt, ast.Name))
        return names

    def visit_Assign(self, node):
        """Detects fit_transform() or fit_resample() and tracks assigned variables."""
        if isinstance(node.value, ast.Call):
            function_name = self._called_name(node.value)

            if function_name in ("fit_transform", "fit_resample"):
                names = self._assigned_names(node.targets)
                if names:
                    self.transformed_vars.update(names)
                    self.leakage_warnings["Preprocessing Leakage"].append(
                        f"{function_name} used on {', '.join(names)} at line {node.lineno}"
                    )

        # Keep descending so the call on the right-hand side reaches visit_Call.
        self.generic_visit(node)

    def visit_Call(self, node):
        """Detects train_test_split(), pd.concat(), and model.score()"""
        function_name = self._called_name(node)

        if function_name == "train_test_split":
            for arg in node.args:
                if isinstance(arg, ast.Name):
                    self.split_vars.add(arg.id)

        elif function_name == "concat":
            self.leakage_warnings["Overlap Leakage"].append(
                f"pd.concat() used before splitting at line {node.lineno}"
            )

        elif function_name == "score":
            for arg in node.args:
                if isinstance(arg, ast.Name):
                    # A name already seen in an earlier score() call means
                    # the same data is being evaluated more than once.
                    if arg.id in self.test_vars:
                        self.leakage_warnings["Multi-Test Leakage"].append(
                            f"Multiple evaluations on {arg.id} at line {node.lineno}"
                        )
                    self.test_vars.add(arg.id)

        self.generic_visit(node)

    def check_for_preprocessing_leakage(self):
        """Confirm preprocessing leakage only if transformed variables are used in train_test_split."""
        # Sorted so the report order does not depend on set iteration order.
        for var in sorted(self.transformed_vars & self.split_vars):
            self.leakage_warnings["Preprocessing Leakage"].append(
                f"Confirmed: {var} transformed before train-test split"
            )

# ---- STEP 3: Run the Full Process ---- #
def _neutralize_ipython_magics(code):
    """Replace IPython magic (``%``) and shell (``!``) lines with ``pass``.

    Each affected line keeps its indentation and its position, so line
    numbers in the parsed tree still match the original text.
    """
    cleaned = []
    for line in code.split("\n"):
        stripped = line.lstrip()
        if stripped.startswith(("%", "!")):
            indent = line[: len(line) - len(stripped)]
            # ``pass`` rather than a bare comment so an indented block that
            # contained only the magic line does not become empty.
            cleaned.append(f"{indent}pass  # {stripped}")
        else:
            cleaned.append(line)
    return "\n".join(cleaned)


def analyze_code_for_leakage(code):
    """Runs the static analysis for detecting data leakage in Python code.

    Args:
        code: Python source text to analyse.

    Raises:
        SyntaxError: If ``code`` cannot be parsed, even after IPython
            magic/shell lines have been neutralised.

    The findings are printed grouped by category, followed by a summary line.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as parse_error:
        # Notebook code often contains lines such as ``%matplotlib inline``
        # or ``!pip install ...`` that are not valid Python. Retry with those
        # lines neutralised; if that still fails, surface the original error.
        try:
            tree = ast.parse(_neutralize_ipython_magics(code))
        except SyntaxError:
            raise parse_error from None

    analyzer = LeakageAnalyzer()
    analyzer.visit(tree)
    analyzer.check_for_preprocessing_leakage()

    print("\nAnalyzing Code for Data Leakage...\n")
    issues_found = sum(len(w) for w in analyzer.leakage_warnings.values())

    for category, warnings in analyzer.leakage_warnings.items():
        if warnings:
            print(f"{category}:")
            for warning in warnings:
                print(f"  - {warning}")

    if issues_found == 0:
        print("\nNo obvious data leakage detected")
    else:
        print(f"\nAnalysis complete: {issues_found} issue(s) found")

if __name__ == "__main__":
    # Notebook to audit.
    notebook_path = "titanic-advanced-feature-engineering-tutorial.ipynb"

    # Step 1: write the notebook's code cells to a script and keep the text.
    extracted_code = extract_code_from_notebook(
        notebook_path,
        output_script_path="extracted_script.py",
    )
    # Step 2: run the leakage analysis on the extracted text and print the report.
    analyze_code_for_leakage(extracted_code)


Code extracted and saved to extracted_script.py

Analyzing Code for Data Leakage...

Preprocessing Leakage:
  - fit_transform used on X_train at line 552
  - fit_transform used on X_test at line 554
Overlap Leakage:
  - pd.concat() used before splitting at line 21
  - pd.concat() used before splitting at line 540
  - pd.concat() used before splitting at line 541

Analysis complete: 5 issue(s) found
