In [None]:
import gc
import os
import pickle
import sys
import time
import uuid
from typing import Final, Optional

import cv2 as OpenCV
import numpy as np
from matplotlib import pyplot as plt
from numpy.linalg import norm
from scipy.cluster.vq import kmeans, vq
from scipy.spatial import Delaunay

### Utils Functions / Exceptions

In [None]:
def log_to_file(file_name: str, message: str):
    import datetime
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_message = f"[{timestamp}] {message}"
    with open(file_name, "a") as f:
        f.write(f"{log_message}\n")


def print_size(file_name: str, obj, obj_name="N/A"):
    from pympler import asizeof
    memory_usage = asizeof.asizeof(obj)
    # Convert memory usage to a more readable format
    if memory_usage < 1024:
        memory_usage_str = f"{memory_usage} bytes"
    elif memory_usage < 1024 ** 2:
        memory_usage_str = f"{memory_usage / 1024} KB"
    elif memory_usage < 1024 ** 3:
        memory_usage_str = f"{memory_usage / (1024 ** 2)} MB"
    else:
        memory_usage_str = f"{memory_usage / (1024 ** 3)} GB"
    # Print the memory usage and object name
    log_to_file(file_name, f"Memory usage of {obj_name}: {memory_usage_str}")

def timeit(func):
    def wrapper(*args, **kwargs):
        image_set_name = kwargs['image_set_name']
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Started {func.__name__}...")
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Done {func.__name__} took {end_time - start_time:,} seconds to execute.")
        return result
    return wrapper

In [None]:
class CalibrationError(Exception):
    def __init__(self, message):
        self.message = message

class IntrinsicParametersNotFoundError(Exception):
    def __init__(self, message):
        self.message = message

### Main Classes

