[Hackathon]: feat(workflow-operator): add compiled C++ and Rust UDFs? #5078

Closed
carloea2 wants to merge 3 commits into apache:main from carloea2:c_plus_plus_udf_operator

carloea2 (Contributor) commented May 15, 2026

This PR implements prototype compiled C++ and Rust UDF operators for Texera.

The operators let users write native C++ or Rust directly inside the workflow UI, configure input/output columns, compile the code, and run it as workflow operators. The MVP supports typed tuple/batch/table-style APIs, compiler errors, retained input columns, configurable compiler flags, timeout, and batching.

To demonstrate why this is useful, I benchmarked the same deterministic CPU-heavy matrix multiplication workload across Rust, C++, Java UDF, and Python UDF. Each runtime receives its own independent source with the same seed/workload.
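
The workload is deterministic because every matrix entry is computed from `seed`, `trial`, and the loop indices, so no random number generator state is involved. A minimal standalone sketch of that kernel, lifted from the Python UDF in this PR with the Texera API stripped away, is shown here. The function name `matmul_checksum` and the sample inputs are assumptions made for illustration.

```python
def matmul_checksum(trial: int, n: int, seed: int) -> float:
    """Weighted checksum of an n x n matrix product.

    Entries of both input matrices are derived from (seed, trial, indices),
    mirroring the formulas used in the UDF listings of this PR.
    """
    checksum = 0.0
    for i in range(n):
        for j in range(n):
            cell = 0.0
            for k in range(n):
                a = ((seed + trial * 97 + i * 31 + k * 17) % 1000) / 1000.0
                b = ((seed + trial * 53 + k * 13 + j * 29) % 1000) / 1000.0
                cell += a * b
            # Weight each cell by its position so that swapped cells change the result.
            checksum += cell * ((i + 1) * 0.001 + (j + 1) * 0.0001)
    return checksum


# Hypothetical inputs: the same (trial, n, seed) always yields the same checksum.
first = matmul_checksum(trial=0, n=8, seed=42)
second = matmul_checksum(trial=0, n=8, seed=42)
print(first == second)
```

One portability caveat: Python's `%` returns a non-negative result for a positive modulus, while `%` in C++, Java, and Rust keeps the sign of the dividend. The four listings therefore agree only when the seed-derived sums are non-negative.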

Benchmark Results

| Runtime | Avg ms / row | Min ms | Max ms | Runs | Speedup vs Python |
|---|---|---|---|---|---|
| Rust | 0.097 | 0.088 | 0.211 | 80 | 154.2x |
| C++ | 0.133 | 0.103 | 0.203 | 80 | 113.3x |
| Java | 0.656 | 0.272 | 3.445 | 80 | 22.9x |
| Python | 15.032 | 10.055 | 21.082 | 80 | 1.0x |
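
The speedup column is the Python average divided by each runtime's average. A short sketch of that arithmetic, using the rounded averages from the table, follows. The helper name `speedup` is an assumption for illustration.

```python
# Average ms per row, copied from the benchmark table in this PR.
avg_ms = {"Rust": 0.097, "C++": 0.133, "Java": 0.656, "Python": 15.032}


def speedup(baseline_ms: float, runtime_ms: float) -> float:
    """How many times faster a runtime is than the baseline."""
    return baseline_ms / runtime_ms


speedups = {name: speedup(avg_ms["Python"], ms) for name, ms in avg_ms.items()}
for name, value in speedups.items():
    print(f"{name}: {value:.1f}x")
```

The rounded averages reproduce the reported column to within about half a percent (roughly 155x versus the reported 154.2x for Rust). The small gap is presumably because the reported figures were computed from unrounded timings, which is an assumption.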

These results show the motivation: Python is great for usability, but in this benchmark the compiled native UDFs ran roughly 113x to 154x faster than the Python UDF and about 5x to 7x faster than the Java UDF on a CPU-heavy workload, while staying inside Texera’s visual workflow model.

C++ UDF

#include <chrono>

class MatrixMultiplyOperator : public texera::UDFOperator {
public:
    texera::TupleOutput process_tuple(const texera::Tuple& tuple, int port) override {
        int trial = tuple.get("trial").as_int();
        int n = tuple.get("matrix_size").as_int();
        long long seed = tuple.get("seed").as_long();

        auto start = std::chrono::high_resolution_clock::now();

        double checksum = 0.0;
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < n; j++) {
                double cell = 0.0;
                for (int k = 0; k < n; k++) {
                    double a = ((seed + trial * 97LL + i * 31LL + k * 17LL) % 1000LL) / 1000.0;
                    double b = ((seed + trial * 53LL + k * 13LL + j * 29LL) % 1000LL) / 1000.0;
                    cell += a * b;
                }
                checksum += cell * ((i + 1) * 0.001 + (j + 1) * 0.0001);
            }
        }

        auto end = std::chrono::high_resolution_clock::now();
        double elapsed_ms = std::chrono::duration<double, std::milli>(end - start).count();

        return { texera::TupleLike{
            texera::Value::string_value("cpp"),
            texera::Value::double_value(checksum),
            texera::Value::double_value(elapsed_ms)
        }};
    }
};

using TexeraUDFOperator = MatrixMultiplyOperator;

Rust UDF

use std::time::Instant;

#[derive(Default)]
struct MatrixMultiplyOperator;

impl texera::UDFOperator for MatrixMultiplyOperator {
    fn process_tuple(
        &mut self,
        tuple: &texera::Tuple,
        _port: i32,
    ) -> Result<texera::TupleOutput, String> {
        let trial = tuple.get_by_name("trial")?.as_int()?;
        let n = tuple.get_by_name("matrix_size")?.as_int()?;
        let seed = tuple.get_by_name("seed")?.as_long()?;

        let start = Instant::now();

        let mut checksum = 0.0;
        for i in 0..n {
            for j in 0..n {
                let mut cell = 0.0;
                for k in 0..n {
                    let a = ((seed + trial as i64 * 97 + i as i64 * 31 + k as i64 * 17) % 1000) as f64 / 1000.0;
                    let b = ((seed + trial as i64 * 53 + k as i64 * 13 + j as i64 * 29) % 1000) as f64 / 1000.0;
                    cell += a * b;
                }
                checksum += cell * (((i + 1) as f64) * 0.001 + ((j + 1) as f64) * 0.0001);
            }
        }

        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;

        Ok(vec![vec![
            texera::Value::string_value("rust"),
            texera::Value::double_value(checksum),
            texera::Value::double_value(elapsed_ms),
        ]])
    }
}

type TexeraUDFOperator = MatrixMultiplyOperator;

Java UDF

// Assumes `import java.util.Arrays;` is in scope in the enclosing UDF class.
public TupleLike processTuple(Tuple tuple) {
    int trial = ((Number) tuple.getField("trial")).intValue();
    int n = ((Number) tuple.getField("matrix_size")).intValue();
    long seed = ((Number) tuple.getField("seed")).longValue();

    long start = System.nanoTime();

    double checksum = 0.0;
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            double cell = 0.0;
            for (int k = 0; k < n; k++) {
                double a = ((seed + trial * 97L + i * 31L + k * 17L) % 1000L) / 1000.0;
                double b = ((seed + trial * 53L + k * 13L + j * 29L) % 1000L) / 1000.0;
                cell += a * b;
            }
            checksum += cell * ((i + 1) * 0.001 + (j + 1) * 0.0001);
        }
    }

    double elapsedMs = (System.nanoTime() - start) / 1_000_000.0;

    Object[] inputFields = tuple.getFields();
    Object[] outputFields = Arrays.copyOf(inputFields, inputFields.length + 3);
    outputFields[inputFields.length] = "java";
    outputFields[inputFields.length + 1] = checksum;
    outputFields[inputFields.length + 2] = elapsedMs;

    return TupleLike$.MODULE$.apply(Arrays.asList(outputFields));
}

Python UDF

from pytexera import *
import time

class ProcessTupleOperator(UDFOperatorV2):

    @overrides
    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
        trial = int(tuple_["trial"])
        n = int(tuple_["matrix_size"])
        seed = int(tuple_["seed"])

        start = time.perf_counter()

        checksum = 0.0
        for i in range(n):
            for j in range(n):
                cell = 0.0
                for k in range(n):
                    a = ((seed + trial * 97 + i * 31 + k * 17) % 1000) / 1000.0
                    b = ((seed + trial * 53 + k * 13 + j * 29) % 1000) / 1000.0
                    cell += a * b
                checksum += cell * ((i + 1) * 0.001 + (j + 1) * 0.0001)

        elapsed_ms = (time.perf_counter() - start) * 1000.0

        output = tuple_.as_dict()
        output["runtime"] = "python"
        output["checksum"] = checksum
        output["elapsed_ms"] = elapsed_ms
        yield output
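
Each UDF above emits a `runtime` label, a `checksum`, and an `elapsed_ms` value per input row. One way the per-runtime columns of the benchmark table (average, minimum, maximum, run count) could be derived from such rows is sketched below. The PR does not show how the table was produced, so the `summarize` helper and the sample values are assumptions for illustration, not measured data.

```python
from collections import defaultdict
from statistics import mean


def summarize(rows):
    """Group rows by runtime and compute avg/min/max/count of elapsed_ms."""
    by_runtime = defaultdict(list)
    for row in rows:
        by_runtime[row["runtime"]].append(row["elapsed_ms"])
    return {
        runtime: {
            "avg_ms": mean(times),
            "min_ms": min(times),
            "max_ms": max(times),
            "runs": len(times),
        }
        for runtime, times in by_runtime.items()
    }


# Placeholder rows with made-up timings, shaped like the UDF outputs.
sample_rows = [
    {"runtime": "python", "elapsed_ms": 10.0},
    {"runtime": "python", "elapsed_ms": 14.0},
    {"runtime": "rust", "elapsed_ms": 0.5},
]
summary = summarize(sample_rows)
print(summary["python"])
```

Comparing the `checksum` column across runtimes for the same input row is also a cheap sanity check that all four kernels computed the same thing.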

Attached video: Texera.-.Google.Chrome.2026-05-14.23-49-35.github-under-10mb-final-no-audio.mp4

github-actions bot added the dependencies, frontend, and common labels on May 15, 2026
codecov-commenter commented May 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 42.80%. Comparing base (6060877) to head (f5fa7e8).

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5078      +/-   ##
============================================
- Coverage     43.07%   42.80%   -0.27%     
+ Complexity     2211     2204       -7     
============================================
  Files          1045     1045              
  Lines         40220    40085     -135     
  Branches       4244     4235       -9     
============================================
- Hits          17323    17159     -164     
- Misses        21827    21864      +37     
+ Partials       1070     1062       -8     
| Flag | Coverage Δ | *Carryforward flag |
|---|---|---|
| access-control-service | 39.53% <ø> (ø) | |
| agent-service | 33.72% <ø> (ø) | Carriedforward from ac55403 |
| amber | 43.67% <ø> (-0.14%) ⬇️ | Carriedforward from ac55403 |
| computing-unit-managing-service | 0.00% <ø> (ø) | |
| config-service | 0.00% <ø> (ø) | |
| file-service | 32.18% <ø> (ø) | |
| frontend | 33.93% <ø> (+<0.01%) ⬆️ | Carriedforward from ac55403 |
| python | 89.10% <ø> (-1.32%) ⬇️ | Carriedforward from ac55403 |
| workflow-compiling-service | 47.72% <ø> (ø) | |

*This pull request uses carry forward flags.

carloea2 changed the title from "[Hackathon]: feat(workflow-operator): C++ UDF?" to "[Hackathon]: feat(workflow-operator): add compiled C++ and Rust UDFs?" on May 15, 2026
carloea2 closed this on May 19, 2026