# Notebook overview
Downloads or copies images referenced in CSV metadata files for the binary GBIF dataset, using a local cache when possible and concurrent downloads for missing images.

- Selects relevant CSVs files in a folder and processes each file
- Copies available images from a cache; downloads missing images with ThreadPoolExecutor
- Adds tracking columns ('image_downloaded', 'image_download_fail_reason') and saves updated CSVs
- Handles hostname-specific SSL verification

The notebook was exported as a Python script and run in a console using Tmux to execute it.

In [None]:
import pandas as pd

import os
import shutil
from typing import Optional
import requests
from urllib.parse import urlparse

from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess

In [None]:
# Number of Threads
MAX_WORKERS = 12

# Folder with CSV files (selects all relevant csv files in the folder - function select_files)
CSV_FOLDER_PATH = r'/home/jleick/masterArbeitProjekt/data/ami_dataset/ami_gbif/fine-grained_classification/metadata'

# Folder to save the downloaded pictures (creates a folder for each csv file)
DESTINATION_PATH = '/home/jleick/masterArbeitProjekt/data/ami_download/ami_gbif/fetched_images_binary'

# Cache folder for available pictures
CACHE = r'/home/jleick/masterArbeitProjekt/data/ami_download/ami_gbif/cached_images'

In [None]:
### Select csv files in given folder Path

def select_files(csv_folder_path: str):
    folder_contains = os.listdir(csv_folder_path)
    folder_filtered = []
    # Filter all relevant files
    for filename in folder_contains:
        if ('.csv' in filename and 'all' in filename):
            folder_filtered.append(filename)

    # Remove the .csv files from the selection for which a 'download.csv' file already exists.
    for filename in folder_contains:
        if ('.csv' in filename and 'all' in filename and 'download' in filename):
            filename = filename.replace('_download.csv', '.csv')
            if filename in folder_filtered:
                folder_filtered.remove(filename)
    
    return folder_filtered

#call funktion
folder_filtered = select_files(CSV_FOLDER_PATH)

# Print all selected files
for folder in folder_filtered:
    print(folder)

# DUBLICATED CODE (CODE EXIST IN OTHER FILE TOO)

04_ami-gbif_fine-grained_all_test.csv
04_ami-gbif_fine-grained_all_val.csv
04_ami-gbif_fine-grained_all_train.csv


In [None]:
### load .csv file into pandas DataFrame (optional with number of rows for testing)

def load_data(data_path: str, nrows: Optional[int] = None):
    # Load data into Pandas DataFrame
    df = pd.read_csv(data_path, sep=',', header='infer', nrows=nrows)
    # print(f'>>> {df.shape} - Shape of: {data_path.split("/")[-1]}')

    # Print Columns with only NaN in Column
    # empty_columns = df.columns[df.isna().all()]
    # print(f'>>> column: {list(empty_columns)} contains only NaN')

    # Check if df contains duplicated URLs
    if not df['identifier'].is_unique:
        duplicates = df['identifier'].duplicated(keep=False).sum()
        print(f'Duplicate: {duplicates} are included in identifier')
        #raise Exception('Duplicates are included in identifier') # assert url_list.is_unique

    return df

### Test function
# test_path = os.path.join(CSV_FOLDER_PATH, folder_filtered[0])
# data = load_data(test_path, 5)
# data

# DUBLICATED CODE (CODE EXIST IN OTHER FILE TOO)

In [None]:
### add column 'image_downloaded' and 'image_download_fail_reason' to df to track download

def add_download_Columns_to_df(df: pd.DataFrame):

    column_name_download = 'image_downloaded'
    if column_name_download not in df.columns:
        df[column_name_download] = False
        print(f'>>> {df.shape} - Added column: {column_name_download}')

    column_name_download_fail = 'image_download_fail_reason'
    if column_name_download_fail not in df.columns:
        df[column_name_download_fail] = False
        print(f'>>> {df.shape} - Added column: {column_name_download_fail}')
    
    return column_name_download, column_name_download_fail

### Test function
# column_name_download, column_name_download_fail = add_download_Columns_to_df(data)
# print( f'return: {column_name_download}' )
# print( f'return: {column_name_download_fail}' )
# data

In [None]:
### copy availables images from cache to destination

def copy_availabe_images(df: pd.DataFrame, source_dir: str, destination_dir: str, column_name_download: str, column_name_download_fail: str):
    for index, image_pfad in df['image_path'].items():
        source_path = os.path.join(source_dir, image_pfad)

        if os.path.exists(source_path):
            try:
                destination_path = os.path.join(destination_dir, image_pfad)
                if not os.path.exists(destination_path):
                    os.makedirs(os.path.dirname(destination_path), exist_ok=True)
                    shutil.copyfile(source_path, destination_path)

                    df.at[index, column_name_download] = True
                    df.at[index, column_name_download_fail] = 'no error'
                    print(f'Image (at index: {index}) found and copied: {image_pfad}')
                else:
                    df.at[index, column_name_download] = True
                    df.at[index, column_name_download_fail] = 'no error'
                    print(f'Image (at index: {index}) exist already - no copy necessary : {image_pfad}')
            except Exception as e:
                print(f'Error Image (at index: {index}) copying image {image_pfad}: {e}')

