# Explore the dwca format

In [None]:
from dwca.read import DwCAReader
from dwca.darwincore.utils import qualname as qn

import pandas as pd
import dask.dataframe as dd
from pathlib import Path

import json

import matplotlib.pyplot as plt
import numpy as np
import os
from abc import abstractmethod

from urllib.parse import unquote, urlparse
from pathlib import PurePosixPath
from PIL import Image

import asyncio, asyncssh, sys
from typing import TypedDict
from collections import defaultdict

import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
occurrences_path = Path("/home/george/codes/gbif-request/data/classif/mini/0013397-241007104925546.zip")

In [None]:
with DwCAReader(occurrences_path) as dwca:
    print(dwca.metadata)

In [None]:
dwca = DwCAReader(occurrences_path)

In [None]:
row = dwca.get_corerow_by_position(0)

Exploration first

In [None]:
# Number of extension records attached to each core occurrence row.
# A comprehension (instead of `for row in dwca:`) keeps the module-level
# `row` (core row 0, fetched in an earlier cell) from being overwritten
# by the last row of the archive.
lengths = [len(core_row.extensions) for core_row in dwca]

In [None]:
min(lengths), max(lengths)

In [None]:
plt.hist(lengths)
plt.loglog()

In [None]:
dwca.extension_files

In [None]:
dwca.core_file.file_descriptor

In [None]:
row.descriptor.__dict__

In [None]:
def pretty_print(d):
    """Pretty-print a Darwin Core record dictionary.

    Keys are term URIs; only their last path segment is kept so the output
    stays readable. If two URIs share the same last segment, the later one
    wins.

    Args:
        d: mapping of term URI -> value.
    """
    short = {k.rsplit('/', 1)[-1]: v for k, v in d.items()}
    # ensure_ascii=False prints accented names (localities, collectors)
    # as-is instead of as \uXXXX escapes.
    print(json.dumps(short, indent=4, ensure_ascii=False))

# Show every extension record attached to `row`.
for extension in row.extensions:
    pretty_print(extension.data)

In [None]:
pretty_print(row.data)

In [None]:
len(list(row.data.keys())), len(list(row.extensions[-1].data.keys()))

In [None]:
# Print the terms that appear in the last extension of `row` but not in the
# core row itself. Membership is tested directly on the `row.data` dict
# (O(1)) instead of rebuilding a key list for every term.
core_terms = row.data
for term in row.extensions[-1].data:
    if term not in core_terms:
        print(term.rsplit('/', 1)[-1])

Create a downloader file: merge occurrence and multimedia metadata into a Python dictionary

In [None]:
# Multimedia-extension terms to keep for every image (short names: the last
# path segment of the term URI).
keys_mult = [
    # "gbifID",
    "type",
    "format",
    "identifier",
    "references",
    "created",
    "creator",
    "publisher",
    "license",
    "rightsHolder",
]

# Occurrence (core row) terms to keep for every image.
keys_occ = [
    "gbifID",

    # Recording metadata
    "basisOfRecord",
    "recordedBy",
    "continent",
    "countryCode",
    "stateProvince",
    "county",
    "municipality",
    "locality",
    "verbatimLocality",
    "decimalLatitude",
    "decimalLongitude",
    "coordinateUncertaintyInMeters",
    "eventDate",
    "eventTime",

    # Copyrights metadata
    # "license",
    # "rightsHolder",

    # Individual metadata
    "sex",

    # Taxon metadata
    "acceptedNameUsageID",
    "scientificName",
    "kingdom",
    "phylum",
    "class",
    "order",
    "family",
    "genus",
    "specificEpithet",
    "taxonRank",
    "taxonomicStatus",

    # Storage metadata
    "taxonKey",
    "acceptedTaxonKey",
    "datasetKey",
    "kingdomKey",
    "phylumKey",
    "classKey",
    "orderKey",
    "familyKey",
    "genusKey",
    "speciesKey",
]

# NOTE(review): not used in this section; presumably reserved for a future
# download file-name column.
keys_file = [
    "filename",
]

# Check that every occurrence key above exists in the core row; print the
# missing ones. A set gives O(1) membership tests.
row_keys = {k.rsplit('/', 1)[-1] for k in row.data}
for k in keys_occ:
    if k not in row_keys:
        print(k)

