Flink HTTP Lookup Connector


Apache Flink connector for HTTP-based lookups with comprehensive caching support, enabling efficient data enrichment in streaming applications.

Features

  • 🚀 High Performance: Full cache loading with configurable refresh intervals
  • 🔄 Automatic Refresh: Configurable cache refresh strategies
  • 🛡️ Fault Tolerant: Built-in retry mechanisms and error handling
  • 🎯 Easy Integration: Simple SQL DDL configuration
  • ⚡ Low Latency: In-memory caching for sub-millisecond lookups

Quick Start

Maven Dependency

<dependency>
    <groupId>com.datanutshell.flink</groupId>
    <artifactId>flink-http-lookup-connector</artifactId>
    <version>1.0.0</version>
</dependency>

Gradle Dependency

implementation 'com.datanutshell.flink:flink-http-lookup-connector:1.0.0'

Basic Usage

Create a lookup table using SQL DDL:

CREATE TABLE user_lookup (
  id INT,
  name STRING,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'http-lookup-full-cache',
  'url' = 'https://api.example.com/users',
  'cache.refresh-interval' = 'PT10M',
  'method' = 'GET'
);

Use it in a lookup join:

SELECT 
  e.user_id,
  e.event_type,
  u.name,
  u.email
FROM user_events e
LEFT JOIN user_lookup FOR SYSTEM_TIME AS OF e.proc_time AS u
  ON e.user_id = u.id;

Configuration Options

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector` | String | Yes | - | Must be `http-lookup-full-cache` |
| `url` | String | Yes | - | HTTP endpoint URL |
| `method` | String | No | `GET` | HTTP method (GET, POST, etc.) |
| `cache.refresh-interval` | Duration | No | `PT1H` | Cache refresh interval (ISO-8601 duration) |
| `xpath` | String | No | (empty) | XPath expression for data extraction |
| `connect.timeout.seconds` | Integer | No | `10` | Connection timeout in seconds |
| `read.timeout.seconds` | Integer | No | `30` | Read timeout in seconds |
| `max.retries` | Integer | No | `3` | Maximum number of retries |
| `retry.delay.ms` | Long | No | `1000` | Delay between retries in milliseconds |
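The `max.retries` and `retry.delay.ms` options bound how often a failed request is reattempted before the job gives up. A minimal sketch of that retry loop (illustrative names only, not the connector's actual classes):

```java
import java.util.function.Supplier;

// Illustrative retry helper: one initial attempt plus up to maxRetries
// reattempts, sleeping delayMs between them (mirrors max.retries / retry.delay.ms).
class RetrySketch {
    static <T> T withRetries(Supplier<T> call, int maxRetries, long delayMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) Thread.sleep(delayMs);
            }
        }
        throw last; // all attempts exhausted

    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = withRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient");
            return "ok";
        }, 3, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With the defaults (`max.retries` = 3, `retry.delay.ms` = 1000), a flaky endpoint gets up to four attempts spaced one second apart before the lookup fails.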

Examples

Basic User Lookup

CREATE TABLE user_lookup (
  id INT,
  name STRING,
  username STRING,
  email STRING,
  phone STRING,
  website STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'http-lookup-full-cache',
  'url' = 'https://jsonplaceholder.typicode.com/users',
  'cache.refresh-interval' = 'PT10M',
  'method' = 'GET',
  'connect.timeout.seconds' = '10',
  'read.timeout.seconds' = '30',
  'max.retries' = '3',
  'retry.delay.ms' = '1000'
);

Real-time Event Enrichment

-- Source table with events
CREATE TABLE user_events (
  user_id INT,
  event_type STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Enriched output
INSERT INTO enriched_events
SELECT 
  e.user_id,
  e.event_type,
  e.event_time,
  u.name,
  u.email
FROM user_events e
LEFT JOIN user_lookup FOR SYSTEM_TIME AS OF e.proc_time AS u
  ON e.user_id = u.id;

Architecture

The connector implements a full-cache strategy where:

  1. Initial Load: All data is loaded from the HTTP endpoint at startup
  2. Periodic Refresh: Cache is refreshed at configurable intervals
  3. In-Memory Storage: Data is stored in memory for fast lookups
  4. Fault Tolerance: Automatic retries and error handling
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    HTTP Request    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 β”‚ ──────────────────▢│                 β”‚
β”‚  Flink Job      β”‚                    β”‚  HTTP Endpoint  β”‚
β”‚                 β”‚ ◀────────────────── β”‚                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    JSON Response   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                                       β–²
         β–Ό                                       β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                             β”‚
β”‚   In-Memory     β”‚                             β”‚
β”‚     Cache       β”‚                             β”‚
β”‚                 β”‚                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                             β”‚
         β”‚                              Periodic Refresh
         β–Ό                                       β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                             β”‚
β”‚   Lookup Join   β”‚ β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚    Operation    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Development

Prerequisites

  • Java 11 or higher
  • Apache Flink 1.17+
  • Gradle 8.0+

Building from Source

# Clone the repository
git clone https://github.com/dataengnutshell/flink-http-full-cache-connector.git
cd flink-http-full-cache-connector

# Build the project
./gradlew build

# Run tests
./gradlew test

# Run integration tests
./gradlew integrationTest

Running the Example

cd example
./gradlew run

This will start a Flink job that demonstrates the HTTP lookup connector using the JSONPlaceholder API.

Monitoring and Metrics

The connector provides comprehensive metrics for monitoring:

  • Cache Hit Rate: Percentage of successful cache lookups
  • Cache Refresh Duration: Time taken to refresh the cache
  • HTTP Request Metrics: Success/failure rates, response times
  • Error Rates: Retry attempts and failure counts

Access these metrics through Flink's metrics system and your monitoring infrastructure.
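Cache hit rate is simply hits over total lookups; a sketch of how such a number is derived (plain counters here for illustration — the connector reports through Flink's metric system, and these names are not the connector's actual metric identifiers):

```java
import java.util.concurrent.atomic.LongAdder;

// Illustrative hit-rate tracker: two thread-safe counters and a ratio.
class HitRateSketch {
    private final LongAdder hits = new LongAdder();
    private final LongAdder misses = new LongAdder();

    void recordHit() { hits.increment(); }
    void recordMiss() { misses.increment(); }

    // Fraction of lookups served from the cache; 0.0 before any lookup.
    double hitRate() {
        long h = hits.sum();
        long total = h + misses.sum();
        return total == 0 ? 0.0 : (double) h / total;
    }
}
```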

Performance Considerations

Cache Sizing

  • Monitor memory usage when caching large datasets
  • Consider the trade-off between refresh frequency and data freshness
  • Use appropriate JVM heap sizing for your cache requirements

Network Optimization

  • Set appropriate timeout values for your network conditions
  • Configure retry strategies based on endpoint reliability
  • Consider using connection pooling for high-throughput scenarios
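As a point of reference for what the two timeout options mean, here is how they map onto Java 11's built-in `HttpClient` (the connector's actual HTTP stack is not specified here, so treat this as an assumption):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// connect.timeout.seconds bounds opening the TCP connection;
// read.timeout.seconds corresponds to a per-request response timeout.
class TimeoutSketch {
    static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10)) // connect.timeout.seconds
            .build();

    static HttpRequest request(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(30))    // read.timeout.seconds
                .GET()
                .build();
    }
}
```

Raising the request timeout while keeping the connect timeout short is a common pattern for endpoints that are reachable quickly but slow to serialize large payloads.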

Refresh Strategy

-- Frequent updates for critical data
'cache.refresh-interval' = 'PT1M'  -- Every minute

-- Balanced approach for most use cases
'cache.refresh-interval' = 'PT10M' -- Every 10 minutes

-- Infrequent updates for static data
'cache.refresh-interval' = 'PT1H'  -- Every hour
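These interval strings are standard ISO-8601 durations, the same format `java.time.Duration` parses, so their meaning can be checked directly:

```java
import java.time.Duration;

// ISO-8601 duration values accepted by cache.refresh-interval.
class RefreshIntervals {
    public static void main(String[] args) {
        System.out.println(Duration.parse("PT1M").toSeconds());  // 60
        System.out.println(Duration.parse("PT10M").toMinutes()); // 10
        System.out.println(Duration.parse("PT1H").toMinutes());  // 60
    }
}
```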

Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Workflow

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Ensure all tests pass
  6. Submit a pull request

Code Style

This project uses Scalafmt for code formatting:

./gradlew scalafmtAll

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Support


Data Nutshell
