# Mål: Gjenskape ETL-jobben (ARBEID_KJERNE.FK_1_DK_VEDTAK)

#### Source: FK_P.ARENA_FK_VEDTAK      

#### Target: DK_P.VEDTAK

In [None]:
# Denne mappingen laster data fra forkammertabellen ARENA_FK_VEDTAK.
# Mappingen laster alle rader som ligger i forkammeret, men oppdaterer eller setter inn kun rader som er endret eller nye.
# Parameter: $PMSourceFileDir\PARAMETER\3_1_FK_DK_NAV\FK_1_DK_VEDTAK.PRM
# Granularitet: En rad per vedtaksid og gyldighetsintervall.
# Versjonering: Nei.

## Importing Libraries

In [None]:
import pandas as pd
import cx_Oracle as cx_Oracle
import time
import os
import json

## ON PREM (SECRETS)

In [None]:
from dataverk_vault import api as vault_api

# On-prem credentials: after the vault helper call, the "dvh" environment
# variable is read and parsed as JSON into `config`; `config["U"]` is later
# passed as keyword arguments to cx_Oracle.connect().
# NOTE(review): presumably set_secrets_as_envs() is what populates
# os.environ["dvh"] - confirm against the dataverk_vault documentation.
# The GCLOUD cell further down assigns `config` as well; whichever cell runs
# last determines the value that is used.
vault_api.set_secrets_as_envs()
response = os.environ["dvh"]
config = json.loads(response)

## GCLOUD (SECRETS)

In [None]:
# Import the Secret Manager client library.
from google.cloud import secretmanager

# GCP project that holds the secret.
project_id = "knada-gcp"

# ID of the secret to read (nothing is created or stored here).
secret_id = "amit-garg"

# Create the Secret Manager client.
client = secretmanager.SecretManagerServiceClient()

# Access the latest version of the secret.
response = client.access_secret_version(
    name=f"projects/{project_id}/secrets/{secret_id}/versions/latest"
)
payload = response.payload.data.decode("UTF-8")

# The payload is later used as database connection parameters; avoid
# printing it.

# Parse the JSON payload into a dictionary. This rebinds `config` (and
# `response` above), replacing any value set by the ON PREM cell.
config = json.loads(payload)

## SQL QUERIES

