# 2D Dataset handling
Dataset class with 
- Loading
- Preprocessing
- Printing

Also includes pretty-print functions.

In [None]:
# Imports
import tensorflow as tf
# Let TensorFlow allocate GPU memory on demand instead of reserving it all up front.
# Looping over the device list (instead of indexing [0]) makes this a no-op on
# CPU-only machines and keeps the setting consistent when several GPUs are visible.
for _gpu in tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(_gpu, True)
import os, numpy as np, sys, matplotlib.pyplot as plt, matplotlib.patches as patches, matplotlib, json, cv2, math
import custom_layers

In [None]:
class Dataset2d():
    """ Class for handling 2D Datasets

    All ``.csv`` files below the given path are read recursively (header row skipped, first 12 columns).
    Columns 0-3 of a row are used as velocities (labels), columns 4-11 as environment
    (target x/y, box 1 x/y/rotation, box 2 x/y/rotation). Rows consisting only of zeros separate
    individual sequences. The sequences are shuffled, split into a test and a train part and
    exposed as the ``tf.data`` pipelines ``self.test`` and ``self.train`` which yield
    ``(feature, label)`` pairs produced by ``normaliseDatapoint``.
    """
    ## Constants
    VELOCITY_FACTORS = tf.constant([1.,1.,50./70., 50/0.01]) # Factor of velocities for normalisation
    BATCH_SIZE_SEQUENCE = "sequence" # reference value as alternative for batch size
    DEFAULT_SETTINGS = {
            "step_size":1,
            "random_seed": 13,
            "train_test_ratio": 10, # 1/ratio will be size of test set
            "batch_size": 32, # integer or BATCH_SIZE_SEQUENCE. If BATCH_SIZE_SEQUENCE, will use the individual sequences
            "render_images": False, # if true, renders images instead of a feature vector
            "render_poles": False, # if render_images == True and this is set, poles are being drawn onto the squares
            "filter_zeros": True, # whether to filter lines with zero-velocities. Not everything tested if False. Also deactivates vector normalisation 
    }
    def __init__(self, path, settings=None, name=""):
        """Creates the Dataset object, loads all sequences and builds the test/train pipelines.
            param path: The filepath of the Dataset
            param settings: An optional dict with the setting options (keys of DEFAULT_SETTINGS). Default: None (use defaults)
            param name: An optional string name of the dataset
            raises ValueError: if a setting key is unknown or batch_size is neither an int nor BATCH_SIZE_SEQUENCE
        """
        self.path, self.name, self.settings = path + "/", name, self.DEFAULT_SETTINGS.copy()
        for key, value in (settings or {}).items(): # store given settings
            if key not in self.settings:
                raise ValueError(f"{key} not a valid setting. Try: {self.DEFAULT_SETTINGS}")
            if key == "batch_size":
                # bool is a subclass of int but is not a meaningful batch size
                isInt = isinstance(value, int) and not isinstance(value, bool)
                if not isInt and value != self.BATCH_SIZE_SEQUENCE:
                    raise ValueError(f"{value} not a valid batch_size value. Should be int or '{self.BATCH_SIZE_SEQUENCE}'")
            self.settings[key] = value
        np.random.seed(self.settings["random_seed"]) # Seed the randomness (global numpy RNG)
        if(self.settings['render_images']):
            # swap the feature transformation: images instead of the polar feature vector
            self.transformEnvironment = self.transformEnvironment_to_rgb
            if(not self.settings['render_poles']):
                # instance attribute (not bound), therefore exactly the three call arguments
                self.drawPoles = lambda _,_1,_2:None
        self.__loadSequences() # load csv files
        self.__storeTestTrain()# prepare tensors
    
    def __loadSequences(self):
        """Reads the csv files and loads them as sequences in the self.sequences list. Shuffles the list"""
        self.sequences = []
        def getCsvFilesRecursive(p):
            """Returns the csv files of directory p, followed by those of its subdirectories."""
            entries = os.listdir(p)
            files = [p+"/"+f for f in entries if f.endswith(".csv")] # get csv files
            for d in entries: # get lower directories
                if os.path.isdir(p+"/"+d):
                    files += getCsvFilesRecursive(p+"/"+d)
            return files
        stepSize = self.settings["step_size"]
        for file in getCsvFilesRecursive(self.path): # load all CSV files
            # ndmin=2 keeps a file with a single data row two-dimensional, so the row indexing below works
            csv = np.loadtxt(file, delimiter=',', skiprows=1, usecols=list(range(12)), dtype=np.float32, ndmin=2)
            if(self.settings['filter_zeros']):
                # remove rows with zero velocity, but keep the all-zero rows: they mark restarts
                csv = csv[np.any(csv[:,:4]!=0, axis=1) | np.all(csv==0, axis=1)]
            isRestart = np.all(csv==0, axis=1) # restarts are represented as all-0 rows
            # sequence boundaries: virtual restart before the first and after the last row
            bounds = [-1] + np.flatnonzero(isRestart).tolist() + [len(isRestart)]
            for start, end in zip(bounds[:-1], bounds[1:]):
                sequence = csv[start+1:end:stepSize]
                if len(sequence) != 0: # skip empty sequences (e.g. consecutive restarts)
                    self.sequences.append(sequence)
        np.random.shuffle(self.sequences) # Shuffle the sequences, not within an individual sequence
    
    def __storeCovariances(self):
        """Calculates and stores dataset covariances internally

        Stores for test and train: the second-moment matrix of the labels (``*_cov``), its
        eigenvalues in descending order (``*_eigVals``) and the matching eigenvectors as columns (``*_eigVects``).
        """
        def getEig(dataset):
            labels = np.vstack(dataset)[:,:4]
            if(self.settings['filter_zeros']):
                labels = labels / np.linalg.norm(labels, axis=-1, keepdims=True) # normalize labels
            sigma = tf.matmul(labels, labels, transpose_a=True) 
            sigma = sigma / len(labels)
            eigVals, eigVects = tf.linalg.eigh(sigma)
            # eigh returns ascending eigenvalues, reverse to have the largest first
            return sigma.numpy(), eigVals.numpy()[::-1], eigVects.numpy()[:,::-1]
        self.train_cov, self.train_eigVals, self.train_eigVects = getEig(self._train_pure)
        self.test_cov, self.test_eigVals, self.test_eigVects = getEig(self._test_pure)
    
    def __storeTestTrain(self):
        """Split data in test and train sections and store internally

        The first ``len(sequences) // train_test_ratio`` sequences form the test set, the rest the train set.
        Builds ``self.test`` and ``self.train`` as cached, shuffled, mapped and prefetched tf.data pipelines.
        """
        testDivisor = len(self.sequences)//self.settings["train_test_ratio"]
        self._test_pure, self._train_pure = self.sequences[:testDivisor], self.sequences[testDivisor:]
        self.__storeCovariances()
        self.test_num_sequences, self.train_num_sequences = len(self._test_pure), len(self._train_pure) 
        self.test_num_datapoints = sum(len(s) for s in self._test_pure)
        self.train_num_datapoints = sum(len(s) for s in self._train_pure)
        batchSize = self.settings["batch_size"]
        autotune = tf.data.experimental.AUTOTUNE

        def mapAndPrefetch(dataset):
            """Applies the normalisation and prefetching shared by both pipeline kinds."""
            return dataset.map(self.normaliseDatapoint, num_parallel_calls=autotune).prefetch(autotune)

        def sequencePipeline(sequences, numSequences):
            """Pipeline yielding one whole sequence per element."""
            tensors = [tf.convert_to_tensor(s) for s in sequences]
            # The generator is only invoked when the dataset is iterated. `tensors` is local to this
            # call, so each pipeline keeps its own data (a lambda created inside a shared loop would
            # see the value of the last iteration, i.e. test would silently iterate the train data).
            dataset = tf.data.Dataset.from_generator(lambda: (t for t in tensors),
                                        output_types=(tf.float32), output_shapes=(None,12))
            return mapAndPrefetch(dataset.cache().shuffle(numSequences))

        def stackedPipeline(sequences, numDatapoints):
            """Pipeline over all datapoints: stack sequences, cache and shuffle, batch."""
            dataset = tf.data.Dataset.from_tensor_slices(np.vstack(sequences))
            return mapAndPrefetch(dataset.cache().shuffle(numDatapoints).batch(batchSize, drop_remainder=True))

        if batchSize == self.BATCH_SIZE_SEQUENCE:
            self.test = sequencePipeline(self._test_pure, self.test_num_sequences)
            self.train = sequencePipeline(self._train_pure, self.train_num_sequences)
        else:
            self.test = stackedPipeline(self._test_pure, self.test_num_datapoints)
            self.train = stackedPipeline(self._train_pure, self.train_num_datapoints)
        
    def __str__(self):
        """Returns a multi-line summary: path, settings, sizes, pipelines and covariance statistics."""
        with np.printoptions(precision=3, suppress=True, sign=' '):
            return  f"Dataset {self.name}\n"+\
                f"Path: {self.path}\nSettings: {json.dumps(self.settings,indent=2)}\n"+\
                f"Number of sequences: {len(self.sequences)}, with total length: {sum([len(s) for s in self.sequences])}\n"+\
                f"Train: {self.train_num_datapoints} datapoints of {self.train_num_sequences} sequences \n"+\
                f"Test: {self.test_num_datapoints} datapoints of {self.test_num_sequences} sequences \n"+\
                f"Train: {self.train}\nTest:  {self.test} \n"+\
                f"Covariance test:\n{self.test_cov}\nEigenvectors (-values below):\n{self.test_eigVects}\n\n {self.test_eigVals} \n"+\
                f"Covariance train:\n{self.train_cov}\nEigenvectors (-values below):\n{self.train_eigVects}\n\n {self.train_eigVals} \n"
        
    @tf.function
    def transformEnvironment(self, environment):
        """Transforms the Environment (model input): target, box1, box2 
        Transforms from local cartesian to local polar coordinate with inverse distance
        Value range angles: -1 to 1, with 0 being forwards
                    distance: 0=infinite distance, theoretical 1=same position
         param environment: batch of environments, indexed as [:,0:8] (target x/y, box1 x/y/rot, box2 x/y/rot)
         returns: tensor with 8 features per row: (distance, angle) of target, (distance, angle, rotation) of box 1 and box 2
        """
        def getDist(pos): return tf.math.reciprocal_no_nan(tf.norm(pos, axis=-1))*10 # calculate inverse distance to position (0 if distance is 0)
        def getAngle(pos): return tf.math.atan2(pos[:,1], pos[:,0])/np.pi # calculate angle (radians/pi) to position
        def getRotation(delta_rot): return delta_rot/np.pi # calculate relative rotation
        return tf.stack([
                getDist(environment[:,0:2]), # distance to target
                getAngle(environment[:,0:2]),# angle to target
                getDist(environment[:,2:4]), # distance to box1
                getAngle(environment[:,2:4]),# angle to box1
                getRotation(environment[:,4]),# rotation of box 1
                getDist(environment[:,5:7]), # distance to box 2
                getAngle(environment[:,5:7]),# angle to box2
                getRotation(environment[:,7]) # rotation of box 2 
               ], axis=1)

    def drawPoles(self, img, boxPose, boxsize):
        """Draws a pole with 5x30 box
         param img: image to draw on
         param boxpose: pose (position and rotation) of box
         param boxsize: sidelength of box
        """
        angle = boxPose[2]
        localOffset = boxsize/2.+15 # distance from the box center to the pole center
        a = 0.16514867741462683827912828964394 # diagonal angle in a 5x30 rectangle
        l = 15.206906325745549222499210613005 # half diagonal in a 5x30 rectangle

        si, co = math.sin(angle+a), math.cos(angle+a)
        siM, coM = math.sin(angle-a), math.cos(angle-a)
        # corners = unit diagonals * half diagonal + box position + image offset (0,300) + offset along the box direction
        cv2.fillConvexPoly(img, np.int32(np.array([[-co,-si],[-coM,-siM],[co,si],[coM,siM]])*l+\
            np.array(boxPose[:2])+(0,300)+(math.cos(angle)*localOffset, math.sin(angle)*localOffset)), (0,0,1))

    def drawSquare(self, img, pose, size): 
        """Draws a box on an image, potentially with pole
         param img: Image to draw on
         param pose: Pose (position and rotation) of the center of the square
         param size: size of the square
        """
        angle = pose[2] + math.pi/4 # corners lie on the diagonals
        the_size = size/math.sqrt(2) # half diagonal of the square
        co, si = np.cos(angle)*the_size, np.sin(angle)*the_size
        cv2.fillConvexPoly(img, np.int32(np.array([[-si,co],[co,si],[si, -co],[-co, -si]])+np.array(pose[:2]))+(0,300), (0,0,1))
        self.drawPoles(img, pose, size) # no-op if render_poles is not set (replaced in __init__)

    def _getRGB_cv(self, feature):
        """Renders one environment as a 600x600x3 float32 image
        Draws the target as filled circle (first channel) and both boxes (third channel), shifted by 300 in y.
         param feature: The flat feature vector of the dataset (len(feature) == 8) 
         returns: a list containing the image
        """
        img = np.zeros((600,600,3), dtype=np.float32) # create empty canvas
        cv2.circle(img, (int(feature[0]),int(feature[1]+300)), 10, (1,0,0), -1) # draw target
        self.drawSquare(img, feature[2:5], 20)
        self.drawSquare(img, feature[5:8], 30)
        return [img]
    
    @tf.function
    def transformEnvironment_to_rgb(self, environment):
        """ Transforms the environment to rgb images
         param environment: feature vector
         returns: tensor of shape (batch, 600, 600, 3)
        """
        @tf.function
        def getRGB_tf(env):
            return tf.numpy_function(self._getRGB_cv, [env], tf.float32)
        batchSize = self.settings['batch_size']
        # keep the static batch dimension for integer batch sizes; sequences have a variable length
        batchDim = batchSize if isinstance(batchSize, int) else -1
        return tf.reshape(tf.map_fn(getRGB_tf, environment), (batchDim,600,600,3))
    
    @tf.function
    def normaliseDatapoint(self, data_batch):
        """ Normalises the Datapoint. Takes a batch (batch_size x 12)
        Returns the (feature, label) structure of feature = normalised Environment and label = normalised Velocities
        Velocities are scaled by VELOCITY_FACTORS and, if filter_zeros is set, normalised to unit length."""
        environment, velocity = data_batch[:,4:], data_batch[:,:4]
        velocity = velocity * self.VELOCITY_FACTORS
        if(self.settings['filter_zeros']):
            normalisedVel, _ = tf.linalg.normalize(velocity, axis=-1)
        else: 
            normalisedVel = velocity
            # Python print inside a tf.function: emitted while the function is traced, not on every call
            print("Warning: Velocities not normalised")
        normalisedEnv = self.transformEnvironment(environment)
        return normalisedEnv, normalisedVel

