# load_parker

A notebook for loading observational data preprocessed using [LiveOcean's "obs"](https://github.com/parkermac/LO/tree/main/obs) format.

This notebook does not handle the mooring data because I already have a working processing system for the ORCA buoys.

Parameters for tuning:

 * *parker_obs_path*: put outputs from obs here.
 * *data_included*: a collection of subfolder names under *parker_obs_path* to read and load into the database. As of 2024, including all years of the data below results in over 85 million observation rows.
 * *info_files*: a glob matching pattern for the info pickle files, used for populating the stations table.
 * *data_files*: a glob matching pattern for the individual data files, typically named with a 4-digit year.
 * *loc_tolerance*: The obs data does not group locations together beyond the cast level, so this notebook performs location deduplication by grouping stations within this distance (m) of an existing station. The larger this value is, the less spatial precision there will be in where the observation was taken; the smaller it is, the fewer unique stations there will be in the database. `dfo1` is the worst offender at creating many stations. Using a 100 m tolerance with the default *data_included* results in over 47 thousand individual stations.

On my setup with a separate database server on a gigabit ethernet connection, loading all of the default data took over an hour to finish.

In [204]:
parker_obs_path = 'data/parker'
data_included = ('collias','dfo1','ecology_nc','nceiCoastal','nceiPNW','nceiSalish','LineP','WOD','ocnms_ctd')

info_files = '**/info_*.p'
data_files = '**/????.p'

loc_tolerance = 100

import glob
import uuid
import random
from pathlib import Path
from multiprocessing import Pool
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from sqlalchemy.exc import IntegrityError

import db

Cast UUID's are generated by this notebook in a way that guarantees they will be repeatable between invocations, but always unique to a particular cid of a particular source and year.

In [168]:
rd = random.Random()
rd.seed(5) # Should be unique to this ETL notebook
baseuuid = uuid.UUID(int=rd.getrandbits(128))

An example of what the obs format looks like when unpickled.

In [2]:
root = Path(parker_obs_path)
for src in data_included:
    src_root = root / src
    for i in src_root.glob(data_files):
        print(i)
        data_df = pd.read_pickle(i)
        display(data_df.head())
        break
    break

data/parker/collias/bottle/1961.p


Unnamed: 0,cid,cruise,time,lat,lon,name,z,CT,SA,DO (uM),NO2 (uM),SiO4 (uM)
0,0,,1961-04-18 18:30:00,47.934814,-122.634599,HCE501,-96,8.851958,29.602401,262.762941,,
1,0,,1961-04-18 18:30:00,47.934814,-122.634599,HCE501,-72,8.864435,29.601889,263.388567,,
2,0,,1961-04-18 18:30:00,47.934814,-122.634599,HCE501,-48,8.961322,29.360403,269.332015,,
3,0,,1961-04-18 18:30:00,47.934814,-122.634599,HCE501,-29,9.046389,29.19968,274.649836,,
4,0,,1961-04-18 18:30:00,47.934814,-122.634599,HCE501,-19,9.12035,29.049315,280.906097,,


## Compile and Aggregate Stations

First, deduplicate station names by assembling a look-up DF mapping name+year to assigned DB name. Start by reading all the info files into a giant GDF.

In [203]:
info_dfs = []
for src in data_included:
    src_root = root / src
    # Some locations aren't named in some sources, so keep track of the source name so we can generate some names
    newdfs = [pd.read_pickle(i) for i in src_root.glob(info_files)]
    for df in newdfs:
        df['src'] = src
    info_dfs.extend(newdfs)

info_df = pd.concat(info_dfs, ignore_index=True)
# FIXME no documentation on what the lat/lon datum is or if it's been standardized. I assume it's WGS84
info_gdf = gpd.GeoDataFrame({'cid': info_df.index, 'time': info_df['time'], 'name': info_df['name'],
                             'cruise': info_df['cruise'], 'src': info_df['src']},
                            geometry=[Point(xy) for xy in zip(info_df['lon'], info_df['lat'])],
                            crs='epsg:4326').to_crs(epsg=32610)
info_gdf['time'] = pd.to_datetime(info_gdf['time'])
display(info_gdf.head())

Unnamed: 0,cid,time,name,cruise,src,geometry
0,0,1952-02-22 10:38:00,HCE501,,collias,POINT (527291.582 5309119.74)
1,1,1952-06-04 22:25:00,HCE501,,collias,POINT (527291.582 5309119.74)
2,2,1952-06-05 19:02:00,HCE501,,collias,POINT (527291.582 5309119.74)
3,3,1952-07-31 16:11:00,HCE501,,collias,POINT (527291.582 5309119.74)
4,4,1952-10-13 19:05:00,HCE501,,collias,POINT (527291.582 5309119.74)


Now do a spatial group for everything within a small radius (*loc_tolerance* above). Each entry within the group must be given a unique name, so if 'name' alone is not unique, append an incrementing number to it. Keep track of final names in the lookup DF, and assemble the locations table in the DB.

In [132]:
info_simple = info_gdf.copy()
info_simple['year'] = info_simple['time'].dt.year
cruise_idx = ~info_simple['cruise'].isna()
info_simple['crname'] = info_simple['name'].str.strip()
info_simple.loc[cruise_idx, 'crname'] = info_simple.loc[cruise_idx].apply(
    lambda r: str(r['crname']) + '_' + str(r['cruise']).strip(), axis=1)

def gen_name(base, existing):
    i = 1
    while f"{base}_{i}" in existing:
        i += 1
    return f"{base}_{i}"

master = None # The master list of unique stations as a GeoSeries
# The raw data that will be used for an internal lookup table later
lookup = {'name': [], 'cruise': [], 'year': [], 'geometry': [], 'dbname': []}
for i,row in info_simple.drop_duplicates(subset=('name','year','geometry')).iterrows():
    new = False
    if master is None:
        newname = row['crname']
        master = gpd.GeoSeries([row['geometry']], index=[newname], crs=info_simple.crs)
        master.index.name = 'name'
    else:
        nearby = master.dwithin(row['geometry'], loc_tolerance)
        if np.any(nearby):
            # Already have a nearby location, so use it
            newname = master.loc[nearby].sort_values().index[0]
            #print(f"{row['crname']}: Found existing nearby station {newname}")
        else:
            if row['crname'] is None:
                # Nothing nearby, and no name given so generate one
                newname = gen_name(row['src'], master.index)
            elif row['crname'] in master.index:
                # Nothing nearby, but the name given is already taken
                newname = gen_name(row['crname'], master.index)
            else:
                # We can safely use the name as-is
                newname = row['crname']
            master.loc[newname] = row['geometry']

    lookup['name'].append(row['name'])
    lookup['cruise'].append(row['cruise'])
    lookup['year'].append(row['year'])
    lookup['geometry'].append(row['geometry'])
    lookup['dbname'].append(newname)

master_gdf = gpd.GeoDataFrame(geometry=master, crs=info_simple.crs)
lookup_df = pd.DataFrame(lookup)
#lookup_df.set_index(['name','cruise','year','geometry'], inplace=True)
master_gdf.head()

Unnamed: 0_level_0,geometry
name,Unnamed: 1_level_1
HCE501,POINT (527291.582 5309119.74)
ADM201,POINT (516506.266 5335940.591)
ADM204,POINT (527862.368 5319496.902)
ADM202,POINT (523215.387 5331517.772)
PSB301,POINT (539020.178 5305297.273)


Our lookup DF allows relating the combination of name, cruise, year, and coordinates to the newly generated station name.

In [133]:
lookup_df.head()

Unnamed: 0,name,cruise,year,geometry,dbname
0,HCE501,,1952,POINT (527291.58150689 5309119.739996711),HCE501
1,ADM201,,1952,POINT (516506.2662512224 5335940.591206845),ADM201
2,ADM204,,1952,POINT (527862.3682839083 5319496.902014364),ADM204
3,ADM202,,1952,POINT (523215.387201385 5331517.77196983),ADM202
4,PSB301,,1952,POINT (539020.1779876386 5305297.272531977),PSB301


## Load Sources, Stations

Define a unique source for each entry in *data_included* but indicate that it comes from Parker's obs outputs be setting the agency field.

In [19]:
engine = db.connect()

In [93]:
source_ids = {}
df = pd.read_sql_table("sources", con=engine, schema='obsdata', index_col='id')
for src in data_included:
    agency = 'Parker'
    study = f'{src} dataset'
    source_row = df.loc[(df['agency'] == agency) & (df['study'] == study)]
    if len(source_row) == 0:
        df = pd.DataFrame({
            "agency": [agency],
            "study": [study]
        })
        df.to_sql('sources', con=engine, schema='obsdata', index=False, if_exists='append')

        # Refresh the sources so we can fetch the primary key
        df = pd.read_sql_table("sources", con=engine, schema='obsdata', index_col='id')
        source_row = df.loc[(df['agency'] == agency) & (df['study'] == study)]
    
    source_ids[src] = source_row.index[0]
source_ids

{'collias': 4,
 'dfo1': 5,
 'ecology_nc': 6,
 'nceiCoastal': 7,
 'nceiPNW': 8,
 'nceiSalish': 9,
 'LineP': 10,
 'WOD': 11,
 'ocnms_ctd': 12}

In [134]:
master_gdf.rename_geometry('geom').to_postgis('stations', con=engine,
                                              schema='obsdata', index=True,
                                              index_label='name', if_exists='append')

## Load Observations

This is the meat of the notebook. Define the function that processes all the observations from a single file. It takes optional `con` and `start_col` parameters so it can be called manually to diagnose/retry any that fail.

In [200]:
col_param_map = {
    'CT': 'temp',
    'SA': 'salt',
    'DO (uM)': 'o2',
    'NO3 (uM)': 'no3',
    'NO2 (uM)': 'no2',
    'NH4 (uM)': 'nh4',
    'PO4 (uM)': 'po4',
    'SiO4 (uM)': 'sioh4',
    'Chl (mg m-3)': 'chla',
    'DIC (uM)': None,
    'TA (uM)': None
}

col_factors = {
    'DO (uM)': 32/1000
}

def process_data(pth, src, con=None, start_col=None):
    if con is None:
        con = thread_con
    data = pd.read_pickle(pth)
    # Sometimes there are true duplicate entries, so merge them
    data = data.groupby(['cid','lon','lat','time','z','name','cruise'], dropna=False).agg('mean').reset_index()
    data[['name','cruise']] = data[['name','cruise']].replace(np.nan, None)
    data.dropna(subset=('z','time','lon','lat'), inplace=True)

    data_gdf = gpd.GeoDataFrame(data, geometry=[Point(xy) for xy in zip(data['lon'],data['lat'])],
                                crs='epsg:4326').to_crs(epsg=32610)
    data_gdf['time'] = pd.to_datetime(data['time'])
    data_gdf['year'] = data_gdf['time'].dt.year
    # Some files don't use an object dtype for cruise and this breaks the below merge, so force it
    data_gdf['cruise'] = data_gdf['cruise'].astype(object)
    # FIXME not using indices for lookup_df probably slows this down, but converting to
    # a MultiIndex messes up the dtype on cruise and makes the merge fail
    m = data_gdf.merge(lookup_df, left_on=('name','cruise','year','geometry'), #right_index=True)
                       right_on=('name','cruise','year','geometry'))
    dest = pd.DataFrame({
        # Assuming dataset has already been corrected for daylight savings and is in PST
        'datetime': pd.DatetimeIndex(m['time']).tz_localize(-8*3600),
        'depth': -m['z'],
        'location_id': m['dbname']
    })
    dest['source_id'] = source_ids[src]
    dest['cast_id'] = m.groupby('cid')['cid'].transform(lambda g: uuid.uuid3(baseuuid, f'{pth.relative_to(root).parent}_{pth.stem}_{g}'))
    # create copies with parameter_id/value pairs
    found_start = start_col is None
    for c in set(data.columns) - {'cid','lon','lat','name','time','cruise','z'}:
        found_start |= c == start_col
        if not found_start:
            continue
        pid = col_param_map[c]
        if pid is None:
            continue
        dest['parameter_id'] = pid
        dest['value'] = data[c] * col_factors[c] if c in col_factors else data[c]
        try:
            dest.dropna(subset=('value',)).to_sql('observations', con=con, schema='obsdata', index=False, if_exists='append')
        except IntegrityError:
            return (False,c)
    return (True,None)

Improve performance somewhat by processing in parallel, but no more than 4 task threads because it would otherwise overwhelm any database. This could take an hour or more to finish.

In [201]:
def db_init():
    global thread_con
    thread_con = db.connect()

with Pool(4, initializer=db_init) as p:
    inputs = []
    for src in data_included:
        for d in (root / src).glob(data_files):
            inputs.append((d, src))
    successes = p.starmap(process_data, inputs)

`inputs` is a list of tuples containing a data file path and source ID.

`successes` is a list of tuples indicating whether the matching input was successfully committed in full, and if not the name of the column where the failure occurred. So if there are any entries here, troubleshooting those failed data files is required, but the above cell will not have to be run again since they can be loaded piecemeal.

In [202]:
results_df = pd.DataFrame(successes, index=pd.MultiIndex.from_tuples(inputs))
results_df.loc[~results_df[0]]

Unnamed: 0,Unnamed: 1,0,1
