In [7]:
import numpy as np
from PIL import Image
import json

# define the path to the TuSimple dataset
data_dir = 'D:/Data/TUSimple/clips/'

# function to read the training data and annotations
def read_training_data(data_dir):
    X = []
    Y = []

    # iterate over the training data
    for i in range(1, 3617):
        # read the image file
        img_file = f"{data_dir}/clips/0531/{i:04d}/img/0000.jpg"
        img = np.array(Image.open(img_file))

        # read the annotation file
        annotation_file = f"{data_dir}/clips/0531/{i:04d}/anno/0000.json"
        with open(annotation_file, 'r') as f:
            annotation = json.load(f)

        # extract the lane markings from the annotation
        lane_markings = np.zeros_like(img[:,:,0])
        for lane in annotation['lanes']:
            if lane:
                lane_points = np.array(lane)
                lane_points = np.transpose(np.vstack((lane_points, np.arange(len(lane_points)))))
                lane_points = lane_points[np.all(lane_points > 0, axis=1)]
                lane_points = lane_points[np.all(lane_points < np.array(img.shape[:2]).reshape(1,2), axis=1)]
                lane_points = lane_points.astype(np.int32)
                lane_markings[lane_points[:, 1], lane_points[:, 0]] = 1

        # append the image and label to the training data
        X.append(img)
        Y.append(lane_markings)

    # convert the training data to numpy arrays
    X = np.array(X)
    Y = np.array(Y)

    return X, Y

# function to normalize the training data
def normalize(X):
    X = X / 255.0
    X = X.astype(np.float32)
    return X

# function to perform data augmentation
def augment(X, Y):
    # flip the images horizontally
    X_flip = np.flip(X, axis=2)
    Y_flip = np.flip(Y, axis=2)

    # concatenate the original and flipped images
    X_aug = np.concatenate((X, X_flip), axis=0)
    Y_aug = np.concatenate((Y, Y_flip), axis=0)

    return X_aug, Y_aug

# function to load the data
def load_data():
    # read the training data and annotations
    X_train, Y_train = read_training_data(data_dir)

    # normalize the training data
    X_train = normalize(X_train)

    # perform data augmentation
    X_train, Y_train = augment(X_train, Y_train)

    return X_train, Y_train

In [8]:
read_training_data(data_dir)

FileNotFoundError: [Errno 2] No such file or directory: 'D:/Data/TUSimple/clips//clips/0531/0001/img/0000.jpg'

In [10]:
import json
import numpy as np
import cv2
import os
import argparse

TRAIN_SET = ['label_data_0313.json', 'label_data_0601.json']
VAL_SET = ['label_data_0531.json']
TRAIN_VAL_SET = TRAIN_SET + VAL_SET
TEST_SET = ['test_label.json']

def gen_label_for_json(args, image_set):
    H, W = 720, 1280
    SEG_WIDTH = 30
    save_dir = args.savedir

    os.makedirs(os.path.join(args.root, args.savedir, "list"), exist_ok=True)
    list_f = open(os.path.join(args.root, args.savedir, "list", "{}_gt.txt".format(image_set)), "w")

    json_path = os.path.join(args.root, args.savedir, "{}.json".format(image_set))
    with open(json_path) as f:
        for line in f:
            label = json.loads(line)
            # ---------- clean and sort lanes -------------
            lanes = []
            _lanes = []
            slope = [] # identify 0th, 1st, 2nd, 3rd, 4th, 5th lane through slope
            for i in range(len(label['lanes'])):
                l = [(x, y) for x, y in zip(label['lanes'][i], label['h_samples']) if x >= 0]
                if (len(l)>1):
                    _lanes.append(l)
                    slope.append(np.arctan2(l[-1][1]-l[0][1], l[0][0]-l[-1][0]) / np.pi * 180)
            _lanes = [_lanes[i] for i in np.argsort(slope)]
            slope = [slope[i] for i in np.argsort(slope)]

            idx = [None for i in range(6)]
            for i in range(len(slope)):
                if slope[i] <= 90:
                    idx[2] = i
                    idx[1] = i-1 if i > 0 else None
                    idx[0] = i-2 if i > 1 else None
                else:
                    idx[3] = i
                    idx[4] = i+1 if i+1 < len(slope) else None
                    idx[5] = i+2 if i+2 < len(slope) else None
                    break
            for i in range(6):
                lanes.append([] if idx[i] is None else _lanes[idx[i]])

            # ---------------------------------------------

            img_path = label['raw_file']
            seg_img = np.zeros((H, W, 3))
            list_str = []  # str to be written to list.txt
            for i in range(len(lanes)):
                coords = lanes[i]
                if len(coords) < 4:
                    list_str.append('0')
                    continue
                for j in range(len(coords)-1):
                    cv2.line(seg_img, coords[j], coords[j+1], (i+1, i+1, i+1), SEG_WIDTH//2)
                list_str.append('1')

            seg_path = img_path.split("/")
            seg_path, img_name = os.path.join(args.root, args.savedir, seg_path[1], seg_path[2]), seg_path[3]
            os.makedirs(seg_path, exist_ok=True)
            seg_path = os.path.join(seg_path, img_name[:-3]+"png")
            cv2.imwrite(seg_path, seg_img)

            seg_path = "/".join([args.savedir, *img_path.split("/")[1:3], img_name[:-3]+"png"])
            if seg_path[0] != '/':
                seg_path = '/' + seg_path
            if img_path[0] != '/':
                img_path = '/' + img_path
            list_str.insert(0, seg_path)
            list_str.insert(0, img_path)
            list_str = " ".join(list_str) + "\n"
            list_f.write(list_str)


def generate_json_file(save_dir, json_file, image_set):
    with open(os.path.join(save_dir, json_file), "w") as outfile:
        for json_name in (image_set):
            with open(os.path.join(args.root, json_name)) as infile:
                for line in infile:
                    outfile.write(line)

def generate_label(args):
    save_dir = "D:/Data/TUSimple/Labels"
    os.makedirs(save_dir, exist_ok=True)
    generate_json_file(save_dir, "train_val.json", TRAIN_VAL_SET)
    generate_json_file(save_dir, "test.json", TEST_SET)

    print("generating train_val set...")
    gen_label_for_json(args, 'train_val')
    print("generating test set...")
    gen_label_for_json(args, 'test')

In [11]:
import json
import os

data_dir = 'D:/Data/TUSimple'

In [12]:
for clip in os.listdir(os.path.join(data_dir, 'clips')):#access ../clips clip=imgfloder
    for img_folder in os.listdir(os.path.join(data_dir, 'clips', clip)): #../clips/imgfloder
        img_folder_path = os.path.join(data_dir, 'clips', clip, img_folder)
        for img_file in os.listdir(img_folder_path):
            img_path = os.path.join(img_folder_path, img_file)

            # create a new annotation dictionary
            annotation = {}
            annotation['lanes'] = []

            # write the annotation file
            annotation_file = os.path.join(data_dir, 'clips', clip, 'anno', img_folder, img_file.replace('.jpg', '.json'))
            with open(annotation_file, 'w') as f:
                json.dump(annotation, f)

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'D:/Data/TUSimple\\clips\\0313-1\\img'

In [13]:
import json


In [17]:
import pandas as pd

df = pd.read_json("D:/Data/TUSimple/label_data_0313.json")

ValueError: Trailing data