Skip to content

model training with air#17303

Merged
HxpSerein merged 3 commits intoapache:research/airreplicationfrom
RkGrit:air
Mar 17, 2026
Merged

model training with air#17303
HxpSerein merged 3 commits intoapache:research/airreplicationfrom
RkGrit:air

Conversation

@RkGrit
Copy link
Contributor

@RkGrit RkGrit commented Mar 17, 2026

model training with ai

Copilot AI review requested due to automatic review settings March 17, 2026 03:08
@HxpSerein HxpSerein merged commit 35048a9 into apache:research/airreplication Mar 17, 2026
2 checks passed
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces two large feature areas: (1) a new “AirReplication” consensus/replication implementation on the DataNode side (RPC service, metrics, replication manager/interfaces), and (2) a new standalone Python fine-tuning framework (model registry/loading, hyperparameter parsing, datasets, trainer, task executor, and example configs).

Changes:

  • Add DataNode Java implementation scaffolding for AirReplication RPC handling, peer management, and sync-lag metrics.
  • Add a standalone Python fine-tuning package with CLI-driven training, adapters (LoRA/DualWeaver), datasets (CSV/Parquet/TsFile/IoTDB-tree), and checkpointing/resume.
  • Add multiple YAML example configs for running fine-tuning across supported model types and adaptation methods.

Reviewed changes

Copilot reviewed 67 out of 74 changed files in this pull request and generated 14 comments.

