In [None]:
import os
import numpy as np
import itertools
import math, random
random.seed = 44

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

from pathlib import Path
import scipy.spatial.distance
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

#3DGeo

In [None]:
!pip install open3d

In [None]:
import numpy as np
import scipy.spatial  # type: ignore


def vector_equal(v1, v2):
    return v1.shape == v2.shape and np.allclose(
        v1, v2, rtol=1e-12, atol=1e-12, equal_nan=False
    )


def distance_point_point(p1, p2):
    """Calculates the euclidian distance between two points or sets of points
    >>> distance_point_point(np.array([1, 0]), np.array([0, 1]))
    1.4142135623730951
    >>> distance_point_point(np.array([[1, 1], [0, 0]]), np.array([0, 1]))
    array([1., 1.])
    >>> distance_point_point(np.array([[1, 0], [0, 0]]), np.array([[0, 0], [0, -3]]))
    array([1., 3.])
    """
    return scipy.spatial.minkowski_distance(p1, p2)


def distance_plane_point(plane_point, plane_normal, point):
    """Calculates the signed distance from a plane to one or more points
    >>> distance_plane_point(np.array([0, 0, 1]), np.array([0, 0, 1]), np.array([2, 2, 2]))
    1
    >>> distance_plane_point(np.array([0, 0, 1]), np.array([0, 0, 1]), np.array([[2, 2, 2], [2, 2, 3]]))
    array([1, 2])
    """
    assert np.allclose(
        np.linalg.norm(plane_normal), 1.0, rtol=1e-12, atol=1e-12, equal_nan=False
    )
    return np.dot(point - plane_point, plane_normal)


def distance_line_point(line_point, line_direction, point):
    """Calculates the distance from a line to a point
    >>> distance_line_point(np.array([0, 0, 1]), np.array([0, 0, 1]), np.array([1, 1, 2]))
    1.4142135623730951
    >>> distance_line_point(np.array([0, 0, 1]), np.array([0, 0, 1]), np.array([[1, 0, 1], [0, 2, 3]]))
    array([1., 2.])
    """
    assert np.allclose(
        np.linalg.norm(line_direction), 1.0, rtol=1e-12, atol=1e-12, equal_nan=False
    )
    delta_p = point - line_point
    return distance_point_point(
        delta_p,
        np.matmul(
            np.expand_dims(np.dot(delta_p, line_direction), axis=-1),
            np.atleast_2d(line_direction),
        ),
    )


In [None]:
from weakref import WeakKeyDictionary

import numpy as np

DTYPE = np.float64


class Position:
    def __init__(self, dim: int):
        self.dim = dim
        self._instance_data: WeakKeyDictionary[str, np.ndarray] = WeakKeyDictionary()

    def __get__(self, instance, owner):
        try:
            view = self._instance_data[instance].view()
        except KeyError as e:
            raise AttributeError() from e
        view.flags.writeable = False
        return view

    def __set__(self, instance, value):
        value = np.array(value, dtype=DTYPE, copy=True)  # TODO copy?
        if value.shape != (self.dim,):
            raise ValueError("Could not construct a 3D point")
        self._instance_data[instance] = value


class Direction:
    def __init__(self, dim: int):
        self.dim = dim
        self._instance_data: WeakKeyDictionary[str, np.ndarray] = WeakKeyDictionary()

    def __get__(self, instance, owner):
        try:
            view = self._instance_data[instance].view()
        except KeyError as e:
            raise AttributeError() from e
        view.flags.writeable = False
        return view

    def __set__(self, instance, value):
        value = np.array(value, dtype=DTYPE, copy=True)
        value /= np.linalg.norm(value)
        if value.shape != (self.dim,):
            raise ValueError("Could not construct a 3D point")
        self._instance_data[instance] = value


class PositiveNumber:
    def __init__(self):
        self._instance_data = WeakKeyDictionary()

    def __get__(self, instance, owner):
        try:
            return self._instance_data[instance]
        except KeyError as e:
            raise AttributeError() from e

    def __set__(self, instance, value):
        value = DTYPE(value)
        if value < 0:
            raise ValueError(
                "{} must be initialized with a positive number".format(
                    self.__class__.__name__
                )
            )
        self._instance_data[instance] = value


In [None]:
from abc import ABC, abstractmethod

import numpy as np

#from ._descriptor import Direction, Position, PositiveNumber
#from ._util import distance_line_point, distance_plane_point, distance_point_point


class GeometricShape(ABC):
    @abstractmethod
    def distance_to_point(self, point):
        """Calculates the smallest distance from a point to the shape"""

    # def project_point(self, point):
    # pass


class Line(GeometricShape):
    anchor_point = Position(3)
    direction = Direction(3)

    def __init__(self, anchor_point, direction):
        self.anchor_point = anchor_point
        self.direction = direction

    def __repr__(self):
        return f"Line(anchor_point={self.anchor_point.tolist()}, direction={self.direction.tolist()})"

    def distance_to_point(self, point):
        return distance_line_point(self.anchor_point, self.direction, point)


class Plane(GeometricShape):
    anchor_point = Position(3)
    normal = Direction(3)

    def __init__(self, anchor_point, normal):
        self.anchor_point = anchor_point
        self.normal = normal

    def __repr__(self):
        return f"Plane(anchor_point={self.anchor_point.tolist()}, normal={self.normal.tolist()})"

    def distance_to_point(self, point):
        return np.abs(distance_plane_point(self.anchor_point, self.normal, point))


class Sphere(GeometricShape):
    center = Position(3)
    radius = PositiveNumber()

    def __init__(self, center, radius):
        self.center = center
        self.radius = radius

    def __repr__(self):
        return f"Sphere(center={self.center.tolist()}, radius={self.radius})"

    def distance_to_point(self, point):
        return np.abs(distance_point_point(point, self.center) - self.radius)


