# SDM: Generate knowledge graph CSVs

In [1]:
import pandas as pd
import datetime as dt
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet
from itertools import count
from hdfs import InsecureClient

In [2]:
# Get datasources
datasource_df = pd.read_csv("data/datasource.csv")
datasource_df

Unnamed: 0,id,baseurl,label,comment
0,ds_0,https://api.gbif.org/,gbif,GBIF—the Global Biodiversity Information Facil...
1,ds_1,https://neo.gsfc.nasa.gov/archive/geotiff.float/,neo,One of the best places to study Earth is from ...
2,ds_2,https://biogeo.ucdavis.edu/data/worldclim/,worldclim,WorldClim is a database of high spatial resolu...


In [3]:
# Setup HDFS client
client = InsecureClient("http://localhost:9870", user="bdm")

# List all existing files (file.csv: id,url,format,version,timestamp,datasource,label,comment)
fileIdGen = ("file_" + str(c) for c in count())
persistent_dir = "/user/bdm/persistent/"
files = []

# GBIF
gbif_ds = datasource_df[datasource_df["label"] == "gbif"].id.to_list()[0]
gbif_version = "1"
gbif_format = "PARQUET"
gbif_filename = "occurrence"
gbif_timestamps_dir = persistent_dir + "gbif/{}/".format(gbif_version)
gbif_timestamps = client.list(gbif_timestamps_dir)
for ts in gbif_timestamps:
    for download_key in client.list(gbif_timestamps_dir + ts):
        files.append({
            "id": fileIdGen.__next__(),
            "url": gbif_timestamps_dir + ts + "/" + download_key + "/" + gbif_filename + ".parquet",
            "format": gbif_format,
            "version": gbif_version,
            "timestamp": dt.datetime.strptime(ts, '%Y-%m-%d_%H-%M-%S'),
            "datasource": gbif_ds,
            "label": gbif_filename,
            "comment": "Download key: " + download_key,
        })
        
# NEO
neo_ds = datasource_df[datasource_df["label"] == "neo"].id.to_list()[0]
neo_version = None
neo_format = "PARQUET"
neo_datasets = client.list(persistent_dir + "neo/")
for ds in neo_datasets:
    neo_timestamps_dir = persistent_dir + "neo/" + ds + "/"
    neo_timestamps = client.list(neo_timestamps_dir)
    for ts in neo_timestamps:
        for filename in client.list(neo_timestamps_dir + ts):
            if ds == 'SRTM_RAMP2_TOPO':
                raster_date = dt.datetime.strptime(filename, '%Y.pq')
            else:
                raster_date = dt.datetime.strptime(filename, '%Y-%m.pq')
            files.append({
                "id": fileIdGen.__next__(),
                "url": neo_timestamps_dir + ts + "/" + filename,
                "format": neo_format,
                "version": neo_version,
                "timestamp": dt.datetime.strptime(ts, '%Y-%m-%d_%H-%M-%S'),
                "datasource": neo_ds,
                "label": "{} ({})".format(ds, raster_date.strftime("%Y-%m")),
                "comment": "Date: " + str(raster_date),
            })

# WorldClim
worldclim_ds = datasource_df[datasource_df["label"] == "worldclim"].id.to_list()[0]
worldclim_version = "2.1"
worldclim_format = "PARQUET"
worldclim_timestamps_dir = persistent_dir + "worldclim/{}/".format(worldclim_version)
worldclim_timestamps = client.list(worldclim_timestamps_dir)
for ts in worldclim_timestamps:
    for res in client.list(worldclim_timestamps_dir + ts):
        for var in client.list(worldclim_timestamps_dir + ts + "/" + res):
            for filename in client.list(worldclim_timestamps_dir + ts + "/" + res + "/" + var):
                if filename.endswith(".pq"):
                    worldclim_format = "PARQUET"
                elif filename.endswith(".tif"):
                    worldclim_format = "TIF"
                files.append({
                    "id": fileIdGen.__next__(),
                    "url": worldclim_timestamps_dir + ts + "/" + res + "/" + var + "/" + filename,
                    "format": worldclim_format,
                    "version": worldclim_version,
                    "timestamp": dt.datetime.strptime(ts, '%Y-%m-%d_%H-%M-%S'),
                    "datasource": worldclim_ds,
                    "label": "{} ({})".format(var, filename.split('.')[0]),
                    "comment": "Resolution: {}".format(res),
                })
      
