### Load Catalog

 - Connect to catalog (postgres) and warehouse (s3 - data & metadata layer)
 - Verify Namespace and Table


 *NOTE* For this example we will use a second namespace: `development`


In [1]:
import logging
logging.basicConfig(level=logging.INFO)

from connection import connect_to_catalog
catalog = connect_to_catalog()

# Namespace whose tables are listed below and used by the rest of the notebook.
NAMESPACE = "conformance"

# Each namespace comes back as a tuple of levels. Join every level so that a
# nested namespace is reported in full rather than cut down to its first level
# (single-level namespaces print exactly as before).
name_spaces = [".".join(ns) for ns in catalog.list_namespaces()]
print(f"Existing namespaces: {name_spaces}")

# Confirm the tables we expect to work with are present in the namespace.
for table_identifier in catalog.list_tables(NAMESPACE):
    print(f"Table found: {table_identifier}")

INFO:root:Connected to Iceberg catalog: `trinity`


Existing namespaces: ['development', 'conformance']
Table found: ('conformance', 'hydraulics')
Table found: ('conformance', 'hydrology')


#### Load hydraulic data from model output:

 - Use the same `event_id` and `model_id` to add (not replace) data
 - Will need to check existing data, adjust, and update

In [2]:
from example_data_utils import SAMPLE_EVENTS, SAMPLE_RAS_MODELS, load_sample_ras_data

# The first sample model is the one whose stage time series gets loaded.
first_ras_model = SAMPLE_RAS_MODELS[0]

print(f"Loading RAS data for event {SAMPLE_EVENTS[0]} and model {first_ras_model}")
ras_df = load_sample_ras_data(
    SAMPLE_EVENTS[0],
    first_ras_model,
    tseries_type="stage",
)
print(f"RAS data shape: {ras_df.shape}")

# Preview the first rows of the loaded frame.
ras_df.head()

INFO:root:Loading data from s3://trinity-pilot/dev/conformance/simulations/event-data/4/hydraulics/blw-bear/stage_timeseries.pq


Loading RAS data for event 4 and model blw-bear


INFO:root:Data loaded with shape (841, 56)


RAS data shape: (47096, 7)


Unnamed: 0,sim_time,realization_id,model_id,site_id,event_id,run_version,stage
0,1992-11-22 00:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491
1,1992-11-22 01:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491
2,1992-11-22 02:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491
3,1992-11-22 03:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491
4,1992-11-22 04:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491


#### Search existing iceberg table using primary keys

In [3]:
from table_utils import key_cols, fetch_existing_data_for_keys

print(f"Key columns: {key_cols}")

# Open the hydraulics table inside the working namespace.
table_identifier = f"{NAMESPACE}.hydraulics"
table = catalog.load_table(table_identifier)

# Look up rows for the incoming data's keys.
# NOTE(review): presumably returns the rows already stored in the table whose
# key columns match those of ras_df -- confirm in table_utils.
existing_records = fetch_existing_data_for_keys(
    table,
    ras_df,
)
print(f"Existing records shape: {existing_records.shape}")
existing_records.head()

INFO:pyiceberg.io:Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO


Key columns: ['sim_time', 'realization_id', 'model_id', 'site_id', 'event_id', 'run_version']


INFO:pyiceberg.io:Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO


Existing records shape: (21025, 7)


Unnamed: 0,sim_time,realization_id,model_id,site_id,event_id,run_version,flow
0,1992-11-22 00:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,0.0
1,1992-11-22 01:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,0.0
2,1992-11-22 02:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,0.0
3,1992-11-22 03:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,0.0
4,1992-11-22 04:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,0.0


*NOTE* 

 1. Some of the data will be replaced (where stage and flow exist at reference lines) 
 2. Some will be appended (where no flow data is available, i.e. reference points)

 ---

##### Merge data in preparation for upserting

In [4]:
# Left join: every incoming row is kept; columns from existing_records are
# attached where the key columns match and left missing otherwise.
combined = ras_df.merge(
    right=existing_records,
    how="left",
    on=key_cols,
)

print(combined.shape)
combined.head()

(47096, 8)


Unnamed: 0,sim_time,realization_id,model_id,site_id,event_id,run_version,stage,flow
0,1992-11-22 00:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491,0.0
1,1992-11-22 01:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491,0.0
2,1992-11-22 02:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491,0.0
3,1992-11-22 03:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491,0.0
4,1992-11-22 04:00:00,1,blw-bear,bc_big-bear-ck_s010_base,4,v1,430.397491,0.0


In [5]:
import pyarrow as pa
from table_utils import json_to_arrow

# Build the Arrow schema from its JSON definition, then convert the merged
# frame using that schema and without carrying over the pandas index.
arrow_schema = json_to_arrow("schemas/hydraulics.json")
arrow_table = pa.Table.from_pandas(
    combined,
    schema=arrow_schema,
    preserve_index=False,
)

# Show a single row to check column names, types and values.
arrow_table.slice(offset=0, length=1)

pyarrow.Table
sim_time: timestamp[us] not null
realization_id: int32 not null
model_id: string not null
site_id: string not null
event_id: int32 not null
run_version: string not null
flow: double
stage: double
----
sim_time: [[1992-11-22 00:00:00.000000]]
realization_id: [[1]]
model_id: [["blw-bear"]]
site_id: [["bc_big-bear-ck_s010_base"]]
event_id: [[4]]
run_version: [["v1"]]
flow: [[0]]
stage: [[430.3974914550781]]

#### Upsert data

*NOTES*

In Iceberg, identifier fields (~ primary keys):
 - Do not automatically filter operations
 - Do not automatically scope writes
 - Do not automatically rewrite only matching keys
 - Are not tied to partitioning
 - Are not used automatically by overwrite()
 - Are only used by the Iceberg "merge-on-read / upsert" machinery to decide which rows are "matching" when using upsert()

Based on these properties (and the need to do this on large tables in the future), we will use overwrite.


In [6]:
from pyiceberg.expressions import And, EqualTo

EVENT_ID = SAMPLE_EVENTS[0]
REALIZATION_ID = 1
MODEL_ID = first_ras_model

# overwrite() removes every row matching the filter before writing the new
# data. The replacement data covers a single model, so the filter must be
# scoped to that model as well; filtering on event and realization alone would
# also delete the rows stored for every other model of the same event.
overwrite_scope = {
    "event_id": int(EVENT_ID),
    "realization_id": int(REALIZATION_ID),
    "model_id": MODEL_ID,
}

# Refuse to write unless the new data lies entirely inside the scope being
# replaced: rows outside it would be appended without their old versions being
# removed, and an empty table would turn the overwrite into a plain delete.
for column, expected in overwrite_scope.items():
    found = set(arrow_table.column(column).unique().to_pylist())
    if found != {expected}:
        raise ValueError(
            f"Column '{column}' must contain only {expected!r} to match the "
            f"overwrite filter, found {sorted(found, key=repr)}"
        )

overwrite_filter = And(
    EqualTo("event_id", overwrite_scope["event_id"]),
    EqualTo("realization_id", overwrite_scope["realization_id"]),
    EqualTo("model_id", overwrite_scope["model_id"]),
)

table.overwrite(
    df=arrow_table,
    overwrite_filter=overwrite_filter,
)

INFO:pyiceberg.io:Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO
INFO:pyiceberg.io:Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO
INFO:pyiceberg.io:Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO


![](imgs/overwrite.png)