class Cylinder(Line):
    radius = PositiveNumber()

    def __init__(self, anchor_point, direction, radius):
        super().__init__(anchor_point, direction)
        self.radius = radius

    def __repr__(self):
        return f"Cylinder(anchor_point={self.anchor_point.tolist()}, direction={self.direction.tolist()}, radius={self.radius})"

    def distance_to_point(self, point):
        return np.abs(super().distance_to_point(point) - self.radius)


class Circle3D(GeometricShape):
    center = Position(3)
    direction = Direction(3)
    radius = PositiveNumber()

    def __init__(self, center, direction, radius):
        self.center = center
        self.direction = direction
        self.radius = radius

    def __repr__(self):
        return f"Circle3D(center={self.center.tolist()}, direction={self.direction.tolist()}, radius={self.radius})"

    def distance_to_point(self, point):
        delta_p = point - self.center
        x1 = np.matmul(
            np.expand_dims(np.dot(delta_p, self.direction), axis=-1),
            np.atleast_2d(self.direction),
        )
        x2 = delta_p - x1
        return np.sqrt(
            np.linalg.norm(x1, axis=-1) ** 2
            + (np.linalg.norm(x2, axis=-1) - self.radius) ** 2
        )


class Torus(Circle3D):
    minor_radius = PositiveNumber()

    def __init__(self, center, direction, major_radius, minor_radius):
        super().__init__(center, direction, major_radius)
        self.minor_radius = minor_radius

    def __repr__(self):
        return f"Torus(center={self.center.tolist()}, direction={self.direction.tolist()}, major_radius={self.major_radius}, minor_radius={self.minor_radius})"

    @property
    def major_radius(self):
        return self.radius

    def distance_to_point(self, point):
        return np.abs(super().distance_to_point(point) - self.minor_radius)


In [None]:
import functools
import numbers
from typing import List, Union

import numpy as np
import open3d as o3d  # type: ignore


def vec2vec_rotation(unit_vec_1, unit_vec_2):
    angle = np.arccos(np.dot(unit_vec_1, unit_vec_2))
    if angle < 1e-8:
        return np.identity(3, dtype=np.float64)

    if angle > (np.pi - 1e-8):
        # WARNING this only works because all geometries are rotationaly invariant
        # minus identity is not a proper rotation matrix
        return -np.identity(3, dtype=np.float64)

    rot_vec = np.cross(unit_vec_1, unit_vec_2)
    rot_vec /= np.linalg.norm(rot_vec)

    return o3d.geometry.get_rotation_matrix_from_axis_angle(angle * rot_vec)


@functools.singledispatch
def to_open3d_geom(geom):
    return geom


@to_open3d_geom.register  # type: ignore[no-redef]
def _(points: np.ndarray):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    return pcd


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Line, length: numbers.Number = 1):
    points = (
        geom.anchor_point
        + np.stack([geom.direction, -geom.direction], axis=0) * length / 2
    )

    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(points),
        lines=o3d.utility.Vector2iVector([[0, 1]]),
    )
    return line_set


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Sphere):
    mesh = o3d.geometry.TriangleMesh.create_sphere(radius=geom.radius)
    mesh.translate(geom.center)

    return o3d.geometry.LineSet.create_from_triangle_mesh(mesh)


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Plane, length: numbers.Number = 1):
    points = np.array([[1, 1, 0], [-1, 1, 0], [1, -1, 0], [-1, -1, 0]]) * length / 2

    mesh = o3d.geometry.TetraMesh()
    mesh.vertices = o3d.utility.Vector3dVector(points)
    mesh.tetras = o3d.utility.Vector4iVector(np.array([[0, 1, 2, 3]]))

    rotation = vec2vec_rotation([0, 0, 1], geom.normal)
    mesh.rotate(rotation)
    mesh.translate(geom.anchor_point)

    return o3d.geometry.LineSet.create_from_tetra_mesh(mesh)


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Cylinder, length: numbers.Number = 1):
    mesh = o3d.geometry.TriangleMesh.create_cylinder(radius=geom.radius, height=length)

    mesh.remove_vertices_by_index([0, 1])

    rotation = vec2vec_rotation([0, 0, 1], geom.direction)
    mesh.rotate(rotation)
    mesh.translate(geom.anchor_point)

    return o3d.geometry.LineSet.create_from_triangle_mesh(mesh)


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Circle3D):
    mesh = o3d.geometry.TriangleMesh.create_torus(
        torus_radius=geom.radius, tube_radius=1e-6
    )
    rotation = vec2vec_rotation([0, 0, 1], geom.direction)
    mesh.rotate(rotation)
    mesh.translate(geom.center)

    return o3d.geometry.LineSet.create_from_triangle_mesh(mesh)


@to_open3d_geom.register  # type: ignore[no-redef]
def _(geom: Torus):
    mesh = o3d.geometry.TriangleMesh.create_torus(
        torus_radius=geom.major_radius, tube_radius=geom.minor_radius
    )
    rotation = vec2vec_rotation([0, 0, 1], geom.direction)
    mesh.rotate(rotation)
    mesh.translate(geom.center)

    return o3d.geometry.LineSet.create_from_triangle_mesh(mesh)


def plot(
    geometries_or_points: List[Union[GeometricShape, np.ndarray]],
    display_coordinate_frame: bool = False,
):
    geometries = [to_open3d_geom(g) for g in geometries_or_points]
    if display_coordinate_frame:
        geometries.append(o3d.geometry.TriangleMesh.create_coordinate_frame())
    o3d.visualization.draw_geometries(geometries)


In [None]:
import numpy as np
import numpy.linalg as la
import open3d as o3d
import matplotlib.pyplot as plt
from scipy import optimize  # type: ignore