In [None]:
class Image:
    def __init__(self, img_id, rgb_image, gray_image, keypoints, descriptors, path):
        self.img_id: int = int(img_id)
        self.unique_id: uuid = uuid.uuid4()
        self.rgb_image: Image = rgb_image
        self.gray_image: Image = gray_image
        self.keypoints: list[OpenCV.KeyPoint] = keypoints
        self.descriptors: np.ndarray = descriptors
        self.path: str = path

    @property
    def length(self):
        return f"{len(self.keypoints)}" if len(self.keypoints) == len(self.descriptors) else f"{len(self.keypoints)}, {len(self.descriptors)}"
    
    def draw_sift_features(self):
        image_with_sift = OpenCV.drawKeypoints(self.rgb_image, self.keypoints, None, flags=OpenCV.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
        plt.imshow(image_with_sift)
        plt.title("Image with SIFT Features")
        plt.axis('off')
        plt.show()

    def display_rgb_image(self, title: Optional[str] = None):
        image = self.rgb_image
        plt.imshow(image)
        if title is not None:
            plt.title(title)
        plt.axis('off')
        plt.show()

    def display_gray_image(self, title: Optional[str] = None):
        image = self.gray_image
        plt.gray()
        plt.imshow(image)
        if title is not None:
            plt.title(title)
        plt.axes('off')
        plt.show()

        
    def __repr__(self):
        return f"Image({self.img_id})"
    
    def __str__(self):
        return self.__repr__()
    
    def __eq__(self, other):
        return self.unique_id == other.unique_id
    
    def __hash__(self):
        return hash(self.img_id)
    
    def __getstate__(self):
        state = self.__dict__.copy()
        state['keypoints'] = [tuple(k.pt) + (k.size, k.angle, k.response, k.octave, k.class_id) for k in self.keypoints]
        return state
    
    def __setstate__(self, state):
        state['keypoints'] = [OpenCV.KeyPoint(x, y, size, angle, response, octave, class_id) for x, y, size, angle, response, octave, class_id in state['keypoints']]
        self.__dict__ = state

class FeatureMatches:
    def __init__(self, image_one: Image, image_two: Image, matches: list[OpenCV.DMatch]):
        self.image_one: Image = image_one
        self.image_two: Image = image_two
        self.matches: list[OpenCV.DMatch] = matches

    def draw_matches(self, output_filename: str) -> None:
        combined_image = OpenCV.hconcat([
            self.image_one.rgb_image,
            self.image_two.rgb_image
        ])

        for match in self.matches:
            x1, y1 = self.image_one.keypoints[match.queryIdx].pt
            x2, y2 = self.image_two.keypoints[match.trainIdx].pt
            # Draw a line connecting the matched keypoints
            OpenCV.line(
                combined_image, 
                (int(x1), int(y1)), 
                (int(x2) + self.image_one.rgb_image.shape[1], int(y2)), 
                (0, 255, 0), 
                1
            )

        OpenCV.imwrite(output_filename, combined_image)

    def __repr__(self):
        return f"FeatureMatches({self.image_one}, {self.image_two} ---> {len(self.matches)})"

    def __getstate__(self):
        state = self.__dict__.copy()
        state['matches'] = [
            {'queryIdx': m.queryIdx, 'trainIdx': m.trainIdx, 'distance': m.distance} for m in self.matches
        ]
        return state
    
    def __setstate__(self, state):
        state['matches'] = [
            OpenCV.DMatch(match['queryIdx'], match['trainIdx'], match['distance']) for match in state['matches']
        ]
        self.__dict__ = state
    
class Images:
    def __init__(self, images: list[Image], image_set_name: str):
        self.id = uuid.uuid4()
        self.images: list[Image] = images
        self.image_set_name: str = image_set_name
        self.feature_matches: list[FeatureMatches] = []
        self.similar_images: dict[list[Image]] = {}
        self.num_clusters: int = 50

    def save_feature_matches(self):
        for match in self.feature_matches:
            match.draw_matches(f"data/snow-man/output/feature-match/{match.image_one.img_id}_{match.image_two.img_id}.jpg")

    def __len__(self):
        return len(self.images)
    
    def display_similar_images(self, key):
        print(f"cluster {key}")
        print("-----------------------------------------------------")
        for value in self.similar_images[key]:
            print(value)
            rgb_image = OpenCV.cvtColor(OpenCV.imread(value.path), OpenCV.COLOR_BGR2RGB)
            plt.imshow(rgb_image)
            plt.title(value.path)
            plt.axis('off')
            plt.show()

    def save_similar_images(self):
        for cluster in self.similar_images.keys():
            if not os.path.exists(f"data/{self.image_set_name}/output/image-match/{cluster}"):
                os.makedirs(f"data/{self.image_set_name}/output/image-match/{cluster}")
            for value in self.similar_images[cluster]:
                OpenCV.imwrite(f"data/{self.image_set_name}/output/image-match/{cluster}/{value.img_id}.jpg", value.rgb_image)

    def __getitem__(self, key: int) -> Image:
        for image in self.images:
            if image.img_id == key:
                return image
        raise KeyError(f'Image with img_id {key} not found.')
    


### Pipeline Start

In [None]:
""" Step One: Read and Load Images
Inputs: 
- folder_path: str

Outputs:
- images: Images

Main Functions:
1. prepare_images: read and load images from a folder into an Images object

Utils Functions:
1. dump_images: dump images to a pickle file
2. load_images: load images from a pickle file
"""

@timeit
def prepare_images(**kwargs) -> Images:
    """ Read and load images """
    image_set_name = kwargs['image_set_name']
    folder_path = f"data/{image_set_name}/images"
    images: Images = Images([], folder_path.split("/")[-2])
    files: list[str] = filter(lambda file: ".jpg" in file, os.listdir(folder_path))
    for file in files:
        image_path = f"{folder_path}/{file}"
        rgb_image = OpenCV.cvtColor(OpenCV.imread(image_path), OpenCV.COLOR_BGR2RGB)
        gray_image = OpenCV.cvtColor(rgb_image, OpenCV.COLOR_RGB2GRAY)
        images.images.append(Image(file.split(".")[0], rgb_image, gray_image, [], [], image_path))
    return images

def dump_images_bak(images_file_path: str, images: Images) -> None:
    """ Dump images to a file """
    with open(images_file_path, "wb") as file:
        pickle.dump(images, file)

def load_images_bak(images_file_path: str) -> Images:
    """ Load images from a file """
    with open(images_file_path, "rb") as file:
        images = pickle.load(file)
    return images

In [None]:
"""Step Two: Feature Extraction
Inputs:
- images: Images
- SIFT: OpenCV.SIFT

Outputs:
- image: Image
--> image.keypoints: list[OpenCV.KeyPoint]
--> image.descriptors: np.ndarray

Main Functions:
1. compute_keypoints_descriptors
"""

@timeit
def compute_keypoints_descriptors(images: list[Image], SIFT: OpenCV.SIFT, **kwargs) -> None:
    """Compute keypoints and descriptors for each image in the list of images using SIFT algorithm.
    Modifies each image in the list of images by adding its keypoints and descriptors as attributes.
    
    Args:
    - images: List of images to compute keypoints and descriptors for.
    - SIFT: OpenCV SIFT object used to detect and compute keypoints and descriptors.

    Returns:
    - None.
    """
    image_set_name = kwargs['image_set_name']
    for img in images.images:
        keypoints: list[OpenCV.KeyPoint]
        descriptors: np.ndarray
        keypoints, descriptors = SIFT.detectAndCompute(img.gray_image, None)
        img.keypoints = keypoints
        img.descriptors = descriptors
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Img({img.img_id}, {img.path}) has {len(img.keypoints)} keypoints and {len(img.descriptors)} descriptors.")

In [None]:
import os
import numpy as np
from sklearn.cluster import KMeans
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing import image as keras_image

@timeit
def image_matching(images_obj: Images, **kwargs):
  def load_image(image_path, target_size=(224, 224)):
    img = keras_image.load_img(image_path, target_size=target_size)
    img = keras_image.img_to_array(img)
    img = np.expand_dims(img, axis=0)
    img = preprocess_input(img)
    return img
  
  image_set_name = kwargs['image_set_name']
  image_dir = f'data/{image_set_name}/images'
  image_files = os.listdir(image_dir)
  images = [load_image(os.path.join(image_dir, f)) for f in image_files]
  images = np.vstack(images)

  model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
  features = model.predict(images)

  kmeans = KMeans(n_clusters=images_obj.num_clusters, random_state=42)
  clusters = kmeans.fit_predict(features)

  for i, cluster in enumerate(clusters):
      if cluster not in images_obj.similar_images:
        images_obj.similar_images[cluster] = []
      images_obj.similar_images[cluster].append(images_obj[int(image_files[i].split(".")[0])])

  images_obj.similar_images = {key: value for key, value in images_obj.similar_images.items() if len(value) > 1}


In [None]:
"""Step Four: Feature Matching
Inputs:
- images: Images

Outputs:
- None

Main Functions:
1. data_feature_matching

Utils Functions:
1. feature_matching
"""

@timeit
def feature_matching(
        img_one_descriptors: np.ndarray, 
        img_two_descriptors: np.ndarray,
        **kwargs
    ) -> list[OpenCV.DMatch]:
    """ Match features between two images using Brute Force Matcher
    Args:
        img_id_one: the index of the first image
        img_id_two: the index of the second image
        descriptors: a list of descriptors of the images
    Returns:
        A list of OpenCV.DMatch objects.
    """
    matcher = OpenCV.BFMatcher()
    return matcher.match(img_one_descriptors, img_two_descriptors)

@timeit
def apply_ransac(matches, keypoints1, keypoints2, threshold = 3.0, **kwargs):
    src_pts = np.float32([keypoints1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
    dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)

    _, mask = OpenCV.findHomography(src_pts, dst_pts, OpenCV.RANSAC, threshold)
    matches_mask = mask.ravel().tolist()
    return [m for m, keep in zip(matches, matches_mask) if keep]

@timeit
def data_feature_matching(images: Images, **kwargs) -> None:
    """ Match features between images using Brute Force Matcher
    Args:
        matchesIDs: a list of lists of tuples, where each tuple contains the index of a similar image and the cosine similarity 
            between the i-th image and the similar image. The list is sorted by the cosine similarity in 
            descending order.
        descriptors: a list of descriptors of the images
    Returns:
        A list of lists, where each list contains 
        the index of the first image, the index of the second image, 
        and a list of OpenCV.DMatch objects.
    """
    import itertools
    image_set_name = kwargs['image_set_name']
    for key, values in images.similar_images.items():
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Started Feature Match for cluster number {key}:")
        for image, matched_image in itertools.combinations(values, 2):
            feature_matching_output = feature_matching(image.descriptors, matched_image.descriptors, **kwargs)
            ransac_output = apply_ransac(feature_matching_output, image.keypoints, matched_image.keypoints, threshold=150, **kwargs)
            images.feature_matches.append(FeatureMatches(image, matched_image, ransac_output))
            log_to_file(f"data/{image_set_name}/logs/tune.log", f"({image.img_id}, {matched_image.img_id}) with {len(ransac_output)} / {len(feature_matching_output)}.")

In [None]:
"""Step Five: Camera Calibration
Inputs:
- None.

Outputs:
- k_matrix: np.ndarray

ToDo:
1- generate K_matrix.pickle for each camera using Chess board pattern.
"""

@timeit
def compute_k_matrix(img_path: str, **kwargs) -> np.ndarray:
    import exifread
    # Open the image file
    image = open(img_path, "rb")
    # Read the EXIF data
    exif = exifread.process_file(image)
    # Extract the intrinsic parameters
    focal_length = exif['EXIF FocalLength'].values[0]
    sensor_width = exif['EXIF ExifImageWidth'].values[0]
    sensor_height = exif['EXIF ExifImageLength'].values[0]
    principal_point_x = exif['EXIF ExifImageWidth'].values[0] / 2
    principal_point_y = exif['EXIF ExifImageLength'].values[0] / 2
    # distortion_coefficients = exif['EXIF MakerNote'].values[0]
    # Calculate the scaling factor for the K-matrix
    scaling_factor = 1.0
    return np.array(
        [
            [float(focal_length), 0, principal_point_x],
            [0, float(focal_length), principal_point_y],
            [0, 0, scaling_factor],
        ]
    )

In [None]:
"""Step Six: Triangulation (3D Reconstruction)
Inputs:
- feature_matches_list: list[list[int, int, list[OpenCV.DMatch]]]
    -> A list of lists, where each list contains 
        the index of the first image, the index of the second image, 
        and a list of OpenCV.DMatch objects.
- K_matrix: np.ndarray
    -> The camera matrix of the camera used to take the images.

Outputs:
- point_cloud: list[np.ndarray]; each element is a 3D point.

Main Functions:
1. generate_point_cloud

Utils Functions:
1. triangulatePoints
"""
@timeit
def generate_point_cloud(images: Images, K_matrix: np.ndarray, **kwargs) -> np.ndarray:
    point_cloud = []

    for feature_match in images.feature_matches:
        image_one = feature_match.image_one
        image_two = feature_match.image_two

        # Extract matched keypoints
        keypoints_one = np.array([image_one.keypoints[m.queryIdx].pt for m in feature_match.matches])
        keypoints_two = np.array([image_two.keypoints[m.trainIdx].pt for m in feature_match.matches])

        # Estimate the essential matrix
        E, mask = OpenCV.findEssentialMat(keypoints_one, keypoints_two, K_matrix, method=OpenCV.RANSAC, prob=0.999, threshold=1.0)
        _, R, t, _ = OpenCV.recoverPose(E, keypoints_one, keypoints_two, K_matrix)

        # Create projection matrices
        P1 = K_matrix @ np.hstack((np.eye(3), np.zeros((3, 1))))
        P2 = K_matrix @ np.hstack((R, t))

        # Triangulate points
        points_4D = OpenCV.triangulatePoints(P1, P2, keypoints_one.T, keypoints_two.T)
        points_3D = (points_4D / points_4D[3])[:3]

        point_cloud.append(points_3D)

    # Merge all point clouds into one
    point_cloud = np.hstack(point_cloud).T

    return point_cloud


### Pipeline End

In [None]:
import enum

class Mode(enum.Enum):
    OPTMIZED = "optimized"
    DEBUG = "debug"

In [None]:
# image_set_name = "rubik-cube"
image_set_name = "snow-man"
# image_set_name = "test"

log_to_file(f"data/{image_set_name}/logs/tune.log", "Welcome ScanMate...")
mode: enum = Mode.DEBUG
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Running image_set_name {image_set_name} in {mode} mode...")
images: Optional[Images] = None

# 0. Reload the last state
last_state: str
if os.path.isfile(f"data/{image_set_name}/bak/feature-matching-output.pkl"):
    last_state = "Feature Matching Step"
elif os.path.isfile(f"data/{image_set_name}/bak/images-matched.pkl"):
    last_state = "Images Matching Step"
elif os.path.isfile(f"data/{image_set_name}/bak/sift-features.pkl"):
    last_state = "SIFT Features Step"
else:
    last_state = "Images Loading Step"
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Last state for {image_set_name} is {last_state}")

In [None]:
# 1. Load and prepare Images
if last_state == "Images Loading Step":
    if os.path.isfile(f"data/{image_set_name}/bak/images.pkl"):
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/sift-images.pkl] exists")
        log_to_file(f"data/{image_set_name}/logs/tune.log", "Loading images from pickle file...")
        images: Images = load_images_bak(f"data/{image_set_name}/bak/images.pkl")
    else:
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/images.pkl] does not exist")
        log_to_file(f"data/{image_set_name}/logs/tune.log", "Loading images from images directory...")
        images: Images = prepare_images(image_set_name=image_set_name)
        log_to_file(f"data/{image_set_name}/logs/tune.log", "Saving images to pickle file...")
        dump_images_bak(f"data/{image_set_name}/bak/images.pkl", images)
    log_to_file(f"data/{image_set_name}/logs/tune.log", "Images loaded successfully")
    last_state = "SIFT Features Step"

