In [0]:
%r
if(!require(maps)){install.packages("maps")}
library(maps)
if(!require(countrycode)){install.packages("countrycode")}
library(countrycode)

if(!require(data.table)){install.packages("data.table")}
#library(countrycode)

library(dplyr)
library(sparklyr) # load sparklyr last because there is namespace conflicts between dplyr and sparklyr by design

In [0]:
%r
#inat_plant_post2015 <- sdf_sql(sc,
#"SELECT kingdom,eventdate,recordedBy,occurrenceid,countrycode,scientificname FROM gbif.inat
#WHERE kingdom = 'Plantae' AND SUBSTRING(eventdate,1,4)>2015"
#       ) 

#active_collectors <- inat_plant_post2015 %>%
#  filter(countrycode == "BE") %>%
#  dplyr::count(recordedBy) %>%
#  filter(n>99) %>%
#  pull(recordedBy)
  
#get_active_collectors_for_country <- function(countrycode,cutoff=100){
#  active_collectors <- inat_plant_post2015 %>%
#  filter(countrycode == countrycode) %>%
#  dplyr::count(recordedBy) %>%
#  filter(n>(cutoff - 1)) %>%
#  pull(recordedBy)  
#}

In [0]:
%sql

-- Plant records whose eventdate starts with a year after 2015
SELECT kingdom, eventdate, recordedBy, occurrenceid
FROM gbif.inat
WHERE kingdom = 'Plantae'
  AND SUBSTRING(eventdate, 1, 4) > 2015


In [0]:
%r
# Open the sparklyr connection to the Spark cluster behind this Databricks notebook
sc <- spark_connect(method = "databricks")

In [0]:
%r
#sel_cc <- "BE"

#active_collectors <- get_active_collectors_for_country(sel_cc, cutoff=100)

#out_obs <- inat_plant_post2015 %>%
#  filter(countrycode == sel_cc) %>%
#  filter(recordedBy %in% active_collectors)

#SparkR::as.data.frame(out_obs) %>% display

In [0]:
%r
#length(active_collectors)

Fetching the iNaturalist open data dump from a Meise server (instead of Amazon S3, to avoid setting up S3 access)

In [0]:
%r

#url_to_get <- "https://oxalis.br.fgov.be/images/observations.csv.gz"
#options(timeout = max(1000, getOption("timeout"))) #set timeout to something long
#download.file(url = url_to_get, destfile= "/dbfs/FileStore/observations.csv.gz")


In [0]:
%r
# check if we got exactly the right file (the download didn't fail or the file didn't change)
#tools::md5sum("/dbfs/FileStore/observations.csv.gz") == "bbfc3c2536acd7010d06c50970b03237"

d

## We are looking for:

- only European countries
- at least 20 observations over the lifetime of the user, in the country we are exporting
- only observations with quality grade "needs ID" or "research grade" (casual observations are excluded)
- any taxa

Then we split these out by country and label each output file with the country's two-character ISO code.

**Let's target Spain first**

In [0]:
%r
#inat_open_data <- spark_read_csv(sc, path = "dbfs:/FileStore/observations.csv.gz", name = "inat_open_data", delimiter="\t")

