A Kafka Streams application that attaches inline JSON schemas to messages from Materialize sinks, enabling seamless integration with the Confluent JDBC Sink Connector.
Materialize (exactly-once)
→ Redpanda (JSON without schema)
→ Kafka Streams App (attaches schema, exactly-once)
→ Redpanda (JSON with inline schema)
→ JDBC Sink Connector (upsert mode)
→ Postgres
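For example, a record emitted by a Materialize sink without a schema:

{"id": 1, "name": "Alice"}

is republished wrapped in the Kafka Connect JSON envelope that the JDBC Sink Connector expects (field names here are illustrative, matching the sample schema later in this README):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int64", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "name"}
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {"id": 1, "name": "Alice"}
}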
- Exactly-once semantics throughout the pipeline
- Simple configuration - no code changes needed to add new topics
- Upsert support - handles updates correctly using primary keys
- Multiple topic support - transform multiple streams simultaneously
- Fully tested - comprehensive unit tests included
- Docker-based - entire pipeline runs in Docker Compose
- Java 17+
- Maven 3.6+
- Docker & Docker Compose
- Python 3.8+ (for test data generation)
Build the application:

mvn clean package

This creates a fat JAR: target/json-schema-attacher-1.0.0-SNAPSHOT.jar
Start the full pipeline:

docker compose up -d

This starts:
- Redpanda (Kafka-compatible broker)
- Redpanda Console (Web UI on http://localhost:8080)
- Postgres (data destination)
- Materialize (streaming database)
- Kafka Connect (JDBC sink connector)
- Schema Attacher App (this application)
Wait for all services to be healthy (~30-60 seconds):
docker compose ps

Deploy the JDBC sink connectors:

./scripts/deploy_connectors.sh

This deploys three JDBC sink connectors with upsert mode enabled.
Install Python dependencies:
pip install -r scripts/requirements.txt

Run the data generator:
python3 scripts/generate_test_data.py

This generates:
- 10 user records
- 20 order records
- 30 event records
- Updates to test upsert behavior
Run the verification script:

./scripts/verify_pipeline.sh

Or manually check Postgres:
docker exec -it postgres psql -U postgres -d sink_db

SELECT * FROM users;
SELECT * FROM orders;
SELECT * FROM events;

To add a new topic, edit config/application.properties:
# Pattern: schema.topic.<input-topic>=<schema-json>
# output.topic.<input-topic>=<output-topic>
schema.topic.my-new-topic={"type": "struct", "fields": [...], "name": "..."}
output.topic.my-new-topic=my-new-topic-with-schema

Need help creating schemas? See the Schema Configuration Guide for detailed instructions on:
- Mapping Materialize types to Kafka Connect schema types
- Step-by-step schema creation from materialized views
- Timestamp and logical type handling
- Common patterns and troubleshooting
Restart the schema attacher:
docker compose restart schema-attacher

The application is configured for exactly-once processing:
processing.guarantee=exactly_once_v2

Combined with:
- Materialize's exactly-once sinks
- JDBC connector's upsert mode
This provides end-to-end exactly-once delivery.
Schemas must be in Kafka Connect JSON format:
{
  "type": "struct",
  "fields": [
    {
      "type": "int64",
      "optional": false,
      "field": "id"
    },
    {
      "type": "string",
      "optional": true,
      "field": "name"
    }
  ],
  "optional": false,
  "name": "schema.name"
}

Supported types:

- int32, int64
- float32, float64
- string
- boolean
- bytes
- Nested struct
- array
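Temporal columns use Kafka Connect logical types: the field keeps a primitive type and adds a name (and version) annotation. An illustrative timestamp field (not taken from the included schemas) might look like:

{
  "type": "int64",
  "optional": false,
  "name": "org.apache.kafka.connect.data.Timestamp",
  "version": 1,
  "field": "created_at"
}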
For complex schemas or multiple sinks, use the included schema generator tool to automatically create schema configurations from your Materialize sinks:
# Install dependencies
pip install -r tools/requirements.txt
# Generate schemas for all Kafka sinks
python3 tools/generate_schemas.py \
--host localhost \
--port 6875 \
--output config/generated.properties
# Generate for specific sinks
python3 tools/generate_schemas.py \
--host mz.example.com \
--sink users_json_sink \
--sink orders_json_sink

The tool:
- Queries Materialize's catalog to discover all Kafka sinks
- Extracts column definitions with types and nullability
- Automatically maps Materialize types to Kafka Connect types
- Handles temporal types with proper logical type annotations
- Supports properties and JSON output formats
See tools/README.md for complete documentation.
If you want to run only the schema-attacher container (not the full docker-compose stack), use the production Docker image.
Create a custom application.properties file for your environment:
# Kafka/Redpanda connection settings
bootstrap.servers=your-kafka-broker:9092
application.id=schema-attacher
# Kafka Streams configuration
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
# Processing guarantees - exactly-once semantics
processing.guarantee=exactly_once_v2
# Topic mappings
schema.topic.your-input-topic={"type": "struct", "fields": [...], "name": "your.schema"}
output.topic.your-input-topic=your-output-topic

Key settings to customize:

- bootstrap.servers: Your Kafka/Redpanda broker addresses
- application.id: Unique identifier for this Kafka Streams app
- Topic mappings: Define your input topics, schemas, and output topics
Pull the production image:
docker pull ghcr.io/materializeinclabs/materialize-json-inline-schema:latest

Run with your custom configuration:
docker run -d \
--name schema-attacher \
--restart unless-stopped \
--network your-network \
-v /path/to/your/application.properties:/app/config/application.properties \
-e JAVA_OPTS="-Xmx2g -Xms2g" \
ghcr.io/materializeinclabs/materialize-json-inline-schema:latest

Important:

- Replace /path/to/your/application.properties with your actual config file path
- Adjust --network to match your Kafka cluster's network
- Tune JAVA_OPTS memory settings based on your throughput needs
To update the configuration:
- Edit your application.properties file
- Restart the container:
  docker restart schema-attacher
The application will reload with the new configuration.
View logs:
docker logs -f schema-attacher

Check health:
docker ps --filter name=schema-attacher

Verify it reached RUNNING state:
docker logs schema-attacher | grep "State transition.*RUNNING"

Example configuration for Confluent Cloud:

bootstrap.servers=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
application.id=schema-attacher-prod
# SASL/SSL configuration for Confluent Cloud
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="YOUR_API_KEY" \
password="YOUR_API_SECRET";
# Processing guarantee
processing.guarantee=exactly_once_v2
# Your topic mappings
schema.topic.users-from-materialize={"type": "struct", ...}
output.topic.users-from-materialize=users-with-schema

Example configuration for Amazon MSK:

bootstrap.servers=b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9092,b-2.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9092
application.id=schema-attacher-msk
# IAM authentication (if using MSK IAM)
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# Processing guarantee
processing.guarantee=exactly_once_v2
# Your topic mappings
schema.topic.users-from-materialize={"type": "struct", ...}
output.topic.users-from-materialize=users-with-schema

Project structure:

.
├── src/
│ ├── main/java/com/materialize/schema/
│ │ ├── SchemaAttacherApp.java # Main application
│ │ ├── SchemaWrapper.java # Schema wrapping logic
│ │ ├── ConfigParser.java # Configuration parser
│ │ └── TopicConfig.java # Topic configuration model
│ └── test/java/com/materialize/schema/
│ ├── SchemaWrapperTest.java
│ ├── ConfigParserTest.java
│ └── TopicConfigTest.java
├── docker/
│ ├── materialize/init.sql # Materialize setup
│ ├── postgres/init.sql # Postgres schema
│ └── connect/ # JDBC connector configs
│ ├── users-sink.json
│ ├── orders-sink.json
│ └── events-sink.json
├── scripts/
│ ├── generate_test_data.py # Test data generator
│ ├── deploy_connectors.sh # Deploy JDBC connectors
│ └── verify_pipeline.sh # Verify data flow
├── config/
│ └── application.properties # Kafka Streams config
├── docker-compose.yml
├── Dockerfile
└── pom.xml
Run the unit tests:

mvn test

The entire pipeline serves as an integration test:
- Start the infrastructure
- Generate test data
- Verify data landed in Postgres correctly
- Check for duplicates (should be none with exactly-once); see the queries after this list
- Generate updates and verify upserts work
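For a quick manual check in Postgres, the sketch below compares row counts against the generated data and looks for duplicate keys (assuming id is the primary-key column, as in the sample schema):

SELECT 'users' AS tbl, COUNT(*) FROM users
UNION ALL SELECT 'orders', COUNT(*) FROM orders
UNION ALL SELECT 'events', COUNT(*) FROM events;
-- Expect 10 / 20 / 30 rows; the generated updates upsert in place, so counts should not grow.

SELECT id, COUNT(*) FROM users GROUP BY id HAVING COUNT(*) > 1;
-- Should return zero rows.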
Monitor topics in Redpanda Console at http://localhost:8080.
Check Materialize views:
docker exec -it materialize psql -h localhost -p 6875 -U materialize

SHOW SOURCES;
SHOW SINKS;
SELECT * FROM users_processed LIMIT 5;

View connector status:
curl http://localhost:8083/connectors/users-postgres-sink/status | jq '.'

Check logs:
docker compose logs schema-attacher

Common issues:
- Redpanda not ready yet (wait and restart)
- Invalid schema JSON in configuration
If data is not arriving in Postgres, work through these checks:

- Check if data is in Materialize:
  docker exec -it materialize psql -h localhost -p 6875 -U materialize
  SELECT * FROM users_processed;
- Check if the schema attacher is processing:
  docker compose logs schema-attacher
- Check JDBC connector status:
  curl http://localhost:8083/connectors/users-postgres-sink/status
- Check connector logs:
  docker compose logs kafka-connect
Topics are auto-created by Redpanda. If needed, manually create:
docker exec redpanda rpk topic create users-json-noschema
docker exec redpanda rpk topic create users-json-withschema

For production deployments:
- Kafka Streams:
  - Increase num.stream.threads for parallelism
  - Tune cache.max.bytes.buffering for batching
- Redpanda:
  - Increase resources (memory, CPU)
  - Adjust the --smp and --memory flags
- Postgres:
  - Add appropriate indexes
  - Tune connection pool size in the JDBC connector
  - Consider partitioning for large tables
To run the application locally against the Dockerized infrastructure:

- Start only the infrastructure:
  docker compose up -d redpanda postgres materialize kafka-connect
- Update config/application.properties:
  bootstrap.servers=localhost:19092
- Run the application:
  java -jar target/json-schema-attacher-1.0.0-SNAPSHOT.jar config/application.properties
The codebase is structured for easy extension:
- SchemaWrapper: Modify to support different schema formats (a sketch of the wrapping step appears below)
- ConfigParser: Add support for external schema files
- SchemaAttacherApp: Add custom transformations or filtering
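As a rough illustration of the wrapping step, the sketch below combines a configured schema with a record's value into the schema/payload envelope. It assumes Jackson is on the classpath; the class and method names are illustrative, not the project's actual API:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class EnvelopeSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Wraps a schemaless JSON value in a {"schema": ..., "payload": ...} envelope.
    public static String wrap(String schemaJson, String valueJson) throws Exception {
        JsonNode schema = MAPPER.readTree(schemaJson);   // configured once per input topic
        JsonNode payload = MAPPER.readTree(valueJson);   // the record from Materialize
        ObjectNode envelope = MAPPER.createObjectNode();
        envelope.set("schema", schema);
        envelope.set("payload", payload);
        return MAPPER.writeValueAsString(envelope);
    }
}

In a Kafka Streams topology, a function like this could be applied with mapValues on each configured input stream.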
[Your License Here]
[Your Contributing Guidelines Here]