Код для поєднання датасетів у один великий (поєднує зображення і анотації)

# Imports

In [None]:
import os
import shutil
import zipfile
import xml.etree.ElementTree as ET

# Constants

In [None]:
# Whether the main dataset archive is fetched from Google Drive before merging.
download_from_gdrive = True
# File name the downloaded archive is stored under.
download_dataset_name = 'main_data.zip'
# Google Drive sharing link of the archive to download.
file_url = 'https://drive.google.com/file/d/163a9Ee7VObzEfHA3DaLadEnvENROLooF/view?usp=sharing'

# Dataset parts that are merged; the downloaded archive is appended to this list later.
files_to_add_to_dataset = ['/content/annot_3.zip']

# Name of the merged dataset folder; the zip archive uses the same name plus '.zip'.
created_end_dataset_name = 'connected_dataset'
# Whether the resulting archive is offered as a browser download.
download_after_creating = False
# Whether the resulting archive is copied to the mounted Google Drive.
set_to_drive = True
# Folder on the mounted drive that receives the archive.
# NOTE(review): an empty string does not pass the os.path.exists() check done
# before copying — presumably a real folder must be filled in here; confirm.
drive_path = ''

## Some preprocessing

In [None]:
# The archive fetched from Google Drive is merged together with the local parts.
if download_from_gdrive:
  files_to_add_to_dataset.append(os.path.join('/content', download_dataset_name))

# File name and absolute location of the zip archive produced by the merge.
created_end_dataset_name_with_zip = f'{created_end_dataset_name}.zip'
created_end_dataset_path = os.path.join('/content', created_end_dataset_name_with_zip)

# Download data

In [None]:
import gdown
import re

In [None]:
def convert_drive_link(original_link):
  """Convert a Google Drive sharing link into a direct ``uc?id=`` link.

  Args:
    original_link: Either an already converted
      ``https://drive.google.com/uc?id=<id>`` link (returned unchanged) or a
      sharing link that starts with ``https://drive.google.com/file/d/<id>``.
      Anything after the file id (``/view``, ``/edit``, query string) is
      ignored.

  Returns:
    The link in the form ``https://drive.google.com/uc?id=<id>``.

  Raises:
    ValueError: If the link has none of the supported formats.
  """
  if "https://drive.google.com/uc?id=" in original_link:
    return original_link

  # Only the file id is needed, so the pattern stops right after it instead of
  # requiring a particular suffix such as "/view?usp=sharing".
  pattern = r"https://drive\.google\.com/file/d/([a-zA-Z0-9_-]+)"
  matcher = re.match(pattern, original_link)
  if matcher is None:
    raise ValueError(f"Not realized Google Drive link format.\nGiven link is {original_link}")

  file_id = matcher.group(1)
  return f"https://drive.google.com/uc?id={file_id}"


def install_from_google_drive(link, name, path=None, force_download = False):
  """Download a Google Drive file with gdown unless it is already on disk.

  Args:
    link: Google Drive link; it is passed through ``convert_drive_link``.
    name: File name (or path) the download is stored under.
    path: Optional directory that ``name`` is joined onto.
    force_download: When true, download even if the target already exists.
  """
  target = name if path is None else os.path.join(path, name)

  # An existing file is reused unless the caller insists on a fresh download.
  if not force_download and os.path.exists(target):
    print('The data already exists')
    return

  print('Start downloading')
  direct_link = convert_drive_link(link)
  gdown.download(direct_link, target, quiet=False)
  print('\nDownloading have ended')

In [None]:
# Fetch the main dataset archive; nothing is downloaded when the file is already on disk.
if download_from_gdrive:
  install_from_google_drive(file_url, download_dataset_name)

Start downloading


Downloading...
From (original): https://drive.google.com/uc?id=163a9Ee7VObzEfHA3DaLadEnvENROLooF
From (redirected): https://drive.google.com/uc?id=163a9Ee7VObzEfHA3DaLadEnvENROLooF&confirm=t&uuid=70a6606d-7f26-4bbb-8cfc-aead9dfe9511
To: /content/main_data.zip
100%|██████████| 95.7M/95.7M [00:01<00:00, 55.9MB/s]