In [None]:
# 2. Feature Extraction: SIFT
if last_state == "SIFT Features Step":
    if os.path.isfile(f"data/{image_set_name}/bak/sift-features.pkl"):
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/sift-features.pkl] exists")
        if images: 
            del images
        images: Images = load_images_bak(f"data/{image_set_name}/bak/sift-features.pkl")
    else:
        log_to_file(f"data/{image_set_name}/logs/tune.log", "File [data/{image_set_name}/bak/sift-features.pkl] DO NOT exists")
        log_to_file(f"data/{image_set_name}/logs/tune.log", "Extracting SIFT features...")
        sift = OpenCV.SIFT_create()
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images before: {sys.getrefcount(images)}")
        print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
        compute_keypoints_descriptors(images, sift, image_set_name=image_set_name)
        print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images after: {sys.getrefcount(images)}")
        dump_images_bak(f"data/{image_set_name}/bak/sift-features.pkl", images)
        # remove bak/{image_set_name}/images.pkl
        if mode == Mode.OPTMIZED:
            if os.path.exists(f"data/{image_set_name}/bak/images.pkl"):
                os.remove(f"data/{image_set_name}/bak/images.pkl")
                log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/images.pkl removed successfully.")
            else:
                log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/images.pkl does not exist.")
    log_to_file(f"data/{image_set_name}/logs/tune.log", "Feature Extraction: SIFT DONE...")
    last_state = "Images Matching Step"

