From c840b125ec697962e4225a90fa0a441d18c76388 Mon Sep 17 00:00:00 2001
From: Silvano Ravotto
Date: Thu, 4 Jan 2024 21:24:42 -0500
Subject: [PATCH] First draft of the DB2 connector.

Uses DB2 SQL replication, leveraging staging tables in the source
database. For instructions on how to put tables in capture mode in DB2,
so that changes can be sent to the staging tables, see
https://debezium.io/documentation/reference/stable/connectors/db2.html#putting-tables-in-capture-mode.

The connector periodically pulls new mutations from the staging tables
and inserts them into the target database via the logical.Batch
interface.
---
 .github/db2/Dockerfile | 28 +
 .github/db2/README.md | 5 +
 .github/db2/asncdc.c | 162 +++++
 .github/db2/asncdc_UDF.sql | 17 +
 .github/db2/asncdcaddremove.sql | 194 ++++++
 .github/db2/asncdctables.sql | 479 +++++++++++++
 .github/db2/cdcsetup.sh | 18 +
 .github/db2/custom-init/cleanup_storage.sh | 12 +
 .github/db2/dbsetup.sh | 51 ++
 .github/db2/inventory.sql | 77 +++
 .github/db2/openshift_entrypoint.sh | 19 +
 .github/db2/startup-agent.sql | 1 +
 .github/db2/startup-cdc-demo.sql | 9 +
 .github/docker-compose.yml | 21 +-
 .github/workflows/go-test-db2.yaml | 207 ++++++
 .github/workflows/go-tests.yaml | 2 +-
 .github/workflows/go.yaml | 13 +
 .gitignore | 6 +
 Makefile | 34 +
 go.mod | 6 +
 go.sum | 12 +
 internal/cmd/db2/db2.go | 39 ++
 internal/cmd/db2/install.go | 153 +++++
 internal/sinktest/all/integration.go | 4 +
 .../scripttest/testdata/logical_test_db2.ts | 55 ++
 internal/source/db2/config.go | 96 +++
 internal/source/db2/conn.go | 279 ++++++++
 internal/source/db2/db2.go | 40 ++
 internal/source/db2/driver.go | 22 +
 internal/source/db2/injector.go | 46 ++
 internal/source/db2/integration_test.go | 649 ++++++++++++++++++
 internal/source/db2/lsn.go | 93 +++
 internal/source/db2/lsn_test.go | 127 ++++
 internal/source/db2/metrics.go | 53 ++
 internal/source/db2/operation_string.go | 29 +
 internal/source/db2/provider.go | 49 ++
 internal/source/db2/queries.go | 354 ++++++++++
 internal/source/db2/wire_gen.go | 90 +++
 main.go | 3 +
 39 files changed, 3552 insertions(+), 2 deletions(-)
 create mode 100644 .github/db2/Dockerfile
 create mode 100644 .github/db2/README.md
 create mode 100644 .github/db2/asncdc.c
 create mode 100644 .github/db2/asncdc_UDF.sql
 create mode 100644 .github/db2/asncdcaddremove.sql
 create mode 100644 .github/db2/asncdctables.sql
 create mode 100755 .github/db2/cdcsetup.sh
 create mode 100644 .github/db2/custom-init/cleanup_storage.sh
 create mode 100755 .github/db2/dbsetup.sh
 create mode 100644 .github/db2/inventory.sql
 create mode 100755 .github/db2/openshift_entrypoint.sh
 create mode 100644 .github/db2/startup-agent.sql
 create mode 100644 .github/db2/startup-cdc-demo.sql
 create mode 100644 .github/workflows/go-test-db2.yaml
 create mode 100644 Makefile
 create mode 100644 internal/cmd/db2/db2.go
 create mode 100644 internal/cmd/db2/install.go
 create mode 100644 internal/sinktest/scripttest/testdata/logical_test_db2.ts
 create mode 100644 internal/source/db2/config.go
 create mode 100644 internal/source/db2/conn.go
 create mode 100644 internal/source/db2/db2.go
 create mode 100644 internal/source/db2/driver.go
 create mode 100644 internal/source/db2/injector.go
 create mode 100644 internal/source/db2/integration_test.go
 create mode 100644 internal/source/db2/lsn.go
 create mode 100644 internal/source/db2/lsn_test.go
 create mode 100644 internal/source/db2/metrics.go
 create mode 100644 internal/source/db2/operation_string.go
 create mode 100644 internal/source/db2/provider.go
 create mode 100644
internal/source/db2/queries.go
 create mode 100644 internal/source/db2/wire_gen.go

diff --git a/.github/db2/Dockerfile b/.github/db2/Dockerfile
new file mode 100644
index 000000000..49008d71b
--- /dev/null
+++ b/.github/db2/Dockerfile
@@ -0,0 +1,28 @@
+FROM icr.io/db2_community/db2
+
+LABEL maintainer="Debezium Community"
+
+RUN mkdir -p /asncdctools/src
+
+ADD asncdc_UDF.sql /asncdctools/src
+ADD asncdcaddremove.sql /asncdctools/src
+ADD asncdctables.sql /asncdctools/src
+ADD dbsetup.sh /asncdctools/src
+ADD startup-agent.sql /asncdctools/src
+ADD startup-cdc-demo.sql /asncdctools/src
+ADD inventory.sql /asncdctools/src
+ADD asncdc.c /asncdctools/src
+
+RUN mkdir /var/custom && \
+    chmod -R 777 /asncdctools && \
+    chmod -R 777 /var/custom
+
+ADD cdcsetup.sh /var/custom
+ADD custom-init /var/custom-init
+
+RUN chmod -R 777 /var/custom-init
+
+ADD openshift_entrypoint.sh /var/db2_setup/lib
+
+RUN chmod 777 /var/custom/cdcsetup.sh && \
+    chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh
diff --git a/.github/db2/README.md b/.github/db2/README.md
new file mode 100644
index 000000000..3f04f8a71
--- /dev/null
+++ b/.github/db2/README.md
@@ -0,0 +1,5 @@
+# DB2 container
+
+This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables.
+
+Taken from https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
diff --git a/.github/db2/asncdc.c b/.github/db2/asncdc.c
new file mode 100644
index 000000000..be1fc483f
--- /dev/null
+++ b/.github/db2/asncdc.c
@@ -0,0 +1,162 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sqlca.h>
+#include <sqludf.h>
+#include <sqlsystm.h>
+
+void SQL_API_FN asncdcservice(
+    SQLUDF_VARCHAR *asnCommand, /* input */
+    SQLUDF_VARCHAR *asnService,
+    SQLUDF_CLOB *fileData, /* output */
+    /* null indicators */
+    SQLUDF_NULLIND *asnCommand_ind, /* input */
+    SQLUDF_NULLIND *asnService_ind,
+    SQLUDF_NULLIND *fileData_ind,
+    SQLUDF_TRAIL_ARGS,
+    struct sqludf_dbinfo *dbinfo)
+{
+
+    int fd;
+    char tmpFileName[] = "/tmp/fileXXXXXX";
+    fd = mkstemp(tmpFileName);
+
+    int strcheck = 0;
+    char cmdstring[256];
+
+
+    char* szDb2path = getenv("HOME");
+
+
+
+    char str[20];
+    int len = 0;
+    int c; /* int, not char, so the EOF comparison below is reliable */
+    char *buffer = NULL;
+    FILE *pidfile;
+
+    char dbname[129];
+    memset(dbname, '\0', 129);
+    strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
+    dbname[dbinfo->dbnamelen] = '\0';
+
+    int pid;
+    if (strcmp(asnService, "asncdc") == 0)
+    {
+        strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
+        int callcheck;
+        callcheck = system(cmdstring);
+        pidfile = fopen(tmpFileName, "r");
+        while ((c = fgetc(pidfile)) != EOF)
+        {
+            if (c == '\n')
+            {
+                break;
+            }
+            len++;
+        }
+        buffer = (char *)malloc(sizeof(char) * len);
+        fseek(pidfile, 0, SEEK_SET);
+        fread(buffer, sizeof(char), len, pidfile);
+        fclose(pidfile);
+        pidfile = fopen(tmpFileName, "w");
+        if (strcmp(asnCommand, "start") == 0)
+        {
+            if (len == 0) // is not running
+            {
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
+                fprintf(pidfile, "start --> %s \n", cmdstring);
+                callcheck = system(cmdstring);
+            }
+            else
+            {
+                fprintf(pidfile, "asncap is already running");
+            }
+        }
+        if ((strcmp(asnCommand, "prune") == 0) ||
+            (strcmp(asnCommand, "reinit") == 0) ||
+            (strcmp(asnCommand, "suspend") == 0) ||
+            (strcmp(asnCommand, "resume") == 0) ||
+            (strcmp(asnCommand, "status") == 0) ||
+            (strcmp(asnCommand, "stop") == 0))
+        {
+            if
(len > 0)
+            {
+                //buffer[len] = '\0';
+                //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
+                //fprintf(pidfile, "stop --> %s", cmdstring);
+                //callcheck = system(cmdstring);
+                strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
+                //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
+                callcheck = system(cmdstring);
+            }
+            else
+            {
+                fprintf(pidfile, "asncap is not running");
+            }
+        }
+
+        fclose(pidfile);
+    }
+    /* system(cmdstring); */
+
+    int rc = 0;
+    long fileSize = 0;
+    size_t readCnt = 0;
+    FILE *f = NULL;
+
+    f = fopen(tmpFileName, "r");
+    if (!f)
+    {
+        strcpy(SQLUDF_MSGTX, "Could not open file ");
+        strncat(SQLUDF_MSGTX, tmpFileName,
+                SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
+        strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    rc = fseek(f, 0, SEEK_END);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* verify the file size */
+    fileSize = ftell(f);
+    if (fileSize > fileData->length)
+    {
+        strcpy(SQLUDF_MSGTX, "File too large");
+        strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    /* go to the beginning and read the entire file */
+    rc = fseek(f, 0, 0);
+    if (rc)
+    {
+        sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+        strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
+        return;
+    }
+
+    readCnt = fread(fileData->data, 1, fileSize, f);
+    if (readCnt != fileSize)
+    {
+        /* raise a warning that something weird is going on */
+        sprintf(SQLUDF_MSGTX, "Could not read entire file "
+                              "(%zu vs %ld)",
+                readCnt, fileSize);
+        strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
+        *fileData_ind = -1;
+    }
+    else
+    {
+        fileData->length = readCnt;
+        *fileData_ind = 0;
+    }
+    // remove temporary file
+    rc = remove(tmpFileName);
+    //fclose(pFile);
+}
diff --git a/.github/db2/asncdc_UDF.sql b/.github/db2/asncdc_UDF.sql
new file mode 100644
index 000000000..fc1da4f9a
--- /dev/null
+++ b/.github/db2/asncdc_UDF.sql
@@ -0,0 +1,17 @@
+DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;
+
+CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
+  RETURNS CLOB(100K)
+  SPECIFIC asncdcservice
+  EXTERNAL NAME 'asncdc!asncdcservice'
+  LANGUAGE C
+  PARAMETER STYLE SQL
+  DBINFO
+  DETERMINISTIC
+  NOT FENCED
+  RETURNS NULL ON NULL INPUT
+  NO SQL
+  NO EXTERNAL ACTION
+  NO SCRATCHPAD
+  ALLOW PARALLEL
+  NO FINAL CALL;
\ No newline at end of file
diff --git a/.github/db2/asncdcaddremove.sql b/.github/db2/asncdcaddremove.sql
new file mode 100644
index 000000000..e5484f06a
--- /dev/null
+++ b/.github/db2/asncdcaddremove.sql
@@ -0,0 +1,194 @@
+
+--
+-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE()
+-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table
+-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table
+--
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE stmtSQL VARCHAR(2048);
+
+DECLARE SQLCODE INT;
+DECLARE SQLSTATE CHAR(5);
+DECLARE RC_SQLCODE INT DEFAULT 0;
+DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000';
+
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE;
+
+-- delete ASNCDC.IBMSNAP_PRUNCNTL entries / source
+SET stmtSQL = 'DELETE FROM
ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_REGISTER entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- drop CD table / source
+SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ;
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_COLS entries / target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_MEMBR entries / target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_COLVERSION entries
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)';
+  EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_TABVERSION entries
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || '''';
+  EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE';
+EXECUTE IMMEDIATE stmtSQL;
+
+END P1@
+--#SET TERMINATOR ;
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE SQLSTATE CHAR(5);
+
+DECLARE stmtSQL VARCHAR(2048);
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' ||
+    tableschema || '_' || tablename ||
+    ' AS ( SELECT ' ||
+    ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' ||
+    ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' ||
+    ' CAST ('''' AS CHAR(1)) ' ||
+    ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.'
|| tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' || + tableschema || '_' || tablename || + ' ON ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' VOLATILE CARDINALITY'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' || + 'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' || + 'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' || + 'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' || + 'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' || + 'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' || + 'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' || + 'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''N'', ' || + '1, ' || + '''Y'', ' || + '''Y'', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + 'null, ' || + 'null, ' || + '0, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + '''0801'', ' || + 'null, ' || + 'null, ' || + '''0'', ' || + '''Y'', ' || + '''N'', ' || + '''Y'', ' || + '''NNNN'', ' || + '''Y'', ' || + '''A'',' || + 'null ) '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || + 'TARGET_SERVER, ' || + 'TARGET_OWNER, ' || + 'TARGET_TABLE, ' || + 'SYNCHTIME, ' || + 'SYNCHPOINT, ' || + 'SOURCE_OWNER, ' || + 'SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, ' || + 'APPLY_QUAL, ' || + 'SET_NAME, ' || + 'CNTL_SERVER , ' || + 'TARGET_STRUCTURE , ' || + 'CNTL_ALIAS , ' || + 'PHYS_CHANGE_OWNER , ' || + 'PHYS_CHANGE_TABLE , ' || + 'MAP_ID ' || + ') VALUES ( ' || + '''KAFKA'', ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + 'NULL, ' || + 'NULL, ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''KAFKAQUAL'', ' || + '''SET001'', ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '8, ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || + ' )'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; diff --git a/.github/db2/asncdctables.sql b/.github/db2/asncdctables.sql new file mode 100644 index 000000000..88203a7dd --- /dev/null +++ b/.github/db2/asncdctables.sql @@ -0,0 +1,479 @@ +-- 1021 db2 LEVEL Version 10.2.0 --> 
11.5.0 1150 + +CREATE TABLE ASNCDC.IBMQREP_COLVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +POSITION SMALLINT NOT NULL, +NAME VARCHAR(128) NOT NULL, +TYPE SMALLINT NOT NULL, +LENGTH INTEGER NOT NULL, +NULLS CHAR( 1) NOT NULL, +DEFAULT VARCHAR(1536), +CODEPAGE INTEGER, +SCALE INTEGER, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX +ON ASNCDC.IBMQREP_COLVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +POSITION ASC); + +CREATE INDEX ASNCDC.IX2COLVERSION +ON ASNCDC.IBMQREP_COLVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE TABLE ASNCDC.IBMQREP_TABVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +VERSION INTEGER NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_NAME VARCHAR(128) NOT NULL, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX +ON ASNCDC.IBMQREP_TABVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +VERSION ASC); + +CREATE INDEX ASNCDC.IX2TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE INDEX ASNCDC.IX3TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +SOURCE_OWNER ASC, +SOURCE_NAME ASC); + +CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL( +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021') +ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES ( +'1021'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME TIMESTAMP NOT NULL, +RESTART_TIME TIMESTAMP NOT NULL, +CURRENT_MEMORY INT NOT NULL, +CD_ROWS_INSERTED INT NOT NULL, +RECAP_ROWS_SKIPPED INT NOT NULL, +TRIGR_ROWS_SKIPPED INT NOT NULL, +CHG_ROWS_SKIPPED INT NOT NULL, +TRANS_PROCESSED INT NOT NULL, +TRANS_SPILLED INT NOT NULL, +MAX_TRANS_SIZE INT NOT NULL, +LOCKING_RETRIES INT NOT NULL, +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +LOGREADLIMIT INT NOT NULL, +CAPTURE_IDLE INT NOT NULL, +SYNCHTIME TIMESTAMP NOT NULL, +CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT , +LAST_EOL_TIME TIMESTAMP, +RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +LOGREAD_API_TIME INT, +NUM_LOGREAD_CALLS INT, +NUM_END_OF_LOGS INT, +LOGRDR_SLEEPTIME INT, +NUM_LOGREAD_F_CALLS INT, +TRANS_QUEUED INT, +NUM_WARNTXS INT, +NUM_WARNLOGAPI INT) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX +ON ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT INT, +LAG_LIMIT INT, +COMMIT_INTERVAL INT, +PRUNE_INTERVAL INT, +TRACE_LIMIT INT, +MONITOR_LIMIT INT, +MONITOR_INTERVAL INT, +MEMORY_LIMIT SMALLINT, +REMOTE_SRC_SERVER CHAR( 18), +AUTOPRUNE CHAR( 1), +TERM CHAR( 1), +AUTOSTOP CHAR( 1), +LOGREUSE CHAR( 1), +LOGSTDOUT CHAR( 1), +SLEEP_INTERVAL SMALLINT, +CAPTURE_PATH VARCHAR(1040), +STARTMODE VARCHAR( 10), +LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256, +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021', +COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021') + ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT, +LAG_LIMIT, +COMMIT_INTERVAL, +PRUNE_INTERVAL, +TRACE_LIMIT, +MONITOR_LIMIT, +MONITOR_INTERVAL, +MEMORY_LIMIT, +SLEEP_INTERVAL, +AUTOPRUNE, +TERM, +AUTOSTOP, +LOGREUSE, +LOGSTDOUT, +CAPTURE_PATH, +STARTMODE, +COMPATIBILITY) +VALUES ( +10080, +10080, +30, +300, +10080, +10080, +300, +32, +5, +'Y', 
+'Y', +'N', +'N', +'N', +NULL, +'WARMSI', +'1021' +); + +CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS ( + CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL + ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX + ON ASNCDC.IBMSNAP_CAPSCHEMAS + (CAP_SCHEMA_NAME ASC); + +INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES ( +'ASNCDC'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE( +OPERATION CHAR( 8) NOT NULL, +TRACE_TIME TIMESTAMP NOT NULL, +DESCRIPTION VARCHAR(1024) NOT NULL) + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX +ON ASNCDC.IBMSNAP_CAPTRACE( +TRACE_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL( +TARGET_SERVER CHAR(18) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +CNTL_SERVER CHAR( 18) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +CNTL_ALIAS CHAR( 8), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +MAP_ID VARCHAR(10) NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX +ON ASNCDC.IBMSNAP_PRUNCNTL( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC, +TARGET_TABLE ASC, +TARGET_OWNER ASC); + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1 +ON ASNCDC.IBMSNAP_PRUNCNTL( +MAP_ID ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2 +ON ASNCDC.IBMSNAP_PRUNCNTL( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3 +ON ASNCDC.IBMSNAP_PRUNCNTL( +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK( +DUMMY CHAR( 1)) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER CHAR( 18) NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX +ON ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER ASC, +APPLY_QUAL ASC, +SET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +GLOBAL_RECORD CHAR( 1) NOT NULL, +SOURCE_STRUCTURE SMALLINT NOT NULL, +SOURCE_CONDENSED CHAR( 1) NOT NULL, +SOURCE_COMPLETE CHAR( 1) NOT NULL, +CD_OWNER VARCHAR(128), +CD_TABLE VARCHAR(128), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +DISABLE_REFRESH SMALLINT NOT NULL, +CCD_OWNER VARCHAR(128), +CCD_TABLE VARCHAR(128), +CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CCD_CONDENSED CHAR( 1), +CCD_COMPLETE CHAR( 1), +ARCH_LEVEL CHAR( 4) NOT NULL, +DESCRIPTION CHAR(254), +BEFORE_IMG_PREFIX VARCHAR( 4), +CONFLICT_LEVEL CHAR( 1), +CHG_UPD_TO_DEL_INS CHAR( 1), +CHGONLY CHAR( 1), +RECAPTURE CHAR( 1), +OPTION_FLAGS CHAR( 4) NOT NULL, +STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y', +STATE CHAR( 1) WITH DEFAULT 'I', +STATE_INFO CHAR( 8)) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX +ON ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER ASC, 
+SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1 +ON ASNCDC.IBMSNAP_REGISTER( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2 +ON ASNCDC.IBMSNAP_REGISTER( +GLOBAL_RECORD ASC); + +ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_RESTART( +MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +MAX_COMMIT_TIME TIMESTAMP NOT NULL, +MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +CURR_COMMIT_TIME TIMESTAMP NOT NULL, +CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT , +SIGNAL_TYPE VARCHAR( 30) NOT NULL, +SIGNAL_SUBTYPE VARCHAR( 30), +SIGNAL_INPUT_IN VARCHAR(500), +SIGNAL_STATE CHAR( 1) NOT NULL, +SIGNAL_LSN VARCHAR( 16) FOR BIT DATA) +DATA CAPTURE CHANGES + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_SIGNALX +ON ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +COL_TYPE CHAR( 1) NOT NULL, +TARGET_NAME VARCHAR(128) NOT NULL, +IS_KEY CHAR( 1) NOT NULL, +COLNO SMALLINT NOT NULL, +EXPRESSION VARCHAR(1024) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX +ON ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC, +TARGET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY; + +--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX +--ON ASNCDC.IBMSNAP_SUBS_EVENT( +--EVENT_NAME ASC, +--EVENT_TIME ASC); + + +--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +TARGET_CONDENSED CHAR( 1) NOT NULL, +TARGET_COMPLETE CHAR( 1) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +PREDICATES VARCHAR(1024), +MEMBER_STATE CHAR( 1), +TARGET_KEY_CHG CHAR( 1) NOT NULL, +UOW_CD_PREDICATES VARCHAR(1024), +JOIN_UOW_CD CHAR( 1), +LOADX_TYPE SMALLINT, +LOADX_SRC_N_OWNER VARCHAR( 128), +LOADX_SRC_N_TABLE VARCHAR(128)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX +ON ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SET_TYPE CHAR( 1) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +ACTIVATE SMALLINT NOT NULL, +SOURCE_SERVER CHAR( 18) NOT NULL, +SOURCE_ALIAS CHAR( 8), +TARGET_SERVER CHAR( 18) NOT NULL, +TARGET_ALIAS CHAR( 8), +STATUS SMALLINT NOT NULL, +LASTRUN TIMESTAMP NOT NULL, +REFRESH_TYPE CHAR( 1) NOT NULL, +SLEEP_MINUTES INT, +EVENT_NAME CHAR( 18), +LASTSUCCESS TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CAPTURE_SCHEMA VARCHAR(128) NOT NULL, +TGT_CAPTURE_SCHEMA VARCHAR(128), +FEDERATED_SRC_SRVR VARCHAR( 18), +FEDERATED_TGT_SRVR VARCHAR( 18), +JRN_LIB CHAR( 10), +JRN_NAME 
CHAR( 10),
+OPTION_FLAGS CHAR( 4) NOT NULL,
+COMMIT_COUNT SMALLINT,
+MAX_SYNCH_MINUTES SMALLINT,
+AUX_STMTS SMALLINT NOT NULL,
+ARCH_LEVEL CHAR( 4) NOT NULL)
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX
+ON ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+BEFORE_OR_AFTER CHAR( 1) NOT NULL,
+STMT_NUMBER SMALLINT NOT NULL,
+EI_OR_CALL CHAR( 1) NOT NULL,
+SQL_STMT VARCHAR(1024),
+ACCEPT_SQLSTATES VARCHAR( 50))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX
+ON ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+BEFORE_OR_AFTER ASC,
+STMT_NUMBER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_UOW(
+IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL,
+IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+IBMSNAP_LOGMARKER TIMESTAMP NOT NULL,
+IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL,
+IBMSNAP_AUTHID VARCHAR(128) NOT NULL,
+IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT ,
+IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX
+ON ASNCDC.IBMSNAP_UOW(
+IBMSNAP_COMMITSEQ ASC,
+IBMSNAP_LOGMARKER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPENQ (
+  LOCK_NAME CHAR(9 OCTETS)
+  )
+  ORGANIZE BY ROW
+  DATA CAPTURE NONE
+  COMPRESS NO;
diff --git a/.github/db2/cdcsetup.sh b/.github/db2/cdcsetup.sh
new file mode 100755
index 000000000..c5d963d5b
--- /dev/null
+++ b/.github/db2/cdcsetup.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+if [ ! -f /asncdctools/src/asncdc.nlk ]; then
+rc=1
+echo "waiting for user db2inst1 to exist ."
+while [ "$rc" -ne 0 ]
+do
+  sleep 5
+  id db2inst1
+  rc=$?
+  echo '.'
+done
+
+su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1
+fi
+touch /asncdctools/src/asncdc.nlk
+
+echo "CDC setup completed."
diff --git a/.github/db2/custom-init/cleanup_storage.sh b/.github/db2/custom-init/cleanup_storage.sh
new file mode 100644
index 000000000..9f0c458fd
--- /dev/null
+++ b/.github/db2/custom-init/cleanup_storage.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+################################################################################
+# Wipes the storage directory.
+# Note that this essentially makes the database non-persistent when the pod is deleted.
+################################################################################
+
+echo "Inspecting database directory"
+ls $STORAGE_DIR
+
+echo "Wiping database directory"
+rm -rf $STORAGE_DIR/*
diff --git a/.github/db2/dbsetup.sh b/.github/db2/dbsetup.sh
new file mode 100755
index 000000000..9caf3d913
--- /dev/null
+++ b/.github/db2/dbsetup.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+echo "Compile ASN tool ..."
+cd /asncdctools/src
+/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
+
+DBNAME=$1
+DB2DIR=/opt/ibm/db2/V11.5
+rc=1
+echo "Waiting for DB2 to start ( $DBNAME ) ."
+while [ "$rc" -ne 0 ]
+do
+  sleep 5
+  db2 connect to $DBNAME
+  rc=$?
+  echo '.'
+done
+
+# enable metacatalog read via JDBC
+cd $HOME/sqllib/bnd
+db2 bind db2schema.bnd blocking all grant public sqlerror continue
+
+# do a backup and restart the db
+db2 backup db $DBNAME to /dev/null
+db2 restart db $DBNAME
+
+db2 connect to $DBNAME
+
+cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function
+chmod 777 /database/config/db2inst1/sqllib/function
+
+# add UDF / start stop asncap
+db2 -tvmf /asncdctools/src/asncdc_UDF.sql
+
+# create asntables
+db2 -tvmf /asncdctools/src/asncdctables.sql
+
+# add UDF / add remove asntables
+
+db2 -tvmf /asncdctools/src/asncdcaddremove.sql
+
+
+
+
+# create sample tables and data
+db2 -tvmf /asncdctools/src/inventory.sql
+db2 -tvmf /asncdctools/src/startup-agent.sql
+sleep 10
+db2 -tvmf /asncdctools/src/startup-cdc-demo.sql
+
+
+echo "done"
diff --git a/.github/db2/inventory.sql b/.github/db2/inventory.sql
new file mode 100644
index 000000000..1d441d590
--- /dev/null
+++ b/.github/db2/inventory.sql
@@ -0,0 +1,77 @@
+
+-- Create and populate our products
+CREATE TABLE DB2INST1.PRODUCTS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+     (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+  name VARCHAR(255) NOT NULL,
+  description VARCHAR(512),
+  weight FLOAT
+);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('scooter','Small 2-wheel scooter',3.14);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('car battery','12V car battery',8.1);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','12oz carpenter''s hammer',0.75);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','14oz carpenter''s hammer',0.875);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('hammer','16oz carpenter''s hammer',1.0);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('rocks','box of assorted rocks',5.3);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('jacket','water resistent black wind breaker',0.1);
+INSERT INTO DB2INST1.PRODUCTS(name,description,weight)
+  VALUES ('spare tire','24 inch spare tire',22.2);
+
+CREATE TABLE DB2INST1.PRODUCTS_ON_HAND (
+  product_id INTEGER NOT NULL PRIMARY KEY,
+  quantity INTEGER NOT NULL,
+  FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id)
+);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (101,3);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (102,8);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (103,18);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (104,4);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (105,5);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (106,0);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (107,44);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (108,2);
+INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (109,5);
+
+CREATE TABLE DB2INST1.CUSTOMERS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+     (START WITH 1001, INCREMENT BY 1) PRIMARY KEY,
+  first_name VARCHAR(255) NOT NULL,
+  last_name VARCHAR(255) NOT NULL,
+  email VARCHAR(255) NOT NULL UNIQUE
+);
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('Sally','Thomas','sally.thomas@acme.com');
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('George','Bailey','gbailey@foobar.com');
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('Edward','Walker','ed@walker.com');
+INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email)
+  VALUES ('Anne','Kretchmar','annek@noanswer.org');
+
+CREATE TABLE DB2INST1.ORDERS (
+  id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY
+     (START WITH 10001, INCREMENT BY 1) PRIMARY KEY,
+  order_date DATE NOT NULL,
+  purchaser INTEGER NOT NULL,
+  quantity INTEGER NOT NULL,
+  product_id INTEGER NOT NULL,
+  FOREIGN KEY (purchaser) REFERENCES DB2INST1.CUSTOMERS(id),
+  FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id)
+);
+INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id)
+  VALUES ('2016-01-16', 1001, 1, 102);
+INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id)
+  VALUES ('2016-01-17', 1002, 2, 105);
+INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id)
+  VALUES ('2016-02-19', 1002, 2, 106);
+INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id)
+  VALUES ('2016-02-21', 1003, 1, 107);
diff --git a/.github/db2/openshift_entrypoint.sh b/.github/db2/openshift_entrypoint.sh
new file mode 100755
index 000000000..ae9521f0e
--- /dev/null
+++ b/.github/db2/openshift_entrypoint.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+################################################################################
+# Runs custom init scripts followed by the original entry point
+################################################################################
+
+source ${SETUPDIR?}/include/db2_constants
+source ${SETUPDIR?}/include/db2_common_functions
+
+if [[ -d /var/custom-init ]]; then
+  echo "(*) Running user-provided init scripts ... "
+  chmod -R 777 /var/custom-init
+  for script in `ls /var/custom-init`; do
+    echo "(*) Running $script ..."
+    /var/custom-init/$script
+  done
+fi
+
+echo "Running original entry point"
+/var/db2_setup/lib/setup_db2_instance.sh
diff --git a/.github/db2/startup-agent.sql b/.github/db2/startup-agent.sql
new file mode 100644
index 000000000..ae9150e34
--- /dev/null
+++ b/.github/db2/startup-agent.sql
@@ -0,0 +1 @@
+VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');
\ No newline at end of file
diff --git a/.github/db2/startup-cdc-demo.sql b/.github/db2/startup-cdc-demo.sql
new file mode 100644
index 000000000..b9efc155e
--- /dev/null
+++ b/.github/db2/startup-cdc-demo.sql
@@ -0,0 +1,9 @@
+
+VALUES ASNCDC.ASNCDCSERVICES('status','asncdc');
+
+CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS' );
+CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS_ON_HAND' );
+CALL ASNCDC.ADDTABLE('DB2INST1', 'CUSTOMERS' );
+CALL ASNCDC.ADDTABLE('DB2INST1', 'ORDERS' );
+
+VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
\ No newline at end of file
diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml
index f45f9a8d5..d626dd9f0 100644
--- a/.github/docker-compose.yml
+++ b/.github/docker-compose.yml
@@ -73,7 +73,26 @@ services:
     image: ghcr.io/cockroachdb/cdc-sink/firestore-emulator:latest
     # Expose the emulator on port 8181 to avoid conflict with CRDB admin UI.
     ports:
-      - "8181:8080"
+      - "8181:8080"
+  db2-v11.5.9:
+    # Uses a DB2 image with additional SQL replication utilities.
+    # To build the image, use .github/db2.
+    # The original is at icr.io/db2_community/db2.
+    build: ./db2
+    privileged: True
+    ports:
+      - 50000:50000
+    environment:
+      - PERSISTENT_HOME=false
+      - LICENSE=accept
+      - DBNAME=TESTDB
+      - DB2INST1_PASSWORD=SoupOrSecret
+    healthcheck:
+      # The setup script creates /asncdctools/src/asncdc.nlk when CDC is up.
+ # TODO (silvano) improve start up time + test: bash -c '[ -f /asncdctools/src/asncdc.nlk ] ' + interval: 1s + retries: 600 mysql-v5.7: image: mysql:5.7 platform: linux/x86_64 diff --git a/.github/workflows/go-test-db2.yaml b/.github/workflows/go-test-db2.yaml new file mode 100644 index 000000000..92874a906 --- /dev/null +++ b/.github/workflows/go-test-db2.yaml @@ -0,0 +1,207 @@ +# Copyright 2023 The Cockroach Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +name: DB2 Tests +permissions: + contents: read + packages: read +on: + workflow_call: + workflow_dispatch: +# on: +# push: +# branches: [ sr8_db2 ] +jobs: + # Static code-quality checks. + code-quality: + name: Code Quality + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Ensure binary starts + if: ${{ !cancelled() }} + run: go run . help + + # Integration matrix tests for all supported CRDB and source DBs. + tests: + name: Integration Tests + runs-on: ${{ matrix.runs-on || 'ubuntu-latest-8-core' }} + timeout-minutes: 20 + strategy: + fail-fast: false + # Refer to the CRDB support policy when determining how many + # older releases to support. + # https://www.cockroachlabs.com/docs/releases/release-support-policy.html + # + # This matrix is explicit, since we have a few axes (target vs + # integration) that can't be expressed with the automatic + # cross-product behavior provided by the matrix operator. + matrix: + include: + - cockroachdb: v23.1 + integration: db2-v11.5.9 + + env: + COVER_OUT: coverage-${{ strategy.job-index }}.out + DOCKER_LOGS_OUT: docker-${{ strategy.job-index }}.log + FIRESTORE_EMULATOR_HOST: 127.0.0.1:8181 + JUNIT_OUT: junit-${{ strategy.job-index }}.xml + TEST_OUT: go-test-${{ strategy.job-index }}.json + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + # Ensure we can grab any private images we need for testing. + - name: Log in to GitHub Package Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Start Containers + working-directory: .github + run: > + docker compose up -d --wait + cockroachdb-${{ matrix.cockroachdb }} + ${{ matrix.integration }} + ${{ matrix.source }} + ${{ matrix.target }} + + # The go test json output will be written into a pipeline to + # create a JUnit.xml file. The test reports are aggregated later + # on to produce a nicer summary of the test output in the GitHub + # Actions UI. 
+ # + # Inspired by + # https://www.cloudwithchris.com/blog/githubactions-testsummary-go/ + - name: Go Tests + env: + COCKROACH_DEV_LICENSE: ${{ secrets.COCKROACH_DEV_LICENSE }} + CDC_INTEGRATION: ${{ matrix.integration }} + TEST_SOURCE_CONNECT: ${{ matrix.sourceConn }} + TEST_TARGET_CONNECT: ${{ matrix.targetConn }} + IBM_DRIVER: /tmp/drivers/clidriver + CGO_CFLAGS: -I${IBM_DRIVER}/include + CGO_LDFLAGS: -L${IBM_DRIVER}/lib + run: > + set -o pipefail; + make testdb2 2>&1 | + go run github.com/jstemmer/go-junit-report/v2 + -iocopy + -out ${{ env.JUNIT_OUT }} + -p cockroachdb=${{ matrix.cockroachdb }} + -p integration=${{ matrix.integration }} + -package-name ${{ matrix.cockroachdb }}-${{ matrix.integration }} | + tee ${{ env.TEST_OUT }} + + - name: Upload coverage + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ${{ env.COVER_OUT }} + + # Capture container logs in case they're needed for diagnostics. + - name: Docker container logs + if: always() + working-directory: .github + run: docker-compose logs --no-color > ${{ env.DOCKER_LOGS_OUT }} + + # Upload all test reports to a common artifact name, to make them + # available to the summarization step. The go test json is + # uploaded as a developer convenience. + - name: Stash test logs + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-reports + path: | + ${{ env.COVER_OUT }} + ${{ env.DOCKER_LOGS_OUT }} + ${{ env.JUNIT_OUT }} + ${{ env.TEST_OUT }} + retention-days: 7 + + # Aggregate the results of multiple jobs within this workflow into a + # single status object that we can use for branch protection. + # + # https://docs.github.com/en/rest/commits/statuses + status: + name: Create status objects + runs-on: ubuntu-latest + permissions: + statuses: write + needs: # Update failure case below + - code-quality + - tests + if: ${{ always() }} + env: + CONTEXT: Workflow Golang + GH_TOKEN: ${{ github.token }} + MERGE_SHA: ${{ github.event.merge_group.head_sha }} + PR_SHA: ${{ github.event.pull_request.head.sha }} + STATE: success + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + steps: + - name: Failure + if: ${{ needs.code-quality.result != 'success' || needs.tests.result != 'success' }} + run: echo "STATE=failure" >> $GITHUB_ENV + - name: Report + run: | + set -eo pipefail + + if [ ! -z "$PR_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$PR_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + if [ ! -z "$MERGE_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$MERGE_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + # This job downloads the test log files generated in the integration + # job and summarizes them into the GitHub Actions UI. + summarize-tests: + name: Test summaries + runs-on: ubuntu-latest + needs: tests + if: ${{ always() }} + steps: + - name: Download reports + uses: actions/download-artifact@v3 + with: + name: integration-reports + - name: Summarize + uses: test-summary/action@v2 + with: + paths: junit-*.xml diff --git a/.github/workflows/go-tests.yaml b/.github/workflows/go-tests.yaml index 65f85363b..efb519eb0 100644 --- a/.github/workflows/go-tests.yaml +++ b/.github/workflows/go-tests.yaml @@ -43,7 +43,7 @@ jobs: - name: Copyright headers if: ${{ !cancelled() }} - run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore '**/testdata/**/*.sql' . 
+ run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore ".github/db2/**" -ignore '**/testdata/**/*.sql' . - name: Lint if: ${{ !cancelled() }} diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index ea98c18ad..637cbac50 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -82,6 +82,19 @@ jobs: statuses: write secrets: inherit + go-db2: + uses: ./.github/workflows/go-test-db2.yaml + needs: + - go-build-cache + # We use the merge queue prior to pushing to a branch, so there's no + # reason to repeat tests that just ran. + if: ${{ github.event_name != 'push' }} + permissions: + contents: read + packages: read + statuses: write + secrets: inherit + go-wiki: uses: ./.github/workflows/go-wiki.yaml needs: diff --git a/.gitignore b/.gitignore index 48edd5f21..d9badf399 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,12 @@ cdc-sink # Dependency directories (remove the comment below to include it) # vendor/ +# Temporary directory +tmp/ + +# Downloaded drivers +drivers/ + # CockroachDB data directory cockroach-data/ goroutine_dump/ diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..770237492 --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ + +PWD = $(shell pwd) +COVER_OUT ?= cover.out +IBM_DRIVER = ${PWD}/drivers/clidriver +export CGO_CFLAGS=-I${IBM_DRIVER}/include +export CGO_LDFLAGS=-L${IBM_DRIVER}/lib +export DYLD_LIBRARY_PATH=${IBM_DRIVER}/lib +export LD_LIBRARY_PATH=${IBM_DRIVER}/lib + +.PHONY: all clidriver db2 clean realclean testdb2 + +all: cdc-sink + +cdc-sink: + go build + +clidriver: + go run . db2install --dest drivers + +db2: clidriver + go build -tags db2 + +clean: + rm cdc-sink + +realclean: clean + rm -rf drivers + +testdb2: clidriver + go test -tags db2 -count 1 -coverpkg=./internal/source/db2 \ + -covermode=atomic \ + -coverprofile=${COVER_OUT} \ + -race \ + -v ./internal/source/db2 diff --git a/go.mod b/go.mod index d620db1ee..8fa5d2798 100644 --- a/go.mod +++ b/go.mod @@ -38,9 +38,14 @@ require ( ) require ( + github.com/codeclysm/extract/v3 v3.1.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/h2non/filetype v1.1.3 // indirect + github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 // indirect + github.com/klauspost/compress v1.15.13 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/ulikunitz/xz v0.5.11 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel v1.21.0 // indirect @@ -72,6 +77,7 @@ require ( github.com/google/subcommands v1.2.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/ibmdb/go_ibm_db v0.4.5 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index 25af9c7dd..32d8e7b23 100644 --- a/go.sum +++ b/go.sum @@ -115,6 +115,8 @@ github.com/cockroachdb/gostdlib v1.19.0 h1:cSISxkVnTlWhTkyple/T6NXzOi5659FkhxvUg github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTymPZUKMfURjBtY= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8 h1:Hli+oX84dKq44sLVCcsGKqifm5Lg9J8VoJ2P3h9iPdI= github.com/cockroachdb/ttycolor 
v0.0.0-20210902133924-c7d7dcdde4e8/go.mod h1:75wnig8+TF6vst9hChkpcFO7YrRLddouJ5is8uqpfv0= +github.com/codeclysm/extract/v3 v3.1.1 h1:iHZtdEAwSTqPrd+1n4jfhr1qBhUWtHlMTjT90+fJVXg= +github.com/codeclysm/extract/v3 v3.1.1/go.mod h1:ZJi80UG2JtfHqJI+lgJSCACttZi++dHxfWuPaMhlOfQ= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= @@ -291,11 +293,15 @@ github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56 github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg= +github.com/h2non/filetype v1.1.3/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/ibmdb/go_ibm_db v0.4.5 h1:a0qKWbA5shCRo5HRRnAuENwhjg6AkgfPNr9xx0IZ160= +github.com/ibmdb/go_ibm_db v0.4.5/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP49UTmEvg= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= @@ -324,9 +330,13 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report/v2 v2.1.0 h1:X3+hPYlSczH9IMIpSC9CQSZA0L+BipYafciZUWHEmsc= github.com/jstemmer/go-junit-report/v2 v2.1.0/go.mod h1:mgHVr7VUo5Tn8OLVr1cKnLuEy0M92wdRntM99h7RkgQ= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 h1:rhqTjzJlm7EbkELJDKMTU7udov+Se0xZkWmugr6zGok= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v2wtGp9Gmz1Ze3eVRAWJMLokvN3QjdzCHLY= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0= +github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ 
-423,6 +433,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
+github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
 github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70=
 github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4=
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
diff --git a/internal/cmd/db2/db2.go b/internal/cmd/db2/db2.go
new file mode 100644
index 000000000..4047706e9
--- /dev/null
+++ b/internal/cmd/db2/db2.go
@@ -0,0 +1,39 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package db2 contains a command to perform logical replication
+// from an IBM DB2 server.
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/source/db2"
+	"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/spf13/cobra"
+)
+
+// Command returns the db2 subcommand.
+func Command() *cobra.Command {
+	cfg := &db2.Config{}
+	return stdlogical.New(&stdlogical.Template{
+		Bind:  cfg.Bind,
+		Short: "start a DB2 replication feed",
+		Start: func(ctx *stopper.Context, cmd *cobra.Command) (any, error) {
+			return db2.Start(ctx, cfg)
+		},
+		Use: "db2",
+	})
+}
diff --git a/internal/cmd/db2/install.go b/internal/cmd/db2/install.go
new file mode 100644
index 000000000..02c4e5594
--- /dev/null
+++ b/internal/cmd/db2/install.go
@@ -0,0 +1,153 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"runtime"
+	"strings"
+
+	"github.com/codeclysm/extract/v3"
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+)
+
+const (
+	defaultURL  = "https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/"
+	defaultDest = "drivers"
+)
+
+// Install downloads and installs the IBM DB2 ODBC driver.
+// Because of license restrictions, the driver cannot be under
+// source control. It needs to be downloaded when used.
+func Install() *cobra.Command {
+	var downloadURL, dest string
+	cmd := &cobra.Command{
+		Args:  cobra.NoArgs,
+		Short: "installs the db2 driver",
+		Use:   "db2install",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			driverPath := path.Join(dest, "clidriver")
+			_, err := os.Stat(driverPath)
+			_, includeErr := os.Stat(path.Join(driverPath, "include"))
+			_, libErr := os.Stat(path.Join(driverPath, "lib"))
+			if os.IsNotExist(err) || os.IsNotExist(includeErr) || os.IsNotExist(libErr) {
+				pkgName, err := install(cmd.Context(), dest, downloadURL)
+				if err != nil {
+					return err
+				}
+				fmt.Printf("Installed %s into %s.\n", pkgName, dest)
+				return nil
+			}
+			if err != nil {
+				return err
+			}
+			if includeErr != nil {
+				return includeErr
+			}
+			if libErr != nil {
+				return libErr
+			}
+			fmt.Println("Driver already installed.")
+			return nil
+		},
+	}
+	cmd.Flags().StringVar(&dest, "dest", defaultDest,
+		"destination dir")
+	cmd.Flags().StringVar(&downloadURL, "url", defaultURL,
+		"URL to download the driver from")
+	return cmd
+}
+
+// download copies a package to the specified temp destination.
+func download(tmpFile, baseURL, pkg string) (downloadError error) {
+	out, err := os.Create(tmpFile)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := out.Close(); downloadError == nil && err != nil {
+			downloadError = err
+		}
+	}()
+	pkgURL, err := url.JoinPath(baseURL, pkg)
+	if err != nil {
+		return err
+	}
+	resp, err := http.Get(pkgURL)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	_, downloadError = io.Copy(out, resp.Body)
+	return
+}
+
+// install the IBM DB2 driver for the runtime platform into
+// the specified directory.
+func install(ctx context.Context, target, baseURL string) (string, error) {
+	var pkgName string
+	const wordsize = 32 << (^uint(0) >> 32 & 1)
+	switch runtime.GOOS {
+	case "darwin":
+		pkgName = "macos64_odbc_cli.tar.gz"
+	case "windows":
+		pkgName = fmt.Sprintf("ntx%d_odbc_cli.zip", wordsize)
+	case "linux":
+		switch runtime.GOARCH {
+		case "amd64":
+			pkgName = "linuxx64_odbc_cli.tar.gz"
+		default:
+			return "", errors.Errorf("unknown arch %q", runtime.GOARCH)
+		}
+	default:
+		return "", errors.Errorf("unknown OS %q", runtime.GOOS)
+	}
+	pkg := filepath.Join(target, pkgName)
+	_, err := os.Stat(pkg)
+	if os.IsNotExist(err) {
+		fmt.Printf("Installing %s%s in %s.\n", baseURL, pkgName, target)
+		tmpFile := path.Join(os.TempDir(), pkgName)
+		defer os.Remove(tmpFile)
+		err := download(tmpFile, baseURL, pkgName)
+		if err != nil {
+			return "", err
+		}
+		fmt.Println("Download successful.")
+		pkg = tmpFile
+	} else {
+		fmt.Printf("Found %s in %s\n", pkgName, target)
+	}
+	f, err := os.Open(pkg)
+	if err != nil {
+		return "", err
+	}
+	if strings.HasSuffix(pkg, ".zip") {
+		return pkgName, extract.Zip(ctx, f, target, nil)
+	}
+	if strings.HasSuffix(pkg, ".gz") {
+		return pkgName, extract.Gz(ctx, f, target, nil)
+	}
+	return "", errors.Errorf("unknown file type %s", pkgName)
+}
diff --git a/internal/sinktest/all/integration.go b/internal/sinktest/all/integration.go
index 666fa7573..66820dfbf 100644
--- a/internal/sinktest/all/integration.go
+++ b/internal/sinktest/all/integration.go
@@ -29,6 +29,10 @@ const (
 	// integration tests for some databases. We expect this to be of the
 	// format "database-v123".
 	IntegrationEnvName = "CDC_INTEGRATION"
+	// DB2Name must be kept in alignment with the
+	// .github/docker-compose.yml file and the integration matrix
+	// variable in workflows/tests.yaml.
+	DB2Name = "db2"
 	// FirestoreName must be kept in alignment with the
 	// .github/docker-compose.yml file and the integration matrix
 	// variable in workflows/tests.yaml.
diff --git a/internal/sinktest/scripttest/testdata/logical_test_db2.ts b/internal/sinktest/scripttest/testdata/logical_test_db2.ts
new file mode 100644
index 000000000..7b4f2fb85
--- /dev/null
+++ b/internal/sinktest/scripttest/testdata/logical_test_db2.ts
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 The Cockroach Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import * as api from "cdc-sink@v1";
+import {Document, Table} from "cdc-sink@v1";
+
+// The sentinel name will be replaced by the test rig. It would normally be
+// "my_db.public" or "my_db" depending on the target product.
+api.configureSource("{{ SCHEMA }}", {
+    dispatch: (doc: Document, meta: Document): Record<Table, Document[]> => {
+        console.trace(JSON.stringify(doc), JSON.stringify(meta));
+        let ret: Record<Table, Document[]> = {};
+        ret[meta.table] = [
+            {
+                PK: doc.PK,
+                ignored: 'by_configuration_below',
+                v_dispatched: doc.V, // Rename the property
+            }
+        ];
+        return ret
+    }
+})
+
+// We introduce an unknown column in the dispatch function above.
+// We'll add an extra configuration here to ignore it.
+// The sentinel name would be replaced by "my_table".
+api.configureTable("{{ TABLE }}", {
+    map: (doc: Document): Document => {
+        console.trace("map", JSON.stringify(doc));
+        if (doc.v_dispatched === undefined) {
+            throw "did not find expected property";
+        }
+        doc.v_mapped = doc.v_dispatched;
+        delete doc.v_dispatched; // Avoid schema-drift error due to unknown column.
+        return doc;
+    },
+    ignore: {
+        "ignored": true,
+    }
+})
diff --git a/internal/source/db2/config.go b/internal/source/db2/config.go
new file mode 100644
index 000000000..f612d0ed2
--- /dev/null
+++ b/internal/source/db2/config.go
@@ -0,0 +1,96 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "net/url" + "strconv" + "strings" + + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/pkg/errors" + "github.com/spf13/pflag" +) + +// Config contains the configuration necessary for creating a +// replication connection to a DB2 server. SourceConn is mandatory. +type Config struct { + logical.BaseConfig + logical.LoopConfig + + // Connection string for the source db. Mandatory. + SourceConn string + // The schema we are replicating. If not provided, all the schemas will be replicated. + SourceSchema ident.Schema + + // The fields below are extracted by Preflight. + host string + password string + port uint16 + user string + database string +} + +var schema string + +// Bind adds flags to the set. It delegates to the embedded Config.Bind. +func (c *Config) Bind(f *pflag.FlagSet) { + c.BaseConfig.Bind(f) + c.LoopConfig.LoopName = "db2" + c.LoopConfig.Bind(f) + f.StringVar(&c.SourceConn, "sourceConn", "", + "the source database's connection string") + f.StringVar(&schema, "sourceSchema", "", "the source schema") + +} + +// Preflight updates the configuration with sane defaults or returns an +// error if there are missing options for which a default cannot be +// provided. +func (c *Config) Preflight() error { + if err := c.BaseConfig.Preflight(); err != nil { + return err + } + if err := c.LoopConfig.Preflight(); err != nil { + return err + } + if c.LoopName == "" { + return errors.New("no LoopName was configured") + } + if schema != "" { + c.SourceSchema = ident.MustSchema(ident.New(schema)) + } + if c.SourceConn == "" { + return errors.New("no SourceConn was configured") + } + u, err := url.Parse(c.SourceConn) + if err != nil { + return err + } + port, err := strconv.ParseUint(u.Port(), 0, 16) + if err != nil { + return errors.Wrapf(err, "invalid port %s", u.Port()) + } + c.host = u.Hostname() + c.port = uint16(port) + c.user = u.User.Username() + c.password, _ = u.User.Password() + c.database, _ = strings.CutPrefix(u.Path, "/") + + return nil +} diff --git a/internal/source/db2/conn.go b/internal/source/db2/conn.go new file mode 100644 index 000000000..d6fe3e126 --- /dev/null +++ b/internal/source/db2/conn.go @@ -0,0 +1,279 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/source/logical"
+	"github.com/cockroachdb/cdc-sink/internal/types"
+	"github.com/cockroachdb/cdc-sink/internal/util/diag"
+	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/cockroachdb/cdc-sink/internal/util/stamp"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+)
+
+// conn is responsible for pulling mutations from the DB2 staging tables
+// and applying them to the target database via the logical.Batch interface.
+// The process leverages DB2 SQL replication, similar to
+// the Debezium DB2 connector.
+// To set up DB2 for SQL replication, see the instructions at
+// https://debezium.io/documentation/reference/stable/connectors/db2.html#setting-up-db2
+// Note: in DB2, identifiers (e.g. table names, column names) are converted to
+// uppercase, unless they are in quotes. The target schema must use the
+// same convention.
+type conn struct {
+	columns     *ident.TableMap[[]types.ColData]
+	primaryKeys *ident.TableMap[map[int]int]
+	config      *Config
+}
+
+var (
+	_ diag.Diagnostic = (*conn)(nil)
+	_ logical.Dialect = (*conn)(nil)
+)
+
+// New instantiates a connection to the DB2 server.
+func New(config *Config) logical.Dialect {
+	return &conn{
+		columns:     &ident.TableMap[[]types.ColData]{},
+		primaryKeys: &ident.TableMap[map[int]int]{},
+		config:      config,
+	}
+}
+
+// Process implements logical.Dialect.
+func (c *conn) Process(
+	ctx *stopper.Context, ch <-chan logical.Message, events logical.Events,
+) error {
+	var startBatch time.Time
+	var mutInBatch int
+	var batch logical.Batch
+	defer func() {
+		log.Warn("db2.Process done. Rolling back")
+		if batch != nil {
+			rollbackBatchSize.Observe(float64(mutInBatch))
+			_ = batch.OnRollback(ctx)
+		}
+	}()
+	for msg := range ch {
+		// Ensure that we resynchronize.
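+		// A rollback message invalidates any in-flight work: discard the
+		// current batch (if any) and wait for the source to replay from
+		// the last committed consistent point.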
+ if logical.IsRollback(msg) { + if batch != nil { + rollbackBatchSize.Observe(float64(mutInBatch)) + if err := batch.OnRollback(ctx); err != nil { + return err + } + batch = nil + } + continue + } + + ev, _ := msg.(message) + switch ev.op { + case beginOp: + var err error + batch, err = events.OnBegin(ctx) + mutInBatch = 0 + startBatch = time.Now() + if err != nil { + return err + } + case endOp: + select { + case err := <-batch.OnCommit(ctx): + if err != nil { + return err + } + batchSize.Observe(float64(mutInBatch)) + batchLatency.Observe(float64(time.Since(startBatch).Seconds())) + batch = nil + case <-ctx.Done(): + return ctx.Err() + } + + if err := events.SetConsistentPoint(ctx, &ev.lsn); err != nil { + return err + } + case insertOp, updateOp, deleteOp: + sourceTable := ident.NewTable( + ident.MustSchema(ident.New(ev.table.sourceOwner)), + ident.New(ev.table.sourceTable)) + targetTable := ident.NewTable( + c.config.TargetSchema.Schema(), + ident.New(ev.table.sourceTable)) + targetCols, ok := c.columns.Get(sourceTable) + if !ok { + return errors.Errorf("unable to retrieve target columns for %s", sourceTable) + } + primaryKeys, ok := c.primaryKeys.Get(sourceTable) + if !ok { + return errors.Errorf("unable to retrieve primary keys for %s", sourceTable) + } + var err error + var mut types.Mutation + key := make([]any, len(primaryKeys)) + enc := make(map[string]any) + for idx, sourceCol := range ev.values { + targetCol := targetCols[idx] + switch s := sourceCol.(type) { + case nil: + enc[targetCol.Name.Raw()] = nil + default: + enc[targetCol.Name.Raw()] = s + } + if targetCol.Primary { + key[primaryKeys[idx]] = sourceCol + } + } + mut.Key, err = json.Marshal(key) + if err != nil { + return err + } + if ev.op != deleteOp { + mut.Data, err = json.Marshal(enc) + if err != nil { + return err + } + } + mutInBatch++ + mutationCount.With(prometheus.Labels{ + "table": sourceTable.Raw(), + "op": ev.op.String()}).Inc() + log.Tracef("%s %s %s %v\n", ev.op, sourceTable.Raw(), targetTable.Raw(), key) + script.AddMeta("db2", targetTable, &mut) + err = batch.OnData(ctx, script.SourceName(targetTable), targetTable, []types.Mutation{mut}) + if err != nil { + return err + } + default: + log.Warnf("unknown op %v", ev.op) + } + } + return nil +} + +// ReadInto implements logical.Dialect. +func (c *conn) ReadInto( + ctx *stopper.Context, ch chan<- logical.Message, state logical.State, +) error { + db, err := c.Open() + if err != nil { + return errors.Wrap(err, "failed to open a connection to the target db") + } + defer func() { + log.Warn("db2.ReadInto done. 
Closing database connection") + err := db.Close() + log.Info("Db closed.", err) + }() + cp, _ := state.GetConsistentPoint() + if cp == nil { + return errors.New("missing lsn") + } + previousLsn, _ := cp.(*lsn) + for { + nextLsn, err := c.getNextLsn(ctx, db, previousLsn) + if err != nil { + return errors.Wrap(err, "cannot retrieve next sequence number") + } + log.Debugf("NEXT [%x]\n", nextLsn.Value) + if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) { + select { + case <-time.After(1 * time.Second): + continue + case <-ctx.Stopping(): + return nil + } + } + log.Debugf("BEGIN [%x]\n", previousLsn.Value) + select { + case ch <- message{ + op: beginOp, + lsn: *nextLsn, + }: + case <-ctx.Stopping(): + return nil + } + + lsnRange := lsnRange{ + from: previousLsn, + to: nextLsn, + } + tables, err := c.getTables(ctx, db, c.config.SourceSchema) + if err != nil { + log.Error("getTables failed", err) + return errors.Wrap(err, "cannot get DB2 CDC tables") + } + for _, t := range tables { + err := c.fetchColumnMetadata(ctx, db, &t) + if err != nil { + log.Error("fetchColumnMetadata failed", err) + return errors.Wrap(err, "cannot retrieve colum names") + } + } + + for _, tbl := range tables { + start := time.Now() + count, err := c.postMutations(ctx, db, tbl, lsnRange, ch) + if err != nil { + log.Error("postMutations failed", err) + return errors.Wrap(err, "cannot post mutation") + } + if ctx.IsStopping() { + return nil + } + log.Debugf("post mutations for %s starting at %x. Count %d. Time %d.", tbl.sourceTable, + lsnRange.from.Value, count, + time.Since(start).Milliseconds()) + queryLatency.With(prometheus.Labels{ + "table": tbl.sourceTable}).Observe(time.Since(start).Seconds()) + } + + select { + case ch <- message{ + op: endOp, + lsn: *nextLsn, + }: + case <-ctx.Stopping(): + return nil + } + log.Debugf("COMMIT [%x]\n", nextLsn.Value) + previousLsn = nextLsn + } + +} + +// ZeroStamp implements logical.Dialect. +func (c *conn) ZeroStamp() stamp.Stamp { + return lsnZero() +} + +// Diagnostic implements diag.Diagnostic. +func (c *conn) Diagnostic(_ context.Context) any { + return map[string]any{ + "hostname": c.config.host, + "database": c.config.database, + "defaultLsn": c.config.DefaultConsistentPoint, + "columns": c.columns, + } +} diff --git a/internal/source/db2/db2.go b/internal/source/db2/db2.go new file mode 100644 index 000000000..73d0f4cb0 --- /dev/null +++ b/internal/source/db2/db2.go @@ -0,0 +1,40 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package db2 contains support for reading a db2 sql replication feed. +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" +) + +// DB2 is a logical replication loop for the IBM DB2 database. +// It leverages SQL replication on the source. 
+type DB2 struct { + Diagnostics *diag.Diagnostics + Loop *logical.Loop +} + +var ( + _ stdlogical.HasDiagnostics = (*DB2)(nil) +) + +// GetDiagnostics implements [stdlogical.HasDiagnostics]. +func (d *DB2) GetDiagnostics() *diag.Diagnostics { + return d.Diagnostics +} diff --git a/internal/source/db2/driver.go b/internal/source/db2/driver.go new file mode 100644 index 000000000..99209ccad --- /dev/null +++ b/internal/source/db2/driver.go @@ -0,0 +1,22 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build db2 && cgo +// +build db2,cgo + +package db2 + +import _ "github.com/ibmdb/go_ibm_db" diff --git a/internal/source/db2/injector.go b/internal/source/db2/injector.go new file mode 100644 index 000000000..d461d2bcb --- /dev/null +++ b/internal/source/db2/injector.go @@ -0,0 +1,46 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build wireinject +// +build wireinject + +package db2 + +import ( + "context" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/staging" + "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/google/wire" +) + +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + panic(wire.Build( + wire.Bind(new(context.Context), new(*stopper.Context)), + wire.Bind(new(logical.Config), new(*Config)), + wire.Struct(new(DB2), "*"), + Set, + diag.New, + logical.Set, + script.Set, + staging.Set, + target.Set, + )) +} diff --git a/internal/source/db2/integration_test.go b/internal/source/db2/integration_test.go new file mode 100644 index 000000000..02a09cec8 --- /dev/null +++ b/internal/source/db2/integration_test.go @@ -0,0 +1,649 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build db2 && cgo
+// +build db2,cgo
+
+package db2
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"math"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/all"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/base"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
+	"github.com/cockroachdb/cdc-sink/internal/source/logical"
+	"github.com/cockroachdb/cdc-sink/internal/types"
+	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	_ "github.com/ibmdb/go_ibm_db"
+	log "github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const defaultSourceConn = "db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB"
+
+// TODO (silvano): provide a configuration option.
+var cdcSchema = ident.MustSchema(ident.New("ASNCDC"))
+
+type fixtureConfig struct {
+	backfill  bool
+	chaosProb float32
+	immediate bool
+	script    bool
+}
+
+type tableInfo struct {
+	name          ident.Ident
+	sourceColName ident.Ident
+	sourceColType string
+	targetColName ident.Ident
+	targetColType string
+}
+
+func TestMain(m *testing.M) {
+	all.IntegrationMain(m, all.DB2Name)
+}
+
+func TestDB2Logical(t *testing.T) {
+	t.Run("consistent", func(t *testing.T) { testDB2Logical(t, &fixtureConfig{}) })
+	t.Run("consistent-backfill", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{backfill: true})
+	})
+	t.Run("consistent-backfill-chaos", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{backfill: true, chaosProb: 0.0005})
+	})
+	t.Run("consistent-chaos", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{chaosProb: 0.0005})
+	})
+	t.Run("consistent-script", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{script: true})
+	})
+	t.Run("immediate", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{immediate: true})
+	})
+	t.Run("immediate-chaos", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{immediate: true, chaosProb: 0.0005})
+	})
+	t.Run("immediate-script", func(t *testing.T) {
+		testDB2Logical(t, &fixtureConfig{immediate: true, script: true})
+	})
+}
+
+func testDB2Logical(t *testing.T, fc *fixtureConfig) {
+	r := require.New(t)
+	// Create a basic test fixture.
+	fixture, err := base.NewFixture(t)
+	r.NoError(err)
+	ctx := fixture.Context
+	testDB2LogicalInt(t, fixture, fc)
+	ctx.Stop(time.Second)
+	r.NoError(ctx.Wait())
+}
+
+func testDB2LogicalInt(t *testing.T, fixture *base.Fixture, fc *fixtureConfig) {
+	a := assert.New(t)
+	r := require.New(t)
+	ctx := fixture.Context
+	// Using uppercase for consistency.
+	tableName := ident.New("T")
+	targetColName := "V"
+	if fc.script {
+		targetColName = "v_mapped"
+	}
+	table := tableInfo{
+		name:          tableName,
+		sourceColName: ident.New("V"),
+		sourceColType: "varchar(20)",
+		targetColName: ident.New(targetColName),
+		targetColType: "string",
+	}
+
+	repl, config, err := createRepl(ctx, fixture, fc, tableName)
+	r.NoError(err)
+	defer repl.cleanUp(ctx)
+
+	tableMap, cancel, err := repl.createTables(ctx, table)
+	r.NoError(err)
+	defer cancel()
+	start := time.Now()
+	// CDC needs to be initialized again on DB2.
+	// TODO (silvano): switch to reInit, once we move the
+	// COMMIT_INTERVAL,SLEEP_INTERVAL settings to the image.
+ // Changing them requires a restart. + err = repl.cdcRestart(ctx) + r.NoError(err) + log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds()) + const rowCount = 128 + values := make([]any, rowCount) + for i := 0; i < rowCount; i++ { + values[i] = fmt.Sprintf("v=%d", i) + } + // Insert data into source table. + start = time.Now() + err = repl.insertValues(ctx, tableMap.source, values, false) + r.NoError(err) + log.Infof("Inserted rows in %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + // Start logical loop + loop, err := Start(ctx, config) + r.NoError(err) + start = time.Now() + for { + var count int + if err := repl.target.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", tableMap.target)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + log.Infof("Rows replicated %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET %s= 'updated'`, tableMap.source, table.sourceColName)) + r.NoError(err) + // Wait for the update to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE "PK" < 50`, tableMap.source)) + r.NoError(err) + // Wait for the deletes to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount-50 { + break + } + time.Sleep(1 * time.Second) + } + + sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics) + +} + +func TestDataTypes(t *testing.T) { + r := require.New(t) + // Create a basic test fixture. + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testDataTypes(t, fixture) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} + +func testDataTypes(t *testing.T, fixture *base.Fixture) { + r := require.New(t) + // Build a long value for string and byte types. + var sb strings.Builder + shortString := "0123456789ABCDEF" + for sb.Len() < 1<<16 { + sb.WriteString(shortString) + } + longString := sb.String() + ctx := fixture.Context + repl, config, err := createRepl(ctx, fixture, &fixtureConfig{immediate: false}, ident.Ident{}) + r.NoError(err) + defer repl.cleanUp(ctx) + + tcs := []struct { + source string + size string + crdb string + values []any // We automatically test for NULL below. + }{ + + // BIGINT(size) + {`bigint`, ``, `bigint`, []any{0, -1, 112358}}, + // BINARY(size) + {`binary`, `(128)`, `bytes`, []any{[]byte(shortString)}}, + + // BLOB(size) + {`blob`, `(65536)`, `blob`, []any{[]byte(longString)}}, + + // BOOLEAN + // Currently SQL Replication on Db2 does not support BOOLEAN. 
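+		// The case below is kept commented out for reference until the
+		// capture program can handle BOOLEAN columns (see the Debezium
+		// Db2 connector notes on data type limitations).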
+		//{`boolean`, ``, `int`, []any{true, false}},
+		// CHAR(size)
+		{`char`, `(128)`, `string`, []any{[]byte(shortString)}},
+		// CLOB(size)
+		{`clob`, `(128)`, `string`, []any{[]byte(shortString)}},
+		// DATE
+		{`date`, ``, `date`, []any{"1000-01-01", "9999-12-31"}},
+		// DATETIME(fsp)
+		{`datetime`, ``, `timestamp`, []any{"1000-01-01 00:00:00", "9999-12-31 23:59:59"}},
+
+		{`dbclob`, `(128)`, `string`, []any{shortString}},
+
+		// DECFLOAT
+		// DECIMAL(size, d)
+		{`decfloat`, ``, `decimal`, []any{math.E, math.Phi, math.Pi}},
+		{`decimal`, `(11,10)`, `decimal`, []any{math.E, math.Phi, math.Pi}},
+
+		{`double`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+		{`float`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+		// INT(size)
+		// INTEGER(size)
+		{`int`, ``, `int`, []any{0, -1, 112358}},
+
+		{`real`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+		// SMALLINT(size)
+		{`smallint`, ``, `int`, []any{0, -1, 127}},
+
+		// TIME(fsp)
+		{`time`, ``, `time`, []any{"16:20:00"}},
+		// TIMESTAMP(fsp)
+		{`timestamp`, ``, `timestamp`, []any{"1970-01-01 00:00:01", "2038-01-19 03:14:07"}},
+		// VARBINARY(size)
+		{`varbinary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+		// VARCHAR(size)
+		{`varchar`, `(128)`, `text`, []any{shortString}},
+
+		// Sentinel
+		{`char`, `(4)`, `text`, []any{`done`}},
+
+		// XML replication is not supported on LUW
+		//{`xml`, ``, `text`, `a`},
+	}
+
+	// Create a dummy table for each type.
+	tables := make([]*tableMapping, len(tcs))
+	for idx, tc := range tcs {
+		tbl := tableInfo{
+			name:          ident.New(fmt.Sprintf("tgt_%s_%d", tc.source, idx)),
+			sourceColName: ident.New("V"),
+			sourceColType: tc.source + tc.size,
+			targetColName: ident.New("V"),
+			targetColType: tc.crdb,
+		}
+		tgt, cancel, err := repl.createTables(ctx, tbl)
+		r.NoError(err)
+		tables[idx] = tgt
+		defer cancel()
+	}
+
+	// CDC needs to be initialized again on DB2.
+	err = repl.cdcRestart(ctx)
+	r.NoError(err)
+
+	// Insert data in the source tables.
+	for idx, tc := range tcs {
+		err := repl.insertValues(ctx, tables[idx].source, tc.values, true)
+		r.NoError(err)
+	}
+
+	// Wait for the CDC source tables to be fully populated.
+	lastidx := len(tcs) - 1
+	expected := len(tcs[lastidx].values) + 1
+	var count int
+	query := fmt.Sprintf("SELECT count(*) FROM %s", tables[lastidx].sourceCdc.Raw())
+	for count < expected {
+		err := repl.source.QueryRowContext(ctx, query).Scan(&count)
+		r.NoError(err)
+		if count < expected {
+			time.Sleep(1 * time.Second)
+		}
+	}
+
+	// Start the logical loop.
+	loop, err := Start(ctx, config)
+	r.NoError(err)
+
+	// Wait for rows to show up.
+	for idx, tc := range tcs {
+		tc := tc
+		t.Run(tc.source, func(t *testing.T) {
+			expected := len(tc.values) + 1
+			a := assert.New(t)
+			var count int
+			for count < expected {
+				if err := repl.target.QueryRowContext(ctx,
+					fmt.Sprintf("SELECT count(*) FROM %s", tables[idx].target)).Scan(&count); !a.NoError(err) {
+					return
+				}
+				if count < expected {
+					time.Sleep(100 * time.Millisecond)
+				}
+			}
+			// Expect the test data and a row with a NULL value.
+			a.Equalf(expected, count, "mismatch in %s", tables[idx].target)
+		})
+	}
+	sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics)
+
+}
+
+// UTILITIES
+
+// replAdmin provides utilities to manage the source and target databases.
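+// It wraps two database handles: one pointed at DB2, used to drive the
+// ASNCDC stored procedures and to create source tables, and one pointed
+// at the target, used to verify that rows were replicated.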
+type replAdmin struct {
+	source       *sql.DB
+	sourceSchema ident.Schema
+	target       *sql.DB
+	targetSchema ident.Schema
+	restarted    bool
+}
+
+type tableMapping struct {
+	source    ident.Table
+	sourceCdc ident.Table
+	target    ident.Table
+}
+
+// cdcCmd issues a command to the CDC service on the source
+// database, waiting for the given string to appear in CDC status, if supplied.
+func (r *replAdmin) cdcCmd(ctx context.Context, command string, expect string) (bool, error) {
+	query := fmt.Sprintf("VALUES ASNCDC.ASNCDCSERVICES('%s','asncdc') ", command)
+	out, err := r.source.QueryContext(ctx, query)
+	if err != nil {
+		return false, err
+	}
+	defer out.Close()
+	for out.Next() {
+		var msg string
+		err := out.Scan(&msg)
+		if err != nil {
+			return false, err
+		}
+		done := strings.Contains(msg, expect)
+		if done {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// cdcWaitFor waits until the given string is seen in the CDC status.
+func (r *replAdmin) cdcWaitFor(ctx context.Context, expect string) error {
+	done := false
+	var err error
+	for !done {
+		done, err = r.cdcCmd(ctx, "status", expect)
+		if err != nil {
+			return err
+		}
+		if !done {
+			time.Sleep(1000 * time.Millisecond)
+		}
+	}
+	return nil
+}
+
+// cdcRestart restarts CDC in the source database.
+func (r *replAdmin) cdcRestart(ctx context.Context) error {
+	err := r.cdcStop(ctx)
+	if err != nil {
+		return err
+	}
+	return r.cdcStart(ctx)
+}
+
+// cdcStart starts CDC in the source database.
+func (r *replAdmin) cdcStart(ctx context.Context) error {
+	_, err := r.cdcCmd(ctx, "start", "")
+	if err != nil {
+		return err
+	}
+	r.restarted = true
+	return r.cdcWaitFor(ctx, "is doing work")
+}
+
+// cdcReinit re-initializes CDC in the source database.
+func (r *replAdmin) cdcReinit(ctx context.Context) error {
+	up, err := r.cdcCmd(ctx, "status", "is doing work")
+	if err != nil {
+		return err
+	}
+	// We try to re-init first, since it's faster.
+	// If that fails, we restart it.
+	if up {
+		reinit, err := r.cdcCmd(ctx, "reinit", "REINIT")
+		if err != nil {
+			return err
+		}
+		if reinit {
+			log.Info("CDC was already up, we re-init it")
+			return r.cdcWaitFor(ctx, "is doing work")
+		}
+	}
+	return r.cdcRestart(ctx)
+}
+
+// cdcStop stops CDC in the source database.
+func (r *replAdmin) cdcStop(ctx context.Context) error {
+	_, err := r.cdcCmd(ctx, "stop", "")
+	if err != nil {
+		return err
+	}
+	return r.cdcWaitFor(ctx, "asncap is not running")
+}
+
+func (r *replAdmin) cleanUp(ctx context.Context) error {
+	// If we restarted CDC, we need to stop it.
+	if r.restarted {
+		err := r.cdcStop(ctx)
+		if err != nil {
+			return err
+		}
+	}
+	return r.source.Close()
+}
+
+// createRepl sets up the replication environment.
+// It creates a schema in the DB2 source.
+func createRepl(
+	ctx *stopper.Context, fixture *base.Fixture, fc *fixtureConfig, tblName ident.Ident,
+) (*replAdmin, *Config, error) {
+
+	dbName, _ := fixture.TargetSchema.Schema().Split()
+	// For now, we are using the Debezium stored procedure to add tables to CDC.
+	// The stored procedure does not deal with case sensitivity well.
+	// TODO (silvano): fix stored procedure.
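+	// Until then, normalize the schema name to a form the procedure
+	// accepts: uppercase, with the hyphens found in generated database
+	// names replaced by underscores.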
+	sourceSchema := ident.MustSchema(ident.New(
+		strings.ToUpper(strings.ReplaceAll(dbName.Raw(), "-", "_"))))
+
+	var tbl ident.Table
+	if !tblName.Empty() {
+		tbl = ident.NewTable(fixture.TargetSchema.Schema(), tblName)
+	}
+	config, err := getConfig(fixture, fc, tbl,
+		sourceSchema, fixture.TargetSchema.Schema())
+	if err != nil {
+		return nil, nil, err
+	}
+	conn := &conn{
+		columns:     &ident.TableMap[[]types.ColData]{},
+		primaryKeys: &ident.TableMap[map[int]int]{},
+		config:      config,
+	}
+	db, err := conn.Open()
+	if err != nil {
+		return nil, nil, err
+	}
+	// Create a schema on the source to store all the tables.
+	_, err = db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA %s", sourceSchema))
+	if err != nil {
+		return nil, nil, err
+	}
+	// To speed up testing, we reduce the commit/sleep/monitor intervals to decrease latency.
+	// With these settings the Capture program has a significant impact on foreground
+	// activities, so they are NOT recommended in a production environment.
+	// COMMIT_INTERVAL = interval in seconds for how often the Capture log reader thread commits.
+	// SLEEP_INTERVAL = interval in seconds that the Capture program sleeps when idle.
+	_, err = db.ExecContext(ctx, "UPDATE ASNCDC.IBMSNAP_CAPPARMS SET COMMIT_INTERVAL=2,SLEEP_INTERVAL=1")
+	if err != nil {
+		return nil, nil, err
+	}
+	// We start the replication from the current max log sequence number.
+	lsn, err := getCurrentLsn(ctx, db)
+	if err != nil {
+		return nil, nil, err
+	}
+	config.DefaultConsistentPoint = fmt.Sprintf("%x", lsn.Value)
+	log.Infof("Starting consistent point %x", lsn.Value)
+	return &replAdmin{
+		source:       db,
+		sourceSchema: sourceSchema,
+		target:       fixture.TargetPool.DB,
+		targetSchema: fixture.TargetSchema.Schema(),
+	}, config, nil
+}
+
+func (r *replAdmin) createTables(
+	ctx context.Context, table tableInfo,
+) (*tableMapping, func() error, error) {
+	// TODO (silvano): fix case sensitivity.
+	tb := ident.New(strings.ToUpper(table.name.Raw()))
+	tgt := ident.NewTable(r.targetSchema, tb)
+	src := ident.NewTable(r.sourceSchema, tb)
+	cdc := ident.NewTable(cdcSchema, ident.New(strings.ToUpper(fmt.Sprintf("CDC_%s_%s",
+		r.sourceSchema.Raw(), tb.Raw()))))
+	res := &tableMapping{
+		source:    src,
+		sourceCdc: cdc,
+		target:    tgt,
+	}
+	// Create the schema in both locations.
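+	// Note that DB2 requires primary-key columns to be declared NOT
+	// NULL, hence the slightly different DDL on the source side.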
+	_, err := r.source.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s ("PK" INT PRIMARY KEY not null, %s %s)`,
+		src, table.sourceColName, table.sourceColType))
+	if err != nil {
+		return &tableMapping{}, nil, err
+	}
+	_, err = r.target.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s ("PK" INT PRIMARY KEY, %s %s)`,
+		tgt, table.targetColName, table.targetColType))
+	if err != nil {
+		return &tableMapping{}, nil, err
+	}
+	// Enable CDC for the table.
+	_, err = r.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw()))
+	if err != nil {
+		return &tableMapping{}, nil, err
+	}
+	return res, func() error {
+		query := fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw())
+		_, err := r.source.ExecContext(ctx, query)
+		return err
+	}, nil
+}
+
+// getConfig is a helper function to create a configuration for the connector.
+func getConfig(
+	fixture *base.Fixture,
+	fc *fixtureConfig,
+	tbl ident.Table,
+	dbName ident.Schema,
+	targetSchema ident.Schema,
+) (*Config, error) {
+
+	conn := defaultSourceConn
+	if found := os.Getenv("TEST_DB2_CONN"); len(found) > 0 {
+		conn = found
+	}
+	log.Infof("Starting source table: %s %s %v", conn, tbl, fc)
+	config := &Config{
+		BaseConfig: logical.BaseConfig{
+			ApplyTimeout:  5 * time.Minute, // Increase to make using the debugger easier.
+			ChaosProb:     fc.chaosProb,
+			Immediate:     fc.immediate,
+			RetryDelay:    100 * time.Millisecond,
+			StagingSchema: fixture.StagingDB.Schema(),
+			TargetConn:    fixture.TargetPool.ConnectionString,
+		},
+		LoopConfig: logical.LoopConfig{
+			LoopName:     "db2",
+			TargetSchema: targetSchema,
+		},
+		SourceConn:   conn,
+		SourceSchema: dbName,
+	}
+	if fc.backfill {
+		config.BackfillWindow = time.Minute
+	}
+	if fc.script {
+		config.ScriptConfig = script.Config{
+			FS:       scripttest.ScriptFSFor(tbl),
+			MainPath: "/testdata/logical_test_db2.ts",
+		}
+	}
+	return config, config.Preflight()
+}
+
+// getCurrentLsn retrieves the latest sequence number in the monitoring tables.
+func getCurrentLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) {
+	rows, err := db.QueryContext(ctx, "select MAX(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	if rows.Next() {
+		var v []byte
+		err = rows.Scan(&v)
+		if err == nil {
+			if len(v) == 0 {
+				return lsnZero(), nil
+			}
+			return &lsn{
+				Value: v,
+			}, err
+		}
+		return nil, err
+	}
+	return lsnZero(), nil
+}
+
+func (r *replAdmin) insertValues(
+	ctx context.Context, tbl ident.Table, values []any, insertNull bool,
+) error {
+	tx, err := r.source.Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	for valIdx, value := range values {
+		if _, err := tx.ExecContext(ctx,
+			fmt.Sprintf(`INSERT INTO %s VALUES (?, ?)`, tbl), valIdx, value); err != nil {
+			return err
+		}
+	}
+	if insertNull {
+		if _, err := tx.ExecContext(ctx,
+			fmt.Sprintf(`INSERT INTO %s VALUES (-1, NULL)`, tbl)); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
+}
diff --git a/internal/source/db2/lsn.go b/internal/source/db2/lsn.go
new file mode 100644
index 000000000..f44f7f0dc
--- /dev/null
+++ b/internal/source/db2/lsn.go
@@ -0,0 +1,93 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"bytes"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+
+	"github.com/cockroachdb/cdc-sink/internal/util/stamp"
+	"github.com/pkg/errors"
+)
+
+// lsn represents the offset, in bytes, of a log record from the beginning of a database log file.
+type lsn struct {
+	Value []byte
+}
+
+type lsnRange struct {
+	from *lsn
+	to   *lsn
+}
+
+var _ stamp.Stamp = (*lsn)(nil)
+var _ fmt.Stringer = (*lsn)(nil)
+
+const zero = "00000000000000000000000000000000"
+
+// Less implements stamp.Stamp.
+func (l *lsn) Less(other stamp.Stamp) bool {
+	o := other.(*lsn)
+	return bytes.Compare(l.Value, o.Value) < 0
+}
+
+// Equal checks if two log sequence numbers are the same.
+func (l *lsn) Equal(other stamp.Stamp) bool {
+	o := other.(*lsn)
+	return bytes.Equal(l.Value, o.Value)
+}
+
+// String implements fmt.Stringer.
+func (l *lsn) String() string {
+	return fmt.Sprintf("%x", l.Value)
+}
+
+type lsnMemo struct {
+	Value []byte `json:"value,omitempty"`
+}
+
+func (l *lsn) MarshalJSON() ([]byte, error) {
+	p := lsnMemo{Value: l.Value}
+	return json.Marshal(p)
+}
+
+func (l *lsn) UnmarshalJSON(data []byte) error {
+	var p lsnMemo
+	if err := json.Unmarshal(data, &p); err != nil {
+		return errors.WithStack(err)
+	}
+	l.Value = p.Value
+	return nil
+}
+
+// UnmarshalText supports CLI flags and default values.
+func (l *lsn) UnmarshalText(data []byte) (err error) {
+	if len(data) == 0 {
+		// An empty flag value selects the zero LSN; without this early
+		// return, the decode below would leave an empty Value.
+		l.Value, err = hex.DecodeString(zero)
+		return
+	}
+	l.Value, err = hex.DecodeString(string(data))
+	return
+}
+
+func lsnZero() *lsn {
+	z, _ := hex.DecodeString(zero)
+	return &lsn{
+		Value: z,
+	}
+}
diff --git a/internal/source/db2/lsn_test.go b/internal/source/db2/lsn_test.go
new file mode 100644
index 000000000..e9e74001f
--- /dev/null
+++ b/internal/source/db2/lsn_test.go
@@ -0,0 +1,127 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"encoding/hex"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func parse(t *testing.T, s string) *lsn {
+	o, err := hex.DecodeString(s)
+	require.NoError(t, err)
+	return &lsn{Value: o}
+}
+
+// TestLess verifies stamp.Stamp.Less.
+func TestLess(t *testing.T) {
+	tests := []struct {
+		name  string
+		one   string
+		other string
+		want  bool
+	}{
+		{"zero", zero, zero, false},
+		{"zero_one", zero, "00000000000000000000000000000001", true},
+		{"one_zero", "00000000000000000000000000000001", zero, false},
+		{"zero_max", zero, "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", true},
+		{"max_zero", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", zero, false},
+		{"max", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			one := parse(t, tt.one)
+			other := parse(t, tt.other)
+			got := one.Less(other)
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
+
+// TestEqual verifies equality between two Log Sequence Numbers.
+func TestEqual(t *testing.T) {
+	tests := []struct {
+		name  string
+		one   string
+		other string
+		want  bool
+	}{
+		{"zero", zero, zero, true},
+		{"zero_one", zero, "00000000000000000000000000000001", false},
+		{"one_zero", "00000000000000000000000000000001", zero, false},
+		{"zero_max", zero, "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", false},
+		{"max_zero", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", zero, false},
+		{"max", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			one := parse(t, tt.one)
+			other := parse(t, tt.other)
+			got := one.Equal(other)
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
+
+// TestJSON verifies marshalling to JSON and unmarshalling from JSON.
+func TestJSON(t *testing.T) {
+	tests := []struct {
+		name string
+		one  string
+		want []byte
+	}{
+		{"zero", zero, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x3d, 0x3d, 0x22, 0x7d}},
+		{"one", "00000000000000000000000000000001", []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x51, 0x3d, 0x3d, 0x22, 0x7d}},
+		{"max", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x77, 0x3d, 0x3d, 0x22, 0x7d}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			one := parse(t, tt.one)
+			got, err := one.MarshalJSON()
+			require.NoError(t, err)
+			assert.Equal(t, tt.want, got)
+			// Verify roundtrip.
+			rt := &lsn{}
+			require.NoError(t, rt.UnmarshalJSON(got))
+			assert.Equal(t, one.Value, rt.Value)
+		})
+	}
+}
+
+// TestUnmarshalText verifies unmarshalling from a text string.
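+// UnmarshalText accepts the hex form used by CLI flags; each case below
+// should round-trip to the value produced by parse above.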
+func TestUnmarshalText(t *testing.T) { + + tests := []struct { + name string + text string + }{ + {"zero", zero}, + {"one", "00000000000000000000000000000001"}, + {"max", "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + one := parse(t, tt.text) + rt := &lsn{} + rt.UnmarshalText([]byte(tt.text)) + assert.Equal(t, one.Value, rt.Value) + }) + } +} diff --git a/internal/source/db2/metrics.go b/internal/source/db2/metrics.go new file mode 100644 index 000000000..ef420d139 --- /dev/null +++ b/internal/source/db2/metrics.go @@ -0,0 +1,53 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/util/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + batchLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "db2_batch_latency", + Help: "the length of time it took to process a batch", + Buckets: metrics.LatencyBuckets, + }) + batchSize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "db2_batch_size", + Help: "the size of batch", + Buckets: metrics.Buckets(1, 10000), + }) + mutationCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "db2_mutations_total", + Help: "Total number of mutations by source table.", + }, + []string{"table", "op"}, + ) + queryLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "db2_query_latency", + Help: "the length of time it took to process a query", + Buckets: metrics.LatencyBuckets, + }, []string{"table"}) + rollbackBatchSize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "db2_rollback_size", + Help: "the size of a rollback", + Buckets: metrics.Buckets(1, 10000), + }) +) diff --git a/internal/source/db2/operation_string.go b/internal/source/db2/operation_string.go new file mode 100644 index 000000000..df1d306ce --- /dev/null +++ b/internal/source/db2/operation_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type=operation"; DO NOT EDIT. + +package db2 + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. 
+	var x [1]struct{}
+	_ = x[unknownOp-0]
+	_ = x[deleteOp-1]
+	_ = x[insertOp-2]
+	_ = x[updateOp-3]
+	_ = x[beginOp-4]
+	_ = x[endOp-5]
+	_ = x[rollbackOp-6]
+}
+
+const _operation_name = "unknownOpdeleteOpinsertOpupdateOpbeginOpendOprollbackOp"
+
+var _operation_index = [...]uint8{0, 9, 17, 25, 33, 40, 45, 55}
+
+func (i operation) String() string {
+	if i < 0 || i >= operation(len(_operation_index)-1) {
+		return "operation(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _operation_name[_operation_index[i]:_operation_index[i+1]]
+}
diff --git a/internal/source/db2/provider.go b/internal/source/db2/provider.go
new file mode 100644
index 000000000..057fe4258
--- /dev/null
+++ b/internal/source/db2/provider.go
@@ -0,0 +1,49 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/source/logical"
+	"github.com/google/wire"
+)
+
+// Set is used by Wire.
+var Set = wire.NewSet(
+	ProvideDialect,
+	ProvideLoop,
+)
+
+// ProvideDialect is called by Wire to construct this package's
+// logical.Dialect implementation. There's a fake dependency on
+// the script loader so that flags can be evaluated first.
+func ProvideDialect(config *Config, _ *script.Loader) (logical.Dialect, error) {
+	if err := config.Preflight(); err != nil {
+		return nil, err
+	}
+
+	return New(config), nil
+}
+
+// ProvideLoop is called by Wire to construct the sole logical loop used
+// in the db2 mode.
+func ProvideLoop(
+	cfg *Config, dialect logical.Dialect, loops *logical.Factory,
+) (*logical.Loop, error) {
+	cfg.Dialect = dialect
+	return loops.Start(&cfg.LoopConfig)
+}
diff --git a/internal/source/db2/queries.go b/internal/source/db2/queries.go
new file mode 100644
index 000000000..d0febc079
--- /dev/null
+++ b/internal/source/db2/queries.go
@@ -0,0 +1,354 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+	"database/sql"
+	"fmt"
+	"strings"
+
+	"github.com/cockroachdb/cdc-sink/internal/source/logical"
+	"github.com/cockroachdb/cdc-sink/internal/types"
+	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+	"github.com/pkg/errors"
+	"github.com/shopspring/decimal"
+	log "github.com/sirupsen/logrus"
+)
+
+var (
+
+	// maxLsn is the max Log Sequence Number stored in the staging tables.
+	// This is used during live capture of events.
+	maxLsn = `SELECT max(t.SYNCHPOINT)
+	FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM
+	ASNCDC.IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM ASNCDC.IBMSNAP_REGISTER) t
+	`
+
+	// nextLsn is the Log Sequence Number of the next batch from the monitoring table.
+	// This is used during catch up, to batch mutations.
+	// The monitoring table is refreshed periodically based on the MONITOR_INTERVAL parameter as
+	// set in the ASNCDC.IBMSNAP_CAPPARMS table. By default, it is set to 300 seconds.
+	nextLsn = `
+	select MIN(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON where CD_ROWS_INSERTED > 0 AND RESTART_SEQ > ?
+	`
+
+	// db2CdcTables lists the tables that are in capture mode.
+	db2CdcTables = `
+	SELECT r.SOURCE_OWNER, r.SOURCE_TABLE,
+	       r.CD_OWNER, r.CD_TABLE,
+	       r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT,
+	       t.TBSPACEID, t.TABLEID
+	FROM
+	       ASNCDC.IBMSNAP_REGISTER r
+	LEFT JOIN SYSCAT.TABLES t
+	ON  r.SOURCE_OWNER = t.TABSCHEMA
+	AND r.SOURCE_TABLE = t.TABNAME
+	WHERE r.SOURCE_OWNER <> ''
+	`
+	// db2CdcTablesForSchema lists the tables that are in capture mode for a schema.
+	db2CdcTablesForSchema = `
+	SELECT r.SOURCE_OWNER, r.SOURCE_TABLE,
+	       r.CD_OWNER, r.CD_TABLE,
+	       r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT,
+	       t.TBSPACEID, t.TABLEID
+	FROM
+	       ASNCDC.IBMSNAP_REGISTER r
+	LEFT JOIN SYSCAT.TABLES t
+	ON  r.SOURCE_OWNER = t.TABSCHEMA
+	AND r.SOURCE_TABLE = t.TABNAME
+	WHERE r.SOURCE_OWNER = ?
+	`
+	// changeQuery returns the mutations in a staging table for an LSN range.
+	// Note: a DELETE immediately followed by an INSERT of the same key within
+	// one commit is folded into a single update (opcode 3); the delete half
+	// of the pair is marked with opcode 0 and filtered out below.
+	changeQuery = `
+	SELECT * FROM
+	(SELECT CASE
+	WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 0
+	WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 3
+	WHEN IBMSNAP_OPERATION = 'D' THEN 1
+	WHEN IBMSNAP_OPERATION = 'I' THEN 2
+	END
+	OPCODE,
+	cdc.*
+	FROM # cdc WHERE IBMSNAP_COMMITSEQ > ? AND IBMSNAP_COMMITSEQ <= ?
+	order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ)
+	WHERE OPCODE != 0
+	`
+	columnQuery = `
+	SELECT c.colname,c.keyseq, c.TYPENAME
+	FROM syscat.tables as t
+	inner join syscat.columns as c
+	on t.tabname = c.tabname
+	and t.tabschema = c.tabschema
+	WHERE t.tabschema = ? AND t.tabname = ?
+	ORDER by c.COLNO
+	`
+)
+
+type cdcTable struct {
+	sourceOwner string
+	sourceTable string
+	cdOwner     string
+	cdTable     string
+	cdNewSyc    []byte
+	cdOldSyc    []byte
+	spaceID     int
+	tableID     int
+}
+
+type message struct {
+	table  cdcTable
+	op     operation
+	lsn    lsn
+	values []any
+}
+
+//go:generate go run golang.org/x/tools/cmd/stringer -type=operation
+
+type operation int
+
+const (
+	unknownOp operation = iota
+	deleteOp
+	insertOp
+	updateOp
+	beginOp
+	endOp
+	rollbackOp
+)
+
+// Open returns a new connection to the source database.
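+// The go_ibm_db driver takes a keyword/value DSN of the form
+// HOSTNAME=...;DATABASE=...;PORT=...;UID=...;PWD=..., which is
+// assembled below from the fields extracted by Config.Preflight.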
+func (c *conn) Open() (*sql.DB, error) {
+	con := fmt.Sprintf("HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=%s",
+		c.config.host,
+		c.config.database,
+		c.config.port,
+		c.config.user,
+		c.config.password)
+	log.Debugf("DB2 connection: HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=...",
+		c.config.host,
+		c.config.database,
+		c.config.port,
+		c.config.user)
+
+	return sql.Open("go_ibm_db", con)
+}
+
+// postMutations finds all the rows that have been changed on the given table
+// and posts them to the channel used by the logical loop to be processed.
+func (c *conn) postMutations(
+	ctx *stopper.Context, db *sql.DB, cdcTable cdcTable, lsnRange lsnRange, ch chan<- logical.Message,
+) (int, error) {
+	count := 0
+	// TODO (silvano): memoize, use templates
+	table := ident.NewTable(ident.MustSchema(ident.New(cdcTable.cdOwner)), ident.New(cdcTable.cdTable))
+	query := strings.ReplaceAll(changeQuery, "#", table.Raw())
+	// TODO (silvano): we might want to run in batches with a limit.
+	rows, err := db.QueryContext(ctx, query, lsnRange.from.Value, lsnRange.to.Value)
+	if err != nil {
+		return 0, err
+	}
+	defer rows.Close()
+	cols, _ := rows.ColumnTypes()
+	// The first four columns are metadata (opcode, commit_seq, intent_seq, op_id);
+	// the remaining columns contain the values of the source table's columns.
+	numMetaColumns := 4
+	for rows.Next() {
+		count++
+		numCols := len(cols)
+		res := make([]any, numCols)
+		ptr := make([]any, numCols)
+		for i := range cols {
+			ptr[i] = &res[i]
+		}
+		if err := rows.Scan(ptr...); err != nil {
+			return count, err
+		}
+		msg := message{
+			table: cdcTable,
+		}
+		// The first column is the opcode.
+		op, ok := res[0].(int32)
+		if !ok {
+			return count, errors.Errorf("invalid operation %T", res[0])
+		}
+		msg.op = operation(op)
+
+		// The second column is the LSN of the committed transaction.
+		cs, ok := res[1].([]byte)
+		if !ok {
+			return count, errors.Errorf("invalid commit sequence %v", res[1])
+		}
+		msg.lsn = lsn{Value: cs}
+
+		msg.values = make([]any, len(res)-numMetaColumns)
+		vcols := cols[numMetaColumns:]
+		for i, v := range res[numMetaColumns:] {
+			if v == nil {
+				msg.values[i] = nil
+				continue
+			}
+			switch vcols[i].DatabaseTypeName() {
+			case "VARCHAR", "CHAR", "CLOB":
+				s, _ := v.([]byte)
+				msg.values[i] = string(s)
+			case "DECIMAL", "DECFLOAT":
+				s, _ := v.([]byte)
+				msg.values[i], err = decimal.NewFromString(string(s))
+				if err != nil {
+					return count, errors.Wrapf(err, "cannot convert decimal %s", s)
+				}
+			default:
+				msg.values[i] = v
+			}
+		}
+		select {
+		case ch <- msg:
+		case <-ctx.Stopping():
+			return -1, nil
+		}
+	}
+	return count, nil
+}
+
+// getMaxLsn retrieves the latest sequence number in all the staging tables.
+func (c *conn) getMaxLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) {
+	rows, err := db.QueryContext(ctx, maxLsn)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	if rows.Next() {
+		var v []byte
+		err = rows.Scan(&v)
+		if err == nil {
+			if len(v) == 0 {
+				return lsnZero(), nil
+			}
+			return &lsn{
+				Value: v,
+			}, err
+		}
+		return nil, err
+	}
+	return lsnZero(), nil
+}
+
+// getNextLsn retrieves the next sequence number.
+func (c *conn) getNextLsn(ctx *stopper.Context, db *sql.DB, current *lsn) (*lsn, error) {
+	// Checking the monitoring table for a batch of mutations.
+	rows, err := db.QueryContext(ctx, nextLsn, current.Value)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	if rows.Next() {
+		var v []byte
+		err = rows.Scan(&v)
+		if err == nil {
+			// If we don't have a LSN from the monitoring table,
+			// we can check the staging tables directly.
+ if len(v) == 0 { + return c.getMaxLsn(ctx, db) + } + return &lsn{ + Value: v, + }, err + } + return nil, err + } + return lsnZero(), nil +} + +// fetchColumnMetadata fetches the column metadata for a given table. +func (c *conn) fetchColumnMetadata(ctx *stopper.Context, db *sql.DB, t *cdcTable) error { + sourceTable := ident.NewTable( + ident.MustSchema(ident.New(t.sourceOwner)), + ident.New(t.sourceTable)) + + if _, ok := c.columns.Get(sourceTable); ok { + return nil + } + rows, err := db.QueryContext(ctx, columnQuery, sourceTable.Schema().Raw(), sourceTable.Table().Raw()) + if err != nil { + return err + } + defer rows.Close() + colData := make([]types.ColData, 0) + primaryKeys := make(map[int]int) + idx := 0 + for rows.Next() { + var name, typ string + var pk *int + err = rows.Scan(&name, &pk, &typ) + if err != nil { + return err + } + if pk != nil { + primaryKeys[*pk] = idx + } + log.Tracef("Table %s col %q %q primary?: %t", sourceTable, name, typ, pk != nil) + colData = append(colData, types.ColData{ + Name: ident.New(name), + Primary: pk != nil, + Type: typ, + }) + idx++ + } + c.columns.Put(sourceTable, colData) + c.primaryKeys.Put(sourceTable, primaryKeys) + + return nil +} + +func queryTables(ctx *stopper.Context, db *sql.DB, schema ident.Schema) (*sql.Rows, error) { + if schema.Empty() { + return db.QueryContext(ctx, db2CdcTables) + } + return db.QueryContext(ctx, db2CdcTablesForSchema, schema.Raw()) +} + +// getTables returns the tables that have replication enabled. +func (c *conn) getTables( + ctx *stopper.Context, db *sql.DB, schema ident.Schema, +) ([]cdcTable, error) { + rows, err := queryTables(ctx, db, schema) + if err != nil { + return nil, err + } + defer rows.Close() + result := make([]cdcTable, 0) + for rows.Next() { + var res cdcTable + err = rows.Scan(&res.sourceOwner, + &res.sourceTable, + &res.cdOwner, + &res.cdTable, + &res.cdNewSyc, + &res.cdOldSyc, + &res.spaceID, + &res.tableID, + ) + if err != nil { + return nil, err + } + result = append(result, res) + } + return result, nil +} diff --git a/internal/source/db2/wire_gen.go b/internal/source/db2/wire_gen.go new file mode 100644 index 000000000..9a83c656d --- /dev/null +++ b/internal/source/db2/wire_gen.go @@ -0,0 +1,90 @@ +// Code generated by Wire. DO NOT EDIT. 
+ +//go:generate go run github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/staging/memo" + "github.com/cockroachdb/cdc-sink/internal/staging/version" + "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/target/dlq" + "github.com/cockroachdb/cdc-sink/internal/target/schemawatch" + "github.com/cockroachdb/cdc-sink/internal/util/applycfg" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" +) + +// Injectors from injector.go: + +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + diagnostics := diag.New(ctx) + scriptConfig, err := logical.ProvideUserScriptConfig(config) + if err != nil { + return nil, err + } + loader, err := script.ProvideLoader(scriptConfig) + if err != nil { + return nil, err + } + dialect, err := ProvideDialect(config, loader) + if err != nil { + return nil, err + } + baseConfig, err := logical.ProvideBaseConfig(config, loader) + if err != nil { + return nil, err + } + targetPool, err := logical.ProvideTargetPool(ctx, baseConfig, diagnostics) + if err != nil { + return nil, err + } + targetStatements, err := logical.ProvideTargetStatements(ctx, baseConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + dlqConfig := logical.ProvideDLQConfig(baseConfig) + watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(dlqConfig, targetPool, watchers) + appliers, err := apply.ProvideFactory(ctx, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + stagingPool, err := logical.ProvideStagingPool(ctx, baseConfig, diagnostics) + if err != nil { + return nil, err + } + stagingSchema, err := logical.ProvideStagingDB(baseConfig) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(ctx, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + checker := version.ProvideChecker(stagingPool, memoMemo) + factory, err := logical.ProvideFactory(ctx, appliers, configs, baseConfig, diagnostics, memoMemo, loader, stagingPool, targetPool, watchers, checker) + if err != nil { + return nil, err + } + loop, err := ProvideLoop(config, dialect, factory) + if err != nil { + return nil, err + } + db2 := &DB2{ + Diagnostics: diagnostics, + Loop: loop, + } + return db2, nil +} diff --git a/main.go b/main.go index 865f290b0..c42036bcf 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "syscall" "time" + "github.com/cockroachdb/cdc-sink/internal/cmd/db2" "github.com/cockroachdb/cdc-sink/internal/cmd/dumphelp" "github.com/cockroachdb/cdc-sink/internal/cmd/dumptemplates" "github.com/cockroachdb/cdc-sink/internal/cmd/fslogical" @@ -104,6 +105,8 @@ func main() { f.CountVarP(&verbosity, "verbose", "v", "increase logging verbosity to debug; repeat for trace") root.AddCommand( + db2.Command(), + db2.Install(), dumphelp.Command(), dumptemplates.Command(), fslogical.Command(),