In [None]:
row.extensions[0].data

In [None]:
# Flatten the archive into column lists with one entry per multimedia record
# that has a non-empty URL. Each entry combines the record's multimedia terms
# with its parent occurrence's terms.
images_metadata = {k: [] for k in keys_occ + keys_mult}

for core_row in dwca:
    # Occurrence terms are identical for every multimedia record of this row,
    # so shorten the term URIs once per row.
    occ = {k.rsplit('/', 1)[-1]: v for k, v in core_row.data.items()}

    # The last element of the extensions is the verbatim record, (almost) a
    # duplicate of the core row data, so it is not needed.
    for ext in core_row.extensions[:-1]:
        # Do not consider empty URLs.
        identifier = ext.data.get('http://purl.org/dc/terms/identifier', '')
        if not identifier:
            continue

        media = {k.rsplit('/', 1)[-1]: v for k, v in ext.data.items()}

        # Look up every requested column explicitly (missing term -> '') so
        # all lists keep the same length and stay row-aligned.
        for k in keys_occ:
            images_metadata[k].append(occ.get(k, ''))
        for k in keys_mult:
            images_metadata[k].append(media.get(k, ''))

        # TODO: add the image file name for the future download
        # (hash of the image URL).


In [None]:
# Save the metadata

# Solution 1 - with pandas
output_path = occurrences_path.parent / "tmp.parquet"
pd.DataFrame(images_metadata).to_parquet(output_path, engine='pyarrow', compression='gzip')

Remove rows with an empty speciesKey (and related taxonomy keys)

In [None]:
df = pd.read_parquet(path = occurrences_path.parent / "tmp.parquet")

In [None]:
# GBIF backbone taxonomy keys; a record is kept only if all of them are
# present and non-empty.
GBIF_KEYS = [
    "kingdomKey",
    "phylumKey",
    "classKey",
    "orderKey",
    "familyKey",
    "genusKey",
    "speciesKey",
]

key_cols = df[GBIF_KEYS]
has_all_keys = (key_cols.notna() & (key_cols != '')).all(axis=1)
df = df.loc[has_all_keys]

Remove duplicates

In [None]:
df = pd.read_parquet(path = occurrences_path.parent / "tmp.parquet")

In [None]:
# Remove duplicated image URLs from the `df` loaded above.
# If a URL is used several times, ALL rows sharing it are dropped (keep=False).
df = df.drop_duplicates(subset='identifier', keep=False)

# Sanity check: number of remaining duplicated URLs (should print 0).
print(df.duplicated(subset='identifier', keep=False).sum())


In [None]:
df.to_parquet(output_path, engine='pyarrow', compression='gzip')

Limit the number of downloads per species

In [None]:
df = pd.read_parquet(path = occurrences_path.parent / "tmp.parquet")

In [None]:
len(df)

In [None]:
# Cap the number of images per taxon. A taxon with more than
# `max_num_images_per_species` rows keeps a random subset of that size
# (fixed seed for reproducibility); smaller taxa are kept entirely.
# Shuffling first makes `head` pick a random subset; `sort_index` restores
# the original row order.
max_num_images_per_species = 500
df = (
    df.sample(frac=1, random_state=42)
    .groupby('taxonKey')
    .head(max_num_images_per_species)
    .sort_index()
)

In [None]:
# Get the scientific name of the maximum occurence
# df.iloc[df.groupby('taxonKey').count().idxmax()]
df[df['taxonKey'] == df['taxonKey'].value_counts().idxmax()]['scientificName'].iloc[0]
# df[df['taxonKey'] == df.groupby('taxonKey')['taxonKey'].count().idxmax()]['scientificName'].iloc[0]

In [None]:
df.to_parquet(output_path, engine='pyarrow', compression='gzip')

Download the images

In [None]:
df = pd.read_parquet(path = occurrences_path.parent / "tmp.parquet")

In [None]:
# One (url, format, speciesKey) tuple per image row.
urls, formats, species = df['identifier'], df['format'], df['speciesKey']
occs = list(zip(urls, formats, species))

In [None]:
occs = [(row.identifier, row.format, row.speciesKey) for row in df.itertuples(index=False)]

Check the final df

In [None]:
occ_path = Path("/home/george/codes/gbifxdl/data/classif/mini/0013397-241007104925546.parquet")
df = pd.read_parquet(occ_path)