In [None]:
if __name__ == '__main__':
    # Benchmark tests for efficient loading
    def benchmark(dataset, num_epochs=2):
        """Iterates the dataset num_epochs times and prints the elapsed wall-clock time.
         param dataset: iterable yielding (feature batch, label batch) pairs
         param num_epochs: number of full passes over the dataset. Default: 2
        """
        import time

        def trainingStepPlaceholder(batch):
            """Stands in for a training step; intentionally does nothing."""
            return None

        started = time.perf_counter()
        for _ in range(num_epochs):
            for featBatch, labelBatch in dataset:
                trainingStepPlaceholder(featBatch)
        tf.print("Execution time:", time.perf_counter() - started)

    path= "../Datasets/Dataset_square_boxes/FoP/"
    print("Benchmarks:\n")
    # default batching and per-sequence batching on the train split
    for _benchSettings in ({}, {"batch_size":"sequence"}):
        benchmark(Dataset2d(path, _benchSettings).train, num_epochs = 5)
    # image rendering (without and with poles) on the test split
    for _benchSettings in ({'render_images':True, 'batch_size':4},
                           {'render_images':True, 'render_poles':True, 'batch_size':4}):
        benchmark(Dataset2d(path, _benchSettings).test, num_epochs = 5)

In [None]:
# Store Model
def storeModel(model, path):
    """Stores the model as json and weights in the folder defined by path
     param model: object providing to_json() and save_weights(filepath)
     param path: target folder; it is created if it does not exist yet
    Writes {path}/model.json and {path}/model_weights.h5
    """
    if path: # an empty path cannot be created; keep the plain file access in that case
        os.makedirs(path, exist_ok=True)
    with open(f"{path}/model.json", "w", encoding="utf-8") as jsonFile:
        jsonFile.write(model.to_json())
    model.save_weights(f"{path}/model_weights.h5")