In [None]:
# Builds the staging table ORA$PTT_SOURCE (an Oracle private temporary table)
# from FK_P.ARENA_FK_VEDTAK, shaped like the columns loaded into DK_P.VEDTAK.
#   * T1-T6 and T8 are LEFT JOIN lookups for surrogate keys; a missing lookup
#     becomes -1 through COALESCE.
#   * T7 is the current target row (GYLDIG_FLAGG = 1) with the same
#     LK_VEDTAK_ID. PK_VEDTAK is NULL when no such row exists, which is how
#     INSERT_SCDI recognises new rows. SLETTET_DATO is copied from T7, not
#     from the source.
#   * GYLDIG_FLAGG_SCD2 (0) and GYLDIG_TIL_DATO_SCD2 (yesterday, truncated)
#     are the values UPDATE_SCDII writes when it closes a superseded row.
#   * SENSITIV_FLAGG maps 'N' -> 0 and 'Y' -> 1; the CASE has no ELSE, so any
#     other value (including NULL) yields NULL.
#   * NOTE(review): MD5_SJEKKSUM is the literal 'TEST' - looks like a
#     placeholder; confirm whether a real checksum is expected.
SQL_CREATE_TABLE = """
CREATE PRIVATE TEMPORARY TABLE ORA$PTT_SOURCE
AS
SELECT
--ROWNUM +(SELECT MAX(PK_VEDTAK) FROM DK_P.VEDTAK) AS PK_VEDTAK_ID
S.AAR
,S.MOD_DATO AS ENDRET_KILDE_DATO
,COALESCE(T1.PK_AKTIVITET_FASE, -1) FK_AKTIVITET_FASE
,COALESCE(T2.PK_BEHANDLING_STATUS, -1) FK_BEHANDLING_STATUS
,COALESCE(T3.PK_DIM_ORGANISASJON,-1) FK_NAV_ENHET
,COALESCE(T8.FK_PERSON1, -1) AS FK_PERSON1
,COALESCE(T4.PK_SAK_RESULTAT, -1)FK_SAK_RESULTAT
,COALESCE(T5.PK_SAK_TYPE, -1) FK_SAK_TYPE
,COALESCE(T6.PK_STONAD, -1) FK_STONAD
,S.FRA_DATO
,1 AS GYLDIG_FLAGG
,0 AS GYLDIG_FLAGG_SCD2
,S.LASTET_DATO AS GYLDIG_FRA_DATO
,TO_DATE('31.12.9999','DD.MM.YYYY')AS GYLDIG_TIL_DATO
,TRUNC(SYSDATE -1) AS GYLDIG_TIL_DATO_SCD2
,S.KILDESYSTEM
,SYSDATE AS LASTET_DATO
,S.PERSON_ID AS LK_PERSON_ID_ARENA
,S.SAK_ID AS LK_SAK_ID
,S.VEDTAK_ID AS LK_VEDTAK_ID
,S.VEDTAK_ID_RELATERT AS LK_VEDTAK_ID_RELATERT
,S.LOPENRSAK AS LOPENR_SAK
,S.LOPENRVEDTAK AS LOPENR_VEDTAK
,'TEST' AS MD5_SJEKKSUM
,S.DATO_MOTTATT AS MOTTATT_DATO
,SYSDATE AS OPPDATERT_DATO
,S.REG_DATO AS OPPRETTET_KILDE_DATO
,T7.PK_VEDTAK AS PK_VEDTAK
,S.BRUKERID_ANSVARLIG AS SAKSBEHANDLER
--,S.STATUS_SENSITIV AS SENSITIV_FLAGG
,CASE WHEN S.STATUS_SENSITIV LIKE 'N' THEN 0 WHEN S.STATUS_SENSITIV LIKE 'Y' THEN 1 END AS SENSITIV_FLAGG
,T7.SLETTET_DATO AS SLETTET_DATO
,S.TIL_DATO
,S.TOTALBELOP
FROM FK_P.ARENA_FK_VEDTAK S
LEFT JOIN DK_P.AKTIVITET_FASE T1 ON T1.AKTIVITET_FASE_KODE = S.AKTFASEKODE AND S.LASTET_DATO BETWEEN T1.GYLDIG_FRA_DATO AND T1.GYLDIG_TIL_DATO AND T1.KILDESYSTEM = S.KILDESYSTEM
LEFT JOIN DK_P.BEHANDLING_STATUS T2 ON T2.BEHANDLING_STATUS_KODE = S.VEDTAKSTATUSKODE AND S.LASTET_DATO BETWEEN T2.GYLDIG_FRA_DATO AND T2.GYLDIG_TIL_DATO AND T2.KILDESYSTEM = S.KILDESYSTEM
LEFT JOIN DT_P.DIM_ORGANISASJON T3 ON T3.ARENA_ENHET_KODE = S.AETATENHET_BEHANDLER AND T3.DIM_NIVAA =2 AND S.LASTET_DATO BETWEEN T3.GYLDIG_FRA_DATO AND T3.GYLDIG_TIL_DATO
LEFT JOIN DK_P.SAK_RESULTAT T4 ON T4.SAK_RESULTAT_KODE = S.UTFALLKODE AND S.LASTET_DATO BETWEEN T4.GYLDIG_FRA_DATO AND T4.GYLDIG_TIL_DATO AND T4.KILDESYSTEM = S.KILDESYSTEM
LEFT JOIN DK_P.SAK_TYPE T5 ON T5.SAK_TYPE_KODE = S.VEDTAKTYPEKODE AND S.LASTET_DATO BETWEEN T5.GYLDIG_FRA_DATO AND T5.GYLDIG_TIL_DATO AND T5.KILDESYSTEM = S.KILDESYSTEM
LEFT JOIN DK_P.STONAD T6 ON T6.STONAD_KODE = S.RETTIGHETKODE AND S.LASTET_DATO BETWEEN T6.GYLDIG_FRA_DATO AND T6.GYLDIG_TIL_DATO AND T6.KILDE = S.KILDESYSTEM
LEFT JOIN DK_P.VEDTAK T7 ON T7.GYLDIG_FLAGG = 1 AND T7.LK_VEDTAK_ID = S.VEDTAK_ID
LEFT JOIN DT_PERSON.DVH_PERSON_IDENT_ARENA T8 ON S.PERSON_ID = T8.ARENA_ID AND S.LASTET_DATO BETWEEN T8.GYLDIG_FRA_DATO AND T8.GYLDIG_TIL_DATO
"""
# Inserts brand-new decisions: staged rows whose PK_VEDTAK is NULL, i.e. rows
# for which the staging query found no current DK_P.VEDTAK row with the same
# LK_VEDTAK_ID. Each row gets the surrogate key ROWNUM + current maximum key.
# COALESCE(..., 0) keeps key generation working on an empty DK_P.VEDTAK, where
# MAX() returns NULL and every generated key would otherwise be NULL.
# The former "WITH CTE ... SELECT * FROM CTE" wrapper added nothing and is
# replaced by a plain INSERT ... SELECT with the same column order.
INSERT_SCDI = """
INSERT INTO DK_P.VEDTAK(
PK_VEDTAK
,FK_STONAD
,FK_SAK_TYPE
,FK_PERSON1
,FK_BEHANDLING_STATUS
,FK_SAK_RESULTAT
,FK_AKTIVITET_FASE
,FK_NAV_ENHET
,LK_PERSON_ID_ARENA
,LK_SAK_ID
,LK_VEDTAK_ID
,LK_VEDTAK_ID_RELATERT
,AAR
,LOPENR_SAK
,LOPENR_VEDTAK
,TOTALBELOP
,MOTTATT_DATO
,SENSITIV_FLAGG
,SAKSBEHANDLER
,FRA_DATO
,TIL_DATO
,OPPRETTET_KILDE_DATO
,ENDRET_KILDE_DATO
,SLETTET_DATO
,GYLDIG_FLAGG
,MD5_SJEKKSUM
,GYLDIG_FRA_DATO
,GYLDIG_TIL_DATO
,LASTET_DATO
,OPPDATERT_DATO
,KILDESYSTEM
)
SELECT
ROWNUM + COALESCE((SELECT MAX(PK_VEDTAK) FROM DK_P.VEDTAK), 0) AS PK_VEDTAK
,FK_STONAD
,FK_SAK_TYPE
,FK_PERSON1
,FK_BEHANDLING_STATUS
,FK_SAK_RESULTAT
,FK_AKTIVITET_FASE
,FK_NAV_ENHET
,LK_PERSON_ID_ARENA
,LK_SAK_ID
,LK_VEDTAK_ID
,LK_VEDTAK_ID_RELATERT
,AAR
,LOPENR_SAK
,LOPENR_VEDTAK
,TOTALBELOP
,MOTTATT_DATO
,SENSITIV_FLAGG
,SAKSBEHANDLER
,FRA_DATO
,TIL_DATO
,OPPRETTET_KILDE_DATO
,ENDRET_KILDE_DATO
,SLETTET_DATO
,GYLDIG_FLAGG
,MD5_SJEKKSUM
,GYLDIG_FRA_DATO
,GYLDIG_TIL_DATO
,LASTET_DATO
,OPPDATERT_DATO
,KILDESYSTEM
FROM ORA$PTT_SOURCE
WHERE PK_VEDTAK IS NULL
"""
# Type-1 (overwrite in place) update. A staged row matches a target row when
# the key and all history-tracked columns agree; the remaining attributes are
# then overwritten if at least one of them differs.
#
# All comparisons are null-safe DECODE expressions. Oracle's DECODE treats two
# NULLs as equal, whereas "=", "<>" and "!=" evaluate to UNKNOWN as soon as
# one side is NULL. With the plain operators:
#   * a row whose FRA_DATO/TIL_DATO (or other ON column) is NULL on both
#     sides never matched, so its attributes were never refreshed;
#   * an attribute going from NULL to a value (or back) was never detected.
#     SENSITIV_FLAGG, for instance, is NULL whenever STATUS_SENSITIV is
#     neither 'N' nor 'Y'.
# For non-NULL values the result is identical to the previous operators.
# PK_VEDTAK stays a plain equality so the join can still use the key.
#
# NOTE(review): y.SLETTET_DATO is copied from the current DK_P.VEDTAK row when
# the staging table is built and SLETTET_DATO is not in the SET list, so the
# last predicate looks redundant. It is kept to preserve existing behaviour.
UPDATE_SCDI = """
MERGE INTO DK_P.VEDTAK x

USING (SELECT * FROM ORA$PTT_SOURCE) y

ON (x.PK_VEDTAK = y.PK_VEDTAK
AND DECODE(x.FK_STONAD, y.FK_STONAD, 1, 0) = 1
AND DECODE(x.FK_SAK_TYPE, y.FK_SAK_TYPE, 1, 0) = 1
AND DECODE(x.FK_BEHANDLING_STATUS, y.FK_BEHANDLING_STATUS, 1, 0) = 1
AND DECODE(x.FK_SAK_RESULTAT, y.FK_SAK_RESULTAT, 1, 0) = 1
AND DECODE(x.FK_AKTIVITET_FASE, y.FK_AKTIVITET_FASE, 1, 0) = 1
AND DECODE(x.LK_PERSON_ID_ARENA, y.LK_PERSON_ID_ARENA, 1, 0) = 1
AND DECODE(x.FRA_DATO, y.FRA_DATO, 1, 0) = 1
AND DECODE(x.TIL_DATO, y.TIL_DATO, 1, 0) = 1)

WHEN MATCHED THEN
    UPDATE SET
x.AAR = y.AAR
,x.FK_NAV_ENHET = y.FK_NAV_ENHET
,x.FK_PERSON1 = y.FK_PERSON1
,x.LK_SAK_ID = y.LK_SAK_ID
,x.LK_VEDTAK_ID_RELATERT = y.LK_VEDTAK_ID_RELATERT
,x.LOPENR_SAK = y.LOPENR_SAK
,x.LOPENR_VEDTAK = y.LOPENR_VEDTAK
,x.MOTTATT_DATO = y.MOTTATT_DATO
,x.OPPDATERT_DATO = y.OPPDATERT_DATO
,x.SAKSBEHANDLER = y.SAKSBEHANDLER
,x.SENSITIV_FLAGG = y.SENSITIV_FLAGG
,x.TOTALBELOP = y.TOTALBELOP
WHERE
DECODE(x.AAR, y.AAR, 0, 1) = 1
OR DECODE(x.FK_NAV_ENHET, y.FK_NAV_ENHET, 0, 1) = 1
OR DECODE(x.FK_PERSON1, y.FK_PERSON1, 0, 1) = 1
OR DECODE(x.LK_SAK_ID, y.LK_SAK_ID, 0, 1) = 1
OR DECODE(x.LK_VEDTAK_ID_RELATERT, y.LK_VEDTAK_ID_RELATERT, 0, 1) = 1
OR DECODE(x.LOPENR_SAK, y.LOPENR_SAK, 0, 1) = 1
OR DECODE(x.LOPENR_VEDTAK, y.LOPENR_VEDTAK, 0, 1) = 1
OR DECODE(x.MOTTATT_DATO, y.MOTTATT_DATO, 0, 1) = 1
OR DECODE(x.SAKSBEHANDLER, y.SAKSBEHANDLER, 0, 1) = 1
OR DECODE(x.SENSITIV_FLAGG, y.SENSITIV_FLAGG, 0, 1) = 1
OR DECODE(x.TOTALBELOP, y.TOTALBELOP, 0, 1) = 1
OR DECODE(x.SLETTET_DATO, y.SLETTET_DATO, 0, 1) = 1
"""