def centroid_fit(points, weights=None):
    """Calculates the weighted average of a set of points
    This minimizes the sum of the squared distances between the points
    and the centroid.
    """
    if points.ndim == 1:
        return points
    return np.average(points, axis=0, weights=weights)


def _check_input(points, weights) -> None:
    """Check the input data of the fit functionality"""
    points = np.asarray(points)
    if weights is not None:
        weights = np.asarray(weights)

    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError(
            f"Input data has the wrong shape, expects points to be of shape ('n', 3), got {points.shape}"
        )
    if weights is not None and (weights.ndim != 1 or len(weights) != len(points)):
        raise ValueError(
            "Shape of weights does not match points, weights should be a 1 dimensional array of len(points)"
        )


def line_fit(points, weights=None) -> Line:
    """Fits a line through a set points"""
    _check_input(points, weights)
    centroid = centroid_fit(points, weights)
    weights = 1.0 if weights is None else weights
    centered_points = points - centroid
    u, s, v = np.linalg.svd(
        np.matmul(weights * centered_points.transpose(), centered_points)
    )
    return Line(anchor_point=centroid, direction=v[0])


def plane_fit(points, weights=None) -> Plane:
    """Fits a plane through a set of points"""
    _check_input(points, weights)
    centroid = centroid_fit(points, weights)
    weights = 1.0 if weights is None else weights
    centered_points = points - centroid
    u, s, v = np.linalg.svd(
        np.matmul(weights * centered_points.transpose(), centered_points)
    )
    return Plane(anchor_point=centroid, normal=v[2])


# TODO add weights
def fast_sphere_fit(points) -> Sphere:
    """A fast algebraic circle fit, that uses a modified error function that
    is more sensitive to outliers
    """
    _check_input(points, None)
    A = np.append(points * 2, np.ones((points.shape[0], 1)), axis=1)
    f = np.sum(points ** 2, axis=1)
    C, _, _, _ = np.linalg.lstsq(A, f, rcond=None)
    center = C[0:3]
    radius = np.average(distance_point_point(points, center))
    return Sphere(center=center, radius=radius)


def sphere_fit(
    points, weights=None, initial_guess: Sphere = None
) -> Sphere:
    """Fits a circle through a set of points"""
    _check_input(points, weights)
    initial_guess = initial_guess or fast_sphere_fit(points)

    def sphere_fit_residuals(center, points, weights):
        distances = distance_point_point(center, points)
        radius = np.average(distances, weights=weights)
        if weights is None:
            return distances - radius
        return (distances - radius) * np.sqrt(weights)

    results = optimize.least_squares(
        sphere_fit_residuals, x0=initial_guess.center, args=(points, weights)
    )
    if not results.success:
        raise RuntimeError(results.message)

    radius = np.average(distance_point_point(points, results.x), weights=weights)
    return Sphere(center=results.x, radius=radius)


def cylinder_fit(points, weights=None, initial_guess: Cylinder = None):
    """Fits a cylinder trough a set of points"""
    _check_input(points, weights)
    if initial_guess is None:
        raise NotImplementedError(
            "Cylinder fit currently does support running without an intial guess."
        )

    def cylinder_fit_residuals(anchor_direction, points, weights):
        line = Line(anchor_direction[:3], anchor_direction[3:])
        distances = line.distance_to_point(points)
        radius = np.average(distances, weights=weights)
        if weights is None:
            return distances - radius
        return (distances - radius) * np.sqrt(weights)

    x0 = np.concatenate([initial_guess.anchor_point, initial_guess.direction])
    results = optimize.least_squares(
        cylinder_fit_residuals, x0=x0, args=(points, weights), ftol=1e-10
    )
    if not results.success:
        return RuntimeError(results.message)

    line = Line(results.x[:3], results.x[3:])
    distances = line.distance_to_point(points)
    radius = np.average(distances, weights=weights)
    return Cylinder(results.x[:3], results.x[3:], radius)


def circle3D_fit(points, weights=None, initial_guess: Circle3D = None):
    """Fits a circle in three dimensions trough a set of points"""
    _check_input(points, weights)
    if initial_guess is None:
        raise NotImplementedError(
            "Circle3D fit currently does support running without an intial guess."
        )

    def circle_fit_residuals(circle_params, points, sqrt_w):
        circle = Circle3D(
            circle_params[:3], circle_params[3:], la.norm(circle_params[3:])
        )
        distances = circle.distance_to_point(points)
        return distances * sqrt_w

    x0 = np.concatenate(
        [initial_guess.center, initial_guess.direction * initial_guess.radius]
    )
    results = optimize.least_squares(
        circle_fit_residuals,
        jac="2-point",
        method="lm",
        x0=x0,
        args=(points, 1.0 if weights is None else np.sqrt(weights)),
    )

    results = optimize.minimize(
        lambda *args: np.sum(circle_fit_residuals(*args) ** 2),
        x0=results.x,
        args=(points, 1.0 if weights is None else np.sqrt(weights)),
    )

    if not results.success:
        return RuntimeError(results.message)

    return Circle3D(results.x[:3], results.x[3:], la.norm(results.x[3:]))


def torus_fit(points, weights=None, initial_guess: Torus = None) -> Torus:
    """Fits a torus trough a set of points"""
    _check_input(points, weights)
    if initial_guess is None:
        raise NotImplementedError(
            "Toru fit currently does support running without an intial guess."
        )

    def torus_fit_residuals(circle_params, points, weights):
        circle = Circle3D(
            circle_params[:3], circle_params[3:], la.norm(circle_params[3:])
        )
        distances = circle.distance_to_point(points)
        radius = np.average(distances, weights=weights)
        weights = np.sqrt(weights) if weights is not None else 1.0
        return (distances - radius) * weights

    x0 = np.concatenate(
        [initial_guess.center, initial_guess.direction * initial_guess.major_radius]
    )

    results = optimize.least_squares(
        torus_fit_residuals,
        x0=x0,
        args=(points, weights),
    )

    if not results.success:
        return Torus([-1000,-1000,-1000], [-1000,-1000,-1000], 0, 0)
        raise RuntimeError(results.message)

    circle3D = Circle3D(results.x[:3], results.x[3:], la.norm(results.x[3:]))
    distances = circle3D.distance_to_point(points)
    minor_radius = np.average(distances, weights=weights)
    return Torus(
        results.x[:3], results.x[3:], la.norm(results.x[3:]), minor_radius
    )