Downloading have ended





# Some functions

In [None]:
def indent(elem, level=0):
  """Add pretty-printing whitespace to an element tree, in place.

  Blank ``text``/``tail`` values are replaced by a newline plus two spaces
  per nesting level; values that contain non-whitespace are left untouched.

  Args:
    elem: Element whose subtree is indented.
    level: Nesting depth of ``elem`` (0 for the root).
  """
  pad = "\n" + "  " * level

  def is_blank(value):
    return not value or not value.strip()

  if not len(elem):
    # A leaf only needs a tail, and only when it is not the root itself.
    if level and is_blank(elem.tail):
      elem.tail = pad
    return

  if is_blank(elem.text):
    elem.text = pad + "  "
  if is_blank(elem.tail):
    elem.tail = pad

  last_child = None
  for child in elem:
    indent(child, level + 1)
    last_child = child

  # The last child is followed by the parent's closing tag, so its tail is
  # pulled back to the parent's indentation.
  if is_blank(last_child.tail):
    last_child.tail = pad

def get_max_id(root):
    """Return the largest integer ``id`` among the direct ``image`` children of root.

    The result is never below 0, which is also the value returned when root
    has no ``image`` children.
    """
    image_ids = [int(image.get("id")) for image in root.findall("image")]
    # The leading 0 keeps the floor of the original running maximum.
    return max([0] + image_ids)

def add_anotations_to_element_tree(new_root, path_to_annotations, image_dir_name='image'):
  """Append the images of one annotation XML file to an existing tree.

  Every ``image`` element of the file is re-created under ``new_root``:
  its ``id`` continues the numbering after the current maximum id in
  ``new_root``, its ``name`` is re-rooted at ``image_dir_name`` and all other
  attributes are copied as they are. Of the children of an image only the
  ``polygon`` elements are copied, and only their attributes.

  Args:
    new_root: Root element that receives the copied images.
    path_to_annotations: Path to the XML file to read.
    image_dir_name: Folder name put in front of every image name.
  """
  next_id = get_max_id(new_root)
  source_root = ET.parse(path_to_annotations).getroot()

  for source_image in source_root.findall("image"):
    target_image = ET.SubElement(new_root, "image")
    for key, value in source_image.attrib.items():
      if key == 'id':
        next_id += 1
        value = str(next_id)
      elif key == 'name':
        parts = value.split('/')
        # A name with folders loses its first component before the new
        # image folder is put in front.
        if len(parts) > 1:
          parts = parts[1:]
        value = '/'.join([image_dir_name] + parts)
      target_image.set(key, value)

    for source_polygon in source_image.findall("polygon"):
      target_polygon = ET.SubElement(target_image, "polygon")
      for key, value in source_polygon.attrib.items():
        target_polygon.set(key, value)

In [None]:
def copy_files(source_dir, dest_dir, verbose=True):
    """Copy the entries of source_dir into dest_dir without overwriting.

    Args:
        source_dir: Directory whose direct entries are copied.
        dest_dir: Target directory; it is created when it does not exist.
        verbose: When true, report every skipped file and a final summary.

    Returns:
        A ``(copied, skipped)`` tuple: how many files were copied and how
        many were left alone because the name already existed in dest_dir.
    """
    # Without the directory in place shutil.copy2 would write every source
    # file onto one single file called dest_dir.
    os.makedirs(dest_dir, exist_ok=True)

    copy_num, exist_num = 0, 0
    for filename in os.listdir(source_dir):
        source_file = os.path.join(source_dir, filename)
        dest_file = os.path.join(dest_dir, filename)

        if os.path.exists(dest_file):
            # Counted regardless of verbosity so the totals are always right.
            exist_num += 1
            if verbose:
                print(f"File '{filename}' already exists in '{dest_dir}', skipping.")
            continue

        shutil.copy2(source_file, dest_file)
        copy_num += 1

    if verbose:
        print(f"Copied num: {copy_num}, already exists: {exist_num}")
    return copy_num, exist_num


