In [1]:
import re
import json
import requests
import numpy as np
import pandas as pd
import pyarrow as pa
from odp.dto import MetadataDto
from odp.client import OdpClient
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from odp.dto.catalog import DatasetDto, DatasetSpec
from odp.dto.common.contact_info import ContactInfo
from odp.client.exc import OdpResourceExistsError, OdpValidationError

from tqdm import tqdm
tqdm.pandas()
pd.set_option('display.max_columns', None)

## Data preprocessing
- https://obis.org/dataset/47edd06e-212a-4677-9abb-6384a0cac20a
- WoRMS REST API: https://www.marinespecies.org/rest/

In [2]:
# reading obis data
data = pd.read_csv('dwca-agrra-benthic-v1/occurrence.txt', sep = '\t', engine = 'python')

data.columns = [re.sub(r'(?<!^)(?=[A-Z][a-z])', '_', col).lower() for col in data.columns]

data = data.rename(columns = {"decimal_latitude": "latitude", "decimal_longitude": "longitude",
                              "locationid": "location_id"})
print(data.shape)
data.head()

(66248, 41)


Unnamed: 0,id,bibliographic_citation,institution_code,collection_code,basis_of_record,dynamic_properties,occurrenceid,catalog_number,occurrence_remarks,record_number,recorded_by,individual_count,sex,life_stage,occurrence_status,preparations,other_catalog_numbers,associated_references,event_date,year,month,day,field_number,water_body,country,state_province,county,locality,verbatim_depth,minimum_depth_in_meters,maximum_depth_in_meters,verbatim_coordinates,latitude,longitude,coordinate_precision,identified_by,date_identified,type_status,scientific_nameid,scientific_name,scientific_name_authorship
0,1,"Marks, K.W. 2007. AGRRA Database, version (05/...",AGRRA,agrra_benthic,HumanObservation,,1,41260,,,,,,,present,,,,2000-08-04T12:00:00Z,2000,8,4,,,,,,,,9.1,9.1,,18.463051,-77.358131,,,,,urn:lsid:marinespecies.org:taxname:287911,Agaricia agaricites,
1,2,"Marks, K.W. 2007. AGRRA Database, version (05/...",AGRRA,agrra_benthic,HumanObservation,,2,25455,,,,,,,present,,,,1999-07-10T12:00:00Z,1999,7,10,,,,,,,,11.2,11.8,,18.506945,-87.753609,,,,,urn:lsid:marinespecies.org:taxname:288889,Porites astreoides,
2,3,"Marks, K.W. 2007. AGRRA Database, version (05/...",AGRRA,agrra_benthic,HumanObservation,,3,24547,,,,,,,present,,,,1999-08-30T12:00:00Z,1999,8,30,,,,,,,,10.6,11.8,,20.114166,-87.45639,,,,,urn:lsid:marinespecies.org:taxname:287963,Montastraea faveolata,
3,4,"Marks, K.W. 2007. AGRRA Database, version (05/...",AGRRA,agrra_benthic,HumanObservation,,4,29176,,,,,,,present,,,,1999-11-18T12:00:00Z,1999,11,18,,,,,,,,18.6,18.6,,27.905333,-93.591499,,,,,urn:lsid:marinespecies.org:taxname:287964,Montastraea franksi,
4,5,"Marks, K.W. 2007. AGRRA Database, version (05/...",AGRRA,agrra_benthic,HumanObservation,,5,20682,,,,,,,present,,,,1999-08-15T12:00:00Z,1999,8,15,,,,,,,,8.0,8.0,,26.3535,-76.977997,,,,,urn:lsid:marinespecies.org:taxname:207479,Montastraea annularis,


