In [23]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session. The memory settings below
# are passed to Spark as plain configuration strings.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Let pandas render up to 500 columns before truncating wide frames.
pd.set_option('display.max_columns', 500)

In [24]:
# One-off downloads of the three input files (kept commented out so the
# notebook does not re-download them on every run):
#!wget "https://obis-datasets.ams3.digitaloceanspaces.com/exports/obis_20231025.parquet"
#!wget "https://hosted-datasets.gbif.org/datasets/gisd_2011-11-20.zip"
#!wget "https://hosted-datasets.gbif.org/datasets/iucn/iucn-2022-1.zip"


# Load the OBIS parquet export as a Spark DataFrame.
# NOTE(review): the path is an absolute, machine-specific location.
df = spark.read.parquet('/Users/helloworld/Downloads/obis_20231025.parquet')

In [25]:
from dwca.read import DwCAReader

# Read the core table of the GISD archive into a pandas DataFrame.
with DwCAReader('/Users/helloworld/Downloads/gisd_2011-11-20.zip') as dwca:
    print("Core data file is: {}".format(dwca.descriptor.core.file_location)) # => 'taxon.txt'

    # Use this archive's own descriptor. The previous code referenced
    # `dwca_red`, which is not bound until the next `with` block, so the cell
    # raised NameError on a fresh kernel.
    invasive_df = dwca.pd_read(dwca.descriptor.core.file_location, parse_dates=True)


# Read the core table of the IUCN archive into a pandas DataFrame.
with DwCAReader('/Users/helloworld/Downloads/iucn-2022-1.zip') as dwca_red:
    print("Core data file is: {}".format(dwca_red.descriptor.core.file_location)) # => 'taxon.txt'

    redlist_df = dwca_red.pd_read(dwca_red.descriptor.core.file_location, parse_dates=True)

# Rows without an authorship value are dropped first: str.replace() below
# would raise TypeError if handed a NaN instead of a string.
redlist_df = redlist_df.dropna(subset=["scientificNameAuthorship"])

# Remove the authorship text from scientificName and trim the leftover
# whitespace, leaving only the name itself.
redlist_df['scientificName'] = redlist_df.apply(
    lambda row: row['scientificName'].replace(row['scientificNameAuthorship'], '').strip(),
    axis=1,
)
redlist_df.tail(20)

Core data file is: taxon.txt
Core data file is: taxon.txt


Unnamed: 0,id,scientificName,kingdom,phylum,class,order,family,genus,specificEpithet,scientificNameAuthorship,taxonRank,infraspecificEpithet,taxonomicStatus,acceptedNameUsageID,bibliographicCitation,references
254562,22704840,Melanodryas vittata,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,PETROICIDAE,Melanodryas,vittata,"(Quoy & Gaimard, 1830)",species,,accepted,22704840,BirdLife International 2022. Melanodryas vitta...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254563,22704507,Dasyornis brachypterus,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,DASYORNITHIDAE,Dasyornis,brachypterus,"(Latham, 1801)",species,,accepted,22704507,BirdLife International 2022. Dasyornis brachyp...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254564,22709303,Humblotia flavirostris,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,MUSCICAPIDAE,Humblotia,flavirostris,"Milne-Edwards & Oustalet, 1885",species,,accepted,22709303,BirdLife International 2022. Humblotia flaviro...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254565,103823897,Hypsipetes parvirostris,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,PYCNONOTIDAE,Hypsipetes,parvirostris,"Milne-Edwards & Oustalet, 1885",species,,accepted,103823897,BirdLife International 2022. Hypsipetes parvir...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254566,22703608,Atrichornis rufescens,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,ATRICHORNITHIDAE,Atrichornis,rufescens,"(Ramsay, 1867)",species,,accepted,22703608,BirdLife International 2022. Atrichornis rufes...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254567,22703612,Atrichornis clamosus,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,ATRICHORNITHIDAE,Atrichornis,clamosus,"(Gould, 1844)",species,,accepted,22703612,BirdLife International 2022. Atrichornis clamo...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254568,22703776,Stipiturus mallee,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,MALURIDAE,Stipiturus,mallee,"Campbell, 1908",species,,accepted,22703776,BirdLife International 2022. Stipiturus mallee...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254569,22703786,Amytornis dorotheae,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,MALURIDAE,Amytornis,dorotheae,"(Mathews, 1914)",species,,accepted,22703786,BirdLife International 2022. Amytornis dorothe...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254570,22704493,Pardalotus quadragintus,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,PARDALOTIDAE,Pardalotus,quadragintus,"Gould, 1838",species,,accepted,22704493,BirdLife International 2022. Pardalotus quadra...,https://apiv3.iucnredlist.org/api/v3/taxonredi...
254571,22704751,Aphelocephala pectoralis,ANIMALIA,CHORDATA,AVES,PASSERIFORMES,ACANTHIZIDAE,Aphelocephala,pectoralis,"(Gould, 1871)",species,,accepted,22704751,BirdLife International 2022. Aphelocephala pec...,https://apiv3.iucnredlist.org/api/v3/taxonredi...