In [None]:
len(df)

In [None]:
df.head()

Post-processing: remove duplicates

In [None]:
import pandas as pd

# Sample DataFrame: 'abc' is duplicated within one species,
# 'ghi' is shared by two different species.
df = pd.DataFrame({
   'sha256': ['abc', 'abc', 'def', 'ghi', 'ghi', 'ghi'],
   'speciesKey': [1, 1, 2, 3, 4, 3]
})

# Step 1: Group by sha256 and apply the heuristic
def process_duplicates(group):
    """Resolve one group of rows that share the same image hash.

    Returns the first row when every row has the same speciesKey. Otherwise
    returns an empty frame, so all rows of an ambiguous image are dropped.
    """
    if group['speciesKey'].nunique() == 1:
        # Only one speciesKey, keep one row
        return group.iloc[:1]
    # Multiple speciesKey, drop all duplicates. iloc[:0] keeps the group's
    # columns AND dtypes; a fresh pd.DataFrame(columns=...) is all-object and
    # triggers pandas' concat FutureWarning about empty entries.
    return group.iloc[:0]

# Apply the function to each group of sha256. Selecting the columns
# explicitly (including 'sha256') passes the grouping column to the function
# without pandas' deprecation warning about applying on grouping columns.
result = df.groupby('sha256', group_keys=False)[list(df.columns)].apply(process_duplicates)

print(result)

In [None]:
# Sample DataFrame: 'abc' is duplicated within one species,
# 'ghi' is shared by two different species.
df = pd.DataFrame({
   'sha256': ['abc', 'abc', 'def', 'ghi', 'ghi', 'ghi'],
   'speciesKey': [1, 1, 2, 3, 4, 3],
   'filename': ['file1.jpg', 'file2.jpg', 'file3.jpg', 'file4.jpg', 'file5.jpg', 'file6.jpg']
})

# Relative paths ("<speciesKey>/<filename>") of the files discarded by the
# heuristic, in the order they are processed.
removed_files = []


def _file_path(record):
    """Relative path of an image file: '<speciesKey>/<filename>'."""
    return f"{record['speciesKey']}/{record['filename']}"


# Function to process duplicates based on heuristic
def process_duplicates(group):
    """Resolve one group of rows that share the same image hash.

    - One speciesKey: keep the first row and record the others' files.
    - Several speciesKeys: drop every row and record all their files.

    Returns the rows to keep (possibly an empty frame with the group's
    dtypes). Appends discarded paths to the module-level `removed_files`.
    """
    if group['speciesKey'].nunique() == 1:
        kept, dropped = group.iloc[:1], group.iloc[1:]
    else:
        # iloc[:0] keeps columns and dtypes (a fresh pd.DataFrame(columns=...)
        # is all-object and triggers pandas' concat FutureWarning).
        kept, dropped = group.iloc[:0], group

    for _, record in dropped.iterrows():
        file_path = _file_path(record)
        # Dry run: actual deletion is disabled for now.
        # if os.path.exists(file_path):
        #     os.remove(file_path)
        removed_files.append(file_path)
    return kept


# Apply the function to each group of sha256. Explicit column selection
# (including 'sha256') replaces `include_groups=True`, which is deprecated
# and not accepted by older pandas versions.
result = df.groupby('sha256', group_keys=False)[list(df.columns)].apply(process_duplicates)

# Get the list of removed files
removed_files_list = removed_files

# Output the cleaned DataFrame and removed file paths
print("Cleaned DataFrame:")
print(result)

print("\nList of removed files:")
print(removed_files_list)

In [None]:
from abc import ABC, abstractmethod
from contextlib import suppress


class FileManager(ABC):
    """Interface for storing and deleting downloaded image files.

    The methods are static, so they work both on the class
    (``LocalFileManager.save(img, path)``) and on an instance.
    Inheriting from ``ABC`` makes ``@abstractmethod`` actually enforced.
    """

    @staticmethod
    @abstractmethod
    def save(img, img_path):
        """Store the bytes ``img`` at ``img_path``."""

    @staticmethod
    @abstractmethod
    def remove(img_path):
        """Delete ``img_path``; no error if it does not exist."""


