# Migration Notebook

This notebook is a live document to control the process of migrating applications from an Oracle database to a Postgres database virtualized by Liberatii Gateway.

## Using this notebook

Each stage described in this notebook has a corresponding set of criteria in the **success criteria** spreadsheet and a description of troubleshooting steps to resolve errors. Proceed to the next step **only** when the tests in this notebook pass according to the definitions set out in the **success criteria**.

The **playbook** that references this document contains more detailed information for troubleshooting specific errors, and can be used alongside the troubleshooting blocks in this document.

## Configuration

### Notebook preamble

The following blocks allow this notebook to directly access the databases and control some aspects of the styling.

In [None]:
%load_ext sql
import socket
import json
import os
import time
import pandas as pd
from IPython.utils import io
from tqdm import tqdm
import sys

In [None]:
%%html
<style>
td { text-align: left !important; }
</style>

## Connection configuration

This section configures the connection information that gives this notebook access to the components of the build:

* The Oracle database
* The Postgres database
* The Liberatii Gateway that virtualizes the Postgres database
* The Liberatii Data Platform that provides the API for actions to control the migration

This section should be pre-filled by Liberatii as part of the **plan and build** stage. It concludes with a test section to make sure that all components are behaving correctly.

In [None]:
## Hostname or IP Address of the Oracle Database
ORACLE_HOST="oracle"

## Hostname or IP Address of the Liberatii Gateway
GATEWAY_HOST="pgtranslator"

## Hostname or IP Address of the Liberatii Data Platform
PLATFORM_HOST="migration"

## Hostname or IP Address of the Postgres Database
POSTGRES_HOST="postgres"

## Database connection information
USERNAME="HR"
PASSWORD="hr"
DATABASE="pdborcl"

## Oracle user required for Log Mining
SYNC_USERNAME='c##xstrm'
SYNC_PASSWORD='xs'

## Location of Change-Data-Capture pipeline manager
CDC_HOST='http://kafka-connect:8083'

## Service ports
ORACLE_PORT=1521
POSTGRES_PORT=5432
GATEWAY_PORT=15432

## Connection settings for code blocks
POSTGRES=f'postgresql://{USERNAME}:{PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{DATABASE}'
ORACLE_CONNECTION_STRING=f'(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={ORACLE_HOST})(PORT={ORACLE_PORT}))(CONNECT_DATA=(SERVICE_NAME={DATABASE})))'
ORACLE=f'oracle://{USERNAME}:{PASSWORD}@{ORACLE_CONNECTION_STRING}'
PLATFORM=f"http://{socket.gethostbyname(PLATFORM_HOST)}:3000"
GATEWAY=f'postgresql://{USERNAME}:{PASSWORD}@{GATEWAY_HOST}:{GATEWAY_PORT}/{DATABASE}'
ORACLE_SYNC=f'oracle://{SYNC_USERNAME}:{SYNC_PASSWORD}@{ORACLE_CONNECTION_STRING}'

print(f"""
Connection settings:

    Liberatii Gateway:
      {GATEWAY}
    Liberatii Data Platform:
      {PLATFORM}/api

    PostgreSQL:             
      {POSTGRES}
    Oracle:                 
      {ORACLE}
    
The API may be opened in a browser for reference.
""")


The following blocks test the connectivity by querying version information of the databases under test.

Proceed to the next section only when:

1. The Oracle and Postgres databases return the expected version information (the Oracle query is run as both the schema user and the synchronization user)
2. The Gateway, queried with Oracle syntax, returns a version that matches the Postgres database
3. The API returns `[]` (no operations in progress) or only completed operations

If any of the above steps fail, verify the connection information, check that firewalls and permissions allow access, and ensure that the machines are accessible in their cloud consoles (if appropriate).
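A quick way to separate network problems from credential problems is to test whether each service port accepts TCP connections before looking at the database queries. The sketch below assumes the host and port variables from the configuration cell; the helper `port_is_open` is a hypothetical name and only checks reachability, not authentication.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds.

    Hypothetical helper: it checks network reachability only, not credentials.
    """
    try:
        # create_connection resolves the name and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts
        return False


# Usage with the variables from the configuration cell (platform port 3000 as in PLATFORM):
# for name, host, port in [("Oracle", ORACLE_HOST, ORACLE_PORT),
#                          ("Postgres", POSTGRES_HOST, POSTGRES_PORT),
#                          ("Gateway", GATEWAY_HOST, GATEWAY_PORT),
#                          ("Platform", PLATFORM_HOST, 3000)]:
#     print(f"{name:10} {host}:{port} reachable={port_is_open(host, port)}")
```

A port that is reachable but still fails the version query points to credentials or service names rather than firewalls.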


In [None]:
%%sql {ORACLE}
-- Find version information for the Oracle database
select banner FROM v$version

In [None]:
%%sql {ORACLE_SYNC}
-- Find version information for the Oracle database as the synchronization (LogMiner) user
select banner FROM v$version

In [None]:
%%sql {POSTGRES}
-- Find version information for the PostgreSQL database
select version()

In [None]:
%%sql {GATEWAY}
-- Find version information through Liberatii Gateway using Oracle syntax
select banner FROM v$version

In [None]:
!curl -X 'GET' \
  {PLATFORM}/operation \
  -H 'accept: application/json'
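The `/operation` endpoint returns a JSON list, and the connectivity criteria above ask that it be empty or contain only completed operations. A sketch of that check, assuming each entry carries a `status` field (as the log-viewing cell in the data transfer section relies on) and that `"Running"` is the only in-progress status; `operations_idle` is a hypothetical name.

```python
import json


def operations_idle(response_text: str) -> bool:
    """Return True when no operation in an /operation response is still running.

    Hypothetical helper. Assumes the response is a JSON list of objects with a
    "status" field and that "Running" marks an in-progress operation.
    """
    operations = json.loads(response_text)
    # all() of an empty list is True, matching the "[]" case
    return all(op.get("status") != "Running" for op in operations)


# Usage in this notebook:
# result = !curl -s {PLATFORM}/operation
# print("Idle" if operations_idle(result[0]) else "Operations still running")
```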

## Liberatii Data Platform configuration

The following blocks configure the Liberatii Data Platform with the connection information for the
databases and the gateway, and initialize the platform.

Proceed to the schema conversion stage when the "init" operation returns a message with `"status": "Succeeded"` and the final query returns the expected number of objects.


In [None]:
!curl -s {PLATFORM}/connection -H 'Content-Type: application/json' \
   -d '{{"type":"Oracle","connectionString":"{ORACLE_CONNECTION_STRING}","user":"{USERNAME}","password":"{PASSWORD}","id":1}}'

In [None]:
!curl -s {PLATFORM}/connection -H 'Content-Type: application/json' \
  -d '{{"type":"PostgreSQL","host":"{POSTGRES_HOST}","port":{POSTGRES_PORT},"database":"{DATABASE}","user":"{USERNAME}","password":"{PASSWORD}","id":2}}'

In [None]:
!curl -s {PLATFORM}/connection -H 'Content-Type: application/json' \
  -d '{{"type":"LGW","host":"{GATEWAY_HOST}","port":{GATEWAY_PORT},"database":"{DATABASE}","user":"{USERNAME}","password":"{PASSWORD}", "id":3}}'
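The cells above assemble JSON by hand inside shell strings, which breaks if a password contains a quote or a brace. A more robust pattern is to build the payload with `json.dumps` and send it with Python's standard library. The sketch below mirrors the fields of the PostgreSQL connection cell; `connection_request` is a hypothetical helper that only prepares the request, which `urllib.request.urlopen` would then send.

```python
import json
import urllib.request


def connection_request(platform_url: str, payload: dict) -> urllib.request.Request:
    """Build a POST request for the platform's /connection endpoint.

    Hypothetical helper. json.dumps performs the quoting, so passwords containing
    quotes or braces are encoded safely.
    """
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{platform_url}/connection",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Example mirroring the PostgreSQL connection cell (values from the configuration cell):
# req = connection_request(PLATFORM, {
#     "type": "PostgreSQL", "host": POSTGRES_HOST, "port": POSTGRES_PORT,
#     "database": DATABASE, "user": USERNAME, "password": PASSWORD, "id": 2})
# with urllib.request.urlopen(req) as resp:
#     print(resp.read().decode())
```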

In [None]:
result = !curl -s {PLATFORM}/config -H 'Content-Type: application/json' \
  -d '{{"dataOnePass": true, "users":["{USERNAME}"], "verbose":2, "eraseOnInit":true}}'
print(json.dumps(json.loads(result[0]), indent=True))

In [None]:
result = !curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "id": 1, "oracle": 1, "lgw": 3, "stage": "init" }}'
print(json.dumps(json.loads(result[0]), indent=True))

In [None]:
result = !curl -s -X POST {PLATFORM}/operation/1/wait?count=1
print(json.dumps(json.loads(result[0]), indent=True))

In [None]:
%%sql {POSTGRES}
select count(*), type, stage, error from dbt.migration_objects group by type, stage, error

# Schema Conversion

The schema conversion stage translates the DDL statements recorded in the previous stage by passing them through Liberatii Gateway.

<img src="./step3.png" style="margin:auto" alt="Schema conversion diagram" title="Schema conversion diagram"/>

In order to speed up data transfer, this stage does **not** translate `INDEX` objects. Therefore, this stage is complete when all non-index objects are in the `D` stage. The following query displays the status of the migration and can be re-run to confirm status throughout the process.

Proceed to the next section only when all non-index objects are in the `D` stage.

If any objects do not migrate successfully, they can be retried by modifying their DDL, or skipped by removing them from `dbt.migration_objects` if they are not required. Steps to perform these operations are given at the end of this section.

In [None]:
%%sql {POSTGRES}
select count(*), type, stage, error from dbt.migration_objects group by type, stage, error

The following block asks the Liberatii Data Platform to start the `schema` stage and then monitors the objects left to be processed. It finishes when the operation completes with either success or error, after which the database can be re-examined to determine the status of the migration.
<a id='MigrateSchema'/>

In [None]:
## Find the total number of objects to be migrated
with io.capture_output():
    result=%sql {POSTGRES} select count(*) from dbt.migration_objects where type != 'INDEX'

## Begin the operation
operation = !curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "id": 1, "oracle": 1, "lgw": 3, "stage": "schema" }}'

## Check the output
with tqdm(total=result[0][0], file=sys.stdout) as progress:
    while result[0][0] > 0:
        with io.capture_output():
            result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'I' and type != 'INDEX'
        if result[0][0] > 0:
            time.sleep(1)
        progress.update(progress.total - result[0][0] - progress.n)
        operation = !curl -s {PLATFORM}/operation/1
        if json.loads(operation[0])["status"] != 'Running':
            break
    operation = !curl -X POST -s {PLATFORM}/operation/1/wait
    with io.capture_output():
        result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'I' and type != 'INDEX'
    progress.update(progress.total - result[0][0] - progress.n)

## Print summary
status = json.loads(operation[0])["status"]    
print(f"Operation complete, status: {status}")
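The cell above polls two things in a loop: the number of objects still waiting in `dbt.migration_objects` and the operation status from `/operation/1`, stopping when nothing is left or the operation is no longer `Running`. The constraint and verification cells later in this notebook repeat the same pattern with different queries. A sketch of that loop as a reusable function, with both checks passed in as callables so it can be exercised without a database; `poll_operation` and its parameters are hypothetical names.

```python
import time
from typing import Callable


def poll_operation(
    remaining: Callable[[], int],
    status: Callable[[], str],
    interval: float = 1.0,
    on_progress: Callable[[int], None] = lambda left: None,
) -> str:
    """Poll until no objects remain or the operation stops running.

    Hypothetical helper mirroring the notebook loop.
    remaining:   returns how many objects are still waiting to be processed
    status:      returns the operation status string, e.g. "Running"
    on_progress: called with the remaining count on every iteration
    Returns the last observed status.
    """
    while True:
        left = remaining()
        on_progress(left)
        current = status()
        # Same exit conditions as the notebook cell
        if left == 0 or current != "Running":
            return current
        time.sleep(interval)
```

The function may return `"Running"` when the last object has just been processed, so the caller should still call the `/operation/{id}/wait` endpoint afterwards, as the notebook cell does. In the notebook, `remaining` would wrap the `%sql` count query and `status` the `curl` call to the operation.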

In [None]:
%%sql {POSTGRES}
select count(*), stage, type, error from dbt.migration_objects where type != 'INDEX' group by stage, type, error
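The completion criterion for this stage (all non-index objects in the `D` stage) can also be checked in code. A sketch, assuming the rows have the `(count, stage, type, error)` shape returned by the query above; `schema_stage_complete` is a hypothetical name.

```python
from typing import Iterable, Optional, Tuple


def schema_stage_complete(rows: Iterable[Tuple[int, str, str, Optional[str]]]) -> bool:
    """Return True when every non-INDEX object is in stage 'D'.

    Hypothetical helper. rows are (count, stage, type, error) tuples as returned
    by the status query for this stage.
    """
    return all(
        stage == "D"
        for count, stage, obj_type, error in rows
        if obj_type != "INDEX" and count > 0
    )


# Usage in this notebook:
# rows = %sql {POSTGRES} select count(*), stage, type, error from dbt.migration_objects where type != 'INDEX' group by stage, type, error
# print(schema_stage_complete(rows))
```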

### Reprocessing conversions

If an object fails to migrate, its DDL can be modified (for example, to add a cast) to correct the error. The following block selects all objects that have failed to migrate:

In [None]:
%%sql {POSTGRES}
select name, error, ddl1 from dbt.migration_objects where error <> ''

The following block demonstrates how to modify the DDL of a migration object to fix an error. Once this has been done, the [schema migration operation](#MigrateSchema) can be re-run to migrate the object again.

In [None]:
%%sql {POSTGRES}

update dbt.migration_objects set
    stage='I',
    error='',
    ddl1 = '<New DDL for object>'
where name = '<OBJECT NAME>'
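Replacement DDL often contains single quotes (default values, string literals in view bodies), and each one must be doubled inside a SQL string literal. A sketch that builds the `UPDATE` statement above with the DDL and object name quoted, assuming `standard_conforming_strings` is on (the PostgreSQL default) so backslashes need no escaping; `quote_literal` and `reprocess_statement` are hypothetical helpers, and a parameterized query through a database driver is the more robust option where available.

```python
def quote_literal(value: str) -> str:
    """Quote a string as a standard SQL literal by doubling embedded single quotes.

    Hypothetical helper; assumes standard_conforming_strings is on.
    """
    return "'" + value.replace("'", "''") + "'"


def reprocess_statement(name: str, new_ddl: str) -> str:
    """Build the UPDATE that resets an object to stage 'I' with replacement DDL."""
    return (
        "update dbt.migration_objects set stage='I', error='', "
        f"ddl1 = {quote_literal(new_ddl)} where name = {quote_literal(name)}"
    )


print(reprocess_statement("EMP_VIEW", "create view v as select 'a' as c"))
```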

### Skipping a conversion

If an object fails to migrate and is not required (as defined by the **success criteria**), it can be skipped by deleting its entry from `dbt.migration_objects`, as shown below, so that it is no longer tracked by the migration.

In [None]:
%%sql {POSTGRES}

delete from dbt.migration_objects where name = '<OBJECT NAME>'

# Data Transfer

The data transfer stage copies data in blocks from the source database to the target database. It can be run as several concurrent operations to increase throughput; however, the total throughput is limited by the slowest of the following:

* The egress capabilities of the Oracle database and network
* The ingress capabilities of the Postgres database
* The bandwidth available to the Liberatii Data Platform
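Because these limits act in series, the effective rate is the smallest of them, and a rough duration estimate is the data volume divided by that rate. A worked sketch with illustrative figures, not measurements; `estimate_transfer_hours` is a hypothetical helper that ignores per-row overheads and the time spent on indexes and constraints. The data volume could come from summing the `data_size` column of `dbt.migration_objects`.

```python
def estimate_transfer_hours(total_bytes: float, oracle_egress_bps: float,
                            postgres_ingress_bps: float, platform_bps: float) -> float:
    """Estimate transfer time in hours, limited by the slowest component.

    Hypothetical helper; all rates are in bytes per second.
    """
    effective_rate = min(oracle_egress_bps, postgres_ingress_bps, platform_bps)
    return total_bytes / effective_rate / 3600


# Illustrative figures: 500 GB of data; 200 MB/s egress, 120 MB/s ingress, 150 MB/s platform
hours = estimate_transfer_hours(500e9, 200e6, 120e6, 150e6)
print(f"{hours:.2f} hours")
```

With these figures the Postgres ingress (120 MB/s) is the bottleneck, giving roughly 1.16 hours; adding more concurrent operations would not help until that limit is raised.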

<img src="./step4.png" style="margin: auto; padding-bottom: 20px;" alt="Data Transfer" title="Data Transfer"/>

Proceed to the next section only when all tables are in the `R` or `K` stage.

The following step will show the data volume for each table that will be migrated:

In [None]:
%%sql {POSTGRES}
select name, pg_size_pretty(data_size), stage, error from dbt.migration_objects where type = 'TABLE' order by data_size desc

The following configuration options may improve throughput:

* `useNative` - Bypass the Liberatii Gateway when writing data if not required
* `rowsBuf` - Number of rows to process at a time. Increasing this will increase memory pressure on the Liberatii Data Platform but will reduce round-trips to the database. Increase this if you have tables with many rows but little data.
* `dataOnePass` - Do not stage data before transfer. This must be turned off to use data chunking or parallel tables (`parTables`).
* `dataChunkSize` - Size in bytes of the data chunks to copy in one go.
* `parTables` - Set to true to allow multiple operations to act on a single table.

A reasonable starting point for tables with **a large volume of data per row** is to set `dataOnePass` to false, `parTables` to true, and `dataChunkSize` to cover 10 rows of data or 100MB. Conversely, for tables with a **low volume of data per row**, set `dataOnePass` to true.

The following cell demonstrates changing a configuration option, in this case disabling `dataOnePass`:

In [None]:
!curl -XPUT {PLATFORM}/config/dataOnePass \
    -H 'Content-Type: application/json' \
    -d '{{"value": false}}'
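Applying the large-row recommendation means changing several options. The sketch below builds one `PUT {PLATFORM}/config/<option>` request per option, using the same `{"value": ...}` body as the `dataOnePass` cell and assuming every option accepts that body shape; `config_requests` is a hypothetical helper, and the chunk size of 100MB (taken here as 100 * 1024 * 1024 bytes) is illustrative.

```python
import json
import urllib.request


def config_requests(platform_url: str, options: dict) -> list:
    """Build one PUT request per option, each with a {"value": ...} JSON body.

    Hypothetical helper; assumes each option is set via PUT /config/<name>.
    """
    built = []
    for name, value in options.items():
        built.append(urllib.request.Request(
            f"{platform_url}/config/{name}",
            data=json.dumps({"value": value}).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="PUT",
        ))
    return built


# Settings for tables with a large volume of data per row (chunk size is illustrative)
large_row_settings = {"dataOnePass": False, "parTables": True, "dataChunkSize": 100 * 1024 * 1024}
# for req in config_requests(PLATFORM, large_row_settings):
#     with urllib.request.urlopen(req) as resp:
#         print(req.full_url, resp.status)
```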

The following cell will run the data transfer according to the current settings. The constant `OPERATIONS` can be adjusted as required to control the number of concurrent operations.

In [None]:
## Concurrent operations to run
OPERATIONS=2

## Find the total number of objects to be migrated
with io.capture_output():
    result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage in ('D', 'R', 'K') and type = 'TABLE'

## Begin the operations
operations=[]
for _ in range(OPERATIONS):
    operation = !curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "oracle": 1, "lgw": 3, "stage": "data" }}'
    operations.append(json.loads(operation[0])["operation"]["id"])
    print(f"Started operation id={operations[-1]}")

## Check the output, recording the final status of every operation
statuses={}
with tqdm(total=result[0][0], file=sys.stdout) as progress:
    while result[0][0] > 0 and len(operations) > 0:
        with io.capture_output():
            result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'D' and type = 'TABLE'
        if result[0][0] > 0:
            time.sleep(1)
        progress.update(progress.total - result[0][0] - progress.n)
        ## Stop tracking any operation that is no longer running
        for op_id in list(operations):
            operation = !curl -s {PLATFORM}/operation/{op_id}
            status = json.loads(operation[0])["status"]
            if status != 'Running':
                statuses[op_id] = status
                operations.remove(op_id)
    ## Wait for any operations that are still running
    for op_id in operations:
        operation = !curl -X POST -s {PLATFORM}/operation/{op_id}/wait
        statuses[op_id] = json.loads(operation[0])["status"]
    with io.capture_output():
        result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'D' and type = 'TABLE'
    progress.update(progress.total - result[0][0] - progress.n)

## Print summary
for op_id, status in statuses.items():
    print(f"Operation {op_id} complete, status: {status}")

If the above step fails (`status: Failed`) then the logs can be viewed using the following block:

In [None]:
result=!curl -s {PLATFORM}/operation
for operation in json.loads(result[0]):
    if operation["status"] != "Succeeded":
        ## Fetch the full operation record, including its log messages
        details=!curl -s {PLATFORM}/operation/{operation["id"]}
        print(f"Operation {operation['id']} ({operation['status']}), messages:\n  ", end='')
        print("\n  ".join(json.loads(details[0])["messages"]))

The following cell checks whether all tables have been migrated. Any tables not in the `R` or `K` stage have not yet been transferred.

In [None]:
%%sql {POSTGRES}
select count(*), stage, error from dbt.migration_objects where type = 'TABLE' group by stage, error

# Constraint Migration

The constraint migration stage sets up the constraints on the tables. This is done **after** the data transfer stage so that constraints (for example, foreign keys between tables) do not reject rows while separate tables are being loaded.

Proceed to the next section only when all objects are in the `D` or `d` stage.

The following cell will migrate the constraints for all objects:

In [None]:
## Find the total number of objects to be migrated
with io.capture_output():
    result=%sql {POSTGRES} select count(*) from dbt.migration_objects

## Begin the operation
operation = !curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "id": 1, "oracle": 1, "lgw": 3, "stage": "constraints" }}'

## Check the output
with tqdm(total=result[0][0], file=sys.stdout) as progress:
    while result[0][0] > 0:
        with io.capture_output():
            result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage in ('I', 'R')
        if result[0][0] > 0:
            time.sleep(1)
        progress.update(progress.total - result[0][0] - progress.n)
        operation = !curl -s {PLATFORM}/operation/1
        if json.loads(operation[0])["status"] != 'Running':
            break
    operation = !curl -X POST -s {PLATFORM}/operation/1/wait
    with io.capture_output():
        result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage in ('I', 'R')
    progress.update(progress.total - result[0][0] - progress.n)

## Print summary
status = json.loads(operation[0])["status"]
print(f"Operation complete, status: {status}")

In [None]:
%%sql {POSTGRES}
select count(*), stage, type, error from dbt.migration_objects group by stage, type, error

# Verification

The verification stage runs a hash check over all data in all columns to determine whether the data transfer was successful. This operation can be CPU- and IO-intensive on both databases, so it may take some time.

Proceed to the next stage only when all table objects are in the `K` stage.
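The platform's exact hashing scheme is not described in this notebook. As an illustration of the general idea only (not the Liberatii implementation), the sketch below hashes each row with SHA-256 and sums the row hashes modulo 2**256, so the digest does not depend on row order and, unlike XOR, duplicate rows do not cancel out; it also returns the row count. `table_digest` is a hypothetical name, and a real cross-database check would need a canonical serialization of numbers, dates and text on both sides rather than Python's `repr`.

```python
import hashlib
from typing import Iterable, Sequence, Tuple


def table_digest(rows: Iterable[Sequence]) -> Tuple[int, int]:
    """Return (row_count, order-independent digest) for a set of rows.

    Hypothetical illustration: each row is serialized with repr() and hashed
    with SHA-256; row hashes are summed modulo 2**256 so row order is irrelevant.
    """
    count, total = 0, 0
    for row in rows:
        row_hash = hashlib.sha256(repr(tuple(row)).encode("utf-8")).digest()
        total = (total + int.from_bytes(row_hash, "big")) % (1 << 256)
        count += 1
    return count, total


# Two copies of the same table fetched in different orders compare equal
source = [(1, "x"), (2, "y")]
target = [(2, "y"), (1, "x")]
print(table_digest(source) == table_digest(target))
```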

In [None]:
## Find the total number of objects to be migrated
with io.capture_output():
    result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage in ('d', 'K') and type = 'TABLE'

## Begin the operation
operation = !curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "id": 1, "oracle": 1, "lgw": 3, "stage": "check" }}'

## Check the output
with tqdm(total=result[0][0], file=sys.stdout) as progress:
    while result[0][0] > 0:
        with io.capture_output():
            result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'd' and type = 'TABLE'
        if result[0][0] > 0:
            time.sleep(1)
        progress.update(progress.total - result[0][0] - progress.n)
        operation = !curl -s {PLATFORM}/operation/1
        if json.loads(operation[0])["status"] != 'Running':
            break
    operation = !curl -X POST -s {PLATFORM}/operation/1/wait
    with io.capture_output():
        result=%sql {POSTGRES} select count(*) from dbt.migration_objects where stage = 'd' and type = 'TABLE'
    progress.update(progress.total - result[0][0] - progress.n)

## Print summary
status = json.loads(operation[0])["status"]
print(f"Operation complete, status: {status}")

In [None]:
%%sql {POSTGRES}

select count(*), stage, error from dbt.migration_objects where type = 'TABLE' group by stage, error

### Verification errors

If there are verification errors, their source must be determined. The most likely cause is that data was added, changed, or removed between the data transfer and verification stages. In this case, it is recommended to run the data transfer and verification against a **snapshot** of the database and then synchronize from the live database.

# Synchronization

The synchronization step sets up a Change-Data-Capture pipeline between the Oracle and Postgres databases, using Oracle LogMiner to collect changes from the Oracle database.

This connection requires permissions that the schema user is unlikely to have, so a separate user should be set up according to the guidelines specified here: https://debezium.io/documentation/reference/stable/connectors/oracle.html.

Proceed to the next section only when every table connector shows `RUNNING` for both its status and its task status.

The following cell sets up the new connection. The username and password can also be overridden here for convenience:

In [None]:
# Change this quickly here to test new user connections
SYNC_USERNAME=f"{SYNC_USERNAME}"
SYNC_PASSWORD=f"{SYNC_PASSWORD}"

# Set the new information
print(f"Setting Oracle connection to: {SYNC_USERNAME}/{SYNC_PASSWORD}@{ORACLE_CONNECTION_STRING}")
!curl {PLATFORM}/connection -H 'Content-Type: application/json' \
   -d '{{  "type":"Oracle",\
           "connectionString":"{ORACLE_CONNECTION_STRING}",\
           "user": "{SYNC_USERNAME}",\
           "password": "{SYNC_PASSWORD}",\
           "id":1}}'

The following cell will enable the synchronization workers for all tables:

In [None]:
result=!curl -s {PLATFORM}/operation -H 'Content-Type: application/json' -d '{{ "id": 1, "oracle": 1, "postgres": 2, "lgw": 3, "stage": "sync" }}'
result=!curl -s -XPOST {PLATFORM}/operation/1/wait
decoded = json.loads(result[0])
print(decoded["status"])

In [None]:
result=!curl -I -s {CDC_HOST}/connectors
if result == []:
    print("CDC Host is down")
    decoded={}
else:
    result=!curl -s {CDC_HOST}/connectors?expand=status
    decoded=json.loads(result[0])
print("CDC Table status:")
pd.DataFrame(
    [   [
            sink,
            s["status"]["connector"]["state"],
            (s["status"]["tasks"] or [{"state": "..."}])[0]["state"]]
        for sink, s in decoded.items()],
    columns=["Sink", "Status", "Task Status"])
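The criterion for this section can also be checked in code. `GET /connectors?expand=status` is part of the Kafka Connect REST API and returns an object keyed by connector name, each with `status.connector.state` and a list of `status.tasks`. A sketch that lists connectors whose connector state or any task state is not `RUNNING`; `connectors_not_running` is a hypothetical name. Unlike the table above, which shows only the first task, it checks every task and treats a connector with no tasks yet as not running.

```python
from typing import Dict, List


def connectors_not_running(statuses: Dict[str, dict]) -> List[str]:
    """Return the sorted names of connectors that are not fully RUNNING.

    Hypothetical helper. statuses is the parsed JSON from
    GET /connectors?expand=status.
    """
    problems = []
    for name, info in statuses.items():
        status = info["status"]
        tasks = status.get("tasks") or []
        connector_ok = status["connector"]["state"] == "RUNNING"
        tasks_ok = bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)
        if not (connector_ok and tasks_ok):
            problems.append(name)
    return sorted(problems)


# Usage with the `decoded` dictionary from the cell above:
# print(connectors_not_running(decoded) or "All connectors RUNNING")
```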

### Resolving connector issues

The following cell lists the errors returned by any failing connectors. The most likely error is "DDL Statement couldn't be parsed", which means that the Change-Data-Capture tools could not parse the definition of the table on the Oracle side.

If the tables causing the issue are not in use, they can be dropped to resolve it. Otherwise, contact Liberatii to find a workaround.

In [None]:
result=!curl -s {CDC_HOST}/connectors?expand=status
decoded=json.loads(result[0])
failures = [
    [sink, s["status"]["tasks"][0]["trace"]] 
    for sink, s in decoded.items()
    if (s["status"]["tasks"] or [{"state":""}])[0]["state"] == 'FAILED']
for name, failure in failures:
    print(f"Sink {name} failed with:")
    print(failure)