In [None]:
### download missing images

def download_image(index: int, df: pd.DataFrame, destination_dir, column_name_download: str, column_name_download_fail: str):
    if not df.at[index, column_name_download]:
        url = df.at[index, 'identifier']
        image_path = df.at[index, 'image_path']
        destination_path = os.path.join(destination_dir, image_path)
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)

        ### Start - Addition to deactivate verification for specific hostnames
        problematic_host = 'monarch.calacademy.org' # Addition to deactivate verification for specific hostnames

        try:
            # Analyze the URL to get the hostname
            parsed_url = urlparse(url)
            hostname = parsed_url.netloc # for example 'monarch.calacademy.org'

            # Set the verify parameter based on the hostname
            should_verify = True # Always verify by default
            if hostname == problematic_host:
                should_verify = False # Deactivate only for the problematic domain
                print(f' Deaktiviere SSL-Verifizierung fÃ¼r {url}')
            
            with requests.get(url, stream=True, timeout=300, verify=should_verify) as response:
            ### Ende - Addition to deactivate verification for specific hostnames

                response.raise_for_status()
#               with open(destination_path, 'wb') as file:
#                   shutil.copyfileobj(response.raw, file)
                with open(destination_path, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)

            df.at[index, 'image_downloaded'] = True
            df.at[index, 'image_download_fail_reason'] = 'no error'
            print(f'{index}: Successful download {url}')
        except Exception as e:
            df.at[index, 'image_downloaded'] = False
            df.at[index, 'image_download_fail_reason'] = f'{e}'
            print(f'{index}: Failed download {url}: {e}')
        return image_path
    return

In [36]:
### Execute download_image function with ThreadPoolExecutor

def download_images_with_executorpool(max_workers: int, download_image: callable, df: pd.DataFrame, destination: str, column_name_download: str, column_name_download_fail: str):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
         futures = [executor.submit(download_image, index, df, destination, column_name_download, column_name_download_fail) for index in df.index]
         for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f'Error while retreving result from future: {e}')

In [37]:
### RUN - call functions to process file

def process_file(filename: str, csv_folder_path: str, destination_path: str, CACHE: str, MAX_WORKERS: int):
    # create file paths from filename
    DATA = os.path.join(csv_folder_path, filename)
    if ('download' not in filename):
        DATA_UPDATE = DATA.replace('.csv', '_download.csv')
    else:
        DATA_UPDATE = DATA

    if ('download' in filename):
        filename = filename.replace('_download.csv', '.csv')
    DESTINATION = os.path.join(destination_path, filename.replace('.csv', '/'))
    print('-------------Variables----------------')
    print(f'>>> Processing CSV file from: {DATA}')
    print(f'>>> Saving CSV file to: {DATA_UPDATE}')
    print(f'>>> Destination folder: {DESTINATION}')
    print(f'>>> Cache folder: {CACHE}')

    # process file
    print(f'>>> Load data and add column to track existing downloads')
    df = load_data(DATA, 5)
    column_name_download, column_name_download_fail = add_download_Columns_to_df(df)

    print(f'>>> Starting copy availabe images from cache to destination')
    copy_availabe_images(df, CACHE, DESTINATION, column_name_download, column_name_download_fail)

    print(f'>>> Starting downloading images with {MAX_WORKERS} workers')
    download_images_with_executorpool(MAX_WORKERS, download_image, df, DESTINATION, column_name_download, column_name_download_fail)

    print(f'>>> Save updated Dataframe to: {DATA_UPDATE}')
    df.to_csv(DATA_UPDATE, na_rep="NULL", index=False)

    #print('>>> Copy files to cache if not available')
    #subprocess.run(f'rsync -av --ignore-existing {DESTINATION} {CACHE}', shell=True, check=True)
    # !rsync -av --ignore-existing {DESTINATION} {CACHE}

    print(f'>>> FINISH Process: {filename}\n\n')


### call funktions
for filename in folder_filtered:
    process_file(filename, CSV_FOLDER_PATH, DESTINATION_PATH, CACHE, MAX_WORKERS)

-------------Variables----------------
>>> Processing CSV file from: /home/jleick/masterArbeitProjekt/data/ami_dataset/ami_gbif/fine-grained_classification/metadata/04_ami-gbif_fine-grained_all_test.csv
>>> Saving CSV file to: /home/jleick/masterArbeitProjekt/data/ami_dataset/ami_gbif/fine-grained_classification/metadata/04_ami-gbif_fine-grained_all_test_download.csv
>>> Destination folder: /home/jleick/masterArbeitProjekt/data/ami_download_temp/ami_gbif/fetched_images_fine-grained/04_ami-gbif_fine-grained_all_test/
>>> Cache folder: /home/jleick/masterArbeitProjekt/data/ami_download/ami_gbif/cached_images
>>> Load data and add column to track existing downloads
>>> (5, 17) - Added column: image_downloaded
>>> (5, 18) - Added column: image_download_fail_reason_1
>>> Starting copy availabe images from cache to destination
Image (at index: 0) found and copied: 50c9509d-22c7-4a22-a47d-8c48425ef4a7/1024178685.jpg
Image (at index: 1) found and copied: 50c9509d-22c7-4a22-a47d-8c48425ef4a7/10