Skip to content

Commit

Permalink
First rough draft for the DB2 connector.
Browse files Browse the repository at this point in the history
Uses SQL replication, leveraging staging tables in the source database.
The connector periodically pull new mutations from the staging tables.
  • Loading branch information
sravotto committed Jan 24, 2024
1 parent 7a3bcb1 commit 96b424b
Show file tree
Hide file tree
Showing 38 changed files with 3,449 additions and 2 deletions.
28 changes: 28 additions & 0 deletions .github/db2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM icr.io/db2_community/db2

LABEL maintainer="Debezium Community"

RUN mkdir -p /asncdctools/src

ADD asncdc_UDF.sql /asncdctools/src
ADD asncdcaddremove.sql /asncdctools/src
ADD asncdctables.sql /asncdctools/src
ADD dbsetup.sh /asncdctools/src
ADD startup-agent.sql /asncdctools/src
ADD startup-cdc-demo.sql /asncdctools/src
ADD inventory.sql /asncdctools/src
ADD asncdc.c /asncdctools/src

RUN mkdir /var/custom && \
chmod -R 777 /asncdctools && \
chmod -R 777 /var/custom

ADD cdcsetup.sh /var/custom
ADD custom-init /var/custom-init

RUN chmod -R 777 /var/custom-init

ADD openshift_entrypoint.sh /var/db2_setup/lib

RUN chmod 777 /var/custom/cdcsetup.sh && \
chmod 777 /var/db2_setup/lib/openshift_entrypoint.sh
3 changes: 3 additions & 0 deletions .github/db2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# DB2 container

This directory builds a DB2 container with utilities provided by Debezium to enable CDC on specific tables.
162 changes: 162 additions & 0 deletions .github/db2/asncdc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sqludf.h>
#include <sqlstate.h>

void SQL_API_FN asncdcservice(
SQLUDF_VARCHAR *asnCommand, /* input */
SQLUDF_VARCHAR *asnService,
SQLUDF_CLOB *fileData, /* output */
/* null indicators */
SQLUDF_NULLIND *asnCommand_ind, /* input */
SQLUDF_NULLIND *asnService_ind,
SQLUDF_NULLIND *fileData_ind,
SQLUDF_TRAIL_ARGS,
struct sqludf_dbinfo *dbinfo)
{

int fd;
char tmpFileName[] = "/tmp/fileXXXXXX";
fd = mkstemp(tmpFileName);

int strcheck = 0;
char cmdstring[256];


char* szDb2path = getenv("HOME");



char str[20];
int len = 0;
char c;
char *buffer = NULL;
FILE *pidfile;

char dbname[129];
memset(dbname, '\0', 129);
strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
dbname[dbinfo->dbnamelen] = '\0';

int pid;
if (strcmp(asnService, "asncdc") == 0)
{
strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
int callcheck;
callcheck = system(cmdstring);
pidfile = fopen(tmpFileName, "r");
while ((c = fgetc(pidfile)) != EOF)
{
if (c == '\n')
{
break;
}
len++;
}
buffer = (char *)malloc(sizeof(char) * len);
fseek(pidfile, 0, SEEK_SET);
fread(buffer, sizeof(char), len, pidfile);
fclose(pidfile);
pidfile = fopen(tmpFileName, "w");
if (strcmp(asnCommand, "start") == 0)
{
if (len == 0) // is not running
{
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
fprintf(pidfile, "start --> %s \n", cmdstring);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is already running");
}
}
if ((strcmp(asnCommand, "prune") == 0) ||
(strcmp(asnCommand, "reinit") == 0) ||
(strcmp(asnCommand, "suspend") == 0) ||
(strcmp(asnCommand, "resume") == 0) ||
(strcmp(asnCommand, "status") == 0) ||
(strcmp(asnCommand, "stop") == 0))
{
if (len > 0)
{
//buffer[len] = '\0';
//strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
//fprintf(pidfile, "stop --> %s", cmdstring);
//callcheck = system(cmdstring);
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
//fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is not running");
}
}

fclose(pidfile);
}
/* system(cmdstring); */

int rc = 0;
long fileSize = 0;
size_t readCnt = 0;
FILE *f = NULL;

f = fopen(tmpFileName, "r");
if (!f)
{
strcpy(SQLUDF_MSGTX, "Could not open file ");
strncat(SQLUDF_MSGTX, tmpFileName,
SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
return;
}

rc = fseek(f, 0, SEEK_END);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
return;
}

/* verify the file size */
fileSize = ftell(f);
if (fileSize > fileData->length)
{
strcpy(SQLUDF_MSGTX, "File too large");
strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
return;
}

/* go to the beginning and read the entire file */
rc = fseek(f, 0, 0);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
return;
}

readCnt = fread(fileData->data, 1, fileSize, f);
if (readCnt != fileSize)
{
/* raise a warning that something weird is going on */
sprintf(SQLUDF_MSGTX, "Could not read entire file "
"(%d vs %d)",
readCnt, fileSize);
strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
*fileData_ind = -1;
}
else
{
fileData->length = readCnt;
*fileData_ind = 0;
}
// remove temorary file
rc = remove(tmpFileName);
//fclose(pFile);
}
17 changes: 17 additions & 0 deletions .github/db2/asncdc_UDF.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;

CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
RETURNS CLOB(100K)
SPECIFIC asncdcservice
EXTERNAL NAME 'asncdc!asncdcservice'
LANGUAGE C
PARAMETER STYLE SQL
DBINFO
DETERMINISTIC
NOT FENCED
RETURNS NULL ON NULL INPUT
NO SQL
NO EXTERNAL ACTION
NO SCRATCHPAD
ALLOW PARALLEL
NO FINAL CALL;

0 comments on commit 96b424b

Please sign in to comment.