In [None]:
import os
from typing import Mapping, Optional, Sequence

from dotenv import load_dotenv
import pyspark
from pyspark.sql import SparkSession

# Source
# Root of the landing area; the per-table parquet paths are built beneath it.
INCOMING_ROOT = "s3a://landing-isis/opralog/incoming"
# Per-table settings keyed by the source table name. Only the keys of this
# mapping are used by the visible cells (to locate and name the temp views).
#   unique_keys  - columns that presumably identify a row (TODO confirm)
#   partition_by - partition columns, or None; presumably None means
#                  "unpartitioned" (TODO confirm)
# unique_keys are written uniformly as tuples so every entry has the same shape.
OPRALOGDB_TABLES: Mapping[str, Mapping[str, Optional[Sequence[str]]]] = dict(
    LOGBOOKS=dict(unique_keys=("LOGBOOK_ID",), partition_by=None),
    LOGBOOK_ENTRIES=dict(
        unique_keys=("LOGBOOK_ID", "ENTRY_ID"), partition_by=None
    ),
    ENTRIES=dict(unique_keys=("ENTRY_ID",), partition_by=None),
    MORE_ENTRY_COLUMNS=dict(
        unique_keys=("ENTRY_ID", "COLUMN_NO", "ENTRY_TYPE_ID"),
        partition_by=None,
    ),
    ADDITIONAL_COLUMNS=dict(
        unique_keys=("COLUMN_NO", "ENTRY_TYPE_ID"), partition_by=None
    ),
)

# Destination
# Catalog and schema used for CREATE SCHEMA / USE before any table is written.
TARGET_CATALOG = "isis"
TARGET_DB = "cleaned"
# Name of the merged logbook-entry table that this notebook drops, recreates
# and fills.
OPRALOG_LOGBOOK_MERGED = "opralog_logbook_entry"

# Spark session
# load_dotenv() populates os.environ from a .env file, which is where the S3
# credentials below are expected to come from; a missing variable raises
# KeyError before any session is created.
load_dotenv()
session_builder = SparkSession.builder.master(
    "spark://data-accelerator.isis.cclrc.ac.uk:7077"
)
session_builder = session_builder.config(
    "spark.hadoop.fs.s3a.access.key", os.environ["S3_ACCESS_KEY"]
)
session_builder = session_builder.config(
    "spark.hadoop.fs.s3a.secret.key", os.environ["S3_ACCESS_SECRET"]
)
spark = session_builder.getOrCreate()
# Last expression of the cell: displays the active session.
spark.active()

In [None]:
# Make sure the destination schema exists, then make it the current one so the
# unqualified table names used in later cells resolve inside it.
create_schema = f"CREATE SCHEMA IF NOT EXISTS {TARGET_CATALOG}.{TARGET_DB}"
use_schema = f"USE {TARGET_CATALOG}.{TARGET_DB}"
spark.sql(create_schema)
spark.sql(use_schema)

In [None]:
%%time

# Load in temporary tables
# Each source table's parquet files for the chosen load type and ingest date
# are read and registered as a temp view named after the table, so the SQL
# cells below can refer to them directly.
loadtype = "full"
ingest_date = "2024/11/21"
for tablename in OPRALOGDB_TABLES:
    sources = f"{INCOMING_ROOT}/{tablename}/{loadtype}/{ingest_date}/*.parquet"
    print(f"Ingesting path '{sources}'")
    df = spark.read.parquet(sources)
    df.createOrReplaceTempView(tablename)
    # count() executes the read and returns the number of rows; printing the
    # DataFrame returned by spark.sql("SELECT COUNT(*) ...") would only show
    # its schema repr, never the count itself.
    print(f"  {tablename}: {df.count()} rows")

In [None]:
%%sql

-- Smallest and largest ENTRY_ID in the raw ENTRIES view.
-- MIN/MAX are deterministic, whereas FIRST_VALUE/LAST_VALUE used as aggregates
-- depend on row order, which an ORDER BY inside a CTE does not guarantee.
SELECT MIN(ENTRY_ID) AS first_entry_id, MAX(ENTRY_ID) AS last_entry_id FROM ENTRIES

In [None]:
%%time

