We use Testcontainers to run the end-to-end tests.
Each test starts four containers (for example, for the Kafka test): flink:1.13.5-scala_2.11 (JobManager), flink:1.13.5-scala_2.11 (TaskManager), confluentinc/cp-kafka:6.2.1 (Kafka server), and mysql:5.7 (holds the mock input data and receives the output data).
The data flow is: MySQL input table -> Kafka -> MySQL output table.
A single Flink job therefore runs two INSERT statements, so the Kafka load node and the Kafka extract node are tested at the same time.
The test input is a SQL file,
e.g.:
CREATE TABLE test_input (
`id` INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
enum_c STRING,
json_c STRING,
point_c STRING
) WITH (
'connector' = 'mysql-cdc-inlong',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'inlong',
'password' = 'inlong',
'database-name' = 'test',
'table-name' = 'test_input',
'append-mode' = 'true',
'scan.incremental.snapshot.chunk.size' = '4',
'scan.incremental.snapshot.enabled' = 'false'
);
CREATE TABLE kafka_load (
`id` INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
enum_c STRING,
json_c STRING,
point_c STRING
) WITH (
'connector' = 'kafka-inlong',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'csv'
);
CREATE TABLE kafka_extract (
`id` INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
enum_c STRING,
json_c STRING,
point_c STRING
) WITH (
'connector' = 'kafka-inlong',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
CREATE TABLE test_output (
`id` INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
enum_c STRING,
json_c STRING,
point_c STRING
) WITH (
'connector' = 'jdbc-inlong',
'url' = 'jdbc:mysql://mysql:3306/test',
'table-name' = 'test_output',
'username' = 'inlong',
'password' = 'inlong'
);
INSERT INTO kafka_load SELECT * FROM test_input;
INSERT INTO test_output SELECT * FROM kafka_extract;
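
Since the test input is a single SQL file with several `CREATE TABLE` statements and two `INSERT` statements, the test harness has to break it into individual statements before submitting them. The following is a minimal sketch of that step using only the JDK. The class `SqlScriptSplitter` and its methods are hypothetical names chosen for illustration; how the actual test reads and submits the file is not stated in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of InLong or Flink. */
class SqlScriptSplitter {

    /** Splits a script on ';' characters that lie outside '...' literals and `...` identifiers. */
    static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // 0 means "not inside a quoted region"
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (quote != 0) {
                if (c == quote) {
                    quote = 0; // closing quote; a doubled '' simply closes and reopens
                }
                current.append(c);
            } else if (c == '\'' || c == '`') {
                quote = c;
                current.append(c);
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current); // the last statement may lack a trailing ';'
        return statements;
    }

    /** Returns only the INSERT statements, i.e. the ones that belong in the same Flink job. */
    static List<String> insertStatements(List<String> statements) {
        return statements.stream()
                .filter(s -> s.toUpperCase(Locale.ROOT).startsWith("INSERT"))
                .collect(Collectors.toList());
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE t (a STRING) WITH ('connector' = 'x');\n"
                + "INSERT INTO a SELECT * FROM t;\n"
                + "INSERT INTO b SELECT * FROM a";
        List<String> all = splitStatements(script);
        System.out.println("statements: " + all.size()
                + ", inserts: " + insertStatements(all).size());
    }
}
```

One way to run both inserts as one job, assuming the test drives Flink through the Table API, would be to execute the `CREATE TABLE` statements with `TableEnvironment.executeSql` and add each `INSERT` to a `StatementSet` via `addInsertSql` before calling `execute()`.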
thesumery changed the title from "[Feature][Sort] Import sort end2end sql file unit test" to "[Feature][Sort] Import sort end2end unit test with sql file input" on Jun 24, 2022.