In [None]:
# 3. Image Matching
if last_state == "Images Matching Step":
    if os.path.isfile(f"data/{image_set_name}/bak/images-matched.pkl"):
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/images-matched.pkl] exists")
        if images: 
            del images
        images: Images = load_images_bak(f"data/{image_set_name}/bak/images-matched.pkl")
    else:
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/images-matched.pkl] DO NOT exists")
        log_to_file(f"data/{image_set_name}/logs/tune.log", "Matching images...")
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images before: {sys.getrefcount(images)}")
        print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
        images.num_clusters = 50
        image_matching(images, image_set_name=image_set_name)
        log_to_file(f"data/{image_set_name}/logs/tune.log", "image matching done")
        images.save_similar_images()
        log_to_file(f"data/{image_set_name}/logs/tune.log", "saved image clusters")
        print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
        log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images after: {sys.getrefcount(images)}")
        dump_images_bak(f"data/{image_set_name}/bak/images-matched.pkl", images)
        # remove bak/{image_set_name}/sift-features.pkl
        if mode == Mode.OPTMIZED:
            if os.path.exists(f"data/{image_set_name}/bak/sift-features.pkl"):
                os.remove(f"data/{image_set_name}/bak/sift-features.pkl")
                log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/sift-features.pkl removed successfully.")
            else:
                log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/sift-features.pkl does not exist.")
    log_to_file(f"data/{image_set_name}/logs/tune.log", "Done Image Matching Step...")

