Complete Code

In [0]:
%sql
-- PARAMETERS: single-row temp view holding the run settings used by the later cells.
-- LAST_EXTRACT_DATE is derived from the newest ETL_LOAD_DATE recorded for this
-- package in the control table; every other column is a fixed literal.
CREATE OR REPLACE TEMP VIEW etl_params AS
WITH last_load AS (
  -- An aggregate without GROUP BY always produces exactly one row, so a row is
  -- returned (with a NULL maximum) even when no control record matches.
  SELECT max(ETL_LOAD_DATE) AS max_etl_load_date
  FROM workspace.oracle_edw.w_etl_load_dates
  WHERE PACKAGE_NAME = 'SDE_ORAR122_ADAPTOR_SDE_ORA_GLSEGMENTDIMENSION'
    AND DATASOURCE_NUM_ID = 380
    AND ETL_USAGE_CODE = '__NOT_APPLICABLE__'
)
SELECT
  'Y' AS IS_INCREMENTAL,
  -- Formatted as a string; falls back to 1970-01-01 when no control record exists.
  COALESCE(
    date_format(max_etl_load_date, 'yyyy-MM-dd HH:mm:ss'),
    '1970-01-01 00:00:00'
  ) AS LAST_EXTRACT_DATE,
  380 AS DATASOURCE_NUM_ID,
  '__NOT_APPLICABLE__' AS ETL_USAGE_CODE,
  '1' AS TENANT_ID,
  2624634 AS ETL_PROC_WID,
  12345 AS EXECUTION_ID,
  30 AS PRUNE_DAYS
FROM last_load;



In [0]:
%sql
-- 2) STAGING: join flex values to their value sets, keep rows where either side
--    changed after LAST_EXTRACT_DATE, and rename columns for the load step.
CREATE OR REPLACE TEMP VIEW stg_fnd_flex_values AS
SELECT
  val.CREATION_DATE                      AS creation_date,
  vset.LAST_UPDATE_DATE                  AS last_update_date1,  -- change time of the value set
  val.LAST_UPDATE_DATE                   AS changed_on_dt,      -- change time of the value
  'N'                                    AS delete_flg,
  CAST(val.FLEX_VALUE_SET_ID AS STRING)  AS segment_lov_id,
  -- The WHERE clause admits only types 'I' and 'D', so ELSE is reached for 'D' only;
  -- those rows get the parent value appended after a '~' separator.
  CASE vset.VALIDATION_TYPE
    WHEN 'I' THEN val.FLEX_VALUE
    ELSE val.FLEX_VALUE || '~' || val.PARENT_FLEX_VALUE_LOW
  END                                    AS segment_val_code,
  CAST(val.CREATED_BY AS STRING)         AS created_by_id,
  CAST(val.LAST_UPDATED_BY AS STRING)    AS changed_by_id,
  -- Fixed effective-date bounds applied to every staged row.
  to_timestamp('1899-01-01 00:00:00','yyyy-MM-dd HH:mm:ss') AS src_eff_from_dt,
  to_timestamp('3714-01-01 00:00:00','yyyy-MM-dd HH:mm:ss') AS src_eff_to_dt,
  CAST(val.FLEX_VALUE_SET_ID AS STRING) || '~' || val.FLEX_VALUE AS integration_id,
  val.LAST_UPDATE_DATE                   AS source_last_update_date
FROM workspace.oracle_ebs.FND_FLEX_VALUES AS val
JOIN workspace.oracle_ebs.FND_FLEX_VALUE_SETS AS vset
  ON vset.FLEX_VALUE_SET_ID = val.FLEX_VALUE_SET_ID
-- etl_params selects from an aggregate without GROUP BY, so it contributes one row.
CROSS JOIN etl_params AS prm
WHERE vset.VALIDATION_TYPE IN ('I','D')
  -- Incremental window: a change on the value or on its value set qualifies the row.
  AND (
      val.LAST_UPDATE_DATE  > prm.LAST_EXTRACT_DATE
   OR vset.LAST_UPDATE_DATE > prm.LAST_EXTRACT_DATE
  )
;


In [0]:
%sql
-- 3) LOAD: append every staged row to the target table. Values are matched to the
--    explicit column list below by position, so the SELECT order must mirror it.
INSERT INTO workspace.oracle_edw.w_gl_segment_ds
(
  SEGMENT_LOV_ID,
  SEGMENT_VAL_CODE,
  CREATED_BY_ID,
  CHANGED_BY_ID,
  CREATED_ON_DT,
  CHANGED_ON_DT,
  AUX1_CHANGED_ON_DT,
  SRC_EFF_FROM_DT,
  SRC_EFF_TO_DT,
  DELETE_FLG,
  DATASOURCE_NUM_ID,
  INTEGRATION_ID,
  TENANT_ID,
  X_CUSTOM
)
SELECT
  stg.segment_lov_id,
  -- NULL or empty segment codes are replaced by the '__UNASSIGNED__' placeholder.
  CASE
    WHEN stg.segment_val_code IS NULL OR stg.segment_val_code = '' THEN '__UNASSIGNED__'
    ELSE stg.segment_val_code
  END,
  stg.created_by_id,
  stg.changed_by_id,
  -- try_cast yields NULL instead of failing when a value cannot be converted.
  try_cast(stg.creation_date AS TIMESTAMP),
  try_cast(stg.changed_on_dt AS TIMESTAMP),
  try_cast(stg.last_update_date1 AS TIMESTAMP),
  try_cast(stg.src_eff_from_dt AS TIMESTAMP),
  try_cast(stg.src_eff_to_dt AS TIMESTAMP),
  stg.delete_flg,
  prm.DATASOURCE_NUM_ID,
  stg.integration_id,
  prm.TENANT_ID,
  '0'