files_df = pd.DataFrame(data=files)
files_df.to_csv("data/file.csv", index=False, header=True)
files_df

Unnamed: 0,id,url,format,version,timestamp,datasource,label,comment
0,file_0,/user/bdm/persistent/gbif/1/2022-05-11_13-04-4...,PARQUET,1,2022-05-11 13:04:40,ds_0,occurrence,Download key: 0273021-210914110416597
1,file_1,/user/bdm/persistent/gbif/1/2022-05-11_13-10-5...,PARQUET,1,2022-05-11 13:10:57,ds_0,occurrence,Download key: 0203956-210914110416597
2,file_2,/user/bdm/persistent/gbif/1/2022-05-11_13-11-5...,PARQUET,1,2022-05-11 13:11:51,ds_0,occurrence,Download key: 0204028-210914110416597
3,file_3,/user/bdm/persistent/gbif/1/2022-05-11_13-12-5...,PARQUET,1,2022-05-11 13:12:51,ds_0,occurrence,Download key: 0273021-210914110416597
4,file_4,/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11...,PARQUET,,2022-05-11 13:21:07,ds_1,MOD_LSTD_M (2000-02),Date: 2000-02-01 00:00:00
...,...,...,...,...,...,...,...,...
3288,file_3288,/user/bdm/persistent/worldclim/2.1/2022-06-02_...,TIF,2.1,2022-06-02 11:35:08,ds_2,wind (10),Resolution: 5m
3289,file_3289,/user/bdm/persistent/worldclim/2.1/2022-06-02_...,PARQUET,2.1,2022-06-02 11:35:08,ds_2,wind (11),Resolution: 5m
3290,file_3290,/user/bdm/persistent/worldclim/2.1/2022-06-02_...,TIF,2.1,2022-06-02 11:35:08,ds_2,wind (11),Resolution: 5m
3291,file_3291,/user/bdm/persistent/worldclim/2.1/2022-06-02_...,PARQUET,2.1,2022-06-02 11:35:08,ds_2,wind (12),Resolution: 5m


In [4]:
# Setup pyarrow
import os
import subprocess

os.environ["HADOOP_HOME"] = "/home/bdm/BDM_Software/hadoop"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
os.environ["CLASSPATH"] = str(subprocess.check_output(['{}/bin/hadoop'.format(os.environ["HADOOP_HOME"]), 'classpath', '--glob']))

In [11]:
# Create spark session and set up the database driver
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .config("spark.jars", "/usr/local/postgresql-42.2.5.jar")
         .master("local")
         .appName("species_rasters")
         .enableHiveSupport()
         .getOrCreate())

In [28]:
# List file attributes (attribute.csv: id,file,relatedattr,label,comment)

hdfs_base = "hdfs://pikachu.fib.upc.es:27000"

attributes = []
attrIdGen = ("attribute_" + str(c) for c in count())

files = files_df.loc[files_df.format == "PARQUET", ["id", "url"]]
for _, file in files.iterrows():
    print("Processing {} ({})".format(file["id"], file["url"]))
    try:
        df = spark.read.parquet(hdfs_base + file["url"])
        for attr in df.columns:
            attributes.append({
                "id": attrIdGen.__next__(),
                "file": file["id"],
                "relatedattr": None,
                "label": attr,
                "comment": None,
            })
    except:
        print("Failed to read file schema of " + file["id"])

attributes_df = pd.DataFrame(data=attributes)
attributes_df.to_csv("data/attribute.csv", index=False, header=True)
attributes_df