In [None]:
# 4. Feature Matching
if os.path.isfile(f"data/{image_set_name}/bak/feature-matching-output.pkl"):
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File [data/{image_set_name}/bak/feature-matching-output.pkl] exists")
    if images: 
        del images
    images: Images = load_images_bak(f"data/{image_set_name}/bak/feature-matching-output.pkl")
else:
    log_to_file(f"data/{image_set_name}/logs/tune.log", "File [data/{image_set_name}/bak/feature-matching-output.pkl] Do NOT exists")
    log_to_file(f"data/{image_set_name}/logs/tune.log", "Matching features...")
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images before: {sys.getrefcount(images)}")
    print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
    data_feature_matching(images, image_set_name=image_set_name)
    log_to_file(f"data/{image_set_name}/logs/tune.log", "done feature matching")
    images.save_feature_matches()
    log_to_file(f"data/{image_set_name}/logs/tune.log", "saved feature matching")
    print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images after: {sys.getrefcount(images)}")
    dump_images_bak(f"data/{image_set_name}/bak/feature-matching-output.pkl", images)
    # remove bak/{image_set_name}/images-matched.pkl
    if mode == Mode.OPTMIZED:
        if os.path.exists(f"data/{image_set_name}/bak/images-matched.pkl"):
            os.remove(f"data/{image_set_name}/bak/images-matched.pkl")
            log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/images-matched.pkl removed successfully.")
        else:
            log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/images-matched.pkl does not exist.")