FROM stg_fnd_flex_values AS stg
CROSS JOIN etl_params AS prm
;

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- 4) CONTROL TABLE: record this run. The row for this datasource / package /
--    usage code is updated when it exists and inserted otherwise.
MERGE INTO workspace.oracle_edw.w_etl_load_dates AS ctl
USING (
  SELECT
    prm.DATASOURCE_NUM_ID AS datasource_num_id,
    'SDE_ORAR122_ADAPTOR_SDE_ORA_GLSEGMENTDIMENSION' AS package_name,
    'W_GL_SEGMENT_DS' AS target_table_name,
    prm.ETL_USAGE_CODE    AS etl_usage_code,
    prm.ETL_PROC_WID      AS etl_proc_wid,
    prm.EXECUTION_ID      AS load_plan_id,
    current_timestamp()   AS etl_load_date,
    CASE prm.IS_INCREMENTAL WHEN 'Y' THEN '1' ELSE '0' END AS committed,
    -- NOTE(review): date_sub/date_add return a DATE, so the time of day is
    -- dropped from this value -- confirm that is intended.
    date_sub(current_timestamp(), prm.PRUNE_DAYS) AS wip_load_start_date
  FROM etl_params AS prm
) AS run
ON  ctl.datasource_num_id = run.datasource_num_id
  AND ctl.package_name    = run.package_name
  AND ctl.etl_usage_code  = run.etl_usage_code
WHEN MATCHED THEN
  -- last_max_date is not listed here, so an existing value is left untouched.
  UPDATE SET
    ctl.target_table_name   = run.target_table_name,
    ctl.etl_proc_wid        = run.etl_proc_wid,
    ctl.load_plan_id        = run.load_plan_id,
    ctl.wip_load_start_date = run.wip_load_start_date,
    ctl.etl_load_date       = run.etl_load_date,
    ctl.committed           = run.committed
WHEN NOT MATCHED THEN
  -- A first-time row is created with last_max_date set to NULL.
  INSERT (
    datasource_num_id,
    package_name,
    target_table_name,
    etl_usage_code,
    etl_proc_wid,
    load_plan_id,
    wip_load_start_date,
    last_max_date,
    etl_load_date,
    committed
  )
  VALUES (
    run.datasource_num_id,
    run.package_name,
    run.target_table_name,
    run.etl_usage_code,
    run.etl_proc_wid,
    run.load_plan_id,
    run.wip_load_start_date,
    NULL,
    run.etl_load_date,
    run.committed
  )
;


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


In [0]:
%sql
-- 5) CONTROL LOG: append one history row describing this run. Values are matched
--    to the explicit column list by position.
INSERT INTO workspace.oracle_edw.w_etl_load_dates_log
(
  DATASOURCE_NUM_ID,
  PACKAGE_NAME,
  TARGET_TABLE_NAME,
  ETL_USAGE_CODE,
  ETL_PROC_WID,
  LOAD_PLAN_ID,
  SESSION_ID,
  WIP_LOAD_START_DATE,
  LAST_MAX_DATE,
  ETL_LOAD_DATE,
  COMMITTED
)
SELECT
  prm.DATASOURCE_NUM_ID,
  'SDE_ORAR122_ADAPTOR_SDE_ORA_GLSEGMENTDIMENSION',
  'W_GL_SEGMENT_DS',
  prm.ETL_USAGE_CODE,
  prm.ETL_PROC_WID,
  prm.EXECUTION_ID,
  prm.ETL_PROC_WID,      -- SESSION_ID is filled with the same value as ETL_PROC_WID
  -- NOTE(review): date_sub/date_add return a DATE, so the time of day is
  -- dropped from this value -- confirm that is intended.
  date_sub(current_timestamp(), prm.PRUNE_DAYS),
  NULL,                  -- LAST_MAX_DATE is not tracked by this load
  current_timestamp(),
  CASE prm.IS_INCREMENTAL WHEN 'Y' THEN '1' ELSE '0' END
FROM etl_params AS prm
;


num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- 6) Cleanup: drop the two temp views created by the earlier cells.
--    IF EXISTS keeps this cell from failing when a view is already gone.
DROP VIEW IF EXISTS stg_fnd_flex_values;
DROP VIEW IF EXISTS etl_params;
