# Env

In [None]:
# Set base path
# Prefix for every data location used below (MongoDB dbpath, parquet inputs and outputs).
# NOTE(review): left empty here, so f"{BASE_PATH}/gbif/..." resolves to "/gbif/..." — set it before running.
BASE_PATH = ""

In [5]:
import os
import sys
sys.path.append(os.path.abspath(".."))

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import col, countDistinct, count, desc, broadcast, lower, sum, row_number, floor
from pyspark.sql.window import Window

from helpers.variables import COLS_TAXONOMIC
from helpers.data_analysis import init_spark, create_freq, view_freq, check_sparsity
from helpers.gbif import fetch_gbif_chunk, fetch_gbif_iter, retry_failed_chunks, insert_records_to_mongo, fetch_publisher_key
#from helpers.text_search import flatten_dict, full_text_search_rdd, flatten_list_to_string, extract_fields

import pandas as pd

In [None]:
# Start a local MongoDB daemon before running this cell (bash):
# mongod --dbpath="${BASE_PATH}/gbif/gbif_mongo" --fork --logpath="${BASE_PATH}/mongo_logs/gbif_mongo.log"
# Check process status
# ps aux | grep mongod

from pymongo import MongoClient

# Connect to the MongoDB server
client = MongoClient("mongodb://localhost:27017/")

# Database and collection handles used by the download, insert and search cells below
db = client["gbif"] 
collection_registry = db["registry"]  
collection_grscicoll_collection = db["grscicoll_collection"]  
collection_grscicoll_institution = db["grscicoll_institution"] 
collection_organization = db["organization"] 

# Download GRSciColl Collection

In [None]:
# Fetch the GRSciColl collection records from the GBIF API; the helper returns (records, logs).
# NOTE(review): limit=500 is presumably the page size per request — confirm in helpers.gbif.
base_url = "https://api.gbif.org/v1/grscicoll/collection"
all_records_collection, all_logs_collection = fetch_gbif_iter(base_url, params=None, limit=500)

In [56]:
# Store the fetched GRSciColl collection records in MongoDB.
# NOTE(review): unique_key="key" presumably de-duplicates on the GBIF key — confirm in helpers.gbif.
insert_records_to_mongo(
    collection_grscicoll_collection,
    all_records_collection,
    unique_key = "key"
)

Inserting records: 100%|██████████| 8928/8928 [00:03<00:00, 2948.26doc/s]


In [None]:
# Fetch the GRSciColl institution records from the GBIF API; the helper returns (records, logs).
# NOTE(review): limit=500 is presumably the page size per request — confirm in helpers.gbif.
base_url = "https://api.gbif.org/v1/grscicoll/institution"
all_records_institution, all_logs_institution = fetch_gbif_iter(base_url, params=None, limit=500)

In [60]:
# Store the fetched GRSciColl institution records in MongoDB.
# NOTE(review): unique_key="key" presumably de-duplicates on the GBIF key — confirm in helpers.gbif.
insert_records_to_mongo(
    collection_grscicoll_institution,
    all_records_institution,
    unique_key = "key"
)

Inserting records: 100%|██████████| 8832/8832 [00:03<00:00, 2890.41doc/s]


# Download organization

In [None]:
# Fetch the GBIF organization (publisher) records; the helper returns (records, logs).
# NOTE(review): limit=500 is presumably the page size per request — confirm in helpers.gbif.
base_url = "https://api.gbif.org/v1/organization"
all_records_organization, all_logs_organization = fetch_gbif_iter(base_url, params=None, limit=500)

In [63]:
# Store the fetched organization records in MongoDB.
# NOTE(review): unique_key="key" presumably de-duplicates on the GBIF key — confirm in helpers.gbif.
insert_records_to_mongo(
    collection_organization,
    all_records_organization,
    unique_key = "key"
)

Inserting records: 100%|██████████| 2980/2980 [00:01<00:00, 2762.60doc/s]


# Download occurrence registry

In [None]:
%%time
# Fetch the GBIF dataset registry; the helper returns (records, logs).
# NOTE(review): no visible cell inserts `all_records` into `collection_registry` — confirm that step exists.
base_url = "https://api.gbif.org/v1/dataset"
all_records, all_logs = fetch_gbif_iter(base_url, params = None, limit=500)

MongoDB uses the `_id` field as a unique identifier for each document. 

In [5]:
# Number of dataset documents stored in the registry collection.
# The Mongo handles created above are all named collection_*; a bare `collection`
# is not among them, so the dataset registry handle is used here.
collection_registry.count_documents({})

49968

# Approach Summary

