In [1]:
import argparse
import json
import os
import shutil
import urllib
from concurrent.futures import ThreadPoolExecutor
import time

import pandas as pd
from dwca.read import DwCAReader

import logging
import datetime


In [2]:
# Define the arguments
home_dir = os.getcwd()

# data_dir = "/bask/homes/r/rybf4168/vjgo8416-amber/data/gbif-species-trainer-AMI-fork"
data_dir = "/Users/lbokeria/Documents/projects/gbif-species-trainer-data"

species_checklist_path = os.path.join(home_dir,"species_checklists","uksi-moths-keys-nodup-small.csv")

dwca_occurrence_df_path = os.path.join(data_dir,"occurrence_dataframes")

dwca_multimedia_df_path = os.path.join(data_dir,"dwca_files","multimedia_lepidoptera.csv")

output_location_path = os.path.join(data_dir,"gbif_images","sandbox")

In [3]:
# Read the multimedia file
media_df = pd.read_csv(dwca_multimedia_df_path)

In [4]:
# read species list
moth_data = pd.read_csv(species_checklist_path)

taxon_keys = list(moth_data["accepted_taxon_key"])
taxon_keys = [int(taxon) for taxon in taxon_keys]

In [5]:
def fetch_meta_data(data: pd.DataFrame):
    """returns the relevant metadata for a GBIF observation"""

    fields = [
        "decimalLatitude",
        "decimalLongitude",
        "order",
        "family",
        "genus",
        "species",
        "acceptedScientificName",
        "year",
        "month",
        "day",
        "datasetName",
        "taxonID",
        "acceptedTaxonKey",
        "lifeStage",
        "basisOfRecord",
    ]

    meta_data = {}

    for field in fields:
        if pd.isna(data[field]):
            meta_data[field] = "NA"
        else:
            meta_data[field] = data[field]

    return meta_data

In [6]:
# for count, i_taxon_key in enumerate(taxon_keys):
    
#     fetch_image_data(i_taxon_key)
    
# print("Finished downloading for the given list!", flush=True)


In [7]:
def fetch_image_data(i_taxon_key: int):
    global skip_non_adults
    
    species_meta_data = {}

    # get taxa information specific to the species
    taxon_data = moth_data[moth_data["accepted_taxon_key"] == i_taxon_key]

    family_name         = taxon_data["family_name"].item()
    genus_name          = taxon_data["genus_name"].item()
    species_name        = taxon_data["gbif_species_name"].item()
    write_location      = os.path.join(output_location_path,family_name,genus_name,species_name)

    # Read the occurrence dataframe
    if os.path.isfile(os.path.join(dwca_occurrence_df_path,
                                    str(i_taxon_key) + ".csv")): 
        i_occ_df = pd.read_csv(os.path.join(dwca_occurrence_df_path,
                                            str(i_taxon_key) + ".csv"))
        total_occ = len(i_occ_df)
        print(f"Downloading for {species_name}", flush=True) 
    else:
        logger.warning(
            f"No occurrence csv file found for {species_name}, taxon key {i_taxon_key}"
            )
        return

    # creating hierarchical folder structure for image storage
    if not os.path.isdir(write_location):
        try:
            os.makedirs(write_location)
        except:
            print(f"Could not create the directory for {write_location}", flush=True)
            return
        
    image_count = 0

    if total_occ != 0:
        # print(f"{species_name} has some occurrences")
        
        for idx, row in i_occ_df.iterrows():
            
            if skip_non_adults:
                
                if (not pd.isna(row["lifeStage"])) & (row["lifeStage"] != "Adult"):
                    
                    # print("Life stage is", row["lifeStage"], "skipping...")
                
                    continue
            
            obs_id = row["id"]

            # check occurrence entry in media dataframe
            try:
                media_entry = media_df.loc[media_df["coreid"] == obs_id]

                if not media_entry.empty:
                    # print(f"{species_name} has some media")
                          
                    if len(media_entry) > 1:  # multiple images for an observation
                        media_entry = media_entry.iloc[0, :]
                        image_url = media_entry["identifier"]
                    else:
                        image_url = media_entry["identifier"].item()
                else:
                    
                    # print(f"{species_name} has NO media")
                    continue
                    
            except Exception as e:
                print(e, flush=True)
                continue

            # download image
            if os.path.isfile(write_location + "/" + str(obs_id) + ".jpg"):
                image_count += 1
            else:
                try:
                    urllib.request.urlretrieve(
                        image_url, write_location + "/" + str(obs_id) + ".jpg"
                    )
                    image_count += 1
                    # m_data = fetch_meta_data(row)
                    # meta_data[str(obs_id) + ".jpg"] = m_data
                except:
                    print(f"Error downloading URL: '{image_url}'")
                    continue
                
            # Get meta data for this occurrence
            occ_meta_data = fetch_meta_data(row)
            species_meta_data[str(obs_id) + ".jpg"] = occ_meta_data
            
            if image_count >= max_data_sp:
                break
            
        # Dump metadata
        with open(write_location + "/" + "meta_data.json", "w") as outfile:
            json.dump(species_meta_data, outfile)            
            
    print(f"Downloading complete for {species_name} with {image_count} images.",
            flush=True)
    
    return
    

In [8]:
# Then, define a function that manages the parallel execution:
def download_images_concurrently(taxon_keys,use_parallel,n_workers):
    
    begin = time.time()

    
    if use_parallel:
        with ThreadPoolExecutor(max_workers=n_workers) as executor:

            # You can use the executor to parallelize your function call:
            results = list(executor.map(fetch_image_data, taxon_keys))
    
    else:
       
        for i_taxon_key in taxon_keys:
            print(f"Calling for {i_taxon_key}")
            fetch_image_data(i_taxon_key)
   

    end = time.time()
            
    print("Finished downloading for the given list! Time taken:", 
          round(end - begin), 
          "seconds",
          flush=True)

In [9]:
def setup_logger():
    
    # Specify the directory where you want to save the log files
    log_dir = "log_files"

    # Ensure the directory exists
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Use the timestamp string to create a unique filename for the log file
    timestamp    = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    log_filename = os.path.join(log_dir, f'download_log_{timestamp}.log')
    
    # Get the root logger
    logger = logging.getLogger()
    
    # If logger has handlers, clear them
    for handler in logger.handlers[:]:
        handler.close()
        logger.removeHandler(handler)
    
    # Configure the logger
    logging.basicConfig(filename=log_filename, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')


In [10]:
# Setup logger
setup_logger()
logger = logging.getLogger()

# Start the run
n_workers = 5    
use_parallel = True
max_data_sp = 100
skip_non_adults = True

# Lastly, call the function with your taxon keys:
download_images_concurrently(taxon_keys,use_parallel,n_workers)

Downloading for Pyropteron chrysidiformis
Downloading for Paranthrene tabaniformis
Downloading for Bembecia ichneumoniformis
Downloading for Pennisetia hylaeiformis
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Larva skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Downloading for Sesia apiformis
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Life stage is Imago skipping...
Lif

In [None]:
i_taxon_key = 1940838

i_occ_df = pd.read_csv(os.path.join(dwca_occurrence_df_path,
                                    str(i_taxon_key) + ".csv"))

In [None]:
i_occ_df