Processing file_0 (/user/bdm/persistent/gbif/1/2022-05-11_13-04-40/0273021-210914110416597/occurrence.parquet)
Processing file_1 (/user/bdm/persistent/gbif/1/2022-05-11_13-10-57/0203956-210914110416597/occurrence.parquet)
Processing file_2 (/user/bdm/persistent/gbif/1/2022-05-11_13-11-51/0204028-210914110416597/occurrence.parquet)
Processing file_3 (/user/bdm/persistent/gbif/1/2022-05-11_13-12-51/0273021-210914110416597/occurrence.parquet)
Processing file_4 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-02.pq)
Processing file_5 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-03.pq)
Processing file_6 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-04.pq)
Processing file_7 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-05.pq)
Processing file_8 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-06.pq)
Processing file_9 (/user/bdm/persistent/neo/MOD_LSTD_M/2022-05-11_13-21-07/2000-07.pq)
Processing file_10 (/user/bdm/pers

Unnamed: 0,id,file,relatedattr,label,comment
0,attribute_0,file_0,,gbifid,
1,attribute_1,file_0,,datasetkey,
2,attribute_2,file_0,,occurrenceid,
3,attribute_3,file_0,,kingdom,
4,attribute_4,file_0,,phylum,
...,...,...,...,...,...
5101,attribute_5101,file_3287,,proj_raster,
5102,attribute_5102,file_3289,,proj_raster_path,
5103,attribute_5103,file_3289,,proj_raster,
5104,attribute_5104,file_3291,,proj_raster_path,


In [80]:
# List PostgreSQL columns (attribute.csv: id,table,datatype,originalAttr,foreignKey,label,comment)

psql_url = "jdbc:postgresql://localhost:5432/species_rasters"
psql_driver = "org.postgresql.Driver"
psql_user = "postgres"
psql_password = "postgres"

tables = ["background", "samples"]
orig_attrs = {
    "species": "file_3", # Null for background
    "longitude": "file_3", # Null for background
    "latitude": "file_3", # Null for background
    "tmin": "file_3035",
    "tmax": "file_3011",
    "tavg": "file_2987",
    "prec": "file_2939",
    "srad": "file_2963",
    "wind": "file_3083",
    "vapr": "file_3059",
    "bio": "file_2913",
    "elev": "file_2915",
    "SRTM_RAMP2_TOPO": "file_268",
    "MOD_LSTD_M": "file_268",
}

col_comments = {
    "species": "Name of the species of interest or 'background'",
    "longitude": "Latitude (WGS 84)",
    "latitude": "Longitude (WGS 84)",
    "tmin": "Minimum temperature (°C) from WorldClim",
    "tmax": "Maximum temperature (°C) from WorldClim",
    "tavg": "Average temperature (°C) from WorldClim",
    "prec": "Precipitation (mm) from WorldClim",
    "srad": "Solar radiation (kJ m^{-2} day^{-1}) from WorldClim",
    "wind": "Wind speed (m s^{-1}) from WorldClim",
    "vapr": "Water vapor pressure (kPa) from WorldClim",
    "bio": "Mean Temperature of Driest Quarter from WorldClim",
    "elev": "Elevation from WorldClim",
    "SRTM_RAMP2_TOPO": "Topography from NEO",
    "MOD_LSTD_M": "Montly Land Surface Temperature [day] from NEO",
}


columns = []
colIdGen = ("column_" + str(c) for c in count())

for table in tables:
    print("Reading from PostgreSQL: " + table)
    df = (spark.read.format("jdbc")
        .option("url", psql_url)
        .option("driver", psql_driver)
        .option("user", psql_user)
        .option("password", psql_password)
        .option("dbtable", table)
        .load()
    )
    for col, dtype in df.dtypes:
        originalAttr = orig_attrs[col]
        if table == 'background' and col in ["species", "longitude", "latitude"]:
            originalAttr = None
        columns.append({
            "id": colIdGen.__next__(),
            "table": table,
            "datatype": dtype,
            "originalAttr": originalAttr,
            "foreignKey": None,
            "label": col,
            "comment": col_comments[col],
        })
        
columns_df = pd.DataFrame(data=columns)
columns_df.to_csv("data/column.csv", index=False, header=True)
columns_df

Reading from PostgreSQL: background
Reading from PostgreSQL: samples


Unnamed: 0,id,table,datatype,originalAttr,foreignKey,label,comment
0,column_0,background,string,,,species,Name of the species of interest or 'background'
1,column_1,background,string,,,longitude,Latitude (WGS 84)
2,column_2,background,string,,,latitude,Longitude (WGS 84)
3,column_3,background,string,file_3035,,tmin,Minimum temperature (°C) from WorldClim
4,column_4,background,string,file_3011,,tmax,Maximum temperature (°C) from WorldClim
5,column_5,background,string,file_2987,,tavg,Average temperature (°C) from WorldClim
6,column_6,background,string,file_2939,,prec,Precipitation (mm) from WorldClim
7,column_7,background,string,file_2963,,srad,Solar radiation (kJ m^{-2} day^{-1}) from Worl...
8,column_8,background,string,file_3083,,wind,Wind speed (m s^{-1}) from WorldClim
9,column_9,background,string,file_3059,,vapr,Water vapor pressure (kPa) from WorldClim
