In [1]:
from tqdm.notebook import tqdm
import os
from glob import glob
import json
import matplotlib.pyplot as plt
from typing import List, Dict, Tuple

import albumentations as A
import cv2
import numpy as np
import torch
from torch import nn 
from torchvision.ops import DeformConv2d 
from torch.utils.data import Dataset, DataLoader

import pycocotools.coco as coco
from pycocotools.cocoeval import COCOeval

In [2]:
base = 'data/F0'
Images = os.path.join(base, 'Images', '1/*.tiff') 
Poses = os.path.join(base, 'Poses', '1.json')

In [8]:
images = [os.path.basename(fn) for fn in sorted(glob(Images))]
poses = json.load(open(Poses, 'rb'))['images']

images_with_pose = []
for pose in poses:  # pose is a dictionary
    images_with_pose.append(pose['imagefile'])

images_with_pose = sorted(images_with_pose)

In [12]:
# all images are annotated with a pose! (F0 lane 1)
images = np.asarray(images)
images_with_pose = np.asarray(images_with_pose)
(images == images_with_pose).all()

True

In [21]:
def poses_info(folder_paths: List[str]):
    for folder_path in folder_paths:
        for image_folder in glob(os.path.join(folder_path, 'Images', '*')):
            folder_nr = image_folder.split('\\')[-1]
            
            Images = os.path.join(image_folder, '*.tiff')
            images = [os.path.basename(fn) for fn in sorted(glob(Images))] 
            
            Poses = os.path.join(folder_path, 'Poses', f'{folder_nr}.json')
            poses = json.load(open(Poses, 'rb'))['images']

            images_with_pose = []
            for pose in poses:  # pose is a dictionary
                images_with_pose.append(pose['imagefile'])
            
            images_with_pose = sorted(images_with_pose)
            images = np.asarray(images)
            images_with_pose = np.asarray(images_with_pose)
            if (images == images_with_pose).all():
                # all images have a pose
                print('passed at:', image_folder)
            else:
                print('failed at:', image_folder)


poses_info([f'data/F{i}' for i in range(12)])

passed at: data/F0\Images\1
passed at: data/F0\Images\10
passed at: data/F0\Images\11
passed at: data/F0\Images\12
passed at: data/F0\Images\13
passed at: data/F0\Images\2
passed at: data/F0\Images\3
passed at: data/F0\Images\4
passed at: data/F0\Images\5
passed at: data/F0\Images\6
passed at: data/F0\Images\7
passed at: data/F0\Images\8
passed at: data/F0\Images\9
passed at: data/F1\Images\1
passed at: data/F1\Images\2
passed at: data/F1\Images\3
passed at: data/F1\Images\4
passed at: data/F1\Images\5
passed at: data/F1\Images\6
passed at: data/F2\Images\1
passed at: data/F2\Images\2
passed at: data/F2\Images\3
passed at: data/F2\Images\4
passed at: data/F2\Images\5
passed at: data/F2\Images\6
passed at: data/F2\Images\7
passed at: data/F2\Images\8
passed at: data/F3\Images\1
passed at: data/F3\Images\10
passed at: data/F3\Images\11
passed at: data/F3\Images\12
passed at: data/F3\Images\2
passed at: data/F3\Images\3
passed at: data/F3\Images\4
passed at: data/F3\Images\5
passed at: da

In [32]:
def extract_data(folder_paths: List[str]): 
    data = {}
    nr = 0  # unique lane number, can be used to shuffle in training
    for folder_path in folder_paths:
        pose_files = sorted(glob(os.path.join(folder_path, 'Poses', '*.json')))
        label_files = sorted(glob(os.path.join(folder_path, 'Labels', '*.json')))
        for pose_file, label_file in zip(pose_files, label_files):
            data[nr] = {} 
            poses = json.load(open(pose_file, 'rb'))['images']
            labels = json.load(open(label_file, 'rb'))['Labels']
            
            images_with_poses = []
            polys = []
            for pose in poses:
                images_with_poses.append((pose['imagefile'], pose['M3x4']))

            if isinstance(labels, list) and len(labels):  
                for label in labels:
                    polys.append(label['poly'])
                # for each lane we have a single annotated image (for the integral image)
                data[nr]['annotated_image'] = labels[0]['imagefile']
                # polygons: [ [[x1, y1], ...next point], ...next polygon ]
                data[nr]['polys'] = polys
            else:  # otherwise empty => no annotations (no humans)
                data[nr]['annotated_image'] = data[nr]['polys'] = None

            # we have the camera extrinsics for each image in the lane
            data[nr]['images_with_poses'] = images_with_poses
            nr += 1
            
    return data


