From 96b424b4833324559053e5e1e069030ff1a227e0 Mon Sep 17 00:00:00 2001
From: Silvano Ravotto
Date: Thu, 4 Jan 2024 21:24:42 -0500
Subject: [PATCH] First rough draft for the DB2 connector.

Uses SQL replication, leveraging staging tables in the source database.
The connector periodically pulls new mutations from the staging tables.
---
 .github/db2/Dockerfile | 28 +
 .github/db2/README.md | 3 +
 .github/db2/asncdc.c | 162 +++++
 .github/db2/asncdc_UDF.sql | 17 +
 .github/db2/asncdcaddremove.sql | 194 ++++++
 .github/db2/asncdctables.sql | 479 ++++++++++++++
 .github/db2/cdcsetup.sh | 18 +
 .github/db2/custom-init/cleanup_storage.sh | 12 +
 .github/db2/dbsetup.sh | 51 ++
 .github/db2/inventory.sql | 77 +++
 .github/db2/openshift_entrypoint.sh | 19 +
 .github/db2/startup-agent.sql | 1 +
 .github/db2/startup-cdc-demo.sql | 9 +
 .github/docker-compose.yml | 21 +-
 .github/workflows/go-test-db2.yaml | 207 ++++++
 .github/workflows/go-tests.yaml | 2 +-
 .github/workflows/go.yaml | 13 +
 .gitignore | 6 +
 Makefile | 34 +
 go.mod | 1 +
 go.sum | 2 +
 internal/cmd/db2/db2.go | 39 ++
 internal/cmd/db2/install.go | 226 +++++++
 internal/sinktest/all/integration.go | 4 +
 .../scripttest/testdata/logical_test_db2.ts | 55 ++
 internal/source/db2/config.go | 94 +++
 internal/source/db2/conn.go | 279 ++++++++
 internal/source/db2/db2.go | 40 ++
 internal/source/db2/driver.go | 22 +
 internal/source/db2/injector.go | 46 ++
 internal/source/db2/integration_test.go | 619 ++++++++++++++++++
 internal/source/db2/lsn.go | 91 +++
 internal/source/db2/metrics.go | 53 ++
 internal/source/db2/operation_string.go | 29 +
 internal/source/db2/provider.go | 49 ++
 internal/source/db2/queries.go | 356 ++++++++++
 internal/source/db2/wire_gen.go | 90 +++
 main.go | 3 +
 38 files changed, 3449 insertions(+), 2 deletions(-)
 create mode 100644 .github/db2/Dockerfile
 create mode 100644 .github/db2/README.md
 create mode 100644 .github/db2/asncdc.c
 create mode 100644 .github/db2/asncdc_UDF.sql
 create mode 100644 .github/db2/asncdcaddremove.sql
 create mode 100644 .github/db2/asncdctables.sql
 create mode 100755 .github/db2/cdcsetup.sh
 create mode 100644 .github/db2/custom-init/cleanup_storage.sh
 create mode 100755 .github/db2/dbsetup.sh
 create mode 100644 .github/db2/inventory.sql
 create mode 100755 .github/db2/openshift_entrypoint.sh
 create mode 100644 .github/db2/startup-agent.sql
 create mode 100644 .github/db2/startup-cdc-demo.sql
 create mode 100644 .github/workflows/go-test-db2.yaml
 create mode 100644 Makefile
 create mode 100644 internal/cmd/db2/db2.go
 create mode 100644 internal/cmd/db2/install.go
 create mode 100644 internal/sinktest/scripttest/testdata/logical_test_db2.ts
 create mode 100644 internal/source/db2/config.go
 create mode 100644 internal/source/db2/conn.go
 create mode 100644 internal/source/db2/db2.go
 create mode 100644 internal/source/db2/driver.go
 create mode 100644 internal/source/db2/injector.go
 create mode 100644 internal/source/db2/integration_test.go
 create mode 100644 internal/source/db2/lsn.go
 create mode 100644 internal/source/db2/metrics.go
 create mode 100644 internal/source/db2/operation_string.go
 create mode 100644 internal/source/db2/provider.go
 create mode 100644 internal/source/db2/queries.go
 create mode 100644 internal/source/db2/wire_gen.go

diff --git a/.github/db2/Dockerfile b/.github/db2/Dockerfile
new file mode 100644
index 000000000..49008d71b
--- /dev/null
+++ b/.github/db2/Dockerfile
@@ -0,0 +1,28 @@
+FROM icr.io/db2_community/db2
+
+LABEL maintainer="Debezium Community"
+
+RUN mkdir -p /asncdctools/src
+
+ADD asncdc_UDF.sql 
/asncdctools/src
+ADD asncdcaddremove.sql /asncdctools/src
+ADD asncdctables.sql /asncdctools/src
+ADD dbsetup.sh /asncdctools/src
+ADD startup-agent.sql /asncdctools/src
+ADD startup-cdc-demo.sql /asncdctools/src
+ADD inventory.sql /asncdctools/src
+ADD asncdc.c /asncdctools/src
+
+RUN mkdir /var/custom && \
+    chmod -R 777 /asncdctools && \
+    chmod -R 777 /var/custom
+
+ADD cdcsetup.sh /var/custom
+ADD custom-init /var/custom-init
+
+RUN chmod -R 777 /var/custom-init
+
+ADD openshift_entrypoint.sh /var/db2_setup/lib
+
+RUN chmod 777 /var/custom/cdcsetup.sh && \
+    chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh
diff --git a/.github/db2/README.md b/.github/db2/README.md
new file mode 100644
index 000000000..9c05eae9f
--- /dev/null
+++ b/.github/db2/README.md
@@ -0,0 +1,3 @@
+# DB2 container
+
+This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables.
\ No newline at end of file
diff --git a/.github/db2/asncdc.c b/.github/db2/asncdc.c
new file mode 100644
index 000000000..be1fc483f
--- /dev/null
+++ b/.github/db2/asncdc.c
@@ -0,0 +1,162 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sqlca.h>
+#include <sqludf.h>
+
+void SQL_API_FN asncdcservice(
+    SQLUDF_VARCHAR *asnCommand, /* input */
+    SQLUDF_VARCHAR *asnService,
+    SQLUDF_CLOB *fileData, /* output */
+    /* null indicators */
+    SQLUDF_NULLIND *asnCommand_ind, /* input */
+    SQLUDF_NULLIND *asnService_ind,
+    SQLUDF_NULLIND *fileData_ind,
+    SQLUDF_TRAIL_ARGS,
+    struct sqludf_dbinfo *dbinfo)
+{
+
+    int fd;
+    char tmpFileName[] = "/tmp/fileXXXXXX";
+    fd = mkstemp(tmpFileName);
+
+    int strcheck = 0;
+    char cmdstring[256];
+
+
+    char* szDb2path = getenv("HOME");
+
+
+
+    char str[20];
+    int len = 0;
+    int c;
+    char *buffer = NULL;
+    FILE *pidfile;
+
+    char dbname[129];
+    memset(dbname, '\0', 129);
+    strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
+    dbname[dbinfo->dbnamelen] = '\0';
+
+    int pid;
+    if (strcmp(asnService, "asncdc") == 0)
+    {
+        strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
+        int callcheck;
+        callcheck = system(cmdstring);
+        pidfile = fopen(tmpFileName, "r");
+        while ((c = fgetc(pidfile)) != EOF)
+        {
+            if (c == '\n')
+            {
+                break;
+            }
+            len++;
+        }
+        buffer = (char *)malloc(sizeof(char) * len);
+        fseek(pidfile, 0, SEEK_SET);
+        fread(buffer, sizeof(char), len, pidfile);
+        fclose(pidfile);
+        pidfile = fopen(tmpFileName, "w");
+        if (strcmp(asnCommand, "start") == 0)
+        {
+            if (len == 0) // is not running
+            {
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
+                fprintf(pidfile, "start --> %s \n", cmdstring);
+                callcheck = system(cmdstring);
+            }
+            else
+            {
+                fprintf(pidfile, "asncap is already running");
+            }
+        }
+        if ((strcmp(asnCommand, "prune") == 0) ||
+            (strcmp(asnCommand, "reinit") == 0) ||
+            (strcmp(asnCommand, "suspend") == 0) ||
+            (strcmp(asnCommand, "resume") == 0) ||
+            (strcmp(asnCommand, "status") == 0) ||
+            (strcmp(asnCommand, "stop") == 0))
+        {
+            if (len > 0)
+            {
+                //buffer[len] = '\0';
+                //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
+                //fprintf(pidfile, "stop --> %s", cmdstring);
+                //callcheck = system(cmdstring);
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
+                //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
+                callcheck = system(cmdstring);
+            }
+            else
+            { 
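+                /* No capture-process PID was found above (len == 0), so
+                   there is nothing to signal; report the idle state to
+                   the caller instead. */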
+                fprintf(pidfile, "asncap is not running");
+            }
+        }
+
+        fclose(pidfile);
+    }
+    /* system(cmdstring); */
+
+    int rc = 0;
+    long fileSize = 0;
+    size_t readCnt = 0;
+    FILE *f = NULL;
+
+    f = fopen(tmpFileName, "r");
+    if (!f)
+    {
+        strcpy(SQLUDF_MSGTX, "Could not open file ");
+        strncat(SQLUDF_MSGTX, tmpFileName,
+                SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
+        strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    rc = fseek(f, 0, SEEK_END);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* verify the file size */
+    fileSize = ftell(f);
+    if (fileSize > fileData->length)
+    {
+        strcpy(SQLUDF_MSGTX, "File too large");
+        strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* go to the beginning and read the entire file */
+    rc = fseek(f, 0, 0);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    readCnt = fread(fileData->data, 1, fileSize, f);
+    if (readCnt != fileSize)
+    {
+        /* raise a warning that something weird is going on */
+        sprintf(SQLUDF_MSGTX, "Could not read entire file "
+                              "(%zu vs %ld)",
+                readCnt, fileSize);
+        strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
+        *fileData_ind = -1;
+    }
+    else
+    {
+        fileData->length = readCnt;
+        *fileData_ind = 0;
+    }
+    // remove temporary file
+    rc = remove(tmpFileName);
+    //fclose(pFile);
+}
diff --git a/.github/db2/asncdc_UDF.sql b/.github/db2/asncdc_UDF.sql
new file mode 100644
index 000000000..fc1da4f9a
--- /dev/null
+++ b/.github/db2/asncdc_UDF.sql
@@ -0,0 +1,17 @@
+DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;
+
+CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
+  RETURNS CLOB(100K)
+  SPECIFIC asncdcservice
+  EXTERNAL NAME 'asncdc!asncdcservice'
+  LANGUAGE C
+  PARAMETER STYLE SQL
+  DBINFO
+  DETERMINISTIC
+  NOT FENCED
+  RETURNS NULL ON NULL INPUT
+  NO SQL
+  NO EXTERNAL ACTION
+  NO SCRATCHPAD
+  ALLOW PARALLEL
+  NO FINAL CALL;
\ No newline at end of file
diff --git a/.github/db2/asncdcaddremove.sql b/.github/db2/asncdcaddremove.sql
new file mode 100644
index 000000000..e5484f06a
--- /dev/null
+++ b/.github/db2/asncdcaddremove.sql
@@ -0,0 +1,194 @@
+
+--
+-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE()
+-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table
+-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table
+--
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE stmtSQL VARCHAR(2048);
+
+DECLARE SQLCODE INT;
+DECLARE SQLSTATE CHAR(5);
+DECLARE RC_SQLCODE INT DEFAULT 0;
+DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000';
+
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE;
+
+-- delete ASNCDC.IBMSNAP_PRUNCNTL entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_REGISTER entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- drop CD Table / source
+SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' ||
+              tableschema || '_' || tablename ;
+ 
EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_COLS entries /target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_MEMBR entries /target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_COLVERSION
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_TABVERSION
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE';
+EXECUTE IMMEDIATE stmtSQL;
+
+END P1@
+--#SET TERMINATOR ;
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE SQLSTATE CHAR(5);
+
+DECLARE stmtSQL VARCHAR(2048);
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' ||
+              tableschema || '_' || tablename ||
+              ' AS ( SELECT ' ||
+              ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' ||
+              ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' ||
+              ' CAST ('''' AS CHAR(1)) ' ||
+              ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' 
|| tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' || + tableschema || '_' || tablename || + ' ON ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' VOLATILE CARDINALITY'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' || + 'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' || + 'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' || + 'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' || + 'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' || + 'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' || + 'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' || + 'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''N'', ' || + '1, ' || + '''Y'', ' || + '''Y'', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + 'null, ' || + 'null, ' || + '0, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + '''0801'', ' || + 'null, ' || + 'null, ' || + '''0'', ' || + '''Y'', ' || + '''N'', ' || + '''Y'', ' || + '''NNNN'', ' || + '''Y'', ' || + '''A'',' || + 'null ) '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || + 'TARGET_SERVER, ' || + 'TARGET_OWNER, ' || + 'TARGET_TABLE, ' || + 'SYNCHTIME, ' || + 'SYNCHPOINT, ' || + 'SOURCE_OWNER, ' || + 'SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, ' || + 'APPLY_QUAL, ' || + 'SET_NAME, ' || + 'CNTL_SERVER , ' || + 'TARGET_STRUCTURE , ' || + 'CNTL_ALIAS , ' || + 'PHYS_CHANGE_OWNER , ' || + 'PHYS_CHANGE_TABLE , ' || + 'MAP_ID ' || + ') VALUES ( ' || + '''KAFKA'', ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + 'NULL, ' || + 'NULL, ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''KAFKAQUAL'', ' || + '''SET001'', ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '8, ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || + ' )'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; diff --git a/.github/db2/asncdctables.sql b/.github/db2/asncdctables.sql new file mode 100644 index 000000000..88203a7dd --- /dev/null +++ b/.github/db2/asncdctables.sql @@ -0,0 +1,479 @@ +-- 1021 db2 LEVEL Version 10.2.0 --> 
11.5.0 1150 + +CREATE TABLE ASNCDC.IBMQREP_COLVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +POSITION SMALLINT NOT NULL, +NAME VARCHAR(128) NOT NULL, +TYPE SMALLINT NOT NULL, +LENGTH INTEGER NOT NULL, +NULLS CHAR( 1) NOT NULL, +DEFAULT VARCHAR(1536), +CODEPAGE INTEGER, +SCALE INTEGER, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX +ON ASNCDC.IBMQREP_COLVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +POSITION ASC); + +CREATE INDEX ASNCDC.IX2COLVERSION +ON ASNCDC.IBMQREP_COLVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE TABLE ASNCDC.IBMQREP_TABVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +VERSION INTEGER NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_NAME VARCHAR(128) NOT NULL, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX +ON ASNCDC.IBMQREP_TABVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +VERSION ASC); + +CREATE INDEX ASNCDC.IX2TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE INDEX ASNCDC.IX3TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +SOURCE_OWNER ASC, +SOURCE_NAME ASC); + +CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL( +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021') +ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES ( +'1021'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME TIMESTAMP NOT NULL, +RESTART_TIME TIMESTAMP NOT NULL, +CURRENT_MEMORY INT NOT NULL, +CD_ROWS_INSERTED INT NOT NULL, +RECAP_ROWS_SKIPPED INT NOT NULL, +TRIGR_ROWS_SKIPPED INT NOT NULL, +CHG_ROWS_SKIPPED INT NOT NULL, +TRANS_PROCESSED INT NOT NULL, +TRANS_SPILLED INT NOT NULL, +MAX_TRANS_SIZE INT NOT NULL, +LOCKING_RETRIES INT NOT NULL, +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +LOGREADLIMIT INT NOT NULL, +CAPTURE_IDLE INT NOT NULL, +SYNCHTIME TIMESTAMP NOT NULL, +CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT , +LAST_EOL_TIME TIMESTAMP, +RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +LOGREAD_API_TIME INT, +NUM_LOGREAD_CALLS INT, +NUM_END_OF_LOGS INT, +LOGRDR_SLEEPTIME INT, +NUM_LOGREAD_F_CALLS INT, +TRANS_QUEUED INT, +NUM_WARNTXS INT, +NUM_WARNLOGAPI INT) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX +ON ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT INT, +LAG_LIMIT INT, +COMMIT_INTERVAL INT, +PRUNE_INTERVAL INT, +TRACE_LIMIT INT, +MONITOR_LIMIT INT, +MONITOR_INTERVAL INT, +MEMORY_LIMIT SMALLINT, +REMOTE_SRC_SERVER CHAR( 18), +AUTOPRUNE CHAR( 1), +TERM CHAR( 1), +AUTOSTOP CHAR( 1), +LOGREUSE CHAR( 1), +LOGSTDOUT CHAR( 1), +SLEEP_INTERVAL SMALLINT, +CAPTURE_PATH VARCHAR(1040), +STARTMODE VARCHAR( 10), +LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256, +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021', +COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021') + ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT, +LAG_LIMIT, +COMMIT_INTERVAL, +PRUNE_INTERVAL, +TRACE_LIMIT, +MONITOR_LIMIT, +MONITOR_INTERVAL, +MEMORY_LIMIT, +SLEEP_INTERVAL, +AUTOPRUNE, +TERM, +AUTOSTOP, +LOGREUSE, +LOGSTDOUT, +CAPTURE_PATH, +STARTMODE, +COMPATIBILITY) +VALUES ( +10080, +10080, +30, +300, +10080, +10080, +300, +32, +5, +'Y', 
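+-- AUTOPRUNE above; the values below map to TERM, AUTOSTOP, LOGREUSE,
+-- LOGSTDOUT, CAPTURE_PATH, STARTMODE, and COMPATIBILITY.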
+'Y', +'N', +'N', +'N', +NULL, +'WARMSI', +'1021' +); + +CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS ( + CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL + ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX + ON ASNCDC.IBMSNAP_CAPSCHEMAS + (CAP_SCHEMA_NAME ASC); + +INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES ( +'ASNCDC'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE( +OPERATION CHAR( 8) NOT NULL, +TRACE_TIME TIMESTAMP NOT NULL, +DESCRIPTION VARCHAR(1024) NOT NULL) + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX +ON ASNCDC.IBMSNAP_CAPTRACE( +TRACE_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL( +TARGET_SERVER CHAR(18) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +CNTL_SERVER CHAR( 18) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +CNTL_ALIAS CHAR( 8), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +MAP_ID VARCHAR(10) NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX +ON ASNCDC.IBMSNAP_PRUNCNTL( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC, +TARGET_TABLE ASC, +TARGET_OWNER ASC); + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1 +ON ASNCDC.IBMSNAP_PRUNCNTL( +MAP_ID ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2 +ON ASNCDC.IBMSNAP_PRUNCNTL( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3 +ON ASNCDC.IBMSNAP_PRUNCNTL( +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK( +DUMMY CHAR( 1)) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER CHAR( 18) NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX +ON ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER ASC, +APPLY_QUAL ASC, +SET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +GLOBAL_RECORD CHAR( 1) NOT NULL, +SOURCE_STRUCTURE SMALLINT NOT NULL, +SOURCE_CONDENSED CHAR( 1) NOT NULL, +SOURCE_COMPLETE CHAR( 1) NOT NULL, +CD_OWNER VARCHAR(128), +CD_TABLE VARCHAR(128), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +DISABLE_REFRESH SMALLINT NOT NULL, +CCD_OWNER VARCHAR(128), +CCD_TABLE VARCHAR(128), +CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CCD_CONDENSED CHAR( 1), +CCD_COMPLETE CHAR( 1), +ARCH_LEVEL CHAR( 4) NOT NULL, +DESCRIPTION CHAR(254), +BEFORE_IMG_PREFIX VARCHAR( 4), +CONFLICT_LEVEL CHAR( 1), +CHG_UPD_TO_DEL_INS CHAR( 1), +CHGONLY CHAR( 1), +RECAPTURE CHAR( 1), +OPTION_FLAGS CHAR( 4) NOT NULL, +STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y', +STATE CHAR( 1) WITH DEFAULT 'I', +STATE_INFO CHAR( 8)) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX +ON ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER ASC, 
+SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1 +ON ASNCDC.IBMSNAP_REGISTER( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2 +ON ASNCDC.IBMSNAP_REGISTER( +GLOBAL_RECORD ASC); + +ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_RESTART( +MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +MAX_COMMIT_TIME TIMESTAMP NOT NULL, +MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +CURR_COMMIT_TIME TIMESTAMP NOT NULL, +CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT , +SIGNAL_TYPE VARCHAR( 30) NOT NULL, +SIGNAL_SUBTYPE VARCHAR( 30), +SIGNAL_INPUT_IN VARCHAR(500), +SIGNAL_STATE CHAR( 1) NOT NULL, +SIGNAL_LSN VARCHAR( 16) FOR BIT DATA) +DATA CAPTURE CHANGES + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_SIGNALX +ON ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +COL_TYPE CHAR( 1) NOT NULL, +TARGET_NAME VARCHAR(128) NOT NULL, +IS_KEY CHAR( 1) NOT NULL, +COLNO SMALLINT NOT NULL, +EXPRESSION VARCHAR(1024) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX +ON ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC, +TARGET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY; + +--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX +--ON ASNCDC.IBMSNAP_SUBS_EVENT( +--EVENT_NAME ASC, +--EVENT_TIME ASC); + + +--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +TARGET_CONDENSED CHAR( 1) NOT NULL, +TARGET_COMPLETE CHAR( 1) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +PREDICATES VARCHAR(1024), +MEMBER_STATE CHAR( 1), +TARGET_KEY_CHG CHAR( 1) NOT NULL, +UOW_CD_PREDICATES VARCHAR(1024), +JOIN_UOW_CD CHAR( 1), +LOADX_TYPE SMALLINT, +LOADX_SRC_N_OWNER VARCHAR( 128), +LOADX_SRC_N_TABLE VARCHAR(128)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX +ON ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SET_TYPE CHAR( 1) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +ACTIVATE SMALLINT NOT NULL, +SOURCE_SERVER CHAR( 18) NOT NULL, +SOURCE_ALIAS CHAR( 8), +TARGET_SERVER CHAR( 18) NOT NULL, +TARGET_ALIAS CHAR( 8), +STATUS SMALLINT NOT NULL, +LASTRUN TIMESTAMP NOT NULL, +REFRESH_TYPE CHAR( 1) NOT NULL, +SLEEP_MINUTES INT, +EVENT_NAME CHAR( 18), +LASTSUCCESS TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CAPTURE_SCHEMA VARCHAR(128) NOT NULL, +TGT_CAPTURE_SCHEMA VARCHAR(128), +FEDERATED_SRC_SRVR VARCHAR( 18), +FEDERATED_TGT_SRVR VARCHAR( 18), +JRN_LIB CHAR( 10), +JRN_NAME 
CHAR( 10),
+OPTION_FLAGS CHAR( 4) NOT NULL,
+COMMIT_COUNT SMALLINT,
+MAX_SYNCH_MINUTES SMALLINT,
+AUX_STMTS SMALLINT NOT NULL,
+ARCH_LEVEL CHAR( 4) NOT NULL)
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX
+ON ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+BEFORE_OR_AFTER CHAR( 1) NOT NULL,
+STMT_NUMBER SMALLINT NOT NULL,
+EI_OR_CALL CHAR( 1) NOT NULL,
+SQL_STMT VARCHAR(1024),
+ACCEPT_SQLSTATES VARCHAR( 50))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX
+ON ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+BEFORE_OR_AFTER ASC,
+STMT_NUMBER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_UOW(
+IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL,
+IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+IBMSNAP_LOGMARKER TIMESTAMP NOT NULL,
+IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL,
+IBMSNAP_AUTHID VARCHAR(128) NOT NULL,
+IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT ,
+IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX
+ON ASNCDC.IBMSNAP_UOW(
+IBMSNAP_COMMITSEQ ASC,
+IBMSNAP_LOGMARKER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPENQ (
+    LOCK_NAME CHAR(9 OCTETS)
+  )
+  ORGANIZE BY ROW
+  DATA CAPTURE NONE
+  COMPRESS NO;
diff --git a/.github/db2/cdcsetup.sh b/.github/db2/cdcsetup.sh
new file mode 100755
index 000000000..c5d963d5b
--- /dev/null
+++ b/.github/db2/cdcsetup.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+if [ ! -f /asncdctools/src/asncdc.nlk ]; then
+rc=1
+echo "waiting for user db2inst1 to exist ."
+while [ "$rc" -ne 0 ]
+do
+  sleep 5
+  id db2inst1
+  rc=$?
+  echo '.'
+done
+
+su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1
+fi
+touch /asncdctools/src/asncdc.nlk
+
+echo "CDC setup completed."
diff --git a/.github/db2/custom-init/cleanup_storage.sh b/.github/db2/custom-init/cleanup_storage.sh
new file mode 100644
index 000000000..9f0c458fd
--- /dev/null
+++ b/.github/db2/custom-init/cleanup_storage.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+################################################################################
+# Wipes the storage directory.
+# Note that this essentially makes the database non-persistent when the pod is deleted
+################################################################################
+
+echo "Inspecting database directory"
+ls $STORAGE_DIR
+
+echo "Wiping database directory"
+rm -rf $STORAGE_DIR/*
diff --git a/.github/db2/dbsetup.sh b/.github/db2/dbsetup.sh
new file mode 100755
index 000000000..9caf3d913
--- /dev/null
+++ b/.github/db2/dbsetup.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+echo "Compiling the ASN tool ..."
+cd /asncdctools/src
+/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
+
+DBNAME=$1
+DB2DIR=/opt/ibm/db2/V11.5
+rc=1
+echo "Waiting for DB2 to start ( $DBNAME ) ."
+while [ "$rc" -ne 0 ]
+do
+  sleep 5
+  db2 connect to $DBNAME
+  rc=$?
+  echo '.' 
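+  # `db2 connect` keeps failing until the instance is fully up,
+  # so keep polling every five seconds.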
+done
+
+# enable metacatalog read via JDBC
+cd $HOME/sqllib/bnd
+db2 bind db2schema.bnd blocking all grant public sqlerror continue
+
+# do a backup and restart the db
+db2 backup db $DBNAME to /dev/null
+db2 restart db $DBNAME
+
+db2 connect to $DBNAME
+
+cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function
+chmod 777 /database/config/db2inst1/sqllib/function
+
+# add UDF / start stop asncap
+db2 -tvmf /asncdctools/src/asncdc_UDF.sql
+
+# create asntables
+db2 -tvmf /asncdctools/src/asncdctables.sql
+
+# add UDF / add remove asntables
+
+db2 -tvmf /asncdctools/src/asncdcaddremove.sql
+
+
+
+
+# create sample tables and data
+db2 -tvmf /asncdctools/src/inventory.sql
+db2 -tvmf /asncdctools/src/startup-agent.sql
+sleep 10
+db2 -tvmf /asncdctools/src/startup-cdc-demo.sql
+
+
+echo "done"
diff --git a/.github/db2/inventory.sql b/.github/db2/inventory.sql
new file mode 100644
index 000000000..1d441d590
--- /dev/null
+++ b/.github/db2/inventory.sql
@@ -0,0 +1,77 @@
+
+-- Create and populate our products
+CREATE TABLE DB2INST1.PRODUCTS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+       (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  description VARCHAR(512),
+  weight FLOAT
+);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('scooter','Small 2-wheel scooter',3.14);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('car battery','12V car battery',8.1);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','12oz carpenter''s hammer',0.75);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','14oz carpenter''s hammer',0.875);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','16oz carpenter''s hammer',1.0);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('rocks','box of assorted rocks',5.3);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('jacket','water resistent black wind breaker',0.1);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('spare tire','24 inch spare tire',22.2);
+
+CREATE TABLE DB2INST1.PRODUCTS_ON_HAND (
+  product_id INTEGER NOT NULL PRIMARY KEY,
+  quantity INTEGER NOT NULL,
+  FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id)
+);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (101,3);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (102,8);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (103,18);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (104,4);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (105,5);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (106,0);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (107,44);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (108,2);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (109,5);
+
+CREATE TABLE DB2INST1.CUSTOMERS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+       (START WITH 1001, INCREMENT BY 1) PRIMARY KEY,
+  first_name VARCHAR(255) NOT NULL,
+  last_name VARCHAR(255) NOT NULL,
+  email VARCHAR(255) NOT NULL UNIQUE
+);
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('Sally','Thomas','sally.thomas@acme.com');
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('George','Bailey','gbailey@foobar.com');
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('Edward','Walker','ed@walker.com'); 
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Anne','Kretchmar','annek@noanswer.org'); + +CREATE TABLE DB2INST1.ORDERS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 10001, INCREMENT BY 1) PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY (purchaser) REFERENCES DB2INST1.CUSTOMERS(id), + FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id) +); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-16', 1001, 1, 102); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-17', 1002, 2, 105); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-19', 1002, 2, 106); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-21', 1003, 1, 107); diff --git a/.github/db2/openshift_entrypoint.sh b/.github/db2/openshift_entrypoint.sh new file mode 100755 index 000000000..ae9521f0e --- /dev/null +++ b/.github/db2/openshift_entrypoint.sh @@ -0,0 +1,19 @@ +#!/bin/bash +################################################################################ +# Runs custom init scripts followed by the original entry point +############################################################################### + +source ${SETUPDIR?}/include/db2_constants +source ${SETUPDIR?}/include/db2_common_functions + +if [[ -d /var/custom-init ]]; then + echo "(*) Running user-provided init scripts ... " + chmod -R 777 /var/custom-init + for script in `ls /var/custom-init`; do + echo "(*) Running $script ..." + /var/custom-init/$script + done +fi + +echo "Running original entry point" +/var/db2_setup/lib/setup_db2_instance.sh diff --git a/.github/db2/startup-agent.sql b/.github/db2/startup-agent.sql new file mode 100644 index 000000000..ae9150e34 --- /dev/null +++ b/.github/db2/startup-agent.sql @@ -0,0 +1 @@ +VALUES ASNCDC.ASNCDCSERVICES('start','asncdc'); \ No newline at end of file diff --git a/.github/db2/startup-cdc-demo.sql b/.github/db2/startup-cdc-demo.sql new file mode 100644 index 000000000..b9efc155e --- /dev/null +++ b/.github/db2/startup-cdc-demo.sql @@ -0,0 +1,9 @@ + +VALUES ASNCDC.ASNCDCSERVICES('status','asncdc'); + +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS_ON_HAND' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'CUSTOMERS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'ORDERS' ); + +VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc'); \ No newline at end of file diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml index f45f9a8d5..d626dd9f0 100644 --- a/.github/docker-compose.yml +++ b/.github/docker-compose.yml @@ -73,7 +73,26 @@ services: image: ghcr.io/cockroachdb/cdc-sink/firestore-emulator:latest # Expose the emulator on port 8181 to avoid conflict with CRDB admin UI. ports: - - "8181:8080" + - "8181:8080" + db2-v11.5.9: + # Using a DB2 image with additional SQL replication utilities. + # To build the image use .github/db2 + # Original is at icr.io/db2_community/db2 + build: ./db2 + privileged: True + ports: + - 50000:50000 + environment: + - PERSISTENT_HOME=false + - LICENSE=accept + - DBNAME=TESTDB + - DB2INST1_PASSWORD=SoupOrSecret + healthcheck: + # The setup scripts creates /asncdctools/src/asncdc.nlk when CDC is up. 
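+      # 600 retries at a 1s interval gives the instance roughly a
+      # ten-minute budget to come up on first boot.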
+ # TODO (silvano) improve start up time + test: bash -c '[ -f /asncdctools/src/asncdc.nlk ] ' + interval: 1s + retries: 600 mysql-v5.7: image: mysql:5.7 platform: linux/x86_64 diff --git a/.github/workflows/go-test-db2.yaml b/.github/workflows/go-test-db2.yaml new file mode 100644 index 000000000..92874a906 --- /dev/null +++ b/.github/workflows/go-test-db2.yaml @@ -0,0 +1,207 @@ +# Copyright 2023 The Cockroach Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +name: DB2 Tests +permissions: + contents: read + packages: read +on: + workflow_call: + workflow_dispatch: +# on: +# push: +# branches: [ sr8_db2 ] +jobs: + # Static code-quality checks. + code-quality: + name: Code Quality + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Ensure binary starts + if: ${{ !cancelled() }} + run: go run . help + + # Integration matrix tests for all supported CRDB and source DBs. + tests: + name: Integration Tests + runs-on: ${{ matrix.runs-on || 'ubuntu-latest-8-core' }} + timeout-minutes: 20 + strategy: + fail-fast: false + # Refer to the CRDB support policy when determining how many + # older releases to support. + # https://www.cockroachlabs.com/docs/releases/release-support-policy.html + # + # This matrix is explicit, since we have a few axes (target vs + # integration) that can't be expressed with the automatic + # cross-product behavior provided by the matrix operator. + matrix: + include: + - cockroachdb: v23.1 + integration: db2-v11.5.9 + + env: + COVER_OUT: coverage-${{ strategy.job-index }}.out + DOCKER_LOGS_OUT: docker-${{ strategy.job-index }}.log + FIRESTORE_EMULATOR_HOST: 127.0.0.1:8181 + JUNIT_OUT: junit-${{ strategy.job-index }}.xml + TEST_OUT: go-test-${{ strategy.job-index }}.json + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + # Ensure we can grab any private images we need for testing. + - name: Log in to GitHub Package Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Start Containers + working-directory: .github + run: > + docker compose up -d --wait + cockroachdb-${{ matrix.cockroachdb }} + ${{ matrix.integration }} + ${{ matrix.source }} + ${{ matrix.target }} + + # The go test json output will be written into a pipeline to + # create a JUnit.xml file. The test reports are aggregated later + # on to produce a nicer summary of the test output in the GitHub + # Actions UI. 
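+      # go-junit-report reads the JSON stream, mirrors it to stdout
+      # (-iocopy) for the tee below, and writes the JUnit XML consumed
+      # by the summarization job.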
+ # + # Inspired by + # https://www.cloudwithchris.com/blog/githubactions-testsummary-go/ + - name: Go Tests + env: + COCKROACH_DEV_LICENSE: ${{ secrets.COCKROACH_DEV_LICENSE }} + CDC_INTEGRATION: ${{ matrix.integration }} + TEST_SOURCE_CONNECT: ${{ matrix.sourceConn }} + TEST_TARGET_CONNECT: ${{ matrix.targetConn }} + IBM_DRIVER: /tmp/drivers/clidriver + CGO_CFLAGS: -I${IBM_DRIVER}/include + CGO_LDFLAGS: -L${IBM_DRIVER}/lib + run: > + set -o pipefail; + make testdb2 2>&1 | + go run github.com/jstemmer/go-junit-report/v2 + -iocopy + -out ${{ env.JUNIT_OUT }} + -p cockroachdb=${{ matrix.cockroachdb }} + -p integration=${{ matrix.integration }} + -package-name ${{ matrix.cockroachdb }}-${{ matrix.integration }} | + tee ${{ env.TEST_OUT }} + + - name: Upload coverage + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ${{ env.COVER_OUT }} + + # Capture container logs in case they're needed for diagnostics. + - name: Docker container logs + if: always() + working-directory: .github + run: docker-compose logs --no-color > ${{ env.DOCKER_LOGS_OUT }} + + # Upload all test reports to a common artifact name, to make them + # available to the summarization step. The go test json is + # uploaded as a developer convenience. + - name: Stash test logs + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-reports + path: | + ${{ env.COVER_OUT }} + ${{ env.DOCKER_LOGS_OUT }} + ${{ env.JUNIT_OUT }} + ${{ env.TEST_OUT }} + retention-days: 7 + + # Aggregate the results of multiple jobs within this workflow into a + # single status object that we can use for branch protection. + # + # https://docs.github.com/en/rest/commits/statuses + status: + name: Create status objects + runs-on: ubuntu-latest + permissions: + statuses: write + needs: # Update failure case below + - code-quality + - tests + if: ${{ always() }} + env: + CONTEXT: Workflow Golang + GH_TOKEN: ${{ github.token }} + MERGE_SHA: ${{ github.event.merge_group.head_sha }} + PR_SHA: ${{ github.event.pull_request.head.sha }} + STATE: success + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + steps: + - name: Failure + if: ${{ needs.code-quality.result != 'success' || needs.tests.result != 'success' }} + run: echo "STATE=failure" >> $GITHUB_ENV + - name: Report + run: | + set -eo pipefail + + if [ ! -z "$PR_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$PR_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + if [ ! -z "$MERGE_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$MERGE_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + # This job downloads the test log files generated in the integration + # job and summarizes them into the GitHub Actions UI. + summarize-tests: + name: Test summaries + runs-on: ubuntu-latest + needs: tests + if: ${{ always() }} + steps: + - name: Download reports + uses: actions/download-artifact@v3 + with: + name: integration-reports + - name: Summarize + uses: test-summary/action@v2 + with: + paths: junit-*.xml diff --git a/.github/workflows/go-tests.yaml b/.github/workflows/go-tests.yaml index 65f85363b..efb519eb0 100644 --- a/.github/workflows/go-tests.yaml +++ b/.github/workflows/go-tests.yaml @@ -43,7 +43,7 @@ jobs: - name: Copyright headers if: ${{ !cancelled() }} - run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore '**/testdata/**/*.sql' . 
+ run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore ".github/db2/**" -ignore '**/testdata/**/*.sql' . - name: Lint if: ${{ !cancelled() }} diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index ea98c18ad..637cbac50 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -82,6 +82,19 @@ jobs: statuses: write secrets: inherit + go-db2: + uses: ./.github/workflows/go-test-db2.yaml + needs: + - go-build-cache + # We use the merge queue prior to pushing to a branch, so there's no + # reason to repeat tests that just ran. + if: ${{ github.event_name != 'push' }} + permissions: + contents: read + packages: read + statuses: write + secrets: inherit + go-wiki: uses: ./.github/workflows/go-wiki.yaml needs: diff --git a/.gitignore b/.gitignore index 48edd5f21..d9badf399 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,12 @@ cdc-sink # Dependency directories (remove the comment below to include it) # vendor/ +# Temporary directory +tmp/ + +# Downloaded drivers +drivers/ + # CockroachDB data directory cockroach-data/ goroutine_dump/ diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..770237492 --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ + +PWD = $(shell pwd) +COVER_OUT ?= cover.out +IBM_DRIVER = ${PWD}/drivers/clidriver +export CGO_CFLAGS=-I${IBM_DRIVER}/include +export CGO_LDFLAGS=-L${IBM_DRIVER}/lib +export DYLD_LIBRARY_PATH=${IBM_DRIVER}/lib +export LD_LIBRARY_PATH=${IBM_DRIVER}/lib + +.PHONY: all clidriver db2 clean realclean testdb2 + +all: cdc-sink + +cdc-sink: + go build + +clidriver: + go run . db2install --dest drivers + +db2: clidriver + go build -tags db2 + +clean: + rm cdc-sink + +realclean: clean + rm -rf drivers + +testdb2: clidriver + go test -tags db2 -count 1 -coverpkg=./internal/source/db2 \ + -covermode=atomic \ + -coverprofile=${COVER_OUT} \ + -race \ + -v ./internal/source/db2 diff --git a/go.mod b/go.mod index 6a278285b..675da9865 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/google/subcommands v1.2.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/ibmdb/go_ibm_db v0.4.5 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index 056a3b3fb..03ad33e3c 100644 --- a/go.sum +++ b/go.sum @@ -296,6 +296,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/ibmdb/go_ibm_db v0.4.5 h1:a0qKWbA5shCRo5HRRnAuENwhjg6AkgfPNr9xx0IZ160= +github.com/ibmdb/go_ibm_db v0.4.5/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP49UTmEvg= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= diff --git a/internal/cmd/db2/db2.go b/internal/cmd/db2/db2.go new file mode 100644 index 000000000..4047706e9 --- /dev/null +++ 
b/internal/cmd/db2/db2.go
@@ -0,0 +1,39 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package db2 contains a command to perform logical replication
+// from an IBM DB2 server.
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/source/db2"
+	"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/spf13/cobra"
+)
+
+// Command returns the db2 subcommand.
+func Command() *cobra.Command {
+	cfg := &db2.Config{}
+	return stdlogical.New(&stdlogical.Template{
+		Bind:  cfg.Bind,
+		Short: "start a DB2 replication feed",
+		Start: func(ctx *stopper.Context, cmd *cobra.Command) (any, error) {
+			return db2.Start(ctx, cfg)
+		},
+		Use: "db2",
+	})
+}
diff --git a/internal/cmd/db2/install.go b/internal/cmd/db2/install.go
new file mode 100644
index 000000000..0f00637c1
--- /dev/null
+++ b/internal/cmd/db2/install.go
@@ -0,0 +1,226 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"archive/tar"
+	"archive/zip"
+	"compress/gzip"
+	"fmt"
+	"io"
+	"io/fs"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"runtime"
+	"strings"
+
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+)
+
+const (
+	defaultURL  = "https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/"
+	defaultDest = "drivers"
+)
+
+// Install downloads and installs the IBM DB2 ODBC driver.
+// Because of license restrictions, the driver cannot be under
+// source control. It needs to be downloaded when used. 
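+// The Makefile's clidriver target invokes this command as
+// `go run . db2install --dest drivers`.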
+func Install() *cobra.Command { + var downloadURL, dest string + cmd := &cobra.Command{ + Args: cobra.NoArgs, + Short: "installs the db2 driver", + Use: "db2install", + RunE: func(cmd *cobra.Command, args []string) error { + driverPath := path.Join(dest, "clidriver") + _, err := os.Stat(driverPath) + _, includeErr := os.Stat(path.Join(driverPath, "include")) + _, libErr := os.Stat(path.Join(driverPath, "lib")) + if os.IsNotExist(err) || os.IsNotExist(includeErr) || os.IsNotExist(libErr) { + return install(dest, downloadURL) + } + if err != nil { + return err + } + if includeErr != nil { + return includeErr + } + if libErr != nil { + return libErr + } + fmt.Println("Driver already installed") + return nil + }, + } + cmd.Flags().StringVar(&dest, "dest", defaultDest, + "destination dir") + cmd.Flags().StringVar(&downloadURL, "url", defaultURL, + "url to download the driver") + return cmd +} + +// copy creates a file and copies the content from the source reader. +func copy(path string, src io.Reader, mode fs.FileMode) error { + writer, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, mode) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + defer writer.Close() + if _, err = io.Copy(writer, src); err != nil { + return errors.Wrapf(err, "failed to extract file") + } + return nil +} + +// download a package to specified temp destination. +func download(tmpFile, baseURL, pkg string) error { + out, err := os.Create(tmpFile) + if err != nil { + return err + } + defer out.Close() + pkgURL, err := url.JoinPath(baseURL, pkg) + if err != nil { + return err + } + resp, err := http.Get(pkgURL) + if err != nil { + return err + } + defer resp.Body.Close() + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + return nil +} + +// extractTar extracts the content of the tar file into the +// target directory +func extractTar(sourcefile string, targetDirectory string) error { + stream, err := os.Open(sourcefile) + if err != nil { + return err + } + defer stream.Close() + uncompressedStream, err := gzip.NewReader(stream) + if err != nil { + return err + } + defer uncompressedStream.Close() + tarReader := tar.NewReader(uncompressedStream) + for { + header, err := tarReader.Next() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + target := path.Join(targetDirectory, strings.TrimPrefix(header.Name, "/")) + fmt.Println(target) + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(target, 0755); err != nil { + return errors.Wrapf(err, "failed to extract file") + } + case tar.TypeReg: + err := copy(target, tarReader, header.FileInfo().Mode()) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + case tar.TypeSymlink: + err := os.Symlink(header.Linkname, target) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + default: + return errors.Errorf( + "uknown file type: %s in %s", + string(header.Typeflag), + header.Name) + } + } +} + +// extractZip extracts the content of the zip file into the +// target directory +func extractZip(sourcefile string, targetDirectory string) error { + reader, err := zip.OpenReader(sourcefile) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + defer reader.Close() + for _, f := range reader.Reader.File { + zipped, err := f.Open() + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + defer zipped.Close() + path := filepath.Join(targetDirectory, f.Name) + if 
f.FileInfo().IsDir() { + err := os.MkdirAll(path, f.Mode()) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + } else { + err := copy(path, zipped, f.Mode()) + if err != nil { + return errors.Wrapf(err, "failed to extract file") + } + } + } + return nil +} + +// install the IBM DB2 driver for the runtime platform into +// the specified directory +func install(target, baseURL string) error { + var pkgName string + const wordsize = 32 << (^uint(0) >> 32 & 1) + switch runtime.GOOS { + case "darwin": + pkgName = "macos64_odbc_cli.tar.gz" + case "windows": + pkgName = fmt.Sprintf("ntx%d_odbc_cli.zip", wordsize) + case "linux": + switch runtime.GOARCH { + case "amd64": + pkgName = "linuxx64_odbc_cli.tar.gz" + default: + return errors.Errorf("unknown arch %q", runtime.GOARCH) + } + default: + return errors.Errorf("unknown OS %q", runtime.GOOS) + } + fmt.Printf("Installing %s in %s\n", pkgName, target) + tmpFile := path.Join(os.TempDir(), pkgName) + defer os.Remove(tmpFile) + err := download(tmpFile, baseURL, pkgName) + if err != nil { + return err + } + fmt.Println("download successful") + if strings.HasSuffix(pkgName, ".zip") { + return extractZip(tmpFile, target) + } + return extractTar(tmpFile, target) +} diff --git a/internal/sinktest/all/integration.go b/internal/sinktest/all/integration.go index 666fa7573..66820dfbf 100644 --- a/internal/sinktest/all/integration.go +++ b/internal/sinktest/all/integration.go @@ -29,6 +29,10 @@ const ( // integration tests for some databases. We expect this to be of the // format "database-v123". IntegrationEnvName = "CDC_INTEGRATION" + // DB2Name must be kept in alignment with the + // .github/docker-compose.yml file and the integration matrix + // variable in workflows/tests.yaml. + DB2Name = "db2" // FirestoreName must be kept in alignment with the // .github/docker-compose.yml file and the integration matrix // variable in workflows/tests.yaml. diff --git a/internal/sinktest/scripttest/testdata/logical_test_db2.ts b/internal/sinktest/scripttest/testdata/logical_test_db2.ts new file mode 100644 index 000000000..7b4f2fb85 --- /dev/null +++ b/internal/sinktest/scripttest/testdata/logical_test_db2.ts @@ -0,0 +1,55 @@ +/* + * Copyright 2023 The Cockroach Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import * as api from "cdc-sink@v1"; +import {Document, Table} from "cdc-sink@v1"; + +// The sentinel name will be replaced by the test rig. It would normally be +// "my_db.public" or "my_db" depending on the target product. +api.configureSource("{{ SCHEMA }}", { + dispatch: (doc: Document, meta: Document): Record => { + console.trace(JSON.stringify(doc), JSON.stringify(meta)); + let ret: Record = {}; + ret[meta.table] = [ + { + PK: doc.PK, + ignored: 'by_configuration_below', + v_dispatched: doc.V, // Rename the property + } + ]; + return ret + } +}) + +// We introduce an unknown column in the dispatch function above. 
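+// (The dispatch function above renames V to v_dispatched and attaches
+// an extra "ignored" property to every row.)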
+// We'll add an extra configuration here to ignore it. +// The sentinel name would be replaced by "my_table". +api.configureTable("{{ TABLE }}", { + map: (doc: Document): Document => { + console.trace("map", JSON.stringify(doc)); + if (doc.v_dispatched === undefined) { + throw "did not find expected property"; + } + doc.v_mapped = doc.v_dispatched; + delete doc.v_dispatched; // Avoid schema-drift error due to unknown column. + return doc; + }, + ignore: { + "ignored": true, + } +}) diff --git a/internal/source/db2/config.go b/internal/source/db2/config.go new file mode 100644 index 000000000..63a7659d6 --- /dev/null +++ b/internal/source/db2/config.go @@ -0,0 +1,94 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "net/url" + "strconv" + "strings" + + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/pkg/errors" + "github.com/spf13/pflag" +) + +// Config contains the configuration necessary for creating a +// replication connection to a DB2 server. SourceConn is mandatory. +type Config struct { + logical.BaseConfig + logical.LoopConfig + + SourceConn string // Connection string for the source db. + SourceSchema ident.Schema // The schema we are replicating. + + // The fields below are extracted by Preflight. + host string + password string + port uint16 + user string + database string +} + +var schema string + +// Bind adds flags to the set. It delegates to the embedded Config.Bind. +func (c *Config) Bind(f *pflag.FlagSet) { + c.BaseConfig.Bind(f) + c.LoopConfig.LoopName = "db2" + c.LoopConfig.Bind(f) + f.StringVar(&c.SourceConn, "sourceConn", "", + "the source database's connection string") + f.StringVar(&schema, "sourceSchema", "", "the source schema") + +} + +// Preflight updates the configuration with sane defaults or returns an +// error if there are missing options for which a default cannot be +// provided. 
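+// For example, a SourceConn of the form
+// "db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB" (illustrative)
+// is broken out into the host, port, user, password, and database
+// fields below.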
+func (c *Config) Preflight() error {
+ if err := c.BaseConfig.Preflight(); err != nil {
+ return err
+ }
+ if err := c.LoopConfig.Preflight(); err != nil {
+ return err
+ }
+ if c.LoopName == "" {
+ return errors.New("no LoopName was configured")
+ }
+ if schema != "" {
+ c.SourceSchema = ident.MustSchema(ident.New(schema))
+ }
+ if c.SourceConn == "" {
+ return errors.New("no SourceConn was configured")
+ }
+ u, err := url.Parse(c.SourceConn)
+ if err != nil {
+ return err
+ }
+ port, err := strconv.ParseUint(u.Port(), 10, 16)
+ if err != nil {
+ return errors.Wrapf(err, "invalid port %s", u.Port())
+ }
+ c.host = u.Hostname()
+ c.port = uint16(port)
+ c.user = u.User.Username()
+ c.password, _ = u.User.Password()
+ c.database, _ = strings.CutPrefix(u.Path, "/")
+
+ return nil
+}
diff --git a/internal/source/db2/conn.go b/internal/source/db2/conn.go
new file mode 100644
index 000000000..a4c5aa6ba
--- /dev/null
+++ b/internal/source/db2/conn.go
@@ -0,0 +1,279 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "context"
+ "encoding/json"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/source/logical"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/diag"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/stamp"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ log "github.com/sirupsen/logrus"
+)
+
+// conn is responsible for pulling mutations from the DB2 staging tables
+// and applying them to the target database via the logical.Batch
+// interface. The process leverages DB2 SQL replication, similar to
+// the Debezium DB2 connector.
+// To set up DB2 for SQL replication, see the instructions at
+// https://debezium.io/documentation/reference/stable/connectors/db2.html#setting-up-db2
+// Note: in DB2, identifiers (e.g. table names, column names) are
+// converted to uppercase unless they are quoted. The target schema must
+// use the same convention.
+type conn struct {
+ columns *ident.TableMap[[]types.ColData]
+ primaryKeys *ident.TableMap[map[int]int]
+ config *Config
+}
+
+var (
+ _ diag.Diagnostic = (*conn)(nil)
+ _ logical.Dialect = (*conn)(nil)
+)
+
+// New instantiates a connection to the DB2 server.
+func New(config *Config) logical.Dialect {
+ return &conn{
+ columns: &ident.TableMap[[]types.ColData]{},
+ primaryKeys: &ident.TableMap[map[int]int]{},
+ config: config,
+ }
+}
+
+// Process implements logical.Dialect.
+func (c *conn) Process(
+ ctx *stopper.Context, ch <-chan logical.Message, events logical.Events,
+) error {
+ var startBatch time.Time
+ var mutInBatch int
+ var batch logical.Batch
+ defer func() {
+ log.Warn("db2.Process done. Rolling back")
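+ // A partially-processed batch is rolled back so that the loop can
+ // resynchronize from the last consistent point on restart.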
Rolling back") + if batch != nil { + rollbackBatchSize.Observe(float64(mutInBatch)) + _ = batch.OnRollback(ctx) + } + }() + for msg := range ch { + // Ensure that we resynchronize. + if logical.IsRollback(msg) { + if batch != nil { + rollbackBatchSize.Observe(float64(mutInBatch)) + if err := batch.OnRollback(ctx); err != nil { + return err + } + batch = nil + } + continue + } + + ev, _ := msg.(message) + switch ev.op { + case beginOp: + var err error + batch, err = events.OnBegin(ctx) + mutInBatch = 0 + startBatch = time.Now() + if err != nil { + return err + } + case endOp: + select { + case err := <-batch.OnCommit(ctx): + if err != nil { + return err + } + batchSize.Observe(float64(mutInBatch)) + batchLatency.Observe(float64(time.Since(startBatch).Seconds())) + batch = nil + case <-ctx.Done(): + return ctx.Err() + } + + if err := events.SetConsistentPoint(ctx, &ev.lsn); err != nil { + return err + } + case insertOp, updateOp, deleteOp: + sourceTable := ident.NewTable( + ident.MustSchema(ident.New(ev.table.sourceOwner)), + ident.New(ev.table.sourceTable)) + targetTable := ident.NewTable( + c.config.TargetSchema.Schema(), + ident.New(ev.table.sourceTable)) + targetCols, ok := c.columns.Get(sourceTable) + if !ok { + return errors.Errorf("unable to retrieve target columns for %s", sourceTable) + } + primaryKeys, ok := c.primaryKeys.Get(sourceTable) + if !ok { + return errors.Errorf("unable to retrieve primary keys for %s", sourceTable) + } + var err error + var mut types.Mutation + key := make([]any, len(primaryKeys)) + enc := make(map[string]any) + for idx, sourceCol := range ev.values { + targetCol := targetCols[idx] + switch s := sourceCol.(type) { + case nil: + enc[targetCol.Name.Raw()] = nil + default: + enc[targetCol.Name.Raw()] = s + } + if targetCol.Primary { + key[primaryKeys[idx]] = sourceCol + } + } + mut.Key, err = json.Marshal(key) + if err != nil { + return err + } + if ev.op != deleteOp { + mut.Data, err = json.Marshal(enc) + if err != nil { + return err + } + } + mutInBatch++ + mutationCount.With(prometheus.Labels{ + "table": sourceTable.Raw(), + "op": ev.op.String()}).Inc() + log.Tracef("%s %s %s %v\n", ev.op, sourceTable.Raw(), targetTable.Raw(), key) + script.AddMeta("db2", targetTable, &mut) + err = batch.OnData(ctx, script.SourceName(targetTable), targetTable, []types.Mutation{mut}) + if err != nil { + return err + } + default: + log.Warnf("unknown op %v", ev.op) + } + } + return nil +} + +// ReadInto implements logical.Dialect. +func (c *conn) ReadInto( + ctx *stopper.Context, ch chan<- logical.Message, state logical.State, +) error { + db, err := c.Open() + if err != nil { + return errors.Wrap(err, "failed to open a connection to the target db") + } + defer func() { + log.Warn("db2.ReadInto done. 
+ err := db.Close()
+ log.WithError(err).Info("database connection closed")
+ }()
+ cp, _ := state.GetConsistentPoint()
+ if cp == nil {
+ return errors.New("missing lsn")
+ }
+ previousLsn, _ := cp.(*lsn)
+ for {
+ nextLsn, err := c.getNextLsn(ctx, db, previousLsn)
+ if err != nil {
+ return errors.Wrap(err, "cannot retrieve next sequence number")
+ }
+ log.Debugf("NEXT [%x]\n", nextLsn.Value)
+ if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) {
+ select {
+ case <-time.After(1 * time.Second):
+ continue
+ case <-ctx.Stopping():
+ return nil
+ }
+ }
+ log.Debugf("BEGIN [%x]\n", previousLsn.Value)
+ select {
+ case ch <- message{
+ op: beginOp,
+ lsn: *nextLsn,
+ }:
+ case <-ctx.Stopping():
+ return nil
+ }
+
+ lsnRange := lsnRange{
+ from: previousLsn,
+ to: nextLsn,
+ }
+ tables, err := c.getTables(ctx, db, c.config.SourceSchema)
+ if err != nil {
+ log.WithError(err).Error("getTables failed")
+ return errors.Wrap(err, "cannot get DB2 CDC tables")
+ }
+ for _, t := range tables {
+ err := c.fetchColumnMetadata(ctx, db, &t)
+ if err != nil {
+ log.WithError(err).Error("fetchColumnMetadata failed")
+ return errors.Wrap(err, "cannot retrieve column names")
+ }
+ }
+
+ for _, tbl := range tables {
+ start := time.Now()
+ count, err := c.postMutations(ctx, db, tbl, lsnRange, ch)
+ if err != nil {
+ log.WithError(err).Error("postMutations failed")
+ return errors.Wrap(err, "cannot post mutation")
+ }
+ if ctx.IsStopping() {
+ return nil
+ }
+ log.Debugf("post mutations for %s starting at %x. Count %d. Time %d.", tbl.sourceTable,
+ lsnRange.from.Value, count,
+ time.Since(start).Milliseconds())
+ queryLatency.With(prometheus.Labels{
+ "table": tbl.sourceTable}).Observe(time.Since(start).Seconds())
+ }
+
+ select {
+ case ch <- message{
+ op: endOp,
+ lsn: *nextLsn,
+ }:
+ case <-ctx.Stopping():
+ return nil
+ }
+ log.Debugf("COMMIT [%x]\n", nextLsn.Value)
+ previousLsn = nextLsn
+ }
+}
+
+// ZeroStamp implements logical.Dialect.
+func (c *conn) ZeroStamp() stamp.Stamp {
+ return lsnZero()
+}
+
+// Diagnostic implements diag.Diagnostic.
+func (c *conn) Diagnostic(_ context.Context) any {
+ return map[string]any{
+ "hostname": c.config.host,
+ "database": c.config.database,
+ "defaultLsn": c.config.DefaultConsistentPoint,
+ "columns": c.columns,
+ }
+}
diff --git a/internal/source/db2/db2.go b/internal/source/db2/db2.go
new file mode 100644
index 000000000..73d0f4cb0
--- /dev/null
+++ b/internal/source/db2/db2.go
@@ -0,0 +1,40 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package db2 contains support for reading a DB2 SQL replication feed.
+package db2
+
+import (
+ "github.com/cockroachdb/cdc-sink/internal/source/logical"
+ "github.com/cockroachdb/cdc-sink/internal/util/diag"
+ "github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
+)
+
+// DB2 is a logical replication loop for the IBM DB2 database.
+// It leverages SQL replication on the source.
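+//
+// A minimal usage sketch (the stopper context and a populated Config
+// are assumed to come from the caller; error handling elided):
+//
+//	db2, err := Start(ctx, config)
+//	if err != nil { /* ... */ }
+//	_ = db2.Loop // the running replication loop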
+type DB2 struct { + Diagnostics *diag.Diagnostics + Loop *logical.Loop +} + +var ( + _ stdlogical.HasDiagnostics = (*DB2)(nil) +) + +// GetDiagnostics implements [stdlogical.HasDiagnostics]. +func (d *DB2) GetDiagnostics() *diag.Diagnostics { + return d.Diagnostics +} diff --git a/internal/source/db2/driver.go b/internal/source/db2/driver.go new file mode 100644 index 000000000..99209ccad --- /dev/null +++ b/internal/source/db2/driver.go @@ -0,0 +1,22 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build db2 && cgo +// +build db2,cgo + +package db2 + +import _ "github.com/ibmdb/go_ibm_db" diff --git a/internal/source/db2/injector.go b/internal/source/db2/injector.go new file mode 100644 index 000000000..d461d2bcb --- /dev/null +++ b/internal/source/db2/injector.go @@ -0,0 +1,46 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build wireinject +// +build wireinject + +package db2 + +import ( + "context" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/staging" + "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/google/wire" +) + +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + panic(wire.Build( + wire.Bind(new(context.Context), new(*stopper.Context)), + wire.Bind(new(logical.Config), new(*Config)), + wire.Struct(new(DB2), "*"), + Set, + diag.New, + logical.Set, + script.Set, + staging.Set, + target.Set, + )) +} diff --git a/internal/source/db2/integration_test.go b/internal/source/db2/integration_test.go new file mode 100644 index 000000000..672c38b4f --- /dev/null +++ b/internal/source/db2/integration_test.go @@ -0,0 +1,619 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build db2 && cgo
+// +build db2,cgo
+
+package db2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "math"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/all"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/base"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
+ "github.com/cockroachdb/cdc-sink/internal/source/logical"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ _ "github.com/ibmdb/go_ibm_db"
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const defaultSourceConn = "db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB"
+
+// TODO (silvano): provide a configuration option.
+var cdcSchema = ident.MustSchema(ident.New("ASNCDC"))
+
+type fixtureConfig struct {
+ backfill bool
+ chaosProb float32
+ immediate bool
+ script bool
+}
+
+type tableInfo struct {
+ name ident.Ident
+ sourceColName ident.Ident
+ sourceColType string
+ targetColName ident.Ident
+ targetColType string
+}
+
+func TestMain(m *testing.M) {
+ all.IntegrationMain(m, all.DB2Name)
+}
+
+func TestDB2Logical(t *testing.T) {
+ t.Run("consistent", func(t *testing.T) { testDB2Logical(t, &fixtureConfig{}) })
+ t.Run("consistent-backfill", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{backfill: true})
+ })
+ t.Run("consistent-backfill-chaos", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{backfill: true, chaosProb: 0.0005})
+ })
+ t.Run("consistent-chaos", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{chaosProb: 0.0005})
+ })
+ t.Run("consistent-script", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{script: true})
+ })
+ t.Run("immediate", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{immediate: true})
+ })
+ t.Run("immediate-chaos", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{immediate: true, chaosProb: 0.0005})
+ })
+ t.Run("immediate-script", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{immediate: true, script: true})
+ })
+}
+
+func testDB2Logical(t *testing.T, fc *fixtureConfig) {
+ r := require.New(t)
+ // Create a basic test fixture.
+ fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testDB2LogicalInt(t, fixture, fc) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} +func testDB2LogicalInt(t *testing.T, fixture *base.Fixture, fc *fixtureConfig) { + a := assert.New(t) + r := require.New(t) + ctx := fixture.Context + // Using uppercase for consistency + tableName := ident.New("T") + targetColName := "V" + if fc.script { + targetColName = "v_mapped" + } + table := tableInfo{ + name: tableName, + sourceColName: ident.New("V"), + sourceColType: "varchar(20)", + targetColName: ident.New(targetColName), + targetColType: "string", + } + + repl, config, err := createRepl(ctx, fixture, fc, tableName) + r.NoError(err) + defer repl.cleanUp(ctx) + + tableMap, cancel, err := repl.createTables(ctx, table) + r.NoError(err) + defer cancel() + start := time.Now() + // Make sure CDC is started in the source + err = repl.cdcRestart(ctx) + r.NoError(err) + log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds()) + const rowCount = 512 + values := make([]any, rowCount) + for i := 0; i < rowCount; i++ { + values[i] = fmt.Sprintf("v=%d", i) + } + // Insert data into source table. + start = time.Now() + err = repl.insertValues(ctx, tableMap.source, values, false) + r.NoError(err) + log.Infof("Inserted rows in %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + // Start logical loop + loop, err := Start(ctx, config) + r.NoError(err) + start = time.Now() + for { + var count int + if err := repl.target.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", tableMap.target)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + log.Infof("Rows replicated %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET %s= 'updated'`, tableMap.source, table.sourceColName)) + r.NoError(err) + // Wait for the update to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE "PK" < 50`, tableMap.source)) + r.NoError(err) + // Wait for the deletes to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount-50 { + break + } + time.Sleep(1 * time.Second) + } + + sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics) + +} + +func TestDataTypes(t *testing.T) { + r := require.New(t) + // Create a basic test fixture. + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testDataTypes(t, fixture) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} + +func testDataTypes(t *testing.T, fixture *base.Fixture) { + r := require.New(t) + // Build a long value for string and byte types. 
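+ // Append 16 bytes per iteration until the builder holds at least
+ // 64 KiB (1<<16), enough to exercise the LOB code paths.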
+ var sb strings.Builder
+ shortString := "0123456789ABCDEF"
+ for sb.Len() < 1<<16 {
+ sb.WriteString(shortString)
+ }
+ longString := sb.String()
+ ctx := fixture.Context
+ repl, config, err := createRepl(ctx, fixture, &fixtureConfig{immediate: false}, ident.Ident{})
+ r.NoError(err)
+ defer repl.cleanUp(ctx)
+
+ tcs := []struct {
+ source string
+ size string
+ crdb string
+ values []any // We automatically test for NULL below.
+ }{
+ // BIGINT(size)
+ {`bigint`, ``, `bigint`, []any{0, -1, 112358}},
+ // BINARY(size)
+ {`binary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+
+ // BLOB(size)
+ {`blob`, `(65536)`, `blob`, []any{[]byte(longString)}},
+
+ // BOOLEAN
+ // Currently SQL Replication on Db2 does not support BOOLEAN.
+ //{`boolean`, ``, `int`, []any{true, false}},
+ // CHAR(size)
+ {`char`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // CLOB(size)
+ {`clob`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // DATE
+ {`date`, ``, `date`, []any{"1000-01-01", "9999-12-31"}},
+ // DATETIME(fsp)
+ {`datetime`, ``, `timestamp`, []any{"1000-01-01 00:00:00", "9999-12-31 23:59:59"}},
+
+ {`dbclob`, `(128)`, `string`, []any{shortString}},
+
+ // DECFLOAT
+ // DECIMAL(size, d)
+ {`decfloat`, ``, `decimal`, []any{math.E, math.Phi, math.Pi}},
+ {`decimal`, `(11,10)`, `decimal`, []any{math.E, math.Phi, math.Pi}},
+
+ {`double`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ {`float`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // INT(size)
+ // INTEGER(size)
+ {`int`, ``, `int`, []any{0, -1, 112358}},
+
+ {`real`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // SMALLINT(size)
+ {`smallint`, ``, `int`, []any{0, -1, 127}},
+
+ // TIME(fsp)
+ {`time`, ``, `time`, []any{"16:20:00"}},
+ // TIMESTAMP(fsp)
+ {`timestamp`, ``, `timestamp`, []any{"1970-01-01 00:00:01", "2038-01-19 03:14:07"}},
+ // VARBINARY(size)
+ {`varbinary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+ // VARCHAR(size)
+ {`varchar`, `(128)`, `text`, []any{shortString}},
+
+ // Sentinel
+ {`char`, `(4)`, `text`, []any{`done`}},
+
+ // XML replication is not supported on LUW
+ //{`xml`, ``, `text`, `a`},
+ }
+
+ // Create a dummy table for each type.
+ tables := make([]*tableMapping, len(tcs))
+ for idx, tc := range tcs {
+ tbl := tableInfo{
+ name: ident.New(fmt.Sprintf("tgt_%s_%d", tc.source, idx)),
+ sourceColName: ident.New("V"),
+ sourceColType: tc.source + tc.size,
+ targetColName: ident.New("V"),
+ targetColType: tc.crdb,
+ }
+ tgt, cancel, err := repl.createTables(ctx, tbl)
+ r.NoError(err)
+ tables[idx] = tgt
+ defer cancel()
+ }
+
+ // CDC needs to be restarted on DB2.
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+
+ // Insert data in the source tables.
+ for idx, tc := range tcs {
+ err := repl.insertValues(ctx, tables[idx].source, tc.values, true)
+ r.NoError(err)
+ }
+
+ // Wait for the CDC source tables to be fully populated.
+ lastidx := len(tcs) - 1
+ expected := len(tcs[lastidx].values) + 1
+ var count int
+ query := fmt.Sprintf("SELECT count(*) FROM %s", tables[lastidx].sourceCdc.Raw())
+ for count < expected {
+ err := repl.source.QueryRowContext(ctx, query).Scan(&count)
+ r.NoError(err)
+ if count < expected {
+ time.Sleep(1 * time.Second)
+ }
+ }
+
+ // Start logical loop
+ loop, err := Start(ctx, config)
+ r.NoError(err)
+
+ // Wait for rows to show up.
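+ // Each type gets its own subtest that polls the target table until
+ // the expected count (the test values plus one NULL row) arrives.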
+ for idx, tc := range tcs {
+ tc := tc
+ t.Run(tc.source, func(t *testing.T) {
+ expected := len(tc.values) + 1
+ a := assert.New(t)
+ var count int
+ for count < expected {
+ if err := repl.target.QueryRowContext(ctx,
+ fmt.Sprintf("SELECT count(*) FROM %s", tables[idx].target)).Scan(&count); !a.NoError(err) {
+ return
+ }
+ if count < expected {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ // Expect the test data and a row with a NULL value.
+ a.Equalf(expected, count, "mismatch in %s", tables[idx].target)
+ })
+ }
+ sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics)
+}
+
+// UTILITIES
+
+// replAdmin provides utilities to manage the source and target databases.
+type replAdmin struct {
+ source *sql.DB
+ sourceSchema ident.Schema
+ target *sql.DB
+ targetSchema ident.Schema
+}
+
+type tableMapping struct {
+ source ident.Table
+ sourceCdc ident.Table
+ target ident.Table
+}
+
+// cdcCmd issues a command to the CDC service on the source database,
+// reporting whether the given string appears in the command output.
+func (r *replAdmin) cdcCmd(ctx context.Context, command string, expect string) (bool, error) {
+ query := fmt.Sprintf("VALUES ASNCDC.ASNCDCSERVICES('%s','asncdc') ", command)
+ out, err := r.source.QueryContext(ctx, query)
+ if err != nil {
+ return false, err
+ }
+ defer out.Close()
+ for out.Next() {
+ var msg string
+ err := out.Scan(&msg)
+ if err != nil {
+ return false, err
+ }
+ done := strings.Contains(msg, expect)
+ if done {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// cdcWaitFor waits until the given string is seen in the CDC status.
+func (r *replAdmin) cdcWaitFor(ctx context.Context, expect string) error {
+ done := false
+ var err error
+ for !done {
+ done, err = r.cdcCmd(ctx, "status", expect)
+ if err != nil {
+ return err
+ }
+ if !done {
+ time.Sleep(1000 * time.Millisecond)
+ }
+ }
+ return nil
+}
+
+// cdcRestart restarts CDC in the source database.
+func (r *replAdmin) cdcRestart(ctx context.Context) error {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ return r.cdcStart(ctx)
+}
+
+// cdcStart starts CDC in the source database.
+func (r *replAdmin) cdcStart(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "start", "")
+ if err != nil {
+ return err
+ }
+ return r.cdcWaitFor(ctx, "is doing work")
+}
+
+// cdcStop stops CDC in the source database.
+func (r *replAdmin) cdcStop(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "stop", "")
+ if err != nil {
+ return err
+ }
+ return r.cdcWaitFor(ctx, "asncap is not running")
+}
+
+func (r *replAdmin) cleanUp(ctx context.Context) error {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ return r.source.Close()
+}
+
+// createRepl sets up the replication environment.
+// It creates a schema in the DB2 source.
+func createRepl(
+ ctx *stopper.Context, fixture *base.Fixture, fc *fixtureConfig, tblName ident.Ident,
+) (*replAdmin, *Config, error) {
+
+ dbName, _ := fixture.TargetSchema.Schema().Split()
+ // For now, we use the Debezium stored procedure to add tables to CDC.
+ // The stored procedure does not deal with case sensitivity well.
+ // TODO (silvano): fix stored procedure.
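+ // For example, a target database named "my_db-1" maps to the
+ // source schema "MY_DB_1".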
+ sourceSchema := ident.MustSchema(ident.New(
+ strings.ToUpper(strings.ReplaceAll(dbName.Raw(), "-", "_"))))
+
+ var tbl ident.Table
+ if !tblName.Empty() {
+ tbl = ident.NewTable(fixture.TargetSchema.Schema(), tblName)
+ }
+ config, err := getConfig(fixture, fc, tbl,
+ sourceSchema, fixture.TargetSchema.Schema())
+ if err != nil {
+ return nil, nil, err
+ }
+ conn := &conn{
+ columns: &ident.TableMap[[]types.ColData]{},
+ primaryKeys: &ident.TableMap[map[int]int]{},
+ config: config,
+ }
+ db, err := conn.Open()
+ if err != nil {
+ return nil, nil, err
+ }
+ // Create a schema on the source to store all the tables.
+ _, err = db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA %s", sourceSchema))
+ if err != nil {
+ return nil, nil, err
+ }
+ // To speed up testing, we reduce the commit/sleep intervals to decrease
+ // latency. With these settings the Capture program has a significant
+ // impact on foreground activities; they are NOT recommended in a
+ // production environment.
+ // COMMIT_INTERVAL = interval in seconds for how often the Capture log reader thread commits.
+ // SLEEP_INTERVAL = interval in seconds that the Capture program sleeps when idle.
+ _, err = db.ExecContext(ctx, "UPDATE ASNCDC.IBMSNAP_CAPPARMS SET COMMIT_INTERVAL=2,SLEEP_INTERVAL=1")
+ if err != nil {
+ return nil, nil, err
+ }
+ // We start the replication from the current max log sequence number.
+ lsn, err := getCurrentLsn(ctx, db)
+ if err != nil {
+ return nil, nil, err
+ }
+ config.DefaultConsistentPoint = string(lsn.Value)
+ return &replAdmin{
+ source: db,
+ sourceSchema: sourceSchema,
+ target: fixture.TargetPool.DB,
+ targetSchema: fixture.TargetSchema.Schema(),
+ }, config, nil
+}
+
+func (r *replAdmin) createTables(
+ ctx context.Context, table tableInfo,
+) (*tableMapping, func() error, error) {
+ // TODO (silvano): fix case sensitivity.
+ tb := ident.New(strings.ToUpper(table.name.Raw()))
+ tgt := ident.NewTable(r.targetSchema, tb)
+ src := ident.NewTable(r.sourceSchema, tb)
+ cdc := ident.NewTable(cdcSchema, ident.New(strings.ToUpper(fmt.Sprintf("CDC_%s_%s",
+ r.sourceSchema.Raw(), tb.Raw()))))
+ res := &tableMapping{
+ source: src,
+ sourceCdc: cdc,
+ target: tgt,
+ }
+ // Create the schema in both locations.
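+ // The quoted "PK" identifier keeps its exact case; unquoted
+ // identifiers are folded to uppercase by DB2.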
+ _, err := r.source.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s ("PK" INT PRIMARY KEY not null, %s %s)`,
+ src, table.sourceColName, table.sourceColType))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ _, err = r.target.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s ("PK" INT PRIMARY KEY, %s %s)`,
+ tgt, table.targetColName, table.targetColType))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ // Enable CDC for the table.
+ _, err = r.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw()))
+ if err != nil {
+ return &tableMapping{}, nil, err
+ }
+ return res, func() error {
+ query := fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw())
+ _, err := r.source.ExecContext(ctx, query)
+ return err
+ }, nil
+}
+
+// getConfig is a helper function to create a configuration for the connector.
+func getConfig(
+ fixture *base.Fixture,
+ fc *fixtureConfig,
+ tbl ident.Table,
+ dbName ident.Schema,
+ targetSchema ident.Schema,
+) (*Config, error) {
+
+ conn := defaultSourceConn
+ if found := os.Getenv("TEST_DB2_CONN"); len(found) > 0 {
+ conn = found
+ }
+ log.Infof("Starting source table: %s %s %v", conn, tbl, fc)
+ config := &Config{
+ BaseConfig: logical.BaseConfig{
+ ApplyTimeout: 5 * time.Minute, // Increase to make using the debugger easier.
+ ChaosProb: fc.chaosProb,
+ Immediate: fc.immediate,
+ RetryDelay: 100 * time.Millisecond,
+ StagingSchema: fixture.StagingDB.Schema(),
+ TargetConn: fixture.TargetPool.ConnectionString,
+ },
+ LoopConfig: logical.LoopConfig{
+ LoopName: "db2",
+ TargetSchema: targetSchema,
+ },
+ SourceConn: conn,
+ SourceSchema: dbName,
+ }
+ if fc.backfill {
+ config.BackfillWindow = time.Minute
+ }
+ if fc.script {
+ config.ScriptConfig = script.Config{
+ FS: scripttest.ScriptFSFor(tbl),
+ MainPath: "/testdata/logical_test_db2.ts",
+ }
+ }
+ return config, config.Preflight()
+}
+
+// getCurrentLsn retrieves the latest sequence number in the monitoring tables.
+func getCurrentLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) {
+ rows, err := db.QueryContext(ctx, "select MAX(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON")
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ if rows.Next() {
+ var v []byte
+ err = rows.Scan(&v)
+ if err == nil {
+ if len(v) == 0 {
+ return lsnZero(), nil
+ }
+ return &lsn{
+ Value: v,
+ }, err
+ }
+ return nil, err
+ }
+ return lsnZero(), nil
+}
+
+func (r *replAdmin) insertValues(
+ ctx context.Context, tbl ident.Table, values []any, insertNull bool,
+) error {
+ tx, err := r.source.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ for valIdx, value := range values {
+ if _, err := tx.ExecContext(ctx,
+ fmt.Sprintf(`INSERT INTO %s VALUES (?, ?)`, tbl), valIdx, value); err != nil {
+ return err
+ }
+ }
+ if insertNull {
+ if _, err := tx.ExecContext(ctx,
+ fmt.Sprintf(`INSERT INTO %s VALUES (-1, NULL)`, tbl)); err != nil {
+ return err
+ }
+ }
+ return tx.Commit()
+}
diff --git a/internal/source/db2/lsn.go b/internal/source/db2/lsn.go
new file mode 100644
index 000000000..49126c700
--- /dev/null
+++ b/internal/source/db2/lsn.go
@@ -0,0 +1,91 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "fmt" + + "github.com/cockroachdb/cdc-sink/internal/util/stamp" + "github.com/pkg/errors" +) + +// lsn represents the offset, in bytes, of a log record from the beginning of a database log file +type lsn struct { + Value []byte +} + +type lsnRange struct { + from *lsn + to *lsn +} + +var _ stamp.Stamp = (*lsn)(nil) +var _ fmt.Stringer = (*lsn)(nil) + +const zero = "00000000000000000000000000000000" + +// Less implements stamp.Stamp. +func (l *lsn) Less(other stamp.Stamp) bool { + o := other.(*lsn) + return bytes.Compare(l.Value, o.Value) < 0 +} + +func (l *lsn) Equal(other stamp.Stamp) bool { + o := other.(*lsn) + return bytes.Equal(l.Value, o.Value) +} + +func (l *lsn) String() string { + return fmt.Sprintf("%x", l.Value) +} + +type lsnMemo struct { + Value []byte `json:"value,omitempty"` +} + +func (l *lsn) MarshalJSON() ([]byte, error) { + p := lsnMemo{Value: l.Value} + return json.Marshal(p) +} + +func (l *lsn) UnmarshalJSON(data []byte) error { + var p lsnMemo + if err := json.Unmarshal(data, &p); err != nil { + return errors.WithStack(err) + } + l.Value = p.Value + return nil +} + +// UnmarshalText supports CLI flags and default values. +func (l *lsn) UnmarshalText(data []byte) error { + if len(data) == 0 { + return nil + } + l.Value = data + return nil +} + +func lsnZero() *lsn { + z, _ := hex.DecodeString(zero) + return &lsn{ + Value: z, + } +} diff --git a/internal/source/db2/metrics.go b/internal/source/db2/metrics.go new file mode 100644 index 000000000..a1f95a63c --- /dev/null +++ b/internal/source/db2/metrics.go @@ -0,0 +1,53 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//
+// SPDX-License-Identifier: Apache-2.0

+package db2
+
+import (
+ "github.com/cockroachdb/cdc-sink/internal/util/metrics"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+ batchLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "db2_batch_latency",
+ Help: "the length of time it took to process a batch",
+ Buckets: metrics.LatencyBuckets,
+ })
+ batchSize = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "db2_batch_size",
+ Help: "the size of a batch",
+ Buckets: metrics.Buckets(1, 10000),
+ })
+ mutationCount = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "db2_mutation_total",
+ Help: "Total number of mutations by source table.",
+ },
+ []string{"table", "op"},
+ )
+ queryLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
+ Name: "db2_query_latency",
+ Help: "the length of time it took to process a query",
+ Buckets: metrics.LatencyBuckets,
+ }, []string{"table"})
+ rollbackBatchSize = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "db2_rollback_size",
+ Help: "the size of a rollback",
+ Buckets: metrics.Buckets(1, 10000),
+ })
+)
diff --git a/internal/source/db2/operation_string.go b/internal/source/db2/operation_string.go
new file mode 100644
index 000000000..df1d306ce
--- /dev/null
+++ b/internal/source/db2/operation_string.go
@@ -0,0 +1,29 @@
+// Code generated by "stringer -type=operation"; DO NOT EDIT.
+
+package db2
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[unknownOp-0]
+ _ = x[deleteOp-1]
+ _ = x[insertOp-2]
+ _ = x[updateOp-3]
+ _ = x[beginOp-4]
+ _ = x[endOp-5]
+ _ = x[rollbackOp-6]
+}
+
+const _operation_name = "unknownOpdeleteOpinsertOpupdateOpbeginOpendOprollbackOp"
+
+var _operation_index = [...]uint8{0, 9, 17, 25, 33, 40, 45, 55}
+
+func (i operation) String() string {
+ if i < 0 || i >= operation(len(_operation_index)-1) {
+ return "operation(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _operation_name[_operation_index[i]:_operation_index[i+1]]
+}
diff --git a/internal/source/db2/provider.go b/internal/source/db2/provider.go
new file mode 100644
index 000000000..057fe4258
--- /dev/null
+++ b/internal/source/db2/provider.go
@@ -0,0 +1,49 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/source/logical"
+ "github.com/google/wire"
+)
+
+// Set is used by Wire.
+var Set = wire.NewSet(
+ ProvideDialect,
+ ProvideLoop,
+)
+
+// ProvideDialect is called by Wire to construct this package's
+// logical.Dialect implementation. There's a fake dependency on
+// the script loader so that flags can be evaluated first.
+func ProvideDialect(config *Config, _ *script.Loader) (logical.Dialect, error) {
+ if err := config.Preflight(); err != nil {
+ return nil, err
+ }
+
+ return New(config), nil
+}
+
+// ProvideLoop is called by Wire to construct the sole logical loop used
+// in the db2 mode.
+func ProvideLoop(
+ cfg *Config, dialect logical.Dialect, loops *logical.Factory,
+) (*logical.Loop, error) {
+ cfg.Dialect = dialect
+ return loops.Start(&cfg.LoopConfig)
+}
diff --git a/internal/source/db2/queries.go b/internal/source/db2/queries.go
new file mode 100644
index 000000000..1cd81557b
--- /dev/null
+++ b/internal/source/db2/queries.go
@@ -0,0 +1,356 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/cockroachdb/cdc-sink/internal/source/logical"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ "github.com/pkg/errors"
+ "github.com/shopspring/decimal"
+ log "github.com/sirupsen/logrus"
+)
+
+var (
+
+ // maxLsn is the max Log Sequence Number stored in the staging tables.
+ // This is used during live capture of events.
+ maxLsn = `SELECT max(t.SYNCHPOINT)
+ FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM
+ ASNCDC.IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM ASNCDC.IBMSNAP_REGISTER) t
+ `
+
+ // nextLsn is the Log Sequence Number of the next batch, from the monitoring table.
+ // This is used during catch up, to batch mutations.
+ // The monitoring table is refreshed periodically based on the MONITOR_INTERVAL parameter as
+ // set in the ASNCDC.IBMSNAP_CAPPARMS table. By default, it is set to 300 seconds.
+ nextLsn = `
+ select MIN(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON where CD_ROWS_INSERTED > 0 AND RESTART_SEQ > ?
+ `
+
+ // db2CdcTables lists the tables that have CDC (SQL replication) enabled.
+ db2CdcTables = `
+ SELECT r.SOURCE_OWNER, r.SOURCE_TABLE,
+ r.CD_OWNER, r.CD_TABLE,
+ r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT,
+ t.TBSPACEID, t.TABLEID
+ FROM
+ ASNCDC.IBMSNAP_REGISTER r
+ LEFT JOIN SYSCAT.TABLES t
+ ON r.SOURCE_OWNER = t.TABSCHEMA
+ AND r.SOURCE_TABLE = t.TABNAME
+ WHERE r.SOURCE_OWNER <> ''
+ `
+ // db2CdcTablesForSchema lists the CDC-enabled tables within a given schema.
+ db2CdcTablesForSchema = `
+ SELECT r.SOURCE_OWNER, r.SOURCE_TABLE,
+ r.CD_OWNER, r.CD_TABLE,
+ r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT,
+ t.TBSPACEID, t.TABLEID
+ FROM
+ ASNCDC.IBMSNAP_REGISTER r
+ LEFT JOIN SYSCAT.TABLES t
+ ON r.SOURCE_OWNER = t.TABSCHEMA
+ AND r.SOURCE_TABLE = t.TABNAME
+ WHERE r.SOURCE_OWNER = ?
+ `
+ // Note: we skip deletes before updates.
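+ // An update appears in a staging table as a delete immediately
+ // followed by an insert within the same commit. The CASE expression
+ // below collapses such pairs: the leading delete maps to opcode 0
+ // (filtered out by the outer WHERE) and the trailing insert maps to
+ // opcode 3 (update). Standalone deletes and inserts map to opcodes
+ // 1 and 2, respectively.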
+ changeQuery = `
+ SELECT * FROM
+ (SELECT CASE
+ WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 0
+ WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 3
+ WHEN IBMSNAP_OPERATION = 'D' THEN 1
+ WHEN IBMSNAP_OPERATION = 'I' THEN 2
+ END
+ OPCODE,
+ cdc.*
+ FROM # cdc WHERE IBMSNAP_COMMITSEQ > ? AND IBMSNAP_COMMITSEQ <= ?
+ order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ)
+ WHERE OPCODE != 0
+ `
+ columnQuery = `
+ SELECT c.colname,c.keyseq, c.TYPENAME
+ FROM syscat.tables as t
+ inner join syscat.columns as c
+ on t.tabname = c.tabname
+ and t.tabschema = c.tabschema
+ WHERE t.tabschema = ? AND t.tabname = ?
+ ORDER by c.COLNO
+ `
+)
+
+type cdcTable struct {
+ sourceOwner string
+ sourceTable string
+ cdOwner string
+ cdTable string
+ cdNewSyc []byte
+ cdOldSyc []byte
+ spaceID int
+ tableID int
+}
+
+type message struct {
+ table cdcTable
+ op operation
+ lsn lsn
+ values []any
+}
+
+//go:generate go run golang.org/x/tools/cmd/stringer -type=operation
+
+type operation int
+
+const (
+ unknownOp operation = iota
+ deleteOp
+ insertOp
+ updateOp
+ beginOp
+ endOp
+ rollbackOp
+)
+
+// Open a new connection to the source database.
+func (c *conn) Open() (*sql.DB, error) {
+ con := fmt.Sprintf("HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=%s",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user,
+ c.config.password)
+ log.Debugf("DB2 connection: HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=...",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user)
+
+ return sql.Open("go_ibm_db", con)
+}
+
+// postMutations finds all the rows that have been changed on the given table
+// and posts them to the channel used by the logical loop to be processed.
+func (c *conn) postMutations(
+ ctx *stopper.Context, db *sql.DB, cdcTable cdcTable, lsnRange lsnRange, ch chan<- logical.Message,
+) (int, error) {
+ count := 0
+ // TODO (silvano): memoize, use templates
+ table := ident.NewTable(ident.MustSchema(ident.New(cdcTable.cdOwner)), ident.New(cdcTable.cdTable))
+ query := strings.ReplaceAll(changeQuery, "#", table.Raw())
+ // TODO (silvano): we might want to run in batches with a limit.
+ rows, err := db.QueryContext(ctx, query, lsnRange.from.Value, lsnRange.to.Value)
+ if err != nil {
+ return 0, err
+ }
+ defer rows.Close()
+ cols, _ := rows.ColumnTypes()
+ // The first four columns are metadata (opcode, commit_seq, intent_seq, op_id);
+ // the remaining columns contain the values of the source table's columns.
+ numMetaColumns := 4
+ for rows.Next() {
+ count++
+ numCols := len(cols)
+ res := make([]any, numCols)
+ ptr := make([]any, numCols)
+ for i := range cols {
+ ptr[i] = &res[i]
+ }
+ err = rows.Scan(ptr...)
+ if err != nil {
+ return count, errors.Wrap(err, "cannot scan row")
+ }
+ msg := message{
+ table: cdcTable,
+ }
+ // first column is the opcode
+ op, ok := res[0].(int32)
+ if !ok {
+ return count, errors.Errorf("invalid operation %T", res[0])
+ }
+ msg.op = operation(op)
+
+ // second column is the LSN of the committed transaction.
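+ // IBMSNAP_COMMITSEQ is an opaque byte string whose lexicographic
+ // order matches commit order; it is kept as raw bytes so that
+ // lsn.Less can compare it with bytes.Compare.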
+ cs, ok := res[1].([]byte)
+ if !ok {
+ return count, errors.Errorf("invalid commit sequence %v", res[1])
+ }
+ msg.lsn = lsn{Value: cs}
+
+ msg.values = make([]any, len(res)-numMetaColumns)
+ vcols := cols[numMetaColumns:]
+ for i, v := range res[numMetaColumns:] {
+ if v == nil {
+ msg.values[i] = nil
+ continue
+ }
+ // TODO (silvano): handle all the types
+ // https://www.ibm.com/docs/en/db2-for-zos/13?topic=elements-data-types
+ switch vcols[i].DatabaseTypeName() {
+ case "VARCHAR", "CHAR", "CLOB":
+ s, _ := v.([]byte)
+ msg.values[i] = string(s)
+ case "DECIMAL", "DECFLOAT":
+ s, _ := v.([]byte)
+ msg.values[i], err = decimal.NewFromString(string(s))
+ if err != nil {
+ return count, errors.Wrapf(err, "cannot convert decimal %s", s)
+ }
+ default:
+ msg.values[i] = v
+ }
+ }
+ select {
+ case ch <- msg:
+ case <-ctx.Stopping():
+ return -1, nil
+ }
+ }
+ return count, nil
+}
+
+// getMaxLsn retrieves the latest sequence number in all the staging tables.
+func (c *conn) getMaxLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) {
+ rows, err := db.QueryContext(ctx, maxLsn)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ if rows.Next() {
+ var v []byte
+ err = rows.Scan(&v)
+ if err == nil {
+ if len(v) == 0 {
+ return lsnZero(), nil
+ }
+ return &lsn{
+ Value: v,
+ }, err
+ }
+ return nil, err
+ }
+ return lsnZero(), nil
+}
+
+// getNextLsn retrieves the next sequence number.
+func (c *conn) getNextLsn(ctx *stopper.Context, db *sql.DB, current *lsn) (*lsn, error) {
+ // Checking the monitoring table for a batch of mutations.
+ rows, err := db.QueryContext(ctx, nextLsn, current.Value)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ if rows.Next() {
+ var v []byte
+ err = rows.Scan(&v)
+ if err == nil {
+ // if we don't have a LSN from the monitoring table,
+ // we can check the staging tables directly.
+ if len(v) == 0 {
+ return c.getMaxLsn(ctx, db)
+ }
+ return &lsn{
+ Value: v,
+ }, err
+ }
+ return nil, err
+ }
+ return lsnZero(), nil
+}
+
+// fetchColumnMetadata fetches the column metadata for a given table.
+func (c *conn) fetchColumnMetadata(ctx *stopper.Context, db *sql.DB, t *cdcTable) error {
+ sourceTable := ident.NewTable(
+ ident.MustSchema(ident.New(t.sourceOwner)),
+ ident.New(t.sourceTable))
+
+ if _, ok := c.columns.Get(sourceTable); ok {
+ return nil
+ }
+ rows, err := db.QueryContext(ctx, columnQuery, sourceTable.Schema().Raw(), sourceTable.Table().Raw())
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+ colData := make([]types.ColData, 0)
+ primaryKeys := make(map[int]int)
+ idx := 0
+ for rows.Next() {
+ var name, typ string
+ var pk *int
+ err = rows.Scan(&name, &pk, &typ)
+ if err != nil {
+ return err
+ }
+ if pk != nil {
+ primaryKeys[*pk] = idx
+ }
+ log.Tracef("Table %s col %q %q primary?: %t", sourceTable, name, typ, pk != nil)
+ colData = append(colData, types.ColData{
+ Name: ident.New(name),
+ Primary: pk != nil,
+ Type: typ,
+ })
+ idx++
+ }
+ c.columns.Put(sourceTable, colData)
+ c.primaryKeys.Put(sourceTable, primaryKeys)
+
+ return nil
+}
+
+func queryTables(ctx *stopper.Context, db *sql.DB, schema ident.Schema) (*sql.Rows, error) {
+ if schema.Empty() {
+ return db.QueryContext(ctx, db2CdcTables)
+ }
+ return db.QueryContext(ctx, db2CdcTablesForSchema, schema.Raw())
+}
+
+// getTables returns the tables that have replication enabled.
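+// It reads the ASNCDC.IBMSNAP_REGISTER catalog, which SQL replication
+// maintains for every CDC-enabled table, joined against SYSCAT.TABLES
+// to recover the table identifiers.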
+func (c *conn) getTables( + ctx *stopper.Context, db *sql.DB, schema ident.Schema, +) ([]cdcTable, error) { + rows, err := queryTables(ctx, db, schema) + if err != nil { + return nil, err + } + defer rows.Close() + result := make([]cdcTable, 0) + for rows.Next() { + var res cdcTable + err = rows.Scan(&res.sourceOwner, + &res.sourceTable, + &res.cdOwner, + &res.cdTable, + &res.cdNewSyc, + &res.cdOldSyc, + &res.spaceID, + &res.tableID, + ) + if err != nil { + return nil, err + } + result = append(result, res) + } + return result, nil +} diff --git a/internal/source/db2/wire_gen.go b/internal/source/db2/wire_gen.go new file mode 100644 index 000000000..9a83c656d --- /dev/null +++ b/internal/source/db2/wire_gen.go @@ -0,0 +1,90 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/staging/memo" + "github.com/cockroachdb/cdc-sink/internal/staging/version" + "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/target/dlq" + "github.com/cockroachdb/cdc-sink/internal/target/schemawatch" + "github.com/cockroachdb/cdc-sink/internal/util/applycfg" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" +) + +// Injectors from injector.go: + +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + diagnostics := diag.New(ctx) + scriptConfig, err := logical.ProvideUserScriptConfig(config) + if err != nil { + return nil, err + } + loader, err := script.ProvideLoader(scriptConfig) + if err != nil { + return nil, err + } + dialect, err := ProvideDialect(config, loader) + if err != nil { + return nil, err + } + baseConfig, err := logical.ProvideBaseConfig(config, loader) + if err != nil { + return nil, err + } + targetPool, err := logical.ProvideTargetPool(ctx, baseConfig, diagnostics) + if err != nil { + return nil, err + } + targetStatements, err := logical.ProvideTargetStatements(ctx, baseConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + dlqConfig := logical.ProvideDLQConfig(baseConfig) + watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(dlqConfig, targetPool, watchers) + appliers, err := apply.ProvideFactory(ctx, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + stagingPool, err := logical.ProvideStagingPool(ctx, baseConfig, diagnostics) + if err != nil { + return nil, err + } + stagingSchema, err := logical.ProvideStagingDB(baseConfig) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(ctx, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + checker := version.ProvideChecker(stagingPool, memoMemo) + factory, err := logical.ProvideFactory(ctx, appliers, configs, baseConfig, diagnostics, memoMemo, loader, stagingPool, targetPool, watchers, checker) + if err != nil { + return nil, err + } + loop, err := ProvideLoop(config, dialect, factory) + if err != nil { + return nil, err + } + db2 := &DB2{ + Diagnostics: diagnostics, + Loop: loop, + } + return db2, nil +} diff --git a/main.go b/main.go index 
865f290b0..c42036bcf 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "syscall" "time" + "github.com/cockroachdb/cdc-sink/internal/cmd/db2" "github.com/cockroachdb/cdc-sink/internal/cmd/dumphelp" "github.com/cockroachdb/cdc-sink/internal/cmd/dumptemplates" "github.com/cockroachdb/cdc-sink/internal/cmd/fslogical" @@ -104,6 +105,8 @@ func main() { f.CountVarP(&verbosity, "verbose", "v", "increase logging verbosity to debug; repeat for trace") root.AddCommand( + db2.Command(), + db2.Install(), dumphelp.Command(), dumptemplates.Command(), fslogical.Command(),