# Type-2 (history) handling, in two statements that MUST use the same change
# predicate: UPDATE_SCDII closes the existing row and INSERT_SCDII adds the
# replacement version. If the predicates diverged, a row could be closed
# without a successor, or a successor inserted next to a still-open row.
#
# The predicate is written with null-safe DECODE (two NULLs compare equal)
# instead of "!=" / "<>", which evaluate to UNKNOWN when either side is NULL
# and therefore never detected a tracked column changing to or from NULL.
# For non-NULL values the outcome is unchanged.

# Closes the matched target row: GYLDIG_FLAGG and GYLDIG_TIL_DATO are set from
# the staged GYLDIG_FLAGG_SCD2 / GYLDIG_TIL_DATO_SCD2 columns when any tracked
# column differs.
UPDATE_SCDII = """
MERGE INTO DK_P.VEDTAK x
USING (SELECT * FROM ORA$PTT_SOURCE) y
ON (x.PK_VEDTAK = y.PK_VEDTAK)
WHEN MATCHED THEN
    UPDATE SET
x.GYLDIG_FLAGG = y.GYLDIG_FLAGG_SCD2
,x.GYLDIG_TIL_DATO = y.GYLDIG_TIL_DATO_SCD2
,x.OPPDATERT_DATO = y.OPPDATERT_DATO
WHERE
(DECODE(x.FK_STONAD, y.FK_STONAD, 0, 1) = 1
OR DECODE(x.FK_SAK_TYPE, y.FK_SAK_TYPE, 0, 1) = 1
OR DECODE(x.FK_BEHANDLING_STATUS, y.FK_BEHANDLING_STATUS, 0, 1) = 1
OR DECODE(x.FK_SAK_RESULTAT, y.FK_SAK_RESULTAT, 0, 1) = 1
OR DECODE(x.FK_AKTIVITET_FASE, y.FK_AKTIVITET_FASE, 0, 1) = 1
OR DECODE(x.LK_PERSON_ID_ARENA, y.LK_PERSON_ID_ARENA, 0, 1) = 1
OR DECODE(x.FRA_DATO, y.FRA_DATO, 0, 1) = 1
OR DECODE(x.TIL_DATO, y.TIL_DATO, 0, 1) = 1)
"""