In [3]:
#data[data["eventDate"].str.contains("1970", na=False)]
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 66248 entries, 0 to 66247
Data columns (total 41 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   id                          66248 non-null  int64  
 1   bibliographic_citation      66248 non-null  object 
 2   institution_code            66248 non-null  object 
 3   collection_code             66248 non-null  object 
 4   basis_of_record             66248 non-null  object 
 5   dynamic_properties          0 non-null      float64
 6   occurrenceid                66248 non-null  int64  
 7   catalog_number              66248 non-null  int64  
 8   occurrence_remarks          0 non-null      float64
 9   record_number               0 non-null      float64
 10  recorded_by                 0 non-null      float64
 11  individual_count            0 non-null      float64
 12  sex                         0 non-null      float64
 13  life_stage                  0 n

In [4]:
data['scientific_name'] = data['scientific_name'].apply(
    #lambda x: re.sub(r'(?:\s+sp\.|\s+\.sp)\s*$', '', x.lower()).strip()
    lambda x: re.sub(r'\b(?:sp\.|\.sp)(?=\W|$)', '', x.lower()).strip()
)

# Name X | Name XX{3,4, ...} -> Name
#data['scientific_name'] = data['scientific_name'].apply(lambda x: x.rstrip(' x').lower())
data['scientific_name'] = data['scientific_name'].apply(
    lambda x: re.sub(r'\s+x+$', '', x.strip(), flags=re.IGNORECASE)
)

# remove parenthesis from names
# e.g. porites colonensis (sensu p. humann) --> porites colonensis
data['scientific_name'] = data['scientific_name'].str.split('(').str[0].str.strip()
# data['scientific_name'] = data['scientific_name'].str.replace(r'\s*\(.*', '', regex=True)

if "event_date" in data.columns:

    #data["event_date"] = data["event_date"].apply(lambda x: x + "-01-01" if len(str(x)) == 4 else x)
    data["event_date"] = data["event_date"].apply(
        lambda x: x + "-01-01" if len(str(x)) == 4 else (x + "-01" if len(str(x)) == 7 else x)
    )
    data['event_date'] = pd.to_datetime(data['event_date'])

# TO DO: extract info from website or metadata
else: 
    pass

if "individual_count" in data.columns:
    # just integers
    data['individual_count'] = data['individual_count'].fillna(0).astype(int)

#target_cols = ["latitude", "longitude", "scientific_name", "event_date", "individual_count",
 #             "minimum_depth_in_meters", "maximum_depth_in_meters"]
target_cols = ["scientific_name", "valid_scientific_name", "matching_type", "unaccept_reason",
               "rank", "individual_count", "event_date", "minimum_depth_in_meters",
               "maximum_depth_in_meters", "latitude", "longitude", "country"]

data = data[data.columns.intersection(target_cols)].reset_index(drop = True)

print(data.shape)
data.head()

(66248, 8)


Unnamed: 0,individual_count,event_date,country,minimum_depth_in_meters,maximum_depth_in_meters,latitude,longitude,scientific_name
0,0,2000-08-04 12:00:00+00:00,,9.1,9.1,18.463051,-77.358131,agaricia agaricites
1,0,1999-07-10 12:00:00+00:00,,11.2,11.8,18.506945,-87.753609,porites astreoides
2,0,1999-08-30 12:00:00+00:00,,10.6,11.8,20.114166,-87.45639,montastraea faveolata
3,0,1999-11-18 12:00:00+00:00,,18.6,18.6,27.905333,-93.591499,montastraea franksi
4,0,1999-08-15 12:00:00+00:00,,8.0,8.0,26.3535,-76.977997,montastraea annularis


## Data enrichment

In [5]:
def custom_locator(lat, lon):
    """
    Returns the country name for a given pair of latitude and longitude.

    Parameters:
        lat (str | float): Latitude value of the location.
        lon (str | float): Longitude value of the location.

    Returns:
        str: Name of the country corresponding to the provided coordinates.
    """
    lat = str(lat)
    lon = str(lon)
    
    # initialize Nominatim API 
    geolocator = Nominatim(user_agent="hub_ocean_geopy_app", timeout = 10)
    geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)

    location = geocode(f"{lat},{lon}", language="en")

    try:
        address = location.raw['address']
        return address.get('country', 'unknown')

    except:
        return "unknown"

