# HMS Kaupskrá — Ducklake Demo

**Dataset** Þinglýstir kaupsamningar um fasteignir (kaupverð, dagsetningar, staðsetning o.fl.). Gögn ættu að uppfærast daglega frá HMS

**Skref**
1. Tengjast við Ducklake
2. Búa til töflur fyrir kaupskránna í Ducklake & Minio
3. Sanity checks & smá analysis
4. Time travel demo (query'a *as of* með timestamp fyrir breytingar)
5. ACID sýnidæmi í Ducklake (transaction + rollback)
6. Metadata töflur
7. Partitioning

## Instructions

 - Mæli með að breyta env breytunum
 -- MINIO_ROOT_USER=miniouser
 -- MINIO_ROOT_PASSWORD=miniopassw
 - Það þarf að keyra upp docker-compose skránna fyrst
 - Það þarf að útbúa virtual environment með requirements sem finnast í rótinni undir pyproject.toml

## 1) Tengjast Ducklake

In [122]:
# Tengja DuckLake út frá .env í rót
from ducklake import connect_and_attach
import pandas as pd
import os

con = connect_and_attach()

## 2) Útbúa töflur
Byrjum á að setja kaupskránna hráa inn í raw, seinna meir í "typed" töflu undir `hms.kaupskra`.

In [123]:
HMS_URL = "https://frs3o1zldvgn.objectstorage.eu-frankfurt-1.oci.customer-oci.com/n/frs3o1zldvgn/b/public_data_for_download/o/kaupskra.csv"
LAKE = os.environ.get("DUCKLAKE_NAME", "my_lake")
SCHEMA  = "hms"
TABLE   = "kaupskra"
FULL = f"{SCHEMA}.{TABLE}"

# Búa til schema fyrir HMS gögn
con.execute(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA};")

# loada háragögnum inn (inferred types/schema)
con.execute(f"""
CREATE OR REPLACE TABLE {SCHEMA}.kaupskra_raw AS
SELECT * FROM read_csv_auto('{HMS_URL}', header=True, sep=';', encoding='latin-1');
""")

# Útbúum töflu með fyrirfram ákveðnum datatýpum
con.execute(f"""
CREATE OR REPLACE TABLE {SCHEMA}.{TABLE} AS
SELECT
  CAST(faerslunumer AS VARCHAR)                AS faerslunumer,
  CAST(emnr         AS VARCHAR)                AS emnr,
  CAST(skjalanumer  AS VARCHAR)                AS skjalanumer,
  CAST(fastnum      AS VARCHAR)                AS fastnum,
  CAST(heimilisfang AS VARCHAR)                AS heimilisfang,
  CAST(postnr       AS VARCHAR)                AS postnr,
  CAST(heinum       AS VARCHAR)                AS heinum,
  CAST(svfn         AS VARCHAR)                AS svfn,
  CAST(sveitarfelag AS VARCHAR)                AS sveitarfelag,
  TRY_CAST(utgdag         AS TIMESTAMP)        AS utgdag,
  TRY_CAST(thinglystdags  AS TIMESTAMP)        AS thinglystdags,
  TRY_CAST(kaupverd                   AS BIGINT)  AS kaupverd,
  TRY_CAST(fasteignamat               AS BIGINT)  AS fasteignamat,
  TRY_CAST(fasteignamat_gildandi      AS BIGINT)  AS fasteignamat_gildandi,
  TRY_CAST(brunabotamat_gildandi      AS BIGINT)  AS brunabotamat_gildandi,
  TRY_CAST(byggar                     AS INTEGER) AS byggar,
  CAST(fepilog      AS VARCHAR)                AS fepilog,
  TRY_CAST(einflm                     AS DOUBLE)  AS einflm,
  TRY_CAST(lod_flm                    AS DOUBLE)  AS lod_flm,
  CAST(lod_flmein   AS VARCHAR)                AS lod_flmein,
  TRY_CAST(fjherb                     AS INTEGER) AS fjherb,
  CAST(tegund       AS VARCHAR)                AS tegund,
  TRY_CAST(fullbuid                  AS INTEGER)  AS fullbuid,
  TRY_CAST(onothaefur_samningur      AS INTEGER)  AS onothaefur_samningur
FROM {SCHEMA}.kaupskra_raw;
""")

# Skoðum magn raða
con.execute(f"SELECT COUNT(*) AS rows FROM {SCHEMA}.{TABLE};").fetchdf()

Unnamed: 0,rows
0,218473


In [124]:
con.execute("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'hms'
""").fetchdf()

Unnamed: 0,table_schema,table_name
0,hms,kaupskra
1,hms,kaupskra_raw


In [125]:
# Skoða metadata um hvert column
con.execute("""
SELECT *
FROM __ducklake_metadata_my_lake.ducklake_column
""").fetchdf().head()


Unnamed: 0,column_id,begin_snapshot,end_snapshot,table_id,column_order,column_name,column_type,initial_default,default_value,nulls_allowed,parent_column
0,1,2,,2,1,FAERSLUNUMER,int64,,,True,
1,2,2,,2,2,EMNR,int64,,,True,
2,3,2,,2,3,SKJALANUMER,varchar,,,True,
3,4,2,,2,4,FASTNUM,int64,,,True,
4,5,2,,2,5,HEIMILISFANG,varchar,,,True,


In [126]:
# Velja nýjustu útgefnu kaupsamninga
con.execute(f"""
SELECT faerslunumer, sveitarfelag, heimilisfang, kaupverd, einflm, utgdag, thinglystdags
FROM {SCHEMA}.{TABLE}
ORDER BY utgdag DESC NULLS LAST
LIMIT 10;
""").fetchdf()

Unnamed: 0,faerslunumer,sveitarfelag,heimilisfang,kaupverd,einflm,utgdag,thinglystdags
0,734357,Múlaþing,Skógarsel 15B,52400,84.0,2025-09-18,2025-09-19 09:19:36
1,734329,Akureyrarbær,Skarðshlíð 32,45200,96.9,2025-09-17,2025-09-18 12:11:27
2,734361,Vestmannaeyjabær,Áshamar 93A,74500,118.3,2025-09-17,2025-09-19 10:08:36
3,734363,Reykjanesbær,Heiðarbrún 7,97500,170.5,2025-09-17,2025-09-19 10:24:38
4,734364,Vestmannaeyjabær,Heiðarvegur 11,60000,137.8,2025-09-17,2025-09-19 10:11:33
5,734257,Kópavogsbær,Gullsmári 10,74700,107.4,2025-09-16,2025-09-17 10:10:09
6,734312,Vestmannaeyjabær,Illugagata 65,90000,171.9,2025-09-16,2025-09-18 10:24:46
7,734316,Vestmannaeyjabær,Heimagata 35,18000,123.6,2025-09-16,2025-09-18 10:41:50
8,734320,Reykjavíkurborg,Ægisgata 5,74900,64.3,2025-09-16,2025-09-18 11:26:52
9,734321,Þingeyjarsveit,Víðifell lóð,32500,53.2,2025-09-16,2025-09-18 11:15:19


## 3) Basic analysis
- **By year**: count of contracts & average price.
- **Price per m²** (where usable area exists).

In [127]:
# magn og meðalverð samninga eftir ári
con.execute(f"""
WITH base AS (
  SELECT DATE_TRUNC('year', utgdag) AS yr, kaupverd::DOUBLE AS price
  FROM {SCHEMA}.{TABLE}
  WHERE utgdag IS NOT NULL AND kaupverd IS NOT NULL
)
SELECT yr::DATE AS year,
       COUNT(*)  AS n_contracts,
       ROUND(AVG(price)) AS avg_price
FROM base
GROUP BY 1
ORDER BY 1 DESC;
""").fetchdf()

Unnamed: 0,year,n_contracts,avg_price
0,2025-01-01,9788,94612.0
1,2024-01-01,16180,157953.0
2,2023-01-01,11665,155321.0
3,2022-01-01,13238,89264.0
4,2021-01-01,16952,61726.0
5,2020-01-01,15280,196628.0
6,2019-01-01,12313,53402.0
7,2018-01-01,12731,51763.0
8,2017-01-01,12354,60732.0
9,2016-01-01,13454,123620.0


In [128]:
# Verð á fermeter
con.execute(f"""
SELECT sveitarfelag,
       COUNT(*)                                        AS n,
       ROUND(AVG(kaupverd / NULLIF(einflm,0)), 0)      AS verd_per_fm2
FROM {SCHEMA}.{TABLE}
WHERE kaupverd IS NOT NULL AND einflm IS NOT NULL AND einflm > 0
GROUP BY 1
HAVING n >= 25
ORDER BY verd_per_fm2 DESC
LIMIT 10;
""").fetchdf()

Unnamed: 0,sveitarfelag,n,verd_per_fm2
0,Mýrdalshreppur,276,2866.0
1,Seltjarnarnesbær,2064,477.0
2,Garðabær,10091,463.0
3,Reykjavíkurborg,76270,432.0
4,Kópavogsbær,22670,414.0
5,Mosfellsbær,6807,397.0
6,Hafnarfjarðarkaupstaður,19389,367.0
7,Grímsnes-og Grafningshreppur,1927,361.0
8,Sveitarfélagið Árborg,7671,342.0
9,Sveitarfélagið Vogar,864,341.0


## 4) Time travel
DuckLake heldur utan um **snapshots** af stöðu hverrar töflu sem er hægt að query'a með *as of*, með: `AT (TIMESTAMP => ...)`, eða ákvenu version með: `AT (VERSION => ...)`.

Skref:
1. Ná í `t_before` tímastimpil
2. Skrifa bull röð í gögnin og committa, sem útbýr nýtt snapshot
3. Bera saman `COUNT(*)` vs. `AT (TIMESTAMP => t_before)`
4. Hreynsa upp mep að eyða röðinni

In [129]:
# Ná í síðasta tímastimpil
ts_before = con.execute(f"""
SELECT s.snapshot_time
FROM {LAKE}.snapshots() AS s
JOIN {LAKE}.last_committed_snapshot() AS l ON s.snapshot_id = l.id
""").fetchone()[0]
print(ts_before)

2025-09-21 16:35:00.677411+00:00


In [130]:
# Ná í síðasta tímastimpil
ts_before = con.execute(f"""
SELECT s.snapshot_time
FROM {LAKE}.snapshots() AS s
JOIN {LAKE}.last_committed_snapshot() AS l ON s.snapshot_id = l.id
""").fetchone()[0]
print(f"Tímastimpill fyrir: {ts_before}")


# insertum í töfluna
con.execute("BEGIN;")
con.execute(f"""
INSERT INTO {FULL} (
  faerslunumer, emnr, skjalanumer, fastnum, heimilisfang, postnr, heinum, svfn, sveitarfelag,
  utgdag, thinglystdags, kaupverd, fasteignamat, fasteignamat_gildandi, brunabotamat_gildandi,
  byggar, fepilog, einflm, lod_flm, lod_flmein, fjherb, tegund, fullbuid, onothaefur_samningur
) VALUES (
  'DEMO01','000','0000000000000','0000000','DEMO ADDRESS','000','0000000','0000','DEMO',
  now(), now(), 123456789, NULL, NULL, NULL,
  NULL, NULL, 50.0, NULL, NULL, NULL, 'Annað', 1, 0
);
""")
# Hægt er að setja committ message sem mún sjást í transaction loggnum í metadata grunninum
con.execute(f"CALL {LAKE}.set_commit_message(?, ?, extra_info => ?);",
            ["Notebook Demo", "Insert DEMO01", "{'source':'notebook'}"])
con.execute("COMMIT;")



# tímastimpill af snapshottinu EFTIR breytingar
ts_after = con.execute(f"""
SELECT s.snapshot_time
FROM {LAKE}.snapshots() AS s
ORDER BY s.snapshot_id DESC
LIMIT 1
""").fetchone()[0]
print(f"Tímastimpill eftir: {ts_after}")


# Samanburður
before_cnt = con.execute(f"""
SELECT COUNT(*) FROM {FULL} AT (TIMESTAMP => ?);
""", [ts_before]).fetchone()[0]

after_cnt = con.execute(f"""
SELECT COUNT(*) FROM {FULL} AT (TIMESTAMP => ?);
""", [ts_after]).fetchone()[0]

current_cnt = con.execute(f"SELECT COUNT(*) FROM {FULL};").fetchone()[0]

pd.DataFrame({
    "ts_before":[ts_before],
    "ts_after":[ts_after],
    "count_before":[before_cnt],
    "count_after":[after_cnt],
    "count_current":[current_cnt],
})


Tímastimpill fyrir: 2025-09-21 16:35:00.677411+00:00
Tímastimpill eftir: 2025-09-21 16:35:01.814745+00:00


Unnamed: 0,ts_before,ts_after,count_before,count_after,count_current
0,2025-09-21 16:35:00.677411+00:00,2025-09-21 16:35:01.814745+00:00,218473,218474,218474


In [131]:
# Cleanup
con.execute(f"DELETE FROM {SCHEMA}.{TABLE} WHERE faerslunumer = 'DEMO01';")
con.execute(f"SELECT COUNT(*) AS rows_after_cleanup FROM {SCHEMA}.{TABLE};").fetchdf()

Unnamed: 0,rows_after_cleanup
0,218473


## 5) ACID Demo
Byrjum transaction, útbúum breytingar, innitiate'um **ROLLBACK** og staðfestum að upphaf og endir sé sá sami.

In [132]:
before = con.execute(f"SELECT COUNT(*) FROM {SCHEMA}.{TABLE};").fetchone()[0]
con.execute("BEGIN;")
con.execute(f"INSERT INTO {SCHEMA}.{TABLE} (faerslunumer) VALUES ('DEMO_TX');")
during = con.execute(f"SELECT COUNT(*) FROM {SCHEMA}.{TABLE};").fetchone()[0]
con.execute("ROLLBACK;")
after = con.execute(f"SELECT COUNT(*) FROM {SCHEMA}.{TABLE};").fetchone()[0]
pd.DataFrame({"before":[before], "during":[during], "after_rollback":[after]})

Unnamed: 0,before,during,after_rollback
0,218473,218474,218473


## 6) Metadata

In [133]:
con.execute("SHOW TABLES FROM hms;").fetchdf()

Unnamed: 0,name
0,kaupskra
1,kaupskra_raw


In [149]:
# Hvernig er ducklake upp sett?
con.execute("""
SELECT *
FROM information_schema.tables
WHERE table_schema NOT IN ('pg_catalog','information_schema','pg_toast')
ORDER BY 1,2;
""").fetchdf().head(10)


Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action,TABLE_COMMENT
0,__ducklake_metadata_my_lake,public,ducklake_delete_file,BASE TABLE,,,,,,YES,NO,,
1,__ducklake_metadata_my_lake,public,ducklake_table_stats,BASE TABLE,,,,,,YES,NO,,
2,__ducklake_metadata_my_lake,public,ducklake_data_file,BASE TABLE,,,,,,YES,NO,,
3,__ducklake_metadata_my_lake,public,ducklake_column_mapping,BASE TABLE,,,,,,YES,NO,,
4,__ducklake_metadata_my_lake,public,ducklake_column_tag,BASE TABLE,,,,,,YES,NO,,
5,__ducklake_metadata_my_lake,public,ducklake_view,BASE TABLE,,,,,,YES,NO,,
6,__ducklake_metadata_my_lake,public,ducklake_file_partition_value,BASE TABLE,,,,,,YES,NO,,
7,__ducklake_metadata_my_lake,public,ducklake_column,BASE TABLE,,,,,,YES,NO,,
8,__ducklake_metadata_my_lake,public,ducklake_snapshot_changes,BASE TABLE,,,,,,YES,NO,,
9,__ducklake_metadata_my_lake,public,ducklake_files_scheduled_for_deletion,BASE TABLE,,,,,,YES,NO,,


In [135]:
con.execute(f"""
SELECT * FROM {LAKE}.snapshots()
ORDER BY snapshot_id;
""").fetchdf()

Unnamed: 0,snapshot_id,snapshot_time,schema_version,changes,author,commit_message,commit_extra_info
0,0,2025-09-21 16:34:01.046908+00:00,0,{'schemas_created': ['main']},,,
1,1,2025-09-21 16:34:57.379000+00:00,1,{'schemas_created': ['hms']},,,
2,2,2025-09-21 16:34:57.403644+00:00,2,"{'tables_created': ['hms.kaupskra_raw'], 'tabl...",,,
3,3,2025-09-21 16:35:00.677411+00:00,3,"{'tables_created': ['hms.kaupskra'], 'tables_i...",,,
4,4,2025-09-21 16:35:01.814745+00:00,3,{'tables_inserted_into': ['3']},Notebook Demo,Insert DEMO01,{'source':'notebook'}
5,5,2025-09-21 16:35:02.085162+00:00,3,{'tables_deleted_from': ['3']},,,


In [136]:
schema_version = con.execute(f"SELECT * FROM {LAKE}.current_snapshot();").fetchone()[0]
print(f"select from schema version: {schema_version}")
con.execute(f"SELECT COUNT(*) FROM {FULL} AT (VERSION => {schema_version});").fetchone()[0]

select from schema version: 5


218473

## 7) Partitioning

Fyrir stórar skrár er oft gott að partitiona eftir þeim dálkum sem eru hvað skilvirkastir í að filtera gögnin. Oft er það t.d. tími, en að öllum líkindum er oftar leitað að kaupsamningum 2025 frekar en 2006, þá getur verið hagræðing í því að skipta skránni upp eftir árum, og geyma hvert ár í sér skrá.

Notum hér útgáfudag "utgdag" sem partition of afritum töfluna yfir í partitionir.

In [137]:
con.begin()
con.execute("""
-- Gerum tóma töflu út frá sama skéma og hms.kaupskra
CREATE TABLE hms.kaupskra_partition AS
SELECT * FROM hms.kaupskra WHERE 1=0;
ALTER TABLE hms.kaupskra_partition SET PARTITIONED BY (year(utgdag));
""")

con.execute("""
-- 3) load all rows into the new (partitioned) table
INSERT INTO hms.kaupskra_partition
SELECT * FROM hms.kaupskra;
""")
con.commit()

<_duckdb.DuckDBPyConnection at 0x22831d3ab30>

### Hægt er að sjá hvernig skrárnar eru geymdar:

In [140]:
con.execute("""
SELECT dt.table_id,
    dt.table_uuid,
    dt.table_name,
    pi.partition_id,
    pc.column_id AS partition_column_id,
    pc.transform AS partition_column_transform
FROM
__ducklake_metadata_my_lake.ducklake_table dt
INNER JOIN __ducklake_metadata_my_lake.ducklake_partition_info pi
ON pi.table_id = dt.table_id
INNER JOIN __ducklake_metadata_my_lake.ducklake_partition_column pc
ON pc.table_id = dt.table_id
""").fetch_df()

Unnamed: 0,table_id,table_uuid,table_name,partition_id,partition_column_id,partition_column_transform
0,4,01996d21-02f4-79dd-9aad-e4e7b4060df2,kaupskra_partition,5,10,year


In [148]:
con.execute("""
SELECT pv.data_file_id,
            pv.table_id,
            df.path,
            df.file_format,
            df.record_count,
            df.file_size_bytes
            
 FROM
__ducklake_metadata_my_lake.ducklake_file_partition_value pv
INNER JOIN __ducklake_metadata_my_lake.ducklake_data_file df
ON df.data_file_id = pv.data_file_id
""").fetch_df().head()

Unnamed: 0,data_file_id,table_id,path,file_format,record_count,file_size_bytes
0,3,4,year=2011/ducklake-01996d21-03d2-7b0d-9230-0da...,parquet,7200,457137
1,4,4,year=2010/ducklake-01996d21-040b-7bdc-ab88-240...,parquet,4985,326632
2,5,4,year=2008/ducklake-01996d21-041a-7ab9-819e-83f...,parquet,6409,411020
3,6,4,year=2009/ducklake-01996d21-0425-7ee7-b11d-e55...,parquet,3764,256980
4,7,4,year=2006/ducklake-01996d21-0439-79f9-9a4b-b5d...,parquet,7117,453606
