Skip to content

Listen a file

Marco Alessandro Riggio edited this page Nov 6, 2019 · 4 revisions

JsonFileConsumerConfig allow you to setup the JsonFileConsumer. The following code snippet shows how use that class to read a file and handling mutation:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final JsonFileConsumerConfig<BusinessConfiguration> config = new JsonFileConsumerConfig<>(BusinessConfiguration.class)
        .setPath("s3://<YOUR_AWESOME_FILE>.json");
final JsonFileConsumer<BusinessConfiguration> consumer = new JsonFileConsumer<>(config);
env.setParallelism(parallelism);
consumer.getJsonFileStream(env)
        .addSink(...);
env.execute();

The file can be stored in all Flink supported file systems such as:

For a complete example, JsonFileConsumerTest contanins unit and integration test that explain how the library works.

Clone this wiki locally