In [7]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib_inline.backend_inline import set_matplotlib_formats
from scipy.ndimage import binary_fill_holes, label, generate_binary_structure, convolve
from scipy.stats import linregress
import random
import seaborn as sns


set_matplotlib_formats('png', dpi=150)

In [8]:
# --- Config ---
lattice_height = 2000
lattice_width = 2000
fill_probability = 0.405
feature_area_threshold = 1000
convolution_structure = generate_binary_structure(rank=2, connectivity=1)  # 4-connected

In [9]:
# --- Generate random lattice ---
def generate_lattice(width, height, prob):
    return np.random.rand(height, width) < prob

# --- Filter small features ---
def filter_features_over_threshold(labeled_features, num_features, threshold):
    counts = np.bincount(labeled_features.ravel())
    valid = counts > threshold
    valid[0] = False  # background stays 0
    mask = valid[labeled_features]
    filtered_labels, new_count = label(mask, structure=convolution_structure)
    return filtered_labels, new_count

# --- Check if a feature touches image border ---
def touches_edge(coords, shape):
    return (
        (coords[:, 0] == 0).any() or
        (coords[:, 0] == shape[0] - 1).any() or
        (coords[:, 1] == 0).any() or
        (coords[:, 1] == shape[1] - 1).any()
    )

def crop_to_bounding_box(mask):
    """
    Crops a binary mask to the tightest bounding box around the non-zero region.
    Assumes there is exactly one connected component (cloud) in the mask.

    Parameters:
        mask : 2D boolean np.ndarray

    Returns:
        cropped : 2D boolean np.ndarray (cropped region)
    """
    rows, cols = np.where(mask)
    if rows.size == 0:
        return np.zeros((0, 0), dtype=bool)  # Empty case

    row_min, row_max = rows.min(), rows.max() + 1
    col_min, col_max = cols.min(), cols.max() + 1
    return mask[row_min:row_max, col_min:col_max]

def compute_perimeter(mask):
    """
    Computes the 4-connected perimeter (pixel-side length) of a binary cloud mask.

    Parameters:
        mask : 2D boolean np.ndarray (cropped)

    Returns:
        perimeter : int
    """
    kernel = np.array([[0, 1, 0],
                       [1, 0, 1],
                       [0, 1, 0]])
    neighbors = convolve(mask.astype(np.uint8), kernel, mode='constant', cval=0)
    return np.sum(mask * (4 - neighbors))

def compute_area(mask):
    """
    Computes the area (number of True pixels) in a binary mask.

    Parameters:
        mask : 2D boolean np.ndarray (cropped)

    Returns:
        area : int
    """
    return np.count_nonzero(mask)

def count_edge_contacts(coords, shape):
    """
    Returns the number of pixels in the feature that touch each image edge.
    
    Parameters:
        coords : np.ndarray of shape (N, 2)
            Coordinates of pixels in the feature.
        shape : tuple
            Shape of the full image (height, width)
    
    Returns:
        edge_counts : np.ndarray of shape (4,)
            Number of pixels touching [left, top, right, bottom] edges respectively.
    """
    rows, cols = coords[:, 0], coords[:, 1]
    height, width = shape

    left_count   = np.sum(cols == 0)
    top_count    = np.sum(rows == 0)
    right_count  = np.sum(cols == width - 1)
    bottom_count = np.sum(rows == height - 1)

    return np.array([left_count, top_count, right_count, bottom_count])

def interpret_edge_touch(edge_counts):
    """
    Interprets how a cloud touches the edges of the image.

    Parameters:
        edge_counts : np.ndarray of shape (4,)
            Number of pixels touching [left, top, right, bottom] edges.

    Returns:
        int:
            0  → no edge touched
            1  → exactly one edge touched
            2  → exactly two adjacent edges touched
            -1 → otherwise (non-adjacent pairs or >= 3 edges touched)
    """
    # Find which edges are touched
    left = edge_counts[0] > 0
    top = edge_counts[1] > 0
    right = edge_counts[2] > 0
    bottom = edge_counts[3] > 0

    num_touched = np.count_nonzero(edge_counts)

    if num_touched == 0:
        return 0
    elif num_touched == 1:
        return 1
    elif num_touched == 2:
        if (left and right) or (top and bottom):
            return -1
        else: 
            return 2
    else:
        return -1

def plot_loglog_with_fit(areas, perims, title):
    log_area = np.log(areas)
    log_perim = np.log(perims)
    slope, intercept, r_value, _, _ = linregress(log_area, log_perim)
    D = 2 * slope
    r2 = r_value ** 2

    plt.figure(figsize=(6, 5))
    plt.loglog(areas, perims, 'o', markersize=2, alpha=0.6, label='Data')
    plt.loglog(areas, np.exp(intercept) * areas**slope, 'r--',
               label=f'Fit: slope={slope:.4f} → D={D:.4f}')
    plt.xlabel('Area (log scale)')
    plt.ylabel('Perimeter (log scale)')
    plt.title(f'{title}\nFractal Dimension ≈ {D:.4f}, R² = {r2:.4f}')
    plt.legend()
    plt.grid(True, which='both', ls='--', lw=0.5)
    plt.tight_layout()
    plt.show()

    print(f"{title}")
    print(f"  Slope (log-log): {slope:.4f}")
    print(f"  Estimated fractal dimension D: {D:.4f}")
    print(f"  R²: {r2:.4f}\n")

def compute_loglog_stats(areas, perims, return_slope_intercept=False):
    """
    Computes slope, intercept, fractal dimension D, and R² from log-log fit.
    
    Parameters:
        areas : np.ndarray
        perims : np.ndarray
        return_slope_intercept : bool — if True, also returns slope and intercept

    Returns:
        D, R²               (if return_slope_intercept is False)
        slope, D, R²        (if return_slope_intercept is True)
    """
    if len(areas) == 0 or len(perims) == 0:
        return None

    log_area = np.log(areas)
    log_perim = np.log(perims)
    slope, intercept, r_value, _, _ = linregress(log_area, log_perim)
    D = 2 * slope
    r2 = r_value ** 2

    if return_slope_intercept:
        return slope, intercept, D, r2
    else:
        return D, r2

def truncate_with_edge_exposure(mask, num_cuts=1):

    """
    Truncates a 2D binary/numeric mask along vertical and/or horizontal axes,
    resizing the grid and reporting how many non-zero pixels were exposed on
    edges that result from the cut(s).

    Parameters:
        mask : 2D np.ndarray
            A cropped region containing a single object or cloud.
        num_cuts : int
            Number of cuts to apply: 0, 1 (default), or 2.

    Returns:
        truncated : 2D np.ndarray
            The resized (cropped) array after truncation.
        exposed_edge_counts : np.ndarray([left, top, right, bottom])
            Non-zero pixels exposed only on cut-created edges.
    """
    height, width = mask.shape
    row_start, row_end = 0, height
    col_start, col_end = 0, width

    # Track which edges were cut
    cut_edges = {"left": False, "top": False, "right": False, "bottom": False}

    if num_cuts == 0:
        truncated = mask.copy()
    else:
        cuts_to_do = []
        if num_cuts == 1:
            cuts_to_do = [np.random.choice(['vertical', 'horizontal'])]
        elif num_cuts == 2:
            cuts_to_do = ['vertical', 'horizontal']

        for direction in cuts_to_do:
            if direction == 'vertical' and width > 2:
                cut_col = np.random.randint(1, width)
                if np.random.rand() < 0.5:
                    # Keep RIGHT side → exposed edge = left
                    col_start = cut_col
                    cut_edges["left"] = True
                else:
                    # Keep LEFT side → exposed edge = right
                    col_end = cut_col
                    cut_edges["right"] = True

            elif direction == 'horizontal' and height > 2:
                cut_row = np.random.randint(1, height)
                if np.random.rand() < 0.5:
                    # Keep BOTTOM side → exposed edge = top
                    row_start = cut_row
                    cut_edges["top"] = True
                else:
                    # Keep TOP side → exposed edge = bottom
                    row_end = cut_row
                    cut_edges["bottom"] = True

        # Apply slicing
        truncated = mask[row_start:row_end, col_start:col_end]

    # Compute edge counts only for the cut-exposed edges
    exposed_edge_counts = np.zeros(4, dtype=int)  # [left, top, right, bottom]
    if truncated.size > 0:
        if cut_edges["left"]:
            exposed_edge_counts[0] = np.count_nonzero(truncated[:, 0])
        if cut_edges["top"]:
            exposed_edge_counts[1] = np.count_nonzero(truncated[0, :])
        if cut_edges["right"]:
            exposed_edge_counts[2] = np.count_nonzero(truncated[:, -1])
        if cut_edges["bottom"]:
            exposed_edge_counts[3] = np.count_nonzero(truncated[-1, :])

    return truncated, exposed_edge_counts

def plot_D_distributions(results, bins=20):
    """
    Plots the distributions of D_real and D_mirror values from the benchmarking results.
    
    Parameters:
        results : list of dicts
            Each dict should contain keys 'D_real' and 'D_mirror'
        bins : int
            Number of bins to use for the histogram
    """
    D_real = [r["D_real"] for r in results if r.get("D_real") is not None]
    D_mirror = [r["D_mirror"] for r in results if r.get("D_mirror") is not None]

    plt.figure(figsize=(10, 5))
    sns.histplot(D_real, color='skyblue', label='D (Real)', kde=True, stat='density', bins=bins, alpha=0.6)
    sns.histplot(D_mirror, color='orange', label='D (Mirrored)', kde=True, stat='density', bins=bins, alpha=0.6)

    mean_real = np.mean(D_real)
    mean_mirror = np.mean(D_mirror)

    plt.axvline(mean_real, color='blue', linestyle='--', label=f'Mean Real D = {mean_real:.4f}')
    plt.axvline(mean_mirror, color='darkorange', linestyle='--', label=f'Mean Mirror D = {mean_mirror:.4f}')

    plt.title("Distribution of Fractal Dimension (D)\nReal vs Mirrored Clouds")
    plt.xlabel("Fractal Dimension (D)")
    plt.ylabel("Density")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.4)
    plt.tight_layout()
    plt.show()

In [10]:
def plot_cloud_comparison(lattice, cloud_id, original_mask, truncated_mask, mirrored_mask):
    """
    Plots a 4-panel visualization of:
    - Full lattice
    - Original cloud
    - Truncated version
    - Mirrored approximation
    """
    fig, axs = plt.subplots(1, 4, figsize=(16, 4))

    axs[0].imshow(lattice, cmap='gray')
    axs[0].set_title('Original Lattice')
    axs[0].axis('off')

    axs[1].imshow(original_mask, cmap='gray')
    axs[1].set_title(f'Original Cloud #{cloud_id}')
    axs[1].axis('off')

    axs[2].imshow(truncated_mask, cmap='gray')
    axs[2].set_title('Truncated Cloud')
    axs[2].axis('off')

    axs[3].imshow(mirrored_mask, cmap='gray')
    axs[3].set_title('Mirrored Cloud (Approx.)')
    axs[3].axis('off')

    plt.tight_layout()
    plt.show()


def visualize_sample_clouds(lattice, labeled_features, max_clouds=3, num_cuts=1):
    """
    Randomly selects a few internal clouds and visualizes their:
    - Original form
    - Truncated version
    - Mirrored reconstruction

    Parameters:
        lattice : 2D np.ndarray (bool)
        labeled_features : 2D np.ndarray (int)
        max_clouds : int
        num_cuts : int (1 or 2)
    """
    all_ids = list(np.unique(labeled_features))
    all_ids = [x for x in all_ids if x != 0]
    random.shuffle(all_ids)

    shown = 0
    for cloud_id in all_ids:
        if shown >= max_clouds:
            break

        mask = labeled_features == cloud_id
        coords = np.argwhere(mask)

        # Check that it's internal
        if interpret_edge_touch(count_edge_contacts(coords, mask.shape)) != 0:
            continue

        cropped = crop_to_bounding_box(mask)
        chopped, exposed = truncate_with_edge_exposure(cropped, num_cuts=num_cuts)

        if chopped.size == 0:
            continue

        # Approximate mirrored reconstruction
        mirrored = chopped.copy()
        if exposed[0] > 0:  # left edge exposed → mirror left to right
            mirrored = np.concatenate([np.flip(chopped, axis=1), chopped], axis=1)
        elif exposed[2] > 0:  # right edge exposed → mirror right to left
            mirrored = np.concatenate([chopped, np.flip(chopped, axis=1)], axis=1)

        if exposed[1] > 0:  # top edge exposed → mirror top to bottom
            mirrored = np.concatenate([np.flip(mirrored, axis=0), mirrored], axis=0)
        elif exposed[3] > 0:  # bottom edge exposed → mirror bottom to top
            mirrored = np.concatenate([mirrored, np.flip(mirrored, axis=0)], axis=0)

        plot_cloud_comparison(lattice, cloud_id, cropped, chopped, mirrored)
        shown += 1

def plot_cloud_delta_D_curve(cloud_results):
    slice_fracs = [r["slice_frac"] for r in cloud_results]
    delta_Ds = [r["delta_D"] for r in cloud_results]

    plt.figure(figsize=(6, 4))
    plt.plot(slice_fracs, delta_Ds, 'o-', markersize=3)
    plt.xlabel("Slice Fraction")
    plt.ylabel("ΔD (D_mirror - D_true)")
    plt.title("ΔD vs Slice Fraction (Single Cloud)")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_single_run_D_distributions(run_results):
    D_true_vals = [r["D_true"] for r in run_results]
    D_mirror_vals = [r["D_mirror"] for r in run_results]

    plt.figure(figsize=(10, 5))
    sns.histplot(D_true_vals, label='D_true', color='skyblue', bins=20, kde=True)
    sns.histplot(D_mirror_vals, label='D_mirror', color='orange', bins=20, kde=True)
    plt.legend()
    plt.xlabel("Fractal Dimension")
    plt.title("Distribution of D_true vs D_mirror (Single Run)")
    plt.grid(True, linestyle='--')
    plt.tight_layout()
    plt.show()

In [11]:
import numpy as np

def analyze_cloud_slice_mirroring(cloud_mask, cloud_id, slice_fracs=np.linspace(0.01, 0.99, 100)):
    """
    Given a single binary cloud mask, this function:
    - Computes D_true (from full cloud)
    - Applies left-side slicing at increasing slice_frac
    - Mirrors each slice
    - Computes D_mirror
    - Returns results as a list of dicts (one per slice)

    Parameters:
        cloud_mask : 2D np.ndarray (boolean)
            A binary mask of the internal cloud (already cropped to bounding box).
        cloud_id : int
            Identifier for this cloud.
        slice_fracs : np.ndarray
            Fractions (0 < f < 1) to slice from the left.

    Returns:
        results : list of dicts
            Each dict contains cloud_id, area, D_true, slice_frac, D_mirror, delta_D
    """
    results = []

    full_area = compute_area(cloud_mask)
    full_perim = compute_perimeter(cloud_mask)

    if full_area < 10 or full_perim < 10:
        return []  # skip degenerate masks

    D_true, R2_true = compute_loglog_stats(np.array([full_area]), np.array([full_perim]))

    h, w = cloud_mask.shape
    for frac in slice_fracs:
        cut_col = int(w * frac)
        if cut_col < 2 or cut_col >= w - 1:
            continue  # skip meaningless slices

        sliced = cloud_mask[:, cut_col:]
        if np.count_nonzero(sliced) == 0:
            continue  # nothing left

        mirrored = np.concatenate([sliced[:, ::-1], sliced], axis=1)

        if frac == 0.5 and cloud_id < 5:  # Only show for a few clouds
            fig, axs = plt.subplots(1, 3, figsize=(12, 4))

            axs[0].imshow(cloud_mask, cmap='gray')
            axs[0].set_title("Original Cloud")

            axs[1].imshow(sliced, cmap='gray')
            axs[1].set_title(f"Sliced (frac={frac:.2f})")

            axs[2].imshow(mirrored, cmap='gray')
            axs[2].set_title("Mirrored Slice")

            for ax in axs:
                ax.axis('off')

            plt.suptitle(f"Cloud ID {cloud_id} — Slice Fraction {frac:.2f}")
            plt.tight_layout()
            plt.show()

        area_mirror = compute_area(mirrored)
        perim_mirror = compute_perimeter(mirrored)

        D_mirror, R2_mirror = compute_loglog_stats(np.array([area_mirror]), np.array([perim_mirror]))

        results.append({
            "cloud_id": cloud_id,
            "area": full_area,
            "D_true": D_true,
            "slice_frac": frac,
            "D_mirror": D_mirror,
            "delta_D": D_mirror - D_true,
            "R2_true": R2_true,
            "R2_mirror": R2_mirror,
            "area_mirror": area_mirror,
            "perim_mirror": perim_mirror,
        })

    return results

In [12]:
import pandas as pd
from tqdm import tqdm

def run_mirroring_experiment(
    n_runs=10,
    lattice_width=2000,
    lattice_height=2000,
    fill_probability=0.405,
    feature_area_threshold=1000,
    output_csv_path="slice_mirroring_results.csv",
    save_every=1,
):
    """
    Main experiment runner. For each run:
    - Generates a random lattice
    - Identifies large internal features
    - Runs mirroring slice analysis on each feature
    - Aggregates results into a CSV
    """
    all_results = []
    cloud_id_counter = 0

    for run_id in range(1, n_runs + 1):
        print(f"\n=== Run {run_id} ===")
        lattice = generate_lattice(lattice_width, lattice_height, fill_probability)

        plt.figure(figsize=(6, 6))
        plt.imshow(lattice, cmap='gray')
        plt.title("Raw Lattice (Random Fill)")
        plt.axis('off')
        plt.show()

        filled = binary_fill_holes(lattice, structure=convolution_structure)
        labeled, num_features = label(filled, structure=convolution_structure)
        labeled, num_features = filter_features_over_threshold(labeled, num_features, feature_area_threshold)

        # Only internal features (edge_case == 0)
        internal_mask = np.zeros_like(labeled, dtype=bool)
        for label_id in range(1, num_features + 1):
            mask = (labeled == label_id)
            coords = np.argwhere(mask)
            if interpret_edge_touch(count_edge_contacts(coords, mask.shape)) == 0:
                internal_mask |= mask

        plt.figure(figsize=(6, 6))
        plt.imshow(internal_mask, cmap='gray')
        plt.title("Internal Features Only")
        plt.axis('off')
        plt.show()

        for label_id in range(1, num_features + 1):
            mask = (labeled == label_id)
            coords = np.argwhere(mask)
            edge_counts = count_edge_contacts(coords, mask.shape)
            edge_case = interpret_edge_touch(edge_counts)

            if edge_case != 0:
                continue  # Skip non-internal features

            cropped = crop_to_bounding_box(mask)
            cloud_results = analyze_cloud_slice_mirroring(cropped, cloud_id=cloud_id_counter)

            if cloud_results:
                plot_cloud_delta_D_curve(cloud_results)

            if cloud_results:
                all_results.extend(cloud_results)
                cloud_id_counter += 1

        # Optionally checkpoint results every few runs
        if run_id % save_every == 0:
            print(f"Saving interim results after run {run_id}...")
            df = pd.DataFrame(all_results)
            df.to_csv(output_csv_path, index=False)

    # Final save
    print("Saving final results...")
    df = pd.DataFrame(all_results)
    df.to_csv(output_csv_path, index=False)

    plot_single_run_D_distributions(all_results)

    print(f"Results saved to {output_csv_path}")

    return df

In [None]:
df_results = run_mirroring_experiment(n_runs=10)