#PointNet

In [None]:
class PointSampler(object):
    def __init__(self, output_size):
        assert isinstance(output_size, int)
        self.output_size = output_size
    
    def triangle_area(self, pt1, pt2, pt3):
        side_a = np.linalg.norm(pt1 - pt2)
        side_b = np.linalg.norm(pt2 - pt3)
        side_c = np.linalg.norm(pt3 - pt1)
        s = 0.5 * ( side_a + side_b + side_c)
        return max(s * (s - side_a) * (s - side_b) * (s - side_c), 0)**0.5

    def sample_point(self, pt1, pt2, pt3):
        # barycentric coordinates on a triangle
        # https://mathworld.wolfram.com/BarycentricCoordinates.html
        s, t = sorted([random.random(), random.random()])
        f = lambda i: s * pt1[i] + (t-s)*pt2[i] + (1-t)*pt3[i]
        return (f(0), f(1), f(2))
    
    def __call__(self, mesh):
        verts, faces = mesh
        verts = np.array(verts)
        areas = np.zeros((len(faces)))

        for i in range(len(areas)):
            areas[i] = (self.triangle_area(verts[faces[i][0]],
                                           verts[faces[i][1]],
                                           verts[faces[i][2]]))
            
        sampled_faces = (random.choices(faces, 
                                      weights=areas,
                                      cum_weights=None,
                                      k=self.output_size))
        
        sampled_points = np.zeros((self.output_size, 3))

        for i in range(len(sampled_faces)):
          sampled_points[i] = (self.sample_point(verts[sampled_faces[i][0]],
                                                   verts[sampled_faces[i][1]],
                                                   verts[sampled_faces[i][2]]))
        
        return sampled_points

