In [None]:
import pyrealsense2 as rs
import open3d as o3d
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Debug)
import numpy as np
import math
import cv2 as cv
import scipy.ndimage as nd
# Used to display Matplotlib plots in Jupyter
# %matplotlib inline
import matplotlib.pyplot as plt
import time

In [None]:
class RealSense():
    def __init__(self,zMax=0.5):
        self.pinholeIntrinsics = None
        self.zMax = zMax # max distance for objects in depth images (m)
        # downsample point cloud with voxel size = 1 mm (0.001 m / 0.04 in)
        self.voxelSize = 0.001
        self.pcd = o3d.geometry.PointCloud() # current pcd from realsense
        self.extrinsics = np.eye(4) # extrinsic parameters of the camera frame 4 x 4 numpy matrix
        self.cameraFrameTransform = np.eye(4) # Transform to display camera frame when moving to objects in the world frame
        self.pipe,self.config = None,None
    
    def initConnection(self):
        # Initializes connection to realsense, sets pipe,config values
        self.pipe = rs.pipeline()
        self.config = rs.config()

        # Getting information about the connected realsense model (device object) - D405
        pipeProfile = self.config.resolve(rs.pipeline_wrapper(self.pipe))
        device = pipeProfile.get_device()
        depth_sensor = device.first_depth_sensor()
        self.depthScale = depth_sensor.get_depth_scale()
        # print(depth_scale)

        # 1 - default, 2 - hand, 3 - high accuracy, 4 - high density, 5 - medium density
        depth_sensor.set_option(rs.option.visual_preset,4) # 4 corresponds to high-density option

        # Setting attributes for stream
        # Depth Stream (1280 x 720) 5 fps - D405 Sensor has max 1280 x 720
        # (Minimum z depth is between 55-70 mm)
        self.config.enable_stream(rs.stream.depth,1280,720,rs.format.z16,5)

        # Color and Infrared D405 Streams Available (1280 x 720) 5 fps - D405 Sensor has max 1280 x 720
        self.config.enable_stream(rs.stream.color,1280,720,rs.format.rgb8,5)

        # Starting the pipeline based on the specified configuration
        self.pipe.start(self.config)
        
    def getPinholeInstrinsics(self,frame):
        # frame is a subclass of pyrealsense2.video_frame (depth_frame,etc)
        intrinsics = frame.profile.as_video_stream_profile().intrinsics
        return o3d.camera.PinholeCameraIntrinsic(intrinsics.width,intrinsics.height, intrinsics.fx,
                                                intrinsics.fy, intrinsics.ppx,
                                                intrinsics.ppy)


    def takeImages(self,saveImages=False):
        # returns Open3D RGBD Image
        # Sets class value for intrinsic parameters
        # Starting the pipeline based on the specified configuration
        pipe,config = self.pipe,self.config
        frames = pipe.wait_for_frames()
        depthFrame = frames.get_depth_frame() # pyrealsense2.depth_frame
        colorFrame = frames.get_color_frame()
        
        self.pinholeInstrinsics = self.getPinholeInstrinsics(colorFrame)
        # asign extrinsics here if the camera pose is known
        # alignOperator maps depth frames to color frames
        alignOperator = rs.align(rs.stream.color)
        alignOperator.process(frames)
        alignedDepthFrame,alignedColorFrame = frames.get_depth_frame(),frames.get_color_frame()
        
        # unmodified rgb and z images as numpy arrays of 3 and 1 channels
        rawColorImage = np.array(alignedColorFrame.get_data())
        rawDepthImage = np.asarray(alignedDepthFrame.get_data())

        rawRGBDImage = o3d.geometry.RGBDImage.create_from_color_and_depth(
            o3d.geometry.Image(rawColorImage),
            o3d.geometry.Image(rawDepthImage.astype('uint16')),
            depth_scale=1.0 / self.depthScale,
            depth_trunc = self.zMax,
            convert_rgb_to_intensity=False)
        
        subFix = "Back" # append to end of file paths
        if saveImages:
            np.save(f"newRealsense/depthImage{subFix}",rawRGBDImage.depth)
            np.save(f"newRealsense/colorImage{subFix}",rawRGBDImage.color)
            colorIM = Image.fromarray(rawColorImage)
            colorIM.save(f"newRealsense/colorImage{subFix}.jpeg")
        return rawRGBDImage

    def getPCD(self,saveData=False):
        # depthImage: 1 channel numpy array of z values
        # colorImage: 3 channel numpy array of rgb values
        # zMax: disance at which points are cut off
        # display: boolean, toggles if point cloud should be shown
        # out: tuple of (open3d point cloud (o3d.geometry.PointCloud),RGBDImage)
        rawRGBDImage = self.takeImages()
        if (self.extrinsics is None) == False:
            print("Extrinsics defined")
            pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rawRGBDImage,self.pinholeInstrinsics,extrinsic=self.extrinsics,project_valid_depth_only=True) # extrinsics pose
        else:
            pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rawRGBDImage,self.pinholeInstrinsics,project_valid_depth_only=True) # default pose
        # flip_transform = [[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]
        # pcd.transform(flip_transform)
        downsampledPCD = pcd.voxel_down_sample(voxel_size=self.voxelSize)
        if saveData:
            subFix = "3"
            np.save(f"colorImage{subFix}",np.array(rawRGBDImage.color))
            np.save(f"depthImage{subFix}",np.array(rawRGBDImage.depth))
            o3d.io.write_point_cloud(f"pcd{subFix}.pcd",downsampledPCD)
        return downsampledPCD,rawRGBDImage

    def displayImages(self,depthImg,colorImg):
        # Displays a depth and color image given the rgbdImage
        plt.subplot(1,2,1)
        plt.title("RealSense Color Image")
        plt.imshow(depthImg)
        plt.subplot(1,2,2)
        plt.title("RealSense Depth Image")
        plt.imshow(colorImg)
        plt.show()
    
    def displayPCD(self,pcds):
        # Displays a list of point clouds given an array of pcd's. Displays camera frame if self.extrinsics != None
        # flip_transform = [[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]
        # pcd.transform(flip_transform)
        worldFrame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.075,origin=[0,0,0])
        if (self.extrinsics is None) == False:
            cameraFrame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.075)
            cameraFrameTransform = self.cameraFrameTransform
            # cameraFrameTransform[0:3,3] * -1
            print(f"transform\n: {cameraFrameTransform}")
            cameraFrame.transform(cameraFrameTransform)
            res = [worldFrame,cameraFrame]
            res.extend(pcds)
            baseSphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.025)
            res.append(baseSphere)
            # testExtrinsic = np.array([[1,0,0,0],[0,1,0,0],[0,0,1,1],[0,0,0,1]])
            # testFrame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.25)
            # testFrame.transform(testExtrinsic)
            # res.append(testFrame)
            o3d.visualization.draw_geometries(res)
        else:
            res = [worldFrame].extend(pcds)
            o3d.visualization.draw_geometries(res)
    
    def displayStream(self):
        # streams and displays the point cloud data in open3d
        # pipe,config are stream properties set in the earlier cells 
        # Streaming loop
        pipe,config = self.pipe,self.config
        vis = o3d.visualization.Visualizer()
        vis.create_window()
        framesTaken = 0
        displayedPCD = o3d.geometry.PointCloud()
        try:
            while True:
                temp = self.getPCD()[0]
                displayedPCD.points = temp.points
                displayedPCD.colors = temp.colors
                if framesTaken == 0:
                    vis.add_geometry(displayedPCD)
                    worldFrame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.25,origin=[0,0,0])
                    vis.add_geometry(worldFrame)
                vis.update_geometry(displayedPCD)
                framesTaken += 1
                t0 = time.time()
                vis.poll_events()
                vis.update_renderer()
                #time.sleep(5)
        except Exception as e:
            print(f"Stream Issue. Exception Raised")
            raise(e)
        # closes window when cell is stopped (exception raised)
        finally:
            vis.destroy_window()
            pipe.stop()