Marie Grosjean is the Data Administrator at GBIF. She developed an [NLP approach](https://data-blog.gbif.org/post/gbif-citizen-science/) to identify and automatically label datasets as citizen science (CS) using the metadata available via the [GBIF Dataset API](https://techdocs.gbif.org/en/openapi/v1/registry#/). Marie tagged these datasets in the API using a `machineTag`, and there are [638 identified dataset matches](http://api.gbif.org/v1/dataset?machineTagNamespace=citizenScience.gbif.org).

We've noticed that the NLP model has failed to capture many citizen science datasets and has also mislabeled some non-CS datasets, such as [AntWeb](https://www.gbif.org/dataset/13b70480-bd69-11dd-b15f-b8a03c50a862) ([JSON](https://api.gbif.org/v1/dataset/13b70480-bd69-11dd-b15f-b8a03c50a862)).

We developed a rule-based full-text search approach to increase precision and coverage:

- **Scope of search:** 
    - Specific multilingual (English, Spanish, Portuguese, French) keywords (singular & plural) for "citizen" and "citizen science"
    - Known and verified CS publishers such as iNaturalist, Observation.org, etc.
    - Known and verified CS datasets
- **Preprocessing**
    - Remove all machineTags to prevent introducing results from Marie's NLP model's prior prediction
    - No additional text cleaning steps applied
- **Pros & Cons**
    - Pros: Fast, scalable, reproducible, deterministic
    - Cons: May reduce recall by missing datasets that do not contain the predefined search terms. May also introduce non-CS datasets that contain search terms in their metadata.

In [None]:
%%time
# Fetch only the datasets carrying a machine tag in the citizenScience.gbif.org namespace
# (the NLP-labelled set described above).
# NOTE(review): this rebinds `all_records` / `all_logs` from the full registry download.
base_url = "https://api.gbif.org/v1/dataset?machineTagNamespace=citizenScience.gbif.org"
all_records, all_logs = fetch_gbif_iter(base_url, params = None, limit=500)

In [5]:
# Dataset keys of the NLP-tagged records (records without a 'key' entry are skipped),
# wrapped into a one-column Spark DataFrame named "datasetKey".
key_values = []
for record in all_records:
    if 'key' in record:
        key_values.append(record['key'])

nlp_key_rows = [(dataset_key,) for dataset_key in key_values]
matching_df_nlp = spark.createDataFrame(nlp_key_rows, ["datasetKey"])

# Spark full-text search

In [2]:
from pyspark.sql import SparkSession

# Check if there is an active Spark session
# (getActiveSession() returns None when no session is active in this kernel)
spark= SparkSession.getActiveSession()

In [5]:
# Stop the currently active session, if there is one, before a new session is built.
# `spark` is None when no session was active, so guard against AttributeError.
if spark is not None:
    spark.stop()

In [6]:
# Build (or reuse) the Spark session for this analysis: 64 executors, 75G and 12 cores each,
# with the vectorized Parquet reader switched off.
spark = (SparkSession.builder
             .appName("GBIF EDA")
             .config("spark.executor.instances", "64")
             .config("spark.executor.memory", "75G")
             .config("spark.executor.cores", "12")
             .config("spark.sql.parquet.enableVectorizedReader", "false") 
             .getOrCreate())

In [3]:
# Register the helper module with the SparkContext so tasks running on executors can import it,
# then import the search utilities from it.
spark.sparkContext.addPyFile(os.path.abspath("../helpers/text_search.py"))

from text_search import flatten_dict, full_text_search_rdd, flatten_list_to_string, extract_fields

In [7]:
# Names of known and verified citizen-science publishers.
publisher_citizen_sci = [
    "iNaturalist.org",
    "Observation.org",
    "naturgucker.de",
    "Questagame",
    "Pl@ntNet",
    "NatureMapr",
    "Citizen Science - ALA Website",
    "BioCollect",
    "Tweed Koala Sightings",
    "Koala Action Group",
    "TilapiaMap",
    "Blauwtipje.nl",
    "Great Koala Count 2",
    #"myFOSSIL eMuseum",
    "Superb Parrot Monitoring project",
    "SLU Artdatabanken",
    "Sibecocenter LLC"

]

# Resolve the names to publisher records; the "key" column is read when the search terms are built.
# NOTE(review): presumably looked up through the GBIF organization API — confirm in helpers.gbif.
publisher_citizen_sci_df = pd.DataFrame(fetch_publisher_key(publisher_citizen_sci))

In [8]:
# Dataset keys of known and verified citizen-science datasets, added verbatim to the search terms.
datasetKey_citizen_sci = [
    "84a649ce-ff81-420d-9c41-aa1de59e3766", # Citizen Science - ALA Website
    "cca13f2c-0d2c-4c2f-93b9-4446c0cc1629"  # BugGuide published by United States Geological Survey
]

In [9]:
# 'citizen' in different languages, singular and plural
citizen_words = [
    "citizen", "citizens",           # English
    "ciudadana", "ciudadano",        # Spanish, feminine / masculine
    "ciudadanas", "ciudadanos",
    "cidadã", "cidadãs",             # Portuguese
    "citoyenne", "citoyennes",       # French
]

# 'citizen science' phrases in different languages
citizen_science_phrases = [
    "citizen science", "citizen-science",
    "ciencia ciudadana",             # Spanish
    "ciência cidadã",                # Portuguese
    "science citoyenne",             # French
]

# Keys of the known citizen-science publishers
publisher_keys = publisher_citizen_sci_df["key"].tolist()

# Final term list, in order: keywords, phrases, publisher keys, dataset keys
search_terms = (
    citizen_words
    + citizen_science_phrases
    + publisher_keys
    + list(datasetKey_citizen_sci)
)

In [10]:
%%time
# Pull every document of the registry collection onto the driver and distribute it as an RDD.
rdd = spark.sparkContext.parallelize(
    list(collection_registry.find({}))
    # list(collection.find({}, {"machineTags": 0})) # Exclude the machineTag from the downstream full text search
)

# Drop the "machineTags" entry from each record so earlier NLP labels cannot produce search hits.
rdd_filtered = rdd.map(lambda record: {k: v for k, v in record.items() if k != "machineTags"})

CPU times: user 8.54 s, sys: 1.82 s, total: 10.4 s
Wall time: 11.5 s


In [10]:
# Drop the driver-side reference to the unfiltered RDD; only `rdd_filtered` is used below.
rdd = None

**Without `machineTags`**

In [11]:
%%time
# Count the matching registry records for each search term separately.
# The result is kept in `n_matches` rather than `count`, so the `count` function imported
# from pyspark.sql.functions is not overwritten in the notebook namespace.
term_counts = []
for term in search_terms:
    matching_rdd = full_text_search_rdd(rdd_filtered, term)
    n_matches = matching_rdd.count()
    print(f"Term: {term}, Count: {n_matches}")
    term_counts.append({"Term": term, "Count": n_matches})

25/01/08 16:46:39 WARN TaskSetManager: Stage 0 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Term: citizen, Count: 353


25/01/08 16:46:43 WARN TaskSetManager: Stage 1 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.
25/01/08 16:46:46 WARN TaskSetManager: Stage 2 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: citizens, Count: 63


25/01/08 16:46:49 WARN TaskSetManager: Stage 3 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciudadana, Count: 36


25/01/08 16:46:51 WARN TaskSetManager: Stage 4 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciudadano, Count: 34


25/01/08 16:46:53 WARN TaskSetManager: Stage 5 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciudadanas, Count: 0


25/01/08 16:46:56 WARN TaskSetManager: Stage 6 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciudadanos, Count: 13


25/01/08 16:46:58 WARN TaskSetManager: Stage 7 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: cidadã, Count: 1


25/01/08 16:47:01 WARN TaskSetManager: Stage 8 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: cidadãs, Count: 0


25/01/08 16:47:03 WARN TaskSetManager: Stage 9 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: citoyenne, Count: 1925


25/01/08 16:47:06 WARN TaskSetManager: Stage 10 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: citoyennes, Count: 16


25/01/08 16:47:08 WARN TaskSetManager: Stage 11 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: citizen science, Count: 271


25/01/08 16:47:10 WARN TaskSetManager: Stage 12 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: citizen-science, Count: 8


25/01/08 16:47:13 WARN TaskSetManager: Stage 13 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciencia ciudadana, Count: 23


25/01/08 16:47:15 WARN TaskSetManager: Stage 14 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: ciência cidadã, Count: 0


25/01/08 16:47:18 WARN TaskSetManager: Stage 15 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: science citoyenne, Count: 1897


25/01/08 16:47:20 WARN TaskSetManager: Stage 16 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 0cab7ac5-e262-4cb5-b6ec-2f524c1f7975, Count: 0


25/01/08 16:47:23 WARN TaskSetManager: Stage 17 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 28eb1a3f-1c15-4a95-931a-4af90ecb574d, Count: 1


25/01/08 16:47:25 WARN TaskSetManager: Stage 18 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: c8d737e0-2ff8-42e8-b8fc-6b805d26fc5f, Count: 1


25/01/08 16:47:27 WARN TaskSetManager: Stage 19 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: bb646dff-a905-4403-a49b-6d378c2cf0d9, Count: 1


25/01/08 16:47:30 WARN TaskSetManager: Stage 20 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 17726825-3aa2-4fbc-b0e3-303d3fc5702f, Count: 1


25/01/08 16:47:32 WARN TaskSetManager: Stage 21 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: da86174a-a605-43a4-a5e8-53d484152cd3, Count: 2


25/01/08 16:47:35 WARN TaskSetManager: Stage 22 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: d9bea9d3-13a5-4768-bbf4-560b9aa95a73, Count: 2


25/01/08 16:47:37 WARN TaskSetManager: Stage 23 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: adc174cd-c752-4eee-9630-7c1209eb1c4a, Count: 0


25/01/08 16:47:39 WARN TaskSetManager: Stage 24 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 1d9d1d61-7871-44b4-9ceb-0bc2fe809d2e, Count: 26


25/01/08 16:47:42 WARN TaskSetManager: Stage 25 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 12379077-6d25-42da-a88d-5f4916e5bba9, Count: 1


25/01/08 16:47:44 WARN TaskSetManager: Stage 26 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: facf3dd1-4b89-4780-ba49-b5fdef24c7d3, Count: 1


25/01/08 16:47:47 WARN TaskSetManager: Stage 27 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 5efc1e1a-ba2f-471e-a8ed-e5849225e949, Count: 1


25/01/08 16:47:49 WARN TaskSetManager: Stage 28 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: f987a53f-bbe7-4a5e-b956-47187dbe3069, Count: 1


25/01/08 16:47:52 WARN TaskSetManager: Stage 29 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: f48f13a4-84e9-4b13-bf97-cc387d3c3229, Count: 1


25/01/08 16:47:54 WARN TaskSetManager: Stage 30 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 990eac22-e9a5-4499-baca-3e79e5cabd41, Count: 1


25/01/08 16:47:56 WARN TaskSetManager: Stage 31 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: b8323864-602a-4a7d-9127-bb903054e97d, Count: 1


25/01/08 16:47:59 WARN TaskSetManager: Stage 32 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 477dbd45-674f-4389-8952-7c9bbde10f68, Count: 3


25/01/08 16:48:01 WARN TaskSetManager: Stage 33 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.


Term: 84a649ce-ff81-420d-9c41-aa1de59e3766, Count: 1




Term: cca13f2c-0d2c-4c2f-93b9-4446c0cc1629, Count: 1
CPU times: user 230 ms, sys: 149 ms, total: 379 ms
Wall time: 1min 24s


                                                                                

In [11]:
# Run the search once with the whole term list and display the number of matching records.
# NOTE(review): presumably a record matches when it contains any of the terms — confirm in helpers.text_search.
matching_rdd = full_text_search_rdd(rdd_filtered, search_terms)

match_count = matching_rdd.count()
#results = matching_rdd.collect()
match_count

25/02/05 15:31:11 WARN TaskSetManager: Stage 0 contains a task of very large size (1579 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

2378

In [12]:
%%time
# Reduce each matched record to its dataset id and publishing organization key,
# convert to a de-duplicated, cached DataFrame, and materialize it with a count.
fields_to_extract = ["_id", "publishingOrganizationKey"]
extract_fields_func = extract_fields(fields_to_extract)

matching_rows = matching_rdd.map(extract_fields_func)
matching_df = spark.createDataFrame(matching_rows).distinct().cache()

matching_df.count()

25/01/14 12:02:57 WARN TaskSetManager: Stage 2 contains a task of very large size (1061 KiB). The maximum recommended task size is 1000 KiB.

CPU times: user 15.3 ms, sys: 25.4 ms, total: 40.6 ms
Wall time: 6.51 s


                                                                                

2378

In [15]:
# Free memory: drop the driver-side references to the search RDDs (matching_df is already cached).
rdd_filtered = None 
matching_rdd = None

In [16]:
from pyspark.sql.types import StructField, StringType, StructType

# All GRSciColl institution documents from MongoDB, distributed as an RDD
institution_docs = list(collection_grscicoll_institution.find({}))
rdd_grscicoll_institution_all = spark.sparkContext.parallelize(institution_docs)

# Fields kept for every institution
fields_to_extract = [
    "_id",
    "name",
    "code",
    "types",
    "institutionalGovernances",      # Institutional governance of a GRSciColl institution
    "disciplines",                   # Discipline of a GRSciColl institution (accepts multiple values)
    "masterSource"                   # DATASET, ORGANIZATION
]

# The list-valued fields are passed as fields_to_flatten so they fit the string schema below
extract_fields_func = extract_fields(
    fields_to_extract,
    fields_to_flatten = ["types", "institutionalGovernances", "disciplines"]
)

# Every extracted field is a nullable string column, in the same order as fields_to_extract
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in fields_to_extract
])