In [None]:
class Normalize(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2
        
        norm_pointcloud = pointcloud - np.mean(pointcloud, axis=0) 
        norm_pointcloud /= np.max(np.linalg.norm(norm_pointcloud, axis=1))

        return  norm_pointcloud

In [None]:
class RandRotation_z(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        theta = random.random() * 2. * math.pi
        rot_matrix = np.array([[ math.cos(theta), -math.sin(theta),    0],
                               [ math.sin(theta),  math.cos(theta),    0],
                               [0,                             0,      1]])
        
        rot_pointcloud = rot_matrix.dot(pointcloud.T).T
        return  rot_pointcloud
    
class RandomNoise(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        noise = np.random.normal(0, 0.02, (pointcloud.shape))
    
        noisy_pointcloud = pointcloud + noise
        return  noisy_pointcloud

In [None]:
class ToTensor(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        return torch.from_numpy(pointcloud)

In [None]:
def default_transforms():
    return transforms.Compose([
                                Normalize(),
                                ToTensor()
                              ])

In [None]:
train_transforms = transforms.Compose([
                    Normalize(),
                    RandRotation_z(),
                    RandomNoise(),
                    ToTensor()
                    ])

In [None]:
class PointCloudData(Dataset):
    def __init__(self, root_dir, valid=False, folder="train", transform=default_transforms()):
        self.root_dir = root_dir
        folders = [dir for dir in sorted(os.listdir(root_dir)) if os.path.isdir(root_dir + str(dir))]
        self.classes = {folder: i for i, folder in enumerate(folders)}
        self.transforms = transform if not valid else default_transforms()
        self.valid = valid
        self.files = []
        for category in self.classes.keys():
            new_dir = str(root_dir) + str(category) 
            for file in os.listdir(new_dir):
                sample = {}
                sample['pcd_path'] = new_dir + "/" + file
                sample['category'] = category
                sample['filename'] = file
                self.files.append(sample)
    def __len__(self):
        return len(self.files)

    def __preproc__(self, file):
        verts = read_pc(file)
        pointcloud = np.array(verts)
        if self.transforms:
            pointcloud = self.transforms(pointcloud)
        return pointcloud

    def __getitem__(self, idx):
        pcd_path = self.files[idx]['pcd_path']
        category = self.files[idx]['category']
        filename = self.files[idx]['filename']
        with open(pcd_path, 'r') as f:
            pointcloud = self.__preproc__(f)
        return {'pointcloud': pointcloud, 
                'category': self.classes[category],
                'filename' : filename}

In [None]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

class Tnet(nn.Module):
    def __init__(self, k=3):
        super().__init__()
        self.k=k
        self.conv1 = nn.Conv1d(k,64,1)
        self.conv2 = nn.Conv1d(64,128,1)
        self.conv3 = nn.Conv1d(128,1024,1)
        self.fc1 = nn.Linear(1024,512)
        self.fc2 = nn.Linear(512,256)
        self.fc3 = nn.Linear(256,k*k)

        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)
    
    def forward(self, input):
        # input.shape == (bs,n,3)
        bs = input.size(0)
        xb = F.relu(self.bn1(self.conv1(input)))
        xb = F.relu(self.bn2(self.conv2(xb)))
        xb = F.relu(self.bn3(self.conv3(xb)))
        pool = nn.MaxPool1d(xb.size(-1))(xb)
        flat = nn.Flatten(1)(pool)
        xb = F.relu(self.bn4(self.fc1(flat)))
        xb = F.relu(self.bn5(self.fc2(xb)))

        #initialize as identity
        init = torch.eye(self.k, requires_grad=True).repeat(bs,1,1)
        if xb.is_cuda:
            init=init.cuda()
        matrix = self.fc3(xb).view(-1,self.k,self.k) + init
        return matrix

class Transform(nn.Module):
    def __init__(self):
        super().__init__()
        self.input_transform = Tnet(k=3)
        self.feature_transform = Tnet(k=64)
        self.conv1 = nn.Conv1d(3,64,1)

        self.conv2 = nn.Conv1d(64,128,1)
        self.conv3 = nn.Conv1d(128,1024,1)


        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)

    def forward(self, input):
        matrix3x3 = self.input_transform(input)
        # batch matrix multiplication
        xb = torch.bmm(torch.transpose(input,1,2), matrix3x3).transpose(1,2)

        xb = F.relu(self.bn1(self.conv1(xb)))

        matrix64x64 = self.feature_transform(xb)
        xb = torch.bmm(torch.transpose(xb,1,2), matrix64x64).transpose(1,2)

        xb = F.relu(self.bn2(self.conv2(xb)))
        xb = self.bn3(self.conv3(xb))
        xb = nn.MaxPool1d(xb.size(-1))(xb)
        output = nn.Flatten(1)(xb)
        return output, matrix3x3, matrix64x64
        
class PointNet(nn.Module):
    def __init__(self, classes = 10):
        super().__init__()
        self.transform = Transform()
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, classes)
        

        self.bn1 = nn.BatchNorm1d(512)
        self.bn2 = nn.BatchNorm1d(256)
        self.dropout = nn.Dropout(p=0.3)
        self.logsoftmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        xb, matrix3x3, matrix64x64 = self.transform(input)
        xb = F.relu(self.bn1(self.fc1(xb)))
        xb = F.relu(self.bn2(self.dropout(self.fc2(xb))))
        output = self.fc3(xb)
        return self.logsoftmax(output), matrix3x3, matrix64x64

In [None]:
def pointnetloss(outputs, labels, m3x3, m64x64, alpha = 0.0001):
    criterion = torch.nn.NLLLoss()
    bs=outputs.size(0)
    id3x3 = torch.eye(3, requires_grad=True).repeat(bs,1,1)
    id64x64 = torch.eye(64, requires_grad=True).repeat(bs,1,1)
    if outputs.is_cuda:
        id3x3=id3x3.cuda()
        id64x64=id64x64.cuda()
    diff3x3 = id3x3-torch.bmm(m3x3,m3x3.transpose(1,2))
    diff64x64 = id64x64-torch.bmm(m64x64,m64x64.transpose(1,2))
    return criterion(outputs, labels) + alpha * (torch.norm(diff3x3)+torch.norm(diff64x64)) / float(bs)

#Plane

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import math

def Planefitter(path):
  xs = []
  ys = []
  zs = []

  with open(path, 'r') as f:
    points = read_pc(f)
  for point in points:
    xs.append(point[0])
    ys.append(point[1])
    zs.append(point[2])

  plt.figure()
  ax = plt.subplot(111, projection='3d')
  ax.scatter(xs, ys, zs, color='b')

  tmp_A = []
  tmp_b = []
  for i in range(len(xs)):
      tmp_A.append([xs[i], ys[i], 1])
      tmp_b.append(zs[i])
  b = np.matrix(tmp_b).T
  A = np.matrix(tmp_A)
  fit = (A.T * A).I * A.T * b
  errors = b - A * fit
  residual = np.linalg.norm(errors)

  print("solution:")
  print("%f x + %f y + %f = z" % (fit[0], fit[1], fit[2]))

  xlim = ax.get_xlim()
  ylim = ax.get_ylim()
  X,Y = np.meshgrid(np.arange(xlim[0], xlim[1]),
                    np.arange(ylim[0], ylim[1]))
  Z = np.zeros(X.shape)
  for r in range(X.shape[0]):
      for c in range(X.shape[1]):
          Z[r,c] = fit[0] * X[r,c] + fit[1] * Y[r,c] + fit[2]
  ax.plot_wireframe(X,Y,Z, color='k')

  ax.set_xlabel('x')
  ax.set_ylabel('y')
  ax.set_zlabel('z')
  plt.show()

  const = fit[2]

  vector = []
  fit[2] = -1
  module = math.sqrt(fit[0]**2 + fit[1]**2 + 1)
  fit = fit/ module

  x = random.uniform(-50.55, 50.55)
  y = random.uniform(-50.55, 50.55)
  z = fit[0]*x + fit[1]*y + const

  return fit, x, y, z #vector phap tuyen, x, y, z

#Cylinder

In [None]:
def Cylinderfitter(path):
  with open(path, 'r') as f:
    verts = read_pc(f)
  pointcloud = np.array(verts)
  initial_guess = Cylinder([0, 0, 0], [0, 0, 1], 1)
  cylinder = cylinder_fit(pointcloud, initial_guess=initial_guess)
  return cylinder

#Shpere

In [None]:
import os
import cv2
import matplotlib.pyplot as plt
from PIL import Image

import numpy as np
import math
#	fit a sphere to X,Y, and Z data points
#	returns the radius and center points of
#	the best fit sphere
def Spherefitter(path):
    correctX=[]
    correctY=[]
    correctZ=[]
    with open(path, "r") as f:
      text=f.readlines()
      for i in text:
        s=i[:-2]
        s=s.split(',')
        correctX.append(float(s[0]))
        correctY.append(float(s[1]))
        correctZ.append(float(s[2]))

    spX = np.array(correctX)
    spY = np.array(correctY)
    spZ = np.array(correctZ)
    A = np.zeros((len(spX),4))
    A[:,0] = spX*2
    A[:,1] = spY*2
    A[:,2] = spZ*2
    A[:,3] = 1

    #   Assemble the f matrix
    f = np.zeros((len(spX),1))
    f[:,0] = (spX*spX) + (spY*spY) + (spZ*spZ)
    C, residules, rank, singval = np.linalg.lstsq(A,f)

    #   solve for the radius
    t = (C[0]*C[0])+(C[1]*C[1])+(C[2]*C[2])+C[3]
    radius = math.sqrt(t)

    return radius, C[0], C[1], C[2]

#Cone

In [None]:
import random

def plane_intersect(a, b):
    """
    a, b   4-tuples/lists
           Ax + By +Cz + D = 0
           A,B,C,D in order  

    output: 2 points on line of intersection, np.arrays, shape (3,)
    """
    a_vec, b_vec = np.array(a[:3]), np.array(b[:3])

    aXb_vec = np.cross(a_vec, b_vec)

    A = np.array([a_vec, b_vec, aXb_vec])
    d = np.array([-a[3], -b[3], 0.]).reshape(3,1)

# could add np.linalg.det(A) == 0 test to prevent linalg.solve throwing error

    p_inter = np.linalg.solve(A, d).T

    return p_inter[0], (p_inter + aXb_vec)[0]

def distance(x1, y1, z1, x2, y2, z2):
  d = 0.0
  d =(x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2
  return d

def cmp(t):
  return t[0]

def find_Denta(point):
  point.sort(key=lambda t: t[0])
  d=abs(point[-1][0]-point[0][0])
  point.sort(key=lambda t: t[1])
  d1=abs(point[-1][1]-point[0][1])
  if d1>d: d=d1
  point.sort(key=lambda t: t[2])
  d2=abs(point[-1][2]-point[0][2])
  if d2>d: d=d2
  return d/len(point)

def create_vector(point1,point2):
  return np.array([point2[i]-point1[i] for i in range(3)])
def unit_vector(vector):
    """ Returns the unit vector of the vector.  """
    return vector / np.linalg.norm(vector)

def angle_between(v1, v2):
    """ Returns the angle in radians between vectors 'v1' and 'v2'::

            >>> angle_between((1, 0, 0), (0, 1, 0))
            1.5707963267948966
            >>> angle_between((1, 0, 0), (1, 0, 0))
            0.0
            >>> angle_between((1, 0, 0), (-1, 0, 0))
            3.141592653589793
    """
    v1_u = unit_vector(v1)
    v2_u = unit_vector(v2)
    return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

def maximum_con(kt):
    res,max,pos=0,0,0
    for i in range(len(kt)):
      if (kt[i]==1): res+=1
      else: 
        if (res>max): 
          max=res
          pos=i
        res=0
    if (res>max): 
      max=res
      pos=len(kt)-1
    return [max,pos]

def Conefitter(path):
  pcd = o3d.io.read_point_cloud(path)
  print(pcd)
  size = (len(np.asarray(pcd.points)))
  print("Recompute the normal of the downsampled point cloud")
  pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.2,max_nn=30))
  o3d.visualization.draw_geometries([pcd])

  points = []
  for p in range(0,10000):
    i = random.randint(0, size-1)
    j = random.randint(0, size-1)
    k = random.randint(0, size-1)
    di = - (pcd.points[i][0] * pcd.normals[i][0] + pcd.points[i][1] * pcd.normals[i][1] + pcd.points[i][2] * pcd.normals[i][2])
    dj = - (pcd.points[j][0] * pcd.normals[j][0] + pcd.points[j][1] * pcd.normals[j][1] + pcd.points[j][2] * pcd.normals[j][2])
    dk = - (pcd.points[k][0] * pcd.normals[k][0] + pcd.points[k][1] * pcd.normals[k][1] + pcd.points[k][2] * pcd.normals[k][2])
    a = (pcd.points[i][0], pcd.points[i][1], pcd.points[i][2], di)
    b = (pcd.points[j][0], pcd.points[j][1], pcd.points[j][2], dj)
    try:
      c, d = plane_intersect(a,b)
    except:
        continue
    t =  -(c[0] + c[1] + c[2]) /((c[0]- d[0])* pcd.normals[k][0] + (c[1] - d[1]) * pcd.normals[k][1] + (c[2] - d[2])* pcd.normals[k][2])
    points.append([c[0] + (c[0] - d[0])* t, c[1] + (c[1] - d[1]) * t, c[2] + (c[2] - d[2]) * t])
  x_mean = 0
  y_mean = 0
  z_mean = 0
  for i in points:
    x_mean += i[0]
    y_mean += i[1]
    z_mean += i[2]
  x_mean = x_mean / 100000
  y_mean = y_mean / 100000
  z_mean = z_mean / 100000

  dist=[]
  for i in pcd.points:
    dist.append([distance(x_mean,y_mean,z_mean,i[0],i[1],[2]), i])

  dist.sort(key=cmp,reverse=True)
  exp=find_Denta(list(pcd.points))
  points_plane=[]
  kt=[0 for i in dist]

  for i in range(len(dist)-1):
    if abs(dist[i][0]-dist[i+1][0])<=exp: 
      kt[i]=1

  max,pos=maximum_con(kt)[0],maximum_con(kt)[1]
  for i in range(max):
    points_plane.append(dist[pos])
    pos-=1
  points_of_plane=[i[1] for i in points_plane]
  initial_guess = Circle3D([0, 0, 0], [0, 0, 1], 1)
  circle = circle3D_fit(points_of_plane, initial_guess=initial_guess)
  try:
    dir = circle.directi
  return angle_between(create_vector([x_mean,y_mean,z_mean],points_of_plane[0]),(circle.direction[0],circle.direction[1],circle.direction[2])), circle.direction[0],circle.direction[1],circle.direction[2], x_mean,y_mean,z_mean 