# Flatten the raw temp views into one row per (entry, logbook, extra column)
# with snake_case names and explicit types, and expose the result as the
# "snapshot" temp view. The column order here matches the column order of the
# CREATE TABLE statement for the merged table.
#
# NOTE(review): PRINCIPAL_LOGBOOK, COL_DATA and NUMBER_VALUE are unqualified;
# presumably PRINCIPAL_LOGBOOK is a column of ENTRIES and COL_DATA /
# NUMBER_VALUE come from MORE_ENTRY_COLUMNS - confirm against the source schema.
# NOTE(review): if COL_DATA / NUMBER_VALUE do come from MORE_ENTRY_COLUMNS, the
# final WHERE condition removes every entry without extra-column data, so the
# LEFT OUTER JOINs effectively behave as inner joins - confirm this is intended.
snapshot = """
SELECT
  CAST(ENTRIES.ENTRY_ID AS LONG) AS entry_id,
  CAST(LOGBOOKS.LOGBOOK_ID AS LONG) AS logbook_id,
  CAST(ADDITIONAL_COLUMNS.COLUMN_NO AS LONG) AS extra_column_no,
  CAST(ADDITIONAL_COLUMNS.ENTRY_TYPE_ID AS LONG) AS extra_column_id,
  CAST(LOGBOOK_NAME AS STRING) AS logbook_name,
  ENTRY_TIMESTAMP AS entered,
  CAST(ENTRY_DESCRIPTION AS STRING) AS description,
  CAST(SHADOW_COMMENT AS STRING) AS comment_text,
  CAST(COL_TITLE AS STRING) AS column_title,
  CAST(COL_DATA AS STRING) AS string_data,
  CAST(NUMBER_VALUE AS DOUBLE) AS number_data 
FROM ENTRIES
JOIN LOGBOOK_ENTRIES ON LOGBOOK_ENTRIES.ENTRY_ID = ENTRIES.ENTRY_ID
JOIN LOGBOOKS ON LOGBOOKS.LOGBOOK_ID = LOGBOOK_ENTRIES.LOGBOOK_ID
LEFT OUTER JOIN MORE_ENTRY_COLUMNS ON MORE_ENTRY_COLUMNS.ENTRY_ID = ENTRIES.ENTRY_ID
LEFT OUTER JOIN ADDITIONAL_COLUMNS ON ADDITIONAL_COLUMNS.COLUMN_NO = MORE_ENTRY_COLUMNS.COLUMN_NO AND ADDITIONAL_COLUMNS.ENTRY_TYPE_ID = MORE_ENTRY_COLUMNS.ENTRY_TYPE_ID
WHERE
  LOGBOOK_ENTRIES.LOGBOOK_ID = PRINCIPAL_LOGBOOK
  AND (COL_DATA IS NOT NULL OR NUMBER_VALUE IS NOT NULL)
"""
# spark.sql() only builds the query; the view is evaluated when it is read.
snapshot_df = spark.sql(snapshot)
snapshot_df.createOrReplaceTempView("snapshot")

In [None]:
%%time

# A full snapshot replaces the table wholesale: the DDL for the empty Iceberg
# table is prepared first, then any previous copy is dropped and the table is
# created afresh. Column order must stay in step with the snapshot view,
# because the load uses SELECT *.
table_ensure_exists = f"""
CREATE TABLE {OPRALOG_LOGBOOK_MERGED} (
  entry_id LONG,
  logbook_id LONG,
  extra_column_no LONG,
  extra_column_id LONG,
  logbook_name STRING,
  entered TIMESTAMP,
  description STRING,
  comment_text STRING,
  column_title STRING,
  string_data STRING,
  number_data DOUBLE
)
USING iceberg
PARTITIONED BY (logbook_name, month(entered))
"""

drop_previous = f"DROP TABLE IF EXISTS {OPRALOG_LOGBOOK_MERGED}"
spark.sql(drop_previous)
spark.sql(table_ensure_exists)

In [None]:
%%time

# INSERT OVERWRITE (rather than INSERT INTO) keeps this cell idempotent:
# running it again replaces the rows already loaded instead of appending a
# second copy of the snapshot. Columns are matched by position, so the
# snapshot view must list its columns in the same order as the table.
insert_into = f"INSERT OVERWRITE {OPRALOG_LOGBOOK_MERGED} SELECT * FROM snapshot"
spark.sql(insert_into)

# We will need to use merge when we have incremental snapshots
# NOTE(review): extra_column_no / extra_column_id are taken from a
# LEFT OUTER JOIN in the snapshot query and may therefore be NULL; `=` never
# matches NULL keys, so consider the null-safe `<=>` operator in the ON clause.
# merge_snapshot = """
# MERGE INTO opralog_logbook_entry t
# USING snapshot s
# ON t.logbook_id = s.logbook_id AND t.entry_id = s.entry_id AND t.extra_column_no = s.extra_column_no AND t.extra_column_id = s.extra_column_id
# WHEN MATCHED THEN UPDATE SET *
# WHEN NOT MATCHED THEN INSERT *
# """
# df = spark.sql(merge_snapshot)


In [None]:
# Sanity check: number of rows now in the merged table.
%sql SELECT COUNT(*) FROM {OPRALOG_LOGBOOK_MERGED}

In [None]:
%sql WITH ordered_entries AS ( \
    SELECT entry_id FROM {OPRALOG_LOGBOOK_MERGED} ORDER BY entry_id \
) \
SELECT FIRST_VALUE(entry_id), LAST_VALUE(entry_id) FROM ordered_entries

In [None]:
# Show the merged table's columns together with the extra metadata that
# DESCRIBE EXTENDED reports.
%sql DESCRIBE EXTENDED {OPRALOG_LOGBOOK_MERGED}