class LocalFileManager(FileManager):
    """FileManager backed by the local filesystem."""

    @staticmethod
    def save(img, img_path):
        """Write the bytes ``img`` to ``img_path`` (overwriting it)."""
        with open(img_path, 'wb') as handler:
            handler.write(img)

    @staticmethod
    def remove(img_path):
        """Delete ``img_path`` if it exists."""
        # EAFP: avoids the race between an exists() check and the removal.
        with suppress(FileNotFoundError):
            os.remove(img_path)


In [None]:
df = pd.read_parquet(path = occurrences_path.with_suffix('.parquet'))

In [None]:
len(df)

In [None]:
df.head()

In [None]:
df['sha256'] 

In [None]:
# WARNING: replaces the real sha256 column with dummy values 0..len(df)-1
# (one per row, whatever the frame size).
df['sha256'] = np.arange(len(df))

In [None]:
# NOTE(review): hard-coded path that may not match the file read above via
# `occurrences_path.with_suffix('.parquet')`. It overwrites this file if it
# exists — confirm that is intended.
df.to_parquet('/home/george/codes/gbifxdl/data/classif/mini/0013397-241007104925546.parquet', engine='pyarrow', compression='gzip')

In [None]:
url = 'sftp://erda:2222/datasets/tests'

PurePosixPath(unquote(urlparse(url).path))

In [None]:
Path(urlparse(url).path)

