In [25]:
import collections
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.csv
import pyarrow.compute
import pyarrow.parquet as pq
import requests
from tqdm.auto import tqdm

In [22]:
# Column types applied when reading each study's location.csv.
schema_in = pa.schema([
    pa.field("timestamp", pa.timestamp("us")),
    pa.field("location_lat", pa.float64()),
    pa.field("location_long", pa.float64()),
    pa.field("individual_id", pa.int64()),
    pa.field("deployment_id", pa.int64()),
    pa.field("tag_id", pa.int64()),
    pa.field("visible", pa.bool_()),
    pa.field("sensor_type", pa.string()),
])

# Layout of locations_gps.parquet: the input columns with a per-study `study_id`
# inserted at position 3, right after the coordinates.
schema_out = schema_in.insert(3, pa.field("study_id", pa.int64()))

In [23]:
# Convert every study's location.csv into one big parquet file, keeping only
# visible GPS fixes.

# CSV files pyarrow could not parse: skipped, but reported at the end rather
# than silently dropped.
skipped_location_files = []

with pq.ParquetWriter("locations_gps.parquet", schema=schema_out) as writer:
    for path in tqdm(sorted(Path("studies").glob("*/location.csv"))):
        # Read only the schema's columns, in schema order. Otherwise extra CSV
        # columns would fail the writer's schema check, and `drop_null` below
        # would discard rows because of an empty value in an unrelated column.
        try:
            table = pa.csv.read_csv(
                path,
                convert_options=pa.csv.ConvertOptions(
                    column_types=schema_in,
                    include_columns=schema_in.names,
                ),
            )
        except pa.ArrowInvalid as err:
            skipped_location_files.append((path, err))
            continue

        # Clean up the data: drop incomplete rows, keep visible GPS fixes only.
        table = table.drop_null()
        is_visible_gps = pa.compute.and_(
            table["visible"], pa.compute.equal(table["sensor_type"], "gps")
        )
        table = table.filter(is_visible_gps)
        if table.num_rows == 0:
            continue  # nothing left to write for this study

        table = table.sort_by([("individual_id", "ascending"),
                               ("deployment_id", "ascending"),
                               ("timestamp", "ascending")])

        # Add the study_id: the integer before the first "-" of the study's
        # directory name, placed where schema_out expects it.
        study_id = int(path.parent.name.split("-")[0])
        table = table.add_column(
            schema_out.get_field_index("study_id"),
            schema_out.field("study_id"),
            pa.repeat(pa.scalar(study_id, pa.int64()), table.num_rows),
        )

        # Append to the output file
        writer.write_table(table)

print(f"Skipped {len(skipped_location_files)} unreadable location file(s)")

  0%|          | 0/476 [00:00<?, ?it/s]

In [28]:
GBIF_KEYS = ["rank", "kingdom", "phylum", "class", "order", "family", "genus", "species"]
GBIF_SPECIES_URL = "https://api.gbif.org/v1/species"
# Seconds to wait for GBIF before giving up; without a timeout a stalled
# connection would hang this cell indefinitely.
GBIF_TIMEOUT_S = 30


def _majority_taxonomy(results):
    """Summarise GBIF species-search results into a single classification.

    Only records whose ``taxonomicStatus`` is ``"ACCEPTED"`` vote. For each key
    in ``GBIF_KEYS`` the most frequent value wins (ties go to the value seen
    first); a record lacking a key votes for ``None``. If no record is
    accepted, every key maps to ``None``.

    Args:
        results: the ``"results"`` list of a GBIF ``/v1/species`` response.

    Returns:
        dict mapping every key in ``GBIF_KEYS`` to the chosen value or ``None``.
    """
    counters = collections.defaultdict(collections.Counter)
    for result in results:
        if result.get("taxonomicStatus") != "ACCEPTED":
            continue
        for key in GBIF_KEYS:
            counters[key][result.get(key)] += 1
    return {
        key: counters[key].most_common(1)[0][0] if key in counters else None
        for key in GBIF_KEYS
    }


# Get all taxon names used across studies.
all_taxons = set()
for path in tqdm(sorted(Path("studies").glob("*/individual.csv"))):
    try:
        names = pd.read_csv(path, dtype=str, usecols=["taxon_canonical_name"])
    except (pd.errors.EmptyDataError, pd.errors.ParserError):
        continue  # unreadable file: skip it, like the conversion cells do
    all_taxons.update(names["taxon_canonical_name"])
# Missing cells come back from pandas as NaN (a float); keep only real names.
all_taxons = sorted(tx for tx in all_taxons if isinstance(tx, str))

# Look up each taxon in the GBIF species API and keep its majority classification.
taxons_data = {}
with requests.Session() as session:
    for taxon in tqdm(all_taxons):
        response = session.get(
            GBIF_SPECIES_URL,
            params=dict(name=taxon, limit=1000),
            timeout=GBIF_TIMEOUT_S,
        )
        # Unlike `assert`, this survives `python -O` and reports the HTTP error.
        response.raise_for_status()
        taxons_data[taxon] = _majority_taxonomy(response.json()["results"])

  0%|          | 0/476 [00:00<?, ?it/s]

  0%|          | 0/293 [00:00<?, ?it/s]

In [37]:
# Convert metadata about individuals to one big parquet file

# Column types applied when reading each study's individual.csv.
schema_in_indiv = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("earliest_date_born", pa.timestamp("us")),
    pa.field("latest_date_born", pa.timestamp("us")),
    pa.field("exact_date_of_birth", pa.timestamp("us")),
    pa.field("local_identifier", pa.string()),
    pa.field("sex", pa.string()),
    pa.field("taxon_canonical_name", pa.string()),
])

# Layout of individuals.parquet: the input columns followed by one string column
# per entry of GBIF_KEYS (taxon_rank, taxon_kingdom, ..., taxon_species).
schema_out_indiv = pa.schema(
    list(schema_in_indiv)
    + [pa.field(f"taxon_{key}", pa.string()) for key in GBIF_KEYS]
)

# CSV files pyarrow could not parse: skipped, but reported at the end rather
# than silently dropped.
skipped_individual_files = []

with pq.ParquetWriter("individuals.parquet", schema=schema_out_indiv) as writer:
    for path in tqdm(sorted(Path("studies").glob("*/individual.csv"))):
        # Read only the schema's columns, in schema order. A column listed in the
        # schema but absent from this study's CSV becomes a column of nulls of the
        # declared type, instead of crashing the `select` below.
        try:
            table = pa.csv.read_csv(
                path,
                convert_options=pa.csv.ConvertOptions(
                    column_types=schema_in_indiv,
                    include_columns=schema_in_indiv.names,
                    include_missing_columns=True,
                ),
            )
        except pa.ArrowInvalid as err:
            skipped_individual_files.append((path, err))
            continue

        # Add details about taxon; None where the name is missing or was not
        # looked up in taxons_data.
        taxa = table["taxon_canonical_name"].to_pylist()  # hoisted: same for every key
        for key in GBIF_KEYS:
            table = table.append_column(
                f"taxon_{key}",
                pa.array([taxons_data[tx][key] if tx in taxons_data else None
                          for tx in taxa],
                         type=pa.string())
            )

        # Keep only wanted columns, in output-schema order
        table = table.select(schema_out_indiv.names)

        # Append to the output file
        writer.write_table(table)

print(f"Skipped {len(skipped_individual_files)} unreadable individual file(s)")

  0%|          | 0/476 [00:00<?, ?it/s]