Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,53 @@
- name: Build and Test Bindings
run: ./scripts/build_python_bindings.sh --test

kotlin-bindings:
name: Kotlin Bindings
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8

- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'

- name: Cache Gradle dependencies
uses: actions/cache@v4
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: |
${{ runner.os }}-gradle-

- name: Cache Cargo dependencies
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-kotlin-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-kotlin-

- name: Build and Verify Kotlin Bindings
run: ./scripts/build_kotlin_bindings.sh

- name: Verify Examples Build
run: |
cd kotlin
./gradlew :examples:simple_client:build
./gradlew :examples:quic_client:build

coverage:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
name: Code Coverage
runs-on: ubuntu-latest

Expand All @@ -161,7 +207,7 @@
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
components: llvm-tools-preview

Expand Down Expand Up @@ -210,7 +256,7 @@
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8

- name: Install protoc
uses: arduino/setup-protoc@v3
Expand Down Expand Up @@ -248,7 +294,7 @@
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
targets: ${{ matrix.target }}

Expand Down
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@ python/examples/*.so
python/examples/*.dll

python/**/*.pyc

# Kotlin Examples (generated files)
kotlin/package/src/main/resources/libflowsdk_ffi.dylib
kotlin/.gradle/
kotlin/build/
kotlin/**/build/

# TLS keylog files (for debugging)
sslkeylog.txt
**/sslkeylog.txt
7 changes: 7 additions & 0 deletions examples/c_ffi_example/quic_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ typedef struct {
const char *client_key_file;
const char *alpn;
uint8_t insecure_skip_verify;
uint8_t enable_key_log;
} MqttTlsOptionsC;

