diff --git a/.github/db2/Dockerfile b/.github/db2/Dockerfile new file mode 100644 index 00000000..57988f08 --- /dev/null +++ b/.github/db2/Dockerfile @@ -0,0 +1,31 @@ +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +FROM icr.io/db2_community/db2 + +LABEL maintainer="Debezium Community" + +RUN mkdir -p /asncdctools/src + +ADD asncdc_UDF.sql /asncdctools/src +ADD asncdcaddremove.sql /asncdctools/src +ADD asncdctables.sql /asncdctools/src +ADD dbsetup.sh /asncdctools/src +ADD startup-agent.sql /asncdctools/src +ADD startup-cdc-demo.sql /asncdctools/src +ADD inventory.sql /asncdctools/src +ADD asncdc.c /asncdctools/src + +RUN mkdir /var/custom && \ + chmod -R 777 /asncdctools && \ + chmod -R 777 /var/custom + +ADD cdcsetup.sh /var/custom +ADD custom-init /var/custom-init + +RUN chmod -R 777 /var/custom-init + +ADD openshift_entrypoint.sh /var/db2_setup/lib + +RUN chmod 777 /var/custom/cdcsetup.sh && \ + chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh diff --git a/.github/db2/README.md b/.github/db2/README.md new file mode 100644 index 00000000..c80921dd --- /dev/null +++ b/.github/db2/README.md @@ -0,0 +1,17 @@ +# DB2 container + +This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables. + +Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. + +Original Code: +https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
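+ +For example, to build the image and run it locally (the environment mirrors `.github/docker-compose.yml`; the `db2-cdc` tag is an arbitrary name): + +```sh +docker build -t db2-cdc . +docker run -d --privileged -p 50000:50000 \ +  -e LICENSE=accept -e DBNAME=TESTDB -e DB2INST1_PASSWORD=SoupOrSecret \ +  db2-cdc +```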
diff --git a/.github/db2/asncdc.c b/.github/db2/asncdc.c new file mode 100644 index 00000000..4be00b96 --- /dev/null +++ b/.github/db2/asncdc.c @@ -0,0 +1,166 @@ +// Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +// https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sqlca.h> +#include <sqludf.h> + +/* Entry point behind the ASNCDC.ASNCDCSERVICES UDF (see asncdc_UDF.sql): it drives the asncap/asnccmd capture programs and returns their output as a CLOB. */ +void SQL_API_FN asncdcservice( + SQLUDF_VARCHAR *asnCommand, /* input */ + SQLUDF_VARCHAR *asnService, + SQLUDF_CLOB *fileData, /* output */ + /* null indicators */ + SQLUDF_NULLIND *asnCommand_ind, /* input */ + SQLUDF_NULLIND *asnService_ind, + SQLUDF_NULLIND *fileData_ind, + SQLUDF_TRAIL_ARGS, + struct sqludf_dbinfo *dbinfo) +{ + + int fd; + char tmpFileName[] = "/tmp/fileXXXXXX"; + fd = mkstemp(tmpFileName); + + int strcheck = 0; + char cmdstring[256]; + + + char* szDb2path = getenv("HOME"); + + + + char str[20]; + int len = 0; + int c; + char *buffer = NULL; + FILE *pidfile; + + char dbname[129]; + memset(dbname, '\0', 129); + strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen); + dbname[dbinfo->dbnamelen] = '\0'; + + int pid; + if (strcmp(asnService, "asncdc") == 0) + { + strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName); + int callcheck; + callcheck = system(cmdstring); + pidfile = fopen(tmpFileName, "r"); + while ((c = fgetc(pidfile)) != EOF) + { + if (c == '\n') + { + break; + } + len++; + } + buffer = (char *)malloc(sizeof(char) * len); + fseek(pidfile, 0, SEEK_SET); + fread(buffer, sizeof(char), len, pidfile); + fclose(pidfile); + pidfile = fopen(tmpFileName, "w"); + if (strcmp(asnCommand, "start") == 0) + { + if (len == 0) // is not running + { + strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname); + fprintf(pidfile, "start --> %s \n", cmdstring); + callcheck = system(cmdstring); + } + else + { + fprintf(pidfile, "asncap is already running"); + } + } + if ((strcmp(asnCommand, "prune") == 0) || + (strcmp(asnCommand, "reinit") == 0) || + (strcmp(asnCommand, "suspend") == 0) || + (strcmp(asnCommand, "resume") == 0) || + (strcmp(asnCommand, "status") == 0) || + (strcmp(asnCommand, "stop") == 0)) + { + if (len > 0) + { + //buffer[len] = '\0'; + //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer); + //fprintf(pidfile, "stop --> %s", cmdstring); + //callcheck = system(cmdstring); + strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName); + //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand); + callcheck = system(cmdstring); + } + else + { + fprintf(pidfile, "asncap is not running"); + } + } + + fclose(pidfile); + }
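+    /* Read back whatever the command wrote to the temp file so it can be returned as the CLOB result. */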
+    /* system(cmdstring); */ + + int rc = 0; + long fileSize = 0; + size_t readCnt = 0; + FILE *f = NULL; + + f = fopen(tmpFileName, "r"); + if (!f) + { + strcpy(SQLUDF_MSGTX, "Could not open file "); + strncat(SQLUDF_MSGTX, tmpFileName, + SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1); + strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN); + return; + } + + rc = fseek(f, 0, SEEK_END); + if (rc) + { + sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc); + strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN); + return; + } + + /* verify the file size */ + fileSize = ftell(f); + if (fileSize > fileData->length) + { + strcpy(SQLUDF_MSGTX, "File too large"); + strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN); + return; + } + + /* go to the beginning and read the entire file */ + rc = fseek(f, 0, 0); + if (rc) + { + sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc); + strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN); + return; + } + + readCnt = fread(fileData->data, 1, fileSize, f); + if (readCnt != fileSize) + { + /* raise a warning that something weird is going on */ + sprintf(SQLUDF_MSGTX, "Could not read entire file " + "(%ld vs %ld)", + (long)readCnt, fileSize); + strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN); + *fileData_ind = -1; + } + else + { + fileData->length = readCnt; + *fileData_ind = 0; + } + // remove temporary file + rc = remove(tmpFileName); + //fclose(pFile); +} diff --git a/.github/db2/asncdc_UDF.sql b/.github/db2/asncdc_UDF.sql new file mode 100644 index 00000000..06d51770 --- /dev/null +++ b/.github/db2/asncdc_UDF.sql @@ -0,0 +1,20 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +DROP SPECIFIC FUNCTION ASNCDC.asncdcservice; + +CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8)) + RETURNS CLOB(100K) + SPECIFIC asncdcservice + EXTERNAL NAME 'asncdc!asncdcservice' + LANGUAGE C + PARAMETER STYLE SQL + DBINFO + DETERMINISTIC + NOT FENCED + RETURNS NULL ON NULL INPUT + NO SQL + NO EXTERNAL ACTION + NO SCRATCHPAD + ALLOW PARALLEL + NO FINAL CALL; \ No newline at end of file diff --git a/.github/db2/asncdcaddremove.sql b/.github/db2/asncdcaddremove.sql new file mode 100644 index 00000000..eef38eef --- /dev/null +++ b/.github/db2/asncdcaddremove.sql @@ -0,0 +1,198 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +-- +-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE() +-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table +-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table +-- + +--#SET TERMINATOR @ +CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE( +in tableschema VARCHAR(128), +in tablename VARCHAR(128) +) +LANGUAGE SQL +P1: +BEGIN + +DECLARE stmtSQL VARCHAR(2048); + +DECLARE SQLCODE INT; +DECLARE SQLSTATE CHAR(5); +DECLARE RC_SQLCODE INT DEFAULT 0; +DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000'; + +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE; + +-- delete ASN.IBMSNAP_PRUNCNTL entries / source +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASN.IBMSNAP_REGISTER entries / source +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- drop CD Table / source +SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename ; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASN.IBMSNAP_SUBS_COLS entries /target +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASN.IBMSNAP_SUBS_MEMBR entries /target +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASN.IBMQREP_COLVERSION +SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)'; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASN.IBMQREP_TABVERSION +SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; +
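+-- Example usage (mirrors startup-cdc-demo.sql): +-- CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS'); starts capture; CALL ASNCDC.REMOVETABLE('DB2INST1', 'PRODUCTS'); stops it again.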
+--#SET TERMINATOR @ +CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE( +in tableschema VARCHAR(128), +in tablename VARCHAR(128) +) +LANGUAGE SQL +P1: +BEGIN + +DECLARE SQLSTATE CHAR(5); + +DECLARE stmtSQL VARCHAR(2048); + +SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' AS ( SELECT ' || + ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' || + ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' || + ' CAST ('''' AS CHAR(1)) ' || + ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' || tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' || + tableschema || '_' || tablename || + ' ON ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' VOLATILE CARDINALITY'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' || + 'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' || + 'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' || + 'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' || + 'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' || + 'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' || + 'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' || + 'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''N'', ' || + '1, ' || + '''Y'', ' || + '''Y'', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + 'null, ' || + 'null, ' || + '0, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + '''0801'', ' || + 'null, ' || + 'null, ' || + '''0'', ' || + '''Y'', ' || + '''N'', ' || + '''Y'', ' || + '''NNNN'', ' || + '''Y'', ' || + '''A'',' || + 'null ) '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || + 'TARGET_SERVER, ' || + 'TARGET_OWNER, ' || + 'TARGET_TABLE, ' || + 'SYNCHTIME, ' || + 'SYNCHPOINT, ' || 
+ 'SOURCE_OWNER, ' || + 'SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, ' || + 'APPLY_QUAL, ' || + 'SET_NAME, ' || + 'CNTL_SERVER , ' || + 'TARGET_STRUCTURE , ' || + 'CNTL_ALIAS , ' || + 'PHYS_CHANGE_OWNER , ' || + 'PHYS_CHANGE_TABLE , ' || + 'MAP_ID ' || + ') VALUES ( ' || + '''KAFKA'', ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + 'NULL, ' || + 'NULL, ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''KAFKAQUAL'', ' || + '''SET001'', ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '8, ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || + ' )'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; diff --git a/.github/db2/asncdctables.sql b/.github/db2/asncdctables.sql new file mode 100644 index 00000000..a8695364 --- /dev/null +++ b/.github/db2/asncdctables.sql @@ -0,0 +1,482 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +-- 1021 db2 LEVEL Version 10.2.0 --> 11.5.0 1150 + +CREATE TABLE ASNCDC.IBMQREP_COLVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +POSITION SMALLINT NOT NULL, +NAME VARCHAR(128) NOT NULL, +TYPE SMALLINT NOT NULL, +LENGTH INTEGER NOT NULL, +NULLS CHAR( 1) NOT NULL, +DEFAULT VARCHAR(1536), +CODEPAGE INTEGER, +SCALE INTEGER, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX +ON ASNCDC.IBMQREP_COLVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +POSITION ASC); + +CREATE INDEX ASNCDC.IX2COLVERSION +ON ASNCDC.IBMQREP_COLVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE TABLE ASNCDC.IBMQREP_TABVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +VERSION INTEGER NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_NAME VARCHAR(128) NOT NULL, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX +ON ASNCDC.IBMQREP_TABVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +VERSION ASC); + +CREATE INDEX ASNCDC.IX2TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE INDEX ASNCDC.IX3TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +SOURCE_OWNER ASC, +SOURCE_NAME ASC); + +CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL( +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021') +ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES ( +'1021'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME TIMESTAMP NOT NULL, +RESTART_TIME TIMESTAMP NOT NULL, +CURRENT_MEMORY INT NOT NULL, +CD_ROWS_INSERTED INT NOT NULL, +RECAP_ROWS_SKIPPED INT NOT NULL, +TRIGR_ROWS_SKIPPED INT NOT NULL, +CHG_ROWS_SKIPPED INT NOT NULL, +TRANS_PROCESSED INT NOT NULL, +TRANS_SPILLED INT NOT NULL, +MAX_TRANS_SIZE INT NOT NULL, +LOCKING_RETRIES INT NOT NULL, +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +LOGREADLIMIT INT NOT NULL, +CAPTURE_IDLE INT NOT NULL, +SYNCHTIME TIMESTAMP NOT NULL, +CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT , +LAST_EOL_TIME TIMESTAMP, +RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL 
WITH DEFAULT , +RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +LOGREAD_API_TIME INT, +NUM_LOGREAD_CALLS INT, +NUM_END_OF_LOGS INT, +LOGRDR_SLEEPTIME INT, +NUM_LOGREAD_F_CALLS INT, +TRANS_QUEUED INT, +NUM_WARNTXS INT, +NUM_WARNLOGAPI INT) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX +ON ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT INT, +LAG_LIMIT INT, +COMMIT_INTERVAL INT, +PRUNE_INTERVAL INT, +TRACE_LIMIT INT, +MONITOR_LIMIT INT, +MONITOR_INTERVAL INT, +MEMORY_LIMIT SMALLINT, +REMOTE_SRC_SERVER CHAR( 18), +AUTOPRUNE CHAR( 1), +TERM CHAR( 1), +AUTOSTOP CHAR( 1), +LOGREUSE CHAR( 1), +LOGSTDOUT CHAR( 1), +SLEEP_INTERVAL SMALLINT, +CAPTURE_PATH VARCHAR(1040), +STARTMODE VARCHAR( 10), +LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256, +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021', +COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021') + ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT, +LAG_LIMIT, +COMMIT_INTERVAL, +PRUNE_INTERVAL, +TRACE_LIMIT, +MONITOR_LIMIT, +MONITOR_INTERVAL, +MEMORY_LIMIT, +SLEEP_INTERVAL, +AUTOPRUNE, +TERM, +AUTOSTOP, +LOGREUSE, +LOGSTDOUT, +CAPTURE_PATH, +STARTMODE, +COMPATIBILITY) +VALUES ( +10080, +10080, +30, +300, +10080, +10080, +300, +32, +5, +'Y', +'Y', +'N', +'N', +'N', +NULL, +'WARMSI', +'1021' +); + +CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS ( + CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL + ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX + ON ASNCDC.IBMSNAP_CAPSCHEMAS + (CAP_SCHEMA_NAME ASC); + +INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES ( +'ASNCDC'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE( +OPERATION CHAR( 8) NOT NULL, +TRACE_TIME TIMESTAMP NOT NULL, +DESCRIPTION VARCHAR(1024) NOT NULL) + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX +ON ASNCDC.IBMSNAP_CAPTRACE( +TRACE_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL( +TARGET_SERVER CHAR(18) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +CNTL_SERVER CHAR( 18) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +CNTL_ALIAS CHAR( 8), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +MAP_ID VARCHAR(10) NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX +ON ASNCDC.IBMSNAP_PRUNCNTL( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC, +TARGET_TABLE ASC, +TARGET_OWNER ASC); + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1 +ON ASNCDC.IBMSNAP_PRUNCNTL( +MAP_ID ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2 +ON ASNCDC.IBMSNAP_PRUNCNTL( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3 +ON ASNCDC.IBMSNAP_PRUNCNTL( +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK( +DUMMY CHAR( 1)) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER CHAR( 18) NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + 
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX +ON ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER ASC, +APPLY_QUAL ASC, +SET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +GLOBAL_RECORD CHAR( 1) NOT NULL, +SOURCE_STRUCTURE SMALLINT NOT NULL, +SOURCE_CONDENSED CHAR( 1) NOT NULL, +SOURCE_COMPLETE CHAR( 1) NOT NULL, +CD_OWNER VARCHAR(128), +CD_TABLE VARCHAR(128), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +DISABLE_REFRESH SMALLINT NOT NULL, +CCD_OWNER VARCHAR(128), +CCD_TABLE VARCHAR(128), +CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CCD_CONDENSED CHAR( 1), +CCD_COMPLETE CHAR( 1), +ARCH_LEVEL CHAR( 4) NOT NULL, +DESCRIPTION CHAR(254), +BEFORE_IMG_PREFIX VARCHAR( 4), +CONFLICT_LEVEL CHAR( 1), +CHG_UPD_TO_DEL_INS CHAR( 1), +CHGONLY CHAR( 1), +RECAPTURE CHAR( 1), +OPTION_FLAGS CHAR( 4) NOT NULL, +STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y', +STATE CHAR( 1) WITH DEFAULT 'I', +STATE_INFO CHAR( 8)) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX +ON ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1 +ON ASNCDC.IBMSNAP_REGISTER( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2 +ON ASNCDC.IBMSNAP_REGISTER( +GLOBAL_RECORD ASC); + +ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_RESTART( +MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +MAX_COMMIT_TIME TIMESTAMP NOT NULL, +MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +CURR_COMMIT_TIME TIMESTAMP NOT NULL, +CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT , +SIGNAL_TYPE VARCHAR( 30) NOT NULL, +SIGNAL_SUBTYPE VARCHAR( 30), +SIGNAL_INPUT_IN VARCHAR(500), +SIGNAL_STATE CHAR( 1) NOT NULL, +SIGNAL_LSN VARCHAR( 16) FOR BIT DATA) +DATA CAPTURE CHANGES + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_SIGNALX +ON ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +COL_TYPE CHAR( 1) NOT NULL, +TARGET_NAME VARCHAR(128) NOT NULL, +IS_KEY CHAR( 1) NOT NULL, +COLNO SMALLINT NOT NULL, +EXPRESSION VARCHAR(1024) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX +ON ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC, +TARGET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY; + +--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX +--ON ASNCDC.IBMSNAP_SUBS_EVENT( +--EVENT_NAME ASC, +--EVENT_TIME ASC); + + +--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) 
NOT NULL, +TARGET_CONDENSED CHAR( 1) NOT NULL, +TARGET_COMPLETE CHAR( 1) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +PREDICATES VARCHAR(1024), +MEMBER_STATE CHAR( 1), +TARGET_KEY_CHG CHAR( 1) NOT NULL, +UOW_CD_PREDICATES VARCHAR(1024), +JOIN_UOW_CD CHAR( 1), +LOADX_TYPE SMALLINT, +LOADX_SRC_N_OWNER VARCHAR( 128), +LOADX_SRC_N_TABLE VARCHAR(128)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX +ON ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SET_TYPE CHAR( 1) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +ACTIVATE SMALLINT NOT NULL, +SOURCE_SERVER CHAR( 18) NOT NULL, +SOURCE_ALIAS CHAR( 8), +TARGET_SERVER CHAR( 18) NOT NULL, +TARGET_ALIAS CHAR( 8), +STATUS SMALLINT NOT NULL, +LASTRUN TIMESTAMP NOT NULL, +REFRESH_TYPE CHAR( 1) NOT NULL, +SLEEP_MINUTES INT, +EVENT_NAME CHAR( 18), +LASTSUCCESS TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CAPTURE_SCHEMA VARCHAR(128) NOT NULL, +TGT_CAPTURE_SCHEMA VARCHAR(128), +FEDERATED_SRC_SRVR VARCHAR( 18), +FEDERATED_TGT_SRVR VARCHAR( 18), +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +OPTION_FLAGS CHAR( 4) NOT NULL, +COMMIT_COUNT SMALLINT, +MAX_SYNCH_MINUTES SMALLINT, +AUX_STMTS SMALLINT NOT NULL, +ARCH_LEVEL CHAR( 4) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX +ON ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +BEFORE_OR_AFTER CHAR( 1) NOT NULL, +STMT_NUMBER SMALLINT NOT NULL, +EI_OR_CALL CHAR( 1) NOT NULL, +SQL_STMT VARCHAR(1024), +ACCEPT_SQLSTATES VARCHAR( 50)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX +ON ASNCDC.IBMSNAP_SUBS_STMTS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +BEFORE_OR_AFTER ASC, +STMT_NUMBER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_UOW( +IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL, +IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +IBMSNAP_LOGMARKER TIMESTAMP NOT NULL, +IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL, +IBMSNAP_AUTHID VARCHAR(128) NOT NULL, +IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT , +IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX +ON ASNCDC.IBMSNAP_UOW( +IBMSNAP_COMMITSEQ ASC, +IBMSNAP_LOGMARKER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPENQ ( + LOCK_NAME CHAR(9 OCTETS) + ) + ORGANIZE BY ROW + DATA CAPTURE NONE + COMPRESS NO; diff --git a/.github/db2/cdcsetup.sh b/.github/db2/cdcsetup.sh new file mode 100755 index 00000000..f7c27fb5 --- /dev/null +++ b/.github/db2/cdcsetup.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +if [ ! -f /asncdctools/src/asncdc.nlk ]; then +rc=1 +echo "waiting for user db2inst1 to exist ..." +while [ "$rc" -ne 0 ] +do + sleep 5 + id db2inst1 + rc=$? + echo '.' +done + 
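+# dbsetup.sh (run as the db2inst1 instance owner) compiles the asncdc UDF, creates the capture control tables, and loads the demo data.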
+su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1 +fi +touch /asncdctools/src/asncdc.nlk + +echo "CDC setup completed." diff --git a/.github/db2/custom-init/cleanup_storage.sh b/.github/db2/custom-init/cleanup_storage.sh new file mode 100644 index 00000000..8624e0d1 --- /dev/null +++ b/.github/db2/custom-init/cleanup_storage.sh @@ -0,0 +1,17 @@ +#!/bin/bash + + +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + + +################################################################################ +# Wipes the storage directory. +# Note that this essentially makes the database non-persistent when the pod is deleted. +################################################################################ + +echo "Inspecting database directory" +ls $STORAGE_DIR + +echo "Wiping database directory" +rm -rf $STORAGE_DIR/* diff --git a/.github/db2/dbsetup.sh b/.github/db2/dbsetup.sh new file mode 100755 index 00000000..3cbd8134 --- /dev/null +++ b/.github/db2/dbsetup.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + + +echo "Compile ASN tool ..." +cd /asncdctools/src +/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc + +DBNAME=$1 +DB2DIR=/opt/ibm/db2/V11.5 +rc=1 +echo "Waiting for DB2 start ( $DBNAME ) ." +while [ "$rc" -ne 0 ] +do + sleep 5 + db2 connect to $DBNAME + rc=$? + echo '.' +done + +# enable metacatalog read via JDBC +cd $HOME/sqllib/bnd +db2 bind db2schema.bnd blocking all grant public sqlerror continue + +# do a backup and restart the db +db2 backup db $DBNAME to /dev/null +db2 restart db $DBNAME + +db2 connect to $DBNAME + +cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function +chmod 777 /database/config/db2inst1/sqllib/function + +# add UDF / start stop asncap +db2 -tvmf /asncdctools/src/asncdc_UDF.sql + +# create asntables +db2 -tvmf /asncdctools/src/asncdctables.sql + +# add UDF / add remove asntables + +db2 -tvmf /asncdctools/src/asncdcaddremove.sql + + + + +# create sample tables and data +db2 -tvmf /asncdctools/src/inventory.sql +db2 -tvmf /asncdctools/src/startup-agent.sql +sleep 10 +db2 -tvmf /asncdctools/src/startup-cdc-demo.sql + + +echo "done" diff --git a/.github/db2/inventory.sql b/.github/db2/inventory.sql new file mode 100644 index 00000000..1d441d59 --- /dev/null +++ b/.github/db2/inventory.sql @@ -0,0 +1,77 @@ + +-- Create and populate our products using a single insert with many rows +CREATE TABLE DB2INST1.PRODUCTS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 101, INCREMENT BY 1) PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT +); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('scooter','Small 2-wheel scooter',3.14); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('car battery','12V car battery',8.1); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('hammer','12oz carpenter''s hammer',0.75); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('hammer','14oz carpenter''s hammer',0.875); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('hammer','16oz carpenter''s hammer',1.0); +INSERT 
INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('rocks','box of assorted rocks',5.3); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('jacket','water resistent black wind breaker',0.1); +INSERT INTO DB2INST1.PRODUCTS(name,description,weight) + VALUES ('spare tire','24 inch spare tire',22.2); + +CREATE TABLE DB2INST1.PRODUCTS_ON_HAND ( + product_id INTEGER NOT NULL PRIMARY KEY, + quantity INTEGER NOT NULL, + FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id) +); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (101,3); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (102,8); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (103,18); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (104,4); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (105,5); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (106,0); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (107,44); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (108,2); +INSERT INTO DB2INST1.PRODUCTS_ON_HAND VALUES (109,5); + +CREATE TABLE DB2INST1.CUSTOMERS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 1001, INCREMENT BY 1) PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE +); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Sally','Thomas','sally.thomas@acme.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('George','Bailey','gbailey@foobar.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Edward','Walker','ed@walker.com'); +INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) + VALUES ('Anne','Kretchmar','annek@noanswer.org'); + +CREATE TABLE DB2INST1.ORDERS ( + id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY + (START WITH 10001, INCREMENT BY 1) PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY (purchaser) REFERENCES DB2INST1.CUSTOMERS(id), + FOREIGN KEY (product_id) REFERENCES DB2INST1.PRODUCTS(id) +); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-16', 1001, 1, 102); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-01-17', 1002, 2, 105); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-19', 1002, 2, 106); +INSERT INTO DB2INST1.ORDERS(order_date,purchaser,quantity,product_id) + VALUES ('2016-02-21', 1003, 1, 107); diff --git a/.github/db2/openshift_entrypoint.sh b/.github/db2/openshift_entrypoint.sh new file mode 100755 index 00000000..38974dea --- /dev/null +++ b/.github/db2/openshift_entrypoint.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +# https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +################################################################################ +# Runs custom init scripts followed by the original entry point +############################################################################### + +source ${SETUPDIR?}/include/db2_constants +source ${SETUPDIR?}/include/db2_common_functions + +if [[ -d /var/custom-init ]]; then + echo "(*) Running user-provided init scripts ... " + chmod -R 777 /var/custom-init + for script in `ls /var/custom-init`; do + echo "(*) Running $script ..." 
+    /var/custom-init/$script + done +fi + +echo "Running original entry point" +/var/db2_setup/lib/setup_db2_instance.sh diff --git a/.github/db2/startup-agent.sql b/.github/db2/startup-agent.sql new file mode 100644 index 00000000..d72c09b9 --- /dev/null +++ b/.github/db2/startup-agent.sql @@ -0,0 +1,4 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +VALUES ASNCDC.ASNCDCSERVICES('start','asncdc'); \ No newline at end of file diff --git a/.github/db2/startup-cdc-demo.sql b/.github/db2/startup-cdc-demo.sql new file mode 100644 index 00000000..4c7c5c8a --- /dev/null +++ b/.github/db2/startup-cdc-demo.sql @@ -0,0 +1,11 @@ +-- Copyright Debezium Authors. Licensed under the Apache License, Version 2.0. +-- https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server + +VALUES ASNCDC.ASNCDCSERVICES('status','asncdc'); + +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS_ON_HAND' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'CUSTOMERS' ); +CALL ASNCDC.ADDTABLE('DB2INST1', 'ORDERS' ); + +VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc'); \ No newline at end of file diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml index 701e7e58..8bd85c03 100644 --- a/.github/docker-compose.yml +++ b/.github/docker-compose.yml @@ -73,6 +73,25 @@ services: entrypoint: /cockroach/cockroach command: start-single-node --insecure --store type=mem,size=2G --listen-addr :5401 --http-addr :8082 + db2-v11.5.9: + # Using a DB2 image with additional SQL replication utilities. + # To build the image, use .github/db2. + # Original is at icr.io/db2_community/db2 + build: ./db2 + privileged: true + ports: + - 50000:50000 + environment: + - PERSISTENT_HOME=false + - LICENSE=accept + - DBNAME=TESTDB + - DB2INST1_PASSWORD=SoupOrSecret + healthcheck: + # The setup script creates /asncdctools/src/asncdc.nlk when CDC is up. + # TODO (silvano) improve start up time + test: bash -c '[ -f /asncdctools/src/asncdc.nlk ] ' + interval: 1s + retries: 600 mysql-v5.7: image: mysql:5.7 platform: linux/x86_64 diff --git a/.github/workflows/go-test-db2.yaml b/.github/workflows/go-test-db2.yaml new file mode 100644 index 00000000..92874a90 --- /dev/null +++ b/.github/workflows/go-test-db2.yaml @@ -0,0 +1,209 @@ +# Copyright 2023 The Cockroach Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +name: DB2 Tests +permissions: + contents: read + packages: read +on: + workflow_call: + workflow_dispatch: +# on: +# push: +# branches: [ sr8_db2 ] +jobs: + # Static code-quality checks. + code-quality: + name: Code Quality + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + - name: Ensure binary starts + if: ${{ !cancelled() }} + run: go run . help
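+  # The DB2 suite can also be run locally: install the ODBC driver with +  # `go run . db2install --dest drivers`, then run `make testdb2` (see the Makefile).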
+ + # Integration matrix tests for all supported CRDB and source DBs. + tests: + name: Integration Tests + runs-on: ${{ matrix.runs-on || 'ubuntu-latest-8-core' }} + timeout-minutes: 20 + strategy: + fail-fast: false + # Refer to the CRDB support policy when determining how many + # older releases to support. + # https://www.cockroachlabs.com/docs/releases/release-support-policy.html + # + # This matrix is explicit, since we have a few axes (target vs + # integration) that can't be expressed with the automatic + # cross-product behavior provided by the matrix operator. + matrix: + include: + - cockroachdb: v23.1 + integration: db2-v11.5.9 + + env: + COVER_OUT: coverage-${{ strategy.job-index }}.out + DOCKER_LOGS_OUT: docker-${{ strategy.job-index }}.log + FIRESTORE_EMULATOR_HOST: 127.0.0.1:8181 + JUNIT_OUT: junit-${{ strategy.job-index }}.xml + TEST_OUT: go-test-${{ strategy.job-index }}.json + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + + # Ensure we can grab any private images we need for testing. + - name: Log in to GitHub Package Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Start Containers + working-directory: .github + run: > + docker compose up -d --wait + cockroachdb-${{ matrix.cockroachdb }} + ${{ matrix.integration }} + ${{ matrix.source }} + ${{ matrix.target }} + + # The go test json output will be written into a pipeline to + # create a JUnit.xml file. The test reports are aggregated later + # on to produce a nicer summary of the test output in the GitHub + # Actions UI. + # + # Inspired by + # https://www.cloudwithchris.com/blog/githubactions-testsummary-go/ + - name: Go Tests + env: + COCKROACH_DEV_LICENSE: ${{ secrets.COCKROACH_DEV_LICENSE }} + CDC_INTEGRATION: ${{ matrix.integration }} + TEST_SOURCE_CONNECT: ${{ matrix.sourceConn }} + TEST_TARGET_CONNECT: ${{ matrix.targetConn }} + IBM_DRIVER: /tmp/drivers/clidriver + CGO_CFLAGS: -I${IBM_DRIVER}/include + CGO_LDFLAGS: -L${IBM_DRIVER}/lib + run: > + set -o pipefail; + make testdb2 2>&1 | + go run github.com/jstemmer/go-junit-report/v2 + -iocopy + -out ${{ env.JUNIT_OUT }} + -p cockroachdb=${{ matrix.cockroachdb }} + -p integration=${{ matrix.integration }} + -package-name ${{ matrix.cockroachdb }}-${{ matrix.integration }} | + tee ${{ env.TEST_OUT }} + + - name: Upload coverage + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ${{ env.COVER_OUT }} + + # Capture container logs in case they're needed for diagnostics. + - name: Docker container logs + if: always() + working-directory: .github + run: docker compose logs --no-color > ${{ env.DOCKER_LOGS_OUT }} + + # Upload all test reports to a common artifact name, to make them + # available to the summarization step. The go test json is + # uploaded as a developer convenience. + - name: Stash test logs + uses: actions/upload-artifact@v3 + if: always() + with: + name: integration-reports + path: | + ${{ env.COVER_OUT }} + ${{ env.DOCKER_LOGS_OUT }} + ${{ env.JUNIT_OUT }} + ${{ env.TEST_OUT }} + retention-days: 7 + + # Aggregate the results of multiple jobs within this workflow into a + # single status object that we can use for branch protection. 
+ # + # https://docs.github.com/en/rest/commits/statuses + status: + name: Create status objects + runs-on: ubuntu-latest + permissions: + statuses: write + needs: # Update failure case below + - code-quality + - tests + if: ${{ always() }} + env: + CONTEXT: Workflow Golang + GH_TOKEN: ${{ github.token }} + MERGE_SHA: ${{ github.event.merge_group.head_sha }} + PR_SHA: ${{ github.event.pull_request.head.sha }} + STATE: success + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + steps: + - name: Failure + if: ${{ needs.code-quality.result != 'success' || needs.tests.result != 'success' }} + run: echo "STATE=failure" >> $GITHUB_ENV + - name: Report + run: | + set -eo pipefail + + if [ ! -z "$PR_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$PR_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + if [ ! -z "$MERGE_SHA" ]; then + gh api \ + repos/${{ github.repository }}/statuses/$MERGE_SHA \ + -f "state=$STATE" \ + -f "context=$CONTEXT" \ + -f "target_url=$RUN_URL" + fi + + # This job downloads the test log files generated in the integration + # job and summarizes them into the GitHub Actions UI. + summarize-tests: + name: Test summaries + runs-on: ubuntu-latest + needs: tests + if: ${{ always() }} + steps: + - name: Download reports + uses: actions/download-artifact@v3 + with: + name: integration-reports + - name: Summarize + uses: test-summary/action@v2 + with: + paths: junit-*.xml diff --git a/.github/workflows/go-tests.yaml b/.github/workflows/go-tests.yaml index 5f174ce7..2e326e1c 100644 --- a/.github/workflows/go-tests.yaml +++ b/.github/workflows/go-tests.yaml @@ -43,7 +43,7 @@ jobs: - name: Copyright headers if: ${{ !cancelled() }} - run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore '**/testdata/**/*.sql' . + run: go run github.com/google/addlicense -c "The Cockroach Authors" -l apache -s -v -check -ignore ".github/db2/**" -ignore '**/testdata/**/*.sql' . - name: Lint if: ${{ !cancelled() }} diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index ea98c18a..637cbac5 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -82,6 +82,19 @@ jobs: statuses: write secrets: inherit + go-db2: + uses: ./.github/workflows/go-test-db2.yaml + needs: + - go-build-cache + # We use the merge queue prior to pushing to a branch, so there's no + # reason to repeat tests that just ran. + if: ${{ github.event_name != 'push' }} + permissions: + contents: read + packages: read + statuses: write + secrets: inherit + go-wiki: uses: ./.github/workflows/go-wiki.yaml needs: diff --git a/.gitignore b/.gitignore index 48edd5f2..d9badf39 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,12 @@ cdc-sink # Dependency directories (remove the comment below to include it) # vendor/ +# Temporary directory +tmp/ + +# Downloaded drivers +drivers/ + # CockroachDB data directory cockroach-data/ goroutine_dump/ diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..80114b31 --- /dev/null +++ b/Makefile @@ -0,0 +1,34 @@ + +PWD = $(shell pwd) +COVER_OUT ?= cover.out +IBM_DRIVER = ${PWD}/drivers/clidriver +export CGO_CFLAGS=-I${IBM_DRIVER}/include +export CGO_LDFLAGS=-L${IBM_DRIVER}/lib +export DYLD_LIBRARY_PATH=${IBM_DRIVER}/lib +export LD_LIBRARY_PATH=${IBM_DRIVER}/lib + +.PHONY: all db2 clean realclean testdb2 + +all: cdc-sink + +cdc-sink: + go build + +drivers/clidriver: + go run . 
db2install --dest drivers + +db2: drivers/clidriver + go build -tags db2 + +clean: + rm -f cdc-sink + +realclean: clean + rm -rf drivers + +testdb2: drivers/clidriver + go test -tags db2 -count 1 -coverpkg=./internal/source/db2 \ + -covermode=atomic \ + -coverprofile=${COVER_OUT} \ + -race \ + -v ./internal/source/db2 diff --git a/go.mod b/go.mod index dca2823e..fb8d9693 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,18 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/codeclysm/extract/v3 v3.1.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/h2non/filetype v1.1.3 // indirect + github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 // indirect + github.com/klauspost/compress v1.15.13 // indirect + github.com/ulikunitz/xz v0.5.11 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect ) require ( @@ -55,6 +67,9 @@ github.com/google/licenseclassifier v0.0.0-20210722185704-3043a050f148 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/google/subcommands v1.2.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/ibmdb/go_ibm_db v0.4.5 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index c742acb5..be830046 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/cockroachdb/gostdlib v1.19.0 h1:cSISxkVnTlWhTkyple/T6NXzOi5659FkhxvUg github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTymPZUKMfURjBtY= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8 h1:Hli+oX84dKq44sLVCcsGKqifm5Lg9J8VoJ2P3h9iPdI= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8/go.mod h1:75wnig8+TF6vst9hChkpcFO7YrRLddouJ5is8uqpfv0= +github.com/codeclysm/extract/v3 v3.1.1 h1:iHZtdEAwSTqPrd+1n4jfhr1qBhUWtHlMTjT90+fJVXg= +github.com/codeclysm/extract/v3 v3.1.1/go.mod h1:ZJi80UG2JtfHqJI+lgJSCACttZi++dHxfWuPaMhlOfQ= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= @@ -266,11 +268,15 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg= +github.com/h2non/filetype v1.1.3/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= 
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/ibmdb/go_ibm_db v0.4.5 h1:a0qKWbA5shCRo5HRRnAuENwhjg6AkgfPNr9xx0IZ160= +github.com/ibmdb/go_ibm_db v0.4.5/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP49UTmEvg= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= @@ -299,9 +305,13 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jstemmer/go-junit-report/v2 v2.1.0 h1:X3+hPYlSczH9IMIpSC9CQSZA0L+BipYafciZUWHEmsc= github.com/jstemmer/go-junit-report/v2 v2.1.0/go.mod h1:mgHVr7VUo5Tn8OLVr1cKnLuEy0M92wdRntM99h7RkgQ= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 h1:rhqTjzJlm7EbkELJDKMTU7udov+Se0xZkWmugr6zGok= +github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v2wtGp9Gmz1Ze3eVRAWJMLokvN3QjdzCHLY= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0= +github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -398,6 +408,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= +github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/internal/cmd/db2/db2.go b/internal/cmd/db2/db2.go new file mode 100644 index 00000000..ca9bf7ee --- /dev/null +++ b/internal/cmd/db2/db2.go @@ -0,0 +1,39 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package db2 contains a command to perform logical replication +// from an IBM DB2 server. +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/source/db2" + "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/spf13/cobra" +) + +// Command returns the db2logical subcommand. +func Command() *cobra.Command { + cfg := &db2.Config{} + return stdlogical.New(&stdlogical.Template{ + Config: cfg, + Short: "start a DB2 replication feed", + Start: func(ctx *stopper.Context, cmd *cobra.Command) (any, error) { + return db2.Start(ctx, cfg) + }, + Use: "db2logical", + }) +} diff --git a/internal/cmd/db2/install.go b/internal/cmd/db2/install.go new file mode 100644 index 00000000..f3fba1b3 --- /dev/null +++ b/internal/cmd/db2/install.go @@ -0,0 +1,162 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "runtime" + "strings" + + "github.com/codeclysm/extract/v3" + "github.com/pkg/errors" + "github.com/spf13/cobra" +) + +const ( + defaultURL = "https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/" + defaultDest = "drivers" +) + +// Install downloads and installs the IBM DB2 ODBC driver. +// Because of license restrictions, the driver cannot be under +// source control. It needs to be downloaded when used. +func Install() *cobra.Command { + var downloadURL, dest string + var dryrun bool + cmd := &cobra.Command{ + Args: cobra.NoArgs, + Short: "installs the db2 driver", + Use: "db2install", + // We are hiding it, since it's mainly used for testing. 
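+		// Example: go run . db2install --dest drivers (this is what the Makefile's drivers/clidriver target runs).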
+		Hidden: true, + RunE: func(cmd *cobra.Command, args []string) error { + driverPath := path.Join(dest, "clidriver") + _, err := os.Stat(driverPath) + _, includeErr := os.Stat(path.Join(driverPath, "include")) + _, libErr := os.Stat(path.Join(driverPath, "lib")) + if os.IsNotExist(err) || os.IsNotExist(includeErr) || os.IsNotExist(libErr) { + pkgName, err := install(cmd.Context(), dest, downloadURL, dryrun) + if err != nil { + return err + } + if pkgName != "" { + fmt.Printf("Installed %s into %s.\n", pkgName, dest) + } + return nil + } + if err != nil { + return err + } + if includeErr != nil { + return includeErr + } + if libErr != nil { + return libErr + } + fmt.Println("Driver already installed.") + return nil + }, + } + cmd.Flags().BoolVar(&dryrun, "dryrun", false, + "show the instructions to install the package manually") + cmd.Flags().StringVar(&dest, "dest", defaultDest, + "destination dir") + cmd.Flags().StringVar(&downloadURL, "url", defaultURL, + "url to download the driver") + return cmd +} + +// download a package to specified temp destination. +func download(tmpFile, baseURL, pkg string) (downloadError error) { + out, err := os.Create(tmpFile) + if err != nil { + return err + } + defer func() { + if err := out.Close(); downloadError == nil && err != nil { + downloadError = err + } + }() + pkgURL, err := url.JoinPath(baseURL, pkg) + if err != nil { + return err + } + resp, err := http.Get(pkgURL) + if err != nil { + return err + } + defer resp.Body.Close() + _, downloadError = io.Copy(out, resp.Body) + return +} + +// install the IBM DB2 driver for the runtime platform into +// the specified directory. +func install(ctx context.Context, target, baseURL string, dryrun bool) (string, error) { + var pkgName string + switch runtime.GOOS { + case "darwin": + pkgName = "macos64_odbc_cli.tar.gz" + case "linux": + switch runtime.GOARCH { + case "amd64": + pkgName = "linuxx64_odbc_cli.tar.gz" + default: + return "", errors.Errorf("unknown arch %q", runtime.GOARCH) + } + default: + return "", errors.Errorf("unknown OS %q", runtime.GOOS) + } + if dryrun { + fmt.Printf("Download %s%s into %s.\n", baseURL, pkgName, target) + return "", nil + } + pkg := filepath.Join(target, pkgName) + _, err := os.Stat(pkg) + if os.IsNotExist(err) { + fmt.Printf("Installing %s%s in %s.\n", baseURL, pkgName, target) + tmpFile := path.Join(os.TempDir(), pkgName) + defer os.Remove(tmpFile) + err := download(tmpFile, baseURL, pkgName) + if err != nil { + return "", err + } + fmt.Println("Download successful.") + pkg = tmpFile + } else { + fmt.Printf("Found %s in %s\n", pkgName, target) + } + f, err := os.Open(pkg) + if err != nil { + return "", err + } + if strings.HasSuffix(pkg, ".zip") { + return pkgName, extract.Zip(ctx, f, target, nil) + } + if strings.HasSuffix(pkg, ".gz") { + return pkgName, extract.Gz(ctx, f, target, nil) + } + return "", errors.Errorf("unknown file type %s", pkgName) +} diff --git a/internal/sinktest/all/integration.go b/internal/sinktest/all/integration.go index d05d07a0..11051226 100644 --- a/internal/sinktest/all/integration.go +++ b/internal/sinktest/all/integration.go @@ -29,6 +29,11 @@ const ( // integration tests for some databases. We expect this to be of the // format "database-v123". IntegrationEnvName = "CDC_INTEGRATION" + // DB2Name must be kept in alignment with the + // .github/docker-compose.yml file and the integration matrix + // variable in workflows/tests.yaml. 
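+	// The matching .github/docker-compose.yml service is db2-v11.5.9.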
+ DB2Name = "db2"
 // MySQLName must be kept in alignment with the
 // .github/docker-compose.yml file and the integration matrix
 // variable in workflows/tests.yaml.
diff --git a/internal/sinktest/scripttest/testdata/logical_test_db2.ts b/internal/sinktest/scripttest/testdata/logical_test_db2.ts
new file mode 100644
index 00000000..781eba49
--- /dev/null
+++ b/internal/sinktest/scripttest/testdata/logical_test_db2.ts
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2023 The Cockroach Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import * as api from "cdc-sink@v1";
+import {Document, Table} from "cdc-sink@v1";
+
+// The sentinel name will be replaced by the test rig. It would normally be
+// "my_db.public" or "my_db" depending on the target product.
+api.configureSource("{{ SCHEMA }}", {
+ dispatch: (doc: Document, meta: Document): Record<Table, Document[]> => {
+ console.trace(JSON.stringify(doc), JSON.stringify(meta));
+ let ret: Record<Table, Document[]> = {};
+ ret[meta.table] = [
+ {
+ PK: doc.PK,
+ OK: doc.OK,
+ ignored: 'by_configuration_below',
+ v_dispatched: doc.V, // Rename the property
+ }
+ ];
+ return ret
+ }
+})
+
+// We introduce an unknown column in the dispatch function above.
+// We'll add an extra configuration here to ignore it.
+// The sentinel name would be replaced by "my_table".
+api.configureTable("{{ TABLE }}", {
+ map: (doc: Document): Document => {
+ console.trace("map", JSON.stringify(doc));
+ if (doc.v_dispatched === undefined) {
+ throw "did not find expected property";
+ }
+ doc.v_mapped = doc.v_dispatched;
+ delete doc.v_dispatched; // Avoid schema-drift error due to unknown column.
+ return doc;
+ },
+ ignore: {
+ "ignored": true,
+ }
+})
diff --git a/internal/source/db2/config.go b/internal/source/db2/config.go
new file mode 100644
index 00000000..05e4f107
--- /dev/null
+++ b/internal/source/db2/config.go
@@ -0,0 +1,137 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "net/url" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer" + "github.com/cockroachdb/cdc-sink/internal/sinkprod" + "github.com/cockroachdb/cdc-sink/internal/target/dlq" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/pkg/errors" + "github.com/spf13/pflag" +) + +// EagerConfig is a hack to get Wire to move userscript evaluation to +// the beginning of the injector. This allows CLI flags to be set by the +// script. +type EagerConfig Config + +// Config contains the configuration necessary for creating a +// replication connection to a DB2 server. SourceConn is mandatory. +type Config struct { + DLQ dlq.Config + Script script.Config + Sequencer sequencer.Config + Staging sinkprod.StagingConfig + Target sinkprod.TargetConfig + + // InitialLsn is the Log Sequence Number used to start the replication + InitialLSN string + // PollingInterval defines how often we should check for new data, if idle. + PollingInterval time.Duration + // Connection string for the source db. Mandatory. + SourceConn string + // The schema we are replicating. If not provided, all the schemas will be replicated. + SourceSchema ident.Schema + + // The schema for SQL replication in DB2. + SQLReplicationSchema ident.Schema + // The SQL schema in the target cluster to write into. + TargetSchema ident.Schema + // The fields below are extracted by Preflight. + host string + password string + port uint16 + user string + database string +} + +// defaultSQLReplicationSchema is the schema in DB2 where the staging tables are located +var defaultSQLReplicationSchema = ident.MustSchema(ident.New("ASNCDC")) + +// Bind adds flags to the set. It delegates to the embedded Config.Bind. +func (c *Config) Bind(f *pflag.FlagSet) { + c.DLQ.Bind(f) + c.Script.Bind(f) + c.Sequencer.Bind(f) + c.Staging.Bind(f) + c.Target.Bind(f) + + f.StringVar(&c.InitialLSN, "defaultLSN", "", + "default log sequence number. Used if no state is persisted") + f.StringVar(&c.SourceConn, "sourceConn", "", + "the source database's connection string") + f.Var(ident.NewSchemaFlag(&c.SourceSchema), "sourceSchema", + "the SQL database schema in the source cluster") + f.Var(ident.NewSchemaFlag(&c.TargetSchema), "targetSchema", + "the SQL database schema in the target cluster to update") + f.Var(ident.NewSchemaFlag(&c.SQLReplicationSchema), "replicationSchema", + "the SQL database schema used by the DB2 SQL replication") + f.DurationVar(&c.PollingInterval, "pollingInterval", time.Second, "how often we check for new mutations") + +} + +// Preflight updates the configuration with sane defaults or returns an +// error if there are missing options for which a default cannot be +// provided. 
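+// For example, a --sourceConn of the form
+//
+// db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB
+//
+// is decomposed below into host "localhost", port 50000, user
+// "db2inst1", password "SoupOrSecret", and database "TESTDB".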
+func (c *Config) Preflight() error {
+ if err := c.DLQ.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Script.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Sequencer.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Staging.Preflight(); err != nil {
+ return err
+ }
+ if err := c.Target.Preflight(); err != nil {
+ return err
+ }
+ if c.SourceConn == "" {
+ return errors.New("no SourceConn was configured")
+ }
+ if c.TargetSchema.Empty() {
+ return errors.New("no target schema specified")
+ }
+ if c.SQLReplicationSchema.Empty() {
+ c.SQLReplicationSchema = defaultSQLReplicationSchema
+ }
+ u, err := url.Parse(c.SourceConn)
+ if err != nil {
+ return err
+ }
+ // The port is always a base-10 decimal.
+ port, err := strconv.ParseUint(u.Port(), 10, 16)
+ if err != nil {
+ return errors.Wrapf(err, "invalid port %s", u.Port())
+ }
+ c.host = u.Hostname()
+ c.port = uint16(port)
+ c.user = u.User.Username()
+ c.password, _ = u.User.Password()
+ c.database, _ = strings.CutPrefix(u.Path, "/")
+
+ return nil
+}
diff --git a/internal/source/db2/conn.go b/internal/source/db2/conn.go
new file mode 100644
index 00000000..d2397eca
--- /dev/null
+++ b/internal/source/db2/conn.go
@@ -0,0 +1,516 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/diag"
+ "github.com/cockroachdb/cdc-sink/internal/util/hlc"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/notify"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopvar"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/shopspring/decimal"
+ log "github.com/sirupsen/logrus"
+)
+
+// Conn is responsible for pulling mutations from the DB2 staging
+// tables and applying them to the target database via the
+// logical.Batch interface.
+// The process leverages DB2 SQL replication, similar to
+// the Debezium DB2 connector.
+// To set up DB2 for SQL replication, see the instructions at
+// https://debezium.io/documentation/reference/stable/connectors/db2.html#setting-up-db2
+// Note: in DB2, identifiers (e.g. table names, column names) are
+// converted to uppercase, unless they are quoted. The target schema
+// must use the same convention.
+type Conn struct {
+ // The destination for writes.
+ acceptor types.MultiAcceptor
+ // The source database connection.
+ db *sql.DB
+ // Memoize column metadata for each table.
+ columns *ident.TableMap[[]types.ColData]
+ // Configuration of the connector.
+ config *Config
+ // Memoization of primary-key positions for each table.
+ // Maps the column, based on the order of ColData, to its position in the primary key.
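+ // For example, a source table declared with PRIMARY KEY (k1, k2),
+ // where k1 is column 0 and k2 is column 5, is memoized as
+ // map[int]int{0: 0, 5: 1} (see the metadata integration test).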
+ primaryKeys *ident.TableMap[map[int]int]
+ // Memoize queries to check SQL replication.
+ replQueries map[string]string
+ // Access to the staging cluster.
+ stagingDB *types.StagingPool
+ // Persistent storage for WAL data.
+ memo types.Memo
+ // Memoization of queries to fetch change events.
+ tableQueries *ident.TableMap[string]
+ // The target schema for writes.
+ target ident.Schema
+ // Access to the target database.
+ targetDB *types.TargetPool
+ // Consistent point, managed by persistWALOffset.
+ walOffset notify.Var[*lsn]
+}
+
+var (
+ _ diag.Diagnostic = (*Conn)(nil)
+)
+
+// Start the DB2 connector.
+func (c *Conn) Start(ctx *stopper.Context) error {
+ // Call this first to load the previous offset. We want to reset our
+ // state before starting the main copier routine.
+ if err := c.persistWALOffset(ctx); err != nil {
+ return err
+ }
+
+ // Start a process to copy data to the target.
+ ctx.Go(func() error {
+ for !ctx.IsStopping() {
+ if err := c.copyMessages(ctx); err != nil {
+ log.WithError(err).Warn("error while copying messages; will retry")
+ select {
+ case <-ctx.Stopping():
+ case <-time.After(time.Second):
+ }
+ }
+ }
+ return nil
+ })
+
+ return nil
+}
+
+// Diagnostic implements diag.Diagnostic.
+func (c *Conn) Diagnostic(_ context.Context) any {
+ return map[string]any{
+ "hostname": c.config.host,
+ "database": c.config.database,
+ "defaultLsn": c.config.InitialLSN,
+ "columns": c.columns,
+ }
+}
+
+// accumulateBatch finds all the rows that have been changed on the given table
+// and adds them to the batch.
+func (c *Conn) accumulateBatch(
+ ctx *stopper.Context, sourceTable ident.Table, lsnRange lsnRange, batch *types.MultiBatch,
+) (int, error) {
+ count := 0
+ query, ok := c.tableQueries.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unknown table %q", sourceTable)
+ }
+ // TODO (silvano): we might want to run in batches with a limit.
+ rows, err := c.db.QueryContext(ctx, query, lsnRange.From.Value[:], lsnRange.To.Value[:])
+ if err != nil {
+ return 0, err
+ }
+ defer rows.Close()
+ cols, err := rows.ColumnTypes()
+ if err != nil {
+ return 0, err
+ }
+ // The first five columns are replication metadata (the operation
+ // code, the commit timestamp, and sequencing identifiers); the
+ // remainder map to the source table's columns.
+ numMetaColumns := 5
+ numCols := len(cols)
+ if numCols <= numMetaColumns {
+ return 0, errors.Errorf("malformed query template, not enough columns: %q", query)
+ }
+ dataCols := cols[numMetaColumns:]
+ for rows.Next() {
+ count++
+ res := make([]any, numCols)
+ ptr := make([]any, numCols)
+ for i := range cols {
+ ptr[i] = &res[i]
+ }
+ err = rows.Scan(ptr...)
+ if err != nil {
+ return count, err
+ }
+ // The first column is the operation code.
+ opcode, ok := res[0].(int32)
+ if !ok || !validOperation(opcode) {
+ return count, errors.Errorf("invalid operation %v (%T)", res[0], res[0])
+ }
+ op := operation(opcode)
+ // The second column is the commit timestamp.
+ ts, ok := res[1].(time.Time)
+ if !ok {
+ return count, errors.Errorf("invalid timestamp %v", res[1])
+ }
+ values := make([]any, len(res)-numMetaColumns)
+ for i, v := range res[numMetaColumns:] {
+ if v == nil {
+ values[i] = nil
+ continue
+ }
+ switch dataCols[i].DatabaseTypeName() {
+ case "VARCHAR", "CHAR", "CLOB":
+ s, ok := v.([]byte)
+ if !ok {
+ return count, errors.Errorf("invalid type %T, expected []byte", v)
+ }
+ values[i] = string(s)
+ case "DECIMAL", "DECFLOAT":
+ s, ok := v.([]byte)
+ if !ok {
+ return count, errors.Errorf("invalid type %T, expected []byte", v)
+ }
+ values[i], err = decimal.NewFromString(string(s))
+ if err != nil {
+ return count, errors.Wrapf(err, "cannot convert decimal %s", s)
+ }
+ default:
+ values[i] = v
+ }
+ }
+ targetTable := ident.NewTable(
+ c.target,
+ ident.New(sourceTable.Table().Raw()))
+ targetCols, ok := c.columns.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unable to retrieve target columns for %s", sourceTable)
+ }
+ primaryKeys, ok := c.primaryKeys.Get(sourceTable)
+ if !ok {
+ return 0, errors.Errorf("unable to retrieve primary keys for %s", sourceTable)
+ }
+ key := make([]any, len(primaryKeys))
+ enc := make(map[string]any)
+ for idx, sourceCol := range values {
+ targetCol := targetCols[idx]
+ switch s := sourceCol.(type) {
+ case nil:
+ enc[targetCol.Name.Raw()] = nil
+ default:
+ enc[targetCol.Name.Raw()] = s
+ }
+ if targetCol.Primary {
+ key[primaryKeys[idx]] = sourceCol
+ }
+ }
+ var mut types.Mutation
+ mut.Time = hlc.New(ts.UnixNano(), 0)
+ if mut.Key, err = json.Marshal(key); err != nil {
+ return count, err
+ }
+ if op != deleteOp {
+ mut.Data, err = json.Marshal(enc)
+ if err != nil {
+ return count, err
+ }
+ }
+ mutationCount.With(prometheus.Labels{
+ "table": sourceTable.Raw(),
+ "op": op.String()}).Inc()
+ log.Infof("(%s,%v) %s => %s", op, key, sourceTable.Raw(), targetTable.Raw())
+
+ script.AddMeta("db2", targetTable, &mut)
+ if err = batch.Accumulate(targetTable, mut); err != nil {
+ return count, err
+ }
+ }
+ return count, nil
+}
+
+// copyMessages reads messages from the DB2 staging tables,
+// accumulates them, and commits the data to the target.
+func (c *Conn) copyMessages(ctx *stopper.Context) error {
+ var err error
+ c.db, err = c.open()
+ if err != nil {
+ return errors.Wrap(err, "failed to open a connection to the source db")
+ }
+ defer func() {
+ err := c.db.Close()
+ if err != nil {
+ log.Errorf("Error closing the database. 
%q", err) + } + }() + previousLsn, _ := c.walOffset.Get() + if previousLsn == nil { + return errors.New("missing log sequence number") + } + log.Infof("DB2 connector starting at %x", previousLsn.Value) + for { + nextLsn, err := c.getNextLsn(ctx, previousLsn) + if err != nil { + return errors.Wrap(err, "cannot retrieve next sequence number") + } + log.Tracef("NEXT [%x]\n", nextLsn.Value) + if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) { + select { + case <-time.After(c.config.PollingInterval): + continue + case <-ctx.Stopping(): + return nil + } + } + start := time.Now() + log.Tracef("BEGIN [%x]\n", previousLsn.Value) + tables, stagingTables, err := c.getTables(ctx, c.config.SourceSchema) + if err != nil { + return errors.Wrap(err, "cannot get DB2 CDC tables") + } + currentLsnRange := lsnRange{ + From: previousLsn, + To: nextLsn, + } + batch := &types.MultiBatch{} + for idx, table := range tables { + start := time.Now() + stagingTable := stagingTables[idx] + err := c.populateTableMetadata(ctx, table, stagingTable) + if err != nil { + return errors.Wrap(err, "cannot retrieve table metadata") + } + count, err := c.accumulateBatch(ctx, table, currentLsnRange, batch) + if err != nil { + return errors.Wrap(err, "cannot post mutation") + } + if ctx.IsStopping() { + return nil + } + log.Tracef("post mutations for %s starting at %x. Count %d. Time %d.", table, + currentLsnRange.From.Value, count, + time.Since(start).Milliseconds()) + queryLatency.With(prometheus.Labels{ + "table": table.Canonical().Raw()}).Observe(time.Since(start).Seconds()) + } + if batch.Count() == 0 { + log.Trace("skipping empty transaction") + } else { + log.Debugf("COMMIT [%x]-[%x]\n", previousLsn.Value, nextLsn.Value) + if err := c.commit(ctx, batch); err != nil { + return errors.WithStack(err) + } + } + batchLatency.Observe(float64(time.Since(start).Seconds())) + previousLsn = nextLsn + c.walOffset.Set(previousLsn) + } +} + +// commit the current batch to the target database. +func (c *Conn) commit(ctx *stopper.Context, batch *types.MultiBatch) error { + batchSize.Observe(float64(batch.Count())) + tx, err := c.targetDB.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return errors.WithStack(err) + } + defer tx.Rollback() + if err := c.acceptor.AcceptMultiBatch(ctx, batch, &types.AcceptOptions{ + TargetQuerier: tx, + }); err != nil { + return errors.WithStack(err) + } + return tx.Commit() +} + +// getFirstLsnFound returns the first row that contains a log sequence number. +func getFirstLsnFound(rows *sql.Rows) (*lsn, error) { + if rows.Next() { + var v []byte + err := rows.Scan(&v) + if err != nil { + return nil, err + } + return newLSN(v) + } + return lsnZero(), nil +} + +// getLsnFromMonitoring retrieves the sequence number from +// the monitoring table +func (c *Conn) getLsnFromMonitoring(ctx *stopper.Context, current *lsn) (*lsn, error) { + // Checking the monitoring table for a batch of mutations. + rows, err := c.db.QueryContext(ctx, c.replQueries["nextLsn"], current.Value[:]) + if err != nil { + return nil, err + } + defer rows.Close() + return getFirstLsnFound(rows) +} + +// getMaxLsn retrieves the latest sequence number in all the DB2 staging tables. 
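+// It issues the "maxLsn" entry of opQueryTemplates (see queries.go),
+// which takes the maximum synchpoint across the rows of the
+// IBMSNAP_REGISTER table.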
+func (c *Conn) getMaxLsn(ctx *stopper.Context) (*lsn, error) {
+ rows, err := c.db.QueryContext(ctx, c.replQueries["maxLsn"])
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ return getFirstLsnFound(rows)
+}
+
+// getNextLsn retrieves the next sequence number to process.
+func (c *Conn) getNextLsn(ctx *stopper.Context, current *lsn) (*lsn, error) {
+ // Checking the monitoring table first, maybe we can process a smaller batch.
+ lsn, err := c.getLsnFromMonitoring(ctx, current)
+ if err != nil {
+ return nil, err
+ }
+ // If we don't find an LSN in the monitoring table, then look in the
+ // DB2 staging tables.
+ if lsn.Equal(lsnZero()) {
+ return c.getMaxLsn(ctx)
+ }
+ return lsn, nil
+}
+
+// getTables returns the source tables and corresponding staging tables in DB2.
+// The DB2 staging tables store the committed transactional data
+// for all the change events tracked by the SQL replication in the source database.
+func (c *Conn) getTables(
+ ctx *stopper.Context, schema ident.Schema,
+) ([]ident.Table, []ident.Table, error) {
+ var rows *sql.Rows
+ var err error
+ if schema.Empty() {
+ rows, err = c.db.QueryContext(ctx, c.replQueries["stagingTables"])
+ } else {
+ rows, err = c.db.QueryContext(ctx, c.replQueries["stagingTablesForSchema"], schema.Raw())
+ }
+ if err != nil {
+ return nil, nil, err
+ }
+ defer rows.Close()
+ tables := make([]ident.Table, 0)
+ stagingTables := make([]ident.Table, 0)
+ for rows.Next() {
+ var sourceSchema, sourceTable string
+ var stagingSchema, stagingTable string
+ err = rows.Scan(&sourceSchema, &sourceTable, &stagingSchema, &stagingTable)
+ if err != nil {
+ return nil, nil, err
+ }
+ tables = append(tables, ident.NewTable(
+ ident.MustSchema(ident.New(sourceSchema)),
+ ident.New(sourceTable)))
+
+ stagingTables = append(stagingTables, ident.NewTable(
+ ident.MustSchema(ident.New(stagingSchema)),
+ ident.New(stagingTable)))
+ }
+ return tables, stagingTables, nil
+}
+
+// open creates a new connection to the source database.
+func (c *Conn) open() (*sql.DB, error) {
+ con := fmt.Sprintf("HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=%s",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user,
+ c.config.password)
+ log.Debugf("DB2 connection: HOSTNAME=%s;DATABASE=%s;PORT=%d;UID=%s;PWD=...",
+ c.config.host,
+ c.config.database,
+ c.config.port,
+ c.config.user)
+
+ return sql.Open("go_ibm_db", con)
+}
+
+// persistWALOffset loads an existing value from the memo into
+// walOffset, or initializes it to a user-provided value. It will also
+// start a goroutine in the stopper to occasionally write an updated
+// value back to the memo.
+func (c *Conn) persistWALOffset(ctx *stopper.Context) error {
+ key := fmt.Sprintf("db2-lsn-%s", c.target.Raw())
+ found, err := c.memo.Get(ctx, c.stagingDB, key)
+ if err != nil {
+ return err
+ }
+ cp := &lsn{}
+ if len(found) > 0 {
+ if err := cp.UnmarshalText(found); err != nil {
+ return err
+ }
+ } else if c.config.InitialLSN != "" {
+ if err := cp.UnmarshalText([]byte(c.config.InitialLSN)); err != nil {
+ return err
+ }
+ }
+ c.walOffset.Set(cp)
+ ctx.Go(func() error {
+ _, err := stopvar.DoWhenChanged(ctx, cp, &c.walOffset,
+ func(ctx *stopper.Context, _, cp *lsn) error {
+ if err := c.memo.Put(ctx, c.stagingDB, key, []byte(cp.String())); err == nil {
+ log.Tracef("stored WAL offset %s: %s", key, cp)
+ } else {
+ log.WithError(err).Warn("could not persist WAL offset")
+ }
+ return nil
+ })
+ return err
+ })
+ return nil
+}
+
+// populateTableMetadata fetches the column metadata for a given table
+// and caches it.
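+// The column query is expected to yield one row per column as
+// (name, key position, type), with a NULL key position for non-key
+// columns; DB2 reports 1-based key positions, which are converted
+// to 0-based indices below.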
+func (c *Conn) populateTableMetadata( + ctx *stopper.Context, sourceTable ident.Table, stagingTable ident.Table, +) error { + if _, ok := c.columns.Get(sourceTable); ok { + return nil + } + // to get the changes for the given source table, we need to query the staging table. + c.tableQueries.Put(sourceTable, + fmt.Sprintf(changeQuery, stagingTable, c.config.SQLReplicationSchema)) + // get the columns and primary keys. + rows, err := c.db.QueryContext(ctx, columnQuery, sourceTable.Schema().Raw(), sourceTable.Table().Raw()) + if err != nil { + return err + } + defer rows.Close() + colData := make([]types.ColData, 0) + primaryKeys := make(map[int]int) + idx := 0 + for rows.Next() { + var name, typ string + var pk *int + err = rows.Scan(&name, &pk, &typ) + if err != nil { + return err + } + if pk != nil { + primaryKeys[idx] = *pk - 1 + } + log.Tracef("Table %s col %q %q primary?: %t", sourceTable, name, typ, pk != nil) + colData = append(colData, types.ColData{ + Name: ident.New(name), + Primary: pk != nil, + Type: typ, + }) + idx++ + } + c.columns.Put(sourceTable, colData) + c.primaryKeys.Put(sourceTable, primaryKeys) + return nil +} diff --git a/internal/source/db2/db2.go b/internal/source/db2/db2.go new file mode 100644 index 00000000..62d96af7 --- /dev/null +++ b/internal/source/db2/db2.go @@ -0,0 +1,39 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package db2 contains support for reading a db2 sql replication feed. +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" +) + +// DB2 is a logical replication loop for the IBM DB2 database. +// It leverages SQL replication on the source. +type DB2 struct { + Conn *Conn + Diagnostics *diag.Diagnostics +} + +var ( + _ stdlogical.HasDiagnostics = (*DB2)(nil) +) + +// GetDiagnostics implements [stdlogical.HasDiagnostics]. +func (d *DB2) GetDiagnostics() *diag.Diagnostics { + return d.Diagnostics +} diff --git a/internal/source/db2/driver.go b/internal/source/db2/driver.go new file mode 100644 index 00000000..99209cca --- /dev/null +++ b/internal/source/db2/driver.go @@ -0,0 +1,22 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// SPDX-License-Identifier: Apache-2.0 + +//go:build db2 && cgo +// +build db2,cgo + +package db2 + +import _ "github.com/ibmdb/go_ibm_db" diff --git a/internal/source/db2/injector.go b/internal/source/db2/injector.go new file mode 100644 index 00000000..f33f5720 --- /dev/null +++ b/internal/source/db2/injector.go @@ -0,0 +1,55 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build wireinject +// +build wireinject + +package db2 + +import ( + "context" + + scriptRuntime "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" + "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" + scriptSequencer "github.com/cockroachdb/cdc-sink/internal/sequencer/script" + "github.com/cockroachdb/cdc-sink/internal/sinkprod" + "github.com/cockroachdb/cdc-sink/internal/staging" + "github.com/cockroachdb/cdc-sink/internal/target" + "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/google/wire" +) + +// Start creates a DB2 logical replication loop using the +// provided configuration. +func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + panic(wire.Build( + wire.Bind(new(context.Context), new(*stopper.Context)), + wire.Struct(new(DB2), "*"), + wire.FieldsOf(new(*Config), "Script"), + wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Staging", "Target"), + Set, + immediate.Set, + chaos.Set, + diag.New, + scriptRuntime.Set, + scriptSequencer.Set, + sinkprod.Set, + staging.Set, + target.Set, + )) +} diff --git a/internal/source/db2/integration_test.go b/internal/source/db2/integration_test.go new file mode 100644 index 00000000..c651e5cc --- /dev/null +++ b/internal/source/db2/integration_test.go @@ -0,0 +1,782 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build db2 && cgo
+// +build db2,cgo
+
+package db2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "math"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cdc-sink/internal/script"
+ "github.com/cockroachdb/cdc-sink/internal/sinkprod"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/all"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/base"
+ "github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
+ "github.com/cockroachdb/cdc-sink/internal/types"
+ "github.com/cockroachdb/cdc-sink/internal/util/ident"
+ "github.com/cockroachdb/cdc-sink/internal/util/stopper"
+ _ "github.com/ibmdb/go_ibm_db"
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const defaultSourceConn = "db2://db2inst1:SoupOrSecret@localhost:50000/TESTDB"
+
+type fixtureConfig struct {
+ chaos bool
+ script bool
+}
+
+type tableInfo struct {
+ name ident.Ident
+ sourceColName ident.Ident
+ sourceColType string
+ targetColName ident.Ident
+ targetColType string
+}
+
+func TestMain(m *testing.M) {
+ all.IntegrationMain(m, all.DB2Name)
+}
+
+func TestDB2Logical(t *testing.T) {
+ t.Run("consistent", func(t *testing.T) { testDB2Logical(t, &fixtureConfig{}) })
+ t.Run("consistent-chaos", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{chaos: true})
+ })
+ t.Run("consistent-script", func(t *testing.T) {
+ testDB2Logical(t, &fixtureConfig{script: true})
+ })
+}
+
+func testDB2Logical(t *testing.T, fc *fixtureConfig) {
+ r := require.New(t)
+ // Create a basic test fixture.
+ fixture, err := base.NewFixture(t)
+ r.NoError(err)
+ ctx := fixture.Context
+ testDB2LogicalInt(t, fixture, fc)
+ ctx.Stop(time.Second)
+ r.NoError(ctx.Wait())
+}
+
+func testDB2LogicalInt(t *testing.T, fixture *base.Fixture, fc *fixtureConfig) {
+ a := assert.New(t)
+ r := require.New(t)
+ ctx := fixture.Context
+ // Using uppercase for consistency
+ tableName := ident.New("T")
+ targetColName := "V"
+ if fc.script {
+ targetColName = "v_mapped"
+ }
+ table := tableInfo{
+ name: tableName,
+ sourceColName: ident.New("V"),
+ sourceColType: "varchar(20)",
+ targetColName: ident.New(targetColName),
+ targetColType: "string",
+ }
+
+ repl, config, err := createRepl(ctx, fixture, fc, tableName)
+ r.NoError(err)
+ defer repl.cleanUp(ctx)
+
+ tableMap, cancel, err := repl.createTables(ctx, table)
+ r.NoError(err)
+ defer cancel()
+ start := time.Now()
+ // CDC needs to be initialized again on DB2.
+ // TODO (silvano): switch to reInit, once we move the
+ // COMMIT_INTERVAL,SLEEP_INTERVAL settings to the image.
+ // Changing them requires a restart.
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+ log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds())
+ const rowCount = 128
+ values := make([]any, rowCount)
+ for i := 0; i < rowCount; i++ {
+ values[i] = fmt.Sprintf("v=%d", i)
+ }
+ // Insert data into source table.
+ start = time.Now()
+ err = repl.insertValues(ctx, tableMap.source, values, false)
+ r.NoError(err)
+ log.Infof("Inserted rows in %s. 
%d ms", tableMap.source, time.Since(start).Milliseconds()) + // Start logical loop + loop, err := Start(ctx, config) + r.NoError(err) + start = time.Now() + for { + var count int + if err := repl.target.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", tableMap.target)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + log.Infof("Rows replicated %s. %d ms", tableMap.source, time.Since(start).Milliseconds()) + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET %s= 'updated'`, tableMap.source, table.sourceColName)) + r.NoError(err) + // Wait for the update to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount { + break + } + time.Sleep(1 * time.Second) + } + + _, err = repl.source.ExecContext(ctx, fmt.Sprintf(`DELETE FROM %s WHERE "PK" < 50`, tableMap.source)) + r.NoError(err) + // Wait for the deletes to propagate. + for { + var count int + if err := repl.target.QueryRowContext(ctx, + fmt.Sprintf("SELECT count(*) FROM %s WHERE %s = 'updated'", tableMap.target, table.targetColName)).Scan(&count); !a.NoError(err) { + return + } + if count == rowCount-50 { + break + } + time.Sleep(1 * time.Second) + } + + sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics) + +} + +// TestMetadata verifies that we are able to get table metadata from the source +func TestMetadata(t *testing.T) { + r := require.New(t) + // Create a basic test fixture. + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testMetadata(t, fixture) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} + +func testMetadata(t *testing.T, fixture *base.Fixture) { + r := require.New(t) + ctx := fixture.Context + repl, config, err := createRepl(ctx, fixture, &fixtureConfig{}, ident.Ident{}) + r.NoError(err) + sourceSchema := repl.sourceSchema + + defer repl.cleanUp(ctx) + + tcs := []struct { + name string + table ident.Ident + stmt string + columns []types.ColData + keys map[int]int + }{ + { + name: "simple", + table: ident.New("ONE"), + stmt: "CREATE TABLE %s (k INT PRIMARY KEY NOT NULL, v INT)", + columns: []types.ColData{ + { + Name: ident.New("K"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("V"), + Primary: false, + Type: "INTEGER", + }, + }, + keys: map[int]int{0: 0}, + }, + { + name: "few cols", + table: ident.New("TWO"), + stmt: "CREATE TABLE %s (k INT PRIMARY KEY NOT NULL, a VARCHAR(20),b TIME, c INT, d INT)", + columns: []types.ColData{ + { + Name: ident.New("K"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("A"), + Primary: false, + Type: "VARCHAR", + }, + { + Name: ident.New("B"), + Primary: false, + Type: "TIME", + }, + { + Name: ident.New("C"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("D"), + Primary: false, + Type: "INTEGER", + }, + }, + keys: map[int]int{0: 0}, + }, + { + name: "few keys", + table: ident.New("THREE"), + stmt: "CREATE TABLE %s (k1 INT NOT NULL, a INT,b INT, c INT, d INT,k2 int NOT NULL, primary key (k1,k2))", + columns: []types.ColData{ + { + Name: ident.New("K1"), + Primary: true, + Type: "INTEGER", + }, + { + Name: ident.New("A"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("B"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("C"), + Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("D"), 
+ Primary: false, + Type: "INTEGER", + }, + { + Name: ident.New("K2"), + Primary: true, + Type: "INTEGER", + }, + }, + keys: map[int]int{0: 0, 5: 1}, + }, + } + loop, err := Start(ctx, config) + conn := loop.Conn + start := time.Now() + err = repl.cdcRestart(ctx) + r.NoError(err) + log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds()) + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + a := assert.New(t) + r := require.New(t) + table := ident.NewTable(sourceSchema, tc.table) + cdtable := ident.NewTable(defaultSQLReplicationSchema, tc.table) + log.Infof("CREATING %s %s", sourceSchema, tc.table) + _, err := repl.source.ExecContext(ctx, fmt.Sprintf(tc.stmt, table)) + r.NoError(err) + // enable CDC for the table + _, err = repl.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ", + sourceSchema.Raw(), table.Table().Raw())) + r.NoError(err) + defer func() error { + _, err := repl.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ", + sourceSchema.Raw(), table.Table().Raw())) + return err + }() + err = conn.populateTableMetadata(ctx, table, cdtable) + r.NoError(err) + a.Equal(tc.keys, conn.primaryKeys.GetZero(table)) + columns := conn.columns.GetZero(table) + r.Equal(len(tc.columns), len(columns)) + for idx, col := range columns { + a.Equal(tc.columns[idx].Name, col.Name) + a.Equal(tc.columns[idx].Primary, col.Primary) + a.Equal(tc.columns[idx].Type, col.Type) + } + }) + } + +} +func TestDataTypes(t *testing.T) { + r := require.New(t) + // Create a basic test fixture. + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + testDataTypes(t, fixture) + ctx.Stop(time.Second) + r.NoError(ctx.Wait()) +} + +func testDataTypes(t *testing.T, fixture *base.Fixture) { + r := require.New(t) + // Build a long value for string and byte types. + var sb strings.Builder + shortString := "0123456789ABCDEF" + for sb.Len() < 1<<16 { + sb.WriteString(shortString) + } + longString := sb.String() + ctx := fixture.Context + repl, config, err := createRepl(ctx, fixture, &fixtureConfig{}, ident.Ident{}) + r.NoError(err) + defer repl.cleanUp(ctx) + + tcs := []struct { + source string + size string + crdb string + values []any // We automatically test for NULL below. + }{ + + // BIGINT(size) + {`bigint`, ``, `bigint`, []any{0, -1, 112358}}, + // BINARY(size) + {`binary`, `(128)`, `bytes`, []any{[]byte(shortString)}}, + + // BLOB(size) + {`blob`, `(65536)`, `blob`, []any{[]byte(longString)}}, + + // BOOLEAN + // Currently SQL Replication on Db2 does not support BOOLEAN. 
+ //{`boolean`, ``, `int`, []any{true, false}},
+ // CHAR(size)
+ {`char`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // CLOB(size)
+ {`clob`, `(128)`, `string`, []any{[]byte(shortString)}},
+ // DATE
+ {`date`, ``, `date`, []any{"1000-01-01", "9999-12-31"}},
+ // DATETIME(fsp)
+ {`datetime`, ``, `timestamp`, []any{"1000-01-01 00:00:00", "9999-12-31 23:59:59"}},
+
+ {`dbclob`, `(128)`, `string`, []any{shortString}},
+
+ // DECFLOAT
+ // DECIMAL(size, d)
+ {`decfloat`, ``, `decimal`, []any{math.E, math.Phi, math.Pi}},
+ {`decimal`, `(11,10)`, `decimal`, []any{math.E, math.Phi, math.Pi}},
+
+ {`double`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ {`float`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // INT(size)
+ // INTEGER(size)
+ {`int`, ``, `int`, []any{0, -1, 112358}},
+
+ {`real`, ``, `float`, []any{math.E, math.Phi, math.Pi}},
+ // SMALLINT(size)
+ {`smallint`, ``, `int`, []any{0, -1, 127}},
+
+ // TIME(fsp)
+ {`time`, ``, `time`, []any{"16:20:00"}},
+ // TIMESTAMP(fsp)
+ {`timestamp`, ``, `timestamp`, []any{"1970-01-01 00:00:01", "2038-01-19 03:14:07"}},
+ // VARBINARY(size)
+ {`varbinary`, `(128)`, `bytes`, []any{[]byte(shortString)}},
+ // VARCHAR(size)
+ {`varchar`, `(128)`, `text`, []any{shortString}},
+
+ // Sentinel
+ {`char`, `(4)`, `text`, []any{`done`}},
+
+ // XML replication is not supported on LUW
+ //{`xml`, ``, `text`, `a`},
+ }
+
+ // Create a dummy table for each type.
+ tables := make([]*tableMapping, len(tcs))
+ for idx, tc := range tcs {
+ tbl := tableInfo{
+ name: ident.New(fmt.Sprintf("tgt_%s_%d", tc.source, idx)),
+ sourceColName: ident.New("V"),
+ sourceColType: tc.source + tc.size,
+ targetColName: ident.New("V"),
+ targetColType: tc.crdb,
+ }
+ tgt, cancel, err := repl.createTables(ctx, tbl)
+ r.NoError(err)
+ tables[idx] = tgt
+ defer cancel()
+ }
+ start := time.Now()
+ // CDC needs to be initialized again on DB2.
+ err = repl.cdcRestart(ctx)
+ r.NoError(err)
+ log.Infof("Restarted CDC in %d ms", time.Since(start).Milliseconds())
+ // Insert data in the source tables.
+ for idx, tc := range tcs {
+ err := repl.insertValues(ctx, tables[idx].source, tc.values, true)
+ r.NoError(err)
+ }
+
+ // Wait for the CDC source tables to be fully populated.
+ lastidx := len(tcs) - 1
+ expected := len(tcs[lastidx].values) + 1
+ var count int
+ query := fmt.Sprintf("SELECT count(*) FROM %s", tables[lastidx].sourceCdc.Raw())
+ for count < expected {
+ err := repl.source.QueryRowContext(ctx, query).Scan(&count)
+ r.NoError(err)
+ if count < expected {
+ time.Sleep(1 * time.Second)
+ }
+ }
+ log.Infof("Tables fully populated in %d ms", time.Since(start).Milliseconds())
+ // Start logical loop
+ loop, err := Start(ctx, config)
+ r.NoError(err)
+
+ // Wait for rows to show up.
+ for idx, tc := range tcs {
+ tc := tc
+ t.Run(tc.source, func(t *testing.T) {
+ expected := len(tc.values) + 1
+ a := assert.New(t)
+ var count int
+ for count < expected {
+ if err := repl.target.QueryRowContext(ctx,
+ fmt.Sprintf("SELECT count(*) FROM %s", tables[idx].target)).Scan(&count); !a.NoError(err) {
+ return
+ }
+ if count < expected {
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ // Expect the test data and a row with a NULL value.
+ a.Equalf(expected, count, "mismatch in %s", tables[idx].target)
+ })
+ }
+ sinktest.CheckDiagnostics(ctx, t, loop.Diagnostics)
+}
+
+// UTILITIES
+
+// replAdmin provides utilities to manage the source and target databases.
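+// The helpers below drive the CDC service through SQL of the form
+//
+// VALUES ASNCDC.ASNCDCSERVICES('status','asncdc')
+//
+// and scan the textual output for markers such as "is doing work"
+// or "asncap is not running".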
+type replAdmin struct {
+ source *sql.DB
+ sourceSchema ident.Schema
+ target *sql.DB
+ targetSchema ident.Schema
+ restarted bool
+}
+
+type tableMapping struct {
+ source ident.Table
+ sourceCdc ident.Table
+ target ident.Table
+}
+
+// cdcCmd issues a command to the CDC service on the source database
+// and reports whether the given string appears in the command's
+// status output.
+func (r *replAdmin) cdcCmd(ctx context.Context, command string, expect string) (bool, error) {
+ query := fmt.Sprintf("VALUES ASNCDC.ASNCDCSERVICES('%s','asncdc') ", command)
+ out, err := r.source.QueryContext(ctx, query)
+ if err != nil {
+ return false, err
+ }
+ defer out.Close()
+ for out.Next() {
+ var msg string
+ err := out.Scan(&msg)
+ if err != nil {
+ return false, err
+ }
+ done := strings.Contains(msg, expect)
+ if done {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// cdcWaitFor waits until the given string is seen in the CDC status.
+func (r *replAdmin) cdcWaitFor(ctx context.Context, expect string) error {
+ done := false
+ var err error
+ for !done {
+ done, err = r.cdcCmd(ctx, "status", expect)
+ if err != nil {
+ return err
+ }
+ if !done {
+ time.Sleep(1000 * time.Millisecond)
+ }
+ }
+ return nil
+}
+
+// cdcRestart restarts CDC in the source database
+func (r *replAdmin) cdcRestart(ctx context.Context) error {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ return r.cdcStart(ctx)
+}
+
+// cdcStart starts CDC in the source database
+func (r *replAdmin) cdcStart(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "start", "")
+ if err != nil {
+ return err
+ }
+ r.restarted = true
+ return r.cdcWaitFor(ctx, "is doing work")
+}
+
+// cdcReinit re-initializes CDC in the source database
+func (r *replAdmin) cdcReinit(ctx context.Context) error {
+ up, err := r.cdcCmd(ctx, "status", "is doing work")
+ if err != nil {
+ return err
+ }
+ // We try to re-init first, since it's faster.
+ // If that fails, we restart it.
+ if up {
+ reinit, err := r.cdcCmd(ctx, "reinit", "REINIT")
+ if err != nil {
+ return err
+ }
+ if reinit {
+ log.Info("CDC was already up, we re-init it")
+ return r.cdcWaitFor(ctx, "is doing work")
+ }
+ }
+ return r.cdcRestart(ctx)
+}
+
+// cdcStop stops CDC in the source database
+func (r *replAdmin) cdcStop(ctx context.Context) error {
+ _, err := r.cdcCmd(ctx, "stop", "")
+ if err != nil {
+ return err
+ }
+ return r.cdcWaitFor(ctx, "asncap is not running")
+}
+
+func (r *replAdmin) cleanUp(ctx context.Context) error {
+ // If we restarted CDC, we need to stop it.
+ if r.restarted {
+ err := r.cdcStop(ctx)
+ if err != nil {
+ return err
+ }
+ }
+ return r.source.Close()
+}
+
+// createRepl sets up the replication environment.
+// It creates a schema in the DB2 source.
+// If tbl is provided, it sets up a userscript to process events for the table.
+func createRepl(
+ ctx *stopper.Context, fixture *base.Fixture, fc *fixtureConfig, tblName ident.Ident,
+) (*replAdmin, *Config, error) {
+
+ dbName, _ := fixture.TargetSchema.Schema().Split()
+ // For now, we are using the Debezium stored procedure to add tables to CDC.
+ // The stored procedure does not deal with case sensitivity well.
+ // TODO (silvano): fix stored procedure.
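+ // For example, a target database named "my-db" is normalized
+ // below to the source schema "MY_DB".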
+ sourceSchema := ident.MustSchema(ident.New(
+ strings.ToUpper(strings.ReplaceAll(dbName.Raw(), "-", "_"))))
+
+ var tbl ident.Table
+ if !tblName.Empty() {
+ tbl = ident.NewTable(fixture.TargetSchema.Schema(), tblName)
+ }
+ config, err := getConfig(fixture, fc, tbl,
+ sourceSchema, fixture.TargetSchema.Schema())
+ if err != nil {
+ return nil, nil, err
+ }
+
+ conn := &Conn{
+ columns: &ident.TableMap[[]types.ColData]{},
+ primaryKeys: &ident.TableMap[map[int]int]{},
+ config: config,
+ }
+ db, err := conn.open()
+ if err != nil {
+ return nil, nil, err
+ }
+ // Create a schema on the source to store all the tables.
+ _, err = db.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA %s", sourceSchema))
+ if err != nil {
+ return nil, nil, err
+ }
+ // To speed up testing, we reduce the commit/sleep/monitor intervals to decrease latency.
+ // With these settings the Capture program has a significant impact on foreground
+ // activities; they are NOT recommended in a production environment.
+ // COMMIT_INTERVAL = interval in seconds for how often the Capture log reader thread commits.
+ // SLEEP_INTERVAL = interval in seconds that the Capture program sleeps when idle.
+ _, err = db.ExecContext(ctx, "UPDATE ASNCDC.IBMSNAP_CAPPARMS SET COMMIT_INTERVAL=2,SLEEP_INTERVAL=1")
+ if err != nil {
+ return nil, nil, err
+ }
+ // We start the replication from the current max log sequence number.
+ lsn, err := getCurrentLsn(ctx, db)
+ if err != nil {
+ return nil, nil, err
+ }
+ config.InitialLSN = fmt.Sprintf("%x", lsn.Value)
+ log.Infof("Starting consistent point %s", config.InitialLSN)
+ return &replAdmin{
+ source: db,
+ sourceSchema: sourceSchema,
+ target: fixture.TargetPool.DB,
+ targetSchema: fixture.TargetSchema.Schema(),
+ }, config, nil
+}
+
+func (r *replAdmin) createTables(
+ ctx context.Context, table tableInfo,
+) (*tableMapping, func() error, error) {
+ // TODO (silvano): fix case sensitivity.
+ tb := ident.New(strings.ToUpper(table.name.Raw()))
+ tgt := ident.NewTable(r.targetSchema, tb)
+ src := ident.NewTable(r.sourceSchema, tb)
+
+ cdc := ident.NewTable(defaultSQLReplicationSchema,
+ ident.New(strings.ToUpper(fmt.Sprintf("CDC_%s_%s",
+ r.sourceSchema.Raw(), tb.Raw()))))
+ res := &tableMapping{
+ source: src,
+ sourceCdc: cdc,
+ target: tgt,
+ }
+ // Create the table in both locations.
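+ // For the basic test table this renders roughly as (assuming the
+ // identifiers are quoted and uppercased):
+ //
+ // CREATE TABLE "MY_DB"."T" ("PK" INT not null, "V" varchar(20), "OK" INT NOT NULL, PRIMARY KEY ("PK","OK"))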
+ stmt := `CREATE TABLE %s ("PK" INT not null, %s %s, "OK" INT NOT NULL, PRIMARY KEY ("PK","OK"))` + _, err := r.source.ExecContext(ctx, fmt.Sprintf(stmt, src, table.sourceColName, table.sourceColType)) + if err != nil { + return &tableMapping{}, nil, err + } + _, err = r.target.ExecContext(ctx, fmt.Sprintf(stmt, tgt, table.targetColName, table.targetColType)) + if err != nil { + return &tableMapping{}, nil, err + } + // enable CDC for the table + _, err = r.source.ExecContext(ctx, fmt.Sprintf("CALL ASNCDC.ADDTABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw())) + if err != nil { + return &tableMapping{}, nil, err + } + return res, func() error { + query := fmt.Sprintf("CALL ASNCDC.REMOVETABLE('%s','%s'); ", src.Schema().Raw(), src.Table().Raw()) + _, err := r.source.ExecContext(ctx, query) + return err + }, nil +} + +// getConfig is an helper function to create a configuration for the connector +func getConfig( + fixture *base.Fixture, + fc *fixtureConfig, + tbl ident.Table, + dbName ident.Schema, + targetSchema ident.Schema, +) (*Config, error) { + + crdbPool := fixture.TargetPool + conn := defaultSourceConn + if found := os.Getenv("TEST_DB2_CONN"); len(found) > 0 { + conn = found + } + + config := &Config{ + Staging: sinkprod.StagingConfig{ + Schema: fixture.StagingDB.Schema(), + }, + Target: sinkprod.TargetConfig{ + ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier. + Conn: crdbPool.ConnectionString, + }, + + TargetSchema: targetSchema, + SourceConn: conn, + SourceSchema: dbName, + } + if fc.chaos { + config.Sequencer.Chaos = 0.0005 + } + if fc.script { + config.Script = script.Config{ + FS: scripttest.ScriptFSFor(tbl), + MainPath: "/testdata/logical_test_db2.ts", + } + } + return config, config.Preflight() +} + +// getCurrentLsn retrieves the latest sequence number in the monitoring tables. +func getCurrentLsn(ctx *stopper.Context, db *sql.DB) (*lsn, error) { + rows, err := db.QueryContext(ctx, "select MAX(RESTART_SEQ) from ASNCDC.IBMSNAP_CAPMON") + if err != nil { + return nil, err + } + defer rows.Close() + if rows.Next() { + var v []byte + err = rows.Scan(&v) + if err == nil { + return newLSN(v) + } + return nil, err + } + return lsnZero(), nil +} + +func (r *replAdmin) insertValues( + ctx context.Context, tbl ident.Table, values []any, insertNull bool, +) error { + tx, err := r.source.Begin() + if err != nil { + return err + } + defer tx.Rollback() + for valIdx, value := range values { + if _, err := tx.ExecContext(ctx, + fmt.Sprintf(`INSERT INTO %s VALUES (?, ?, ?)`, tbl), valIdx, value, valIdx*2); err != nil { + return err + } + } + if insertNull { + if _, err := tx.ExecContext(ctx, + fmt.Sprintf(`INSERT INTO %s VALUES (-1, NULL, -2)`, tbl)); err != nil { + return err + } + } + return tx.Commit() +} diff --git a/internal/source/db2/lsn.go b/internal/source/db2/lsn.go new file mode 100644 index 00000000..62e58ab1 --- /dev/null +++ b/internal/source/db2/lsn.go @@ -0,0 +1,114 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+import (
+ "bytes"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+
+ "github.com/cockroachdb/cdc-sink/internal/util/stamp"
+ "github.com/pkg/errors"
+)
+
+// lsn is the Log Sequence Number.
+// It represents the offset, in bytes, of a log record from the
+// beginning of a database log file.
+type lsn struct {
+ Value [16]byte
+}
+
+type lsnRange struct {
+ From *lsn
+ To *lsn
+}
+
+var _ stamp.Stamp = (*lsn)(nil)
+var _ fmt.Stringer = (*lsn)(nil)
+
+func newLSN(v []byte) (*lsn, error) {
+ if len(v) == 0 {
+ return lsnZero(), nil
+ }
+ if len(v) != 16 {
+ return nil, errors.Errorf("invalid lsn %x (length %d)", v, len(v))
+ }
+ return &lsn{
+ Value: ([16]byte)(v),
+ }, nil
+}
+
+// Less implements stamp.Stamp.
+func (l *lsn) Less(other stamp.Stamp) bool {
+ o := other.(*lsn)
+ return bytes.Compare(l.Value[:], o.Value[:]) < 0
+}
+
+// Equal checks if two log sequence numbers are the same.
+func (l *lsn) Equal(other stamp.Stamp) bool {
+ o := other.(*lsn)
+ return bytes.Equal(l.Value[:], o.Value[:])
+}
+
+// String implements fmt.Stringer.
+func (l *lsn) String() string {
+ return fmt.Sprintf("%x", l.Value)
+}
+
+// lsnMemo is used to store the lsn as a JSON object in the memo table.
+type lsnMemo struct {
+ Value []byte `json:"value,omitempty"`
+}
+
+func (l *lsn) MarshalJSON() ([]byte, error) {
+ p := lsnMemo{Value: l.Value[:]}
+ return json.Marshal(p)
+}
+
+func (l *lsn) UnmarshalJSON(data []byte) error {
+ var p lsnMemo
+ if err := json.Unmarshal(data, &p); err != nil {
+ return errors.WithStack(err)
+ }
+ if len(p.Value) != 16 {
+ return errors.Errorf("invalid lsn %s (decoded length %d)", string(data), len(p.Value))
+ }
+ l.Value = ([16]byte)(p.Value)
+ return nil
+}
+
+// UnmarshalText supports CLI flags and default values.
+func (l *lsn) UnmarshalText(data []byte) error {
+ if len(data) == 0 {
+ l.Value = lsnZero().Value
+ return nil
+ }
+ if len(data) != 32 {
+ return errors.Errorf("invalid lsn %s", string(data))
+ }
+ v, err := hex.DecodeString(string(data))
+ if err != nil {
+ // Check the error before converting: a short decode of malformed
+ // hex would otherwise panic in the slice-to-array conversion.
+ return errors.Wrapf(err, "invalid lsn %s", string(data))
+ }
+ l.Value = ([16]byte)(v)
+ return nil
+}
+
+func lsnZero() *lsn {
+ z := make([]byte, 16)
+ return &lsn{
+ Value: ([16]byte)(z),
+ }
+}
diff --git a/internal/source/db2/lsn_test.go b/internal/source/db2/lsn_test.go
new file mode 100644
index 00000000..d2800585
--- /dev/null
+++ b/internal/source/db2/lsn_test.go
@@ -0,0 +1,150 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "encoding/hex" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + zero = "00000000000000000000000000000000" + one = "00000000000000000000000000000001" + max = "7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF" + zerob = [16]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0} + oneb = [16]byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1} + maxb = [16]byte{0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} +) + +func parse(t *testing.T, s string) *lsn { + o, err := hex.DecodeString(s) + require.NoError(t, err) + lsn, err := newLSN(o) + require.NoError(t, err) + return lsn +} + +// TestLess verifies stamp.Stamp.Less +func TestLess(t *testing.T) { + tests := []struct { + name string + one string + other string + want bool + }{ + {"zero", zero, zero, false}, + {"zero_one", zero, one, true}, + {"one_zero", one, zero, false}, + {"zero_max", zero, max, true}, + {"max_zero", max, zero, false}, + {"max", max, max, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + one := parse(t, tt.one) + other := parse(t, tt.other) + got := one.Less(other) + assert.Equal(t, tt.want, got) + }) + } +} + +// TestLess verifies equality between two Log Sequence Number +func TestEqual(t *testing.T) { + tests := []struct { + name string + one string + other string + want bool + }{ + {"zero", zero, zero, true}, + {"zero_one", zero, one, false}, + {"one_zero", one, zero, false}, + {"zero_max", zero, max, false}, + {"max_zero", max, zero, false}, + {"max", max, max, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + one := parse(t, tt.one) + other := parse(t, tt.other) + got := one.Equal(other) + assert.Equal(t, tt.want, got) + }) + } +} + +// TestJSON verifies marshalling to JSON and unmarshalling from JSON. +func TestJSON(t *testing.T) { + tests := []struct { + name string + one [16]byte + want []byte + }{ + {"zero", zerob, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x3d, 0x3d, 0x22, 0x7d}}, + {"one", oneb, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x51, 0x3d, 0x3d, 0x22, 0x7d}}, + {"max", maxb, []byte{0x7b, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x3a, 0x22, 0x66, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x39, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x2f, 0x77, 0x3d, 0x3d, 0x22, 0x7d}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + one := &lsn{Value: tt.one} + got, err := one.MarshalJSON() + require.NoError(t, err) + assert.Equal(t, tt.want, got) + // verify roundtrip + rt := &lsn{} + err = rt.UnmarshalJSON(got) + require.NoError(t, err) + assert.Equal(t, one.Value, rt.Value) + }) + } +} + +// TestUnmarshalText verifies unmarshalling from a text string. 
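+// The text form is 32 hexadecimal characters encoding 16 bytes, e.g.
+// "00000000000000000000000000000001"; anything shorter, longer, or
+// non-hex must be rejected.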
+func TestUnmarshalText(t *testing.T) { + tests := []struct { + name string + text string + want [16]byte + err string + }{ + {"zero", zero, zerob, ""}, + {"one", one, oneb, ""}, + {"max", max, maxb, ""}, + {"odd", "0", zerob, "invalid"}, + {"short", "00", zerob, "invalid"}, + {"long", strings.Repeat("0", 34), zerob, "invalid"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := assert.New(t) + rt := &lsn{} + err := rt.UnmarshalText([]byte(tt.text)) + if tt.err != "" { + a.Error(err) + a.ErrorContains(err, tt.err) + } else { + a.NoError(err) + a.Equal(tt.want, rt.Value) + } + }) + } +} diff --git a/internal/source/db2/metrics.go b/internal/source/db2/metrics.go new file mode 100644 index 00000000..41669138 --- /dev/null +++ b/internal/source/db2/metrics.go @@ -0,0 +1,48 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "github.com/cockroachdb/cdc-sink/internal/util/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + batchLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "db2_batch_latency", + Help: "the length of time it took to process a batch", + Buckets: metrics.LatencyBuckets, + }) + batchSize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "db2_batch_size", + Help: "the size of batch", + Buckets: metrics.Buckets(1, 10000), + }) + mutationCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "db2_mutations_total", + Help: "Total number of mutations by source table.", + }, + []string{"table", "op"}, + ) + queryLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "db2_query_latency", + Help: "the length of time it took to process a query", + Buckets: metrics.LatencyBuckets, + }, []string{"table"}) +) diff --git a/internal/source/db2/operation.go b/internal/source/db2/operation.go new file mode 100644 index 00000000..f82984d5 --- /dev/null +++ b/internal/source/db2/operation.go @@ -0,0 +1,36 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +//go:generate go run golang.org/x/tools/cmd/stringer -type=operation + +// operation represents the type of operation in the staging table. 
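+// The first metadata column of each change row is scanned into one of
+// these values in conn.go; deleteOp is special-cased there so that no
+// row data is emitted for deletes.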
+type operation int + +const ( + unknownOp operation = iota + deleteOp + insertOp + updateOp + beginOp + endOp + rollbackOp +) + +func validOperation(value int32) bool { + return value >= int32(unknownOp) && value <= int32(rollbackOp) +} diff --git a/internal/source/db2/operation_string.go b/internal/source/db2/operation_string.go new file mode 100644 index 00000000..df1d306c --- /dev/null +++ b/internal/source/db2/operation_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type=operation"; DO NOT EDIT. + +package db2 + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[unknownOp-0] + _ = x[deleteOp-1] + _ = x[insertOp-2] + _ = x[updateOp-3] + _ = x[beginOp-4] + _ = x[endOp-5] + _ = x[rollbackOp-6] +} + +const _operation_name = "unknownOpdeleteOpinsertOpupdateOpbeginOpendOprollbackOp" + +var _operation_index = [...]uint8{0, 9, 17, 25, 33, 40, 45, 55} + +func (i operation) String() string { + if i < 0 || i >= operation(len(_operation_index)-1) { + return "operation(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _operation_name[_operation_index[i]:_operation_index[i+1]] +} diff --git a/internal/source/db2/provider.go b/internal/source/db2/provider.go new file mode 100644 index 00000000..4298fd4a --- /dev/null +++ b/internal/source/db2/provider.go @@ -0,0 +1,107 @@ +// Copyright 2025 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package db2 + +import ( + "fmt" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/sequencer" + "github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" + "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" + scriptSeq "github.com/cockroachdb/cdc-sink/internal/sequencer/script" + "github.com/cockroachdb/cdc-sink/internal/target/apply" + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/hlc" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/notify" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/google/wire" +) + +// Set is used by Wire. +var Set = wire.NewSet( + ProvideConn, + ProvideEagerConfig, +) + +// ProvideConn is called by Wire to construct this package's +// logical.Dialect implementation. There's a fake dependency on +// the script loader so that flags can be evaluated first. 
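+//
+// The acceptor pipeline is assembled inside-out: the immediate sequencer is
+// wrapped by the userscript sequencer, which is wrapped in turn by the chaos
+// sequencer (a no-op unless a failure probability is configured); mutations
+// read from DB2 are then fed to the resulting acceptor.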
+func ProvideConn(
+	ctx *stopper.Context,
+	acc *apply.Acceptor,
+	immediate *immediate.Immediate,
+	chaos *chaos.Chaos,
+	config *Config,
+	memo types.Memo,
+	scriptSeq *scriptSeq.Sequencer,
+	stagingPool *types.StagingPool,
+	targetPool *types.TargetPool,
+	watchers types.Watchers,
+) (*Conn, error) {
+	if err := config.Preflight(); err != nil {
+		return nil, err
+	}
+	seq, err := scriptSeq.Wrap(ctx, immediate)
+	if err != nil {
+		return nil, err
+	}
+	seq, err = chaos.Wrap(ctx, seq) // No-op if probability is 0.
+	if err != nil {
+		return nil, err
+	}
+	connAcceptor, _, err := seq.Start(ctx, &sequencer.StartOptions{
+		Delegate: types.OrderedAcceptorFrom(acc, watchers),
+		Bounds:   &notify.Var[hlc.Range]{}, // Not currently used.
+		Group: &types.TableGroup{
+			Name:      ident.New(config.TargetSchema.Raw()),
+			Enclosing: config.TargetSchema,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	opQueries := make(map[string]string)
+	for k, v := range opQueryTemplates {
+		opQueries[k] = fmt.Sprintf(v, config.SQLReplicationSchema)
+	}
+	ret := &Conn{
+		acceptor:     connAcceptor,
+		columns:      &ident.TableMap[[]types.ColData]{},
+		config:       config,
+		memo:         memo,
+		primaryKeys:  &ident.TableMap[map[int]int]{},
+		replQueries:  opQueries,
+		stagingDB:    stagingPool,
+		tableQueries: &ident.TableMap[string]{},
+		target:       config.TargetSchema,
+		targetDB:     targetPool,
+		walOffset:    notify.Var[*lsn]{},
+	}
+
+	return ret, ret.Start(ctx)
+}
+
+// ProvideEagerConfig is a hack to move up the evaluation of the user
+// script so that the options callbacks can set any non-script-related
+// CLI flags.
+func ProvideEagerConfig(cfg *Config, _ *script.Loader) (*EagerConfig, error) {
+	return (*EagerConfig)(cfg), cfg.Preflight()
+}
diff --git a/internal/source/db2/queries.go b/internal/source/db2/queries.go
new file mode 100644
index 00000000..9a563736
--- /dev/null
+++ b/internal/source/db2/queries.go
@@ -0,0 +1,111 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package db2
+
+var (
+	opQueryTemplates = map[string]string{
+		// maxLsn is the max Log Sequence Number stored in the staging tables.
+		// This is used during live capture of events.
+		"maxLsn": `
+	SELECT Max(t.synchpoint)
+	FROM (
+	SELECT cd_new_synchpoint AS synchpoint
+	FROM %[1]s.ibmsnap_register
+	UNION ALL
+	SELECT synchpoint AS synchpoint
+	FROM %[1]s.ibmsnap_register) t`,
+
+		// nextLsn is the Log Sequence Number of the next batch, read from the
+		// monitoring table. This is used during catch-up, to batch mutations.
+		// The monitoring table is refreshed periodically based on the
+		// MONITOR_INTERVAL parameter as set in the ASNCDC.IBMSNAP_CAPPARMS
+		// table. By default, it is set to 300 seconds.
+		"nextLsn": `
+	SELECT Min(restart_seq)
+	FROM %[1]s.ibmsnap_capmon
+	WHERE cd_rows_inserted > 0
+	AND restart_seq > ?`,
+		// get tables with mutations across all the schemata.
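+		// Each row in IBMSNAP_REGISTER pairs a source table with its change
+		// data (CD) staging table. The LEFT JOIN against syscat.tables picks
+		// up catalog metadata for the source side, and rows with an empty
+		// source_owner (presumably internal bookkeeping entries) are skipped.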
+		"stagingTables": `
+	SELECT r.source_owner,
+	       r.source_table,
+	       r.cd_owner,
+	       r.cd_table
+	FROM %[1]s.ibmsnap_register r
+	LEFT JOIN syscat.tables t
+	ON r.source_owner = t.tabschema
+	AND r.source_table = t.tabname
+	WHERE r.source_owner <> ''`,
+		// get tables with mutations within a schema.
+		"stagingTablesForSchema": `
+	SELECT r.source_owner,
+	       r.source_table,
+	       r.cd_owner,
+	       r.cd_table
+	FROM %[1]s.ibmsnap_register r
+	LEFT JOIN syscat.tables t
+	ON r.source_owner = t.tabschema
+	AND r.source_table = t.tabname
+	WHERE r.source_owner = ?
+	`,
+	}
+
+	// The IBMSNAP_UOW table provides additional information about transactions
+	// that have been committed to a source table, such as a timestamp.
+	// We join the IBMSNAP_UOW and change data (CD) tables on matching
+	// IBMSNAP_COMMITSEQ values when proposing changes to the target tables.
+	//
+	// Note: a delete immediately followed by an insert within the same commit
+	// is treated as a single update, and only the values after the update are
+	// staged; the leading delete is assigned OPCODE 0 and filtered out.
+	changeQuery = `
+	SELECT *
+	FROM (SELECT CASE
+	             WHEN ibmsnap_operation = 'D'
+	             AND ( Lead(cdc.ibmsnap_operation, 1, 'X')
+	                   OVER (
+	                     PARTITION BY cdc.ibmsnap_commitseq
+	                     ORDER BY cdc.ibmsnap_intentseq) ) = 'I' THEN 0
+	             WHEN ibmsnap_operation = 'I'
+	             AND ( Lag(cdc.ibmsnap_operation, 1, 'X')
+	                   OVER (
+	                     PARTITION BY cdc.ibmsnap_commitseq
+	                     ORDER BY cdc.ibmsnap_intentseq) ) = 'D' THEN 3
+	             WHEN ibmsnap_operation = 'D' THEN 1
+	             WHEN ibmsnap_operation = 'I' THEN 2
+	             END OPCODE,
+	             uow.IBMSNAP_LOGMARKER,
+	             cdc.*
+	      FROM %[1]s cdc, %[2]s.IBMSNAP_UOW uow
+	      WHERE cdc.ibmsnap_commitseq > ?
+	      AND cdc.ibmsnap_commitseq <= ?
+	      AND uow.ibmsnap_commitseq = cdc.ibmsnap_commitseq
+	      ORDER BY cdc.ibmsnap_commitseq,
+	               cdc.ibmsnap_intentseq)
+	WHERE opcode != 0
+	`
+	columnQuery = `
+	SELECT c.colname,
+	       c.keyseq,
+	       c.typename
+	FROM syscat.tables AS t
+	INNER JOIN syscat.columns AS c
+	ON t.tabname = c.tabname
+	AND t.tabschema = c.tabschema
+	WHERE t.tabschema = ?
+	AND t.tabname = ?
+	ORDER BY c.colno
+	`
+)
diff --git a/internal/source/db2/wire_gen.go b/internal/source/db2/wire_gen.go
new file mode 100644
index 00000000..2e2bfb59
--- /dev/null
+++ b/internal/source/db2/wire_gen.go
@@ -0,0 +1,90 @@
+// Code generated by Wire. DO NOT EDIT.
+
+//go:generate go run github.com/google/wire/cmd/wire
+//go:build !wireinject
+// +build !wireinject
+
+package db2
+
+import (
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer/immediate"
+	script2 "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
+	"github.com/cockroachdb/cdc-sink/internal/sinkprod"
+	"github.com/cockroachdb/cdc-sink/internal/staging/memo"
+	"github.com/cockroachdb/cdc-sink/internal/target/apply"
+	"github.com/cockroachdb/cdc-sink/internal/target/dlq"
+	"github.com/cockroachdb/cdc-sink/internal/target/schemawatch"
+	"github.com/cockroachdb/cdc-sink/internal/util/applycfg"
+	"github.com/cockroachdb/cdc-sink/internal/util/diag"
+	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
+)
+
+// Injectors from injector.go:
+
+// Start creates a DB2 logical replication loop using the
+// provided configuration.
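+// It is the Wire-generated injector corresponding to the provider Set in
+// provider.go: it assembles diagnostics, the target pool, the apply
+// acceptor, the sequencer stack, and the staging memo before delegating
+// to ProvideConn.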
+func Start(ctx *stopper.Context, config *Config) (*DB2, error) { + diagnostics := diag.New(ctx) + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + scriptConfig := &config.Script + loader, err := script.ProvideLoader(ctx, configs, scriptConfig, diagnostics) + if err != nil { + return nil, err + } + eagerConfig, err := ProvideEagerConfig(config, loader) + if err != nil { + return nil, err + } + targetConfig := &eagerConfig.Target + targetPool, err := sinkprod.ProvideTargetPool(ctx, targetConfig, diagnostics) + if err != nil { + return nil, err + } + targetStatements, err := sinkprod.ProvideStatementCache(targetConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlqConfig := &eagerConfig.DLQ + watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(dlqConfig, targetPool, watchers) + acceptor, err := apply.ProvideAcceptor(ctx, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + immediateImmediate := &immediate.Immediate{} + sequencerConfig := &eagerConfig.Sequencer + chaosChaos := &chaos.Chaos{ + Config: sequencerConfig, + } + stagingConfig := &eagerConfig.Staging + stagingPool, err := sinkprod.ProvideStagingPool(ctx, stagingConfig, diagnostics, targetConfig) + if err != nil { + return nil, err + } + stagingSchema, err := sinkprod.ProvideStagingDB(stagingConfig) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(ctx, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + sequencer := script2.ProvideSequencer(loader, targetPool, watchers) + conn, err := ProvideConn(ctx, acceptor, immediateImmediate, chaosChaos, config, memoMemo, sequencer, stagingPool, targetPool, watchers) + if err != nil { + return nil, err + } + db2 := &DB2{ + Conn: conn, + Diagnostics: diagnostics, + } + return db2, nil +} diff --git a/main.go b/main.go index f9f5b327..90c036d5 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "syscall" "time" + "github.com/cockroachdb/cdc-sink/internal/cmd/db2" "github.com/cockroachdb/cdc-sink/internal/cmd/dumphelp" "github.com/cockroachdb/cdc-sink/internal/cmd/dumptemplates" "github.com/cockroachdb/cdc-sink/internal/cmd/licenses" @@ -103,6 +104,8 @@ func main() { f.CountVarP(&verbosity, "verbose", "v", "increase logging verbosity to debug; repeat for trace") root.AddCommand( + db2.Command(), + db2.Install(), dumphelp.Command(), dumptemplates.Command(), licenses.Command(),