institution_rows = rdd_grscicoll_institution_all.map(extract_fields_func)
grscicoll_institution_df = spark.createDataFrame(institution_rows, schema)

# "types" only, kept as an RDD for the frequency count below
grscicoll_institution_type_list = rdd_grscicoll_institution_all.map(extract_fields(["types"]))

In [18]:
# Confirm that every institution column came through as a nullable string.
grscicoll_institution_df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- code: string (nullable = true)
 |-- types: string (nullable = true)
 |-- institutionalGovernances: string (nullable = true)
 |-- disciplines: string (nullable = true)
 |-- masterSource: string (nullable = true)



In [25]:
# Per-column sparsity of the institution table.
# NOTE(review): presumably the fraction of missing values per column — confirm in helpers.data_analysis.
check_sparsity(grscicoll_institution_df).show(truncate=False)

+------------------------+--------------------+
|column_name             |sparsity            |
+------------------------+--------------------+
|_id                     |0.0                 |
|name                    |0.0                 |
|code                    |0.004981884057971014|
|types                   |0.5180027173913043  |
|institutionalGovernances|0.9039855072463768  |
|disciplines             |0.9394248188405797  |
|masterSource            |0.0                 |
+------------------------+--------------------+



In [110]:
# Frequency of each institution type.
# NOTE(review): `create_freq_rdd` is not among the names imported at the top of the notebook
# (create_freq, view_freq, check_sparsity) — confirm where it is defined and import it.
create_freq_rdd(grscicoll_institution_type_list, key="types")

[('UniversityCollege', 289),
 ('MuseumHerbariumPrivateNonProfit', 9),
 ('Herbarium', 3205),
 ('MedicalResearchInstitute', 7),
 ('OtherTypeResearchInstitutionBiorepository', 89),
 ('BiomedicalResearchInstitute', 10),
 ('Museum', 438),
 ('OtherInstitutionalType', 196),
 ('ZooAquarium', 10),
 ('BotanicalGarden', 63)]

In [17]:
# All GRSciColl collection documents from MongoDB, distributed as an RDD
rdd_grscicoll_collection_all = spark.sparkContext.parallelize(
    list(collection_grscicoll_collection.find({}))
)

# Fields kept for every collection
fields_to_extract = [
    "_id",
    "name",
    "code",
    "contentTypes",
    "preservationTypes",
    "institutionKey",
    "institutionName",
    "institutionCode",
    "occurrenceCount"
]

# The two list-valued fields are passed as fields_to_flatten
extract_fields_func = extract_fields(fields_to_extract, fields_to_flatten = ["contentTypes", "preservationTypes"])

# No explicit schema here: Spark infers the column types from the extracted rows
grscicoll_collection_df = spark.createDataFrame(
    rdd_grscicoll_collection_all.map(extract_fields_func)
)

# Per-field RDDs for frequency counts. These must be derived from the *collection* RDD:
# they were previously mapped over the institution RDD, which is a different set of documents.
grscicoll_collection_contentTypes_list = rdd_grscicoll_collection_all.map(extract_fields(["contentTypes"]))
grscicoll_collection_preservationTypes_list = rdd_grscicoll_collection_all.map(extract_fields(["preservationTypes"]))

# Drop the notebook-level reference; the derived RDDs and DataFrame keep their own lineage
rdd_grscicoll_collection_all = None

