Add JDBC examples #824

Closed
oscerd opened this issue Jan 4, 2021 · 6 comments

oscerd commented Jan 4, 2021

Add JDBC examples to the camel-kafka-connector-examples repo.

oscerd self-assigned this Jan 4, 2021

orpiske commented Jan 4, 2021

Here are some snippets taken from the test code as a starting point. Maybe we can leverage them ...

Base configuration using Postgres; it should work the same way for other databases:

connector.class=org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector
tasks.max=1
camel.sink.path.dataSourceName=someName
topics=org.apache.camel.kafkaconnector.jdbc.sink.CamelSinkJDBCITCase
name=CamelJDBCSinkConnector
value.converter=org.apache.kafka.connect.storage.StringConverter
camel.component.jdbc.dataSource=#class:org.apache.camel.kafkaconnector.jdbc.services.TestDataSource
key.converter=org.apache.kafka.connect.storage.StringConverter
camel.sink.endpoint.useHeadersAsParameters=true
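
The `#class:` value in the configuration above asks Camel to create the data source bean from a class name, which is why the class needs a public no-argument constructor. The following is only a simplified illustration of that idea using plain reflection, not Camel's actual implementation; the class name `ClassReferenceSketch` and its method are hypothetical.

```java
public class ClassReferenceSketch {
    // Prefix used in the connector configuration to reference a class by name.
    static final String CLASS_PREFIX = "#class:";

    /** Creates an instance of the class named after the "#class:" prefix. */
    public static Object instantiate(String value) {
        if (!value.startsWith(CLASS_PREFIX)) {
            throw new IllegalArgumentException("Not a #class: reference: " + value);
        }
        String className = value.substring(CLASS_PREFIX.length());
        try {
            // Requires an accessible no-argument constructor.
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Object bean = instantiate("#class:java.util.ArrayList");
        System.out.println(bean.getClass().getName());
    }
}
```

A class without a no-argument constructor would fail here with an `IllegalStateException`.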

You need to provide a data source that sets up the JDBC URL, username, and password. Here's an example from the test code:

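import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// JDBCProperties comes from the connector's test code (import omitted here).
public class TestDataSource extends PGSimpleDataSource {
    private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);

    // The JDBC URL is read once from a system property set by the test.
    private static final String URL;

    static {
        URL = System.getProperty(JDBCProperties.JDBC_CONNECTION_URL);
    }

    // The no-argument constructor is needed because the connector creates the
    // data source from the class name given in the configuration.
    public TestDataSource() {
        super();
        setUrl(URL);

        // Test credentials, hard-coded for the integration test only.
        setUser("ckc");
        setPassword("ckcDevel123");
    }
}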
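
The test data source above hard-codes the user and password. As a minimal sketch of keeping all three settings outside the compiled class, the holder below reads them from system properties. The property names (`example.jdbc.url`, `example.jdbc.user`, `example.jdbc.password`) and the class `DataSourceSettings` are assumptions made for this illustration and are not part of the connector.

```java
import java.util.Objects;

public class DataSourceSettings {
    // Hypothetical property names, chosen for this sketch only.
    public static final String URL_PROPERTY = "example.jdbc.url";
    public static final String USER_PROPERTY = "example.jdbc.user";
    public static final String PASSWORD_PROPERTY = "example.jdbc.password";

    private final String url;
    private final String user;
    private final String password;

    /** Reads the settings when the object is created; the URL is mandatory. */
    public DataSourceSettings() {
        this.url = Objects.requireNonNull(
                System.getProperty(URL_PROPERTY),
                "Missing system property " + URL_PROPERTY);
        // Fall back to the test user name when none is given.
        this.user = System.getProperty(USER_PROPERTY, "ckc");
        this.password = System.getProperty(PASSWORD_PROPERTY, "");
    }

    public String getUrl() {
        return url;
    }

    public String getUser() {
        return user;
    }

    public String getPassword() {
        return password;
    }
}
```

In a data source subclass like the one above, the constructor could pass these values to `setUrl`, `setUser`, and `setPassword` instead of literals.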

The insert statement is sent as the message body, and its named parameters are resolved from Camel headers, for example:

String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; 

... 


Map<String, String> jdbcParameters = new HashMap<>();

// The prefix 'CamelHeader' is removed by the SinkTask
jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName1");
jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data ");


// Send the data
kafkaClient.produce("topic-name", body, jdbcParameters);
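
To make the mapping between header names and the `:?name` placeholders concrete, here is a self-contained sketch. It is only an illustration: the prefix value `"CamelHeader."` is an assumption about `CamelSinkTask.HEADER_CAMEL_PREFIX`, the class `NamedParameterSketch` is hypothetical, and the real JDBC component binds parameters rather than rendering them into the SQL text.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NamedParameterSketch {
    // Assumed value of CamelSinkTask.HEADER_CAMEL_PREFIX; check the connector source.
    static final String HEADER_PREFIX = "CamelHeader.";

    // Matches named placeholders such as :?TestName
    private static final Pattern PLACEHOLDER = Pattern.compile(":\\?(\\w+)");

    /** Keeps only prefixed headers and drops the prefix from their keys. */
    public static Map<String, String> stripPrefix(Map<String, String> kafkaHeaders) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : kafkaHeaders.entrySet()) {
            if (entry.getKey().startsWith(HEADER_PREFIX)) {
                result.put(entry.getKey().substring(HEADER_PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    /** Substitutes each :?name with a quoted value, for display purposes only. */
    public static String render(String sql, Map<String, String> params) {
        Matcher matcher = PLACEHOLDER.matcher(sql);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = params.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing parameter: " + matcher.group(1));
            }
            String quoted = "'" + value.replace("'", "''") + "'";
            matcher.appendReplacement(out, Matcher.quoteReplacement(quoted));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(HEADER_PREFIX + "TestName", "SomeName1");
        headers.put(HEADER_PREFIX + "TestData", "test data ");
        String sql = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
        System.out.println(render(sql, stripPrefix(headers)));
    }
}
```

The point of the sketch is that the header key without its prefix must match the placeholder name exactly, including case.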

catchkuo commented Jan 5, 2021

Is implementing TestDataSource necessary?
Can I set the JDBC connection URL, including the DB account and password, in the properties file, as with plain JDBC or other Kafka connectors? Otherwise I have to recompile every time I want to change the DB config.

oscerd commented Jan 5, 2021

catchkuo commented Jan 5, 2021

Thanks. I will try it.

oscerd commented Jan 5, 2021

oscerd commented Jan 11, 2021

Done.

oscerd closed this as completed Jan 11, 2021