In [27]:
# Inputs: 'df' is the OBIS Spark DataFrame; 'invasive_df' and 'redlist_df' are
# pandas DataFrames that each carry a 'scientificName' column.
obis_columns = [
    "marine", "class", "genus", "species", "variety", "individualCount",
    "eventDate", "eventTime", "year", "month", "day", "country", "locality",
    "decimalLongitude", "decimalLatitude",
]

# Select the relevant columns and repartition
newdf = df.select(obis_columns).repartition(300)
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run in the same session.
newdf.createOrReplaceTempView('obis_data')

# Expose both species lists to Spark SQL, renaming 'scientificName' to
# 'species' so it lines up with the OBIS column of the same name.
invasive_species = spark.createDataFrame(invasive_df).withColumnRenamed('scientificName', 'species')
invasive_species.createOrReplaceTempView('invasive_species')

redlist_species = spark.createDataFrame(redlist_df).withColumnRenamed('scientificName', 'species')
redlist_species.createOrReplaceTempView('redlist_species')

# Sum 'individualCount' per species for OBIS rows recorded in Mexico whose
# species appears in the given list.
# - LEFT SEMI JOIN only tests membership, so a name that occurs more than once
#   in the species list cannot multiply OBIS rows and inflate the sums (an
#   INNER JOIN would).
# - 'Mexico' is single-quoted so it is always parsed as a string literal,
#   regardless of the session's double-quoted-identifier setting.
species_counts_sql = """
    SELECT obis.species, sum(obis.individualCount) as total_individualCount
    FROM obis_data obis
    LEFT SEMI JOIN {species_view} listed ON obis.species = listed.species
    WHERE obis.individualCount > 0 AND obis.country = 'Mexico'
    GROUP BY obis.species
    ORDER BY total_individualCount DESC
"""

aggregated_invasive_counts = spark.sql(species_counts_sql.format(species_view='invasive_species'))
aggregated_redlist_counts = spark.sql(species_counts_sql.format(species_view='redlist_species'))

# Run both queries and collect the (small, aggregated) results into pandas.
aggregated_invasive_df = aggregated_invasive_counts.toPandas()
aggregated_redlist_df = aggregated_redlist_counts.toPandas()

24/02/19 18:10:37 WARN TaskSetManager: Stage 99 contains a task of very large size (8050 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [29]:
# Display the per-species individualCount totals for species found in the invasive list.
aggregated_invasive_df

Unnamed: 0,species,total_individualCount
0,Xiphophorus hellerii,763.0
1,Oreochromis mossambicus,753.0
2,Gambusia affinis,717.0
3,Poecilia reticulata,308.0
4,Oncorhynchus mykiss,123.0
5,Cyprinus carpio,82.0
6,Tubastraea coccinea,20.0
7,Alitta succinea,18.0
8,Oreochromis aureus,12.0
9,Carassius auratus,10.0


In [30]:
# Display the per-species individualCount totals for species found in the red-list table.
aggregated_redlist_df

Unnamed: 0,species,total_individualCount
0,Triphoturus mexicanus,11726.0
1,Archosargus rhomboidalis,6359.0
2,Eucinostomus gula,5625.0
3,Cyclothone acclinidens,5126.0
4,Harengula thrissina,4895.0
...,...,...
1945,Photonectes albipennis,1.0
1946,Benthodesmus simonyi,1.0
1947,Benthalbella dentata,1.0
1948,Lophiodes monodi,1.0