#Torus

In [None]:
def Torusfitter(path):
  with open(path, 'r') as f:
    verts = read_pc(f)
  pointcloud = np.array(verts)
  initial_guess = Torus([0, 0, 0], [0, 0, 1], 1, 0.1)
  torus = torus_fit(pointcloud, initial_guess=initial_guess)
  return torus

#Training

In [None]:
path = "/content/drive/MyDrive/SHREC2022/Binary/" #path to dataset

In [None]:
train_ds = PointCloudData(path, transform=train_transforms)

In [None]:
inv_classes = {i: cat for cat, i in train_ds.classes.items()};
inv_classes

In [None]:
print('Train dataset size: ', len(train_ds))
print('Valid dataset size: ', len(valid_ds))
print('Number of classes: ', len(train_ds.classes))
print('Sample pointcloud shape: ', train_ds[0]['pointcloud'].size())
print('Class: ', inv_classes[train_ds[0]['category']])

In [None]:
train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
pointnet = PointNet()
pointnet.to(device);

In [None]:
optimizer = torch.optim.Adam(pointnet.parameters(), lr=0.00025)

In [None]:
from tqdm import tqdm
def train(model, train_loader, val_loader=None,  epochs=10):
    j = 0
    for epoch in tqdm(range(epochs)): 
        pointnet.train()
        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)
            optimizer.zero_grad()
            outputs, m3x3, m64x64 = pointnet(inputs.transpose(1,2))

            loss = pointnetloss(outputs, labels, m3x3, m64x64)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 5 == 4:    # print every 10 mini-batches
                print('[Epoch: %d, Batch: %4d / %4d], loss: %.3f' %
                    (epoch + 1, i + 1, len(train_loader), running_loss / 10))
                running_loss = 0.0
        # save the model
        torch.save(pointnet.state_dict(), "/content/drive/MyDrive/SHREC2022/model/binary_conecylinder_" + str(j) +  "_save.pth")
        j += 1