log_to_file(f"data/{image_set_name}/logs/tune.log", "Done Feature Matching Step...")

In [None]:
# 5. Camera Calibration
log_to_file(f"data/{image_set_name}/logs/tune.log", "Camera Calibration starts ....")
if not os.path.isfile(f"data/{image_set_name}/bak/K_matrix.pickle"):
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/K_matrix.pickle does not exist")
    K_matrix = compute_k_matrix(images.images[0].path, image_set_name=image_set_name)
    with open(f"data/{image_set_name}/bak/K_matrix.pickle", 'wb') as f:
        pickle.dump(K_matrix, f)
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/K_matrix.pickle saved successfully")
else:
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/K_matrix.pickle exists")
    with open(f"data/{image_set_name}/bak/K_matrix.pickle", 'rb') as f:
        K_matrix = pickle.load(f)

In [None]:
# 6. Triangulation
log_to_file(f"data/{image_set_name}/logs/tune.log", "Triangulation starts ....")
if os.path.isfile(f"data/{image_set_name}/bak/point-cloud.pkl"):
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/point-cloud.pkl exists")
    with open(f"data/{image_set_name}/bak/point-cloud.pkl", 'rb') as f:
        points_cloud: np.ndarray = pickle.load(f)
else:
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"File data/{image_set_name}/bak/point-cloud.pkl does not exist")
    log_to_file(f"data/{image_set_name}/logs/tune.log", "Triangulating...")
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images before: {sys.getrefcount(images)}")
    print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
    points_cloud: np.ndarray = generate_point_cloud(images, K_matrix, image_set_name=image_set_name)
    print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
    log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images after: {sys.getrefcount(images)}")
    # Pickle the point cloud
    with open(f"data/{image_set_name}/bak/point-cloud.pkl", 'wb') as f:
        pickle.dump(points_cloud, f)
log_to_file(f"data/{image_set_name}/logs/tune.log", "Done Point Cloud Step...")

In [None]:
print(f"points_cloud.shape: {points_cloud.shape}")
# np.savetxt(f"data/{image_set_name}/output/point-cloud.txt", points_cloud)

### Clean memory before Clustring