# training folder names are from F0 to F11
data = extract_data([f'data/F{i}' for i in range(12)])

In [38]:
class Baseset(Dataset):

    def __init__(self, folders: List[str], h: int, w: int, transform=None):
        super().__init__()
        self.folders = folders  # e.g. ['data/F0', ...]
        self.h = h  # image height
        self.w = w  # image width
        self.transform = transform
        
        self.data = self.extract_data(folders)
        self.size = len(list(self.data.keys()))

    @staticmethod
    def extract_data(folders: List[str]):
        '''
        {0: {'images_with_poses': [('name.tiff', pose_M3x4), ...], 
             'annotated_image': 'center.tiff', 
             'polys': [[...]], }, 
         1: {...}, ...}
        ''' 
        data = {}
        nr = 0  # unique lane number, can be used to shuffle in training
        for folder_path in folders:
            pose_files = sorted(glob(os.path.join(folder_path, 'Poses', '*.json')))
            label_files = sorted(glob(os.path.join(folder_path, 'Labels', '*.json')))
            for pose_file, label_file in zip(pose_files, label_files):
                data[nr] = {} 
                poses = json.load(open(pose_file, 'rb'))['images']
                labels = json.load(open(label_file, 'rb'))['Labels']
                
                images_with_poses = []
                polys = []
                for pose in poses:
                    images_with_poses.append((pose['imagefile'], pose['M3x4']))

                if isinstance(labels, list) and len(labels):  
                    for label in labels:
                        polys.append(label['poly'])
                    # each lane has a single annotated image (the integral image)
                    data[nr]['annotated_image'] = labels[0]['imagefile']
                    # polygons: [ [[x1, y1], ...next point], ...next polygon ]
                    data[nr]['polys'] = polys
                else:  # otherwise empty => no annotations (no humans)
                    data[nr]['annotated_image'] = data[nr]['polys'] = None

                # we have the camera extrinsics for each image in the lane
                data[nr]['images_with_poses'] = images_with_poses
                nr += 1
                
        return data

    @staticmethod
    def load_camera_params(param_folder: str = 'calibration/parameters'):
        ''' Camera intrinsics K and distortion coefficients. '''
        K = np.load(os.path.join(folder, 'K.npy'))
        dist_coeffs = np.load(os.path.join(folder, 'dist_coeffs.npy'))
        return K, dist_coeffs

    @staticmethod
    def imread(paths: List[str], K, dist_coeffs, undistort=False):
        images = []
        for path in paths:
            image = cv2.imread(path, cv2.IMREAD_ANYDEPTH)  # 16 bit image
            image = cv2.normalize(image, dst=None, alpha=0, beta=2**16 - 1, 
                norm_type=cv2.NORM_MINMAX)
            image = (image >> 8).astype(np.uint8)  # 8 bit image
            
            if undistort:  # undistort camera images
                h, w = image.shape
                # new camera intrinsics based on free scaling parameter
                refined_K, roi = cv2.getOptimalNewCameraMatrix(K, dist_coeffs, 
                    (w, h), 1, (w, h))
                x, y, w, h = roi
                image = image[y:y+h, x:x+w]
                image = cv2.undistort(image, K, dist_coeffs, None, refined_K)
            
            images.append(image)

        return images

    @staticmethod
    def integrate(images_with_poses: List[Tuple[str, List[List[float]]]], z: float):
        pass

    def __len__(self):
        return self.size

    def _float(self, x: float):
        return float(f'{x:0.2f}')


class IntegralDataset(Baseset):
    ''' For training and testing of the deep learning based integration. '''

    def __init__(self, folders: List[str], h: int, w: int, 
        sequence_len=5, transform=None):
        super().__init__(folders, h, w, transform)
        self.sequence_len = sequence_len

    def __getitem__(self, idx):
        dict_ = self.data[idx]  # select lane

        # randomly select adjacent images


        if self.transform is not None:
            # data augmentation on images
            image_sequence, integral_image = self.transform(image_sequence, integral_image)
        
        return image_sequence, integral_image


class Testset(Baseset):

    def __init__(self, folders: List[str], h: int, w: int, transform=None):
        super().__init__(folders, h, w, transform)

    def __getitem__(self, idx):
        return None

SyntaxError: invalid syntax (<ipython-input-38-f2ea14a1fa11>, line 52)