
Vertica Connector for Flink

This project is a plugin for the Flink JDBC SQL Connector. It allows reading data from Vertica in batch mode, and writing data into Vertica from Flink CDC Connector streams of other databases (such as PostgreSQL, MySQL, Oracle, and SQL Server) as well as from other streaming or batch sources.

[Figure: The architecture of Flink]

Examples

Writing data from MySQL to Vertica in batch mode

This demo can easily be run with docker-compose. Here is its key part:

CREATE TABLE test_flink_orders (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:mysql://${yourMySQLServer}:3306/${yourMySQLDB}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'scan.fetch-size' = '10000'
    , 'table-name' = 'test_flink_orders'
);

CREATE TABLE test_flink_orders_target (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:vertica://${yourVerticaServer}:5433/${yourVerticaDBName}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'sink.buffer-flush.max-rows' = '10000'
    , 'table-name' = 'test_flink_orders_target'
);

INSERT INTO test_flink_orders_target
SELECT 
    orderID
    , custName 
    , fAmount
    , dAmount
    , deAmount
    , nAmount
    , bVIP
    , dCreate
    , tCreate
    , tzCreate
    , dtCreate
    , dtzCreate
    , binPhoto
FROM test_flink_orders;

Ingesting changes from MySQL into Vertica in real time

This demo can easily be run with docker-compose. Here is its key part:

CREATE TABLE test_flink_orders (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc'
    , 'hostname' = 'localhost'
    , 'port' = '3306'
    , 'jdbc.properties.serverTimezone' = '${TZ}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'database-name' = '${yourMySQLDB}'
    , 'table-name' = 'test_flink_orders'
);

CREATE TABLE test_flink_orders_target (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:vertica://${yourVerticaServer}:5433/${yourVerticaDBName}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'sink.buffer-flush.max-rows' = '10000'
    , 'table-name' = 'test_flink_orders_target'
);

INSERT INTO test_flink_orders_target
SELECT 
    orderID
    , custName 
    , fAmount
    , dAmount
    , deAmount
    , nAmount
    , bVIP
    , dCreate
    , tCreate
    , tzCreate
    , dtCreate
    , dtzCreate
    , binPhoto
FROM test_flink_orders;

Installation

First, set up a Flink cluster with the Flink JDBC SQL Connector and, optionally, the Flink CDC Connectors.

Download the latest version of vertica-flink-connector_${FLINK_VERSION}-*.jar (or build it from this project's source code), put it in ${FLINK_HOME}/lib, and restart your cluster.
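A minimal shell sketch of this step, assuming a standalone cluster managed with Flink's bin/stop-cluster.sh and bin/start-cluster.sh scripts and FLINK_HOME exported in the environment. The helper names install_connector and restart_cluster are hypothetical and not part of this project.

```shell
#!/usr/bin/env bash
# Sketch only: assumes a standalone Flink cluster and an exported FLINK_HOME.
# install_connector and restart_cluster are hypothetical helper names.

# Copy the connector jar into ${FLINK_HOME}/lib after basic sanity checks.
install_connector() {
  local jar="$1"
  if [ -z "${FLINK_HOME:-}" ] || [ ! -d "${FLINK_HOME}/lib" ]; then
    echo "FLINK_HOME is not set or has no lib/ directory" >&2
    return 1
  fi
  if [ ! -f "$jar" ]; then
    echo "connector jar not found: $jar" >&2
    return 1
  fi
  cp "$jar" "${FLINK_HOME}/lib/"
}

# Restart the standalone cluster so that its processes load the new jar.
restart_cluster() {
  "${FLINK_HOME}/bin/stop-cluster.sh"
  "${FLINK_HOME}/bin/start-cluster.sh"
}
```

For example: install_connector ./vertica-flink-connector_${FLINK_VERSION}-*.jar && restart_cluster. On YARN or Kubernetes deployments, the restart step would look different.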

[Optional] Build from source code

Requirements

  • Java 11+
  • Maven 3.3+

Running the following command from the top of the source tree produces vertica-flink-connector_${FLINK_VERSION}-*.jar in the target/ directory:

mvn -DskipTests=true clean package
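After the build, a small helper such as the hypothetical find_connector_jar below can locate the produced jar for the installation step. It is a sketch that assumes the jar name follows the vertica-flink-connector_*.jar pattern, and it skips -sources/-javadoc jars in case the build emits any.

```shell
#!/usr/bin/env bash
# Sketch only: find_connector_jar is a hypothetical helper, not part of this project.
# Prints the first vertica-flink-connector_*.jar in the given directory (default: target),
# ignoring -sources/-javadoc jars; returns non-zero if none is found.
find_connector_jar() {
  local dir="${1:-target}"
  local jar
  for jar in "$dir"/vertica-flink-connector_*.jar; do
    [ -e "$jar" ] || continue          # the glob matched nothing
    case "$jar" in
      *-sources.jar|*-javadoc.jar) continue ;;
    esac
    printf '%s\n' "$jar"
    return 0
  done
  echo "no connector jar found in $dir; did the build succeed?" >&2
  return 1
}
```

For example: cp "$(find_connector_jar)" "${FLINK_HOME}/lib/", then restart the cluster.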