In [None]:
# real = RealSense()
# real.initConnection()
# real.getPCD(True)
# real.extrinsics = np.array(robot_model.getCameraFrame())
# print(real.extrinsics)
# pcd,rgbdImage = real.getPCD(True)
# real.displayImages(rgbdImage)
# real.displayPCD(pcd)
# real.pipe.stop()

# o3d.visualization.draw()
# r.displayStream()
# finally:
#     r.pipe.stop()
# profile = pipe.start(config)
# depth_sensor = profile.get_device().first_depth_sensor()
# print(
# r.displayStream(pipe,config)

In [None]:
class ObjectDetection():
    # This class creates a RealSense Object, takes images and returns Open3D point clouds corresponding to blocks
    def __init__(self,robot_model,moveRelative = True):
        # :moveRelative boolean - True if the gripper moves to block positions in the camera frame, false if moving to world frame positions (via rtb_model)
        # robot_model is object of type RTB_Model
        self.real = RealSense()
        self.real.initConnection()
        if moveRelative:
            T_C = np.eye(4)
            # R = np.array([[0,-1,0],[1,0,0],[0,0,1]])
            t = np.array([0,9,59.3]) / 1000
            R = np.array([[0,1,0],[-1,0,0],[0,0,1]])
            camera_frame_transform = sm.SE3.Rt(R,t)
            self.real.cameraFrameTransform = np.array(camera_frame_transform)
            print(f"Camera Frame Transform:\n{self.real.cameraFrameTransform}")
            self.real.extrinsics = np.array(camera_frame_transform.inv())
            print(f"Extrinsics:\n{self.real.extrinsics}")
        else:
            T_C = robot_model.getCameraFrame()
            print(T_C)
            self.real.cameraFrameTransform = np.array(T_C)
            self.real.extrinsics = np.array(T_C.inv())

    def getMask(self,colorImage,minHSV,maxHSV):
        # :img 3-channel rgb image as numpy array
        # :minHSV minimum hsv bound
        # :maxHSV max hsv bound
        # Segments image based on bounds in hsv color space
        # returns a 1-channel binary numpy array that results from segmentation
        # convert image to hsv
        hsv = cv.cvtColor(np.array(colorImage),cv.COLOR_RGB2HSV)
        mask = cv.inRange(hsv,minHSV,maxHSV)
        # binary closing with 10 x 10 kernel (dilation then erosion) to fill in gaps
        closedMask = nd.binary_closing(mask,np.ones((10,10)),iterations=5)
        return closedMask
    
    
    def colorSegmentation(self,colorImage,depthImage):
        # Takes a depth and color image from the realsense and processes the images
        # colorImage,depthImage = RGBD_Image.color,RGBD_Image.depth
        # Returns a tuple containing the pcd's associated with each set of blocks
        redMask = self.getMask(colorImage,(0,110,20),(15,255,255))
        yellowMask = self.getMask(colorImage,(20,90,20),(35,255,255))
        blueMask = self.getMask(colorImage,(95,90,20),(130,255,255)) 
        
        print("Masks")
        fig,ax = plt.subplots(1,5)
        ax[0].imshow(redMask)
        ax[1].imshow(yellowMask)
        ax[2].imshow(blueMask)
        ax[3].imshow(colorImage)
        ax[4].imshow(depthImage)
        plt.show()
        
        redDepthImage = np.multiply(depthImage,redMask.astype(int)).astype('float32')
        yellowDepthImage = np.multiply(depthImage,yellowMask.astype(int)).astype('float32')
        blueDepthImage = np.multiply(depthImage,blueMask.astype(int)).astype('float32')

        print("Depth Images (Segmented)")
        fig,ax = plt.subplots(1,3)
        ax[0].imshow(redDepthImage)
        ax[1].imshow(yellowDepthImage)
        ax[2].imshow(blueDepthImage)
        plt.show()
        
        # SEGMENT PCD INTO RED,YELLOW,BLUE BLOCKS    
        depthScale = self.real.depthScale
        
        
        # Create Segmented RGBD Images for Each Color
        redRGDB_Image = o3d.geometry.RGBDImage.create_from_color_and_depth(
            o3d.geometry.Image(colorImage),
            o3d.geometry.Image(redDepthImage),
            convert_rgb_to_intensity=False,
            depth_scale=1
        )
        
        yellowRGDB_Image = o3d.geometry.RGBDImage.create_from_color_and_depth(
            o3d.geometry.Image(colorImage),
            o3d.geometry.Image(yellowDepthImage),
            convert_rgb_to_intensity=False,
            depth_scale=1
        )
    
        blueRGBD_Image = o3d.geometry.RGBDImage.create_from_color_and_depth(
            o3d.geometry.Image(colorImage),
            o3d.geometry.Image(blueDepthImage),
            convert_rgb_to_intensity=False,
            depth_scale=1
        )
        
        print("RGBD Depth Images)")
        fig,ax = plt.subplots(1,3)
        ax[0].imshow(np.array(redRGDB_Image.depth))
        ax[1].imshow(np.array(yellowRGDB_Image.depth))
        ax[2].imshow(np.array(blueRGBD_Image.depth))
        plt.show()
        
        print(f"Extrinsics: {self.real.extrinsics}")
        # testExtrinsic = np.array([[1,0,0,0],[0,1,0,0],[0,0,1,1],[0,0,0,1]])
        # print(f"testExtrinsics: \n{testExtrinsic}")
        # Create Point Clouds for Each Color
        redPCD = o3d.geometry.PointCloud.create_from_rgbd_image(
            redRGDB_Image,
            self.real.pinholeInstrinsics,
            project_valid_depth_only=True,
            extrinsic = self.real.extrinsics
        )
        yellowPCD = o3d.geometry.PointCloud.create_from_rgbd_image(
            yellowRGDB_Image,
            self.real.pinholeInstrinsics,
            project_valid_depth_only=True,
            extrinsic = self.real.extrinsics
        )
        bluePCD = o3d.geometry.PointCloud.create_from_rgbd_image(
            blueRGBD_Image,
            self.real.pinholeInstrinsics,
            project_valid_depth_only=True,
            extrinsic = self.real.extrinsics
        )
        
        # Downsample point cloud's based on realsense voxel_size parameter
        redPCD = redPCD.voxel_down_sample(voxel_size=self.real.voxelSize)
        yellowPCD = yellowPCD.voxel_down_sample(voxel_size=self.real.voxelSize)
        bluePCD = bluePCD.voxel_down_sample(voxel_size=self.real.voxelSize)
        
        redPCD.paint_uniform_color([1,0,0])
        yellowPCD.paint_uniform_color([1,1,0])
        bluePCD.paint_uniform_color([0,0,1])
        # o3d.visualization.draw_geometries([redPCD,yellowPCD,bluePCD])
        return (redPCD,yellowPCD,bluePCD)
