## R script for OMOP to load EPA Air Quality data

### setup

In [1]:
install.packages("config")
install.packages("DBI")
library(config)
library(DBI)

# make db connection
# for format of database file see db/env/db_conf.txt
db <- read.delim( '../../db/env/feta.txt', header=TRUE, sep=' ' )
con <- dbConnect(RPostgres::Postgres(),
                 dbname = toString(db$database[1]),
                 host = toString(db$host[1]),
                 port = 5432,
                 user = toString(db$user),
                 password = toString(db$pass))

# check the connection
dbListTables(con)

### get EPA data from API  
note that we use annual averages

In [1]:
#install.packages("httr")
library(httr)
base_api_url <- "https://aqs.epa.gov/data/api/annualData/"
api_query_type <- "byState"
api_email <- "test@aqs.api"
api_key <- "test"
api_param <- "88101,88502"  # PM2.5 88101,88502; NO2 42602
                            # https://aqs.epa.gov/aqsweb/documents/codetables/methods_all.html
api_bdate <- "20180115"     # begin date
api_edate <- "20180115"     # end date: set to same as bdate for whole year
api_state <- "12"           # Florida

request_url <- paste(
    base_api_url,api_query_type,"?", 
    "email=",api_email, 
    "&key=",api_key,
    "&param=",api_param,
    "&bdate=",api_bdate,
    "&edate=",api_bdate,
    "&state=",api_state,
    sep=""
)
res <- GET(request_url)
# Todo: error checking on response
data <- content(res)$Data

### get EPA data from file

In [None]:
base_download_url <- "https://aqs.epa.gov/aqsweb/airdata/"
file_download <- "annual_conc_by_monitor_2020.zip"

# all us sites:
file_download <- "aqs_sites.zip"

# all us monitors
file_download <- "aqs_monitors.zip"

In [2]:
# all sites (i.e. monitor locations)
base_download_url <- "https://aqs.epa.gov/aqsweb/airdata/"
file_download <- "aqs_sites.zip"
file_name <- "aqs_sites.csv"

temp <- tempfile()
download.file(paste(base_download_url,file_download,sep=""),temp)
data <- read.csv(unz(temp, file_name), header=TRUE)
unlink(temp)

In [2]:
data

### create empty tables in database

- TODO: define table naming conventions
- TODO: check for exisiting tables
- TODO: update geo_index and attr_index tables with metadata
- TODO: index geospatial columns for faster query
- NOTE: using empty template tables, not specification

In [11]:
# create empty tables for geometry and attributes

geo_tablename <- 'geo_fl_epa_aqs_sites'
sql_createidseq <- paste("CREATE SEQUENCE ",geo_tablename,"_id_seq ", # geo table id sequence
                         "INCREMENT 1 START 1 ",
                         "MINVALUE 1 MAXVALUE 9223372036854775807 ",
                         "CACHE 1;",
                         sep="")
req <- dbSendQuery(con,sql_createidseq)
dbClearResult(req)

sql_createpointgeom <- paste("CREATE TABLE",geo_tablename, # geo table as clone
                             "as SELECT * FROM geo_point_template",
                             "WITH NO DATA;")
req <- dbSendQuery(con,sql_createpointgeom)
dbClearResult(req)

sql_alterpointgeom <- paste("ALTER TABLE ",geo_tablename, # geo table add keys and relations
                            " ADD CONSTRAINT ",geo_tablename,"_id_pkey ",
                            "PRIMARY KEY (geo_record_id), ",
                            "ALTER COLUMN geo_record_id ",
                            "SET DEFAULT nextval('",geo_tablename,"_id_seq","'::regclass);",
                            sep="")
req <- dbSendQuery(con,sql_alterpointgeom)
dbClearResult(req)

att_tablename <- 'attr_fl_epa_pm25_2018'
sql_createidseq <- paste("CREATE SEQUENCE ",att_tablename,"_id_seq ", # attr table id seq
                         "INCREMENT 1 START 1 ",
                         "MINVALUE 1 MAXVALUE 9223372036854775807 ",
                         "CACHE 1;",
                         sep="")
req <- dbSendQuery(con,sql_createidseq)
dbClearResult(req)

sql_createattr <- paste("CREATE TABLE",att_tablename, # attr table as clone
                        "AS SELECT * FROM attr_template",
                        "WITH NO DATA;")
req <- dbSendQuery(con,sql_createattr)
dbClearResult(req)

sql_alterattr <- paste("ALTER TABLE ",att_tablename, #attr table add keys and relations
                       " ADD CONSTRAINT ",att_tablename,"_id_pkey ",
                       "PRIMARY KEY (attr_record_id), ",
                       "ALTER COLUMN attr_record_id ",
                       "SET DEFAULT nextval('",att_tablename,"_id_seq","'::regclass), ",
                       "ADD CONSTRAINT ",att_tablename,"_geo_record_id_fkey ",
                       "FOREIGN KEY (geo_record_id) ",
                       "REFERENCES ",geo_tablename," (geo_record_id) MATCH SIMPLE ",
                       "ON UPDATE NO ACTION ON DELETE NO ACTION;",
                       sep="")
req <- dbSendQuery(con,sql_alterattr)
dbClearResult(req)

### load geospatial site data

In [12]:
sites <- list()
for (site in data){
    
    if (! site$site_number %in% sites) {

        # create location with geometry
        sites <- c(site$site_number,sites)
        sql_createsite <- paste(
            'INSERT INTO',geo_tablename,' ("geo_record_id","name","geom_wgs84") 
            VALUES (DEFAULT,$3,ST_SetSRID(ST_MakePoint($2,$1),4326))
            RETURNING geo_record_id'
        )
        res <- dbSendQuery(con,sql_createsite)
        dbBind(res, list(site$lat,site$lon,paste(site$site_number,":",site$local_site_name)))
        site_ID <- dbFetch(res)[1,1]
        dbClearResult(res)
    
    }

}