In [None]:
train(pointnet, train_loader, valid_loader)

#Classify and to csv

In [None]:
import pandas as pd

In [None]:
df_plane = pd.DataFrame(columns=["filename" , "nx", "ny", "nz", "x", "y", "z"])
df_cylinder = pd.DataFrame(columns=["filename" , 'rad', 'ax_x' , 'ax_y' , 'ax_z' , 'pt_x' , 'pt_y' , 'pt_z' ])
df_sphere = pd.DataFrame(columns=["filename" , 'rad', 'center_x', 'center_y' , 'center_z'])
df_torus = pd.DataFrame(columns=["filename" , 'b_rad' , 's_rad' , 'ax_x' , 'ax_y' , 'ax_z' , 'center_x' , 'center_y' , 'center_z'])
df_cone = pd.DataFrame(columns=["filename", 'aper', 'ax_x', 'ax_y', 'ax_z', 'v_x', 'v_y', 'v_z'])

In [None]:
from tqdm import tqdm
import os
def classify(num, path, df):
      for list_file in tqdm(os.listdir(path)):
        if num == 0:
          aper, ax_x, ax_y, ax_z, v_x, v_y, v_z = Conefitter(path+list_file)
          df = df.append({'filename' : list_file, 'aper' : aper, 'ax_x' : ax_x, 'ax_y' : ax_y, 'ax_z' : ax_z, 'v_x' : v_x, 'v_y': v_y, 'v_z': v_z}, ignore_index=True)

        if(num == 1):
          cylinder = Cylinderfitter(path+list_file)
          try:
            rad = cylinder.radius
            ax = cylinder.direction
            pt = cylinder.anchor_point
          except:
            rad=0
            ax=[-1000,-1000,-1000]
            pt = [-1000,-1000,-1000]
          
          df = df.append({'filename' : list_file, 'rad': rad, 'ax_x' : ax[0], 'ax_y' : ax[1], 'ax_z' : ax[2], 'pt_x' : pt[0], 'pt_y' : pt[1], 'pt_z' : pt[2]}, ignore_index=True)
          #print(2) 
        if(num == 2):
          n, x, y, z = Planefitter(path + list_file)
          df = df.append({'filename':  list_file , 'nx' : float(n[0,0]), 'ny' : float(n[1,0]), 'nz' : float(n[2,0]), 'x' : float(x), 'y' : float(y), 'z': float(z[0,0])}, ignore_index=True)
          print(n[0,0], n[1,0], n[2,0], x, y, z[0,0])
          print(df_plane)
          #print(1)
        if(num == 3):
          try:
            rad, x, y, z = Spherefitter(path+list_file)
          except:
            rad =0
            x= [-1000]
            y= [-1000]
            z= [-1000]
          df = df.append({'filename' : list_file, 'rad' : rad, 'center_x' : x[0], 'center_y' : y[0], 'center_z' : z[0]}, ignore_index=True)
          print(x[0],y[0],z[0])
          #print(3)
        if(num == 4):
          torus = Torusfitter(path+list_file)
          print(torus)
          try:
            b_rad = torus.major_radius
            s_rad = torus.minor_radius
            ax = torus.direction
            center = torus.center
          except:
            b_rad=0
            s_rad=0
            ax=[-1000,-1000,-1000]
            center=[-1000,-1000,-1000]
          df = df.append({'filename' : list_file, 'b_rad' : b_rad, 's_rad' : s_rad, 'ax_x' : ax[0], 'ax_y' : ax[1], 'ax_z' : ax[2], 'center_x' : center[0], 'center_y' : center[1], 'center_z' : center[2] }, ignore_index=True)
          #print(5)
      return df


