In [1]:
import boto3
import uuid
import labelbox as lb
from tqdm import tqdm
from API_key_personal import PERSONAL_API_KEY
from pprint import pprint

# Clients
s3 = boto3.client('s3')

labelbox = lb.Client(PERSONAL_API_KEY)

# Bucket where pictures are stored, and the region used to build object URLs
bucket_name = "lb-mlse"
bucket_region = "us-east-1"

# Dataset name
dataset_name = "Animal_ML"

# Page through every object under the "animals" prefix
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket_name, Prefix="animals")

# One data-row dict per S3 object
dataset = []

for page in pages:
    # A page carries no "Contents" key when nothing matches the prefix,
    # so fall back to an empty list instead of raising KeyError.
    for obj in page.get("Contents", []):
        object_key = obj["Key"]

        # e.g. https://lb-mlse.s3.us-east-1.amazonaws.com/animals/image-0
        object_url = f'https://{bucket_name}.s3.{bucket_region}.amazonaws.com/{object_key}'

        dataset.append({
            "row_data": object_url,
            # Random key: every run of this cell generates new global keys
            "global_key": str(uuid.uuid4()),
            "media_type": "IMAGE",
        })
    

In [2]:
# Sanity check: number of collected data rows, then the first row as a sample
print(f"{len(dataset)}")
pprint(dataset[0])

26179
{'global_key': '03ce405a-46d3-4df5-a0c5-ae2165ca454e',
 'media_type': 'IMAGE',
 'row_data': 'https://lb-mlse.s3.us-east-1.amazonaws.com/animals/image-0'}


In [3]:
from concurrent.futures import ThreadPoolExecutor
#Import function
def upload_data_rows_threading(assets: list[dict[str, str]], dataset: lb.Dataset, batch_size: int) -> None:
    """Upload data rows to Labelbox in parallel batches.

    ``assets`` is cut into consecutive slices of at most ``batch_size`` items. Each
    slice is submitted with ``dataset.create_data_rows`` from a pool of three worker
    threads, and the resulting task is waited on with a timeout of 1800
    (seconds, per the Labelbox SDK -- confirm against the installed version).

    Once every batch has been processed, a summary is printed. Failed data rows and
    batches that could not be submitted are appended to ``errors.txt``; the UIDs of
    tasks still ``IN_PROGRESS`` are appended to ``./task_bug.txt``.

    :param assets: List of data row objects
    :param dataset: Labelbox dataset to upload to
    :param batch_size: Amount of data rows per parallel operation (must be positive)
    :raises ValueError: If ``batch_size`` is not positive
    """
    if batch_size <= 0:
        # A negative step would silently produce no batches at all.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    payload = [assets[i:i + batch_size] for i in range(0, len(assets), batch_size)]

    def threading_callback(batch: list[dict[str, str]]):
        """Submit one batch; return its task, or the exception if no task was created."""
        task = None
        try:
            task = dataset.create_data_rows(batch)
            task.wait_till_done(1800)

            if task.errors:
                print(task.errors)

            return task
        except Exception as exc:
            print(f"Batch upload raised {exc!r}")
            # Keep the task when one exists so its status can still be inspected;
            # otherwise hand back the exception so the batch is counted as failed.
            return task if task is not None else exc

    with ThreadPoolExecutor(max_workers=3) as executor:
        # executor.map yields results in payload order, so list positions are batch indexes.
        results = list(tqdm(executor.map(threading_callback, payload), total=len(payload), colour="red", desc="Upload Data Rows"))

    in_progress_lst: list[lb.Task] = []
    error_lst: list[lb.Task] = []
    success_lst: list[lb.Task] = []
    not_submitted: list[tuple[int, Exception]] = []  # (batch index, exception)
    for batch_index, result in enumerate(results):
        if isinstance(result, Exception):
            not_submitted.append((batch_index, result))
            continue
        status = result.status
        if status == "IN_PROGRESS":
            in_progress_lst.append(result)
        elif status == "FAILED":
            error_lst.append(result)
        else:
            success_lst.append(result)

    print(f"Errors: {len(error_lst) + len(not_submitted)}")
    print(f"Success: {len(success_lst)}")
    print(f"In progress: {len(in_progress_lst)}")

    if error_lst or not_submitted:
        failed_rows = []
        for task in error_lst:
            print(task.errors)
            failed_rows.append(task.failed_data_rows)
        with open("errors.txt", "a") as file:
            if failed_rows:
                file.write(str(failed_rows) + "\n")
            for batch_index, exc in not_submitted:
                file.write(f"batch {batch_index} not submitted: {exc!r}\n")

    # Recorded independently of failures so timed-out tasks are never lost.
    if in_progress_lst:
        with open("./task_bug.txt", "a") as file:
            for task in in_progress_lst:
                file.write(f"{task.uid}\n")

In [6]:
# Upload: create the target Labelbox dataset, then send the collected rows in batches

labelbox_dataset = labelbox.create_dataset(name=dataset_name)
upload_data_rows_threading(assets=dataset, dataset=labelbox_dataset, batch_size=3500)

Upload Data Rows:   0%|[31m          [0m| 0/8 [01:38<?, ?it/s]
