In [80]:
import os
import yaml
import random
from dvc.api import DVCFileSystem

# Annotation YAML file; the relative path is resolved against the working directory.
annotation_file = "annotations.yaml"

def build_directory_structure(annotation_dict):
    """Create one train and one test directory per annotation type.

    Collects every distinct ``entry["type"]`` found in the lists stored as
    values of ``annotation_dict`` and makes sure ``data/test/<type>`` and
    ``data/train/<type>`` exist, relative to the current working directory.
    Directories that already exist are left untouched, so the function can
    be called repeatedly.

    Args:
        annotation_dict: Mapping whose values are iterables of entries,
            each of which is subscriptable with the key ``"type"``.

    Raises:
        KeyError: If an entry has no ``"type"`` key.
        FileExistsError: If a target path exists but is not a directory.
    """
    # A set de-duplicates the types without the dummy-value dict.
    annotation_types = {
        entry["type"]
        for annotation_list in annotation_dict.values()
        for entry in annotation_list
    }
    for annotation_type in annotation_types:
        for split in ("test", "train"):
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(f"data/{split}/{annotation_type}", exist_ok=True)

def frame_to_annotation(annotation_list):
    """Map every annotated frame number (as a string) to its position label.

    Each annotation covers the frames ``start`` through ``end`` inclusive.
    When ranges overlap, the annotation appearing later in the list wins.
    """
    return {
        str(frame_no): span["position"]
        for span in annotation_list
        # +1 because range() excludes its stop value and "end" is inclusive
        for frame_no in range(span["start"], span["end"] + 1)
    }

In [81]:
# Parse the annotation YAML, then list the DVC-tracked files below the video
# directory at the revision named by the file's "data_version" entry.
with open(annotation_file) as annotation_fh:
    annotations = yaml.safe_load(annotation_fh)

data_version = annotations["data_version"]
dvc_fs = DVCFileSystem("..", rev=data_version)
dvc_file_list = dvc_fs.find(
    "/DLCModel/videos/",
    detail=False,
    dvc_only=True,
)

In [84]:
# Collect one row per annotated frame: [position label, *remaining csv columns].
all_data = []
for video in annotations["videos"]:
    frame_annotations = frame_to_annotation(video["annotations"])
    # find the csv file in dvc_fs
    base_name = video['file'].replace(".avi","")
    print(f"Processing {base_name}")
    base_name = base_name + "DLC"
    csv_files = [x for x in dvc_file_list if (base_name in x and x.endswith(".csv"))]
    if not csv_files:
        print(f"No annotation for {video}")
        # Skip this video. The previous `next(video)` raised TypeError because
        # `video` is not an iterator; `continue` is what was intended.
        continue
    # If several csv files match, only the first one is read.
    with dvc_fs.open(csv_files[0], "r") as fh:
        for _ in range(3): # skip three headers lines
            next(fh)
        for line in fh:
            line = line.rstrip("\n")
            # First column is the frame number; it is compared as a string
            # against the string keys produced by frame_to_annotation().
            frame, *posValues = line.split(",")
            if frame in frame_annotations:
                current_data = [frame_annotations[frame]] + posValues
                all_data.append(current_data)

Processing testDogVideo_raw
Processing testDogVideo_raw02100840


In [85]:
testFraction = 0.2 # 20 % of all data reserved for testing
splitSeed = 42 # fixed seed so the train/test split is reproducible
# Shuffle a copy with a dedicated generator: re-running this cell yields the
# same split, and neither all_data nor the global random state is modified.
shuffledData = list(all_data)
random.Random(splitSeed).shuffle(shuffledData)
indexPosition = int(len(shuffledData) * (1 - testFraction))
print(f"{len(shuffledData)} data sets, using {indexPosition} for training")
trainData, testData = shuffledData[:indexPosition], shuffledData[indexPosition:]

2498 data sets, using 1998 for training


In [86]:
# Output paths for the two splits (relative to the working directory).
trainFile, testFile = "train.data", "test.data"

def writeData(filename, data):
    """Write ``data`` to ``filename`` as tab-separated text, one entry per line.

    Args:
        filename: Path of the file to create; an existing file is overwritten.
        data: Iterable of entries, each an iterable of field values.
            Fields are converted with ``str()``, so non-string values
            (e.g. a label that YAML parsed as a number) are accepted.
    """
    with open(filename, "w") as fh:
        # str() on every field avoids the TypeError str.join raises for non-strings.
        fh.writelines("\t".join(map(str, entry)) + "\n" for entry in data)

# Persist both splits with the same writer.
for outPath, rows in ((trainFile, trainData), (testFile, testData)):
    writeData(outPath, rows)