# Inserts the new version for every staged row whose matched target row
# differs in a tracked column. Keys are ROWNUM + current maximum key;
# COALESCE(..., 0) guards against MAX() returning NULL. The former
# "WITH CTE_1 ... SELECT * FROM CTE_1" wrapper added nothing and is replaced
# by a plain INSERT ... SELECT with the same column order.
INSERT_SCDII = """
INSERT INTO DK_P.VEDTAK
(
PK_VEDTAK
,FK_STONAD
,FK_SAK_TYPE
,FK_PERSON1
,FK_BEHANDLING_STATUS
,FK_SAK_RESULTAT
,FK_AKTIVITET_FASE
,FK_NAV_ENHET
,LK_PERSON_ID_ARENA
,LK_SAK_ID
,LK_VEDTAK_ID
,LK_VEDTAK_ID_RELATERT
,AAR
,LOPENR_SAK
,LOPENR_VEDTAK
,TOTALBELOP
,MOTTATT_DATO
,SENSITIV_FLAGG
,SAKSBEHANDLER
,FRA_DATO
,TIL_DATO
,OPPRETTET_KILDE_DATO
,ENDRET_KILDE_DATO
,SLETTET_DATO
,GYLDIG_FLAGG
,MD5_SJEKKSUM
,GYLDIG_FRA_DATO
,GYLDIG_TIL_DATO
,LASTET_DATO
,OPPDATERT_DATO
,KILDESYSTEM
)
SELECT
ROWNUM + COALESCE((SELECT MAX(PK_VEDTAK) FROM DK_P.VEDTAK), 0) AS PK_VEDTAK
,y.FK_STONAD
,y.FK_SAK_TYPE
,y.FK_PERSON1
,y.FK_BEHANDLING_STATUS
,y.FK_SAK_RESULTAT
,y.FK_AKTIVITET_FASE
,y.FK_NAV_ENHET
,y.LK_PERSON_ID_ARENA
,y.LK_SAK_ID
,y.LK_VEDTAK_ID
,y.LK_VEDTAK_ID_RELATERT
,y.AAR
,y.LOPENR_SAK
,y.LOPENR_VEDTAK
,y.TOTALBELOP
,y.MOTTATT_DATO
,y.SENSITIV_FLAGG
,y.SAKSBEHANDLER
,y.FRA_DATO
,y.TIL_DATO
,y.OPPRETTET_KILDE_DATO
,y.ENDRET_KILDE_DATO
,y.SLETTET_DATO
,y.GYLDIG_FLAGG
,y.MD5_SJEKKSUM
,y.GYLDIG_FRA_DATO
,y.GYLDIG_TIL_DATO
,y.LASTET_DATO
,y.OPPDATERT_DATO
,y.KILDESYSTEM
FROM ORA$PTT_SOURCE y
INNER JOIN DK_P.VEDTAK x ON x.PK_VEDTAK = y.PK_VEDTAK
WHERE
DECODE(x.FK_STONAD, y.FK_STONAD, 0, 1) = 1
OR DECODE(x.FK_SAK_TYPE, y.FK_SAK_TYPE, 0, 1) = 1
OR DECODE(x.FK_BEHANDLING_STATUS, y.FK_BEHANDLING_STATUS, 0, 1) = 1
OR DECODE(x.FK_SAK_RESULTAT, y.FK_SAK_RESULTAT, 0, 1) = 1
OR DECODE(x.FK_AKTIVITET_FASE, y.FK_AKTIVITET_FASE, 0, 1) = 1
OR DECODE(x.LK_PERSON_ID_ARENA, y.LK_PERSON_ID_ARENA, 0, 1) = 1
OR DECODE(x.FRA_DATO, y.FRA_DATO, 0, 1) = 1
OR DECODE(x.TIL_DATO, y.TIL_DATO, 0, 1) = 1
"""