In [None]:
for i in range(0,5):
  if i==0:
    df_cone = classify(0,"/content/drive/MyDrive/SHREC2022/ply_testdata/", df_cone)
    df_cone.to_csv('/content/drive/MyDrive/SHREC2022/csv_file/Cone.csv')
  elif i==1:
    df_cylinder = classify(1,"/content/drive/MyDrive/SHREC2022/real_testdata/testdata/", df_cylinder)
    df_cylinder.to_csv('/content/drive/MyDrive/SHREC2022/csv_file/Cylinder.csv')
  elif i==2:
    df_plane = classify(2,"/content/drive/MyDrive/SHREC2022/real_testdata/testdata/", df_plane)
    df_plane.to_csv('/content/drive/MyDrive/SHREC2022/csv_file/Plane.csv')
  elif i==3:
    df_sphere = classify(3,"/content/drive/MyDrive/SHREC2022/real_testdata/testdata/", df_sphere)
    df_sphere.to_csv('/content/drive/MyDrive/SHREC2022/csv_file/Sphere.csv')
  elif i==4:
    df_torus = classify(4,"/content/drive/MyDrive/SHREC2022/real_testdata/testdata/", df_torus)
    df_torus.to_csv('/content/drive/MyDrive/SHREC2022/csv_file/Torus.csv')


#Running


In [None]:
pointnet.load_state_dict(torch.load('/content/drive/MyDrive/SHREC2022/model/modelfull_9_save.pth'))

In [None]:
valid_ds = PointCloudData(path, valid=True, folder='test', transform=train_transforms)
valid_loader = DataLoader(dataset=valid_ds, batch_size=1)

In [None]:
pointnet.eval()
all_preds = []
all_labels = []
with torch.no_grad():
  for i, data in enumerate(valid_loader):
        print('Batch [%4d / %4d]' % (i+1, len(valid_loader)))
        inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)
        outputs, __, __ = pointnet(inputs.transpose(1,2))
        _, preds = torch.max(outputs.data, 1)
        all_preds.append([preds.detach().cpu().numpy(), data['filename']])
        all_labels.append(preds.detach().cpu().numpy())

In [None]:
def sort_num(pc):
 return pc[2]

In [None]:
res = []
for i in all_preds:
  res.append([i[0][0], i[1][0], int(i[1][0][10:i[1][0].find('.')])])
res.sort(key = sort_num)

In [None]:
path = "/content/drive/MyDrive/SHREC2022/Run/Run3/"
j = 0
for i in res:
  file = i[1]
  print(path + file[:file.find('.')] + "_prediction.txt")
  with open(path + file[:file.find('.')] + "_prediction.txt" , 'w') as f:
    #print(path + str(i[1][:i[1].find('.')] + "_prediction.txt")
    if(i[0] == 0):
      f.write('4' + "\n")
      name = i[1][:i[1].find(".")] + ".ply"
      id = df_cone.index[df_cone['filename'] == name]
      f.write(str(df_cone['aper'].values[id][0]) + '\n')
      f.write(str(df_cone['ax_x'].values[id][0]) + '\n')
      f.write(str(df_cone['ax_y'].values[id][0]) + '\n')
      f.write(str(df_cone['ax_z'].values[id][0]) + '\n')
      f.write(str(df_cone['v_x'].values[id][0]) + '\n')
      f.write(str(df_cone['v_y'].values[id][0]) + '\n')
      f.write(str(df_cone['v_z'].values[id][0]) + '\n')
    elif(i[0] == 1):
      f.write('2' + "\n")
      id = df_cylinder.index[df_cylinder['filename'] == str(i[1])]
      f.write(str(df_cylinder['rad'].values[id][0]) + '\n')
      f.write(str(df_cylinder['ax_x'].values[id][0]) + '\n')
      f.write(str(df_cylinder['ax_y'].values[id][0]) + '\n')
      f.write(str(df_cylinder['ax_z'].values[id][0]) + '\n')
      f.write(str(df_cylinder['pt_x'].values[id][0]) + '\n')
      f.write(str(df_cylinder['pt_y'].values[id][0]) + '\n')
      f.write(str(df_cylinder['pt_z'].values[id][0]) + '\n')
    elif(i[0] == 2):
      f.write('1' + "\n")
      id = df_plane.index[df_plane['filename'] == str(i[1])]
      f.write(str(df_plane['nx'].values[id][0]) + '\n')
      f.write(str(df_plane['ny'].values[id][0]) + '\n')
      f.write(str(df_plane['nz'].values[id][0]) + '\n')
      f.write(str(df_plane['x'].values[id][0]) + '\n')
      f.write(str(df_plane['y'].values[id][0]) + '\n')
      f.write(str(df_plane['z'].values[id][0]) + '\n')
    elif(i[0] == 3):
      f.write('3' + "\n")
      id = df_sphere.index[df_sphere['filename'] == str(i[1])]
      f.write(str(df_sphere['rad'].values[id][0]) + '\n')
      f.write(str(df_sphere['center_x'].values[id][0]) + '\n')
      f.write(str(df_sphere['center_y'].values[id][0]) + '\n')
      f.write(str(df_sphere['center_z'].values[id][0]) + '\n')
    elif(i[0] == 4):
      f.write('4' + "\n")
      id = df_torus.index[df_torus['filename'] == str(i[1])]
      f.write(str(df_torus['b_rad'].values[id][0]) + '\n')
      f.write(str(df_torus['s_rad'].values[id][0]) + '\n')
      f.write(str(df_torus['ax_x'].values[id][0]) + '\n')
      f.write(str(df_torus['ax_y'].values[id][0]) + '\n')
      f.write(str(df_torus['ax_z'].values[id][0]) + '\n')
      f.write(str(df_torus['center_x'].values[id][0]) + '\n')
      f.write(str(df_torus['center_y'].values[id][0]) + '\n')
      f.write(str(df_torus['center_z'].values[id][0]) + '\n')
    j+= 1