Skip to content

Commit

Permalink
First draft for the DB2 connector.
Browse files Browse the repository at this point in the history
Uses DB2 SQL replication, leveraging staging tables in the source database.
For instructions on how to put table in capture mode in DB2, so that changes can
be sent to the staging tables, see
https://debezium.io/documentation/reference/stable/connectors/db2.html#putting-tables-in-capture-mode.

The connector periodically pulls new mutations from the staging tables and inserts them
to the target database via the logical.Batch interface.
  • Loading branch information
sravotto committed Jan 26, 2024
1 parent 8abdb6e commit c840b12
Show file tree
Hide file tree
Showing 39 changed files with 3,552 additions and 2 deletions.
28 changes: 28 additions & 0 deletions .github/db2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM icr.io/db2_community/db2

LABEL maintainer="Debezium Community"

RUN mkdir -p /asncdctools/src

ADD asncdc_UDF.sql /asncdctools/src
ADD asncdcaddremove.sql /asncdctools/src
ADD asncdctables.sql /asncdctools/src
ADD dbsetup.sh /asncdctools/src
ADD startup-agent.sql /asncdctools/src
ADD startup-cdc-demo.sql /asncdctools/src
ADD inventory.sql /asncdctools/src
ADD asncdc.c /asncdctools/src

RUN mkdir /var/custom && \
chmod -R 777 /asncdctools && \
chmod -R 777 /var/custom

ADD cdcsetup.sh /var/custom
ADD custom-init /var/custom-init

RUN chmod -R 777 /var/custom-init

ADD openshift_entrypoint.sh /var/db2_setup/lib

RUN chmod 777 /var/custom/cdcsetup.sh && \
chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh
5 changes: 5 additions & 0 deletions .github/db2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# DB2 container

This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables.

Taken from https://github.com/debezium/debezium-examples/tree/main/tutorial/debezium-db2-init/db2server
162 changes: 162 additions & 0 deletions .github/db2/asncdc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sqludf.h>
#include <sqlstate.h>

void SQL_API_FN asncdcservice(
SQLUDF_VARCHAR *asnCommand, /* input */
SQLUDF_VARCHAR *asnService,
SQLUDF_CLOB *fileData, /* output */
/* null indicators */
SQLUDF_NULLIND *asnCommand_ind, /* input */
SQLUDF_NULLIND *asnService_ind,
SQLUDF_NULLIND *fileData_ind,
SQLUDF_TRAIL_ARGS,
struct sqludf_dbinfo *dbinfo)
{

int fd;
char tmpFileName[] = "/tmp/fileXXXXXX";
fd = mkstemp(tmpFileName);

int strcheck = 0;
char cmdstring[256];


char* szDb2path = getenv("HOME");



char str[20];
int len = 0;
char c;
char *buffer = NULL;
FILE *pidfile;

char dbname[129];
memset(dbname, '\0', 129);
strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
dbname[dbinfo->dbnamelen] = '\0';

int pid;
if (strcmp(asnService, "asncdc") == 0)
{
strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
int callcheck;
callcheck = system(cmdstring);
pidfile = fopen(tmpFileName, "r");
while ((c = fgetc(pidfile)) != EOF)
{
if (c == '\n')
{
break;
}
len++;
}
buffer = (char *)malloc(sizeof(char) * len);
fseek(pidfile, 0, SEEK_SET);
fread(buffer, sizeof(char), len, pidfile);
fclose(pidfile);
pidfile = fopen(tmpFileName, "w");
if (strcmp(asnCommand, "start") == 0)
{
if (len == 0) // is not running
{
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
fprintf(pidfile, "start --> %s \n", cmdstring);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is already running");
}
}
if ((strcmp(asnCommand, "prune") == 0) ||
(strcmp(asnCommand, "reinit") == 0) ||
(strcmp(asnCommand, "suspend") == 0) ||
(strcmp(asnCommand, "resume") == 0) ||
(strcmp(asnCommand, "status") == 0) ||
(strcmp(asnCommand, "stop") == 0))
{
if (len > 0)
{
//buffer[len] = '\0';
//strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
//fprintf(pidfile, "stop --> %s", cmdstring);
//callcheck = system(cmdstring);
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
//fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is not running");
}
}

fclose(pidfile);
}
/* system(cmdstring); */

int rc = 0;
long fileSize = 0;
size_t readCnt = 0;
FILE *f = NULL;

f = fopen(tmpFileName, "r");
if (!f)
{
strcpy(SQLUDF_MSGTX, "Could not open file ");
strncat(SQLUDF_MSGTX, tmpFileName,
SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
return;
}

rc = fseek(f, 0, SEEK_END);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
return;
}

/* verify the file size */
fileSize = ftell(f);
if (fileSize > fileData->length)
{
strcpy(SQLUDF_MSGTX, "File too large");
strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
return;
}

/* go to the beginning and read the entire file */
rc = fseek(f, 0, 0);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
return;
}

readCnt = fread(fileData->data, 1, fileSize, f);
if (readCnt != fileSize)
{
/* raise a warning that something weird is going on */
sprintf(SQLUDF_MSGTX, "Could not read entire file "
"(%d vs %d)",
readCnt, fileSize);
strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
*fileData_ind = -1;
}
else
{
fileData->length = readCnt;
*fileData_ind = 0;
}
// remove temorary file
rc = remove(tmpFileName);
//fclose(pFile);
}
17 changes: 17 additions & 0 deletions .github/db2/asncdc_UDF.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;

CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
RETURNS CLOB(100K)
SPECIFIC asncdcservice
EXTERNAL NAME 'asncdc!asncdcservice'
LANGUAGE C
PARAMETER STYLE SQL
DBINFO
DETERMINISTIC
NOT FENCED
RETURNS NULL ON NULL INPUT
NO SQL
NO EXTERNAL ACTION
NO SCRATCHPAD
ALLOW PARALLEL
NO FINAL CALL;

0 comments on commit c840b12

Please sign in to comment.