# Transaction-control statement text; it is run through the cursor like the
# other statements, as the last step of the load.
SQL_COMMIT = """COMMIT"""

## CONNECT DATABASE

In [None]:
# Open the Oracle session using the keyword arguments stored under config["U"].
# A failed connection is reported and then re-raised: continuing would only
# fail on the next line with a NameError on `con`, hiding the real cause.
try:
    con = cx_Oracle.connect(**config["U"])
except cx_Oracle.Error:
    print("Oracle: Unable to connect")
    raise

cur = con.cursor()

## RUN ETL

In [None]:
# store starting time
begin = time.time()

try:
    cur.execute(SQL_CREATE_TABLE)
    cur.execute(INSERT_SCDI)
    cur.execute(UPDATE_SCDI)
    cur.execute(UPDATE_SCDII)
    cur.execute(INSERT_SCDII)
    cur.execute(SQL_COMMIT)
except cx_Oracle.DatabaseError as exc:
    error, = exc.args
    print("Oracle-Error-Code:", error.code)
    print("Oracle-Error-Message:", error.message)
    print("Oracle-Error-Offset:", error.offset)
    print("Oracle-Error-Context:", error.context)
    print("Oracle-Error-isrecoverable:", error.isrecoverable)
finally:
    if cur:
        cur.close()
    if con:
        con.close()


# store end time
end = time.time()

print(f"Total runtime of the program is {end - begin}")