In [None]:
if __name__ == '__main__':
    eigLayer = custom_layers.Eigenvector_layer(returnEigValues=True) # Triggers constant tf retracing if in function, therefore is outside

def _getEigLayer():
    """Returns the shared Eigenvector layer, creating it on first use.
    The layer is kept as module global `eigLayer` so it is built only once (creating it per call
    triggers tf retracing). Needed because `eigLayer` is otherwise only defined when run as main.
    """
    global eigLayer
    try:
        return eigLayer
    except NameError:
        eigLayer = custom_layers.Eigenvector_layer(returnEigValues=True)
        return eigLayer

def plotDatapoint(feature, label, figsize=(5,5), model=None, loss=None, useEigenvectors=False, useSigma=False, custom_func=None, printTexts=True):
    """Plot a point in the dataset. Handles both rgb image features as well as flat vectors of shape (8,)
     param feature: Featurevector (model input) as environment
     param label: The label to the situation
     param figsize: Size of figures. Default: (5,5) 
     param model: Optionally, apply this model to the data. Default: None, 
     param loss: Optionally, apply this loss to the model data. Default: None, 
     param useEigenvectors: Whether eigenvectors are used. Default: False, 
     param useSigma: Whether the network directly outputs covariances. Default: False, 
     param custom_func: An optional function to be applied on the data as custom_func(feature, label, model, loss). Default: None, 
     param printTexts: Whether to print texts. Default: True
    """
    pprint = print if printTexts else lambda _:None
    def transformPose(pose):
        """Converts (inverse distance*10, angle/pi[, rotation/pi]) back to cartesian [x, y[, rotation]]."""
        pos = [func((.5-pose[1])*np.pi)/(pose[0]/10) for func in (np.sin, np.cos)]
        if len(pose)>2:
            rot = pose[2]*np.pi
            return pos+[rot]
        return pos
    def rotate2dVector(vector, angle):
        """Rotates a 2D vector by angle (radians)."""
        co, si = np.cos(angle), np.sin(angle)
        R = np.array([ [co, -si], [si, co] ])
        return np.matmul(R, vector)
    def drawBox(ax, pose, width, height, color='b'):
        """Draws a rotated rectangle centered at pose[:2]."""
        # flat vector, so the corner coordinates are scalars instead of 1-element arrays
        offset = rotate2dVector([-width/2, -height/2], pose[2])
        patch = patches.Rectangle((pose[0]+offset[0], pose[1]+offset[1]), width,height, angle=np.degrees(pose[2]),linewidth=0,edgecolor='none',facecolor=color)
        ax.add_patch(patch)
    def drawTarget(ax, pose, radius=10):
        ax.add_patch(patches.Circle(pose, radius, facecolor='r'))
    def getArrowTip(lines, scale = 1.):
        """Returns the points of an arrow head at the end of the polyline `lines`."""
        vec = (lines[-2]-lines[-1])*scale
        return [lines[-1]+rotate2dVector(vec, np.radians(20*a)) for a in (1,-1)]+[lines[-1]]
    def drawVel(vel, externalPointOffset=30, color='g'): # draw velocity arrows
        speedPerDim = np.array([50., 50., 70., .01])
        if isinstance(vel, tf.Tensor): vel = vel.numpy()
        vel = np.array(vel, dtype=float) # work on a copy, the caller's data must not be modified
        xyNorm = np.linalg.norm(vel[:2])
        if xyNorm > 1.: # normalise XY movement
            vel[:2] /= xyNorm
        scaledDof = vel * speedPerDim * [0.2, 0.2, 0.2, 100]
        # integrate the pose over 10 steps; centerPoints[i] belongs to angles[i]
        centerPoints = [np.zeros(2)] # a single 2D start point (not two scalar entries)
        angles=[0]
        for _ in range(10):
            centerPoints.append(centerPoints[-1] + rotate2dVector(scaledDof[:2], angles[-1]))
            angles.append(angles[-1] + np.radians(scaledDof[2]))
        externalPointsToDraw = (0,-externalPointOffset), (0,externalPointOffset)
        for p in externalPointsToDraw:
            # path of a point left/right of the center
            outerPoints = [c + rotate2dVector(p, a) for c,a in zip(centerPoints, angles)]
            outerPoints = np.array(outerPoints + getArrowTip(outerPoints))
            plt.plot(outerPoints[:,0], outerPoints[:,1], color=color)
            # arrow for the 4th velocity component; its direction follows the sign
            gripperPoints = [np.array(p)+[abs(p[1]*(3/4)),p[1]*abs(scaledDof[3])], np.array(p)+[abs(p[1]*(3/4)),0]]
            gripperPoints = gripperPoints if np.sign(scaledDof[3])>0 else [gripperPoints[1],gripperPoints[0]]
            gripperPoints = np.array(gripperPoints + getArrowTip(gripperPoints, 0.5))
            plt.plot(gripperPoints[:,0], gripperPoints[:,1], color=color)
        
    if isinstance(feature, tf.Tensor):
        feature_np = feature.numpy()
    else: feature_np = feature
    plt.figure(figsize=figsize)
    ax = plt.gca()    # Get the current reference
    ax.set_aspect('equal')
    if feature_np.ndim == 1: # assuming the feature vector to have size 8
        target = transformPose(feature_np[:2])
        box1, box2 = transformPose(feature_np[2:5]), transformPose(feature_np[5:8])
        plt.xlim(-400,400)
        plt.ylim(400,-400)
        # Draw boxes / target
        drawBox(ax, box1, 20,20)
        drawBox(ax, box2, 30,30)
        drawTarget(ax, target)
    else:
        # assuming the feature vector to be an image
        # pixels without any channel set become white
        feature_np = feature_np+1-np.sum(feature_np,axis=-1, keepdims=True)
        plt.imshow(feature_np, aspect='equal', extent=(-0.5, feature_np.shape[1]-.5, feature_np.shape[0]-feature_np.shape[0]/2, -feature_np.shape[0]/2))
    # Draw robot:
    drawBox(ax, (0,0,0), 20,70, color='r')
    drawBox(ax, (20,-30,0), 20,10, color='black')
    drawBox(ax, (20,30,0), 20,10, color='black')
    drawVel(label)
    with np.printoptions(precision=3, suppress=True, sign=' '):
        label = tf.expand_dims(label,0) # add batch dimension for loss / custom_func
        pprint(f"velocity: {label.numpy()}")
        pprint(f"state: {feature_np}")
        if model is not None:
            pred = model.predict(tf.expand_dims(tf.convert_to_tensor(feature), 0))
            pprint(f"Prediction:\n{pred}")
            if loss is not None:
                pprint(f"\nLoss:\n{loss(label, pred)}")
            if not useEigenvectors and not useSigma:        
                drawVel(np.squeeze(pred[0]),60, 'orange')
            else:
                if useEigenvectors:
                    eigVects, eigVals = _getEigLayer()(tf.convert_to_tensor(pred))
                else:
                    eigVals, eigVects = tf.linalg.eigh(pred) # useSigma == True
                    eigVals, eigVects = tf.reverse(eigVals, [-1]), tf.reverse(eigVects, [-1])
                drawVel(eigVects[0,:,1],80, 'yellow')                
                drawVel(eigVects[0,:,0],60, 'orange')
                pprint(f"\nEigvectors:\n{eigVects}")
                pprint(f"\nEigvalues:\n{eigVals}")
    if custom_func is not None:
        custom_func(feature, label, model, loss)
    plt.show()