Show a summary per file
File Description
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java New Thrift processor for AirReplication RPC calls.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceMBean.java MBean interface placeholder for RPC service.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceHandler.java Thrift server event handler wiring processor exit hook.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java New Thrift service wrapper for AirReplication RPC server thread.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java Sync-lag aggregation manager per consensus group.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java MetricSet binding for AirReplication server-level metrics.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/ReplicateProgressManager.java Interface for progress-index assignment and lag queries.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSink.java Interface for leader/follower progress reporting (for lag).
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationSelector.java Interface for enumerating AirReplication tasks and statuses.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationReceiver.java Interface for receiving replication transfer RPC payloads.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationName.java Value object encoding AirReplication identity (group/sender/receiver).
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationManager.java Dispatcher/selector wrapper to create/update/drop replication “airs”.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationGuardian.java Interface for background guard job scheduling.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/airreplication/AirReplicationDispatcher.java Interface for create/start/stop/drop replication “air” tasks.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java Adds a consensus implementation file (currently class-name mismatched).
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplicationPeerManager.java Peer set manager for replication group membership.
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java Adds a consensus implementation file for AirReplication.
iotdb-core/ainode/standalone_finetune/model/utils.py Model path resolution and class/config loading helpers.
iotdb-core/ainode/standalone_finetune/model/model_storage.py In-memory model registry and fine-tune lifecycle helpers.
iotdb-core/ainode/standalone_finetune/model/model_loader.py Load/save model weights, checkpoints, and training state.
iotdb-core/ainode/standalone_finetune/model/model_info.py ModelInfo class and builtin model map for standalone mode.
iotdb-core/ainode/standalone_finetune/model/model_constants.py Shared constants/enums for model storage and states.
iotdb-core/ainode/standalone_finetune/model/init.py Standalone model package exports.
iotdb-core/ainode/standalone_finetune/manager/model_manager.py Thin wrapper around ModelStorage for CLI usage.
iotdb-core/ainode/standalone_finetune/manager/init.py Manager package export.
iotdb-core/ainode/standalone_finetune/hparams/training_args.py Training hyperparameter dataclass and validation.
iotdb-core/ainode/standalone_finetune/hparams/parser.py YAML/JSON/dict parsing into args objects with precedence rules.
iotdb-core/ainode/standalone_finetune/hparams/model_args.py Model hyperparameter dataclass (model type, token lengths, dtype, etc.).
iotdb-core/ainode/standalone_finetune/hparams/finetune_args.py Fine-tune method hyperparameter dataclasses (LoRA/Weaver/etc.).
iotdb-core/ainode/standalone_finetune/hparams/data_args.py Data hyperparameter dataclass (dataset types, scaling, splits, etc.).
iotdb-core/ainode/standalone_finetune/hparams/init.py Marks hparams as a package.
iotdb-core/ainode/standalone_finetune/finetune/train/trainer.py Core training loop with AMP, grad-accum, checkpointing, DDP barriers.
iotdb-core/ainode/standalone_finetune/finetune/train/init.py Marks train as a package.
iotdb-core/ainode/standalone_finetune/finetune/task/task_info.py Task model for fine-tune runs + (de)serialization utilities.
iotdb-core/ainode/standalone_finetune/finetune/task/task_executor.py Single/Distributed execution orchestration (mp + DDP setup).
iotdb-core/ainode/standalone_finetune/finetune/task/task_constants.py Task enums/constants for status/priority/progress/error.
iotdb-core/ainode/standalone_finetune/finetune/task/init.py Marks task as a package.
iotdb-core/ainode/standalone_finetune/finetune/examples/timer_xl_weaver_cnn.yaml Example config: Timer-XL + Weaver CNN on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/timer_xl_lora.yaml Example config: Timer-XL + LoRA on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/timer_xl_full_tsfile.yaml Example config: Timer-XL full fine-tune on TsFile.
iotdb-core/ainode/standalone_finetune/finetune/examples/timer_xl_full.yaml Example config: Timer-XL full fine-tune on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/sundial_weaver_cnn.yaml Example config: Sundial + Weaver CNN on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/sundial_lora.yaml Example config: Sundial + LoRA on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/sundial_full_iotdb.yaml Example config: Sundial full fine-tune via IoTDB table source.
iotdb-core/ainode/standalone_finetune/finetune/examples/sundial_full.yaml Example config: Sundial full fine-tune on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/moirai2_lora.yaml Example config: Moirai2 + LoRA on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/moirai2_full.yaml Example config: Moirai2 full fine-tune on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/chronos2_weaver_cnn.yaml Example config: Chronos2 + Weaver CNN on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/chronos2_lora.yaml Example config: Chronos2 + LoRA on CSV.
iotdb-core/ainode/standalone_finetune/finetune/examples/chronos2_full.yaml Example config: Chronos2 full fine-tune on CSV.
iotdb-core/ainode/standalone_finetune/finetune/adapter/weaver/weaver_mlp.py Feature Weaver MLP implementation.
iotdb-core/ainode/standalone_finetune/finetune/adapter/weaver/weaver_cnn.py Feature Weaver CNN implementation.
iotdb-core/ainode/standalone_finetune/finetune/adapter/weaver/base_weaver.py Weaver base classes + config + factory.
iotdb-core/ainode/standalone_finetune/finetune/adapter/weaver/init.py Marks weaver as a package.
iotdb-core/ainode/standalone_finetune/finetune/adapter/adapter.py Adaptation application (full/linear/LoRA/DualWeaver) + freezing logic.
iotdb-core/ainode/standalone_finetune/finetune/adapter/init.py Marks adapter as a package.
iotdb-core/ainode/standalone_finetune/finetune/init.py Marks finetune as a package.
iotdb-core/ainode/standalone_finetune/device/backend.py Device backend helper (CPU/CUDA) + DDP init helpers.
iotdb-core/ainode/standalone_finetune/device/init.py Device package export.
iotdb-core/ainode/standalone_finetune/data_provider/processor/data_scaler.py Scalers (standard/minmax/none) + RevIN scaler utility.
iotdb-core/ainode/standalone_finetune/data_provider/processor/init.py Marks processor as a package.
iotdb-core/ainode/standalone_finetune/data_provider/datasets/tsfile_tree_dataset.py Tree-model TsFile dataset reader + sliding window dataset.
iotdb-core/ainode/standalone_finetune/data_provider/datasets/iotdb_tree_dataset.py IoTDB tree dataset via Session SQL query + sliding window dataset.
iotdb-core/ainode/standalone_finetune/data_provider/datasets/csv_dataset.py CSV and Parquet dataset implementations.
iotdb-core/ainode/standalone_finetune/data_provider/datasets/base_dataset.py Base dataset abstraction + scaling hooks.
iotdb-core/ainode/standalone_finetune/data_provider/datasets/init.py Marks datasets as a package.
iotdb-core/ainode/standalone_finetune/data_provider/data_factory.py Dataset/dataloader factory functions (incl. DistributedSampler).
iotdb-core/ainode/standalone_finetune/data_provider/init.py Marks data_provider as a package.
iotdb-core/ainode/standalone_finetune/config.py Standalone config (paths and builtin module prefix via env vars).
iotdb-core/ainode/standalone_finetune/cli.py Standalone CLI entrypoint for registering and running training tasks.
iotdb-core/ainode/standalone_finetune/main.py Module entrypoint for python -m ....
iotdb-core/ainode/standalone_finetune/init.py Package docstring and usage hint.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +87 to +90
public class AirReplication implements IConsensus {
private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = "replication_air_guardian";
private static final String CLASS_NAME = AirReplication.class.getSimpleName();
private static final Logger LOGGER = LoggerFactory.getLogger(AirReplication.class);
Comment on lines +43 to +46
@Override
public ServiceType getID() {
return ServiceType.AIR_REPLICATION_SERVICE;
}
Comment on lines +64 to +95
config.getRpc().isEnableSSL()
? new ThriftServiceThread(
processor,
getID().getName(),
ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
config.getRpc().getSslKeyStorePath(),
config.getRpc().getSslKeyStorePassword(),
config.getRpc().getSslTrustStorePath(),
config.getRpc().getSslTrustStorePassword(),
ZeroCopyRpcTransportFactory.INSTANCE)
: new ThriftServiceThread(
processor,
getID().getName(),
ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
ZeroCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
thriftServiceThread.setName(ThreadName.AIR_REPLICATION_RPC_SERVICE.getName());
}
Comment on lines +128 to +145
private static class AirReplicationSyncLagManagerHolder {
private static Map<String, AirReplicationSyncLagManager> REPLICATION_GROUP_ID_2_INSTANCE_MAP;

private AirReplicationSyncLagManagerHolder() {
// empty constructor
}

private static void build() {
if (REPLICATION_GROUP_ID_2_INSTANCE_MAP == null) {
REPLICATION_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
}
}
}

public static AirReplicationSyncLagManager getInstance(String groupId) {
return AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
groupId, key -> new AirReplicationSyncLagManager());
}
Comment on lines +98 to +109
public void bindAutoGauge(AbstractMetricService metricService) {
metricService.createAutoGauge(
Metric.AIR_REPLICATION.toString(),
MetricLevel.IMPORTANT,
syncLagManager,
PipeConsensusSyncLagManager::calculateSyncLag,
Tag.NAME.toString(),
IMPL,
Tag.REGION.toString(),
impl.getConsensusGroupId(),
Tag.TYPE.toString(),
"syncLag");
Comment on lines +76 to +86
public void bindGauge(AbstractMetricService metricService) {
metricService
.getOrCreateGauge(
Metric.AIR_REPLICATION_MODE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
IMPL,
Tag.TYPE.toString(),
"replicateMode")
.set(impl.getReplicateMode());
}
Comment on lines +88 to +96
public void unbindGauge(AbstractMetricService metricService) {
metricService.remove(
MetricType.GAUGE,
Metric.PIPE_CONSENSUS_MODE.toString(),
Tag.NAME.toString(),
IMPL,
Tag.TYPE.toString(),
"replicateMode");
}
Comment on lines +211 to +219
if base_model_info_data:
base_model_info = ModelInfo(
model_id=base_model_info_data.get("model_id", ""),
model_type=base_model_info_data.get("model_type", ""),
category=ModelCategory(
base_model_info_data.get("category", "FINETUNED")
),
state=ModelStates(base_model_info_data.get("state", "INACTIVE")),
pipeline_cls=base_model_info_data.get("pipeline_cls", ""),
Comment on lines +102 to +112
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client11.*;")
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client12.*;")
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client13.*;")
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client14.*;")
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client15.*;")
self._session.execute_query_statement("SELECT field0 FROM root.consensus.client116.*;")
# self._session.execute_query_statement("SELECT field0 FROM root.consensus.client117.*;")
# self._session.execute_query_statement("SELECT field0 FROM root.consensus.client118.*;")
# self._session.execute_query_statement("SELECT field0 FROM root.consensus.client119.*;")
# self._session.execute_query_statement("SELECT field0 FROM root.consensus.client110.*;")

Comment on lines +76 to +77
if isinstance(df_raw[df_raw.columns[0]][2], str):
data = df_raw[df_raw.columns[1:]].values
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants