In [1679]:
import numpy as np
import cv2 as cv
import random
import os
from tqdm import tqdm
from skimage.metrics import structural_similarity as ssim

In [1680]:
def hough_transform(edge_image, theta_steps=180, rho_step=1):
    """
    Corrected Hough Transform for line detection using raw image coordinates.
    
    Parameters:
    edge_image (numpy.ndarray): Binary edge image (0s and 1s).
    theta_steps (int): Number of angular bins (0 to 180 degrees).
    rho_step (float): Resolution step for rho (pixels).
    
    Returns:
    tuple: (accumulator, thetas, rhos)
        accumulator: 2D Hough accumulator array.
        thetas: Array of theta values (radians).
        rhos: Array of rho values (pixels).
    """
    height, width = edge_image.shape
    diag = int(np.ceil(np.hypot(height, width)))
    thetas = np.deg2rad(np.linspace(0, 180, theta_steps, endpoint=False))
    
    # Rho range covers -diag to diag, adjusted for rho_step
    max_rho = diag
    rhos = np.arange(-max_rho, max_rho + rho_step, rho_step)
    accumulator = np.zeros((len(rhos), theta_steps), dtype=np.uint64)
    
    # Get edge coordinates (no y-axis flipping)
    y_img, x = np.nonzero(edge_image)
    
    # Vectorized computation of rho for all (x, y) and thetas
    cos_theta = np.cos(thetas)
    sin_theta = np.sin(thetas)
    
    # Shape: (N_edges, theta_steps)
    rho_values = x[:, np.newaxis] * cos_theta + y_img[:, np.newaxis] * sin_theta
    
    # Quantize rho to bins
    rho_indices = np.round(
        (rho_values + max_rho) / rho_step  # Shift to non-negative
    ).astype(int)
    
    # Filter valid indices
    valid_mask = (rho_indices >= 0) & (rho_indices < len(rhos))
    theta_indices = np.tile(np.arange(theta_steps), (len(x), 1))
    
    # Update accumulator using valid indices
    np.add.at(
        accumulator, 
        (rho_indices[valid_mask], theta_indices[valid_mask]), 
        1
    )
    
    return accumulator, thetas, rhos

In [1681]:
def hough_lines(accumulator, thetas, rhos, threshold=200, neighborhood_size=20):
    lines = []
    half_size = neighborhood_size // 2
    # Get candidate indices where accumulator > threshold
    candidate_indices = np.argwhere(accumulator > threshold)
    
    for rho_idx, theta_idx in candidate_indices:
        candidate_value = accumulator[rho_idx, theta_idx]
        
        # Define neighborhood bounds (handling borders)
        rho_min = max(rho_idx - half_size, 0)
        rho_max = min(rho_idx + half_size + 1, accumulator.shape[0])
        theta_min = max(theta_idx - half_size, 0)
        theta_max = min(theta_idx + half_size + 1, accumulator.shape[1])
        
        # Extract the neighborhood
        neighborhood = accumulator[rho_min:rho_max, theta_min:theta_max]
        
        # Only add the candidate if it is the local maximum in its neighborhood
        if candidate_value == np.max(neighborhood):
            rho = rhos[rho_idx]
            theta = thetas[theta_idx]
            lines.append((rho, theta))
    
    return lines

In [1682]:
def ransac_line_vectorized(data, k, t, d):
    """
    Vectorized RANSAC algorithm to fit a line to data with outliers.

    Parameters:
    - data: array of shape (N, 2) where each row is (x, y)
    - k: maximum number of iterations
    - t: threshold value to determine when a data point fits the model
    - d: minimum number of inliers required to accept a model

    Returns:
    - best_model: tuple (m, c) for the line model y = m*x + c, or None if no valid model is found
    - best_consensus_set: array of indices of inlier points for the best model
    """
    best_model = None
    best_consensus_set = None
    best_inlier_count = 0
    best_error = np.inf

    for _ in range(k):
        # Randomly select n distinct points
        sample_indices = random.sample(range(data.shape[0]), 2)
        sample = data[sample_indices]

        # Skip vertical lines to avoid division by zero
        if sample[1, 0] == sample[0, 0]:
            continue

        # Compute line parameters: y = m*x + c
        m = (sample[1, 1] - sample[0, 1]) / (sample[1, 0] - sample[0, 0])
        c = sample[0, 1] - m * sample[0, 0]

        # Vectorized computation of distances of all points to the line
        distances = np.abs(m * data[:, 0] - data[:, 1] + c) / np.sqrt(m**2 + 1)
        consensus_set = np.where(distances < t)[0]
        inlier_count = consensus_set.size

        if inlier_count >= d:
            error = np.mean(distances[consensus_set])
            # Update best model if more inliers are found or if equal, but with lower error
            if inlier_count > best_inlier_count or (inlier_count == best_inlier_count and error < best_error):
                best_inlier_count = inlier_count
                best_error = error
                best_model = (m, c)
                best_consensus_set = consensus_set

    return best_model, best_consensus_set

In [1683]:
def convert_line_to_rho_theta(m, c):
    """
    Converts a line from slope-intercept form y = m*x + c to the polar form
    rho = x*cos(theta) + y*sin(theta) where rho >= 0 and theta in [0, π).

    Parameters:
    - m: slope of the line
    - c: intercept of the line

    Returns:
    - (rho, theta): tuple with the polar coordinates
    """
    # Using the line in form: m*x - y + c = 0
    # Normal vector is (m, -1) and its norm is sqrt(m^2+1)
    theta = np.arctan2(-1, m)
    rho = -c / np.sqrt(m**2 + 1)
    
    # Ensure that rho is nonnegative
    if rho < 0:
        rho = -rho
        theta += np.pi
        
    # Normalize theta to be in the range [0, π)
    theta = theta % np.pi
    return rho, theta

In [1684]:
def find_multiple_lines(data, k, t, d, min_remaining_points=100):
    """
    Iteratively applies RANSAC to find multiple lines.
    
    Parameters:
    - data: array of shape (N, 2) with (x, y) coordinates.
    - k, t, d: RANSAC parameters.
    - min_remaining_points: minimum number of points required to run RANSAC on remaining data.
    
    Returns:
    - lines: list of tuples (model, inlier_points) for each detected line.
    - remaining_data: points that were not fitted to any line.
    """
    lines = []
    remaining_data = data.copy()

    while remaining_data.shape[0] >= min_remaining_points:
        model, inlier_indices = ransac_line_vectorized(remaining_data, k, t, d)
        if model is None or inlier_indices is None or inlier_indices.size == 0:
            break

        m, c = model
        rho, theta = convert_line_to_rho_theta(m, c)

        lines.append((rho, theta))

        # Remove the inliers from the remaining data
        remaining_data = np.delete(remaining_data, inlier_indices, axis=0)
    return lines

In [1685]:
def group_lines(lines, angle_tol=15):
    """Group lines by their theta angles (in degrees) with a tolerance."""
    groups = []
    deg_tol = np.deg2rad(angle_tol)
    
    for line in lines:
        rho, theta = line
        matched = False
        
        for group in groups:
            t_diff = abs(theta - group['theta'])
            if min(t_diff, np.pi - t_diff) <= deg_tol:
                group['lines'].append(line)
                matched = True
                break
        
        if not matched:
            groups.append({'theta': theta, 'lines': [line]})
    
    return groups

def compute_max_distance(group):
    """Compute maximum distance between lines in a group."""
    rhos = [line[0] for line in group['lines']]
    max_rho, min_rho = max(rhos), min(rhos)
    return max_rho - min_rho

def find_perpendicular_pairs(groups, angle_tol=15):
    """Find pairs of perpendicular line groups."""
    pairs = []
    deg_tol = np.deg2rad(angle_tol)
    
    for i, g1 in enumerate(groups):
        for j, g2 in enumerate(groups[i+1:], i+1):
            angle_diff = abs(g1['theta'] - g2['theta'])
            if abs(min(angle_diff, np.pi - angle_diff) - np.pi/2) <= deg_tol:
                pairs.append((g1, g2))
    
    return pairs

def line_intersection(line1, line2):
    """Find intersection point of two lines."""
    rho1, theta1 = line1
    rho2, theta2 = line2
    
    A = np.array([
        [np.cos(theta1), np.sin(theta1)],
        [np.cos(theta2), np.sin(theta2)]
    ])
    b = np.array([[rho1], [rho2]])
    
    try:
        x, y = np.linalg.solve(A, b)
        return int(np.round(x)), int(np.round(y))
    except np.linalg.LinAlgError:
        return None

def find_largest_rectangle(lines):
    if lines is None:
        # print("No lines detected")
        return
    
    # Group lines and filter small groups
    groups = group_lines(lines)
    groups = [g for g in groups if len(g['lines']) >= 2]
    
    # Find perpendicular pairs and calculate areas
    pairs = find_perpendicular_pairs(groups)
    max_area = 0
    best_pair = None
    
    for g1, g2 in pairs:
        area = compute_max_distance(g1) * compute_max_distance(g2)
        if area > max_area:
            max_area = area
            best_pair = (g1, g2)
    
    if not best_pair:
        # print("No perpendicular pairs found")
        return
    
    # Get extreme lines from best pair
    g1_lines = sorted(best_pair[0]['lines'], key=lambda x: x[0])
    g2_lines = sorted(best_pair[1]['lines'], key=lambda x: x[0])
    
    l1_min = g1_lines[0]
    l1_max = g1_lines[-1]
    l2_min = g2_lines[0]
    l2_max = g2_lines[-1]
    
    # Find intersection points
    pts = [
        line_intersection(l1_min, l2_min),
        line_intersection(l1_min, l2_max),
        line_intersection(l1_max, l2_max),
        line_intersection(l1_max, l2_min)
    ]
    
    if None in pts:
        # print("Could not find all intersection points")
        return
    
    return pts

In [1686]:
def order_points(pts):
    """Sort points to [TL, TR, BR, BL] order."""
    pts = np.array(pts)
    x_sorted = pts[np.argsort(pts[:, 0])]
    left = x_sorted[:2]
    right = x_sorted[2:]
    left = left[np.argsort(left[:, 1])]
    tl, bl = left[0], left[1]
    right = right[np.argsort(right[:, 1])]
    tr, br = right[0], right[1]
    return np.array([tl, tr, br, bl], dtype=np.float32)

In [1687]:
def compute_homography(src_points, dst_points):
    # Compute homography matrix from source to destination points.
    A = []
    b = []
    for i in range(4):
        x, y = src_points[i]
        u, v = dst_points[i]
        A.append([x, y, 1, 0, 0, 0, -x*u, -y*u])
        A.append([0, 0, 0, x, y, 1, -x*v, -y*v])
        b.extend([u, v])
    A = np.array(A, dtype=np.float64)
    b = np.array(b, dtype=np.float64)
    h = np.linalg.lstsq(A, b, rcond=None)[0]
    H = np.array([
        [h[0], h[1], h[2]],
        [h[3], h[4], h[5]],
        [h[6], h[7], 1]
    ], dtype=np.float64)
    return H

In [1688]:
def resize_image(image, target_size):
    # Resize image to target_size (width, height)
    return cv.resize(image, target_size)

In [1689]:
def warp_image_bilinear(image, H, output_shape):
    output_height, output_width = output_shape
    H_inv = np.linalg.inv(H)
    
    u, v = np.meshgrid(np.arange(output_width), np.arange(output_height))
    grid = np.stack([u.ravel(), v.ravel(), np.ones_like(u.ravel())])
    src_points = H_inv @ grid
    
    x = (src_points[0] / src_points[2]).reshape(output_height, output_width)
    y = (src_points[1] / src_points[2]).reshape(output_height, output_width)
    
    # Floor and ceiling coordinates for bilinear interpolation
    x0 = np.floor(x).astype(int)
    y0 = np.floor(y).astype(int)
    x1 = x0 + 1
    y1 = y0 + 1
    
    # Clamp coordinates to prevent out-of-bounds
    x0 = np.clip(x0, 0, image.shape[1]-1)
    x1 = np.clip(x1, 0, image.shape[1]-1)
    y0 = np.clip(y0, 0, image.shape[0]-1)
    y1 = np.clip(y1, 0, image.shape[0]-1)
    
    # Interpolation weights
    wa = (x1 - x) * (y1 - y)
    wb = (x1 - x) * (y - y0)
    wc = (x - x0) * (y1 - y)
    wd = (x - x0) * (y - y0)
    
    # Sample pixel values
    Ia = image[y0, x0]
    Ib = image[y1, x0]
    Ic = image[y0, x1]
    Id = image[y1, x1]
    
    # Compute interpolated values
    warped = (wa[..., np.newaxis] * Ia +
              wb[..., np.newaxis] * Ib +
              wc[..., np.newaxis] * Ic +
              wd[..., np.newaxis] * Id).astype(np.uint8)
    
    return warped

In [1690]:
def warp_image(path):
    img = resize_image(cv.imread(path), (1024, 1024))

    img_gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)

    img_blurred = cv.GaussianBlur(img_gray, (13, 13), 0)

    edges = cv.Canny(img_blurred, 50, 150)

    accumulator, thetas, rhos = hough_transform(edges)
    lines = hough_lines(accumulator, thetas, rhos, threshold=100)

    points = np.column_stack(np.where(edges > 0))[:, ::-1]

    lines_ransac = find_multiple_lines(points, 1000, 2, 150)

    lines.extend(lines_ransac)

    pts = find_largest_rectangle(lines)

    if not pts:
        return img

    ordered_corners = order_points(pts)

    # Calculate destination size (width and height)
    tl, tr, br, bl = ordered_corners
    width = int(max(np.linalg.norm(tr - tl), np.linalg.norm(br - bl)))
    height = int(max(np.linalg.norm(bl - tl), np.linalg.norm(br - tr)))

    # Define destination points (rectangle)
    dst_points = np.array([
        [0, 0],
        [width, 0],
        [width, height],
        [0, height]
    ], dtype=np.float32)

    # Compute homography matrix
    H = compute_homography(ordered_corners, dst_points)

    if height < 7 or width < 7:
        return img

    # Warp the image
    warped_image = warp_image_bilinear(img, H, (height, width))

    return warped_image

In [1691]:
def compute_ssim(imageA, imageB):
    # Convert images to grayscale for SSIM computation
    grayA = cv.cvtColor(imageA, cv.COLOR_BGR2GRAY)
    grayB = cv.cvtColor(imageB, cv.COLOR_BGR2GRAY)

    height, width = grayB.shape

    grayA_resized = resize_image(grayA, (width, height))

    score, _ = ssim(grayA_resized, grayB, full=True)
    return score

In [1692]:
start_path = "WarpDoc"

digital = os.path.join(start_path, "digital")
distorted = os.path.join(start_path, "distorted")

total_ssim_scores = []

for folder in os.listdir(distorted):
    digital_path = os.path.join(digital, folder)
    distorted_path = os.path.join(distorted, folder)

    ssim_scores = []

    for image_path in tqdm(os.listdir(distorted_path)):
        pathA = os.path.join(digital_path, image_path)
        pathB = os.path.join(distorted_path, image_path)
        
        image = resize_image(cv.imread(pathA), (1024, 1024))
        warped_image = warp_image(pathB)

        if image is None or warped_image is None:
            print(f"Error loading image: {pathA} or {pathB}")

        score = compute_ssim(image, warped_image)
        ssim_scores.append(score)

    total_ssim_scores.extend(ssim_scores)
    
    average_ssim = sum(ssim_scores) / len(ssim_scores)
    print(f"SSIM for folder ({folder}): {average_ssim}")

# Calculate and print the average SSIM if there are valid image pairs
if total_ssim_scores:
    average_total_ssim = sum(total_ssim_scores) / len(total_ssim_scores)
    print(f"Average SSIM over all pairs: {average_total_ssim}")
else:
    print("No valid image pairs to compute SSIM.")

  return int(np.round(x)), int(np.round(y))
100%|██████████| 100/100 [01:54<00:00,  1.15s/it]


SSIM for folder (curved): 0.20276961879593677


100%|██████████| 100/100 [01:52<00:00,  1.13s/it]


SSIM for folder (fold): 0.20094835407299386


100%|██████████| 100/100 [01:38<00:00,  1.02it/s]


SSIM for folder (incomplete): 0.21946030064507657


100%|██████████| 100/100 [02:48<00:00,  1.69s/it]


SSIM for folder (perspective): 0.2202182650132417


100%|██████████| 100/100 [02:16<00:00,  1.36s/it]


SSIM for folder (random): 0.20564519723268673


100%|██████████| 100/100 [02:06<00:00,  1.27s/it]

SSIM for folder (rotate): 0.2570046922940457
Average SSIM over all pairs: 0.21767440467566354