In [None]:
def zip_folder(folder_path, zip_path):
    """Pack every file found under folder_path into a new archive at zip_path.

    Archive member names are the file paths relative to folder_path.
    """
    archive = zipfile.ZipFile(zip_path, 'w')
    with archive:
        for current_dir, _subdirs, file_names in os.walk(folder_path):
            for file_name in file_names:
                absolute = os.path.join(current_dir, file_name)
                relative = os.path.relpath(absolute, folder_path)
                archive.write(absolute, relative)

def extract_zip(zip_file, extract_to):
    """Unpack the whole archive zip_file into the directory extract_to."""
    archive = zipfile.ZipFile(zip_file, mode='r')
    try:
        archive.extractall(path=extract_to)
    finally:
        # Same guarantee as a with-block: the archive is closed even on errors.
        archive.close()

In [None]:
def remove_shit_if_exist(path_to_dir, shit_files=['.DS_Store'], shit_dirs=['__MACOSX']):
  """Delete junk entries that sit directly inside path_to_dir.

  Args:
    path_to_dir: Directory to clean; nested folders are not searched.
    shit_files: Names of files to delete when present.
    shit_dirs: Names of folders to delete (with their content) when present.
  """
  # The listing is taken once, before anything is removed.
  present = os.listdir(path_to_dir)

  for junk_file in shit_files:
    if junk_file not in present:
      continue
    os.remove(os.path.join(path_to_dir, junk_file))

  for junk_dir in shit_dirs:
    if junk_dir not in present:
      continue
    shutil.rmtree(os.path.join(path_to_dir, junk_dir))


In [None]:
def _prepare_dataset_dir(dataset_name, images_dataset_path, force):
  """Make sure the output folder exists and contains an empty image subfolder."""
  if os.path.exists(dataset_name):
    if not force:
      daset_dir_elements = os.listdir(dataset_name)
      if len(daset_dir_elements) == 0:
        os.mkdir(images_dataset_path)
        return
      # TODO: extending an already filled dataset folder is not implemented.
      if len(daset_dir_elements) == 1:
        raise Exception(f'Folder {dataset_name} has 1 element, not realized yet')
      if len(daset_dir_elements) == 2:
        raise Exception(f'Folder {dataset_name} has 2 elements, not realized yet')
      raise Exception(f'Folder {dataset_name} has more than 2 elements, not realized yet')
    # force: start from scratch.
    shutil.rmtree(dataset_name)
  # makedirs also creates missing parents, so nested dataset names work too.
  os.makedirs(images_dataset_path)


def _unpack_part(path_to_archive, for_extracted):
  """Extract one .zip part and return the folder holding its images and xml."""
  folder_name = os.path.basename(path_to_archive)[:-4]
  path_to_zip = os.path.join(for_extracted, folder_name)
  extract_zip(path_to_archive, path_to_zip)
  remove_shit_if_exist(path_to_zip)

  extracted_entries = os.listdir(path_to_zip)
  if len(extracted_entries) == 1:
    # Archives are often built from a single wrapping folder; step into it
    # when it has the expected two entries. The isdir guard keeps a lone
    # top-level file from crashing os.listdir, so such an archive reaches the
    # regular validation of the caller instead.
    some_crutch_path = os.path.join(path_to_zip, extracted_entries[0])
    if os.path.isdir(some_crutch_path):
      remove_shit_if_exist(some_crutch_path)
      if len(os.listdir(some_crutch_path)) == 2:
        return some_crutch_path
  return path_to_zip