typedef struct {
Expand Down Expand Up @@ -88,6 +89,7 @@ int main(int argc, char **argv) {
*/
const char *broker_host = "broker.emqx.io";
const char *broker_port = "14567";
const char *sslkeylogfile = getenv("SSLKEYLOGFILE");

if (argc > 1)
broker_host = argv[1];
Expand Down Expand Up @@ -143,6 +145,11 @@ int main(int argc, char **argv) {

MqttTlsOptionsC q_opts = {0};
q_opts.insecure_skip_verify = 1;
q_opts.enable_key_log = sslkeylogfile != NULL;

if (q_opts.enable_key_log) {
printf("TLS key logging enabled -> %s\n", sslkeylogfile);
}

if (mqtt_quic_engine_connect(engine, server_addr_str, broker_host, &q_opts) !=
0) {
Expand Down
7 changes: 7 additions & 0 deletions examples/c_ffi_example/tls_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct {
const char *client_key_file;
const char *alpn;
uint8_t insecure_skip_verify;
uint8_t enable_key_log;
} MqttTlsOptionsC;

TlsMqttEngineFFI *mqtt_tls_engine_new(const char *client_id,
Expand Down Expand Up @@ -77,6 +78,7 @@ int main(int argc, char **argv) {
*/
const char *broker_host = "broker.emqx.io";
const char *broker_port = "8883";
const char *sslkeylogfile = getenv("SSLKEYLOGFILE");

if (argc > 1)
broker_host = argv[1];
Expand Down Expand Up @@ -114,6 +116,11 @@ int main(int argc, char **argv) {
// Initialize TLS Engine
MqttTlsOptionsC q_opts = {0};
q_opts.insecure_skip_verify = 1;
q_opts.enable_key_log = sslkeylogfile != NULL;

if (q_opts.enable_key_log) {
printf("TLS key logging enabled -> %s\n", sslkeylogfile);
}

char client_id[32];
snprintf(client_id, sizeof(client_id), "c_ffi_tls_%u",
Expand Down
13 changes: 13 additions & 0 deletions flowsdk_ffi/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ impl TlsMqttEngineFFI {
config.alpn_protocols = vec![b"mqtt".to_vec()];
}

if tls_opts.enable_key_log {
config.key_log = Arc::new(rustls::KeyLogFile::new());
}

let engine = TlsMqttEngine::new(options, &server_name, Arc::new(config)).unwrap();
TlsMqttEngineFFI {
engine: Mutex::new(engine),
Expand Down Expand Up @@ -506,6 +510,10 @@ impl QuicMqttEngineFFI {
config.alpn_protocols = vec![b"mqtt".to_vec()];
}

if tls_opts.enable_key_log {
config.key_log = Arc::new(rustls::KeyLogFile::new());
}

self.engine
.lock()
.unwrap()
Expand Down Expand Up @@ -907,6 +915,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_new(
client_key_file: None,
insecure_skip_verify: false,
alpn_protocols: vec!["mqtt".to_string()],
enable_key_log: false,
}
} else {
let r = &*tls_opts;
Expand Down Expand Up @@ -948,6 +957,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_new(
client_key_file,
insecure_skip_verify: r.insecure_skip_verify != 0,
alpn_protocols,
enable_key_log: r.enable_key_log != 0,
}
};

Expand Down Expand Up @@ -1203,6 +1213,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_connect(
client_key_file: None,
insecure_skip_verify: false,
alpn_protocols: vec!["mqtt".to_string()],
enable_key_log: false,
}
} else {
let r = &*tls_opts;
Expand Down Expand Up @@ -1239,6 +1250,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_connect(
client_key_file,
insecure_skip_verify: r.insecure_skip_verify != 0,
alpn_protocols: vec!["mqtt".to_string()],
enable_key_log: r.enable_key_log != 0,
}
};

Expand Down Expand Up @@ -1455,6 +1467,7 @@ pub struct MqttTlsOptionsC {
pub client_key_file: *const c_char,
pub alpn: *const c_char,
pub insecure_skip_verify: u8,
pub enable_key_log: u8,
}

#[repr(C)]
Expand Down
1 change: 1 addition & 0 deletions flowsdk_ffi/src/engine/ffi_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct MqttTlsOptionsFFI {
pub client_key_file: Option<String>,
pub insecure_skip_verify: bool,
pub alpn_protocols: Vec<String>,
pub enable_key_log: bool,
}

#[derive(uniffi::Record)]
Expand Down
9 changes: 9 additions & 0 deletions kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
plugins {
kotlin("jvm") version "1.9.22" apply false
}

allprojects {
repositories {
mavenCentral()
}
}
106 changes: 106 additions & 0 deletions kotlin/examples/quic_client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# FlowSDK QUIC MQTT Client Example

A Kotlin example demonstrating QUIC-based MQTT connectivity using FlowSDK's FFI bindings.

## Prerequisites

- **JDK 17** or later (Temurin recommended)
- **Rust toolchain** (stable)
- **QUIC-enabled MQTT broker** (e.g., `broker.emqx.io:14567`)

## Build & Run

### 1. Build the FFI Bindings

From the repository root:

```bash
./scripts/build_kotlin_bindings.sh
```

This compiles the Rust FFI library with QUIC support and generates Kotlin bindings.

### 2. Run the Example

```bash
cd kotlin
./gradlew :examples:quic_client:run
```

### 3. (Optional) Enable TLS Key Logging for Wireshark

To capture and decrypt QUIC traffic with Wireshark, enable TLS key logging:

```bash
cd kotlin
SSLKEYLOGFILE=$PWD/sslkeylog.txt ./gradlew :examples:quic_client:run
```

The TLS session keys will be written to `sslkeylog.txt` in the kotlin directory. In Wireshark:
1. Go to **Edit** → **Preferences** → **Protocols** → **TLS**
2. Set **(Pre)-Master-Secret log filename** to the absolute path of `sslkeylog.txt`
3. Start capturing on your network interface
4. Filter for `udp.port == 14567` to see the QUIC traffic
5. Wireshark will automatically decrypt the QUIC packets using the logged keys

**Note:** Make sure to use an absolute path for `SSLKEYLOGFILE` as Gradle may change the working directory.

Or from the example directory:

```bash
cd kotlin/examples/quic_client
../../gradlew run
```

## What It Does

1. **Connects** to `broker.emqx.io:14567` via QUIC (MQTT over UDP)
2. **Subscribes** to topic `test/kotlin/quic` (QoS 1)
3. **Publishes** message `"Hello from Kotlin QUIC!"` to the same topic
4. **Receives** the echoed message back from the broker
5. **Runs** for 10 seconds demonstrating the QUIC event loop
6. **Disconnects** gracefully sending QUIC close frames

## Implementation Highlights

- **Java NIO**: Uses `DatagramChannel` + `Selector` for non-blocking UDP I/O
- **Event-driven**: Polls `engine.handleTick()` every 10ms to drive QUIC timers and process MQTT events
- **Relative Timing**: Uses milliseconds elapsed since engine creation (not absolute UNIX time) for proper timeout detection
- **TLS Key Logging**: Supports SSLKEYLOGFILE environment variable for Wireshark debugging
- **Datagram flow**:
- Outgoing: `engine.takeOutgoingDatagrams()` → UDP send
- Incoming: UDP receive → `engine.handleDatagram()`

## Troubleshooting

**Build fails with "QuicMqttEngineFfi not found"**
→ Run `./scripts/build_kotlin_bindings.sh` from repository root first

**Connection timeout**
→ Ensure the broker supports QUIC and is reachable on UDP port 14567

**Connection instability / Messages not received**
→ Public QUIC brokers may experience connection stability issues. The example successfully connects, subscribes, and publishes, but message delivery may be affected by connection drops. For production use, consider:
- Using a dedicated QUIC broker with stable configuration
- Implementing connection state monitoring and reconnection logic
- Testing with a local QUIC-enabled MQTT broker

**Native library not found**
→ Check `kotlin/package/src/main/resources/` contains `libflowsdk_ffi.dylib` (macOS) or `.so` (Linux)
→ If missing, run `./scripts/build_kotlin_bindings.sh` from repository root to build and copy the library

## Configuration

Edit [Main.kt](src/main/kotlin/Main.kt) constants to customize:

```kotlin
private const val BROKER_HOST = "broker.emqx.io"
private const val BROKER_PORT = 14567
private const val RUN_DURATION_MS = 10_000L // Run for 10 seconds
```

## See Also

- [Simple TCP Client Example](../simple_client/) — TCP-based MQTT with coroutines
- [C FFI Example](../../../examples/c_ffi_example/) — C client using the same FFI
- [FlowSDK Documentation](../../../README.md)
26 changes: 26 additions & 0 deletions kotlin/examples/quic_client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
kotlin("jvm")
id("application")
}

group = "io.emqx.examples"
version = "0.1.0"

repositories {
mavenCentral()
}

dependencies {
implementation(project(":package"))
implementation(kotlin("stdlib"))
}

application {
mainClass.set("example.quic.MainKt")
}

// Ensure native library is available
tasks.run.configure {
// The 'package' project copies native library to src/main/resources
// JNA usually finds it if it's in resource root
}
Loading
Loading