Store it as a Parquet table for reference (so we don't need to repeat this import if we rerun the notebook in the future).

In [0]:
%sql 
-- Persist whatever is registered as inat_open_data as a Parquet-backed table,
-- so later runs can read pieter.inat_data instead of re-importing the CSV
CREATE TABLE pieter.inat_data
  STORED AS PARQUET
  AS SELECT * FROM inat_open_data

In [0]:
%r
inat_open_data <- sdf_sql(sc, "SELECT * FROM pieter.inat_data")  # Spark-side reference to the persisted table

In [0]:
%r
#sdf_sql(sc,"SELECT * FROM pieter.inat_data") %>% sdf_nrow()

In [0]:
%r
#glimpse(inat_open_data)

### Filter out casual observations

In [0]:
%r
# Keep every observation whose quality grade is not "casual"
inat_open_data <- filter(inat_open_data, quality_grade != "casual")
  


### Filter out inactive users on a global scale

In [0]:
%r
# Observation count per user over the whole persisted table (column N), used
# to drop users with fewer than 20 observations
user_tally <- sdf_sql(sc, "SELECT observer_id FROM pieter.inat_data") %>%
  group_by(observer_id) %>%
  summarise(N = n())

In [0]:
%r
#sdf_nrow(user_tally)

In [0]:
%r
active_users <- user_tally %>% filter(N >= 20) %>% pull(observer_id)  # ids of users with 20+ observations

In [0]:
%r
#length(active_users)

In [0]:
%r
inat_open_data <- semi_join(inat_open_data, filter(user_tally, N >= 20), by = "observer_id")  # keep users with 20+ observations; a Spark-side semi join avoids inlining every active id into one huge SQL IN (...) list

In [0]:
%r
#sdf_nrow(inat_open_data)

### Add country information: using UK as a manual example

In [0]:
%r
#inat_uk <- 
#  inat_open_data %>%
#  filter(longitude > -7) %>%
#  filter(longitude < 2) %>%
#  filter(latitude > 49) %>%
#  filter(latitude < 59)

In [0]:
%r
#longs <- pull(inat_uk,longitude)
#lats <- pull(inat_uk,latitude)


In [0]:
%r
#countries <- maps::map.where(database = "world", x = longs, y = lats)

In [0]:
%r
#sdf_copy_to(sc,tibble(country=countries),name = "countries", overwrite = TRUE)

In [0]:
%r
# merge the country information to the observers, and filter so it's only from the chosen country
#inat_uk <- sdf_bind_cols(inat_uk,sdf_sql(sc,"SELECT * FROM countries")) %>%
#  filter(country == "UK:Great Britain")

Drop observations from observers with fewer than 20 observations in the UK

In [0]:
%r
#active_observers <- filter(count(inat_uk,observer_id),n>19) %>% pull(observer_id)
#inat_uk <- filter(inat_uk, observer_id %in% active_observers)

In [0]:
%r
#sdf_nrow(inat_uk)

In [0]:
%r
#select(inat_uk, - country) %>% sdf_collect() %>% data.table::fwrite("/dbfs/FileStore/UK_inat.csv")

In [0]:
%r
#select(inat_uk, - country) %>% sdf_collect() %>% saveRDS("/dbfs/FileStore/UK_inat.rds")

## Now for any country

In [0]:
%r
bounding_boxes <- sdf_collect(sdf_sql(sc, "SELECT * FROM pieter.bounding_boxes"))  # per-country boxes, collected into R

In [0]:
%r
print(bounding_boxes)  # show the collected bounding-box table

In [0]:
%r


# Return the observations in `inat_open_data` that fall inside one country.
#
# selected_country: two-letter ISO code; it must match exactly one row of
#   bounding_boxes$iso2c and is compared with the ISO code of the maps
#   "world" polygon each observation falls in.
# cutoff: minimum number of records an observer needs inside this country;
#   records of observers below it are dropped.
#
# Returns a Spark dataframe with the columns of `inat_open_data` plus
# `country`, the maps::map.where() region name (e.g. "UK:Great Britain").
# Reads the globals `sc`, `inat_open_data` and `bounding_boxes`, and
# (over)writes the Spark table "countries" as a side effect.
fetch_countrywise <- function(selected_country, cutoff = 20){
  country_name <- countrycode::countrycode(selected_country,
                                           origin = "iso2c",
                                           destination = "country.name")

  selected_bb <- filter(bounding_boxes, iso2c == selected_country)
  # `!!` below inlines single values into the Spark SQL; zero or several
  # matching rows would give an empty or ambiguous bounding-box filter.
  if (nrow(selected_bb) != 1) {
    stop(sprintf("expected exactly one bounding box for '%s', found %d",
                 selected_country, nrow(selected_bb)),
         call. = FALSE)
  }

  # Cheap rectangular pre-filter in Spark before the polygon lookup in R
  inat_subset <- inat_open_data %>%
    filter(longitude > !!selected_bb$long_min) %>%
    filter(longitude < !!selected_bb$long_max) %>%
    filter(latitude > !!selected_bb$lat_min) %>%
    filter(latitude < !!selected_bb$lat_max)

  # Fetch both coordinates in ONE query: two separate pull() calls are two
  # Spark jobs with no guarantee that they return rows in the same order.
  coords <- inat_subset %>%
    select(longitude, latitude) %>%
    sdf_collect()

  if (nrow(coords) == 0) {
    message(sprintf("passed %s: %s, no records inside the bounding box",
                    selected_country, country_name))
    return(mutate(inat_subset, country = NA_character_))
  }

  # Look up the maps "world" polygon each point falls in (NA when none)
  regions <- maps::map.where(database = "world",
                             x = coords$longitude,
                             y = coords$latitude)

  # map.where() labels some polygons "region:subregion" (e.g. "UK:Great
  # Britain"), and its region names do not always equal countrycode's
  # "country.name" spelling. Comparing ISO codes of the part before ":"
  # avoids silently dropping those records.
  region_iso2c <- countrycode::countrycode(sub(":.*$", "", regions),
                                           origin = "country.name",
                                           destination = "iso2c",
                                           warn = FALSE)

  # Push the lookup back to Spark
  sdf_copy_to(sc,
              tibble(country = regions, country_iso2c = region_iso2c),
              name = "countries",
              overwrite = TRUE)

  # NOTE(review): sdf_bind_cols pairs rows by position, so this assumes Spark
  # yields inat_subset in the same row order as when `coords` was collected.
  inat_subset <- sdf_bind_cols(inat_subset, sdf_sql(sc, "SELECT * FROM countries")) %>%
    filter(country_iso2c == !!selected_country) %>%
    select(-country_iso2c)

  # Drop records of observers with fewer than `cutoff` records in this country
  active_observers <- inat_subset %>%
    count(observer_id) %>%
    filter(n >= !!cutoff) %>%
    pull(observer_id)
  inat_subset <- filter(inat_subset, observer_id %in% active_observers)

  # The row count is an extra (expensive) Spark job, used only for this message.
  # %.0f rather than %i: sdf_nrow() can return a double too large for %i.
  message(sprintf("passed %s: %s, subsetting resulted in %.0f records",
                  selected_country,
                  country_name,
                  as.numeric(sdf_nrow(inat_subset))))

  inat_subset
}

In [0]:
%r
# test with afghanistan
#fetch_countrywise("AF") %>% sdf_nrow()

In [0]:
%r
# ISO2 codes from bounding_boxes whose countrycode continent is "Europe"
# (codes without a continent are excluded)
euro_iso2c <- bounding_boxes$iso2c[
  countrycode::countrycode(bounding_boxes$iso2c,
                           origin = "iso2c",
                           destination = "continent") %in% "Europe"
]

In [0]:
%r


#let's create a list of spark dataframes with the results in per country

#purrr::map(euro_iso2c[1:3],fetch_countrywise) %>%
#  purrr::map2(euro_iso2c[1:3],~saveRDS(sdf_collect(.x),file=sprintf("/dbfs/FileStore/%s_inat.rds",.y)))

In a loop, so that each country's result is saved as soon as it is computed (intermediate saving)

In [0]:
%r
# Export each European country to /dbfs/FileStore/<ISO2>_inat.rds. A file is
# saved as soon as its country is done, and countries whose file already
# exists are skipped, so an interrupted run can simply be restarted. Looping
# over all codes (rather than a hard-coded start index) is safe for that reason.
for (selected_cc in euro_iso2c) {
  out_file <- sprintf("/dbfs/FileStore/%s_inat.rds", selected_cc)

  # we don't need to create a file we already have
  if (file.exists(out_file)) {
    message(sprintf("We already have %s_inat.rds for %s",
                    selected_cc,
                    countrycode::countrycode(selected_cc,
                                             origin = "iso2c",
                                             destination = "country.name")))
    next
  }

  # An error for one country is reported and the loop moves on, instead of
  # aborting the export of all remaining countries.
  out_sp <- tryCatch(
    sdf_collect(fetch_countrywise(selected_cc)),
    error = function(e) {
      message(sprintf("fetching %s failed, not saved: %s",
                      selected_cc, conditionMessage(e)))
      NULL
    }
  )
  if (is.null(out_sp)) {
    next
  }

  # Save the file, except if it's empty: then there is probably a problem with
  # the translation between maps::map.where and countrycode::countrycode.
  if (nrow(out_sp) > 0) {
    saveRDS(out_sp, file = out_file)
    message("saved, NEXT!!")
  } else {
    message("problem with filter, not saved")
  }
}