
feat: add Apache Pinot OFFLINE segment sink connector #328

Draft
deeppatel710 wants to merge 1 commit into DataSQRL:main from deeppatel710:feat/pinot-connector

Conversation

@deeppatel710

Summary

  • Adds a pinot Flink SQL connector that writes RowData into Apache Pinot OFFLINE segments using FlinkSegmentWriter + SegmentUploaderDefault from pinot-flink-connector 1.3.0 (no SNAPSHOT required)
  • At-least-once delivery: segments flush on every Flink checkpoint and when the segment.flush.rows threshold is reached; async upload via a small executor pool (see the writer sketch after this list)
  • Supports all Flink SQL primitive types, TIMESTAMP_LTZ (→ epoch millis), and DECIMAL (→ double)
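
The at-least-once flush behaviour described above can be sketched as a Flink 2.x SinkWriter. The SegmentBatch collaborator and its write()/sealAndUploadAsync() methods are hypothetical stand-ins for the FlinkSegmentWriter + SegmentUploaderDefault wiring, so this illustrates the buffering logic rather than the actual PinotRowDataSink implementation.

import java.io.IOException;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;

// Sketch only: buffers rows, seals a segment when segment.flush.rows is hit,
// and flushes whatever is buffered on every checkpoint via flush().
final class PinotRowDataWriterSketch implements SinkWriter<RowData> {

  private final long segmentFlushRows;   // value of 'segment.flush.rows'
  private final SegmentBatch batch;      // hypothetical wrapper around segment writer + uploader
  private long bufferedRows;

  PinotRowDataWriterSketch(long segmentFlushRows, SegmentBatch batch) {
    this.segmentFlushRows = segmentFlushRows;
    this.batch = batch;
  }

  @Override
  public void write(RowData element, Context context) throws IOException {
    batch.write(element);                      // convert to GenericRow and buffer
    if (++bufferedRows >= segmentFlushRows) {  // size-based flush
      sealCurrentSegment();
    }
  }

  @Override
  public void flush(boolean endOfInput) throws IOException {
    // Invoked on every checkpoint and at end of input: at-least-once delivery.
    if (bufferedRows > 0) {
      sealCurrentSegment();
    }
  }

  @Override
  public void close() throws IOException {
    batch.close();
  }

  private void sealCurrentSegment() throws IOException {
    batch.sealAndUploadAsync();  // segment build + async upload on the executor pool
    bufferedRows = 0;
  }

  /** Hypothetical collaborator standing in for FlinkSegmentWriter + SegmentUploaderDefault. */
  interface SegmentBatch {
    void write(RowData row) throws IOException;
    void sealAndUploadAsync() throws IOException;
    void close() throws IOException;
  }
}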

New files

File | Purpose
PinotDynamicTableFactory | Registers 'connector' = 'pinot'; options: controller.url, table.name, segment.flush.rows
PinotDynamicTableSink | Bridges DynamicTableSink → PinotRowDataSink
PinotRowDataSink | Flink 2.x Sink<RowData> with async segment upload
PinotRowDataConverter | Maps Flink RowData fields to Pinot GenericRow
PinotOptions | Typed connector option constants
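
For the first row of the table, the option declarations in a Flink DynamicTableSinkFactory typically look like the sketch below. The default for segment.flush.rows and the sink construction are assumptions; only the option names and the 'pinot' identifier come from this PR.

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Sketch of the factory registration; the real PinotDynamicTableFactory may differ.
public class PinotDynamicTableFactorySketch implements DynamicTableSinkFactory {

  static final ConfigOption<String> CONTROLLER_URL =
      ConfigOptions.key("controller.url").stringType().noDefaultValue()
          .withDescription("Pinot controller base URL, e.g. http://pinot-controller:9000");

  static final ConfigOption<String> TABLE_NAME =
      ConfigOptions.key("table.name").stringType().noDefaultValue()
          .withDescription("Target Pinot OFFLINE table name");

  static final ConfigOption<Integer> SEGMENT_FLUSH_ROWS =
      ConfigOptions.key("segment.flush.rows").intType().defaultValue(500_000)  // assumed default
          .withDescription("Rows buffered before a segment is sealed and uploaded");

  @Override
  public String factoryIdentifier() {
    return "pinot";  // matches 'connector' = 'pinot'
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(CONTROLLER_URL);
    options.add(TABLE_NAME);
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(SEGMENT_FLUSH_ROWS);
    return options;
  }

  @Override
  public DynamicTableSink createDynamicTableSink(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    helper.validate();
    // ... build the Pinot sink from helper.getOptions() and the resolved schema
    throw new UnsupportedOperationException("sketch only");
  }
}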

Example DDL

CREATE TABLE my_pinot_table (
  user_id  BIGINT,
  action   STRING,
  event_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector'          = 'pinot',
  'controller.url'     = 'http://pinot-controller:9000',
  'table.name'         = 'my_pinot_table',
  'segment.flush.rows' = '500000'
);
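
Once the table is declared, a hypothetical smoke test from the SQL client could look like this (the values are made up):

INSERT INTO my_pinot_table
VALUES
  (1, 'click',    TO_TIMESTAMP_LTZ(1700000000000, 3)),
  (2, 'purchase', TO_TIMESTAMP_LTZ(1700000005000, 3));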

Dependency fixes (pom.xml)

  • Calcite conflict: excluded org.apache.calcite:* from pinot-flink-connector so Flink 2.2's newer Calcite (which adds SqlTypeName.VARIANT) wins (see the pom.xml sketch after this list)
  • Jackson conflict: pinned jackson-core 2.18.2; helix-core 1.3.1 otherwise pulls in 2.12.6, which is missing the StreamReadConstraints class needed by jackson-databind 2.18.2
  • Chronicle Core / JDK 17+: added --add-opens argLine to Surefire for net.openhft:chronicle-core reflective JDK-internal access
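
As a rough illustration of the three fixes above, the relevant pom.xml pieces might look like the snippet below; the exact coordinates, plugin configuration, and the precise list of --add-opens flags are assumptions, not the PR's actual diff.

<!-- Exclude Calcite so Flink 2.2's newer Calcite wins -->
<dependency>
  <groupId>org.apache.pinot</groupId>
  <artifactId>pinot-flink-connector</artifactId>
  <version>1.3.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.calcite</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Pin jackson-core so jackson-databind 2.18.2 finds StreamReadConstraints -->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.18.2</version>
</dependency>

<!-- Surefire: open JDK internals for Chronicle Core's reflection (illustrative flags) -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <argLine>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED</argLine>
  </configuration>
</plugin>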

Test plan

  • PinotRowDataConverterTest — 5 unit tests covering primitives, TIMESTAMP_LTZ, DECIMAL, nulls, and post-deserialization behaviour (one is sketched after this list)
  • PinotSinkFlinkTest.writesRowsToOfflineTable — end-to-end integration test: Flink minicluster + 4 Testcontainers (ZooKeeper + Controller + Broker + Server); generates 20 rows via datagen, inserts into Pinot OFFLINE table, asserts segment appears in controller REST API
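
One of the converter tests could be sketched as below; the PinotRowDataConverter constructor and convert() signature are assumed, since the PR's class APIs are not shown here.

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.junit.jupiter.api.Test;

class PinotRowDataConverterSketchTest {

  @Test
  void convertsTimestampLtzToEpochMillis() {
    // Row shape mirrors the DDL example: (user_id BIGINT, action STRING, event_ts TIMESTAMP_LTZ(3))
    GenericRowData row = GenericRowData.of(
        42L,
        StringData.fromString("click"),
        TimestampData.fromEpochMillis(1_700_000_000_000L));

    // Hypothetical converter API: field names in declaration order.
    PinotRowDataConverter converter =
        new PinotRowDataConverter(new String[] {"user_id", "action", "event_ts"});
    GenericRow out = converter.convert(row);

    assertEquals(42L, out.getValue("user_id"));
    assertEquals("click", out.getValue("action"));
    assertEquals(1_700_000_000_000L, out.getValue("event_ts"));
  }
}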

🤖 Generated with Claude Code

Implements a Flink 2.x DynamicTableSink that writes RowData records into
Apache Pinot OFFLINE segments via FlinkSegmentWriter + SegmentUploaderDefault,
without requiring Pinot 1.6.0+ SNAPSHOT artifacts.

- PinotDynamicTableFactory: registers 'pinot' connector with controller.url,
  table.name, and segment.flush.rows options
- PinotRowDataSink: at-least-once Sink<RowData> with async segment upload;
  flushes on checkpoint (flush()) and when segmentFlushRows threshold is reached
- PinotRowDataConverter: maps Flink RowData fields to Pinot GenericRow,
  converting TIMESTAMP_LTZ → epoch millis and DECIMAL → double
- Integration test (PinotSinkFlinkTest): Flink minicluster + 4 Testcontainers
  (ZK + Controller + Broker + Server); verifies end-to-end segment upload

Dependency notes in pom.xml:
- Excludes org.apache.calcite:* from pinot-flink-connector (avoids
  SqlTypeName.VARIANT conflict with Flink 2.2's newer Calcite)
- Pins jackson-core 2.18.2 (helix-core pulls in 2.12.6 which lacks
  StreamReadConstraints needed by jackson-databind 2.18.2)
- Adds --add-opens for Chronicle Core's JDK-internal reflection on JDK 17+

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Deep Patel <deeppatel710@gmail.com>