In [None]:
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Ref count of images before: {sys.getrefcount(images)}")
print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
print_size(f"data/{image_set_name}/logs/tune.log", points_cloud, "points_cloud")
images = None
log_to_file(f"data/{image_set_name}/logs/tune.log", gc.collect())
print_size(f"data/{image_set_name}/logs/tune.log", images, "images")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Reference count<images>: {sys.getrefcount(images)}")
log_to_file(f"data/{image_set_name}/logs/tune.log", gc.collect())

In [None]:
# 7. 3D reconstruction
log_to_file(f"data/{image_set_name}/logs/tune.log", "3D reconstruction starts [points_cloud] ....")
import open3d as o3d
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points_cloud[:,:3])

In [None]:
# Save it as a .PLY file
o3d.io.write_point_cloud(f"data/{image_set_name}/output/point_cloud_before_clustring.ply", pcd)
log_to_file(f"data/{image_set_name}/logs/tune.log", "File point_cloud_before_clustring.ply saved successfully...")

In [None]:
# o3d.visualization.draw_geometries([pcd])

In [None]:
log_to_file(f"data/{image_set_name}/logs/tune.log", "started clustring....")
import hdbscan

start_time = time.time()
hdbscan_model = hdbscan.HDBSCAN(min_cluster_size=10).fit(points_cloud)
end_time = time.time()
log_to_file(f"data/{image_set_name}/logs/tune.log", f"time taken: {end_time - start_time:,} seconds")

In [None]:
with open(f"data/{image_set_name}/bak/hdbscan_model.pkl", 'wb') as f:
    pickle.dump(hdbscan_model, f)
log_to_file(f"data/{image_set_name}/logs/tune.log", "File hdbscan_model.pkl saved successfully...")
print_size(f"data/{image_set_name}/logs/tune.log", hdbscan_model, "hdbscan_model")

In [None]:
# Get the cluster labels for each point
labels = hdbscan_model.labels_
log_to_file(f"data/{image_set_name}/logs/tune.log", "Labels Done...")

# Get the indices of the core points (i.e., points that are part of a dense region)
core_indices = np.where(labels != -1)[0]
log_to_file(f"data/{image_set_name}/logs/tune.log", "Core Indicies Done...")

# Get the coordinates of the core points
core_points = points_cloud[core_indices, :]
log_to_file(f"data/{image_set_name}/logs/tune.log", "Core Points Done...")

# Get the indices of the outlier points (i.e., points that are not part of any dense region)
outlier_indices = np.where(labels == -1)[0]
log_to_file(f"data/{image_set_name}/logs/tune.log", "Outlier Indicies Done...")

# Get the coordinates of the outlier points
outlier_points = points_cloud[outlier_indices, :]
log_to_file(f"data/{image_set_name}/logs/tune.log", "Outlier Points Done...")

# Log the number of clusters and the number of outlier points
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Number of clusters: {len(np.unique(labels))-1:,}")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Number of core points: {len(core_indices):,}")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Number of outlier points: {len(outlier_indices):,}")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Number of total points: {len(core_indices) + len(outlier_indices):,}")

In [None]:
with open(f"data/{image_set_name}/bak/core_points.pkl", 'wb') as f:
    pickle.dump(core_points, f)
log_to_file(f"data/{image_set_name}/logs/tune.log", "File core_points.pkl saved successfully...")
print_size(f"data/{image_set_name}/logs/tune.log", core_points, "core_points")

# Furthur analytics on the output

In [None]:
from collections import Counter

