From fef7013afa97180a4e7eec511bd7594bd5bdf270 Mon Sep 17 00:00:00 2001
From: Silvano Ravotto
Date: Thu, 4 Jan 2024 21:24:42 -0500
Subject: [PATCH] Preview of the DB2 connector.

This change adds a connector for a DB2 database source. It uses DB2 SQL
replication, leveraging staging tables in the source database. For
instructions on how to put tables in capture mode in DB2, so that changes
are sent to the staging tables, see
https://debezium.io/documentation/reference/stable/connectors/db2.html#putting-tables-in-capture-mode.

The connector periodically pulls new mutations from the staging tables and
inserts them into the target database via the types.MultiAcceptor
interface.
---
 .github/db2/Dockerfile                      |  31 +
 .github/db2/README.md                       |   8 +
 .github/db2/asncdc.c                        | 164 ++++
 .github/db2/asncdc_UDF.sql                  |  20 +
 .github/db2/asncdcaddremove.sql             | 196 +++++
 .github/db2/asncdctables.sql                | 482 +++++++++++
 .github/db2/cdcsetup.sh                     |  21 +
 .github/db2/custom-init/cleanup_storage.sh  |  17 +
 .github/db2/dbsetup.sh                      |  56 ++
 .github/db2/inventory.sql                   |  77 ++
 .github/db2/openshift_entrypoint.sh         |  23 +
 .github/db2/startup-agent.sql               |   4 +
 .github/db2/startup-cdc-demo.sql            |  11 +
 .github/docker-compose.yml                  |  19 +
 .github/workflows/go-test-db2.yaml          | 207 +++++
 .github/workflows/go-tests.yaml             |   2 +-
 .github/workflows/go.yaml                   |  13 +
 .gitignore                                  |   6 +
 Makefile                                    |  34 +
 go.mod                                      |  16 +
 go.sum                                      |  12 +
 internal/cmd/db2/db2.go                     |  39 +
 internal/cmd/db2/install.go                 | 161 ++++
 internal/sinktest/all/integration.go        |   4 +
 .../scripttest/testdata/logical_test_db2.ts |  56 ++
 internal/source/db2/config.go               | 137 +++
 internal/source/db2/conn.go                 | 516 ++++++++++++
 internal/source/db2/db2.go                  |  39 +
 internal/source/db2/driver.go               |  22 +
 internal/source/db2/injector.go             |  55 ++
 internal/source/db2/integration_test.go     | 782 ++++++++++++++++++
 internal/source/db2/lsn.go                  | 114 +++
 internal/source/db2/lsn_test.go             | 150 ++++
 internal/source/db2/metrics.go              |  48 ++
 internal/source/db2/operation.go            |  36 +
 internal/source/db2/operation_string.go     |  29 +
 internal/source/db2/provider.go             | 107 +++
 internal/source/db2/queries.go              | 111 +++
 internal/source/db2/wire_gen.go             |  90 ++
 main.go                                     |   3 +
 40 files changed, 3917 insertions(+), 1 deletion(-)
 create mode 100644 .github/db2/Dockerfile
 create mode 100644 .github/db2/README.md
 create mode 100644 .github/db2/asncdc.c
 create mode 100644 .github/db2/asncdc_UDF.sql
 create mode 100644 .github/db2/asncdcaddremove.sql
 create mode 100644 .github/db2/asncdctables.sql
 create mode 100755 .github/db2/cdcsetup.sh
 create mode 100644 .github/db2/custom-init/cleanup_storage.sh
 create mode 100755 .github/db2/dbsetup.sh
 create mode 100644 .github/db2/inventory.sql
 create mode 100755 .github/db2/openshift_entrypoint.sh
 create mode 100644 .github/db2/startup-agent.sql
 create mode 100644 .github/db2/startup-cdc-demo.sql
 create mode 100644 .github/workflows/go-test-db2.yaml
 create mode 100644 Makefile
 create mode 100644 internal/cmd/db2/db2.go
 create mode 100644 internal/cmd/db2/install.go
 create mode 100644 internal/sinktest/scripttest/testdata/logical_test_db2.ts
 create mode 100644 internal/source/db2/config.go
 create mode 100644 internal/source/db2/conn.go
 create mode 100644 internal/source/db2/db2.go
 create mode 100644 internal/source/db2/driver.go
 create mode 100644 internal/source/db2/injector.go
 create mode 100644 internal/source/db2/integration_test.go
 create mode 100644 internal/source/db2/lsn.go
 create mode 100644 internal/source/db2/lsn_test.go
 create mode 100644 internal/source/db2/metrics.go
 create mode 100644
internal/source/db2/operation.go create mode 100644 internal/source/db2/operation_string.go create mode 100644 internal/source/db2/provider.go create mode 100644 internal/source/db2/queries.go create mode 100644 internal/source/db2/wire_gen.go diff --git a/.github/db2/Dockerfile b/.github/db2/Dockerfile new file mode 100644 index 000000000..57988f08c --- /dev/null +++ b/.github/db2/Dockerfile @@ -0,0 +1,31 @@ +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +FROM icr.io/db2_community/db2 + +LABEL maintainer="Debezium Community" + +RUN mkdir -p /asncdctools/src + +ADD asncdc_UDF.sql /asncdctools/src +ADD asncdcaddremove.sql /asncdctools/src +ADD asncdctables.sql /asncdctools/src +ADD dbsetup.sh /asncdctools/src +ADD startup-agent.sql /asncdctools/src +ADD startup-cdc-demo.sql /asncdctools/src +ADD inventory.sql /asncdctools/src +ADD asncdc.c /asncdctools/src + +RUN mkdir /var/custom && \ + chmod -R 777 /asncdctools && \ + chmod -R 777 /var/custom + +ADD cdcsetup.sh /var/custom +ADD custom-init /var/custom-init + +RUN chmod -R 777 /var/custom-init + +ADD openshift_entrypoint.sh /var/db2_setup/lib + +RUN chmod 777 /var/custom/cdcsetup.sh && \ + chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh diff --git a/.github/db2/README.md b/.github/db2/README.md new file mode 100644 index 000000000..c80921dd9 --- /dev/null +++ b/.github/db2/README.md @@ -0,0 +1,8 @@ +# DB2 container + +This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables. + +Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. + +Original Code: +https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server diff --git a/.github/db2/asncdc.c b/.github/db2/asncdc.c new file mode 100644 index 000000000..4be00b960 --- /dev/null +++ b/.github/db2/asncdc.c @@ -0,0 +1,164 @@ +// Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. 
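+// asncdc.c implements the ASNCDC.ASNCDCSERVICES user-defined function: it
+// shells out to the asncap/asnccmd capture programs and returns whatever
+// output they produce as a CLOB, so the capture agent can be driven over SQL.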
+// https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <sqludf.h>
+
+void SQL_API_FN asncdcservice(
+    SQLUDF_VARCHAR *asnCommand, /* input */
+    SQLUDF_VARCHAR *asnService,
+    SQLUDF_CLOB *fileData, /* output */
+    /* null indicators */
+    SQLUDF_NULLIND *asnCommand_ind, /* input */
+    SQLUDF_NULLIND *asnService_ind,
+    SQLUDF_NULLIND *fileData_ind,
+    SQLUDF_TRAIL_ARGS,
+    struct sqludf_dbinfo *dbinfo)
+{
+
+    int fd;
+    char tmpFileName[] = "/tmp/fileXXXXXX";
+    fd = mkstemp(tmpFileName);
+
+    int strcheck = 0;
+    char cmdstring[256];
+
+    char* szDb2path = getenv("HOME");
+
+    char str[20];
+    int len = 0;
+    char c;
+    char *buffer = NULL;
+    FILE *pidfile;
+
+    char dbname[129];
+    memset(dbname, '\0', 129);
+    strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
+    dbname[dbinfo->dbnamelen] = '\0';
+
+    int pid;
+    if (strcmp(asnService, "asncdc") == 0)
+    {
+        strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
+        int callcheck;
+        callcheck = system(cmdstring);
+        pidfile = fopen(tmpFileName, "r");
+        while ((c = fgetc(pidfile)) != EOF)
+        {
+            if (c == '\n')
+            {
+                break;
+            }
+            len++;
+        }
+        buffer = (char *)malloc(sizeof(char) * len);
+        fseek(pidfile, 0, SEEK_SET);
+        fread(buffer, sizeof(char), len, pidfile);
+        fclose(pidfile);
+        pidfile = fopen(tmpFileName, "w");
+        if (strcmp(asnCommand, "start") == 0)
+        {
+            if (len == 0) // is not running
+            {
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
+                fprintf(pidfile, "start --> %s \n", cmdstring);
+                callcheck = system(cmdstring);
+            }
+            else
+            {
+                fprintf(pidfile, "asncap is already running");
+            }
+        }
+        if ((strcmp(asnCommand, "prune") == 0) ||
+            (strcmp(asnCommand, "reinit") == 0) ||
+            (strcmp(asnCommand, "suspend") == 0) ||
+            (strcmp(asnCommand, "resume") == 0) ||
+            (strcmp(asnCommand, "status") == 0) ||
+            (strcmp(asnCommand, "stop") == 0))
+        {
+            if (len > 0)
+            {
+                //buffer[len] = '\0';
+                //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
+                //fprintf(pidfile, "stop --> %s", cmdstring);
+                //callcheck = system(cmdstring);
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
+                //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
+                callcheck = system(cmdstring);
+            }
+            else
+            {
+                fprintf(pidfile, "asncap is not running");
+            }
+        }
+
+        fclose(pidfile);
+    }
+    /* system(cmdstring); */
+
+    int rc = 0;
+    long fileSize = 0;
+    size_t readCnt = 0;
+    FILE *f = NULL;
+
+    f = fopen(tmpFileName, "r");
+    if (!f)
+    {
+        strcpy(SQLUDF_MSGTX, "Could not open file ");
+        strncat(SQLUDF_MSGTX, tmpFileName,
+                SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
+        strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    rc = fseek(f, 0, SEEK_END);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* verify the file size */
+    fileSize = ftell(f);
+    if (fileSize > fileData->length)
+    {
+        strcpy(SQLUDF_MSGTX, "File too large");
+        strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* go to the beginning and read the entire file */
+    rc = fseek(f, 0, 0);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    readCnt = fread(fileData->data, 1, fileSize, f);
+    if (readCnt != fileSize)
+    {
+        /* raise a warning that something weird is going on */
+        sprintf(SQLUDF_MSGTX, "Could not read entire file "
+                              "(%d vs %d)",
+                readCnt, fileSize);
+        strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
+        *fileData_ind = -1;
+    }
+    else
+    {
+        fileData->length = readCnt;
+        *fileData_ind = 0;
+    }
+    // remove temporary file
+    rc = remove(tmpFileName);
+    //fclose(pFile);
+}
diff --git a/.github/db2/asncdc_UDF.sql b/.github/db2/asncdc_UDF.sql
new file mode 100644
index 000000000..06d517700
--- /dev/null
+++ b/.github/db2/asncdc_UDF.sql
@@ -0,0 +1,20 @@
+-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
+-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+
+DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;
+
+CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
+  RETURNS CLOB(100K)
+  SPECIFIC asncdcservice
+  EXTERNAL NAME 'asncdc!asncdcservice'
+  LANGUAGE C
+  PARAMETER STYLE SQL
+  DBINFO
+  DETERMINISTIC
+  NOT FENCED
+  RETURNS NULL ON NULL INPUT
+  NO SQL
+  NO EXTERNAL ACTION
+  NO SCRATCHPAD
+  ALLOW PARALLEL
+  NO FINAL CALL;
\ No newline at end of file
diff --git a/.github/db2/asncdcaddremove.sql b/.github/db2/asncdcaddremove.sql
new file mode 100644
index 000000000..eef38eef9
--- /dev/null
+++ b/.github/db2/asncdcaddremove.sql
@@ -0,0 +1,196 @@
+-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
+-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+
+--
+-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE()
+-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table
+-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table
+--
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE stmtSQL VARCHAR(2048);
+
+DECLARE SQLCODE INT;
+DECLARE SQLSTATE CHAR(5);
+DECLARE RC_SQLCODE INT DEFAULT 0;
+DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000';
+
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE;
+
+-- delete ASNCDC.IBMSNAP_PRUNCNTL entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_REGISTER entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- drop CD Table / source
+SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ;
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_COLS entries / target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_MEMBR entries / target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_COLVERSION entries
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_TABVERSION entries
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE';
+EXECUTE IMMEDIATE stmtSQL;
+
+END P1@
+--#SET TERMINATOR ;
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE SQLSTATE CHAR(5);
+
+DECLARE stmtSQL VARCHAR(2048);
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' AS ( SELECT ' ||
+    ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' ||
+    ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' ||
+    ' CAST ('''' AS CHAR(1)) ' ||
+    ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' || tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW ';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' ||
+    tableschema || '_' || tablename ||
+    ' ON ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' VOLATILE CARDINALITY';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' ||
+    'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' ||
+    'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' ||
+    'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' ||
+    'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' ||
+    'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' ||
+    'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' ||
+    'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' ||
+    'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' ||
+    '''' || tableschema || ''', ' ||
+    '''' || tablename || ''', ' ||
+    '0, ' ||
+    '''N'', ' ||
+    '1, ' ||
+    '''Y'', ' ||
+    '''Y'', ' ||
+    '''ASNCDC'', ' ||
+    '''CDC_' || tableschema || '_' || tablename || ''', ' ||
+    '''ASNCDC'', ' ||
+    '''CDC_' || tableschema || '_' || tablename || ''', ' ||
+    'null, ' ||
+    'null, ' ||
+    '0, ' ||
+    'null, ' ||
+    'null, ' ||
+    'null, ' ||
+    'null, ' ||
+    'null, ' ||
+    'null, ' ||
+    'null, ' ||
+    '''0801'', ' ||
+    'null, ' ||
+    'null, ' ||
+    '''0'', ' ||
+    '''Y'', ' ||
+    '''N'', ' ||
+    '''Y'', ' ||
+    '''NNNN'', ' ||
+    '''Y'', ' ||
+    '''A'',' ||
+    'null ) ';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
+    'TARGET_SERVER, ' ||
+    'TARGET_OWNER, ' ||
+    'TARGET_TABLE, ' ||
+    'SYNCHTIME, ' ||
'SYNCHPOINT, ' || + 'SOURCE_OWNER, ' || + 'SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, ' || + 'APPLY_QUAL, ' || + 'SET_NAME, ' || + 'CNTL_SERVER , ' || + 'TARGET_STRUCTURE , ' || + 'CNTL_ALIAS , ' || + 'PHYS_CHANGE_OWNER , ' || + 'PHYS_CHANGE_TABLE , ' || + 'MAP_ID ' || + ') VALUES ( ' || + '''KAFKA'', ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + 'NULL, ' || + 'NULL, ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''KAFKAQUAL'', ' || + '''SET001'', ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '8, ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || + ' )'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; diff --git a/.github/db2/asncdctables.sql b/.github/db2/asncdctables.sql new file mode 100644 index 000000000..a86953649 --- /dev/null +++ b/.github/db2/asncdctables.sql @@ -0,0 +1,482 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +-- 1021 db2 LEVEL Version 10.2.0 --> 11.5.0 1150 + +CREATE TABLE ASNCDC.IBMQREP_COLVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +POSITION SMALLINT NOT NULL, +NAME VARCHAR(128) NOT NULL, +TYPE SMALLINT NOT NULL, +LENGTH INTEGER NOT NULL, +NULLS CHAR( 1) NOT NULL, +DEFAULT VARCHAR(1536), +CODEPAGE INTEGER, +SCALE INTEGER, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX +ON ASNCDC.IBMQREP_COLVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +POSITION ASC); + +CREATE INDEX ASNCDC.IX2COLVERSION +ON ASNCDC.IBMQREP_COLVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE TABLE ASNCDC.IBMQREP_TABVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +VERSION INTEGER NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_NAME VARCHAR(128) NOT NULL, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX +ON ASNCDC.IBMQREP_TABVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +VERSION ASC); + +CREATE INDEX ASNCDC.IX2TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE INDEX ASNCDC.IX3TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +SOURCE_OWNER ASC, +SOURCE_NAME ASC); + +CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL( +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021') +ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES ( +'1021'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME TIMESTAMP NOT NULL, +RESTART_TIME TIMESTAMP NOT NULL, +CURRENT_MEMORY INT NOT NULL, +CD_ROWS_INSERTED INT NOT NULL, +RECAP_ROWS_SKIPPED INT NOT NULL, +TRIGR_ROWS_SKIPPED INT NOT NULL, +CHG_ROWS_SKIPPED INT NOT NULL, +TRANS_PROCESSED INT NOT NULL, +TRANS_SPILLED INT NOT NULL, +MAX_TRANS_SIZE INT NOT NULL, +LOCKING_RETRIES INT NOT NULL, +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +LOGREADLIMIT INT NOT NULL, +CAPTURE_IDLE INT NOT NULL, +SYNCHTIME TIMESTAMP NOT NULL, +CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT , +LAST_EOL_TIME TIMESTAMP, +RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +CURRENT_SEQ VARCHAR( 16) 
FOR BIT DATA NOT NULL WITH DEFAULT , +RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +LOGREAD_API_TIME INT, +NUM_LOGREAD_CALLS INT, +NUM_END_OF_LOGS INT, +LOGRDR_SLEEPTIME INT, +NUM_LOGREAD_F_CALLS INT, +TRANS_QUEUED INT, +NUM_WARNTXS INT, +NUM_WARNLOGAPI INT) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX +ON ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT INT, +LAG_LIMIT INT, +COMMIT_INTERVAL INT, +PRUNE_INTERVAL INT, +TRACE_LIMIT INT, +MONITOR_LIMIT INT, +MONITOR_INTERVAL INT, +MEMORY_LIMIT SMALLINT, +REMOTE_SRC_SERVER CHAR( 18), +AUTOPRUNE CHAR( 1), +TERM CHAR( 1), +AUTOSTOP CHAR( 1), +LOGREUSE CHAR( 1), +LOGSTDOUT CHAR( 1), +SLEEP_INTERVAL SMALLINT, +CAPTURE_PATH VARCHAR(1040), +STARTMODE VARCHAR( 10), +LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256, +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021', +COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021') + ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT, +LAG_LIMIT, +COMMIT_INTERVAL, +PRUNE_INTERVAL, +TRACE_LIMIT, +MONITOR_LIMIT, +MONITOR_INTERVAL, +MEMORY_LIMIT, +SLEEP_INTERVAL, +AUTOPRUNE, +TERM, +AUTOSTOP, +LOGREUSE, +LOGSTDOUT, +CAPTURE_PATH, +STARTMODE, +COMPATIBILITY) +VALUES ( +10080, +10080, +30, +300, +10080, +10080, +300, +32, +5, +'Y', +'Y', +'N', +'N', +'N', +NULL, +'WARMSI', +'1021' +); + +CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS ( + CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL + ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX + ON ASNCDC.IBMSNAP_CAPSCHEMAS + (CAP_SCHEMA_NAME ASC); + +INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES ( +'ASNCDC'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE( +OPERATION CHAR( 8) NOT NULL, +TRACE_TIME TIMESTAMP NOT NULL, +DESCRIPTION VARCHAR(1024) NOT NULL) + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX +ON ASNCDC.IBMSNAP_CAPTRACE( +TRACE_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL( +TARGET_SERVER CHAR(18) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +CNTL_SERVER CHAR( 18) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +CNTL_ALIAS CHAR( 8), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +MAP_ID VARCHAR(10) NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX +ON ASNCDC.IBMSNAP_PRUNCNTL( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC, +TARGET_TABLE ASC, +TARGET_OWNER ASC); + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1 +ON ASNCDC.IBMSNAP_PRUNCNTL( +MAP_ID ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2 +ON ASNCDC.IBMSNAP_PRUNCNTL( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3 +ON ASNCDC.IBMSNAP_PRUNCNTL( +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK( +DUMMY CHAR( 1)) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER CHAR( 18) NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT 
NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX +ON ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER ASC, +APPLY_QUAL ASC, +SET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +GLOBAL_RECORD CHAR( 1) NOT NULL, +SOURCE_STRUCTURE SMALLINT NOT NULL, +SOURCE_CONDENSED CHAR( 1) NOT NULL, +SOURCE_COMPLETE CHAR( 1) NOT NULL, +CD_OWNER VARCHAR(128), +CD_TABLE VARCHAR(128), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +DISABLE_REFRESH SMALLINT NOT NULL, +CCD_OWNER VARCHAR(128), +CCD_TABLE VARCHAR(128), +CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CCD_CONDENSED CHAR( 1), +CCD_COMPLETE CHAR( 1), +ARCH_LEVEL CHAR( 4) NOT NULL, +DESCRIPTION CHAR(254), +BEFORE_IMG_PREFIX VARCHAR( 4), +CONFLICT_LEVEL CHAR( 1), +CHG_UPD_TO_DEL_INS CHAR( 1), +CHGONLY CHAR( 1), +RECAPTURE CHAR( 1), +OPTION_FLAGS CHAR( 4) NOT NULL, +STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y', +STATE CHAR( 1) WITH DEFAULT 'I', +STATE_INFO CHAR( 8)) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX +ON ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1 +ON ASNCDC.IBMSNAP_REGISTER( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2 +ON ASNCDC.IBMSNAP_REGISTER( +GLOBAL_RECORD ASC); + +ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_RESTART( +MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +MAX_COMMIT_TIME TIMESTAMP NOT NULL, +MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +CURR_COMMIT_TIME TIMESTAMP NOT NULL, +CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT , +SIGNAL_TYPE VARCHAR( 30) NOT NULL, +SIGNAL_SUBTYPE VARCHAR( 30), +SIGNAL_INPUT_IN VARCHAR(500), +SIGNAL_STATE CHAR( 1) NOT NULL, +SIGNAL_LSN VARCHAR( 16) FOR BIT DATA) +DATA CAPTURE CHANGES + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_SIGNALX +ON ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +COL_TYPE CHAR( 1) NOT NULL, +TARGET_NAME VARCHAR(128) NOT NULL, +IS_KEY CHAR( 1) NOT NULL, +COLNO SMALLINT NOT NULL, +EXPRESSION VARCHAR(1024) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX +ON ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC, +TARGET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY; + +--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX +--ON ASNCDC.IBMSNAP_SUBS_EVENT( +--EVENT_NAME ASC, +--EVENT_TIME ASC); + + +--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, 
+TARGET_TABLE VARCHAR(128) NOT NULL,
+TARGET_CONDENSED CHAR( 1) NOT NULL,
+TARGET_COMPLETE CHAR( 1) NOT NULL,
+TARGET_STRUCTURE SMALLINT NOT NULL,
+PREDICATES VARCHAR(1024),
+MEMBER_STATE CHAR( 1),
+TARGET_KEY_CHG CHAR( 1) NOT NULL,
+UOW_CD_PREDICATES VARCHAR(1024),
+JOIN_UOW_CD CHAR( 1),
+LOADX_TYPE SMALLINT,
+LOADX_SRC_N_OWNER VARCHAR( 128),
+LOADX_SRC_N_TABLE VARCHAR(128))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX
+ON ASNCDC.IBMSNAP_SUBS_MEMBR(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+SOURCE_OWNER ASC,
+SOURCE_TABLE ASC,
+SOURCE_VIEW_QUAL ASC,
+TARGET_OWNER ASC,
+TARGET_TABLE ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+SET_TYPE CHAR( 1) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+ACTIVATE SMALLINT NOT NULL,
+SOURCE_SERVER CHAR( 18) NOT NULL,
+SOURCE_ALIAS CHAR( 8),
+TARGET_SERVER CHAR( 18) NOT NULL,
+TARGET_ALIAS CHAR( 8),
+STATUS SMALLINT NOT NULL,
+LASTRUN TIMESTAMP NOT NULL,
+REFRESH_TYPE CHAR( 1) NOT NULL,
+SLEEP_MINUTES INT,
+EVENT_NAME CHAR( 18),
+LASTSUCCESS TIMESTAMP,
+SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+SYNCHTIME TIMESTAMP,
+CAPTURE_SCHEMA VARCHAR(128) NOT NULL,
+TGT_CAPTURE_SCHEMA VARCHAR(128),
+FEDERATED_SRC_SRVR VARCHAR( 18),
+FEDERATED_TGT_SRVR VARCHAR( 18),
+JRN_LIB CHAR( 10),
+JRN_NAME CHAR( 10),
+OPTION_FLAGS CHAR( 4) NOT NULL,
+COMMIT_COUNT SMALLINT,
+MAX_SYNCH_MINUTES SMALLINT,
+AUX_STMTS SMALLINT NOT NULL,
+ARCH_LEVEL CHAR( 4) NOT NULL)
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX
+ON ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+BEFORE_OR_AFTER CHAR( 1) NOT NULL,
+STMT_NUMBER SMALLINT NOT NULL,
+EI_OR_CALL CHAR( 1) NOT NULL,
+SQL_STMT VARCHAR(1024),
+ACCEPT_SQLSTATES VARCHAR( 50))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX
+ON ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+BEFORE_OR_AFTER ASC,
+STMT_NUMBER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_UOW(
+IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL,
+IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+IBMSNAP_LOGMARKER TIMESTAMP NOT NULL,
+IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL,
+IBMSNAP_AUTHID VARCHAR(128) NOT NULL,
+IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT ,
+IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX
+ON ASNCDC.IBMSNAP_UOW(
+IBMSNAP_COMMITSEQ ASC,
+IBMSNAP_LOGMARKER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPENQ (
+  LOCK_NAME CHAR(9 OCTETS)
+  )
+  ORGANIZE BY ROW
+  DATA CAPTURE NONE
+  COMPRESS NO;
diff --git a/.github/db2/cdcsetup.sh b/.github/db2/cdcsetup.sh
new file mode 100755
index 000000000..f7c27fb5f
--- /dev/null
+++ b/.github/db2/cdcsetup.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
+# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+
+if [ ! -f /asncdctools/src/asncdc.nlk ]; then
+rc=1
+echo "Waiting for the db2inst1 user to exist ."
+while [ "$rc" -ne 0 ]
+do
+   sleep 5
+   id db2inst1
+   rc=$?
+   echo '.'
+done
+
+su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1
+fi
+touch /asncdctools/src/asncdc.nlk
+
+echo "CDC setup completed."
diff --git a/.github/db2/custom-init/cleanup_storage.sh b/.github/db2/custom-init/cleanup_storage.sh
new file mode 100644
index 000000000..8624e0d10
--- /dev/null
+++ b/.github/db2/custom-init/cleanup_storage.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+
+# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
+# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+
+
+################################################################################
+# Wipes the storage directory.
+# Note that this essentially makes the database non-persistent when the pod is deleted.
+################################################################################
+
+echo "Inspecting database directory"
+ls $STORAGE_DIR
+
+echo "Wiping database directory"
+rm -rf $STORAGE_DIR/*
diff --git a/.github/db2/dbsetup.sh b/.github/db2/dbsetup.sh
new file mode 100755
index 000000000..3cbd81348
--- /dev/null
+++ b/.github/db2/dbsetup.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0.
+# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
+
+
+echo "Compile ASN tool ..."
+cd /asncdctools/src
+/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
+
+DBNAME=$1
+DB2DIR=/opt/ibm/db2/V11.5
+rc=1
+echo "Waiting for DB2 start ( $DBNAME ) ."
+while [ "$rc" -ne 0 ]
+do
+   sleep 5
+   db2 connect to $DBNAME
+   rc=$?
+   echo '.'
+done
+
+# enable metacatalog read via JDBC
+cd $HOME/sqllib/bnd
+db2 bind db2schema.bnd blocking all grant public sqlerror continue
+
+# do a backup and restart the db
+db2 backup db $DBNAME to /dev/null
+db2 restart db $DBNAME
+
+db2 connect to $DBNAME
+
+cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function
+chmod 777 /database/config/db2inst1/sqllib/function
+
+# add UDF / start stop asncap
+db2 -tvmf /asncdctools/src/asncdc_UDF.sql
+
+# create asntables
+db2 -tvmf /asncdctools/src/asncdctables.sql
+
+# add UDF / add remove asntables
+
+db2 -tvmf /asncdctools/src/asncdcaddremove.sql
+
+
+
+
+# create sample tables and data
+db2 -tvmf /asncdctools/src/inventory.sql
+db2 -tvmf /asncdctools/src/startup-agent.sql
+sleep 10
+db2 -tvmf /asncdctools/src/startup-cdc-demo.sql
+
+
+echo "done"
diff --git a/.github/db2/inventory.sql b/.github/db2/inventory.sql
new file mode 100644
index 000000000..1d441d590
--- /dev/null
+++ b/.github/db2/inventory.sql
@@ -0,0 +1,77 @@
+
+-- Create and populate our products table
+CREATE TABLE DB2INST1.PRODUCTS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+       (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  description VARCHAR(512),
+  weight FLOAT
+);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('scooter','Small 2-wheel scooter',3.14);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('car battery','12V car battery',8.1);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','12oz carpenter''s hammer',0.75);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','14oz carpenter''s hammer',0.875);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','16oz carpenter''s hammer',1.0);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('rocks','box of assorted rocks',5.3); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('jacket','water resistent black wind breaker',0.1); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('spare tire','24 inch spare tire',22.2); + +CREATE TABLE DB2INST1.PRODUCTS_ON_HAND ( + product_id INTEGER NOT NULL PRIMARY KEY, + quantity INTEGER NOT NULL, + FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id) +); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (101,3); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (102,8); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (103,18); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (104,4); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (105,5); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (106,0); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (107,44); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (108,2); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (109,5); + +CREATE TABLE DB2INST1.CUSTOMERS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 1001, INCREMENT BY 1) PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE +); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Sally','Thomas','sally.thomas@acme.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('George','Bailey','gbailey@foobar.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Edward','Walker','ed@walker.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Anne','Kretchmar','annek@noanswer.org'); + +CREATE TABLE DB2INST1.ORDERS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 10001, INCREMENT BY 1) PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY (purchaser) REFERENCES DB2INST1.CUSTOMERS(id), + FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id) +); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-16', 1001, 1, 102); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-17', 1002, 2, 105); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-19', 1002, 2, 106); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-21', 1003, 1, 107); diff --git a/.github/db2/openshift_entrypoint.sh b/.github/db2/openshift_entrypoint.sh new file mode 100755 index 000000000..38974dea6 --- /dev/null +++ b/.github/db2/openshift_entrypoint.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +################################################################################ +# Runs custom init scripts followed by the original entry point +############################################################################### + +source ${SETUPDIR?}/include/db2_constants +source ${SETUPDIR?}/include/db2_common_functions + +if [[ -d /var/custom-init ]]; then + echo "(*) Running user-provided init scripts ... " + chmod -R 777 /var/custom-init + for script in `ls /var/custom-init`; do + echo "(*) Running $script ..." 
+ /var/custom-init/$script + done +fi + +echo "Running original entry point" +/var/db2_setup/lib/setup_db2_instance.sh diff --git a/.github/db2/startup-agent.sql b/.github/db2/startup-agent.sql new file mode 100644 index 000000000..d72c09b90 --- /dev/null +++ b/.github/db2/startup-agent.sql @@ -0,0 +1,4 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +VALUES ASNCDC.ASNCDCSERVICES('start','asncdc'); \ No newline at end of file diff --git a/.github/db2/startup-cdc-demo.sql b/.github/db2/startup-cdc-demo.sql new file mode 100644 index 000000000..4c7c5c8ae --- /dev/null +++ b/.github/db2/startup-cdc-demo.sql @@ -0,0 +1,11 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +VALUES ASNCDC.ASNCDCSERVICES('status','asncdc'); + +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS_ON_HAND' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'CUSTOMERS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'ORDERS' ); + +VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc'); \ No newline at end of file diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml index 701e7e580..8bd85c031 100644 --- a/.github/docker-compose.yml +++ b/.github/docker-compose.yml @@ -73,6 +73,25 @@ services: entrypoint: /cockroach/cockroach command: start-single-node --insecure --store type=mem,size=2G --listen-addr :5401 --http-addr :8082 + db2-v11.5.9: + # Using a DB2 image with additional SQL replication utilities. + # To build the image use .github/db2 + # Original is at icr.io/db2_community/db2 + build: ./db2 + privileged: True + ports: + - 50000:50000 + environment: + - PERSISTENT_HOME=false + - LICENSE=accept + - DBNAME=TESTDB + - DB2INST1_PASSWORD=SoupOrSecret + healthcheck: + # The setup scripts creates /asncdctools/src/asncdc.nlk when CDC is up. + # TODO (silvano) improve start up time + test: bash -c '[ -f /asncdctools/src/asncdc.nlk ] ' + interval: 1s + retries: 600 mysql-v5.7: image: mysql:5.7 platform: linux/x86_64 diff --git a/.github/workflows/go-test-db2.yaml b/.github/workflows/go-test-db2.yaml new file mode 100644 index 000000000..92874a906 --- /dev/null +++ b/.github/workflows/go-test-db2.yaml @@ -0,0 +1,207 @@ +# Copyright 2023 The Cockroach Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +name: DB2 Tests +permissions: + contents: read + packages: read +on: + workflow_call: + workflow_dispatch: +# on: +# push: +# branches: [ sr8_db2 ] +jobs: + # Static code-quality checks. + code-quality: + name: Code Quality + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Ensure binary starts + if: ${{ !cancelled() }} + run: go run . 
help + + # Integration matrix tests for all supported CRDB and source DBs. + tests: + name: Integration Tests + runs-on: ${{ matrix.runs-on || 'ubuntu-latest-8-core' }} + timeout-minutes: 20 + strategy: + fail-fast: false + # Refer to the CRDB support policy when determining how many + # older releases to support. + # https://www.cockroachlabs.com/docs/releases/release-support-policy.html + # + # This matrix is explicit, since we have a few axes (target vs + # integration) that can't be expressed with the automatic + # cross-product behavior provided by the matrix operator. + matrix: + include: + - cockroachdb: v23.1 + integration: db2-v11.5.9 + + env: + COVER_OUT: coverage-${{ strategy.job-index }}.out + DOCKER_LOGS_OUT: docker-${{ strategy.job-index }}.log + FIRESTORE_EMULATOR_HOST: 127.0.0.1:8181 + JUNIT_OUT: junit-${{ strategy.job-index }}.xml + TEST_OUT: go-test-${{ strategy.job-index }}.json + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + # Ensure we can grab any private images we need for testing. + - name: Log in to GitHub Package Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Start Containers + working-directory: .github + run: > + docker compose up -d --wait + cockroachdb-${{ matrix.cockroachdb }} + ${{ matrix.integration }} + ${{ matrix.source }} + ${{ matrix.target }} + + # The go test json output will be written into a pipeline to + # create a JUnit.xml file. The test reports are aggregated later + # on to produce a nicer summary of the test output in the GitHub + # Actions UI. + # + # Inspired by + # https://www.cloudwithchris.com/blog/githubactions-testsummary-go/ + - name: Go Tests + env: + COCKROACH_DEV_LICENSE: ${{ secrets.COCKROACH_DEV_LICENSE }} + CDC_INTEGRATION: ${{ matrix.integration }} + TEST_SOURCE_CONNECT: ${{ matrix.sourceConn }} + TEST_TARGET_CONNECT: ${{ matrix.targetConn }} + IBM_DRIVER: /tmp/drivers/clidriver + CGO_CFLAGS: -I${IBM_DRIVER}/include + CGO_LDFLAGS: -L${IBM_DRIVER}/lib + run: > + set -o pipefail; + make testdb2 2>&1 | + go run github.com/jstemmer/go-junit-report/v2 + -iocopy + -out ${{ env.JUNIT_OUT }} + -p cockroachdb=${{ matrix.cockroachdb }} + -p integration=${{ matrix.integration }} + -package-name ${{ matrix.cockroachdb }}-${{ matrix.integration }} | + tee ${{ env.TEST_OUT }} + + - name: Upload coverage + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ${{ env.COVER_OUT }} + + # Capture container logs in case they're needed for diagnostics. + - name: Docker container logs + if: always() + working-directory: .github + run: docker-compose logs --no-color > ${{ env.DOCKER_LOGS_OUT }} + + # Upload all test reports to a common artifact name, to make them + # available to the summarization step. The go test json is + # uploaded as a developer convenience. + - name: Stash test logs + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-reports + path: | + ${{ env.COVER_OUT }} + ${{ env.DOCKER_LOGS_OUT }} + ${{ env.JUNIT_OUT }} + ${{ env.TEST_OUT }} + retention-days: 7 + + # Aggregate the results of multiple jobs within this workflow into a + # single status object that we can use for branch protection. 
+ # + # https://docs.github.com/en/rest/commits/statuses + status: + name: Create status objects + runs-on: ubuntu-latest + permissions: + statuses: write + needs: # Update failure case below + - code-quality + - tests + if: ${{ always() }} + env: + CONTEXT: Workflow Golang + GH_TOKEN: ${{ github.token }} + MERGE_SHA: ${{ github.event.merge_group.head_sha }} + PR_SHA: ${{ github.event.pull_request.head.sha }} + STATE: success + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + steps: + - name: Failure + if: ${{ needs.code-quality.result != 'success' || needs.tests.result != 'success' }} + run: echo "STATE=failure" >> $GITHUB_ENV + - name: Report + run: | + set -eo pipefail + + if [ ! -z "$PR_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$PR_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + if [ ! -z "$MERGE_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$MERGE_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + # This job downloads the test log files generated in the integration + # job and summarizes them into the GitHub Actions UI. + summarize-tests: + name: Test summaries + runs-on: ubuntu-latest + needs: tests + if: ${{ always() }} + steps: + - name: Download reports + uses: actions/download-artifact@v3 + with: + name: integration-reports + - name: Summarize + uses: test-summary/action@v2 + with: + paths: junit-*.xml diff --git a/.github/workflows/go-tests.yaml b/.github/workflows/go-tests.yaml index 5f174ce74..2e326e1cb 100644 --- a/.github/workflows/go-tests.yaml +++ b/.github/workflows/go-tests.yaml @@ -43,7 +43,7 @@ jobs: - name: Copyright headers if: ${{ !cancelled() }} - run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore '**/testdata/**/*.sql' . + run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore ".github/db2/**" -ignore '**/testdata/**/*.sql' . - name: Lint if: ${{ !cancelled() }} diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index ea98c18ad..637cbac50 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -82,6 +82,19 @@ jobs: statuses: write secrets: inherit + go-db2: + uses: ./.github/workflows/go-test-db2.yaml + needs: + - go-build-cache + # We use the merge queue prior to pushing to a branch, so there's no + # reason to repeat tests that just ran. + if: ${{ github.event_name != 'push' }} + permissions: + contents: read + packages: read + statuses: write + secrets: inherit + go-wiki: uses: ./.github/workflows/go-wiki.yaml needs: diff --git a/.gitignore b/.gitignore index 48edd5f21..d9badf399 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,12 @@ cdc-sink # Dependency directories (remove the comment below to include it) # vendor/ +# Temporary directory +tmp/ + +# Downloaded drivers +drivers/ + # CockroachDB data directory cockroach-data/ goroutine_dump/ diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..80114b31c --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ + +PWD = $(shell pwd) +COVER_OUT ?= cover.out +IBM_DRIVER = ${PWD}/drivers/clidriver +export CGO_CFLAGS=-I${IBM_DRIVER}/include +export CGO_LDFLAGS=-L${IBM_DRIVER}/lib +export DYLD_LIBRARY_PATH=${IBM_DRIVER}/lib +export LD_LIBRARY_PATH=${IBM_DRIVER}/lib + +.PHONY: all db2 clean realclean testdb2 + +all: cdc-sink + +cdc-sink: + go build + +drivers/clidriver: + go run . 
db2install --dest drivers + +db2: drivers/clidriver + go build -tags db2 + +clean: + rm cdc-sink + +realclean: clean + rm -rf drivers + +testdb2: drivers/clidriver + go test -tags db2 -count 1 -coverpkg=./internal/source/db2 \ + -covermode=atomic \ + -coverprofile=${COVER_OUT} \ + -race \ + -v ./internal/source/db2 diff --git a/go.mod b/go.mod index dca2823ef..fb8d96939 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,19 @@ require ( require ( github.com/google/go-cmp v0.6.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/codeclysm/extract/v3 v3.1.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/h2non/filetype v1.1.3 // indirect + github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 // indirect + github.com/klauspost/compress v1.15.13 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/ulikunitz/xz v0.5.11 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect ) require ( @@ -55,6 +68,9 @@ require ( github.com/google/licenseclassifier v0.0.0-20210722185704-3043a050f148 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/google/subcommands v1.2.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/ibmdb/go_ibm_db v0.4.5 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index c742acb5e..be8300461 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/cockroachdb/gostdlib v1.19.0 h1:cSISxkVnTlWhTkyple/T6NXzOi5659FkhxvUg github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTymPZUKMfURjBtY= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8 h1:Hli+oX84dKq44sLVCcsGKqifm5Lg9J8VoJ2P3h9iPdI= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8/go.mod h1:75wnig8+TF6vst9hChkpcFO7YrRLddouJ5is8uqpfv0= +github.com/codeclysm/extract/v3 v3.1.1 h1:iHZtdEAwSTqPrd+1n4jfhr1qBhUWtHlMTjT90+fJVXg= +github.com/codeclysm/extract/v3 v3.1.1/go.mod h1:ZJi80UG2JtfHqJI+lgJSCACttZi++dHxfWuPaMhlOfQ= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= @@ -266,11 +268,15 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg= +github.com/h2non/filetype v1.1.3/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= 
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/ibmdb/go_ibm_db v0.4.5 h1:a0qKWbA5shCRo5HRRnAuENwhjg6AkgfPNr9xx0IZ160= +github.com/ibmdb/go_ibm_db v0.4.5/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP49UTmEvg= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= @@ -299,9 +305,13 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report/v2 v2.1.0 h1:X3+hPYlSczH9IMIpSC9CQSZA0L+BipYafciZUWHEmsc= github.com/jstemmer/go-junit-report/v2 v2.1.0/go.mod h1:mgHVr7VUo5Tn8OLVr1cKnLuEy0M92wdRntM99h7RkgQ= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 h1:rhqTjzJlm7EbkELJDKMTU7udov+Se0xZkWmugr6zGok= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v2wtGp9Gmz1Ze3eVRAWJMLokvN3QjdzCHLY= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0= +github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -398,6 +408,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= +github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/internal/cmd/db2/db2.go b/internal/cmd/db2/db2.go new file mode 100644 index 000000000..ca9bf7eeb --- /dev/null +++ b/internal/cmd/db2/db2.go @@ -0,0 +1,39 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package db2 contains a command to perform logical replication
+// from an IBM DB2 server.
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/source/db2"
+	"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/spf13/cobra"
+)
+
+// Command returns the db2logical subcommand.
+func Command() *cobra.Command {
+	cfg := &db2.Config{}
+	return stdlogical.New(&stdlogical.Template{
+		Config: cfg,
+		Short:  "start a DB2 replication feed",
+		Start: func(ctx *stopper.Context, cmd *cobra.Command) (any, error) {
+			return db2.Start(ctx, cfg)
+		},
+		Use: "db2logical",
+	})
+}
diff --git a/internal/cmd/db2/install.go b/internal/cmd/db2/install.go
new file mode 100644
index 000000000..f3fba1b38
--- /dev/null
+++ b/internal/cmd/db2/install.go
@@ -0,0 +1,161 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"runtime"
+	"strings"
+
+	"github.com/codeclysm/extract/v3"
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+)
+
+const (
+	defaultURL  = "https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/"
+	defaultDest = "drivers"
+)
+
+// Install downloads and installs the IBM DB2 ODBC driver.
+// Because of license restrictions, the driver cannot be under
+// source control. It needs to be downloaded when used.
+func Install() *cobra.Command {
+	var downloadURL, dest string
+	var dryrun bool
+	cmd := &cobra.Command{
+		Args:  cobra.NoArgs,
+		Short: "installs the db2 driver",
+		Use:   "db2install",
+		// We are hiding it, since it's mainly used for testing.
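+		// End users rarely run this by hand; the Makefile's
+		// drivers/clidriver target invokes `go run . db2install`
+		// before building with the db2 tag.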
+ Hidden: true,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ driverPath := path.Join(dest, "clidriver")
+ _, err := os.Stat(driverPath)
+ _, includeErr := os.Stat(path.Join(driverPath, "include"))
+ _, libErr := os.Stat(path.Join(driverPath, "lib"))
+ if os.IsNotExist(err) || os.IsNotExist(includeErr) || os.IsNotExist(libErr) {
+ pkgName, err := install(cmd.Context(), dest, downloadURL, dryrun)
+ if err != nil {
+ return err
+ }
+ if pkgName != "" {
+ fmt.Printf("Installed %s into %s.\n", pkgName, dest)
+ }
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ if includeErr != nil {
+ return includeErr
+ }
+ if libErr != nil {
+ return libErr
+ }
+ fmt.Println("Driver already installed.")
+ return nil
+ },
+ }
+ cmd.Flags().BoolVar(&dryrun, "dryrun", false,
+ "show the instructions to install the package manually")
+ cmd.Flags().StringVar(&dest, "dest", defaultDest,
+ "destination dir")
+ cmd.Flags().StringVar(&downloadURL, "url", defaultURL,
+ "url to download the driver")
+ return cmd
+}
+
+// download a package to the specified temp destination.
+func download(tmpFile, baseURL, pkg string) (downloadError error) {
+ out, err := os.Create(tmpFile)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if err := out.Close(); downloadError == nil && err != nil {
+ downloadError = err
+ }
+ }()
+ pkgURL, err := url.JoinPath(baseURL, pkg)
+ if err != nil {
+ return err
+ }
+ resp, err := http.Get(pkgURL)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return errors.Errorf("unexpected HTTP status %s fetching %s", resp.Status, pkgURL)
+ }
+ _, downloadError = io.Copy(out, resp.Body)
+ return
+}
+
+// install the IBM DB2 driver for the runtime platform into
+// the specified directory.
+func install(ctx context.Context, target, baseURL string, dryrun bool) (string, error) {
+ var pkgName string
+ switch runtime.GOOS {
+ case "darwin":
+ pkgName = "macos64_odbc_cli.tar.gz"
+ case "linux":
+ switch runtime.GOARCH {
+ case "amd64":
+ pkgName = "linuxx64_odbc_cli.tar.gz"
+ default:
+ return "", errors.Errorf("unknown arch %q", runtime.GOARCH)
+ }
+ default:
+ return "", errors.Errorf("unknown OS %q", runtime.GOOS)
+ }
+ if dryrun {
+ fmt.Printf("Download %s%s into %s.\n", baseURL, pkgName, target)
+ return "", nil
+ }
+ pkg := filepath.Join(target, pkgName)
+ _, err := os.Stat(pkg)
+ if os.IsNotExist(err) {
+ fmt.Printf("Installing %s%s into %s.\n", baseURL, pkgName, target)
+ tmpFile := path.Join(os.TempDir(), pkgName)
+ defer os.Remove(tmpFile)
+ err := download(tmpFile, baseURL, pkgName)
+ if err != nil {
+ return "", err
+ }
+ fmt.Println("Download successful.")
+ pkg = tmpFile
+ } else {
+ fmt.Printf("Found %s in %s.\n", pkgName, target)
+ }
+ f, err := os.Open(pkg)
+ if err != nil {
+ return "", err
+ }
+ defer f.Close()
+ if strings.HasSuffix(pkg, ".zip") {
+ return pkgName, extract.Zip(ctx, f, target, nil)
+ }
+ if strings.HasSuffix(pkg, ".gz") {
+ return pkgName, extract.Gz(ctx, f, target, nil)
+ }
+ return "", errors.Errorf("unknown file type %s", pkgName)
+}
diff --git a/internal/sinktest/all/integration.go b/internal/sinktest/all/integration.go
index d05d07a0b..110512261 100644
--- a/internal/sinktest/all/integration.go
+++ b/internal/sinktest/all/integration.go
@@ -29,6 +29,10 @@ const (
 // integration tests for some databases. We expect this to be of the
 // format "database-v123".
 IntegrationEnvName = "CDC_INTEGRATION"
+ // DB2Name must be kept in alignment with the
+ // .github/docker-compose.yml file and the integration matrix
+ // variable in workflows/tests.yaml.
+ DB2Name = "db2" // MySQLName must be kept in alignment with the // .github/docker-compose.yml file and the integration matrix // variable in workflows/tests.yaml. diff --git a/internal/sinktest/scripttest/testdata/logical_test_db2.ts b/internal/sinktest/scripttest/testdata/logical_test_db2.ts new file mode 100644 index 000000000..781eba495 --- /dev/null +++ b/internal/sinktest/scripttest/testdata/logical_test_db2.ts @@ -0,0 +1,56 @@ +/* + * Copyright 2023 The Cockroach Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import * as api from "cdc-sink@v1"; +import {Document, Table} from "cdc-sink@v1"; + +// The sentinel name will be replaced by the test rig. It would normally be +// "my_db.public" or "my_db" depending on the target product. +api.configureSource("{{ SCHEMA }}", { + dispatch: (doc: Document, meta: Document): Record => { + console.trace(JSON.stringify(doc), JSON.stringify(meta)); + let ret: Record = {}; + ret[meta.table] = [ + { + PK: doc.PK, + OK: doc.OK, + ignored: 'by_configuration_below', + v_dispatched: doc.V, // Rename the property + } + ]; + return ret + } +}) + +// We introduce an unknown column in the dispatch function above. +// We'll add an extra configuration here to ignore it. +// The sentinel name would be replaced by "my_table". +api.configureTable("{{ TABLE }}", { + map: (doc: Document): Document => { + console.trace("map", JSON.stringify(doc)); + if (doc.v_dispatched === undefined) { + throw "did not find expected property"; + } + doc.v_mapped = doc.v_dispatched; + delete doc.v_dispatched; // Avoid schema-drift error due to unknown column. + return doc; + }, + ignore: { + "ignored": true, + } +}) diff --git a/internal/source/db2/config.go b/internal/source/db2/config.go new file mode 100644 index 000000000..05e4f1078 --- /dev/null +++ b/internal/source/db2/config.go @@ -0,0 +1,137 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/sequencer"
+ "github.com/cockroachdb/cdc-sink/internal/sinkprod"
+ "github.com/cockroachdb/cdc-sink/internal/target/dlq"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/pkg/errors"
+ "github.com/spf13/pflag"
+)
+
+// EagerConfig is a hack to get Wire to move userscript evaluation to
+// the beginning of the injector. This allows CLI flags to be set by the
+// script.
+type EagerConfig Config
+
+// Config contains the configuration necessary for creating a
+// replication connection to a DB2 server. SourceConn is mandatory.
+type Config struct {
+ DLQ dlq.Config
+ Script script.Config
+ Sequencer sequencer.Config
+ Staging sinkprod.StagingConfig
+ Target sinkprod.TargetConfig
+
+ // InitialLSN is the log sequence number used to start the replication.
+ InitialLSN string
+ // PollingInterval defines how often we should check for new data, if idle.
+ PollingInterval time.Duration
+ // Connection string for the source db. Mandatory.
+ SourceConn string
+ // The schema we are replicating. If not provided, all the schemas will be replicated.
+ SourceSchema ident.Schema
+
+ // The schema for SQL replication in DB2.
+ SQLReplicationSchema ident.Schema
+ // The SQL schema in the target cluster to write into.
+ TargetSchema ident.Schema
+ // The fields below are extracted by Preflight.
+ host string
+ password string
+ port uint16
+ user string
+ database string
+}
+
+// defaultSQLReplicationSchema is the schema in DB2 where the staging tables are located.
+var defaultSQLReplicationSchema = ident.MustSchema(ident.New("ASNCDC"))
+
+// Bind adds flags to the set. It delegates to the member configurations' Bind methods.
+func (c *Config) Bind(f *pflag.FlagSet) {
+ c.DLQ.Bind(f)
+ c.Script.Bind(f)
+ c.Sequencer.Bind(f)
+ c.Staging.Bind(f)
+ c.Target.Bind(f)
+
+ f.StringVar(&c.InitialLSN, "defaultLSN", "",
+ "default log sequence number. Used if no state is persisted")
+ f.StringVar(&c.SourceConn, "sourceConn", "",
+ "the source database's connection string")
+ f.Var(ident.NewSchemaFlag(&c.SourceSchema), "sourceSchema",
+ "the SQL database schema in the source cluster")
+ f.Var(ident.NewSchemaFlag(&c.TargetSchema), "targetSchema",
+ "the SQL database schema in the target cluster to update")
+ f.Var(ident.NewSchemaFlag(&c.SQLReplicationSchema), "replicationSchema",
+ "the SQL database schema used by the DB2 SQL replication")
+ f.DurationVar(&c.PollingInterval, "pollingInterval", time.Second, "how often we check for new mutations")
+}
+
+// Preflight updates the configuration with sane defaults or returns an
+// error if there are missing options for which a default cannot be
+// provided.
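+//
+// For example (an illustrative value that matches the test default), a
+// source connection string such as
+//
+// db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB
+//
+// is decomposed into host, port, user, password, and database.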
+func (c *Config) Preflight() error {
+ if err := c.DLQ.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Script.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Sequencer.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Staging.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Target.Preflight(); err != nil {
+ return err
+ }
+ if c.SourceConn == "" {
+ return errors.New("no SourceConn was configured")
+ }
+ if c.TargetSchema.Empty() {
+ return errors.New("no target schema specified")
+ }
+ if c.SQLReplicationSchema.Empty() {
+ c.SQLReplicationSchema = defaultSQLReplicationSchema
+ }
+ u, err := url.Parse(c.SourceConn)
+ if err != nil {
+ return err
+ }
+ port, err := strconv.ParseUint(u.Port(), 0, 16)
+ if err != nil {
+ return errors.Wrapf(err, "invalid port %s", u.Port())
+ }
+ c.host = u.Hostname()
+ c.port = uint16(port)
+ c.user = u.User.Username()
+ c.password, _ = u.User.Password()
+ c.database, _ = strings.CutPrefix(u.Path, "/")
+
+ return nil
+}
diff --git a/internal/source/db2/conn.go b/internal/source/db2/conn.go
new file mode 100644
index 000000000..d2397eca3
--- /dev/null
+++ b/internal/source/db2/conn.go
@@ -0,0 +1,516 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/diag"
+ "github.com/cockroachdb/cdc-sink/internal/util/hlc"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/notify"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopvar"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/shopspring/decimal"
+ log "github.com/sirupsen/logrus"
+)
+
+// Conn is responsible for pulling mutations from the DB2 staging tables
+// and applying them to the target database via the types.MultiAcceptor
+// interface. The process leverages DB2 SQL replication, similar to
+// the Debezium DB2 connector.
+// To set up DB2 for SQL replication, see the instructions at
+// https://debezium.io/documentation/reference/stable/connectors/db2.html#setting-up-db2
+// Note: in DB2, identifiers (e.g. table names, column names) are converted
+// to uppercase unless they are quoted. The target schema must use the
+// same convention.
+type Conn struct {
+ // The destination for writes.
+ acceptor types.MultiAcceptor
+ // The database connection.
+ db *sql.DB
+ // Memoize column metadata for each table.
+ columns *ident.TableMap[[]types.ColData]
+ // Configuration of the connector.
+ config *Config
+ // Memoized primary key positions for each table.
+ // Maps a column, identified by its position in ColData, to its position in the primary key.
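+ // For example, a table declared with PRIMARY KEY (K1, K2), where K1 is
+ // column 0 and K2 is column 5, is memoized as {0: 0, 5: 1}.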
+ primaryKeys *ident.TableMap[map[int]int]
+ // Memoize queries to check SQL replication.
+ replQueries map[string]string
+ // Access to the staging cluster.
+ stagingDB *types.StagingPool
+ // Persistent storage for WAL data.
+ memo types.Memo
+ // Memoization of queries to fetch change events.
+ tableQueries *ident.TableMap[string]
+ // The SQL schema in the target cluster to write into.
+ target ident.Schema
+ // Access to the target database.
+ targetDB *types.TargetPool
+ // Consistent Point, managed by persistWALOffset.
+ walOffset notify.Var[*lsn]
+}
+
+var (
+ _ diag.Diagnostic = (*Conn)(nil)
+)
+
+// Start the DB2 connector.
+func (c *Conn) Start(ctx *stopper.Context) error {
+ // Call this first to load the previous offset. We want to reset our
+ // state before starting the main copier routine.
+ if err := c.persistWALOffset(ctx); err != nil {
+ return err
+ }
+
+ // Start a process to copy data to the target.
+ ctx.Go(func() error {
+ for !ctx.IsStopping() {
+ if err := c.copyMessages(ctx); err != nil {
+ log.WithError(err).Warn("error while copying messages; will retry")
+ select {
+ case <-ctx.Stopping():
+ case <-time.After(time.Second):
+ }
+ }
+ }
+ return nil
+ })
+
+ return nil
+}
+
+// Diagnostic implements diag.Diagnostic.
+func (c *Conn) Diagnostic(_ context.Context) any {
+ return map[string]any{
+ "hostname": c.config.host,
+ "database": c.config.database,
+ "defaultLsn": c.config.InitialLSN,
+ "columns": c.columns,
+ }
+}
+
+// accumulateBatch finds all the rows that have been changed on the given table
+// and adds them to the batch.
+func (c *Conn) accumulateBatch(
+ ctx *stopper.Context, sourceTable ident.Table, lsns lsnRange, batch *types.MultiBatch,
+) (int, error) {
+ count := 0
+ query, ok := c.tableQueries.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unknown table %q", sourceTable)
+ }
+ // TODO (silvano): we might want to run in batches with a limit.
+ rows, err := c.db.QueryContext(ctx, query, lsns.From.Value[:], lsns.To.Value[:])
+ if err != nil {
+ return 0, err
+ }
+ defer rows.Close()
+ cols, err := rows.ColumnTypes()
+ if err != nil {
+ return 0, err
+ }
+ // The first five columns are metadata (opcode, a commit timestamp,
+ // commit_seq, intent_seq, op_id); the remainder are table data.
+ numMetaColumns := 5
+ numCols := len(cols)
+ if numCols <= numMetaColumns {
+ return 0, errors.Errorf("error in query template %q: not enough columns", query)
+ }
+ dataCols := cols[numMetaColumns:]
+ for rows.Next() {
+ count++
+ res := make([]any, numCols)
+ ptr := make([]any, numCols)
+ for i := range cols {
+ ptr[i] = &res[i]
+ }
+ err = rows.Scan(ptr...)
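+ // res now holds one raw value per column; ptr exists only to give
+ // Scan addressable slots that alias res.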
+ if err != nil {
+ return count, err
+ }
+ // The first column is the opcode.
+ opcode, ok := res[0].(int32)
+ if !ok || !validOperation(opcode) {
+ return count, errors.Errorf("invalid operation %T", res[0])
+ }
+ op := operation(opcode)
+ // The second column is a timestamp.
+ ts, ok := res[1].(time.Time)
+ if !ok {
+ return count, errors.Errorf("invalid timestamp %v", res[1])
+ }
+ values := make([]any, len(res)-numMetaColumns)
+ for i, v := range res[numMetaColumns:] {
+ if v == nil {
+ values[i] = nil
+ continue
+ }
+ switch dataCols[i].DatabaseTypeName() {
+ case "VARCHAR", "CHAR", "CLOB":
+ s, ok := v.([]byte)
+ if !ok {
+ return count, errors.Errorf("invalid type %T, expected []byte", v)
+ }
+ values[i] = string(s)
+ case "DECIMAL", "DECFLOAT":
+ s, ok := v.([]byte)
+ if !ok {
+ return count, errors.Errorf("invalid type %T, expected []byte", v)
+ }
+ values[i], err = decimal.NewFromString(string(s))
+ if err != nil {
+ return count, errors.Wrapf(err, "cannot convert decimal %s", s)
+ }
+ default:
+ values[i] = v
+ }
+ }
+ targetTable := ident.NewTable(
+ c.target,
+ ident.New(sourceTable.Table().Raw()))
+ targetCols, ok := c.columns.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unable to retrieve target columns for %s", sourceTable)
+ }
+ primaryKeys, ok := c.primaryKeys.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unable to retrieve primary keys for %s", sourceTable)
+ }
+ key := make([]any, len(primaryKeys))
+ enc := make(map[string]any)
+ for idx, sourceCol := range values {
+ targetCol := targetCols[idx]
+ switch s := sourceCol.(type) {
+ case nil:
+ enc[targetCol.Name.Raw()] = nil
+ default:
+ enc[targetCol.Name.Raw()] = s
+ }
+ if targetCol.Primary {
+ key[primaryKeys[idx]] = sourceCol
+ }
+ }
+ var mut types.Mutation
+ mut.Time = hlc.New(ts.UnixNano(), 0)
+ if mut.Key, err = json.Marshal(key); err != nil {
+ return count, err
+ }
+ if op != deleteOp {
+ mut.Data, err = json.Marshal(enc)
+ if err != nil {
+ return count, err
+ }
+ }
+ mutationCount.With(prometheus.Labels{
+ "table": sourceTable.Raw(),
+ "op": op.String()}).Inc()
+ log.Infof("(%s,%v) %s => %s", op, key, sourceTable.Raw(), targetTable.Raw())
+
+ script.AddMeta("db2", targetTable, &mut)
+ if err = batch.Accumulate(targetTable, mut); err != nil {
+ return count, err
+ }
+ }
+ return count, nil
+}
+
+// copyMessages reads messages from the DB2 staging tables,
+// accumulates them, and commits data to the target.
+func (c *Conn) copyMessages(ctx *stopper.Context) error {
+ var err error
+ c.db, err = c.open()
+ if err != nil {
+ return errors.Wrap(err, "failed to open a connection to the source db")
+ }
+ defer func() {
+ err := c.db.Close()
+ if err != nil {
+ log.Errorf("Error closing the database.
%q", err) + } + }() + previousLsn, _ := c.walOffset.Get() + if previousLsn == nil { + return errors.New("missing log sequence number") + } + log.Infof("DB2 connector starting at %x", previousLsn.Value) + for { + nextLsn, err := c.getNextLsn(ctx, previousLsn) + if err != nil { + return errors.Wrap(err, "cannot retrieve next sequence number") + } + log.Tracef("NEXT [%x]\n", nextLsn.Value) + if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) { + select { + case <-time.After(c.config.PollingInterval): + continue + case <-ctx.Stopping(): + return nil + } + } + start := time.Now() + log.Tracef("BEGIN [%x]\n", previousLsn.Value) + tables, stagingTables, err := c.getTables(ctx, c.config.SourceSchema) + if err != nil { + return errors.Wrap(err, "cannot get DB2 CDC tables") + } + currentLsnRange := lsnRange{ + From: previousLsn, + To: nextLsn, + } + batch := &types.MultiBatch{} + for idx, table := range tables { + start := time.Now() + stagingTable := stagingTables[idx] + err := c.populateTableMetadata(ctx, table, stagingTable) + if err != nil { + return errors.Wrap(err, "cannot retrieve table metadata") + } + count, err := c.accumulateBatch(ctx, table, currentLsnRange, batch) + if err != nil { + return errors.Wrap(err, "cannot post mutation") + } + if ctx.IsStopping() { + return nil + } + log.Tracef("post mutations for %s starting at %x. Count %d. Time %d.", table, + currentLsnRange.From.Value, count, + time.Since(start).Milliseconds()) + queryLatency.With(prometheus.Labels{ + "table": table.Canonical().Raw()}).Observe(time.Since(start).Seconds()) + } + if batch.Count() == 0 { + log.Trace("skipping empty transaction") + } else { + log.Debugf("COMMIT [%x]-[%x]\n", previousLsn.Value, nextLsn.Value) + if err := c.commit(ctx, batch); err != nil { + return errors.WithStack(err) + } + } + batchLatency.Observe(float64(time.Since(start).Seconds())) + previousLsn = nextLsn + c.walOffset.Set(previousLsn) + } +} + +// commit the current batch to the target database. +func (c *Conn) commit(ctx *stopper.Context, batch *types.MultiBatch) error { + batchSize.Observe(float64(batch.Count())) + tx, err := c.targetDB.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return errors.WithStack(err) + } + defer tx.Rollback() + if err := c.acceptor.AcceptMultiBatch(ctx, batch, &types.AcceptOptions{ + TargetQuerier: tx, + }); err != nil { + return errors.WithStack(err) + } + return tx.Commit() +} + +// getFirstLsnFound returns the first row that contains a log sequence number. +func getFirstLsnFound(rows *sql.Rows) (*lsn, error) { + if rows.Next() { + var v []byte + err := rows.Scan(&v) + if err != nil { + return nil, err + } + return newLSN(v) + } + return lsnZero(), nil +} + +// getLsnFromMonitoring retrieves the sequence number from +// the monitoring table +func (c *Conn) getLsnFromMonitoring(ctx *stopper.Context, current *lsn) (*lsn, error) { + // Checking the monitoring table for a batch of mutations. + rows, err := c.db.QueryContext(ctx, c.replQueries["nextLsn"], current.Value[:]) + if err != nil { + return nil, err + } + defer rows.Close() + return getFirstLsnFound(rows) +} + +// getMaxLsn retrieves the latest sequence number in all the DB2 staging tables. 
+func (c *Conn) getMaxLsn(ctx *stopper.Context) (*lsn, error) {
+ rows, err := c.db.QueryContext(ctx, c.replQueries["maxLsn"])
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ return getFirstLsnFound(rows)
+}
+
+// getNextLsn retrieves the next sequence number.
+func (c *Conn) getNextLsn(ctx *stopper.Context, current *lsn) (*lsn, error) {
+ // Checking the monitoring table first, maybe we can process a smaller batch.
+ found, err := c.getLsnFromMonitoring(ctx, current)
+ if err != nil {
+ return nil, err
+ }
+ // If we don't find an LSN in the monitoring table, then look in the
+ // DB2 staging tables.
+ if found.Equal(lsnZero()) {
+ return c.getMaxLsn(ctx)
+ }
+ return found, nil
+}
+
+// getTables returns the source tables and corresponding staging tables in DB2.
+// The DB2 staging tables store the committed transactional data
+// for all the change events tracked by the SQL replication in the source database.
+func (c *Conn) getTables(
+ ctx *stopper.Context, schema ident.Schema,
+) ([]ident.Table, []ident.Table, error) {
+ var rows *sql.Rows
+ var err error
+ if schema.Empty() {
+ rows, err = c.db.QueryContext(ctx, c.replQueries["stagingTables"])
+ } else {
+ rows, err = c.db.QueryContext(ctx, c.replQueries["stagingTablesForSchema"], schema.Raw())
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+ defer rows.Close()
+ tables := make([]ident.Table, 0)
+ stagingTables := make([]ident.Table, 0)
+ for rows.Next() {
+ var sourceSchema, sourceTable string
+ var stagingSchema, stagingTable string
+ err = rows.Scan(&sourceSchema, &sourceTable, &stagingSchema, &stagingTable)
+ if err != nil {
+ return nil, nil, err
+ }
+ tables = append(tables, ident.NewTable(
+ ident.MustSchema(ident.New(sourceSchema)),
+ ident.New(sourceTable)))
+
+ stagingTables = append(stagingTables, ident.NewTable(
+ ident.MustSchema(ident.New(stagingSchema)),
+ ident.New(stagingTable)))
+ }
+ return tables, stagingTables, nil
+}
+
+// open a new connection to the source database.
+func (c *Conn) open() (*sql.DB, error) {
+ con := fmt.Sprintf("HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=%s",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user,
+ c.config.password)
+ log.Debugf("DB2 connection: HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=...",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user)
+
+ return sql.Open("go_ibm_db", con)
+}
+
+// persistWALOffset loads an existing value from the memo into walOffset,
+// or initializes it to a user-provided value. It will also start a
+// goroutine in the stopper to occasionally write an updated value back
+// to the memo.
+func (c *Conn) persistWALOffset(ctx *stopper.Context) error {
+ key := fmt.Sprintf("db2-lsn-%s", c.target.Raw())
+ found, err := c.memo.Get(ctx, c.stagingDB, key)
+ if err != nil {
+ return err
+ }
+ cp := &lsn{}
+ if len(found) > 0 {
+ if err := cp.UnmarshalText(found); err != nil {
+ return err
+ }
+ } else if c.config.InitialLSN != "" {
+ if err := cp.UnmarshalText([]byte(c.config.InitialLSN)); err != nil {
+ return err
+ }
+ }
+ c.walOffset.Set(cp)
+ ctx.Go(func() error {
+ _, err := stopvar.DoWhenChanged(ctx, cp, &c.walOffset,
+ func(ctx *stopper.Context, _, cp *lsn) error {
+ if err := c.memo.Put(ctx, c.stagingDB, key, []byte(cp.String())); err == nil {
+ log.Tracef("stored WAL offset %s: %s", key, cp)
+ } else {
+ log.WithError(err).Warn("could not persist WAL offset")
+ }
+ return nil
+ })
+ return err
+ })
+ return nil
+}
+
+// populateTableMetadata fetches the column metadata for a given table
+// and caches it.
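+// Results are memoized in c.columns, c.primaryKeys, and c.tableQueries,
+// so subsequent calls for the same table are no-ops.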
+func (c *Conn) populateTableMetadata( + ctx *stopper.Context, sourceTable ident.Table, stagingTable ident.Table, +) error { + if _, ok := c.columns.Get(sourceTable); ok { + return nil + } + // to get the changes for the given source table, we need to query the staging table. + c.tableQueries.Put(sourceTable, + fmt.Sprintf(changeQuery, stagingTable, c.config.SQLReplicationSchema)) + // get the columns and primary keys. + rows, err := c.db.QueryContext(ctx, columnQuery, sourceTable.Schema().Raw(), sourceTable.Table().Raw()) + if err != nil { + return err + } + defer rows.Close() + colData := make([]types.ColData, 0) + primaryKeys := make(map[int]int) + idx := 0 + for rows.Next() { + var name, typ string + var pk *int + err = rows.Scan(&name, &pk, &typ) + if err != nil { + return err + } + if pk != nil { + primaryKeys[idx] = *pk - 1 + } + log.Tracef("Table %s col %q %q primary?: %t", sourceTable, name, typ, pk != nil) + colData = append(colData, types.ColData{ + Name: ident.New(name), + Primary: pk != nil, + Type: typ, + }) + idx++ + } + c.columns.Put(sourceTable, colData) + c.primaryKeys.Put(sourceTable, primaryKeys) + return nil +} diff --git a/internal/source/db2/db2.go b/internal/source/db2/db2.go new file mode 100644 index 000000000..62d96af75 --- /dev/null +++ b/internal/source/db2/db2.go @@ -0,0 +1,39 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package db2 contains support for reading a db2 sql replication feed. +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" +) + +// DB2 is a logical replication loop for the IBM DB2 database. +// It leverages SQL replication on the source. +type DB2 struct { + Conn *Conn + Diagnostics *diag.Diagnostics +} + +var ( + _ stdlogical.HasDiagnostics = (*DB2)(nil) +) + +// GetDiagnostics implements [stdlogical.HasDiagnostics]. +func (d *DB2) GetDiagnostics() *diag.Diagnostics { + return d.Diagnostics +} diff --git a/internal/source/db2/driver.go b/internal/source/db2/driver.go new file mode 100644 index 000000000..99209ccad --- /dev/null +++ b/internal/source/db2/driver.go @@ -0,0 +1,22 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// SPDX-License-Identifier: Apache-2.0 + +//go:build db2 && cgo +// +build db2,cgo + +package db2 + +import _ "github.com/ibmdb/go_ibm_db" diff --git a/internal/source/db2/injector.go b/internal/source/db2/injector.go new file mode 100644 index 000000000..f33f5720d --- /dev/null +++ b/internal/source/db2/injector.go @@ -0,0 +1,55 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build wireinject +// +build wireinject + +package db2 + +import ( + "context" + + scriptRuntime "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" + "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" + scriptSequencer "github.com/cockroachdb/cdc-sink/internal/sequencer/script" + "github.com/cockroachdb/cdc-sink/internal/sinkprod" + "github.com/cockroachdb/cdc-sink/internal/staging" + "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/google/wire" +) + +// Start creates a DB2 logical replication loop using the +// provided configuration. +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + panic(wire.Build( + wire.Bind(new(context.Context), new(*stopper.Context)), + wire.Struct(new(DB2), "*"), + wire.FieldsOf(new(*Config), "Script"), + wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Staging", "Target"), + Set, + immediate.Set, + chaos.Set, + diag.New, + scriptRuntime.Set, + scriptSequencer.Set, + sinkprod.Set, + staging.Set, + target.Set, + )) +} diff --git a/internal/source/db2/integration_test.go b/internal/source/db2/integration_test.go new file mode 100644 index 000000000..c651e5ccc --- /dev/null +++ b/internal/source/db2/integration_test.go @@ -0,0 +1,782 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build db2 && cgo
+// +build db2,cgo
+
+package db2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "math"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/sinkprod"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/all"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/base"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ _ "github.com/ibmdb/go_ibm_db"
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const defaultSourceConn = "db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB"
+
+type fixtureConfig struct {
+ chaos bool
+ script bool
+}
+
+type tableInfo struct {
+ name ident.Ident
+ sourceColName ident.Ident
+ sourceColType string
+ targetColName ident.Ident
+ targetColType string
+}
+
+func TestMain(m *testing.M) {
+ all.IntegrationMain(m, all.DB2Name)
+}
+
+func TestDB2Logical(t *testing.T) {
+ t.Run("consistent", func(t *testing.T) { testDB2Logical(t, &fixtureConfig{}) })
+ t.Run("consistent-chaos", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{chaos: true})
+ })
+ t.Run("consistent-script", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{script: true})
+ })
+}
+
+func testDB2Logical(t *testing.T, fc *fixtureConfig) {
+ r := require.New(t)
+ // Create a basic test fixture.
+ fixture, err := base.NewFixture(t)
+ r.NoError(err)
+ ctx := fixture.Context
+ testDB2LogicalInt(t, fixture, fc)
+ ctx.Stop(time.Second)
+ r.NoError(ctx.Wait())
+}
+
+func testDB2LogicalInt(t *testing.T, fixture *base.Fixture, fc *fixtureConfig) {
+ a := assert.New(t)
+ r := require.New(t)
+ ctx := fixture.Context
+ // Using uppercase for consistency.
+ tableName := ident.New("T")
+ targetColName := "V"
+ if fc.script {
+ targetColName = "v_mapped"
+ }
+ table := tableInfo{
+ name: tableName,
+ sourceColName: ident.New("V"),
+ sourceColType: "varchar(20)",
+ targetColName: ident.New(targetColName),
+ targetColType: "string",
+ }
+
+ repl, config, err := createRepl(ctx, fixture, fc, tableName)
+ r.NoError(err)
+ defer repl.cleanUp(ctx)
+
+ tableMap, cancel, err := repl.createTables(ctx, table)
+ r.NoError(err)
+ defer cancel()
+ start := time.Now()
+ // CDC needs to be initialized again on DB2.
+ // TODO (silvano): switch to reInit, once we move the
+ // COMMIT_INTERVAL,SLEEP_INTERVAL settings to the image.
+ // Changing them requires a restart.
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+ log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds())
+ const rowCount = 128
+ values := make([]any, rowCount)
+ for i := 0; i < rowCount; i++ {
+ values[i] = fmt.Sprintf("v=%d", i)
+ }
+ // Insert data into source table.
+ start = time.Now()
+ err = repl.insertValues(ctx, tableMap.source, values, false)
+ r.NoError(err)
+ log.Infof("Inserted rows in %s.
%d ms", tableMap.source, time.Since(start).Milliseconds()) + // Start logical loop + loop, err := Start(ctx, config) + r.NoError(err) + start = time.Now() + for { + var count int + if err := repl.target.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", tableMap.target)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + log.Infof("Rows replicated %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET %s= 'updated'`, tableMap.source, table.sourceColName)) + r.NoError(err) + // Wait for the update to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE "PK" < 50`, tableMap.source)) + r.NoError(err) + // Wait for the deletes to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount-50 { + break + } + time.Sleep(1 * time.Second) + } + + sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics) + +} + +// TestMetadata verifies that we are able to get table metadata from the source +func TestMetadata(t *testing.T) { + r := require.New(t) + // Create a basic test fixture. + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testMetadata(t, fixture) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} + +func testMetadata(t *testing.T, fixture *base.Fixture) { + r := require.New(t) + ctx := fixture.Context + repl, config, err := createRepl(ctx, fixture, &fixtureConfig{}, ident.Ident{}) + r.NoError(err) + sourceSchema := repl.sourceSchema + + defer repl.cleanUp(ctx) + + tcs := []struct { + name string + table ident.Ident + stmt string + columns []types.ColData + keys map[int]int + }{ + { + name: "simple", + table: ident.New("ONE"), + stmt: "CREATE TABLE %s (k INT PRIMARY KEY NOT NULL, v INT)", + columns: []types.ColData{ + { + Name: ident.New("K"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("V"), + Primary: false, + Type: "INTEGER", + }, + }, + keys: map[int]int{0: 0}, + }, + { + name: "few cols", + table: ident.New("TWO"), + stmt: "CREATE TABLE %s (k INT PRIMARY KEY NOT NULL, a VARCHAR(20),b TIME, c INT, d INT)", + columns: []types.ColData{ + { + Name: ident.New("K"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("A"), + Primary: false, + Type: "VARCHAR", + }, + { + Name: ident.New("B"), + Primary: false, + Type: "TIME", + }, + { + Name: ident.New("C"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("D"), + Primary: false, + Type: "INTEGER", + }, + }, + keys: map[int]int{0: 0}, + }, + { + name: "few keys", + table: ident.New("THREE"), + stmt: "CREATE TABLE %s (k1 INT NOT NULL, a INT,b INT, c INT, d INT,k2 int NOT NULL, primary key (k1,k2))", + columns: []types.ColData{ + { + Name: ident.New("K1"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("A"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("B"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("C"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("D"), 
+ Primary: false,
+ Type: "INTEGER",
+ },
+ {
+ Name: ident.New("K2"),
+ Primary: true,
+ Type: "INTEGER",
+ },
+ },
+ keys: map[int]int{0: 0, 5: 1},
+ },
+ }
+ loop, err := Start(ctx, config)
+ r.NoError(err)
+ conn := loop.Conn
+ start := time.Now()
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+ log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds())
+ for _, tc := range tcs {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ a := assert.New(t)
+ r := require.New(t)
+ table := ident.NewTable(sourceSchema, tc.table)
+ cdtable := ident.NewTable(defaultSQLReplicationSchema, tc.table)
+ log.Infof("CREATING %s %s", sourceSchema, tc.table)
+ _, err := repl.source.ExecContext(ctx, fmt.Sprintf(tc.stmt, table))
+ r.NoError(err)
+ // Enable CDC for the table.
+ _, err = repl.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ",
+ sourceSchema.Raw(), table.Table().Raw()))
+ r.NoError(err)
+ defer func() {
+ _, _ = repl.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ",
+ sourceSchema.Raw(), table.Table().Raw()))
+ }()
+ err = conn.populateTableMetadata(ctx, table, cdtable)
+ r.NoError(err)
+ a.Equal(tc.keys, conn.primaryKeys.GetZero(table))
+ columns := conn.columns.GetZero(table)
+ r.Equal(len(tc.columns), len(columns))
+ for idx, col := range columns {
+ a.Equal(tc.columns[idx].Name, col.Name)
+ a.Equal(tc.columns[idx].Primary, col.Primary)
+ a.Equal(tc.columns[idx].Type, col.Type)
+ }
+ })
+ }
+}
+
+func TestDataTypes(t *testing.T) {
+ r := require.New(t)
+ // Create a basic test fixture.
+ fixture, err := base.NewFixture(t)
+ r.NoError(err)
+ ctx := fixture.Context
+ testDataTypes(t, fixture)
+ ctx.Stop(time.Second)
+ r.NoError(ctx.Wait())
+}
+
+func testDataTypes(t *testing.T, fixture *base.Fixture) {
+ r := require.New(t)
+ // Build a long value for string and byte types.
+ var sb strings.Builder
+ shortString := "0123456789ABCDEF"
+ for sb.Len() < 1<<16 {
+ sb.WriteString(shortString)
+ }
+ longString := sb.String()
+ ctx := fixture.Context
+ repl, config, err := createRepl(ctx, fixture, &fixtureConfig{}, ident.Ident{})
+ r.NoError(err)
+ defer repl.cleanUp(ctx)
+
+ tcs := []struct {
+ source string
+ size string
+ crdb string
+ values []any // We automatically test for NULL below.
+ }{
+ // BIGINT(size)
+ {`bigint`, ``, `bigint`, []any{0, -1, 112358}},
+ // BINARY(size)
+ {`binary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+ // BLOB(size)
+ {`blob`, `(65536)`, `blob`, []any{[]byte(longString)}},
+ // BOOLEAN
+ // Currently SQL Replication on Db2 does not support BOOLEAN.
+ //{`boolean`, ``, `int`, []any{true, false}},
+ // CHAR(size)
+ {`char`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // CLOB(size)
+ {`clob`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // DATE
+ {`date`, ``, `date`, []any{"1000-01-01", "9999-12-31"}},
+ // DATETIME(fsp)
+ {`datetime`, ``, `timestamp`, []any{"1000-01-01 00:00:00", "9999-12-31 23:59:59"}},
+
+ {`dbclob`, `(128)`, `string`, []any{shortString}},
+
+ // DECFLOAT
+ // DECIMAL(size, d)
+ {`decfloat`, ``, `decimal`, []any{math.E, math.Phi, math.Pi}},
+ {`decimal`, `(11,10)`, `decimal`, []any{math.E, math.Phi, math.Pi}},
+
+ {`double`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ {`float`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // INT(size)
+ // INTEGER(size)
+ {`int`, ``, `int`, []any{0, -1, 112358}},
+
+ {`real`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // SMALLINT(size)
+ {`smallint`, ``, `int`, []any{0, -1, 127}},
+
+ // TIME(fsp)
+ {`time`, ``, `time`, []any{"16:20:00"}},
+ // TIMESTAMP(fsp)
+ {`timestamp`, ``, `timestamp`, []any{"1970-01-01 00:00:01", "2038-01-19 03:14:07"}},
+ // VARBINARY(size)
+ {`varbinary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+ // VARCHAR(size)
+ {`varchar`, `(128)`, `text`, []any{shortString}},
+
+ // Sentinel
+ {`char`, `(4)`, `text`, []any{`done`}},
+
+ // XML replication is not supported on LUW
+ //{`xml`, ``, `text`, `a`},
+ }
+
+ // Create a dummy table for each type.
+ tables := make([]*tableMapping, len(tcs))
+ for idx, tc := range tcs {
+ tbl := tableInfo{
+ name: ident.New(fmt.Sprintf("tgt_%s_%d", tc.source, idx)),
+ sourceColName: ident.New("V"),
+ sourceColType: tc.source + tc.size,
+ targetColName: ident.New("V"),
+ targetColType: tc.crdb,
+ }
+ tgt, cancel, err := repl.createTables(ctx, tbl)
+ r.NoError(err)
+ tables[idx] = tgt
+ defer cancel()
+ }
+ start := time.Now()
+ // CDC needs to be initialized again on DB2.
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+ log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds())
+ // Insert data in the source tables.
+ for idx, tc := range tcs {
+ err := repl.insertValues(ctx, tables[idx].source, tc.values, true)
+ r.NoError(err)
+ }
+
+ // Wait for the CDC source tables to be fully populated.
+ lastidx := len(tcs) - 1
+ expected := len(tcs[lastidx].values) + 1
+ var count int
+ query := fmt.Sprintf("SELECT count(*) FROM %s", tables[lastidx].sourceCdc.Raw())
+ for count < expected {
+ err := repl.source.QueryRowContext(ctx, query).Scan(&count)
+ r.NoError(err)
+ if count < expected {
+ time.Sleep(1 * time.Second)
+ }
+ }
+ log.Infof("Tables fully populated in %d ms", time.Since(start).Milliseconds())
+ // Start logical loop.
+ loop, err := Start(ctx, config)
+ r.NoError(err)
+
+ // Wait for rows to show up.
+ for idx, tc := range tcs {
+ tc := tc
+ t.Run(tc.source, func(t *testing.T) {
+ expected := len(tc.values) + 1
+ a := assert.New(t)
+ var count int
+ for count < expected {
+ if err := repl.target.QueryRowContext(ctx,
+ fmt.Sprintf("SELECT count(*) FROM %s", tables[idx].target)).Scan(&count); !a.NoError(err) {
+ return
+ }
+ if count < expected {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ // Expect the test data and a row with a NULL value.
+ a.Equalf(expected, count, "mismatch in %s", tables[idx].target)
+ })
+ }
+ sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics)
+}
+
+// UTILITIES
+
+// replAdmin provides utilities to manage the source and target databases.
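+// A replAdmin is created by createRepl; callers should defer cleanUp to
+// stop CDC (if it was restarted here) and close the source connection.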
+type replAdmin struct {
+ source *sql.DB
+ sourceSchema ident.Schema
+ target *sql.DB
+ targetSchema ident.Schema
+ restarted bool
+}
+
+type tableMapping struct {
+ source ident.Table
+ sourceCdc ident.Table
+ target ident.Table
+}
+
+// cdcCmd issues a command to the CDC service on the source
+// database, waiting for the given string to appear in the CDC status, if supplied.
+func (r *replAdmin) cdcCmd(ctx context.Context, command string, expect string) (bool, error) {
+ query := fmt.Sprintf("VALUES ASNCDC.ASNCDCSERVICES('%s','asncdc') ", command)
+ out, err := r.source.QueryContext(ctx, query)
+ if err != nil {
+ return false, err
+ }
+ defer out.Close()
+ for out.Next() {
+ var msg string
+ err := out.Scan(&msg)
+ if err != nil {
+ return false, err
+ }
+ done := strings.Contains(msg, expect)
+ if done {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// cdcWaitFor waits until the given string is seen in the CDC status.
+func (r *replAdmin) cdcWaitFor(ctx context.Context, expect string) error {
+ done := false
+ var err error
+ for !done {
+ done, err = r.cdcCmd(ctx, "status", expect)
+ if err != nil {
+ return err
+ }
+ if !done {
+ time.Sleep(time.Second)
+ }
+ }
+ return nil
+}
+
+// cdcRestart restarts CDC in the source database.
+func (r *replAdmin) cdcRestart(ctx context.Context) error {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ return r.cdcStart(ctx)
+}
+
+// cdcStart starts CDC in the source database.
+func (r *replAdmin) cdcStart(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "start", "")
+ if err != nil {
+ return err
+ }
+ r.restarted = true
+ return r.cdcWaitFor(ctx, "is doing work")
+}
+
+// cdcReinit re-initializes CDC in the source database.
+func (r *replAdmin) cdcReinit(ctx context.Context) error {
+ up, err := r.cdcCmd(ctx, "status", "is doing work")
+ if err != nil {
+ return err
+ }
+ // We try to re-init first, since it's faster.
+ // If that fails, we restart it.
+ if up {
+ reinit, err := r.cdcCmd(ctx, "reinit", "REINIT")
+ if err != nil {
+ return err
+ }
+ if reinit {
+ log.Info("CDC was already up, we re-init it")
+ return r.cdcWaitFor(ctx, "is doing work")
+ }
+ }
+ return r.cdcRestart(ctx)
+}
+
+// cdcStop stops CDC in the source database.
+func (r *replAdmin) cdcStop(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "stop", "")
+ if err != nil {
+ return err
+ }
+ return r.cdcWaitFor(ctx, "asncap is not running")
+}
+
+func (r *replAdmin) cleanUp(ctx context.Context) error {
+ // If we restarted CDC, we need to stop it.
+ if r.restarted {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ }
+ return r.source.Close()
+}
+
+// createRepl sets up the replication environment.
+// It creates a schema in the DB2 source.
+// If tbl is provided, it sets up a userscript to process events for the table.
+func createRepl(
+ ctx *stopper.Context, fixture *base.Fixture, fc *fixtureConfig, tblName ident.Ident,
+) (*replAdmin, *Config, error) {
+ dbName, _ := fixture.TargetSchema.Schema().Split()
+ // For now, we are using the Debezium stored procedure to add tables to CDC.
+ // The stored procedure does not deal with case sensitivity well.
+ // TODO (silvano): fix stored procedure.
+ sourceSchema := ident.MustSchema(ident.New(
+ strings.ToUpper(strings.ReplaceAll(dbName.Raw(), "-", "_"))))
+
+ var tbl ident.Table
+ if !tblName.Empty() {
+ tbl = ident.NewTable(fixture.TargetSchema.Schema(), tblName)
+ }
+ config, err := getConfig(fixture, fc, tbl,
+ sourceSchema, fixture.TargetSchema.Schema())
+ if err != nil {
+ return nil, nil, err
+ }
+
+ conn := &Conn{
+ columns: &ident.TableMap[[]types.ColData]{},
+ primaryKeys: &ident.TableMap[map[int]int]{},
+ config: config,
+ }
+ db, err := conn.open()
+ if err != nil {
+ return nil, nil, err
+ }
+ // Create a schema on the source to store all the tables.
+ _, err = db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA %s", sourceSchema))
+ if err != nil {
+ return nil, nil, err
+ }
+ // To speed up testing, we reduce the commit and sleep intervals to decrease latency.
+ // With these settings, the Capture program has a significant impact on foreground
+ // activities; they are NOT recommended in a production environment.
+ // COMMIT_INTERVAL = interval in seconds for how often the Capture log reader thread commits.
+ // SLEEP_INTERVAL = interval in seconds that the Capture program sleeps when idle.
+ _, err = db.ExecContext(ctx, "UPDATE ASNCDC.IBMSNAP_CAPPARMS SET COMMIT_INTERVAL=2,SLEEP_INTERVAL=1")
+ if err != nil {
+ return nil, nil, err
+ }
+ // We start the replication from the current max log sequence number.
+ currentLsn, err := getCurrentLsn(ctx, db)
+ if err != nil {
+ return nil, nil, err
+ }
+ config.InitialLSN = fmt.Sprintf("%x", currentLsn.Value)
+ log.Infof("Starting consistent point %s", config.InitialLSN)
+ return &replAdmin{
+ source: db,
+ sourceSchema: sourceSchema,
+ target: fixture.TargetPool.DB,
+ targetSchema: fixture.TargetSchema.Schema(),
+ }, config, nil
+}
+
+func (r *replAdmin) createTables(
+ ctx context.Context, table tableInfo,
+) (*tableMapping, func() error, error) {
+ // TODO (silvano): fix case sensitivity.
+ tb := ident.New(strings.ToUpper(table.name.Raw()))
+ tgt := ident.NewTable(r.targetSchema, tb)
+ src := ident.NewTable(r.sourceSchema, tb)
+
+ cdc := ident.NewTable(defaultSQLReplicationSchema,
+ ident.New(strings.ToUpper(fmt.Sprintf("CDC_%s_%s",
+ r.sourceSchema.Raw(), tb.Raw()))))
+ res := &tableMapping{
+ source: src,
+ sourceCdc: cdc,
+ target: tgt,
+ }
+ // Create the table in both locations.
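+ // The same CREATE TABLE statement below is reused for both DB2 and
+ // CockroachDB; the placeholders are the qualified table name and the
+ // value column's name and type.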
+ stmt := `CREATE TABLE %s ("PK" INT NOT NULL, %s %s, "OK" INT NOT NULL, PRIMARY KEY ("PK","OK"))`
+ _, err := r.source.ExecContext(ctx, fmt.Sprintf(stmt, src, table.sourceColName, table.sourceColType))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ _, err = r.target.ExecContext(ctx, fmt.Sprintf(stmt, tgt, table.targetColName, table.targetColType))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ // Enable CDC for the table.
+ _, err = r.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw()))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ return res, func() error {
+ query := fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw())
+ _, err := r.source.ExecContext(ctx, query)
+ return err
+ }, nil
+}
+
+// getConfig is a helper function to create a configuration for the connector.
+func getConfig(
+ fixture *base.Fixture,
+ fc *fixtureConfig,
+ tbl ident.Table,
+ dbName ident.Schema,
+ targetSchema ident.Schema,
+) (*Config, error) {
+ crdbPool := fixture.TargetPool
+ conn := defaultSourceConn
+ if found := os.Getenv("TEST_DB2_CONN"); len(found) > 0 {
+ conn = found
+ }
+
+ config := &Config{
+ Staging: sinkprod.StagingConfig{
+ Schema: fixture.StagingDB.Schema(),
+ },
+ Target: sinkprod.TargetConfig{
+ ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier.
+ Conn: crdbPool.ConnectionString,
+ },
+
+ TargetSchema: targetSchema,
+ SourceConn: conn,
+ SourceSchema: dbName,
+ }
+ if fc.chaos {
+ config.Sequencer.Chaos = 0.0005
+ }
+ if fc.script {
+ config.Script = script.Config{
+ FS: scripttest.ScriptFSFor(tbl),
+ MainPath: "/testdata/logical_test_db2.ts",
+ }
+ }
+ return config, config.Preflight()
+}
+
+// getCurrentLsn retrieves the latest sequence number in the monitoring tables.
+func getCurrentLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) {
+ rows, err := db.QueryContext(ctx, "select MAX(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON")
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ if rows.Next() {
+ var v []byte
+ err = rows.Scan(&v)
+ if err == nil {
+ return newLSN(v)
+ }
+ return nil, err
+ }
+ return lsnZero(), nil
+}
+
+func (r *replAdmin) insertValues(
+ ctx context.Context, tbl ident.Table, values []any, insertNull bool,
+) error {
+ tx, err := r.source.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ for valIdx, value := range values {
+ if _, err := tx.ExecContext(ctx,
+ fmt.Sprintf(`INSERT INTO %s VALUES (?, ?, ?)`, tbl), valIdx, value, valIdx*2); err != nil {
+ return err
+ }
+ }
+ if insertNull {
+ if _, err := tx.ExecContext(ctx,
+ fmt.Sprintf(`INSERT INTO %s VALUES (-1, NULL, -2)`, tbl)); err != nil {
+ return err
+ }
+ }
+ return tx.Commit()
+}
diff --git a/internal/source/db2/lsn.go b/internal/source/db2/lsn.go
new file mode 100644
index 000000000..62e58ab17
--- /dev/null
+++ b/internal/source/db2/lsn.go
@@ -0,0 +1,114 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "bytes"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+
+ "github.com/cockroachdb/cdc-sink/internal/util/stamp"
+ "github.com/pkg/errors"
+)
+
+// lsn is the Log Sequence Number. It represents the offset, in bytes,
+// of a log record from the beginning of a database log file.
+type lsn struct {
+ Value [16]byte
+}
+
+type lsnRange struct {
+ From *lsn
+ To *lsn
+}
+
+var _ stamp.Stamp = (*lsn)(nil)
+var _ fmt.Stringer = (*lsn)(nil)
+
+func newLSN(v []byte) (*lsn, error) {
+ if len(v) == 0 {
+ return lsnZero(), nil
+ }
+ if len(v) != 16 {
+ return nil, errors.Errorf("invalid lsn %s %d", string(v), len(v))
+ }
+ return &lsn{
+ Value: ([16]byte)(v),
+ }, nil
+}
+
+// Less implements stamp.Stamp.
+func (l *lsn) Less(other stamp.Stamp) bool {
+ o := other.(*lsn)
+ return bytes.Compare(l.Value[:], o.Value[:]) < 0
+}
+
+// Equal checks if two log sequence numbers are the same.
+func (l *lsn) Equal(other stamp.Stamp) bool {
+ o := other.(*lsn)
+ return bytes.Equal(l.Value[:], o.Value[:])
+}
+
+// String implements fmt.Stringer.
+func (l *lsn) String() string {
+ return fmt.Sprintf("%x", l.Value)
+}
+
+// lsnMemo is used to store the lsn as a JSON object in the memo table.
+type lsnMemo struct {
+ Value []byte `json:"value,omitempty"`
+}
+
+func (l *lsn) MarshalJSON() ([]byte, error) {
+ p := lsnMemo{Value: l.Value[:]}
+ return json.Marshal(p)
+}
+
+func (l *lsn) UnmarshalJSON(data []byte) error {
+ var p lsnMemo
+ if err := json.Unmarshal(data, &p); err != nil {
+ return errors.WithStack(err)
+ }
+ if len(p.Value) != 16 {
+ return errors.Errorf("invalid lsn %s %d", string(data), len(p.Value))
+ }
+ l.Value = ([16]byte)(p.Value)
+ return nil
+}
+
+// UnmarshalText supports CLI flags and default values.
+func (l *lsn) UnmarshalText(data []byte) error {
+ if data == nil {
+ l.Value = lsnZero().Value
+ return nil
+ }
+ if len(data) != 32 {
+ return errors.Errorf("invalid lsn %s", string(data))
+ }
+ v, err := hex.DecodeString(string(data))
+ if err != nil {
+ return errors.Wrapf(err, "invalid lsn %s", string(data))
+ }
+ l.Value = ([16]byte)(v)
+ return nil
+}
+
+func lsnZero() *lsn {
+ z := make([]byte, 16)
+ return &lsn{
+ Value: ([16]byte)(z),
+ }
+}
diff --git a/internal/source/db2/lsn_test.go b/internal/source/db2/lsn_test.go
new file mode 100644
index 000000000..d2800585d
--- /dev/null
+++ b/internal/source/db2/lsn_test.go
@@ -0,0 +1,150 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "encoding/hex"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var (
+ zero = "00000000000000000000000000000000"
+ one = "00000000000000000000000000000001"
+ max = "7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF"
+ zerob = [16]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
+ oneb = [16]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}
+ maxb = [16]byte{0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
+)
+
+func parse(t *testing.T, s string) *lsn {
+ o, err := hex.DecodeString(s)
+ require.NoError(t, err)
+ lsn, err := newLSN(o)
+ require.NoError(t, err)
+ return lsn
+}
+
+// TestLess verifies stamp.Stamp.Less.
+func TestLess(t *testing.T) {
+ tests := []struct {
+ name string
+ one string
+ other string
+ want bool
+ }{
+ {"zero", zero, zero, false},
+ {"zero_one", zero, one, true},
+ {"one_zero", one, zero, false},
+ {"zero_max", zero, max, true},
+ {"max_zero", max, zero, false},
+ {"max", max, max, false},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ one := parse(t, tt.one)
+ other := parse(t, tt.other)
+ got := one.Less(other)
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+// TestEqual verifies equality between two log sequence numbers.
+func TestEqual(t *testing.T) {
+ tests := []struct {
+ name string
+ one string
+ other string
+ want bool
+ }{
+ {"zero", zero, zero, true},
+ {"zero_one", zero, one, false},
+ {"one_zero", one, zero, false},
+ {"zero_max", zero, max, false},
+ {"max_zero", max, zero, false},
+ {"max", max, max, true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ one := parse(t, tt.one)
+ other := parse(t, tt.other)
+ got := one.Equal(other)
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+// TestJSON verifies marshalling to JSON and unmarshalling from JSON.
+func TestJSON(t *testing.T) {
+ tests := []struct {
+ name string
+ one [16]byte
+ want []byte
+ }{
+ {"zero", zerob, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x3d, 0x3d, 0x22, 0x7d}},
+ {"one", oneb, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x51, 0x3d, 0x3d, 0x22, 0x7d}},
+ {"max", maxb, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x66, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x39, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x77, 0x3d, 0x3d, 0x22, 0x7d}},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ one := &lsn{Value: tt.one}
+ got, err := one.MarshalJSON()
+ require.NoError(t, err)
+ assert.Equal(t, tt.want, got)
+ // verify roundtrip
+ rt := &lsn{}
+ err = rt.UnmarshalJSON(got)
+ require.NoError(t, err)
+ assert.Equal(t, one.Value, rt.Value)
+ })
+ }
+}
+
+// TestUnmarshalText verifies unmarshalling from a text string.
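+// The text form is 32 hex digits, e.g. "7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF".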
+func TestUnmarshalText(t *testing.T) {
+	tests := []struct {
+		name string
+		text string
+		want [16]byte
+		err  string
+	}{
+		{"zero", zero, zerob, ""},
+		{"one", one, oneb, ""},
+		{"max", max, maxb, ""},
+		{"odd", "0", zerob, "invalid"},
+		{"short", "00", zerob, "invalid"},
+		{"long", strings.Repeat("0", 34), zerob, "invalid"},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			a := assert.New(t)
+			rt := &lsn{}
+			err := rt.UnmarshalText([]byte(tt.text))
+			if tt.err != "" {
+				a.Error(err)
+				a.ErrorContains(err, tt.err)
+			} else {
+				a.NoError(err)
+				a.Equal(tt.want, rt.Value)
+			}
+		})
+	}
+}
diff --git a/internal/source/db2/metrics.go b/internal/source/db2/metrics.go
new file mode 100644
index 000000000..416691387
--- /dev/null
+++ b/internal/source/db2/metrics.go
@@ -0,0 +1,48 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/util/metrics"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	batchLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "db2_batch_latency",
+		Help:    "the length of time it took to process a batch",
+		Buckets: metrics.LatencyBuckets,
+	})
+	batchSize = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "db2_batch_size",
+		Help:    "the size of a batch",
+		Buckets: metrics.Buckets(1, 10000),
+	})
+	mutationCount = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "db2_mutations_total",
+			Help: "Total number of mutations by source table.",
+		},
+		[]string{"table", "op"},
+	)
+	queryLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "db2_query_latency",
+		Help:    "the length of time it took to process a query",
+		Buckets: metrics.LatencyBuckets,
+	}, []string{"table"})
+)
diff --git a/internal/source/db2/operation.go b/internal/source/db2/operation.go
new file mode 100644
index 000000000..f82984d56
--- /dev/null
+++ b/internal/source/db2/operation.go
@@ -0,0 +1,36 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+//go:generate go run golang.org/x/tools/cmd/stringer -type=operation
+
+// operation represents the type of operation in the staging table.
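+// The numeric values of deleteOp (1), insertOp (2), and updateOp (3)
+// line up with the OPCODE column synthesized by changeQuery in
+// queries.go, which folds a DELETE immediately followed by an INSERT
+// within the same commit into a single update.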
+type operation int
+
+const (
+	unknownOp operation = iota
+	deleteOp
+	insertOp
+	updateOp
+	beginOp
+	endOp
+	rollbackOp
+)
+
+func validOperation(value int32) bool {
+	return value >= int32(unknownOp) && value <= int32(rollbackOp)
+}
diff --git a/internal/source/db2/operation_string.go b/internal/source/db2/operation_string.go
new file mode 100644
index 000000000..df1d306ce
--- /dev/null
+++ b/internal/source/db2/operation_string.go
@@ -0,0 +1,29 @@
+// Code generated by "stringer -type=operation"; DO NOT EDIT.
+
+package db2
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[unknownOp-0]
+	_ = x[deleteOp-1]
+	_ = x[insertOp-2]
+	_ = x[updateOp-3]
+	_ = x[beginOp-4]
+	_ = x[endOp-5]
+	_ = x[rollbackOp-6]
+}
+
+const _operation_name = "unknownOpdeleteOpinsertOpupdateOpbeginOpendOprollbackOp"
+
+var _operation_index = [...]uint8{0, 9, 17, 25, 33, 40, 45, 55}
+
+func (i operation) String() string {
+	if i < 0 || i >= operation(len(_operation_index)-1) {
+		return "operation(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _operation_name[_operation_index[i]:_operation_index[i+1]]
+}
diff --git a/internal/source/db2/provider.go b/internal/source/db2/provider.go
new file mode 100644
index 000000000..4298fd4a4
--- /dev/null
+++ b/internal/source/db2/provider.go
@@ -0,0 +1,107 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"fmt"
+
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer/immediate"
+	scriptSeq "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
+	"github.com/cockroachdb/cdc-sink/internal/target/apply"
+	"github.com/cockroachdb/cdc-sink/internal/types"
+	"github.com/cockroachdb/cdc-sink/internal/util/hlc"
+	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/cockroachdb/cdc-sink/internal/util/notify"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/google/wire"
+)
+
+// Set is used by Wire.
+var Set = wire.NewSet(
+	ProvideConn,
+	ProvideEagerConfig,
+)
+
+// ProvideConn is called by Wire to construct this package's
+// logical.Dialect implementation. There's a fake dependency on
+// the script loader so that flags can be evaluated first.
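+//
+// The sequencer stack assembled below is chaos(script(immediate)): the
+// script wrapper applies any user-supplied logical-replication script,
+// and the chaos wrapper is a no-op unless a fault-injection probability
+// is configured.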
+func ProvideConn(
+	ctx *stopper.Context,
+	acc *apply.Acceptor,
+	immediate *immediate.Immediate,
+	chaos *chaos.Chaos,
+	config *Config,
+	memo types.Memo,
+	scriptSeq *scriptSeq.Sequencer,
+	stagingPool *types.StagingPool,
+	targetPool *types.TargetPool,
+	watchers types.Watchers,
+) (*Conn, error) {
+	if err := config.Preflight(); err != nil {
+		return nil, err
+	}
+	seq, err := scriptSeq.Wrap(ctx, immediate)
+	if err != nil {
+		return nil, err
+	}
+	seq, err = chaos.Wrap(ctx, seq) // No-op if probability is 0.
+	if err != nil {
+		return nil, err
+	}
+	connAcceptor, _, err := seq.Start(ctx, &sequencer.StartOptions{
+		Delegate: types.OrderedAcceptorFrom(acc, watchers),
+		Bounds:   &notify.Var[hlc.Range]{}, // Not currently used.
+		Group: &types.TableGroup{
+			Name:      ident.New(config.TargetSchema.Raw()),
+			Enclosing: config.TargetSchema,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	opQueries := make(map[string]string)
+	for k, v := range opQueryTemplates {
+		opQueries[k] = fmt.Sprintf(v, config.SQLReplicationSchema)
+	}
+	ret := &Conn{
+		acceptor:     connAcceptor,
+		columns:      &ident.TableMap[[]types.ColData]{},
+		config:       config,
+		memo:         memo,
+		primaryKeys:  &ident.TableMap[map[int]int]{},
+		replQueries:  opQueries,
+		stagingDB:    stagingPool,
+		tableQueries: &ident.TableMap[string]{},
+		target:       config.TargetSchema,
+		targetDB:     targetPool,
+		walOffset:    notify.Var[*lsn]{},
+	}
+
+	return ret, ret.Start(ctx)
+}
+
+// ProvideEagerConfig is a hack to move up the evaluation of the user
+// script so that the options callbacks can set any non-script-related
+// CLI flags.
+func ProvideEagerConfig(cfg *Config, _ *script.Loader) (*EagerConfig, error) {
+	return (*EagerConfig)(cfg), cfg.Preflight()
+}
diff --git a/internal/source/db2/queries.go b/internal/source/db2/queries.go
new file mode 100644
index 000000000..9a5637366
--- /dev/null
+++ b/internal/source/db2/queries.go
@@ -0,0 +1,111 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+var (
+	opQueryTemplates = map[string]string{
+		// maxLsn is the max Log Sequence Number stored in the staging tables.
+		// This is used during live capture of events.
+		"maxLsn": `
+	SELECT Max(t.synchpoint)
+	FROM (
+		SELECT cd_new_synchpoint AS synchpoint
+		FROM %[1]s.ibmsnap_register
+		UNION ALL
+		SELECT synchpoint AS synchpoint
+		FROM %[1]s.ibmsnap_register) t`,
+
+		// nextLsn is the Log Sequence Number of the next batch, taken from
+		// the monitoring table. This is used during catch-up to batch
+		// mutations. The monitoring table is refreshed periodically based on
+		// the MONITOR_INTERVAL parameter as set in the
+		// ASNCDC.IBMSNAP_CAPPARMS table. By default, it is set to 300 seconds.
+		"nextLsn": `
+	SELECT Min(restart_seq)
+	FROM %[1]s.ibmsnap_capmon
+	WHERE cd_rows_inserted > 0
+	AND restart_seq > ?`,
+		// get tables with mutations across all the schemata.
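+		// (In every template above and below, %[1]s is replaced with the
+		// SQL replication schema, config.SQLReplicationSchema, via
+		// fmt.Sprintf in ProvideConn.)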
+ "stagingTables": ` + SELECT r.source_owner, + r.source_table, + r.cd_owner, + r.cd_table + FROM %[1]s.ibmsnap_register r + LEFT JOIN syscat.tables t + ON r.source_owner = t.tabschema + AND r.source_table = t.tabname + WHERE r.source_owner <> ''`, + // get tables with mutations within a schema + "stagingTablesForSchema": ` + SELECT r.source_owner, + r.source_table, + r.cd_owner, + r.cd_table + FROM %[1]s.ibmsnap_register r + LEFT JOIN syscat.tables t + ON r.source_owner = t.tabschema + AND r.source_table = t.tabname + WHERE r.source_owner = ? + `, + } + + // The IBMSNAP_UOW table provides additional information about transactions + // that have been committed to a source table, such as a timestamp. + // We join the IBMSNAP_UOW and change data (CD) tables based on matching + // IBMSNAP_COMMITSEQ values when to propose changes to the target tables. + // + // Note: we don't consider deletes before updates + // (we only look at the values after the update) + changeQuery = ` + SELECT * + FROM (SELECT CASE + WHEN ibmsnap_operation = 'D' + AND ( Lead(cdc.ibmsnap_operation, 1, 'X') + OVER ( + PARTITION BY cdc.ibmsnap_commitseq + ORDER BY cdc.ibmsnap_intentseq) ) = 'I' THEN 0 + WHEN ibmsnap_operation = 'I' + AND ( Lag(cdc.ibmsnap_operation, 1, 'X') + OVER ( + PARTITION BY cdc.ibmsnap_commitseq + ORDER BY cdc.ibmsnap_intentseq) ) = 'D' THEN 3 + WHEN ibmsnap_operation = 'D' THEN 1 + WHEN ibmsnap_operation = 'I' THEN 2 + END OPCODE, + uow.IBMSNAP_LOGMARKER, + cdc.* + FROM %[1]s cdc , %[2]s.IBMSNAP_UOW uow + WHERE cdc.ibmsnap_commitseq > ? + AND cdc.ibmsnap_commitseq <= ? + AND uow.ibmsnap_commitseq=cdc.ibmsnap_commitseq + ORDER BY cdc.ibmsnap_commitseq, + cdc.ibmsnap_intentseq) + WHERE opcode != 0 + ` + columnQuery = ` + SELECT c.colname, + c.keyseq, + c.typename + FROM syscat.tables AS t + INNER JOIN syscat.columns AS c + ON t.tabname = c.tabname + AND t.tabschema = c.tabschema + WHERE t.tabschema = ? + AND t.tabname = ? + ORDER BY c.colno + ` +) diff --git a/internal/source/db2/wire_gen.go b/internal/source/db2/wire_gen.go new file mode 100644 index 000000000..2e2bfb597 --- /dev/null +++ b/internal/source/db2/wire_gen.go @@ -0,0 +1,90 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" + "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" + script2 "github.com/cockroachdb/cdc-sink/internal/sequencer/script" + "github.com/cockroachdb/cdc-sink/internal/sinkprod" + "github.com/cockroachdb/cdc-sink/internal/staging/memo" + "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/target/dlq" + "github.com/cockroachdb/cdc-sink/internal/target/schemawatch" + "github.com/cockroachdb/cdc-sink/internal/util/applycfg" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" +) + +// Injectors from injector.go: + +// Start creates a DB2 logical replication loop using the +// provided configuration. 
+func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + diagnostics := diag.New(ctx) + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + scriptConfig := &config.Script + loader, err := script.ProvideLoader(ctx, configs, scriptConfig, diagnostics) + if err != nil { + return nil, err + } + eagerConfig, err := ProvideEagerConfig(config, loader) + if err != nil { + return nil, err + } + targetConfig := &eagerConfig.Target + targetPool, err := sinkprod.ProvideTargetPool(ctx, targetConfig, diagnostics) + if err != nil { + return nil, err + } + targetStatements, err := sinkprod.ProvideStatementCache(targetConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlqConfig := &eagerConfig.DLQ + watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(dlqConfig, targetPool, watchers) + acceptor, err := apply.ProvideAcceptor(ctx, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + immediateImmediate := &immediate.Immediate{} + sequencerConfig := &eagerConfig.Sequencer + chaosChaos := &chaos.Chaos{ + Config: sequencerConfig, + } + stagingConfig := &eagerConfig.Staging + stagingPool, err := sinkprod.ProvideStagingPool(ctx, stagingConfig, diagnostics, targetConfig) + if err != nil { + return nil, err + } + stagingSchema, err := sinkprod.ProvideStagingDB(stagingConfig) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(ctx, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + sequencer := script2.ProvideSequencer(loader, targetPool, watchers) + conn, err := ProvideConn(ctx, acceptor, immediateImmediate, chaosChaos, config, memoMemo, sequencer, stagingPool, targetPool, watchers) + if err != nil { + return nil, err + } + db2 := &DB2{ + Conn: conn, + Diagnostics: diagnostics, + } + return db2, nil +} diff --git a/main.go b/main.go index f9f5b327f..90c036d5a 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "syscall" "time" + "github.com/cockroachdb/cdc-sink/internal/cmd/db2" "github.com/cockroachdb/cdc-sink/internal/cmd/dumphelp" "github.com/cockroachdb/cdc-sink/internal/cmd/dumptemplates" "github.com/cockroachdb/cdc-sink/internal/cmd/licenses" @@ -103,6 +104,8 @@ func main() { f.CountVarP(&verbosity, "verbose", "v", "increase logging verbosity to debug; repeat for trace") root.AddCommand( + db2.Command(), + db2.Install(), dumphelp.Command(), dumptemplates.Command(), licenses.Command(),