In [16]:
dbReadTable(con, geo_tablename) 

geo_record_id,name,source_id_coding,source_id_value,geom_wgs84,geom_local
<int>,<chr>,<chr>,<chr>,<pq_gmtry>,<pq_gmtry>
1,3002 : SYDNEY,,,0101000020E6100000E63FA4DFBE8E54C0E561A1D634F73B40,
2,0013 : Bee Ridge Park,,,0101000020E6100000F373435376A054C015A8C5E0614A3B40,
3,2003 : Pompano Highlands,,,0101000020E6100000C6F7C5A52A0654C00F4240BE844A3A40,
4,0033 : Palm Springs Fire Station,,,0101000020E61000008C31B08EE31454C0D6A9F23D23F13940,
5,0004 : Ellyson Industrial Park,,,0101000020E6100000744694F606CD55C0CB65A3737E863E40,
6,5005 : Coconut Creek,,,0101000020E6100000F29716F5490B54C0FC8F4C874E4B3A40,
7,0012 : Tallahassee Community College,,,0101000020E61000006DACC43C2B1655C0EB54F99E91703E40,
8,1002 : Sanford (Seminole Community College),,,0101000020E6100000E7E44526E05354C0942F682101BF3C40,
9,0018 : Azalea Park,,,0101000020E61000003108AC1C5AAF54C0D009A1832EC93B40,
10,0007 : Melbourne,,,0101000020E6100000FAB9A1293B2854C0B3B45373B90D3C40,


In [8]:
n <- 0
for (site in data) {
    #print (paste(site$site_number,site$local_site_name))
    if (site$site_number=='5002') {
        print (
            paste(
                #site$site_number,
                #site$parameter_code,
                site$arithmetic_mean,'|',
                #site$date_of_last_change,
                #site$parameter,
                #site$units_of_measure,
                site$observation_count,'|',
                #site$observation_percent,
                site$valid_day_count,'|',
                site$method,'|',
                site$pollutant_standard,'|',
                site$first_max_datetime,'|',
                site$first_max_value
            )
        )
        print ('____________________________________')
        n <- n + 1
    }
}
print(n)

[1] "7.233989 | 356 | 356 | Multiple Methods Used | PM25 Annual 2012 | 2018-01-20 00:00 | 23.6"
[1] "____________________________________"
[1] "7.233989 | 356 | 356 | Multiple Methods Used | PM25 24-hour 2012 | 2018-01-20 00:00 | 23.6"
[1] "____________________________________"
[1] "7.233989 | 356 | 356 | Multiple Methods Used | PM25 Annual 2006 | 2018-01-20 00:00 | 23.6"
[1] "____________________________________"
[1] "7.233989 | 356 | 356 | Multiple Methods Used | PM25 24-hour 2006 | 2018-01-20 00:00 | 23.6"
[1] "____________________________________"
[1] "7.270524 | 8573 | 364 | Multiple Methods Used |  | 2018-01-01 00:00 | 59.7"
[1] "____________________________________"
[1] "6.581034 | 58 | 58 | R & P Model 2025 PM-2.5 Sequential Air Sampler w/VSCC - Gravimetric | PM25 Annual 2012 | 2018-01-20 00:00 | 20.2"
[1] "____________________________________"
[1] "6.581034 | 58 | 58 | R & P Model 2025 PM-2.5 Sequential Air Sampler w/VSCC - Gravimetric | PM25 24-hour 2012 | 2018-01-20 00:00 | 

In [25]:
cols = names(site)

In [61]:
for(key in names(site)){
  value<-site[key]
  print(paste(key,'=',value))
}

[1] "state_code = 12"
[1] "county_code = 017"
[1] "site_number = 0006"
[1] "parameter_code = 88502"
[1] "poc = 1"
[1] "latitude = 28.958644"
[1] "longitude = -82.642965"
[1] "datum = WGS84"
[1] "parameter = Acceptable PM2.5 AQI & Speciation Mass"
[1] "sample_duration = 1 HOUR"
[1] "pollutant_standard = NULL"
[1] "metric_used = Observed Values"
[1] "method = PM2.5 SCC w/Correction Factor - TEOM Gravimetric 50 deg C"
[1] "year = 2018"
[1] "units_of_measure = Micrograms/cubic meter (LC)"
[1] "event_type = No Events"
[1] "observation_count = 8270"
[1] "observation_percent = 94"
[1] "validity_indicator = Y"
[1] "valid_day_count = 344"
[1] "required_day_count = 365"
[1] "exceptional_data_count = 0"
[1] "null_observation_count = 490"
[1] "primary_exceedance_count = NULL"
[1] "secondary_exceedance_count = NULL"
[1] "certification_indicator = Certification not required"
[1] "arithmetic_mean = 8.394353"
[1] "standard_deviation = 4.325292"
[1] "first_max_value = 63"
[1] "first_max_datetime = 2018

In [94]:
which(data[[]]$site_number=='0035')

In [97]:
lapply(
    data, function(x) 
        filter (x, site_number=='0035')
)

ERROR: Error in storage.mode(x) <- "double": 'list' object cannot be coerced to type 'double'


In [113]:
filter(data, site_number=="0035")

ERROR: Error in storage.mode(x) <- "double": 'list' object cannot be coerced to type 'double'


In [104]:
site