Add write_parquet operation to offload parquet writing to worker #327

Merged
Edwardvaneechoud merged 1 commit into feauture/kernel-implementation from claude/fix-add-python-script-ngqCx
Feb 9, 2026

@Edwardvaneechoud
Owner

Summary

This PR adds support for a new write_parquet operation type that offloads the collection and writing of LazyFrames to parquet files from the core process to the worker process. This keeps the core process lightweight and prevents race conditions when reading parquet files immediately after writing.

Key Changes

  • Added write_parquet operation type to the OperationType literal in models, enabling a new operation mode for subprocess operations
  • Implemented write_parquet function in the worker that deserializes a LazyFrame, collects it, and writes it to a parquet file with proper disk flushing
  • Extended trigger_df_operation and ExternalDfFetcher to support passing additional kwargs via HTTP headers (X-Kwargs), allowing callers to specify operation-specific parameters like output paths
  • Updated kernel execution flow in flow_graph.py to use the new write_parquet operation instead of collecting and writing parquet files in the core process, reducing memory pressure and preventing file race conditions
  • Added kwargs parameter throughout the operation chain (models, routes, funcs) to support passing operation-specific configuration
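
As a rough illustration of the X-Kwargs mechanism described above, the sketch below round-trips operation-specific kwargs through a single HTTP header. It assumes the kwargs are JSON-encoded; the PR text does not state the actual encoding, and the helper names `build_headers` and `parse_kwargs` are hypothetical, not taken from the codebase.

```python
import json
from typing import Any, Optional

KWARGS_HEADER = "X-Kwargs"  # header name taken from the PR description


def build_headers(kwargs: Optional[dict[str, Any]] = None) -> dict[str, str]:
    """Caller side (hypothetical helper): attach kwargs only when provided."""
    headers: dict[str, str] = {}
    if kwargs:
        # json.dumps escapes non-ASCII by default, so the value is header-safe.
        # JSON is an assumption here; the real encoding is not shown in the PR.
        headers[KWARGS_HEADER] = json.dumps(kwargs)
    return headers


def parse_kwargs(headers: dict[str, str]) -> dict[str, Any]:
    """Worker side (hypothetical helper): a missing header means no kwargs."""
    raw = headers.get(KWARGS_HEADER)
    return json.loads(raw) if raw else {}


if __name__ == "__main__":
    sent = build_headers({"output_path": "/tmp/result.parquet"})
    print(parse_kwargs(sent))
```

Treating a missing header as an empty dict is one way existing callers that never send kwargs could keep working unchanged.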

Implementation Details

  • The write_parquet operation accepts an output_path parameter via kwargs to specify where the parquet file should be written
  • The worker function includes error handling and logging
  • The worker ensures files are flushed to disk using os.fsync() to prevent race conditions when the kernel process immediately reads the file
  • The change maintains backward compatibility by making kwargs optional with a default of None
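
The flush-to-disk step above can be sketched as follows. This is a minimal illustration, not the worker's actual code: `write_and_fsync` and its `writer` callback are hypothetical names, and in the PR's setting the callback would presumably be something like the collected DataFrame's `write_parquet` method.

```python
import os
from pathlib import Path
from typing import Callable


def write_and_fsync(output_path: str, writer: Callable[[str], None]) -> int:
    """Run writer(output_path), then force the file's contents to disk.

    Hypothetical helper: `writer` stands in for whatever produces the file.
    Returns the size of the written file in bytes.
    """
    # Make sure the target directory exists before writing.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    writer(output_path)
    # Reopen the finished file and fsync it so another process that reads
    # the path right afterwards sees the complete contents.
    with open(output_path, "rb+") as handle:
        os.fsync(handle.fileno())
    return os.path.getsize(output_path)


if __name__ == "__main__":
    import tempfile

    def demo_writer(path: str) -> None:
        # Placeholder payload; a real caller would write parquet data here.
        with open(path, "wb") as f:
            f.write(b"demo-bytes")

    with tempfile.TemporaryDirectory() as tmp:
        print(write_and_fsync(os.path.join(tmp, "out.parquet"), demo_writer))
```

The file is reopened in read/write mode because `os.fsync` on a read-only descriptor is not accepted on every platform.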

The add_python_script method was calling collect().write_parquet() directly
on the core process, which is undesirable for performance. This change
offloads the collect and parquet writing to the worker process using the
existing ExternalDfFetcher infrastructure.

Changes:
- Add write_parquet operation to worker funcs.py that deserializes a
  LazyFrame, collects it, and writes to a specified parquet path with fsync
- Add write_parquet to OperationType in both worker and core models
- Add kwargs support to ExternalDfFetcher and trigger_df_operation so
  custom parameters (like output_path) can be passed through both
  WebSocket streaming and REST fallback paths
- Update REST /submit_query/ endpoint to read kwargs from X-Kwargs header
- Replace direct collect().write_parquet() in add_python_script with
  ExternalDfFetcher using the new write_parquet operation type
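
To make the kwargs plumbing in the list above concrete, here is a small sketch of how an operation type plus optional kwargs could be dispatched to a worker function. Only the `write_parquet` operation name and the `output_path` parameter come from this PR; `run_operation`, the `OPERATIONS` registry, and the `noop` member are hypothetical, and the `write_parquet` body is a stand-in that does not touch polars or the disk.

```python
from typing import Any, Callable, Literal, Optional, get_args

# "write_parquet" is the member added by the PR; "noop" is a placeholder
# because the other real members of OperationType are not listed here.
OperationType = Literal["noop", "write_parquet"]


def noop(payload: bytes) -> str:
    return "noop"


def write_parquet(payload: bytes, output_path: str) -> str:
    # Stand-in: the real worker function deserializes a LazyFrame,
    # collects it, and writes a parquet file to output_path.
    return f"would write {len(payload)} bytes to {output_path}"


OPERATIONS: dict[str, Callable[..., str]] = {
    "noop": noop,
    "write_parquet": write_parquet,
}


def run_operation(
    operation_type: str,
    payload: bytes,
    kwargs: Optional[dict[str, Any]] = None,
) -> str:
    """Dispatch to the registered function, forwarding optional kwargs."""
    if operation_type not in get_args(OperationType):
        raise ValueError(f"unknown operation type: {operation_type!r}")
    # kwargs defaults to None, so calls that predate kwargs keep working.
    return OPERATIONS[operation_type](payload, **(kwargs or {}))


if __name__ == "__main__":
    print(run_operation("noop", b""))
    print(run_operation("write_parquet", b"abc", {"output_path": "/tmp/x.parquet"}))
```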

https://claude.ai/code/session_01RNWTER2V7VJAgPeYEusNoC
@codecov-commenter

⚠️ Please install the Codecov app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 20.00000% with 24 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| flowfile_worker/flowfile_worker/funcs.py | 5.00% | 19 Missing ⚠️ |
| flowfile_core/flowfile_core/flowfile/flow_graph.py | 0.00% | 3 Missing ⚠️ |
| ...ine/subprocess_operations/subprocess_operations.py | 33.33% | 2 Missing ⚠️ |


@Edwardvaneechoud Edwardvaneechoud merged commit 7802641 into feauture/kernel-implementation Feb 9, 2026
13 checks passed
@Edwardvaneechoud Edwardvaneechoud deleted the claude/fix-add-python-script-ngqCx branch February 9, 2026 05:57