In [19]:
# Inspect the inferred schema of the collection table.
grscicoll_collection_df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- code: string (nullable = true)
 |-- contentTypes: string (nullable = true)
 |-- institutionCode: string (nullable = true)
 |-- institutionKey: string (nullable = true)
 |-- institutionName: string (nullable = true)
 |-- name: string (nullable = true)
 |-- occurrenceCount: long (nullable = true)
 |-- preservationTypes: string (nullable = true)



In [61]:
# Per-column sparsity of the collection table.
# NOTE(review): presumably the fraction of missing values per column — confirm in helpers.data_analysis.
check_sparsity(grscicoll_collection_df).show(truncate=False)

+-----------------+--------------------+
|column_name      |sparsity            |
+-----------------+--------------------+
|_id              |0.0                 |
|code             |0.015568996415770609|
|contentTypes     |0.6937724014336918  |
|institutionCode  |0.02128136200716846 |
|institutionKey   |0.00739247311827957 |
|institutionName  |0.00739247311827957 |
|name             |0.0                 |
|occurrenceCount  |0.8538306451612904  |
|preservationTypes|0.7720654121863799  |
+-----------------+--------------------+



# Find match in occurrence

In [None]:
# Load the occurrence table ("cols_of_interest" parquet) that the dataset matches are joined against.
spark_df = spark.read.parquet(f"{BASE_PATH}/gbif/attributes/cols_of_interest")

In [19]:
# Keep only the occurrences whose datasetKey is one of the matched dataset ids.
# matching_df is broadcast to every executor for the join.
filtered_df = (
    spark_df
    .join(
        broadcast(matching_df), 
        matching_df._id == spark_df.datasetKey,
        how = "inner"
    )
)

## Match count stats

In [21]:
# Number of distinct datasets among the matched occurrences.
filtered_df.select("datasetKey").distinct().count()

107

In [25]:
# Number of distinct publishers among the matched occurrences.
filtered_df.select("publisher").distinct().count()

                                                                                

58

In [23]:
# Number of distinct source_id values among the matched occurrences.
filtered_df.select("source_id").distinct().count()

110323916

In [59]:
# basisOfRecord distribution over the full occurrence table (baseline for the matched subset).
spark_df.groupBy("basisOfRecord").count().orderBy(desc("count")).show()

+-------------------+---------+
|      basisOfRecord|    count|
+-------------------+---------+
|  HUMAN_OBSERVATION|181826921|
| PRESERVED_SPECIMEN| 58611705|
|    MATERIAL_SAMPLE|  3272814|
|MACHINE_OBSERVATION|  3044865|
|    FOSSIL_SPECIMEN|  2001234|
|    LIVING_SPECIMEN|    99740|
|         OCCURRENCE|    92770|
|        OBSERVATION|    35199|
|  MATERIAL_CITATION|     1492|
+-------------------+---------+



In [24]:
# basisOfRecord distribution over the matched (citizen-science) occurrences.
filtered_df.groupBy("basisOfRecord").count().orderBy(desc("count")).show()

+-------------------+---------+
|      basisOfRecord|    count|
+-------------------+---------+
|  HUMAN_OBSERVATION|179199017|
|    MATERIAL_SAMPLE|   174405|
| PRESERVED_SPECIMEN|   136773|
|MACHINE_OBSERVATION|    48093|
|        OBSERVATION|    17002|
|    FOSSIL_SPECIMEN|    14087|
|         OCCURRENCE|     1859|
+-------------------+---------+



## Source ID multiple images subset

Check the subset of source IDs that have multiple images (more than one distinct `uuid` per `source_id`).

In [7]:
# Grouping key: one row per (source_id, publisher, basisOfRecord, datasetKey)
col_list = ["source_id", "publisher", "basisOfRecord", "datasetKey"]

# Distinct uuid count per group ...
uuid_counts_per_group = spark_df.groupBy(*col_list).agg(
    F.countDistinct("uuid").alias("distinct_uuid_count")
)

# ... keeping only the groups that have more than one uuid
source_id_with_multiple_uuids = uuid_counts_per_group.filter(
    col("distinct_uuid_count") > 1
)
#source_id_with_multiple_uuids.cache()
#source_id_with_multiple_uuids.count()

In [22]:
# Total number of uuids that belong to multi-uuid groups.
# `sum` here is pyspark.sql.functions.sum (imported at the top), not the Python builtin.
source_id_with_multiple_uuids.agg(sum("distinct_uuid_count").alias("sum_value")).show(truncate=False)

+---------+
|sum_value|
+---------+
|121343610|
+---------+



In [16]:
# Restrict the multi-uuid groups to the matched (citizen-science) datasets.
# NOTE(review): this rebinds `filtered_df` from occurrence rows to aggregated group rows
# (only col_list + distinct_uuid_count + the matching_df columns). Later cells that read
# filtered_df.collectionCode / institutionCode / uuid need the occurrence-level version —
# confirm the intended execution order or give this result its own name.
filtered_df = (
    source_id_with_multiple_uuids
    .join(
        broadcast(matching_df), 
        matching_df._id == source_id_with_multiple_uuids.datasetKey,
        how = "inner"
    )
)

In [17]:
# Number of distinct source_id values in the current `filtered_df`.
filtered_df.select("source_id").distinct().count()

36404253

In [18]:
# Frequency table of basisOfRecord for the current `filtered_df`.
view_freq(filtered_df, "basisOfRecord", truncate=False)

+-------------------+--------+-------+
|basisOfRecord      |count   |bucket |
+-------------------+--------+-------+
|HUMAN_OBSERVATION  |36365160|10m+   |
|PRESERVED_SPECIMEN |26239   |10k-50k|
|MATERIAL_SAMPLE    |10847   |10k-50k|
|FOSSIL_SPECIMEN    |1655    |1k-5k  |
|MACHINE_OBSERVATION|232     |101-500|
|OCCURRENCE         |117     |101-500|
|OBSERVATION        |3       |1-10   |
+-------------------+--------+-------+



In [None]:
# Count the occurrences whose source_id belongs to a multi-uuid group passing `cond_camera_trap`.
# NOTE(review): `cond_camera_trap` is not defined in any visible cell — confirm where it comes from.
(
    spark_df
    .join(
        source_id_with_multiple_uuids.filter(cond_camera_trap),
        on = "source_id",
        how = "inner"
    )
    .count()
)

In [28]:
# basisOfRecord distribution over all multi-uuid groups (baseline).
source_id_with_multiple_uuids.groupBy("basisOfRecord").count().orderBy(desc("count")).show()

+-------------------+--------+
|      basisOfRecord|   count|
+-------------------+--------+
|  HUMAN_OBSERVATION|37227100|
| PRESERVED_SPECIMEN| 3489859|
|    FOSSIL_SPECIMEN|  276909|
|MACHINE_OBSERVATION|  235632|
|    MATERIAL_SAMPLE|  212826|
|    LIVING_SPECIMEN|   19681|
|        OBSERVATION|    3992|
|         OCCURRENCE|    2998|
|  MATERIAL_CITATION|     254|
+-------------------+--------+



In [29]:
# basisOfRecord distribution over the current `filtered_df`.
filtered_df.groupBy("basisOfRecord").count().orderBy(desc("count")).show()