In [None]:
def get_image_paths(folder, extensions=('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')):
    """Recursively collect all image file paths under ``folder``.

    Args:
        folder: directory to walk.
        extensions: file suffixes accepted as images. Matching is
            case-insensitive on both the suffixes and the file names.

    Returns:
        List of ``os.path.join(root, name)`` paths in ``os.walk`` order.
        A nonexistent ``folder`` yields an empty list, because ``os.walk``
        ignores errors by default.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    return [
        os.path.join(root, name)
        for root, _, names in os.walk(folder)
        for name in names
        if name.lower().endswith(suffixes)
    ]

files = get_image_paths("/home/george/codes/gbifxdl/data/classif/mini/images")
len(files)

In [None]:
# Parse an SFTP URL of the form sftp://<user>:@<host>/<remote path>.
remote_dir = "sftp://gmo@ecos.au.dk:@io.erda.au.dk/datasets/test3"
o = urlparse(remote_dir)
# Remote SFTP paths are always POSIX, whatever the local OS; percent-escapes
# (e.g. %20) are decoded.
remote_dir = PurePosixPath(unquote(o.path))
netloc = o.netloc
sftp_server = f"{o.scheme}://{o.netloc}"
print(sftp_server)

In [None]:
# User name and host of the SFTP URL. urlparse already separates the
# userinfo from the host at the last '@', so this also works when there is
# no ':' before the '@'. Any port is excluded from `s`; it is in `o.port`.
u, s = o.username, o.hostname

Check integrity of Parquet file

In [None]:
# df = pd.read_parquet("/home/george/codes/gbifxdl/data/classif/mini/0013397-241007104925546.parquet")
df1 = pd.read_parquet("/home/george/codes/gbifxdl/data/classif/lepi_small/0060185-241126133413365.parquet")
df2 = pd.read_parquet("/home/george/codes/gbifxdl/data/classif/lepi_small/0060185-241126133413365_v2.parquet")

In [None]:
df1[~df1.apply(tuple,1).isin(df2.apply(tuple,1))]

In [None]:
df1.equals(df2)

In [None]:
len(df1), len(df2)

In [None]:
df2.tail()

In [None]:
df1[df1['continent']=='']

In [None]:
df1['url_hash'].tail(), df2['filename'].tail()

In [None]:
df[df.img_hash.duplicated(keep=False)]

In [None]:
df.keys()

In [None]:
len(df[df.status == 'downloading_failed'])

Try to stream the DwC-A file to avoid loading it into memory

In [None]:
occurrences_path = Path("/mnt/c/Users/au761367/OneDrive - Aarhus universitet/Codes/python/gbifxdl/data/classif/mini/0013397-241007104925546.zip")

In [None]:
with DwCAReader(occurrences_path) as dwca:
    print(dwca.metadata)

In [None]:
dwca = DwCAReader(occurrences_path)

In [None]:
row = dwca.get_corerow_by_position(0)

SFTPClient connection

In [None]:
from paramiko import SSHClient, AutoAddPolicy, SFTPClient, Transport, RSAKey

class SFTPHandler:
    """Thin wrapper around a paramiko SFTP session authenticated by RSA key.

    Usable as a context manager: the session is closed on exit.
    """

    def __init__(self, host, port, username, rsa_key_path=None, working_dir="/", rsa_key_str=None):
        """
        Open an SFTP session and change into ``working_dir`` (created if needed).

        :param host: SFTP server hostname
        :param port: SFTP server port
        :param username: Username for authentication
        :param rsa_key_path: Path to the RSA private key file (optional)
        :param working_dir: Remote directory to create (if missing) and chdir into
        :param rsa_key_str: RSA private key as a string (optional, used when
            ``rsa_key_path`` is not given)
        :raises ValueError: if neither ``rsa_key_path`` nor ``rsa_key_str`` is given
        """
        # Load the key BEFORE opening the socket, so a missing or invalid key
        # cannot leave an open Transport behind.
        if rsa_key_path:
            rsa_key = RSAKey.from_private_key_file(rsa_key_path)
        elif rsa_key_str:
            import io
            rsa_key = RSAKey.from_private_key(io.StringIO(rsa_key_str))
        else:
            raise ValueError("Either 'rsa_key_path' or 'rsa_key_str' must be provided.")

        self.transport = Transport((host, port))
        try:
            # Connect with RSA Key
            self.transport.connect(username=username, pkey=rsa_key)
            self.sftp = SFTPClient.from_transport(self.transport)
            self.create_folder(working_dir)
            self.sftp.chdir(working_dir)
        except Exception:
            # Do not leak the socket if authentication or chdir fails.
            self.transport.close()
            raise

    def create_folder(self, folder):
        """Create ``folder`` on the server, including missing parent folders.

        Best effort: a level that cannot be created (typically because it
        already exists) is skipped silently.
        """
        path = PurePosixPath(folder)
        # `parents` is deepest-first; create from the top level downwards.
        for level in (*reversed(path.parents), path):
            if str(level) in ('.', '/'):
                continue
            try:
                self.sftp.mkdir(str(level))
            except IOError:
                pass  # Folder likely exists

    def upload_file(self, folder, filename, file_data):
        """Upload the file-like object ``file_data`` to ``folder/filename``.

        ``folder`` is created first if needed.
        """
        self.create_folder(folder)
        # Remote paths are POSIX regardless of the local OS.
        remote_path = str(PurePosixPath(folder) / filename)
        self.sftp.putfo(file_data, remote_path)

    def close(self):
        """Close the SFTP channel and the underlying transport."""
        self.sftp.close()
        self.transport.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

sftp = SFTPHandler(
    host="io.erda.au.dk",
    port=2222,
    working_dir="datasets/test4",
    username="gmo@ecos.au.dk",
    rsa_key_path="/mnt/c/Users/au761367/.ssh/id_rsa",
)

In [None]:
img_path = "/mnt/c/Users/au761367/OneDrive - Aarhus universitet/Codes/python/gbifxdl/data/classif/mini/images/1011881/1c41c4a0ed1dc2c62fda5f30f3844bddb0f66ed5.jpeg"
img=Image.open(img_path)

In [None]:
type(img)

In [None]:
with open(img_path, 'br') as img:
    sftp.upload_file("/", "1c41c4a0ed1dc2c62fda5f30f3844bddb0f66ed5.jpeg", img)

SFTPClient connection with async

In [None]:
async def run_client() -> None:
    """Download one test image from the ERDA SFTP server.

    ``sftp.get`` is called without a local path.
    """
    connect_kwargs = dict(
        host="io.erda.au.dk",
        port=2222,
        username="gmo@ecos.au.dk",
        client_keys=["/mnt/c/Users/au761367/.ssh/id_rsa"],
    )
    remote_file = 'datasets/test3/1011881/1c41c4a0ed1dc2c62fda5f30f3844bddb0f66ed5.jpeg'
    async with asyncssh.connect(**connect_kwargs) as connection:
        async with connection.start_sftp_client() as client:
            await client.get(remote_file)

# NOTE(review): asyncio.run() raises RuntimeError when an event loop is
# already running (e.g. inside Jupyter); there, `await run_client()` is needed.
try:
    asyncio.run(run_client())
except (OSError, asyncssh.Error) as exc:
    sys.exit('SFTP operation failed: ' + str(exc))

Test of TypedDict

In [None]:
class test(TypedDict):
    """Experiment: a TypedDict with one str field and one int field.

    TypedDict fields cannot carry default values. Type checkers reject them,
    and at runtime they are not applied: ``test()`` is just ``{}``. So the
    fields are declared without defaults.
    """
    coucou: str
    num: int


def f(coucou, num):
    """Print ``coucou`` repeated ``num`` times."""
    print(f"{coucou*num}")


# A TypedDict instance is a plain dict, so it can be unpacked as kwargs.
t = test(coucou="hello ", num=5)
f(**t)

In [None]:
test(coucou="hello", num=5)

In [None]:
test()

Check metadata quality

In [None]:
pd.read_parquet(path="/home/george/codes/gbifxdl/data/classif/mini/0013397-241007104925546_processing_metadata.parquet")

Test parquet files

In [None]:
parquet_path="/mnt/c/Users/au761367/OneDrive - Aarhus universitet/Codes/python/gbifxdl/data/classif/mini/0013397-241007104925546.parquet"
batch_size = 10
parquet_iter_for_merge = pq.ParquetFile(parquet_path).iter_batches(batch_size=batch_size)
original_table = pa.Table.from_batches([next(parquet_iter_for_merge)])

In [None]:
original_table

Try to use `dwca.pd_read`

In [None]:
# This is extremely memory expensive
# with DwCAReader("/home/george/codes/gbifxdl/data/classif/lepi/0061420-241126133413365.zip") as dwca:
#     media_df = dwca.pd_read("multimedia.txt", parse_dates=True, on_bad_lines="skip")
#     occ_df = dwca.pd_read("occurrence.txt", parse_dates=True, on_bad_lines="skip")

In [None]:
# Read multimedia.txt in chunks of 1000 rows instead of loading it at once.
# NOTE(review): with `chunksize`, `media_df` is presumably a lazy pandas chunk
# reader, and it is consumed only after this `with` block has closed the
# archive. If DwCAReader deletes its extracted temporary files on close,
# iteration may fail (notably on Windows). Consider reading the needed
# chunk(s) inside the `with` block — confirm.
with DwCAReader("/home/george/codes/gbifxdl/data/classif/lepi_small/0060185-241126133413365.zip") as dwca:
    media_df = dwca.pd_read("multimedia.txt", parse_dates=True, on_bad_lines="skip", chunksize=1000)

In [None]:
chunk = next(iter(media_df))
chunk.head()

Try to open a large Parquet file in memory

In [None]:
df = pd.read_parquet("/home/george/codes/gbifxdl/data/classif/lepi/0061420-241126133413365_sampled.parquet")

In [None]:
len(df)

Use dask to open it

In [None]:
df = dd.read_parquet("/home/george/codes/gbifxdl/data/classif/lepi/0061420-241126133413365.parquet")

In [None]:
# Compute speciesKey distribution
species_distribution = df["speciesKey"].value_counts().compute()

# Plot the distribution
species_distribution.plot(kind="bar", figsize=(12, 6))
plt.xlabel("speciesKey")
plt.ylabel("Count")
plt.title("SpeciesKey Distribution")
plt.tight_layout()
plt.show()

In [None]:
species_distribution

In [None]:
# Per-group sampler applied to each speciesKey group (pandas DataFrame).
def sample_species(group, n=500, random_state=42):
    """Return at most ``n`` randomly chosen rows of ``group``.

    Groups with ``n`` rows or fewer are returned whole. They come back in
    shuffled order, because ``DataFrame.sample`` permutes rows. The fixed
    ``random_state`` makes the sample reproducible.
    """
    return group.sample(n=min(len(group), n), random_state=random_state)

# Group by speciesKey and keep at most 500 random rows per species.
sampled_df = df.groupby("speciesKey").apply(
    sample_species, meta=df
)

# Write the result directly, without calling .persist() first: persisting
# would materialize the whole sample in memory before writing.
# A dedicated variable name keeps the `output_path` used by earlier cells
# from being overwritten.
sampled_output_path = "sampled_species.parquet"
sampled_df.to_parquet(sampled_output_path, write_index=False)

print(f"Sampled data saved to {sampled_output_path}")