In [6]:
data_individual_counts = data[data['individual_count'] == 0]\
    .groupby(["latitude", "longitude", "scientific_name"])\
    .size()\
    .reset_index(name = "calculated_individual_count")

data = data.merge(data_individual_counts, on = ["latitude", "longitude", "scientific_name"], how = 'left')

# fill original column
data['individual_count'] = np.where(
    data['individual_count'].isnull() | (data['individual_count'] == 0),
    data['calculated_individual_count'],
    data['individual_count']
)

del data_individual_counts

In [None]:
# verify metrics
# data[(data["scientific_name"] == "favia leptophyla") & (np.isclose(data["latitude"], -17.992001, atol=1e-6))]

In [7]:
# keep records where "country" is null and drop repeated coordinates
coordinates = data[data['country'].isnull()][["latitude", "longitude"]].drop_duplicates()

coordinates["country_coordinates"] = coordinates.progress_apply(
        lambda x: custom_locator(x.latitude, x.longitude), axis = 1
)

data = data.merge(coordinates, on = ["latitude", "longitude"], how = 'left')
del coordinates

# fill original column
data['country'] = np.where(data['country'].isnull(), data['country_coordinates'], data['country'])
data.head(3)

100%|████████████████████████████████████████████████████████████████████████████████████| 731/731 [12:15<00:00,  1.01s/it]


Unnamed: 0,individual_count,event_date,country,minimum_depth_in_meters,maximum_depth_in_meters,latitude,longitude,scientific_name,calculated_individual_count,country_coordinates
0,19,2000-08-04 12:00:00+00:00,Jamaica,9.1,9.1,18.463051,-77.358131,agaricia agaricites,19,Jamaica
1,2,1999-07-10 12:00:00+00:00,Mexico,11.2,11.8,18.506945,-87.753609,porites astreoides,2,Mexico
2,18,1999-08-30 12:00:00+00:00,Mexico,10.6,11.8,20.114166,-87.45639,montastraea faveolata,18,Mexico


In [8]:
def get_worms_accepted_names(scientific_names, like = "true",
                             marine_only = "true", extant_only = "false"):
    """
    Compares a list of marine species names with WoRMS records using the WoRMS API
    to check if the names are valid.

    The request uses the following URL parameters:
    ?scientificnames[]=NAME1&scientificnames[]=NAME2&like=true&marine_only=true

    Parameters:
        scientific_names (list[str]): 
            A list of scientific names of marine species to be checked.
        like (str): 
            Whether to apply a SQL-like search (i.e., adds a '%' after the name).
            Defaults to "true".
        marine_only (str): 
            If "true", limits results to marine taxa. Defaults to "true".
        extant_only (str): 
            If "true", limits results to extant (living) taxa. Defaults to "true".

    Returns:
        list[dict]: 
            A list of dictionaries containing information from the WoRMS API, 
            such as scientific name, taxonomic status (e.g., accepted), valid name, etc.
            The API may return multiple records per species name.
    """

    # build the query parameters with repeated 'scientificnames[]'
    # the [ ] in scientificnames[] matters a lot to the WoRMS API
    # because it's built with PHP-style parameter handling
    url = "https://www.marinespecies.org/rest/AphiaRecordsByNames"

    # cannot use dictionary here because it can only have one 
    # value per key. If you write:
    # {"scientificnames[]": "A", "scientificnames[]": "B"}
    # the second one overwrites the first
    params = [("scientificnames[]", name) for name in scientific_names]
    params += [
        ("like", like),
        ("marine_only", marine_only),
        ("extant_only", extant_only)
    ] # at the end, this is also a list of key-value pairs

    headers = {"accept": "application/json"}
    response = requests.get(url, params = params, headers = headers)

    if response.status_code != 200:
        return f"Error: {response.status_code}"

    # [
    #    [{}, {}, {}], # specie 1
    #    [{}],         # specie 2
    #    [{}, {}]      # specie 3
    # ]
    return response.json()