def create_big_dataset_from_parts(path_to_folders, dataset_name='connected_dataset', for_extracted='exctracted', to_zip=False, force=False, verbose=True):
  """Merge several dataset parts into one dataset folder.

  Every part is a folder (or a .zip of one) that holds exactly two entries:
  a folder with images and one xml file with annotations. The images are
  copied into ``<dataset_name>/image`` and the annotations are merged into
  ``<dataset_name>/annotations.xml``.

  Args:
    path_to_folders: Paths of the parts; entries ending in '.zip' are
      extracted first.
    dataset_name: Path of the dataset folder to create.
    for_extracted: Directory the .zip parts are extracted into, one
      subfolder per archive.
    to_zip: When true, additionally write ``<dataset_name>.zip``.
    force: When true, an existing dataset folder is deleted and recreated.
    verbose: When true, print progress information.

  Raises:
    Exception: If the dataset folder already exists with content and
      ``force`` is false, or if a part does not consist of exactly one xml
      file plus one other entry.
    NotADirectoryError: If the non-xml entry of a part is not a folder.
  """
  images_dataset_path = os.path.join(dataset_name, 'image')
  annotations_dataset_path = os.path.join(dataset_name, 'annotations.xml')
  new_root = ET.Element("annotations")

  _prepare_dataset_dir(dataset_name, images_dataset_path, force)

  for path_to_folder in path_to_folders:
    if path_to_folder.endswith('.zip'):
      path_to_folder = _unpack_part(path_to_folder, for_extracted)
    remove_shit_if_exist(path_to_folder)

    folder_files = os.listdir(path_to_folder)
    if len(folder_files) > 2:
      raise Exception(f'Folder {path_to_folder} has more than 2 elements: {folder_files}\nIt must has only folder with images + xml with annotations')
    elif len(folder_files) < 2:
      raise Exception(f'Folder {path_to_folder} has less than 2 elements: {folder_files}\nIt must has folder with images + xml with annotations')

    annot_file = [i for i in folder_files if i.endswith('.xml')]
    if len(annot_file) > 1:
      raise Exception(f'Folder {path_to_folder} has more than 1 xml: {annot_file}')
    elif len(annot_file) < 1:
      raise Exception(f'Folder {path_to_folder} hasn`t xml with annotations')
    annot_file = os.path.join(path_to_folder, annot_file[0])

    # With two entries and exactly one xml, the other entry is the image folder.
    folder_with_images = [i for i in folder_files if not i.endswith('.xml')][0]
    folder_with_images = os.path.join(path_to_folder, folder_with_images)
    if not os.path.isdir(folder_with_images):
      raise NotADirectoryError(f'{folder_with_images} must be a folder with images')

    copy_files(folder_with_images, images_dataset_path, verbose=verbose)
    add_anotations_to_element_tree(new_root, annot_file)

    if verbose:
      print(f'Dataset {path_to_folder} added')

  indent(new_root)
  new_tree = ET.ElementTree(new_root)
  new_tree.write(annotations_dataset_path, encoding="utf-8", xml_declaration=True)

  if to_zip:
    zip_folder(dataset_name, dataset_name + '.zip')

# Connecting

In [None]:
# Merge all listed parts into a fresh dataset folder (force=True replaces an
# existing one) and pack the result into a zip archive.
create_big_dataset_from_parts(files_to_add_to_dataset, created_end_dataset_name, to_zip=True, force=True)

Copied num: 56, already exists: 0
Dataset exctracted/annot_3 added
Copied num: 267, already exists: 0
Dataset exctracted/main_data added


## Download

In [None]:
from google.colab import files
# Hand the merged archive to google.colab's files.download when requested.
if download_after_creating:
  files.download(created_end_dataset_name_with_zip)

## Copy to Google Drive

In [None]:
from google.colab import drive
# Copy the merged archive into drive_path on the mounted Google Drive.
if set_to_drive:
  drive.mount('/content/drive')
  data_gd_path = os.path.join(drive_path, created_end_dataset_name_with_zip)
  # Fail early when the destination folder is missing.
  if not os.path.exists(drive_path):
    raise Exception(f'Can`t find gd path {drive_path}')
  # An archive left by a previous run is removed before the new one is copied.
  if os.path.exists(data_gd_path):
    os.remove(data_gd_path)

  shutil.copy2(created_end_dataset_path, data_gd_path)

Mounted at /content/drive