def plotDatapointBatch(dataset, numPlots=1, figsize=(20,20), model= None, loss=None, useEigenvectors=False, useSigma=False, custom_func=None, printTexts=True):
    """Plots datapoints of the batches of a dataset, one figure each, until numPlots figures were drawn.
    Default: only show first. Handles both rgb image features as well as flat vectors of shape (8,)
     param dataset: Dataset of which the batches are to be generated 
     param numPlots: Number of datapoints to plot. Default: 1
     param figsize: Size of figures. Default: (20,20) 
     param model: Optionally, apply this model to the data. Default: None, 
     param loss: Optionally, apply this loss to the model data. Default: None, 
     param useEigenvectors: Whether eigenvectors are used. Default: False, 
     param useSigma: Whether the network directly outputs covariances. Default: False, 
     param custom_func: An optional function to be applied on the data as custom_func(feature, label, model, loss). Default: None, 
     param printTexts: Whether to print texts. Default: True
    """
    # options forwarded unchanged to every plotDatapoint call
    plotOptions = dict(model=model, loss=loss, useEigenvectors=useEigenvectors,
                       useSigma=useSigma, custom_func=custom_func, printTexts=printTexts)
    remaining = numPlots
    for featureBatch, labelBatch in dataset:
        for feature, label in zip(featureBatch, labelBatch):
            plotDatapoint(feature, label, figsize, **plotOptions)
            remaining -= 1
            if not remaining:
                return
        if not remaining:
            return

In [None]:
if __name__ == '__main__':
    # Manual test: load a dataset and a stored model, then plot the first test datapoint
    import import_ipynb
    MomentMetrics = import_ipynb.NotebookLoader(sys.path).load_module("MomentMetrics")

    datasetPath= "../Datasets/Dataset_boxes_with_poles/"
    modelPath= "../JS_Simulation/namedModels/empty_model/"

    ds = Dataset2d(datasetPath, {"batch_size":32, "train_test_ratio":2})
    test = ds.test

    # custom layers referenced by the stored model description
    customObjects = {
        'custom_layers':custom_layers, 
        'PathNormalisation_layer':custom_layers.PathNormalisation_layer,
        'Covariance_layer':custom_layers.Covariance_layer,
    }
    with open(f"{modelPath}/model.json", "r") as modelFile:
        modelJson = modelFile.read()
    model = tf.keras.models.model_from_json(modelJson, custom_objects=customObjects)
    model.load_weights(f"{modelPath}/model_weights.h5")

    plotDatapointBatch(
        test,
        figsize=(5,5),
        numPlots=1,
        model=model,
        loss=MomentMetrics.momentLoss,
        useEigenvectors=False,
        useSigma=True,
    )