In [9]:
scientific_names = data.scientific_name.unique().tolist() #["Agaricia agaricites", "Porites astreoides", "NotARealSpecies"]
results = get_worms_accepted_names(scientific_names)

worms_data_list = [{
    "scientific_name": sp_name,
    "valid_name": worms_response[0].get("valid_name") if worms_response else "unknown",
    "matching_type": worms_response[0].get("status") if worms_response else "unknown",
    "rank": worms_response[0].get("rank") if worms_response else "unknown",
    "unaccept_reason": worms_response[0].get("unacceptreason") if worms_response else "unknown",
} for sp_name, worms_response in zip(scientific_names, results)]


In [10]:
worms_data = pd.DataFrame(worms_data_list)
worms_data[["valid_name", "matching_type", "rank"]] = worms_data[["valid_name", "matching_type", "rank"]]\
                                                        .apply(lambda col: col.str.lower())

data = data.merge(worms_data, on = 'scientific_name', how = 'left')
data = data.rename(columns = {"valid_name": "valid_scientific_name"}) # replace for correct species names
data = data[data.columns.intersection(target_cols)].reset_index(drop = True)
data[["valid_scientific_name", "matching_type", "rank"]] = (
    data[["valid_scientific_name", "matching_type", "rank"]]
    .fillna("unknown")
)

data['unaccept_reason'] = data['unaccept_reason'].fillna("None")

print(data.shape)
data.head()

(66248, 12)


Unnamed: 0,individual_count,event_date,country,minimum_depth_in_meters,maximum_depth_in_meters,latitude,longitude,scientific_name,valid_scientific_name,matching_type,rank,unaccept_reason
0,19,2000-08-04 12:00:00+00:00,Jamaica,9.1,9.1,18.463051,-77.358131,agaricia agaricites,Agaricia agaricites,accepted,Species,
1,2,1999-07-10 12:00:00+00:00,Mexico,11.2,11.8,18.506945,-87.753609,porites astreoides,Porites astreoides,accepted,Species,
2,18,1999-08-30 12:00:00+00:00,Mexico,10.6,11.8,20.114166,-87.45639,montastraea faveolata,Orbicella faveolata,superseded combination,Species,
3,59,1999-11-18 12:00:00+00:00,unknown,18.6,18.6,27.905333,-93.591499,montastraea franksi,Orbicella franksi,superseded combination,Species,
4,6,1999-08-15 12:00:00+00:00,Bahamas,8.0,8.0,26.3535,-76.977997,montastraea annularis,Orbicella annularis,superseded combination,Species,


## Ingesting into ODP
### 1. Create dataset manifest to add to catalog

In [11]:
# hub ocean client
client = OdpClient()

In [12]:
dataset_name = "Atlantic and Gulf Rapid Reef Assessment - Benthic"

# Declare a dataset manifest to add to the catalog
dataset_manifest = DatasetDto(
    metadata = MetadataDto(
        name = re.sub(r"\s*-\s*|\s+", "-", dataset_name.lower()),
        display_name = dataset_name,
        description = "Benthic cover dataset from the Atlantic and Gulf Rapid Reef Assessment (AGRRA) Program, "
        "documenting coral reef benthic community composition across sites in the wider Caribbean.",
        #external_url = "https://obis.org/dataset/47edd06e-212a-4677-9abb-6384a0cac20a", # ------ doesn't work ------
        labels = {"hubocean.io/test": True, 'tabular_v2': 1.0},
    ),
    spec = DatasetSpec(
        storage_controller = "registry.hubocean.io/storageController/storage-tabular",
        storage_class = "registry.hubocean.io/storageClass/tabular",
        maintainer = ContactInfo(
            contact = "Brenda Varguez <Brenda.Varguez@oceandata.earth>",
            organisation = "Hub Ocean",
        ),
    ),
)

# ingest info into catalog (to know resource exist)
try:
    created_resource_manifest = client.catalog.create(dataset_manifest)
    #print("Resource created successfully:", created_resource_manifest) # old version
    print("Resource created successfully:", created_resource_manifest.qualified_name)