+-------------------+--------+
|      basisOfRecord|   count|
+-------------------+--------+
|  HUMAN_OBSERVATION|36365160|
| PRESERVED_SPECIMEN|   26239|
|    MATERIAL_SAMPLE|   10847|
|    FOSSIL_SPECIMEN|    1655|
|MACHINE_OBSERVATION|     232|
|         OCCURRENCE|     117|
|        OBSERVATION|       3|
+-------------------+--------+



### HUMAN_OBSERVATION

Most of the `HUMAN_OBSERVATION` records are identified as citizen science.

In [178]:
# HUMAN_OBSERVATION occurrences whose dataset was NOT matched by the search.
# left_anti keeps only the left-side rows that have no join partner in matching_df.
human_observation_unmatched_df = (
    spark_df
    .filter(col("basisOfRecord")=="HUMAN_OBSERVATION")
    .join(
        broadcast(matching_df), 
        matching_df._id == spark_df.datasetKey,
        how = "left_anti"
    )
)

In [180]:
# Largest unmatched (publisher, datasetKey) pairs among HUMAN_OBSERVATION occurrences.
# NOTE(review): the positional 30 is presumably the number of rows to display — confirm in helpers.data_analysis.
view_freq(
    human_observation_unmatched_df,
    ["publisher", "datasetKey"],
    30,
    truncate = False
)
# Row count of the frequency table over (publisher, datasetKey)
create_freq(human_observation_unmatched_df, ["publisher", "datasetKey"]).count()

+-----------------------------------------------------+------------------------------------+-------+---------+
|publisher                                            |datasetKey                          |count  |bucket   |
+-----------------------------------------------------+------------------------------------+-------+---------+
|Xeno-canto Foundation for Nature Sounds              |b1047888-ae52-4179-9dd5-5448ea342a24|1404870|1m-5m    |
|United States Geological Survey                      |cca13f2c-0d2c-4c2f-93b9-4446c0cc1629|492239 |100k-500k|
|Natural History Museum Rotterdam                     |6db2a74e-98c5-4be3-ae30-3ec8dc68b0f4|284982 |100k-500k|
|Vermont Center for Ecostudies                        |cf3bdc30-370c-48d3-8fff-b587a39d72d6|264469 |100k-500k|
|Senckenberg                                          |e5774d90-9f01-42bb-a747-32331be82b18|74534  |50k-100k |
|India Biodiversity Portal                            |c6b86c40-ff71-4e5e-902c-111f400d0d56|66174  |50k-100k |
|

575

### MATERIAL_SAMPLE

In [36]:
# Which (publisher, datasetKey) pairs contribute MATERIAL_SAMPLE rows to the current `filtered_df`.
view_freq(
    filtered_df.filter(col("basisOfRecord").isin(["MATERIAL_SAMPLE"])),
    ["publisher", "datasetKey"],
    truncate = False
)

+-----------------------------------------------------------+------------------------------------+-----+-------+
|publisher                                                  |datasetKey                          |count|bucket |
+-----------------------------------------------------------+------------------------------------+-----+-------+
|Natural History Museum of Denmark                          |cb8a261a-66cb-4068-809e-9e773359bb30|10846|10k-50k|
|Miljøstyrelsen / The Danish Environmental Protection Agency|963a6b96-4d22-4428-86e4-afee52cf4a8e|1    |1-10   |
+-----------------------------------------------------------+------------------------------------+-----+-------+



## Exclude Museum Specimens & GRSciColl

In [20]:
# basisOfRecord values treated as museum-specimen types in the GRSciColl exclusion below.
basisOfRecord_specimen_list=["PRESERVED_SPECIMEN", "FOSSIL_SPECIMEN", "MATERIAL_SAMPLE", "LIVING_SPECIMEN", "MATERIAL_CITATION"]

### Find GRSciColl Dataset Collection Match

In [21]:
# Specimen-type rows of `filtered_df` whose collectionCode equals a GRSciColl collection code,
# counted per (basisOfRecord, publisher, datasetKey) and sorted by descending count.
# NOTE(review): if a code occurs more than once in the GRSciColl table, the inner join repeats
# the occurrence row and inflates `count` — confirm that codes are unique or de-duplicate first.
dataset_collection_match_df = (
    filtered_df
    .join(
        broadcast(grscicoll_collection_df),
        filtered_df.collectionCode == grscicoll_collection_df.code,
        how="inner"
    )
    .filter(
        col("basisOfRecord").isin(basisOfRecord_specimen_list)
    )
    .groupBy(["basisOfRecord", "publisher", "datasetKey"])
    .count()
    # groupBy already yields one row per key, so the former trailing .distinct() removed
    # nothing; it only added a shuffle that discarded the sort. Sorting is now the last step.
    .orderBy(desc("count"))
)
dataset_collection_match_df.show(truncate=False)

                                                                                

+------------------+--------------------------------------------------------+------------------------------------+-----+
|basisOfRecord     |publisher                                               |datasetKey                          |count|
+------------------+--------------------------------------------------------+------------------------------------+-----+
|PRESERVED_SPECIMEN|Bailey-Matthews National Shell Museum                   |417f4d21-959b-4773-90a2-c38d1822d873|3321 |
|FOSSIL_SPECIMEN   |Virginia Museum of Natural History                      |66ca5555-102d-4d50-baaa-a949a2846666|3673 |
|PRESERVED_SPECIMEN|Mohonk Preserve                                         |ffe1030d-42d1-4bb5-8400-1123cc859a5a|6270 |
|PRESERVED_SPECIMEN|Virginia Museum of Natural History                      |4cb3d289-8d1e-4831-9c01-df02084b999a|835  |
|FOSSIL_SPECIMEN   |Virginia Museum of Natural History                      |212a4207-f330-49e7-8c84-51dccf3f1a8b|41652|
|PRESERVED_SPECIMEN|Virginia Mus

### Find GRSciColl Dataset Institute Match

In [22]:
# Specimen-type rows of `filtered_df` whose institutionCode equals a GRSciColl institution code,
# counted per (basisOfRecord, publisher, datasetKey) and sorted by descending count.
# NOTE(review): if a code occurs more than once in the GRSciColl table, the inner join repeats
# the occurrence row and inflates `count` — confirm that codes are unique or de-duplicate first.
dataset_institution_match_df = (
    filtered_df
    .join(
        broadcast(grscicoll_institution_df),
        filtered_df.institutionCode == grscicoll_institution_df.code,
        how="inner"
    )
    .filter(
        col("basisOfRecord").isin(basisOfRecord_specimen_list)
    )
    .groupBy(["basisOfRecord", "publisher", "datasetKey"])
    .count()
    # groupBy already yields one row per key, so the former trailing .distinct() removed
    # nothing; it only added a shuffle that discarded the sort. Sorting is now the last step.
    .orderBy(desc("count"))
)
dataset_institution_match_df.show(truncate=False)



+------------------+--------------------------------------------------------+------------------------------------+------+
|basisOfRecord     |publisher                                               |datasetKey                          |count |
+------------------+--------------------------------------------------------+------------------------------------+------+
|PRESERVED_SPECIMEN|Herbarium du CRSN-Lwiro                                 |bcffbc19-b28c-42ae-995a-ff852e158ce5|4969  |
|PRESERVED_SPECIMEN|National Museum of Natural History, Luxembourg          |b6dc7422-a09d-4986-b5ac-e8f6f50a9732|99    |
|PRESERVED_SPECIMEN|Butler University, Friesner Herbarium                   |900c9d31-e4a7-4e75-96a4-b701fddc1d0b|312   |
|FOSSIL_SPECIMEN   |Virginia Museum of Natural History                      |66ca5555-102d-4d50-baaa-a949a2846666|3673  |
|MATERIAL_SAMPLE   |Natural History Museum of Denmark                       |cb8a261a-66cb-4068-809e-9e773359bb30|174401|
|PRESERVED_SPECIMEN|Butl

                                                                                