log_to_file(f"data/{image_set_name}/logs/tune.log", "Analysis of X, Y, Z of Points cloud")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"X<{len(points_cloud[:,0]):,}>: {points_cloud[:,0].min():,} to {points_cloud[:,0].max():,}")
x_counter = Counter(points_cloud[:,0])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(x_counter):,} unique X values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common X: {x_counter.most_common(1)}, Least Two Common X: {x_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", x_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Y<{len(points_cloud[:,1]):,}>: {points_cloud[:,1].min():,} to {points_cloud[:,1].max():,}")
y_counter = Counter(points_cloud[:,1])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(y_counter):,} unique Y values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Y: {y_counter.most_common(1)}, Least Two Common Y: {y_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", y_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Z<{len(points_cloud[:,2]):,}>: {points_cloud[:,2].min():,} to {points_cloud[:,2].max():,}")
z_counter = Counter(points_cloud[:,2])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(z_counter):,} unique Z values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Z: {z_counter.most_common(1)}, Least Two Common Y: {z_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", z_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

In [None]:
log_to_file(f"data/{image_set_name}/logs/tune.log", "Analysis of X, Y, Z of Core Points")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"X<{len(core_points[:,0]):,}>: {core_points[:,0].min():,} to {core_points[:,0].max():,}")
x_counter = Counter(core_points[:,0])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(x_counter):,} unique X values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common X: {x_counter.most_common(1)}, Least Two Common X: {x_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", x_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Y<{len(core_points[:,1]):,}>: {core_points[:,1].min():,} to {core_points[:,1].max():,}")
y_counter = Counter(core_points[:,1])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(y_counter):,} unique Y values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Y: {y_counter.most_common(1)}, Least Two Common Y: {y_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", y_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Z<{len(core_points[:,2]):,}>: {core_points[:,2].min():,} to {core_points[:,2].max():,}")
z_counter = Counter(core_points[:,2])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(z_counter):,} unique Z values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Z: {z_counter.most_common(1)}, Least Two Common Y: {z_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", z_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

In [None]:
log_to_file(f"data/{image_set_name}/logs/tune.log", "Analysis of X, Y, Z of Outliers Points")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"X<{len(outlier_points[:,0]):,}>: {outlier_points[:,0].min():,} to {outlier_points[:,0].max():,}")
x_counter = Counter(outlier_points[:,0])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(x_counter):,} unique X values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common X: {x_counter.most_common(1)}, Least Two Common X: {x_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", x_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Y<{len(outlier_points[:,1]):,}>: {outlier_points[:,1].min():,} to {outlier_points[:,1].max():,}")
y_counter = Counter(outlier_points[:,1])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(y_counter):,} unique Y values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Y: {y_counter.most_common(1)}, Least Two Common Y: {y_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", y_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

log_to_file(f"data/{image_set_name}/logs/tune.log", f"Z<{len(outlier_points[:,2]):,}>: {outlier_points[:,2].min():,} to {outlier_points[:,2].max():,}")
z_counter = Counter(outlier_points[:,2])
log_to_file(f"data/{image_set_name}/logs/tune.log", f"We have {len(z_counter):,} unique Z values")
log_to_file(f"data/{image_set_name}/logs/tune.log", f"Most Common Z: {z_counter.most_common(1)}, Least Two Common Y: {z_counter.most_common()[:-3:-1]}")
# log_to_file(f"data/{image_set_name}/logs/tune.log", z_counter)
log_to_file(f"data/{image_set_name}/logs/tune.log", "-----------------------------------------------------")

In [None]:
# Clear memort of pcd
pcd = None

log_to_file(f"data/{image_set_name}/logs/tune.log", "3D reconstruction starts [core_points] ....")
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(core_points[:,:3])
# Save it as a .PLY file
o3d.io.write_point_cloud(f"data/{image_set_name}/output/point_cloud_after_clustring.ply", pcd)
log_to_file(f"data/{image_set_name}/logs/tune.log", "File point_cloud_after_clustring.ply saved successfully...")

In [None]:
import open3d as o3d

# Define the path to the PLY file
file_path = f"data/{image_set_name}/output/point_cloud_after_clustring.ply"

# Load the point cloud from the PLY file
point_cloud = o3d.io.read_point_cloud(file_path)

# Visualize the point cloud
o3d.visualization.draw_geometries([point_cloud])

In [None]:
# DEBUG
# Estimate the normals for the point cloud
pcd.estimate_normals()

# Apply the Ball-Pivoting Algorithm to create a mesh
radii = [0.005, 0.01, 0.02, 0.04]
bpa_mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(
    pcd, o3d.utility.DoubleVector(radii)
)

# Save the mesh as an STL file
o3d.io.write_triangle_mesh("snow_man_point_cloud.stl", bpa_mesh)