except OdpValidationError as e:
    print("Validation Error:", e)
    
except OdpResourceExistsError:
    #created_resource_manifest = client.catalog.get("catalog.hubocean.io/dataset/" + dataset_manifest.metadata.name)
    created_resource_manifest = client.catalog.get("catalog.hubocean.io/dataset/" + dataset_manifest.metadata.name)
    print("Dataset already exists. Fetched existing dataset:", created_resource_manifest.qualified_name)
    

Resource created successfully: catalog.hubocean.io/dataset/atlantic-and-gulf-rapid-reef-assessment-benthic


### 2. Check if resource was correctly created

In [13]:
json.loads(client.catalog.get("catalog.hubocean.io/dataset/" + dataset_manifest.metadata.name).json())

{'spec': {'storage_class': 'registry.hubocean.io/storageClass/tabular',
  'storage_controller': 'registry.hubocean.io/storageController/storage-tabular',
  'data_collection': None,
  'maintainer': {'contact': 'Brenda Varguez <Brenda.Varguez@oceandata.earth>',
   'organisation': 'Hub Ocean'},
  'citation': None,
  'documentation': [],
  'attributes': [],
  'facets': None,
  'tags': []},
 'kind': 'catalog.hubocean.io/dataset',
 'version': 'v1alpha3',
 'metadata': {'name': 'atlantic-and-gulf-rapid-reef-assessment-benthic',
  'display_name': 'Atlantic and Gulf Rapid Reef Assessment - Benthic',
  'description': 'Benthic cover dataset from the Atlantic and Gulf Rapid Reef Assessment (AGRRA) Program, documenting coral reef benthic community composition across sites in the wider Caribbean.',
  'uuid': '097d6612-d4fc-414d-af8d-0b9dffc9d03f',
  'labels': {'tabular_v2': 1.0, 'hubocean.io/test': True},
  'owner': '6ca9c8a9-2883-4336-bee2-f66fdd75e0d2'},
 'status': {'num_updates': 0,
  'created_tim

### 3. Create schema for dataset

Before you can start adding data to the table, you need to create the table schema. This can be done using the [pyarrow.Schema](https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html) object:

In [14]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 66248 entries, 0 to 66247
Data columns (total 12 columns):
 #   Column                   Non-Null Count  Dtype              
---  ------                   --------------  -----              
 0   individual_count         66248 non-null  int64              
 1   event_date               66248 non-null  datetime64[ns, UTC]
 2   country                  66248 non-null  object             
 3   minimum_depth_in_meters  64710 non-null  float64            
 4   maximum_depth_in_meters  64710 non-null  float64            
 5   latitude                 66248 non-null  float64            
 6   longitude                66248 non-null  float64            
 7   scientific_name          66248 non-null  object             
 8   valid_scientific_name    66248 non-null  object             
 9   matching_type            66248 non-null  object             
 10  rank                     66248 non-null  object             
 11  unaccept_reason          662

In [15]:
# 1. Get Dto
table_schema = client.table_v2(
    client.catalog.get("catalog.hubocean.io/dataset/" + dataset_manifest.metadata.name)
)

# 2. Create new table with our desired schema
schema = pa.schema([
    #pa.field("id", pa.string(), nullable=False, metadata={"index":"1"}),
    pa.field("scientific_name", pa.string(),
             metadata = {"description": "The scientific name of the species"}),
    pa.field("valid_scientific_name", pa.string(),
             metadata = {"description": "The scientific name of the species according to WoRMS"}),
    pa.field("matching_type", pa.string(),
             metadata = {
                 "description": """
                 Taxonomic status or relationship of the matched scientific name.
                 Values reflect how the name in the dataset corresponds to current taxonomic standards
                 """
             }),
    pa.field("unaccept_reason", pa.string(),
             metadata = {"description": "Reason why the name is not accepted"}),
    pa.field("rank", pa.string(),
             metadata = {"description": "Taxonomic rank of the species"}),
    pa.field("latitude", pa.float64(),
             metadata = {"description": "Latitude of the observation"}),
    pa.field("longitude", pa.float64(),
             metadata = {"description": "Longitude of the observation"}),
    pa.field("minimum_depth_in_meters", pa.float64(),
             metadata = {"description": "Minimum depth of the observation in meters"}),
    pa.field("maximum_depth_in_meters", pa.float64(),
             metadata = {"description": "Maximum depth of the observation in meters"}),
    #pa.field('geometry', pa.string(), metadata={  # geometry type with metadata
     #   b'isGeometry': b'1'
    #})
    pa.field("event_date", pa.date32(),
             metadata = {"description": "Date of the observation"}),
    pa.field("individual_count", pa.int64(),
             metadata = {"description": "Count of individuals observed"}),
    pa.field("country", pa.string(),
             metadata = {"description": "Country where the observation was made"})
])

table_schema.create(schema)

# 3. Verify the new schema
print(table_schema.schema())

scientific_name: string
  -- field metadata --
  description: 'The scientific name of the species'
valid_scientific_name: string
  -- field metadata --
  description: 'The scientific name of the species according to WoRMS'
matching_type: string
  -- field metadata --
  description: '
                 Taxonomic status or relationship of the' + 146
unaccept_reason: string
  -- field metadata --
  description: 'Reason why the name is not accepted'
rank: string
  -- field metadata --
  description: 'Taxonomic rank of the species'
latitude: double
  -- field metadata --
  description: 'Latitude of the observation'
longitude: double
  -- field metadata --
  description: 'Longitude of the observation'
minimum_depth_in_meters: double
  -- field metadata --
  description: 'Minimum depth of the observation in meters'
maximum_depth_in_meters: double
  -- field metadata --
  description: 'Maximum depth of the observation in meters'
event_date: date32[day]
  -- field metadata --
  description: 'Dat

### 4. Ingest data from dataframe

In [16]:
with table_schema as tx:
  tx.insert(data.to_dict(orient='records'))

In [17]:
# one row
next(table_schema.select().rows())

{'scientific_name': 'agaricia agaricites',
 'valid_scientific_name': 'Agaricia agaricites',
 'matching_type': 'accepted',
 'unaccept_reason': 'None',
 'rank': 'Species',
 'latitude': 18.4630508423,
 'longitude': -77.3581314087,
 'minimum_depth_in_meters': 9.1,
 'maximum_depth_in_meters': 9.1,
 'event_date': datetime.date(2000, 8, 4),
 'individual_count': 19,
 'country': 'Jamaica'}

In [18]:
next(table_schema.select().pages(size=4))

[{'scientific_name': 'agaricia agaricites',
  'valid_scientific_name': 'Agaricia agaricites',
  'matching_type': 'accepted',
  'unaccept_reason': 'None',
  'rank': 'Species',
  'latitude': 18.4630508423,
  'longitude': -77.3581314087,
  'minimum_depth_in_meters': 9.1,
  'maximum_depth_in_meters': 9.1,
  'event_date': datetime.date(2000, 8, 4),
  'individual_count': 19,
  'country': 'Jamaica'},
 {'scientific_name': 'porites astreoides',
  'valid_scientific_name': 'Porites astreoides',
  'matching_type': 'accepted',
  'unaccept_reason': 'None',
  'rank': 'Species',
  'latitude': 18.5069446564,
  'longitude': -87.7536087036,
  'minimum_depth_in_meters': 11.2,
  'maximum_depth_in_meters': 11.8,
  'event_date': datetime.date(1999, 7, 10),
  'individual_count': 2,
  'country': 'Mexico'},
 {'scientific_name': 'montastraea faveolata',
  'valid_scientific_name': 'Orbicella faveolata',
  'matching_type': 'superseded combination',
  'unaccept_reason': 'None',
  'rank': 'Species',
  'latitude': 20