In [23]:
# Combine the institution-level and collection-level dataset matches.
# distinct() only removes rows that are identical in all four columns, so a dataset can appear
# twice when its institution count and collection count differ (as in the output below).
dataset_grscicoll_match_df = dataset_institution_match_df.union(dataset_collection_match_df).distinct().orderBy(desc("count"))
dataset_grscicoll_match_df.show(50, truncate=False)

                                                                                

+------------------+--------------------------------------------------------+------------------------------------+------+
|basisOfRecord     |publisher                                               |datasetKey                          |count |
+------------------+--------------------------------------------------------+------------------------------------+------+
|MATERIAL_SAMPLE   |Natural History Museum of Denmark                       |cb8a261a-66cb-4068-809e-9e773359bb30|174401|
|FOSSIL_SPECIMEN   |Virginia Museum of Natural History                      |212a4207-f330-49e7-8c84-51dccf3f1a8b|41652 |
|FOSSIL_SPECIMEN   |Virginia Museum of Natural History                      |212a4207-f330-49e7-8c84-51dccf3f1a8b|10413 |
|PRESERVED_SPECIMEN|Mohonk Preserve                                         |ffe1030d-42d1-4bb5-8400-1123cc859a5a|6270  |
|PRESERVED_SPECIMEN|Herbarium du CRSN-Lwiro                                 |bcffbc19-b28c-42ae-995a-ff852e158ce5|4969  |
|FOSSIL_SPECIMEN   |Virg

### Find GRSciColl Records Match

In [24]:
def _specimen_uuids(reference_df, code_column):
    """Return the distinct uuids of specimen-type rows in `filtered_df` whose
    `code_column` value equals a `code` in the given GRSciColl reference table."""
    joined = filtered_df.join(
        broadcast(reference_df),
        filtered_df[code_column] == reference_df.code,
        how="inner"
    )
    specimen_rows = joined.filter(col("basisOfRecord").isin(basisOfRecord_specimen_list))
    return specimen_rows.select("uuid").distinct()


# Records matched through the collection code
record_collection_match_df = _specimen_uuids(grscicoll_collection_df, "collectionCode")

# Records matched through the institution code
record_institution_match_df = _specimen_uuids(grscicoll_institution_df, "institutionCode")

# A record counts as a specimen match if either code matched
record_specimen_match_df = (
    record_collection_match_df
    .union(record_institution_match_df)
    .distinct()
)

### Remove Matched Datasets & Records

In [25]:
# Citizen-science occurrences with the GRSciColl matches removed, in two passes:
#   1. drop every record whose uuid matched a GRSciColl collection or institution code;
#   2. drop every remaining record of a dataset that had at least one such match.
filtered_df_x_grscicoll = (
    filtered_df
    .join(
        broadcast(record_specimen_match_df),
        on = "uuid",
        how="left_anti"
    )
    .join(
        broadcast(dataset_grscicoll_match_df.select("datasetKey")),
        on = "datasetKey",
        how = "left_anti"
    )
)

In [26]:
# basisOfRecord distribution after the GRSciColl exclusion.
view_freq(filtered_df_x_grscicoll, "basisOfRecord")



+-------------------+---------+---------+
|basisOfRecord      |count    |bucket   |
+-------------------+---------+---------+
|HUMAN_OBSERVATION  |179198863|10m+     |
|PRESERVED_SPECIMEN |122674   |100k-500k|
|MACHINE_OBSERVATION|48093    |10k-50k  |
|OBSERVATION        |17002    |10k-50k  |
|OCCURRENCE         |1859     |1k-5k    |
|MATERIAL_SAMPLE    |4        |1-10     |
|FOSSIL_SPECIMEN    |1        |1-10     |
+-------------------+---------+---------+



                                                                                

In [27]:
# Total number of occurrences remaining after the GRSciColl exclusion.
filtered_df_x_grscicoll.count()

                                                                                768]]]

179388496

# Generate Lookup Table for Multi-Images Subset

In [2]:
from pyspark.sql import SparkSession

# Reuse the Spark session that is already active in this kernel.
# getActiveSession() returns None when no session is active, so fail here with a
# clear message instead of an AttributeError on the first `spark.read` further down.
spark = SparkSession.getActiveSession()
if spark is None:
    raise RuntimeError(
        "No active Spark session found; create one (e.g. with init_spark()) "
        "before running this section."
    )

In [None]:
# Persist the GRSciColl-filtered occurrences, repartitioned into 10 partitions,
# replacing any previous export at this location.
filtered_df_x_grscicoll.repartition(10).write.parquet(
    f"{BASE_PATH}/gbif/attributes/occurrence_citizen_science",
    mode="overwrite",
)

                                                                                / 768]

In [None]:
# Reload the persisted citizen-science subset (this rebinds `filtered_df_x_grscicoll`
# to the on-disk copy) together with the 2024-05-01 lookup table.
filtered_df_x_grscicoll = (
    spark.read
    .format("parquet")
    .load(f"{BASE_PATH}/gbif/attributes/occurrence_citizen_science")
)
lookup_tbl = (
    spark.read
    .format("parquet")
    .load(f"{BASE_PATH}/gbif/lookup_tables/2024-05-01/lookup_tables")
)

In [8]:
# Keep only records whose source_id appears in `source_id_with_multiple_uuids`
# (defined outside this cell), project the descriptive columns, then attach the
# lookup-table columns by uuid. Both joins are inner joins.
lookup_tbl_citizen_science = (
    filtered_df_x_grscicoll
    .join(
        source_id_with_multiple_uuids.select("source_id"),
        on="source_id",
        how="inner",
    )
    .select(
        "uuid",
        "source_id",
        "basisOfRecord",
        "publisher",
        "datasetKey",
        "scientificName",
        "taxonRank",
        *COLS_TAXONOMIC,
    )
    .join(lookup_tbl, on="uuid", how="inner")
)

In [None]:
# Load the columns-of-interest table and keep only the rows whose uuid is in
# the citizen-science subset.
# NOTE: this rebinds `filtered_df`; cells that read `filtered_df` afterwards see
# this subset rather than whatever the name pointed to before.
spark_df = spark.read.parquet(f"{BASE_PATH}/gbif/attributes/cols_of_interest")

citizen_science_uuids = filtered_df_x_grscicoll.select("uuid")
filtered_df = spark_df.join(citizen_science_uuids, on="uuid", how="inner")

In [7]:
from pyspark.sql.functions import col, when, mean, sum as spark_sum

# Build 0/1 indicator columns from the `issue` column so they can be summed
# (number of affected records) and averaged (share of affected records).
issue_text = col("issue")
has_higher_rank = issue_text.contains("TAXON_MATCH_HIGHERRANK")
has_match_none = issue_text.contains("TAXON_MATCH_NONE")
has_match_fuzzy = issue_text.contains("TAXON_MATCH_FUZZY")

taxon_issue_flags = [
    ("is_taxon_higher_rank", has_higher_rank),
    ("is_taxon_match_none", has_match_none),
    ("is_taxon_match_fuzzy", has_match_fuzzy),
    # Set when at least one of the three issues above is present.
    ("is_any", has_match_fuzzy | has_match_none | has_higher_rank),
]

spark_df_with_flags = filtered_df
for flag_name, flag_condition in taxon_issue_flags:
    spark_df_with_flags = spark_df_with_flags.withColumn(
        flag_name, when(flag_condition, 1).otherwise(0)
    )

# One sum and one mean per flag, emitted in the same order as the flags.
flag_aggregates = [
    ("is_taxon_higher_rank", "sum_taxon_higher_rank", "mean_taxon_higher_rank"),
    ("is_taxon_match_none", "sum_taxon_match_none", "mean_taxon_match_none"),
    ("is_taxon_match_fuzzy", "sum_taxon_match_fuzzy", "mean_taxon_match_fuzzy"),
    ("is_any", "sum_is_any", "mean_is_any"),
]
result_df = spark_df_with_flags.agg(*[
    aggregate
    for flag_column, sum_alias, mean_alias in flag_aggregates
    for aggregate in (
        spark_sum(flag_column).alias(sum_alias),
        mean(flag_column).alias(mean_alias),
    )
])

result_df.show(truncate=False)

                                                                                

+---------------------+----------------------+--------------------+---------------------+---------------------+----------------------+----------+--------------------+
|sum_taxon_higher_rank|mean_taxon_higher_rank|sum_taxon_match_none|mean_taxon_match_none|sum_taxon_match_fuzzy|mean_taxon_match_fuzzy|sum_is_any|mean_is_any         |
+---------------------+----------------------+--------------------+---------------------+---------------------+----------------------+----------+--------------------+
|2383044              |0.013284263222765411  |81029               |4.516956315860968E-4 |690920               |0.003851529030044379  |3154993   |0.017587487884395885|
+---------------------+----------------------+--------------------+---------------------+---------------------+----------------------+----------+--------------------+



## N_MAX_FILES: 100

In [23]:
N_MAX_FILES = 100

# Number the distinct paths 1..K in path order and give every run of
# N_MAX_FILES consecutive paths the same zero-based group_id.
unique_paths = lookup_tbl_citizen_science.select("path").distinct()
window_spec = Window.orderBy("path")
path_group_id = floor((row_number().over(window_spec) - 1) / N_MAX_FILES)
grouped_paths = unique_paths.withColumn("group_id", path_group_id)

# Attach the group_id to every lookup row, keep the output columns, and
# repartition by group_id (into a single partition) ahead of the partitioned write.
result_lookup_tbl = (
    lookup_tbl_citizen_science
    .join(grouped_paths, on="path", how="left")
    .select(
        "uuid",
        "source_id",
        "basisOfRecord",
        "publisher",
        "datasetKey",
        "scientificName",
        "taxonRank",
        *COLS_TAXONOMIC,
        "path",
        "group_id",
    )
    .repartition(1, "group_id")
)

In [None]:
# Write the grouped lookup table, one `group_id=...` directory per group,
# replacing any previous output at this location.
result_lookup_tbl.write.parquet(
    f"{BASE_PATH}/gbif/lookup_tables/2024-05-01/lookup_multi_images_citizen_science",
    mode="overwrite",
    partitionBy="group_id",
)

In [24]:
# Show the columns and types of the grouped lookup table (group_id is the last column).
result_lookup_tbl.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- basisOfRecord: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- datasetKey: string (nullable = true)
 |-- scientificName: string (nullable = true)
 |-- taxonRank: string (nullable = true)
 |-- kingdom: string (nullable = true)
 |-- phylum: string (nullable = true)
 |-- class: string (nullable = true)
 |-- order: string (nullable = true)
 |-- family: string (nullable = true)
 |-- genus: string (nullable = true)
 |-- species: string (nullable = true)
 |-- path: string (nullable = true)
 |-- group_id: long (nullable = true)



## N_MAX_FILES: 300

In [9]:
N_MAX_FILES = 300

# Same grouping as the N_MAX_FILES = 100 section, with larger groups: distinct
# paths are numbered in path order and every run of N_MAX_FILES consecutive
# paths shares one zero-based group_id.
# NOTE: this rebinds N_MAX_FILES, grouped_paths and result_lookup_tbl.
unique_paths = lookup_tbl_citizen_science.select("path").distinct()
window_spec = Window.orderBy("path")
path_group_id = floor((row_number().over(window_spec) - 1) / N_MAX_FILES)
grouped_paths = unique_paths.withColumn("group_id", path_group_id)

result_lookup_tbl = (
    lookup_tbl_citizen_science
    .join(grouped_paths, on="path", how="left")
    .select(
        "uuid",
        "source_id",
        "basisOfRecord",
        "publisher",
        "datasetKey",
        "scientificName",
        "taxonRank",
        *COLS_TAXONOMIC,
        "path",
        "group_id",
    )
    .repartition(1, "group_id")
)

In [None]:
# Write the 300-files-per-group lookup table, one `group_id=...` directory per
# group, replacing any previous output at this location.
result_lookup_tbl.write.parquet(
    f"{BASE_PATH}/gbif/lookup_tables/2024-05-01/lookup_multi_images_citizen_science_300",
    mode="overwrite",
    partitionBy="group_id",
)

                                                                                

In [4]:
def process_group(spark, base_input_path, base_output_path, group_id, n_partitions=100):
    """Join one lookup-table group with the image parquet files it points to.

    Reads the lookup rows under ``{base_input_path}/group_id={group_id}``, loads
    the parquet files named in their ``path`` column, keeps the ``uuid``,
    ``original_size``, ``resized_size`` and ``image`` columns, inner-joins them
    with the lookup rows on ``uuid``, keeps one row per ``uuid`` and writes the
    result to ``{base_output_path}/group_id={group_id}`` (overwriting it).

    Args:
        spark: Spark session used for reading.
        base_input_path: Directory holding the ``group_id=...`` lookup partitions.
        base_output_path: Directory under which the group's result is written.
        group_id: Group identifier used in both partition directory names.
        n_partitions: Number of partitions of the written result. Defaults to
            100, the value that used to be hard-coded.

    Raises:
        ValueError: If the group's lookup rows contain no non-null ``path``.
    """
    # Construct paths for the current group
    group_input_path = f"{base_input_path}/group_id={group_id}"
    group_output_path = f"{base_output_path}/group_id={group_id}"

    lookup_df = spark.read.parquet(group_input_path)
    distinct_path_rows = lookup_df.select("path").distinct().collect()

    # A null path cannot be opened, and an empty path list would only surface as
    # an opaque reader error, so validate before touching the image files.
    unique_paths = [row["path"] for row in distinct_path_rows if row["path"] is not None]
    if not unique_paths:
        raise ValueError(
            f"No image paths found for group_id={group_id} in {group_input_path}"
        )

    # Read all referenced image files at once, keeping only the needed columns
    combined_df = spark.read.parquet(*unique_paths).select(
        ["uuid", "original_size", "resized_size", "image"]
    )

    # The lookup rows of a single group are the small side, hence the broadcast
    result_df = combined_df.join(broadcast(lookup_df), on="uuid", how="inner")
    result_df = result_df.dropDuplicates(["uuid"]).repartition(n_partitions)

    # Write the result to the output path
    result_df.write.mode("overwrite").parquet(group_output_path)
    print(f"Processed and saved results for group_id={group_id} to {group_output_path}")


In [None]:
# Root of the grouped lookup table written in the N_MAX_FILES = 100 section
# (contains one `group_id=...` directory per group).
base_input_path = f"{BASE_PATH}/gbif/lookup_tables/2024-05-01/lookup_multi_images_citizen_science"
# Root under which process_group writes one `group_id=...` directory per processed group.
base_output_path = f"{BASE_PATH}/gbif/image_lookup/multi_images_citizen_science"

In [3]:
# Stop the current Spark session; a new one is created by init_spark() in the next cell.
spark.stop()

In [6]:
def init_spark() -> SparkSession:
    """Return a Spark session named "GBIF EDA" with this notebook's executor settings.

    NOTE: this definition shadows the `init_spark` imported from
    `helpers.data_analysis` at the top of the notebook.

    The parquet vectorized reader is switched off; presumably this is needed
    for reading the image parquet files -- TODO confirm.
    """
    settings = {
        "spark.executor.instances": "80",
        "spark.executor.memory": "75G",
        "spark.executor.cores": "12",
        "spark.sql.parquet.enableVectorizedReader": "false",
    }

    builder = SparkSession.builder.appName("GBIF EDA")
    for key, value in settings.items():
        builder = builder.config(key, value)

    return builder.getOrCreate()

spark = init_spark()

In [None]:
# Build the image table for a single lookup group (group_id "0").
process_group(spark, base_input_path, base_output_path, "0")

# Comparison & Summary

In [40]:
# Occurrences that belong to a dataset listed in `matching_df_nlp`
# (defined outside this cell).
filtered_df_nlp = spark_df.join(broadcast(matching_df_nlp), on="datasetKey", how="inner")

# basisOfRecord counts for each subset, with the count column renamed per subset.
freq_occurrence = create_freq(spark_df, "basisOfRecord").selectExpr(
    "basisOfRecord", "count AS n_occurrence"
)
freq_cs_nlp = create_freq(filtered_df_nlp, "basisOfRecord").selectExpr(
    "basisOfRecord", "count AS n_CS_nlp"
)
freq_cs_matched = create_freq(filtered_df, "basisOfRecord").selectExpr(
    "basisOfRecord", "count AS n_CS_matched"
)
freq_cs_matched_x_grscicoll = create_freq(filtered_df_x_grscicoll, "basisOfRecord").selectExpr(
    "basisOfRecord", "count AS n_CS_matched_x_grscicoll"
)

# The full occurrence table is the left side of every join, so each of its
# basisOfRecord values stays in the comparison (null where a subset has no rows).
basis_of_record_comparison = (
    freq_occurrence
    .join(freq_cs_nlp, on="basisOfRecord", how="left")
    .join(freq_cs_matched, on="basisOfRecord", how="left")
    .join(freq_cs_matched_x_grscicoll, on="basisOfRecord", how="left")
)
basis_of_record_comparison.show(truncate=False)

+-------------------+------------+---------+------------+------------------------+
|basisOfRecord      |n_occurrence|n_CS_nlp |n_CS_matched|n_CS_matched_x_grscicoll|
+-------------------+------------+---------+------------+------------------------+
|HUMAN_OBSERVATION  |181826921   |178283233|179199017   |179199017               |
|PRESERVED_SPECIMEN |58611705    |384164   |136773      |122678                  |
|MATERIAL_CITATION  |1492        |null     |null        |null                    |
|OBSERVATION        |35199       |17002    |17002       |17002                   |
|MACHINE_OBSERVATION|3044865     |93040    |48093       |48093                   |
|MATERIAL_SAMPLE    |3272814     |4        |174405      |4                       |
|OCCURRENCE         |92770       |1995     |1859        |1859                    |
|LIVING_SPECIMEN    |99740       |36       |null        |null                    |
|FOSSIL_SPECIMEN    |2001234     |1        |14087       |1                       |
+---

In [41]:
# Number of distinct datasetKey values in each subset, printed one per line.
dataset_count_reports = [
    ("Occurrence dataset count", spark_df),
    ("NLP dataset count", filtered_df_nlp),
    ("Full-text search dataset count", filtered_df),
    ("Full-text search dataset count (exclude grscicoll)", filtered_df_x_grscicoll),
]
for report_label, report_df in dataset_count_reports:
    print(f"{report_label}: {report_df.select('datasetKey').distinct().count()}")

Occurrence dataset count: 1873
NLP dataset count: 63
Full-text search dataset count: 107
Full-text search dataset count (exclude grscicoll): 93


In [42]:
# Number of distinct source_id values in each subset, printed one per line.
source_id_count_reports = [
    ("Occurrence `source_id` count", spark_df),
    ("NLP `source_id` count", filtered_df_nlp),
    ("Full-text search `source_id` count", filtered_df),
    ("Full-text search `source_id` count (exclude grscicoll)", filtered_df_x_grscicoll),
]
for report_label, report_df in source_id_count_reports:
    print(f"{report_label}: {report_df.select('source_id').distinct().count()}")

Occurrence `source_id` count: 169112381
NLP `source_id` count: 109356415
Full-text search `source_id` count: 110323916
Full-text search `source_id` count (exclude grscicoll): 110286547


| **Type**                                | **Dataset Count** | **`source_id` Count** |
|-----------------------------------------|------------------:|----------------------:|
| Occurrence                              |              1873 |            169,112,381 |
| NLP                                     |                63 |            109,356,415 |
| Full-text search                        |               107 |            110,323,916 |
| Full-text search (exclude grscicoll)    |                93 |            110,286,547 |


In [183]:
# Publisher x basisOfRecord frequencies for everything that is not a human observation.
non_human_observation_df = filtered_df_x_grscicoll.where(
    col("basisOfRecord") != "HUMAN_OBSERVATION"
)
view_freq(non_human_observation_df, ["publisher", "basisOfRecord"], truncate=False)

+-----------------------------------------------------------+-------------------+-----+--------+
|publisher                                                  |basisOfRecord      |count|bucket  |
+-----------------------------------------------------------+-------------------+-----+--------+
|SLU Artdatabanken                                          |PRESERVED_SPECIMEN |50220|50k-100k|
|Finnish Biodiversity Information Facility                  |MACHINE_OBSERVATION|47024|10k-50k |
|The Norwegian Biodiversity Information Centre (NBIC)       |PRESERVED_SPECIMEN |39004|10k-50k |
|ClimateWatch                                               |OBSERVATION        |17002|10k-50k |
|University of Texas at Austin, Biodiversity Collections    |PRESERVED_SPECIMEN |11855|10k-50k |
|Danish Mycological Society                                 |PRESERVED_SPECIMEN |11300|10k-50k |
|Finnish Biodiversity Information Facility                  |PRESERVED_SPECIMEN |9257 |5k-10k  |
|myFOSSIL eMuseum             

In [6]:
# Number of distinct source_id values in the GRSciColl-filtered subset.
filtered_df_x_grscicoll.select("source_id").dropDuplicates().count()

                                                                                

110286389