From f5095133ef9aca09cfe25a72edbbdae10ea8e330 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 26 Aug 2024 12:56:00 +0000 Subject: [PATCH 1/6] WIP --- ydb/core/protos/flat_scheme_op.proto | 2 +- ydb/core/protos/tx_datashard.proto | 2 + ...te_restore_incremental_backup_src_unit.cpp | 251 ++++++++++ ydb/core/tx/datashard/datashard_impl.h | 1 + ydb/core/tx/datashard/execution_unit.cpp | 2 + ydb/core/tx/datashard/execution_unit_ctors.h | 1 + ydb/core/tx/datashard/execution_unit_kind.h | 1 + ydb/core/tx/datashard/ya.make | 1 + ...tion_create_restore_incremental_backup.cpp | 440 +++++++++++++++++- 9 files changed, 693 insertions(+), 8 deletions(-) create mode 100644 ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 6a0d8bff5b10..d5d926aac08f 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -422,7 +422,7 @@ message TTableDescription { // This flag is create-only, and has to be set up // on table creation to allow system column names (started with __ydb_) // It won't be present on describes and won't be preserved - optional bool SystemColumnNamesAllowed = 42; + optional bool SystemColumnNamesAllowed = 43; } message TDictionaryEncodingSettings { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 5822b02c1fb0..b529b22b4e5c 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -480,6 +480,8 @@ message TFlatSchemeTransaction { optional TAlterCdcStreamNotice AlterCdcStreamNotice = 19; optional TDropCdcStreamNotice DropCdcStreamNotice = 20; optional TMoveIndex MoveIndex = 21; + + optional NKikimrSchemeOp.TRestoreIncrementalBackup RestoreIncrementalBackupSrc = 22; } message TDistributedEraseTransaction { diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp new file mode 100644 index 000000000000..d8018f176d81 --- /dev/null +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -0,0 +1,251 @@ +#include "defs.h" +#include "execution_unit_ctors.h" +#include "datashard_active_transaction.h" +#include "datashard_impl.h" + +namespace NKikimr { +namespace NDataShard { + +using namespace NKikimrTxDataShard; + +class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { +protected: + bool IsRelevant(TActiveTransaction* tx) const { + return tx->GetSchemeTx().HasRestoreIncrementalBackupSrc(); + } + + bool IsWaiting(TOperation::TPtr op) const { + return op->IsWaitingForScan() || op->IsWaitingForRestart(); + } + + void SetWaiting(TOperation::TPtr op) { + op->SetWaitingForScanFlag(); + } + + void ResetWaiting(TOperation::TPtr op) { + op->ResetWaitingForScanFlag(); + op->ResetWaitingForRestartFlag(); + } + + bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) { + Y_UNUSED(op, txc, ctx); + // TActiveTransaction* tx = dynamic_cast(op.Get()); + // Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + // Y_ABORT_UNLESS(tx->GetSchemeTx().HasBackup()); + // const auto& backup = tx->GetSchemeTx().GetBackup(); + + // const ui64 tableId = backup.GetTableId(); + // Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); + + // const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + // Y_ABORT_UNLESS(txc.DB.GetScheme().GetTableInfo(localTableId)); + + // auto* appData = AppData(ctx); + // const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; + // std::shared_ptr<::NKikimr::NDataShard::IExport> exp; + + // if (backup.HasYTSettings()) { + // if (backup.HasCompression()) { + // Abort(op, ctx, "Exports to YT do not support compression"); + // return false; + // } + + // if (auto* exportFactory = appData->DataShardExportFactory) { + // std::shared_ptr(exportFactory->CreateExportToYt(backup, columns)).swap(exp); + // } else { + // Abort(op, ctx, "Exports to YT are disabled"); + // return false; + // } + // } else if (backup.HasS3Settings()) { + // NBackupRestoreTraits::ECompressionCodec codec; + // if (!TryCodecFromTask(backup, codec)) { + // Abort(op, ctx, TStringBuilder() << "Unsupported compression codec" + // << ": " << backup.GetCompression().GetCodec()); + // return false; + // } + + // if (auto* exportFactory = appData->DataShardExportFactory) { + // std::shared_ptr(exportFactory->CreateExportToS3(backup, columns)).swap(exp); + // } else { + // Abort(op, ctx, "Exports to S3 are disabled"); + // return false; + // } + // } else { + // Abort(op, ctx, "Unsupported backup task"); + // return false; + // } + + // auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { + // return exp->CreateUploader(self, txId); + // }; + + // THolder buffer{exp->CreateBuffer()}; + // THolder scan{CreateExportScan(std::move(buffer), createUploader)}; + + // const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); + // const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority(); + + // ui64 readAheadLo = appData->DataShardConfig.GetBackupReadAheadLo(); + // if (ui64 readAheadLoOverride = DataShard.GetBackupReadAheadLoOverride(); readAheadLoOverride > 0) { + // readAheadLo = readAheadLoOverride; + // } + + // ui64 readAheadHi = appData->DataShardConfig.GetBackupReadAheadHi(); + // if (ui64 readAheadHiOverride = DataShard.GetBackupReadAheadHiOverride(); readAheadHiOverride > 0) { + // readAheadHi = readAheadHiOverride; + // } + + // tx->SetScanTask(DataShard.QueueScan(localTableId, scan.Release(), op->GetTxId(), + // TScanOptions() + // .SetResourceBroker(taskName, taskPrio) + // .SetReadAhead(readAheadLo, readAheadHi) + // .SetReadPrio(TScanOptions::EReadPrio::Low) + // )); + + return true; + } + + bool HasResult(TOperation::TPtr op) const { + return op->HasScanResult(); + } + + bool ProcessResult(TOperation::TPtr op, const TActorContext&) { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + // auto* result = CheckedCast(op->ScanResult().Get()); + // bool done = true; + + // switch (result->Outcome) { + // case EExportOutcome::Success: + // case EExportOutcome::Error: + // if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) { + // schemeOp->Success = result->Outcome == EExportOutcome::Success; + // schemeOp->Error = std::move(result->Error); + // schemeOp->BytesProcessed = result->BytesRead; + // schemeOp->RowsProcessed = result->RowsRead; + // } else { + // Y_FAIL_S("Cannot find schema tx: " << op->GetTxId()); + // } + // break; + // case EExportOutcome::Aborted: + // done = false; + // break; + // } + + // op->SetScanResult(nullptr); + // tx->SetScanTask(0); + + // return done; + return true; + } + + void Cancel(TActiveTransaction* tx, const TActorContext&) { + if (!tx->GetScanTask()) { + return; + } + + const ui64 tableId = tx->GetSchemeTx().GetBackup().GetTableId(); + + Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); + const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + + DataShard.CancelScan(localTableId, tx->GetScanTask()); + tx->SetScanTask(0); + } + + void PersistResult(TOperation::TPtr op, TTransactionContext& txc) { + auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId()); + Y_ABORT_UNLESS(schemeOp); + + NIceDb::TNiceDb db(txc.DB); + DataShard.PersistSchemeTxResult(db, *schemeOp); + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override final { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + if (!IsRelevant(tx)) { + return EExecutionStatus::Executed; + } + + if (!IsWaiting(op)) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Starting a " << GetKind() << " operation" + << " at " << DataShard.TabletID()); + + if (!Run(op, txc, ctx)) { + return EExecutionStatus::Executed; + } + + SetWaiting(op); + Y_DEBUG_ABORT_UNLESS(!HasResult(op)); + } + + if (HasResult(op)) { + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "" << GetKind() << " complete" + << " at " << DataShard.TabletID()); + + ResetWaiting(op); + if (ProcessResult(op, ctx)) { + PersistResult(op, txc); + } else { + Y_DEBUG_ABORT_UNLESS(!HasResult(op)); + op->SetWaitingForRestartFlag(); + ctx.Schedule(TDuration::Seconds(1), new TDataShard::TEvPrivate::TEvRestartOperation(op->GetTxId())); + } + } + + while (op->HasPendingInputEvents()) { + ProcessEvent(op->InputEvents().front(), op, ctx); + op->InputEvents().pop(); + } + + if (IsWaiting(op)) { + return EExecutionStatus::Continue; + } + + return EExecutionStatus::Executed; + } + + bool IsReadyToExecute(TOperation::TPtr op) const override final { + if (!IsWaiting(op)) { + return true; + } + + if (HasResult(op)) { + return true; + } + + if (op->HasPendingInputEvents()) { + return true; + } + + return false; + } + + void Complete(TOperation::TPtr, const TActorContext&) override final { + } + + void ProcessEvent(TAutoPtr& ev, TOperation::TPtr op, const TActorContext& ctx) { + switch (ev->GetTypeRewrite()) { + // OHFunc(TEvCancel, Handle); + } + Y_UNUSED(op,ctx); + } + +public: + TRestoreIncrementalBackupSrcUnit(TDataShard& self, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::RestoreIncrementalBackupSrc, false, self, pipeline) + { + } + +}; // TRestoreIncrementalBackupSrcUnit + +THolder CreateRestoreIncrementalBackupSrcUnit(TDataShard& self, TPipeline& pipeline) { + return THolder(new TRestoreIncrementalBackupSrcUnit(self, pipeline)); +} + +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 18c07fc3a342..b291ef0ca3ce 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -315,6 +315,7 @@ class TDataShard friend class TS3DownloadsManager; friend class TS3Downloader; template friend class TBackupRestoreUnitBase; + friend class TRestoreIncrementalBackupSrcUnit; friend struct TSetupSysLocks; friend class TDataShardLocksDb; diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index bbe48c388aaa..90059482104c 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -150,6 +150,8 @@ THolder CreateExecutionUnit(EExecutionUnitKind kind, return CreateReadUnit(dataShard, pipeline); case EExecutionUnitKind::ExecuteWrite: return CreateExecuteWriteUnit(dataShard, pipeline); + case EExecutionUnitKind::RestoreIncrementalBackupSrc: + return CreateRestoreIncrementalBackupSrcUnit(dataShard, pipeline); default: Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")"); } diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index 5034770ab221..8699d57db729 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -75,6 +75,7 @@ THolder CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipelin THolder CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder CreateRestoreIncrementalBackupSrcUnit(TDataShard &dataShard, TPipeline &pipeline); } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index 84d92380d3e0..40ed2f832b43 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -75,6 +75,7 @@ enum class EExecutionUnitKind: ui32 { AlterCdcStream, DropCdcStream, MoveIndex, + RestoreIncrementalBackupSrc, Count, Unspecified }; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index afe17818cf8a..3caa07837bf9 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -42,6 +42,7 @@ SRCS( create_persistent_snapshot_unit.cpp create_table_unit.cpp create_volatile_snapshot_unit.cpp + create_restore_incremental_backup_src_unit.cpp datashard__cancel_tx_proposal.cpp datashard__column_stats.cpp datashard__compact_borrowed.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index d80e40b6c954..4918b5fd2977 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -112,6 +112,422 @@ void DoCreateAlterTable( result.push_back(CreateAlterTable(NextPartId(opId, result), outTx)); } +/////// +/////// + +namespace NIncrBackup { + +class TConfigurePartsAtTable: public TSubOperationState { + TString DebugHint() const override { + return TStringBuilder() + << "NCdcStreamState::TConfigurePartsAtTable" + << " operationId: " << OperationId; + } + + static bool IsExpectedTxType(TTxState::ETxType txType) { + switch (txType) { + case TTxState::TxCreateCdcStreamAtTable: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: + case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: + case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: + return true; + default: + return false; + } + } + +protected: + // FIXME + void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const { + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); + auto table = context.SS->Tables.at(pathId); + + auto& notice = *tx.MutableRestoreIncrementalBackupSrc(); + + Y_UNUSED(notice); + // TODO: copy op to notice + } + +public: + explicit TConfigurePartsAtTable(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {}); + } + + bool ProgressState(TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << context.SS->TabletID()); + + auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + const auto& pathId = txState->TargetPathId; + + if (NTableState::CheckPartitioningChangedForTableModification(*txState, context)) { + NTableState::UpdatePartitioningForTableModification(OperationId, *txState, context); + } + + NKikimrTxDataShard::TFlatSchemeTransaction tx; + context.SS->FillSeqNo(tx, context.SS->StartRound(*txState)); + FillNotice(pathId, tx, context); + + txState->ClearShardsInProgress(); + Y_ABORT_UNLESS(txState->Shards.size()); + + for (ui32 i = 0; i < txState->Shards.size(); ++i) { + const auto& idx = txState->Shards[i].Idx; + const auto datashardId = context.SS->ShardInfos[idx].TabletID; + auto ev = context.SS->MakeDataShardProposal(pathId, OperationId, tx.SerializeAsString(), context.Ctx); + context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, ev.Release()); + } + + txState->UpdateShardsInProgress(TTxState::ConfigureParts); + return false; + } + + bool HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply " << ev->Get()->ToString() + << ", at schemeshard: " << context.SS->TabletID()); + + if (!NTableState::CollectProposeTransactionResults(OperationId, ev, context)) { + return false; + } + + return true; + } + +private: + const TOperationId OperationId; + +}; // TConfigurePartsAtTable + +class TProposeAtTable: public TSubOperationState { + TString DebugHint() const override { + return TStringBuilder() + << "NCdcStreamState::TProposeAtTable" + << " operationId: " << OperationId; + } + + static bool IsExpectedTxType(TTxState::ETxType txType) { + switch (txType) { + case TTxState::TxCreateCdcStreamAtTable: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: + case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: + case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: + return true; + default: + return false; + } + } + +public: + explicit TProposeAtTable(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {TEvDataShard::TEvProposeTransactionResult::EventType}); + } + + bool ProgressState(TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << context.SS->TabletID()); + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + + TSet shardSet; + for (const auto& shard : txState->Shards) { + Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.Idx)); + shardSet.insert(context.SS->ShardInfos.at(shard.Idx).TabletID); + } + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, txState->MinStep, shardSet); + return false; + } + + bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " TEvDataShard::TEvSchemaChanged" + << " triggers early, save it" + << ", at schemeshard: " << context.SS->TabletID()); + + NTableState::CollectSchemaChanged(OperationId, ev, context); + return false; + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", step: " << ev->Get()->StepId + << ", at schemeshard: " << context.SS->TabletID()); + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + const auto& pathId = txState->TargetPathId; + + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); + auto table = context.SS->Tables.at(pathId); + + table->AlterVersion += 1; + + NIceDb::TNiceDb db(context.GetDB()); + context.SS->PersistTableAlterVersion(db, pathId, table); + + context.SS->ClearDescribePathCaches(path); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts); + + const auto step = TStepId(ev->Get()->StepId); + context.SS->SnapshotsStepIds[OperationId.GetTxId()] = step; + context.SS->PersistSnapshotStepId(db, OperationId.GetTxId(), step); + + context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Add(1); + return true; + } + +protected: + const TOperationId OperationId; + +}; // TProposeAtTable + +class TDoneWithInitialScan: public TDone { +public: + using TDone::TDone; + + bool ProgressState(TOperationContext& context) override { + if (!TDone::ProgressState(context)) { + return false; + } + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan); + const auto& pathId = txState->TargetPathId; + + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + TMaybe streamPathId; + for (const auto& [_, childPathId] : path->GetChildren()) { + Y_ABORT_UNLESS(context.SS->PathsById.contains(childPathId)); + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->CreateTxId != OperationId.GetTxId()) { + continue; + } + + Y_ABORT_UNLESS(childPath->IsCdcStream() && !childPath->Dropped()); + Y_ABORT_UNLESS(context.SS->CdcStreams.contains(childPathId)); + auto stream = context.SS->CdcStreams.at(childPathId); + + Y_ABORT_UNLESS(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan); + Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan" + << ": found# " << *streamPathId + << ", another# " << childPathId); + streamPathId = childPathId; + } + + if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) { + return true; + } + + Y_ABORT_UNLESS(streamPathId); + context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId)); + + return true; + } + +}; // TDoneWithInitialScan + +class TNewRestoreFromAtTable: public TSubOperation { + static TTxState::ETxState NextState() { + return TTxState::ConfigureParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::ProposedWaitParts; + case TTxState::ProposedWaitParts: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return MakeHolder(OperationId); + case TTxState::Propose: + return MakeHolder(OperationId); + case TTxState::ProposedWaitParts: + return MakeHolder(OperationId); + case TTxState::Done: + return MakeHolder(OperationId); + default: + return nullptr; + } + } + +public: + explicit TNewRestoreFromAtTable(TOperationId id, const TTxTransaction& tx) + : TSubOperation(id, tx) + { + } + + explicit TNewRestoreFromAtTable(TOperationId id, TTxState::ETxState state) + : TSubOperation(id, state) + { + } + + THolder Propose(const TString&, TOperationContext& context) override { + const auto& workingDir = Transaction.GetWorkingDir(); + const auto& op = Transaction.GetRestoreIncrementalBackup(); + const auto& tableName = op.GetSrcTableName(); + + // LOG_N("TNewRestoreFromAtTable Propose" + // << ": opId# " << OperationId + // << ", stream# " << workingDir << "/" << tableName << "/" << streamName); + + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + + const auto workingDirPath = TPath::Resolve(workingDir, context.SS); + // { + // const auto checks = workingDirPath.Check(); + // checks + // .NotUnderDomainUpgrade() + // .IsAtLocalSchemeShard() + // .IsResolved() + // .NotDeleted() + // .IsLikeDirectory() + // .NotUnderDeleting(); + + // if (checks && !workingDirPath.IsTableIndex()) { + // checks.IsCommonSensePath(); + // } + + // if (!checks) { + // result->SetError(checks.GetStatus(), checks.GetError()); + // return result; + // } + // } + + const auto tablePath = workingDirPath.Child(tableName); + // { + // const auto checks = tablePath.Check(); + // checks + // .NotEmpty() + // .NotUnderDomainUpgrade() + // .IsAtLocalSchemeShard() + // .IsResolved() + // .NotDeleted() + // .IsTable() + // .NotAsyncReplicaTable() + // .NotUnderDeleting(); + + // if (checks) { + // if (!tablePath.IsInsideTableIndexPath()) { + // checks.IsCommonSensePath(); + // } + // checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op + // } + + // if (!checks) { + // result->SetError(checks.GetStatus(), checks.GetError()); + // return result; + // } + // } + + TString errStr; + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + // NKikimrScheme::EStatus status; + // if (!context.SS->CanCreateSnapshot(tablePath.Base()->PathId, OperationId.GetTxId(), status, errStr)) { + // result->SetError(status, errStr); + // return result; + // } + + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); + context.DbChanges.PersistTxState(OperationId); + + // context.MemChanges.GrabNewTableSnapshot(context.SS, tablePath.Base()->PathId, OperationId.GetTxId()); + // context.DbChanges.PersistTableSnapshot(tablePath.Base()->PathId, OperationId.GetTxId()); + + // context.SS->TablesWithSnapshots.emplace(tablePath.Base()->PathId, OperationId.GetTxId()); + // context.SS->SnapshotTables[OperationId.GetTxId()].insert(tablePath.Base()->PathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + Y_ABORT_UNLESS(table->AlterVersion != 0); + Y_ABORT_UNLESS(!table->AlterData); + + const auto txType = TTxState::TxCreateCdcStreamAtTableWithInitialScan; + + Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); + auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); + txState.State = TTxState::ConfigureParts; + + tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; + tablePath.Base()->LastTxId = OperationId.GetTxId(); + + for (const auto& splitOpId : table->GetSplitOpsInFlight()) { + context.OnComplete.Dependence(splitOpId.GetTxId(), OperationId.GetTxId()); + } + + context.OnComplete.ActivateTx(OperationId); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TNewRestoreFromAtTable AbortPropose" + << ": opId# " << OperationId); + } + + void AbortUnsafe(TTxId txId, TOperationContext& context) override { + LOG_N("TNewRestoreFromAtTable AbortUnsafe" + << ": opId# " << OperationId + << ", txId# " << txId); + context.OnComplete.DoneOperation(OperationId); + } + +}; // TNewRestoreFromAtTable + +} // namespace NIncrBackup + +/////// +/////// + TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup); @@ -161,18 +577,26 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c } } - NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; - createCdcStreamOp.SetTableName(srcTableName); - auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); - streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup); - streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); - streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); + // NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + // createCdcStreamOp.SetTableName(srcTableName); + // auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + // streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); + // streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup); + // streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + // streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); TVector result; DoCreateLock(opId, workingDirPath, srcTablePath, false, result); + { + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); + outTx.MutableRestoreIncrementalBackup()->CopyFrom(restoreOp); + result.push_back(MakeSubOperation(NextPartId(opId, result), outTx)); + } + + /* + DoCreateAlterTable(opId, dstTablePath, result); NCdc::DoCreateStream( @@ -194,6 +618,8 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c acceptExisted, result); + */ + return result; } From 87e8222b0c122df0815931735535141cd5c06ea6 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 27 Aug 2024 10:36:37 +0000 Subject: [PATCH 2/6] WIP --- ydb/core/base/appdata_fwd.h | 2 + ydb/core/protos/flat_scheme_op.proto | 4 +- .../tx/datashard/check_scheme_tx_unit.cpp | 3 + ...te_restore_incremental_backup_src_unit.cpp | 211 +++++++++--------- .../datashard_active_transaction.cpp | 8 +- .../datashard/datashard_active_transaction.h | 1 + ydb/core/tx/datashard/datashard_pipeline.cpp | 3 + ...tion_create_restore_incremental_backup.cpp | 13 +- 8 files changed, 136 insertions(+), 109 deletions(-) diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index b15d1cfcf880..5c4a675a5673 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -116,6 +116,7 @@ namespace NMiniKQL { namespace NDataShard { class IExportFactory; + class IRestoreIncrementalBackupFactory; } namespace NSQS { @@ -153,6 +154,7 @@ struct TAppData { const NScheme::TTypeRegistry* TypeRegistry = nullptr; const NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDataShard::IExportFactory *DataShardExportFactory = nullptr; + const NDataShard::IRestoreIncrementalBackupFactory *DataShardRestoreIncrementalBackupFactory = nullptr; const TFormatFactory* FormatFactory = nullptr; const NSQS::IEventsWriterFactory* SqsEventsWriterFactory = nullptr; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index d5d926aac08f..f44d722f782e 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -422,7 +422,7 @@ message TTableDescription { // This flag is create-only, and has to be set up // on table creation to allow system column names (started with __ydb_) // It won't be present on describes and won't be preserved - optional bool SystemColumnNamesAllowed = 43; + optional bool SystemColumnNamesAllowed = 42; } message TDictionaryEncodingSettings { @@ -1015,6 +1015,8 @@ message TDropContinuousBackup { message TRestoreIncrementalBackup { optional string SrcTableName = 1; + optional NKikimrProto.TPathID SrcPathId = 3; + optional string DstTableName = 2; } diff --git a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp index 137233eb7508..197a5d10a859 100644 --- a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp @@ -380,6 +380,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx) case TSchemaOperation::ETypeDropCdcStream: res = CheckDropCdcStream(activeTx); break; + case TSchemaOperation::ETypeRestoreIncrementalBackupSrc: + res = true; // FIXME: CheckDropCdcStream(activeTx); + break; default: LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Unknown scheme tx type detected at tablet " diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp index d8018f176d81..882ff627b7f0 100644 --- a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -2,11 +2,26 @@ #include "execution_unit_ctors.h" #include "datashard_active_transaction.h" #include "datashard_impl.h" +#include "export_iface.h" +#include "export_scan.h" +#include namespace NKikimr { namespace NDataShard { using namespace NKikimrTxDataShard; +using namespace NExportScan; + +/// + +class IRestoreIncrementalBackupFactory { +public: + virtual ~IRestoreIncrementalBackupFactory() = default; + virtual IExport* CreateRestore(const ::NKikimrSchemeOp::TRestoreIncrementalBackup, const IExport::TTableColumns& columns) const = 0; + virtual void Shutdown() = 0; +}; + +/// class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { protected: @@ -27,81 +42,72 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { op->ResetWaitingForRestartFlag(); } + void Abort(TOperation::TPtr op, const TActorContext& ctx, const TString& error) { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, error); + + BuildResult(op)->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, error); + ResetWaiting(op); + + Cancel(tx, ctx); + } + bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) { - Y_UNUSED(op, txc, ctx); - // TActiveTransaction* tx = dynamic_cast(op.Get()); - // Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); - - // Y_ABORT_UNLESS(tx->GetSchemeTx().HasBackup()); - // const auto& backup = tx->GetSchemeTx().GetBackup(); - - // const ui64 tableId = backup.GetTableId(); - // Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); - - // const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; - // Y_ABORT_UNLESS(txc.DB.GetScheme().GetTableInfo(localTableId)); - - // auto* appData = AppData(ctx); - // const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; - // std::shared_ptr<::NKikimr::NDataShard::IExport> exp; - - // if (backup.HasYTSettings()) { - // if (backup.HasCompression()) { - // Abort(op, ctx, "Exports to YT do not support compression"); - // return false; - // } - - // if (auto* exportFactory = appData->DataShardExportFactory) { - // std::shared_ptr(exportFactory->CreateExportToYt(backup, columns)).swap(exp); - // } else { - // Abort(op, ctx, "Exports to YT are disabled"); - // return false; - // } - // } else if (backup.HasS3Settings()) { - // NBackupRestoreTraits::ECompressionCodec codec; - // if (!TryCodecFromTask(backup, codec)) { - // Abort(op, ctx, TStringBuilder() << "Unsupported compression codec" - // << ": " << backup.GetCompression().GetCodec()); - // return false; - // } - - // if (auto* exportFactory = appData->DataShardExportFactory) { - // std::shared_ptr(exportFactory->CreateExportToS3(backup, columns)).swap(exp); - // } else { - // Abort(op, ctx, "Exports to S3 are disabled"); - // return false; - // } - // } else { - // Abort(op, ctx, "Unsupported backup task"); - // return false; - // } - - // auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { - // return exp->CreateUploader(self, txId); - // }; - - // THolder buffer{exp->CreateBuffer()}; - // THolder scan{CreateExportScan(std::move(buffer), createUploader)}; - - // const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); - // const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority(); - - // ui64 readAheadLo = appData->DataShardConfig.GetBackupReadAheadLo(); - // if (ui64 readAheadLoOverride = DataShard.GetBackupReadAheadLoOverride(); readAheadLoOverride > 0) { - // readAheadLo = readAheadLoOverride; - // } - - // ui64 readAheadHi = appData->DataShardConfig.GetBackupReadAheadHi(); - // if (ui64 readAheadHiOverride = DataShard.GetBackupReadAheadHiOverride(); readAheadHiOverride > 0) { - // readAheadHi = readAheadHiOverride; - // } - - // tx->SetScanTask(DataShard.QueueScan(localTableId, scan.Release(), op->GetTxId(), - // TScanOptions() - // .SetResourceBroker(taskName, taskPrio) - // .SetReadAhead(readAheadLo, readAheadHi) - // .SetReadPrio(TScanOptions::EReadPrio::Low) - // )); + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + Y_ABORT_UNLESS(tx->GetSchemeTx().HasRestoreIncrementalBackupSrc()); + const auto& restoreSrc = tx->GetSchemeTx().GetRestoreIncrementalBackupSrc(); + + const ui64 tableId = restoreSrc.GetSrcPathId().GetLocalId(); + Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); + + const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + Y_ABORT_UNLESS(txc.DB.GetScheme().GetTableInfo(localTableId)); + + auto* appData = AppData(ctx); + const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; + std::shared_ptr<::NKikimr::NDataShard::IExport> exp; // TODO: decouple from export + Y_UNUSED(exp, appData, columns); + + if (auto* restoreFactory = appData->DataShardRestoreIncrementalBackupFactory) { + std::shared_ptr(restoreFactory->CreateRestore(restoreSrc, columns)).swap(exp); + } else { + Abort(op, ctx, "Restore incremental backup are disabled"); + Y_VERIFY("1"); + return false; + } + + auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { + return exp->CreateUploader(self, txId); + }; + + THolder buffer{exp->CreateBuffer()}; + THolder scan{CreateExportScan(std::move(buffer), createUploader)}; + + // FIXME: + + const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); + const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority(); + + ui64 readAheadLo = appData->DataShardConfig.GetBackupReadAheadLo(); + if (ui64 readAheadLoOverride = DataShard.GetBackupReadAheadLoOverride(); readAheadLoOverride > 0) { + readAheadLo = readAheadLoOverride; + } + + ui64 readAheadHi = appData->DataShardConfig.GetBackupReadAheadHi(); + if (ui64 readAheadHiOverride = DataShard.GetBackupReadAheadHiOverride(); readAheadHiOverride > 0) { + readAheadHi = readAheadHiOverride; + } + + tx->SetScanTask(DataShard.QueueScan(localTableId, scan.Release(), op->GetTxId(), + TScanOptions() + .SetResourceBroker(taskName, taskPrio) + .SetReadAhead(readAheadLo, readAheadHi) + .SetReadPrio(TScanOptions::EReadPrio::Low) + )); return true; } @@ -114,31 +120,30 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { TActiveTransaction* tx = dynamic_cast(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); - // auto* result = CheckedCast(op->ScanResult().Get()); - // bool done = true; - - // switch (result->Outcome) { - // case EExportOutcome::Success: - // case EExportOutcome::Error: - // if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) { - // schemeOp->Success = result->Outcome == EExportOutcome::Success; - // schemeOp->Error = std::move(result->Error); - // schemeOp->BytesProcessed = result->BytesRead; - // schemeOp->RowsProcessed = result->RowsRead; - // } else { - // Y_FAIL_S("Cannot find schema tx: " << op->GetTxId()); - // } - // break; - // case EExportOutcome::Aborted: - // done = false; - // break; - // } - - // op->SetScanResult(nullptr); - // tx->SetScanTask(0); - - // return done; - return true; + auto* result = CheckedCast(op->ScanResult().Get()); + bool done = true; + + switch (result->Outcome) { + case EExportOutcome::Success: + case EExportOutcome::Error: + if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) { + schemeOp->Success = result->Outcome == EExportOutcome::Success; + schemeOp->Error = std::move(result->Error); + schemeOp->BytesProcessed = result->BytesRead; + schemeOp->RowsProcessed = result->RowsRead; + } else { + Y_FAIL_S("Cannot find schema tx: " << op->GetTxId()); + } + break; + case EExportOutcome::Aborted: + done = false; + break; + } + + op->SetScanResult(nullptr); + tx->SetScanTask(0); + + return done; } void Cancel(TActiveTransaction* tx, const TActorContext&) { @@ -164,9 +169,14 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override final { + Y_ABORT_UNLESS(op->IsSchemeTx()); + TActiveTransaction* tx = dynamic_cast(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + const TString msg = TStringBuilder() << "Got2 " << "<" << tx->IsSchemeTx() << ">" << tx->GetTxBody() << " tx"; + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, msg); + if (!IsRelevant(tx)) { return EExecutionStatus::Executed; } @@ -232,7 +242,8 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { switch (ev->GetTypeRewrite()) { // OHFunc(TEvCancel, Handle); } - Y_UNUSED(op,ctx); + Y_VERIFY("2"); + Y_UNUSED(op, ctx); } public: diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index d5130891d25d..ed82e56ee321 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -366,7 +366,7 @@ void TActiveTransaction::FillTxData(TDataShard *self, if (DataTx->HasStreamResponse()) SetStreamSink(DataTx->GetSink()); } else if (IsSchemeTx()) { - BuildSchemeTx(); + Y_ABORT_UNLESS(BuildSchemeTx()); } else if (IsSnapshotTx()) { BuildSnapshotTx(); } else if (IsDistributedEraseTx()) { @@ -440,7 +440,8 @@ bool TActiveTransaction::BuildSchemeTx() + (ui32)SchemeTx->HasCreateCdcStreamNotice() + (ui32)SchemeTx->HasAlterCdcStreamNotice() + (ui32)SchemeTx->HasDropCdcStreamNotice() - + (ui32)SchemeTx->HasMoveIndex(); + + (ui32)SchemeTx->HasMoveIndex() + + (ui32)SchemeTx->HasRestoreIncrementalBackupSrc(); if (count != 1) return false; @@ -476,6 +477,8 @@ bool TActiveTransaction::BuildSchemeTx() SchemeTxType = TSchemaOperation::ETypeDropCdcStream; else if (SchemeTx->HasMoveIndex()) SchemeTxType = TSchemaOperation::ETypeMoveIndex; + else if (SchemeTx->HasRestoreIncrementalBackupSrc()) + SchemeTxType = TSchemaOperation::ETypeRestoreIncrementalBackupSrc; else SchemeTxType = TSchemaOperation::ETypeUnknown; @@ -858,6 +861,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded) plan.push_back(EExecutionUnitKind::CreateCdcStream); plan.push_back(EExecutionUnitKind::AlterCdcStream); plan.push_back(EExecutionUnitKind::DropCdcStream); + plan.push_back(EExecutionUnitKind::RestoreIncrementalBackupSrc); plan.push_back(EExecutionUnitKind::CompleteOperation); plan.push_back(EExecutionUnitKind::CompletedOperations); } else { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 356fae934f2e..f86a1eb9b4de 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -54,6 +54,7 @@ struct TSchemaOperation { ETypeAlterCdcStream = 14, ETypeDropCdcStream = 15, ETypeMoveIndex = 16, + ETypeRestoreIncrementalBackupSrc = 17, ETypeUnknown = Max() }; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index a5646fa379ff..aa84ff06917f 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1435,6 +1435,9 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: tx->Orbit = std::move(ev->Get()->Orbit); tx->OperationSpan = std::move(operationSpan); + const TString msg = TStringBuilder() << "Got " << "<" << tx->IsSchemeTx() << ">" << rec.GetTxBody() << " tx"; + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, msg); + auto malformed = [&](const TStringBuf txType, const TString& txBody) { const TString error = TStringBuilder() << "Malformed " << txType << " tx" << " at tablet " << Self->TabletID() diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index 4918b5fd2977..b7d7b5ddd0b4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -148,8 +148,8 @@ class TConfigurePartsAtTable: public TSubOperationState { auto table = context.SS->Tables.at(pathId); auto& notice = *tx.MutableRestoreIncrementalBackupSrc(); + PathIdFromPathId(pathId, notice.MutableSrcPathId()); - Y_UNUSED(notice); // TODO: copy op to notice } @@ -343,12 +343,13 @@ class TDoneWithInitialScan: public TDone { streamPathId = childPathId; } - if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) { - return true; - } + // if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) { + // return true; + // } - Y_ABORT_UNLESS(streamPathId); - context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId)); + // FIXME(+active) + // Y_ABORT_UNLESS(streamPathId); + // context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId)); return true; } From e01b2caba5373c57837d7490a7fc7bb6bde8e683 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 29 Aug 2024 14:29:06 +0000 Subject: [PATCH 3/6] WIP --- ...te_restore_incremental_backup_src_unit.cpp | 312 +++++++++++++++++- ...tion_create_restore_incremental_backup.cpp | 4 +- 2 files changed, 301 insertions(+), 15 deletions(-) diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp index 882ff627b7f0..b8f6fdaf651b 100644 --- a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -21,6 +21,291 @@ class IRestoreIncrementalBackupFactory { virtual void Shutdown() = 0; }; +class TTableExport: public IExport { +public: + explicit TTableExport(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& task, const TTableColumns& columns) + : Task(task) + , Columns(columns) + { + } + + IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override { + // FIXME + Y_UNUSED(dataShard, txId); + return nullptr; + } + + IBuffer* CreateBuffer() const override { + // using namespace NBackupRestoreTraits; + + // const auto& scanSettings = Task.GetScanSettings(); + // const ui64 maxRows = scanSettings.GetRowsBatchSize() ? scanSettings.GetRowsBatchSize() : Max(); + // const ui64 maxBytes = scanSettings.GetBytesBatchSize(); + // const ui64 minBytes = Task.GetS3Settings().GetLimits().GetMinWriteBatchSize(); + + // switch (CodecFromTask(Task)) { + // case ECompressionCodec::None: + // return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes); + // case ECompressionCodec::Zstd: + // return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes); + // case ECompressionCodec::Invalid: + // Y_ABORT("unreachable"); + // } + // FIXME + return nullptr; + } + + void Shutdown() const override {} + +protected: + const ::NKikimrSchemeOp::TRestoreIncrementalBackup Task; + const TTableColumns Columns; +}; + +class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IScan { + enum EStateBits { + ES_REGISTERED = 0, // Actor is registered + ES_INITIALIZED, // Seek(...) was called + ES_UPLOADER_READY, + ES_BUFFER_SENT, + ES_NO_MORE_DATA, + + ES_COUNT, + }; + + struct TStats: public IBuffer::TStats { + TStats() + : IBuffer::TStats() + { + auto counters = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "store_to_yt"); + + MonRows = counters->GetCounter("Rows", true); + MonBytesRead = counters->GetCounter("BytesRead", true); + MonBytesSent = counters->GetCounter("BytesSent", true); + } + + void Aggr(ui64 rows, ui64 bytesRead, ui64 bytesSent) { + Rows += rows; + BytesRead += bytesRead; + BytesSent += bytesSent; + + *MonRows += rows; + *MonBytesRead += bytesRead; + *MonBytesSent += bytesSent; + } + + void Aggr(const IBuffer::TStats& stats) { + Aggr(stats.Rows, stats.BytesRead, stats.BytesSent); + } + + TString ToString() const { + return TStringBuilder() + << "Stats { " + << " Rows: " << Rows + << " BytesRead: " << BytesRead + << " BytesSent: " << BytesSent + << " }"; + } + + private: + ::NMonitoring::TDynamicCounters::TCounterPtr MonRows; + ::NMonitoring::TDynamicCounters::TCounterPtr MonBytesRead; + ::NMonitoring::TDynamicCounters::TCounterPtr MonBytesSent; + }; + + bool IsReady() const { + return State.Test(ES_REGISTERED) && State.Test(ES_INITIALIZED); + } + + void MaybeReady() { + if (IsReady()) { + Send(Uploader, new TEvExportScan::TEvReady()); + } + } + + EScan MaybeSendBuffer() { + const bool noMoreData = State.Test(ES_NO_MORE_DATA); + + if (!noMoreData && !Buffer->IsFilled()) { + return EScan::Feed; + } + + if (!State.Test(ES_UPLOADER_READY) || State.Test(ES_BUFFER_SENT)) { + Spent->Alter(false); + return EScan::Sleep; + } + + IBuffer::TStats stats; + THolder ev{Buffer->PrepareEvent(noMoreData, stats)}; + + if (!ev) { + Success = false; + Error = Buffer->GetError(); + return EScan::Final; + } + + Send(Uploader, std::move(ev)); + State.Set(ES_BUFFER_SENT); + Stats->Aggr(stats); + + if (noMoreData) { + Spent->Alter(false); + return EScan::Sleep; + } + + return EScan::Feed; + } + + void Handle(TEvExportScan::TEvReset::TPtr&) { + Y_ABORT_UNLESS(IsReady()); + + EXPORT_LOG_D("Handle TEvExportScan::TEvReset" + << ": self# " << SelfId()); + + Stats.Reset(new TStats); + State.Reset(ES_UPLOADER_READY).Reset(ES_BUFFER_SENT).Reset(ES_NO_MORE_DATA); + Spent->Alter(true); + Driver->Touch(EScan::Reset); + } + + void Handle(TEvExportScan::TEvFeed::TPtr&) { + Y_ABORT_UNLESS(IsReady()); + + EXPORT_LOG_D("Handle TEvExportScan::TEvFeed" + << ": self# " << SelfId()); + + State.Set(ES_UPLOADER_READY).Reset(ES_BUFFER_SENT); + Spent->Alter(true); + if (EScan::Feed == MaybeSendBuffer()) { + Driver->Touch(EScan::Feed); + } + } + + void Handle(TEvExportScan::TEvFinish::TPtr& ev) { + Y_ABORT_UNLESS(IsReady()); + + EXPORT_LOG_D("Handle TEvExportScan::TEvFinish" + << ": self# " << SelfId() + << ", msg# " << ev->Get()->ToString()); + + Success = ev->Get()->Success; + Error = ev->Get()->Error; + Driver->Touch(EScan::Final); + } + +public: + static constexpr TStringBuf LogPrefix() { + return "scanner"sv; + } + + explicit TDirectReplicationScan() + : IActorCallback(static_cast(&TExportScan::StateWork), NKikimrServices::TActivity::EXPORT_SCAN_ACTOR) + , Stats(new TStats) + , Driver(nullptr) + , Success(false) + { + } + + void Describe(IOutputStream& o) const noexcept override { + o << "ExportScan { " + << "Uploader: " << Uploader + << Stats->ToString() << " " + << "Success: " << Success + << "Error: " << Error + << " }"; + } + + IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr scheme) noexcept override { + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); + + Driver = driver; + Scheme = std::move(scheme); + Spent = new TSpent(TAppData::TimeProvider.Get()); + Buffer->ColumnsOrder(Scheme->Tags()); + + return {EScan::Feed, {}}; + } + + void Registered(TActorSystem* sys, const TActorId&) override { + // Uploader = sys->Register(CreateUploaderFn(), TMailboxType::HTSwap, AppData()->BatchPoolId); + + State.Set(ES_REGISTERED); + MaybeReady(); + } + + EScan Seek(TLead& lead, ui64) noexcept override { + lead.To(Scheme->Tags(), {}, ESeek::Lower); + Buffer->Clear(); + + State.Set(ES_INITIALIZED); + MaybeReady(); + + Spent->Alter(true); + return EScan::Feed; + } + + EScan Feed(TArrayRef, const TRow& row) noexcept override { + if (!Buffer->Collect(row)) { + Success = false; + Error = Buffer->GetError(); + EXPORT_LOG_E("Error read data from table: " << Error); + return EScan::Final; + } + + return MaybeSendBuffer(); + } + + EScan Exhausted() noexcept override { + State.Set(ES_NO_MORE_DATA); + return MaybeSendBuffer(); + } + + TAutoPtr Finish(EAbort abort) noexcept override { + auto outcome = EExportOutcome::Success; + if (abort != EAbort::None) { + outcome = EExportOutcome::Aborted; + } else if (!Success) { + outcome = EExportOutcome::Error; + } + + PassAway(); + return new TExportScanProduct(outcome, Error, Stats->BytesRead, Stats->Rows); + } + + void PassAway() override { + if (const auto& actorId = std::exchange(Uploader, {})) { + Send(actorId, new TEvents::TEvPoisonPill()); + } + + IActorCallback::PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExportScan::TEvReset, Handle); + hFunc(TEvExportScan::TEvFeed, Handle); + hFunc(TEvExportScan::TEvFinish, Handle); + } + } + +private: + TActorId Uploader; + THolder Stats; + + IDriver* Driver; + TIntrusiveConstPtr Scheme; + TAutoPtr Spent; + + TBitMap State; + bool Success; + TString Error; + +}; // TExportScan + +NTable::IScan* CreateDirectReplicationScan() { + return nullptr; // FIXME +} + /// class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { @@ -72,20 +357,22 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { std::shared_ptr<::NKikimr::NDataShard::IExport> exp; // TODO: decouple from export Y_UNUSED(exp, appData, columns); - if (auto* restoreFactory = appData->DataShardRestoreIncrementalBackupFactory) { - std::shared_ptr(restoreFactory->CreateRestore(restoreSrc, columns)).swap(exp); - } else { - Abort(op, ctx, "Restore incremental backup are disabled"); - Y_VERIFY("1"); - return false; - } + // if (auto* restoreFactory = appData->DataShardRestoreIncrementalBackupFactory) { + // std::shared_ptr(restoreFactory->CreateRestore(restoreSrc, columns)).swap(exp); + // } else { + // std::shared_ptr(new TTableExport(restoreSrc, columns)).swap(exp); + // /* + // Abort(op, ctx, "Restore incremental backup are disabled"); + // return false; + // */ + // } - auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { - return exp->CreateUploader(self, txId); - }; + // auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { + // return exp->CreateUploader(self, txId); + // }; - THolder buffer{exp->CreateBuffer()}; - THolder scan{CreateExportScan(std::move(buffer), createUploader)}; + // THolder buffer{exp->CreateBuffer()}; + THolder scan{CreateDirectReplicationScan(/* std::move(buffer), createUploader */)}; // FIXME: @@ -242,7 +529,6 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { switch (ev->GetTypeRewrite()) { // OHFunc(TEvCancel, Handle); } - Y_VERIFY("2"); Y_UNUSED(op, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index b7d7b5ddd0b4..a61c10987a48 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -120,7 +120,7 @@ namespace NIncrBackup { class TConfigurePartsAtTable: public TSubOperationState { TString DebugHint() const override { return TStringBuilder() - << "NCdcStreamState::TConfigurePartsAtTable" + << "NIncrBackupState::TConfigurePartsAtTable" << " operationId: " << OperationId; } @@ -212,7 +212,7 @@ class TConfigurePartsAtTable: public TSubOperationState { class TProposeAtTable: public TSubOperationState { TString DebugHint() const override { return TStringBuilder() - << "NCdcStreamState::TProposeAtTable" + << "NIncrBackupState::TProposeAtTable" << " operationId: " << OperationId; } From af593d16b79425453b12542d99794ceeb9c8691b Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 30 Aug 2024 20:31:04 +0000 Subject: [PATCH 4/6] WIP --- ydb/core/protos/flat_scheme_op.proto | 1 + ...te_restore_incremental_backup_src_unit.cpp | 135 ++++++++---------- ydb/core/tx/replication/service/worker.cpp | 31 +++- ydb/core/tx/replication/service/worker.h | 5 + 4 files changed, 95 insertions(+), 77 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f44d722f782e..cae26ee33129 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1018,6 +1018,7 @@ message TRestoreIncrementalBackup { optional NKikimrProto.TPathID SrcPathId = 3; optional string DstTableName = 2; + optional NKikimrProto.TPathID DstPathId = 4; } enum EIndexType { diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp index b8f6fdaf651b..5dfc4e16881d 100644 --- a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -5,6 +5,17 @@ #include "export_iface.h" #include "export_scan.h" #include +#include +#include +#include + +#define EXPORT_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) namespace NKikimr { namespace NDataShard { @@ -14,54 +25,6 @@ using namespace NExportScan; /// -class IRestoreIncrementalBackupFactory { -public: - virtual ~IRestoreIncrementalBackupFactory() = default; - virtual IExport* CreateRestore(const ::NKikimrSchemeOp::TRestoreIncrementalBackup, const IExport::TTableColumns& columns) const = 0; - virtual void Shutdown() = 0; -}; - -class TTableExport: public IExport { -public: - explicit TTableExport(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& task, const TTableColumns& columns) - : Task(task) - , Columns(columns) - { - } - - IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override { - // FIXME - Y_UNUSED(dataShard, txId); - return nullptr; - } - - IBuffer* CreateBuffer() const override { - // using namespace NBackupRestoreTraits; - - // const auto& scanSettings = Task.GetScanSettings(); - // const ui64 maxRows = scanSettings.GetRowsBatchSize() ? scanSettings.GetRowsBatchSize() : Max(); - // const ui64 maxBytes = scanSettings.GetBytesBatchSize(); - // const ui64 minBytes = Task.GetS3Settings().GetLimits().GetMinWriteBatchSize(); - - // switch (CodecFromTask(Task)) { - // case ECompressionCodec::None: - // return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes); - // case ECompressionCodec::Zstd: - // return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes); - // case ECompressionCodec::Invalid: - // Y_ABORT("unreachable"); - // } - // FIXME - return nullptr; - } - - void Shutdown() const override {} - -protected: - const ::NKikimrSchemeOp::TRestoreIncrementalBackup Task; - const TTableColumns Columns; -}; - class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IScan { enum EStateBits { ES_REGISTERED = 0, // Actor is registered @@ -126,7 +89,7 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS EScan MaybeSendBuffer() { const bool noMoreData = State.Test(ES_NO_MORE_DATA); - if (!noMoreData && !Buffer->IsFilled()) { + if (!noMoreData /* && !Buffer->IsFilled() */) { return EScan::Feed; } @@ -135,22 +98,23 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS return EScan::Sleep; } - IBuffer::TStats stats; - THolder ev{Buffer->PrepareEvent(noMoreData, stats)}; + // IBuffer::TStats stats; + // THolder ev{Buffer->PrepareEvent(noMoreData, stats)}; - if (!ev) { - Success = false; - Error = Buffer->GetError(); - return EScan::Final; - } + // if (!ev) { + // Success = false; + // Error = Buffer->GetError(); + // return EScan::Final; + // } - Send(Uploader, std::move(ev)); - State.Set(ES_BUFFER_SENT); - Stats->Aggr(stats); + // Send(Uploader, std::move(ev)); + // State.Set(ES_BUFFER_SENT); + // Stats->Aggr(stats); if (noMoreData) { Spent->Alter(false); - return EScan::Sleep; + return EScan::Final; // FIXME: tmp + // return EScan::Sleep; } return EScan::Feed; @@ -198,8 +162,9 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS return "scanner"sv; } - explicit TDirectReplicationScan() - : IActorCallback(static_cast(&TExportScan::StateWork), NKikimrServices::TActivity::EXPORT_SCAN_ACTOR) + explicit TDirectReplicationScan(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) + : IActorCallback(static_cast(&TDirectReplicationScan::StateWork), NKikimrServices::TActivity::EXPORT_SCAN_ACTOR) + , Config(incrBackup) , Stats(new TStats) , Driver(nullptr) , Success(false) @@ -215,27 +180,43 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS << " }"; } + auto CreateWriterFactory() { + return [=]() -> IActor* { + return NBackup::NImpl::CreateLocalTableWriter( + PathIdFromPathId(Config.GetDstPathId()), + NBackup::NImpl::EWriterType::Restore); + }; + } + IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr scheme) noexcept override { TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); Driver = driver; Scheme = std::move(scheme); Spent = new TSpent(TAppData::TimeProvider.Get()); - Buffer->ColumnsOrder(Scheme->Tags()); + // Buffer->ColumnsOrder(Scheme->Tags()); return {EScan::Feed, {}}; } - void Registered(TActorSystem* sys, const TActorId&) override { + void Registered(TActorSystem* /* sys */, const TActorId& /* selfId */) override { // Uploader = sys->Register(CreateUploaderFn(), TMailboxType::HTSwap, AppData()->BatchPoolId); + // + auto* workerActor = NKikimr::NReplication::NService::CreateWorker( + SelfId(), + SelfId(), + CreateWriterFactory()); + + Worker = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(workerActor); + State.Set(ES_REGISTERED); MaybeReady(); } EScan Seek(TLead& lead, ui64) noexcept override { - lead.To(Scheme->Tags(), {}, ESeek::Lower); - Buffer->Clear(); + lead.To(Scheme->Tags(), {}, NTable::ESeek::Lower); + // Buffer->Clear(); State.Set(ES_INITIALIZED); MaybeReady(); @@ -244,13 +225,13 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS return EScan::Feed; } - EScan Feed(TArrayRef, const TRow& row) noexcept override { - if (!Buffer->Collect(row)) { - Success = false; - Error = Buffer->GetError(); - EXPORT_LOG_E("Error read data from table: " << Error); - return EScan::Final; - } + EScan Feed(TArrayRef, const TRow& /* row */) noexcept override { + // if (!Buffer->Collect(row)) { + // Success = false; + // Error = Buffer->GetError(); + // EXPORT_LOG_E("Error read data from table: " << Error); + // return EScan::Final; + // } return MaybeSendBuffer(); } @@ -289,7 +270,9 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS } private: + const ::NKikimrSchemeOp::TRestoreIncrementalBackup Config; TActorId Uploader; + TActorId Worker; THolder Stats; IDriver* Driver; @@ -302,8 +285,8 @@ class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IS }; // TExportScan -NTable::IScan* CreateDirectReplicationScan() { - return nullptr; // FIXME +NTable::IScan* CreateDirectReplicationScan(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) { + return new TDirectReplicationScan(incrBackup); } /// @@ -372,7 +355,7 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { // }; // THolder buffer{exp->CreateBuffer()}; - THolder scan{CreateDirectReplicationScan(/* std::move(buffer), createUploader */)}; + THolder scan{CreateDirectReplicationScan(restoreSrc)}; // FIXME: diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 5bbac37f23bf..8a8a53293d98 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -92,6 +92,13 @@ class TWorker: public TActorBootstrapped { { } + explicit TActorInfo(const TActorId& actorId) + : ActorId(actorId) + , InitDone(true) + , CreateAttempt(0) + { + } + operator TActorId() const { return ActorId; } @@ -101,7 +108,10 @@ class TWorker: public TActorBootstrapped { } void Register(IActorOps* ops) { - ActorId = ops->RegisterWithSameMailbox(CreateFn()); + if (CreateFn) { + ActorId = ops->RegisterWithSameMailbox(CreateFn()); + } + ops->Send(ActorId, new TEvWorker::TEvHandshake()); InitDone = false; ++CreateAttempt; @@ -274,6 +284,17 @@ class TWorker: public TActorBootstrapped { { } + explicit TWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn) + : Parent(parent) + , Reader(preparedReader) + , Writer(std::move(createWriterFn)) + , Lag(TDuration::Zero()) + { + } + void Bootstrap() { for (auto* actor : {&Reader, &Writer}) { actor->Register(this); @@ -314,4 +335,12 @@ IActor* CreateWorker( return new TWorker(parent, std::move(createReaderFn), std::move(createWriterFn)); } +IActor* CreateWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn) +{ + return new TWorker(parent, preparedReader, std::move(createWriterFn)); +} + } diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index ec2fd4197230..dcc9be90156a 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -75,6 +75,11 @@ IActor* CreateWorker( std::function&& createReaderFn, std::function&& createWriterFn); +IActor* CreateWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn); + } Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TEvWorker::TEvData::TRecord, o, x) { From f94e0def604a64bea5e734eaa0bedf42e6fbbe60 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 6 Sep 2024 14:16:14 +0000 Subject: [PATCH 5/6] WIP --- ydb/core/tx/datashard/cdc_stream_scan.cpp | 183 ++++++++++- ydb/core/tx/datashard/cdc_stream_scan.h | 2 + ydb/core/tx/datashard/change_exchange.h | 1 + ydb/core/tx/datashard/change_exchange_impl.h | 1 + .../tx/datashard/change_exchange_split.cpp | 2 + ydb/core/tx/datashard/change_sender.cpp | 3 + .../datashard/change_sender_async_index.cpp | 40 ++- ...te_restore_incremental_backup_src_unit.cpp | 286 ++---------------- ydb/core/tx/datashard/datashard_impl.h | 4 + .../schemeshard__operation_common.h | 2 +- ...tion_create_restore_incremental_backup.cpp | 26 +- 11 files changed, 265 insertions(+), 285 deletions(-) diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index ed214a994363..272efbe483e6 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -1,6 +1,7 @@ #include "cdc_stream_scan.h" #include "change_record_body_serializer.h" #include "datashard_impl.h" +#include "change_exchange_impl.h" #include #include @@ -8,6 +9,10 @@ #include #include +#undef LOG_D +#undef LOG_I +#undef LOG_W + #define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) #define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) #define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) @@ -417,7 +422,7 @@ class TDataShard::TTxCdcStreamScanProgress }; // TTxCdcStreamScanProgress -class TCdcStreamScan: public IActorCallback, public IScan { +class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeRecordBodySerializer { using TStats = TCdcStreamScanManager::TStats; struct TDataShardId { @@ -474,7 +479,30 @@ class TCdcStreamScan: public IActorCallback, public IScan { switch (ev->GetTypeRewrite()) { hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle); + hFunc(TEvents::TEvWakeup, Start); + hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords); + // IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease); + default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + void Start(TEvents::TEvWakeup::TPtr&) { + Driver->Touch(EScan::Feed); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { + // LOG_D("Handltypename e " << ev->Get()->ToString()); + + TVector records(::Reserve(ev->Get()->Records.size())); + + for (const auto& record : ev->Get()->Records) { + auto it = PendingRecords.find(record.Order); + Y_ABORT_UNLESS(it != PendingRecords.end()); + records.emplace_back(it->second); } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::make_shared>(std::move(records)))); } void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { @@ -482,13 +510,99 @@ class TCdcStreamScan: public IActorCallback, public IScan { Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS); } - void Progress() { + static TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { + TVector key(Reserve(cells.size())); + + Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + } + + return key; + } + + static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector updates(::Reserve(cells.size() - 1)); + + bool foundSpecialColumn = false; + Y_ABORT_UNLESS(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__ydb_incrBackupImpl_deleted") { + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } + foundSpecialColumn = true; + continue; + } + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + Y_ABORT_UNLESS(foundSpecialColumn); + + return updates; + } + + EScan Progress() { Stats.RowsProcessed += Buffer.Rows(); Stats.BytesProcessed += Buffer.Bytes(); + if (IncrRestore) { + auto& ctx = TlsActivationContext->AsActorContext(); + auto TabletID = [&]() { return DataShard.TabletId; }; + LOG_D("IncrRestore@Progress()" + << ": Buffer.Rows()# " << Buffer.Rows()); + + // auto reservationCookie = Self->ReserveChangeQueueCapacity(Buffer.Rows()); + auto rows = Buffer.Flush(); + TVector changeRecords; + TVector records; + + auto table = Self->GetUserTables().at(TablePathId.LocalPathId); + for (auto& [k, v] : rows) { + LOG_D("IncrRestore@Progress()#iter" + << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); + const auto key = MakeKey(k.GetCells(), table); + const auto& keyTags = table->KeyColumnIds; + NKikimrChangeExchange::TDataChange body; + if (auto updates = MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::CdcDataChange) + .WithOrder(++Order) + .WithGroup(0) + .WithStep(ReadVersion.Step) + .WithTxId(ReadVersion.TxId) + .WithPathId(StreamPathId) + .WithTableId(TablePathId) + .WithSchemaVersion(table->GetTableSchemaVersion()) + .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) + .Build(); + + const auto& record = *recordPtr; + + records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + // Self->ChangesQueue.emplace(record.GetOrder(), record); + PendingRecords.emplace(record.GetOrder(), recordPtr); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records)); + // Self->MaybeActivateChangeSender(TlsActivationContext->AsActorContext()); + // Self->EnqueueChangeRecords(std::move(changeRecords), reservationCookie); + + return NoMoreData ? EScan::Sleep : EScan::Feed; + } + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()), Stats )); + + return EScan::Sleep; } void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) { @@ -526,9 +640,41 @@ class TCdcStreamScan: public IActorCallback, public IScan { , Driver(nullptr) , NoMoreData(false) , Stats(stats) + , IncrRestore(false) { } + static TVector InitValueTags(TDataShard* self, const TPathId& tablePathId) { + auto table = self->GetUserTables().at(tablePathId.LocalPathId); + TVector valueTags; + valueTags.reserve(table->Columns.size() - 1); + for (const auto& [tag, column] : table->Columns) { + if (!column.IsKey) { + valueTags.push_back(tag); + } + } + + return valueTags; + } + + explicit TCdcStreamScan( + TDataShard* self, + ui64 txId, + const TPathId& tablePathId, + const TPathId& streamPathId) + : IActorCallback(static_cast(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + , DataShard{self->SelfId(), self->TabletID()} + , TxId(txId) + , TablePathId(tablePathId) + , StreamPathId(streamPathId) + , ReadVersion({}) + , ValueTags(InitValueTags(self, tablePathId)) + , Limits({}) + , Stats({}) + , IncrRestore(true) + , Self(self) + {} + void Describe(IOutputStream& o) const noexcept override { o << "CdcStreamScan {" << " TxId: " << TxId @@ -541,11 +687,21 @@ class TCdcStreamScan: public IActorCallback, public IScan { TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); Driver = driver; Y_ABORT_UNLESS(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size()); + + if (IncrRestore) { + return {EScan::Sleep, {}}; + } + return {EScan::Feed, {}}; } void Registered(TActorSystem* sys, const TActorId&) override { - sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId())); + if (IncrRestore) { + auto ds = NKikimr::NDataShard::TDataShardId(Self->TabletID(), Self->Generation(), SelfId()); + ChangeSender = RegisterWithSameMailbox(CreateIncrRestoreChangeSender(ds, TablePathId, StreamPathId)); + } else { + sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId())); + } } EScan Seek(TLead& lead, ui64) noexcept override { @@ -581,8 +737,7 @@ class TCdcStreamScan: public IActorCallback, public IScan { return EScan::Final; } - Progress(); - return EScan::Sleep; + return Progress(); } TAutoPtr Finish(EAbort abort) noexcept override { @@ -611,7 +766,11 @@ class TCdcStreamScan: public IActorCallback, public IScan { bool NoMoreData; TBuffer Buffer; TStats Stats; - + bool IncrRestore; + TDataShard* Self; + ui64 Order = 0; + TActorId ChangeSender; + TMap PendingRecords; }; // TCdcStreamScan class TDataShard::TTxCdcStreamScanRun: public TTransactionBase { @@ -783,4 +942,16 @@ void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TA Execute(new TTxCdcStreamScanProgress(this, ev), ctx); } +THolder TDataShard::CreateVolatileStreamScan( + TPathId tablePathId, + const TPathId& streamPathId) +{ + const ui64 localTxId = NextTieBreakerIndex++; + return MakeHolder( + this, + localTxId, + tablePathId, + streamPathId); +} + } diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h index eaa2460e149e..ed6f928259ac 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.h +++ b/ydb/core/tx/datashard/cdc_stream_scan.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 548fe14e3163..309f356e33e7 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -49,6 +49,7 @@ struct TEvChangeExchange { enum class ESenderType { AsyncIndex, CdcStream, + IncrRestore, }; struct TEvAddSender: public TEventLocal { diff --git a/ydb/core/tx/datashard/change_exchange_impl.h b/ydb/core/tx/datashard/change_exchange_impl.h index b6af2aea263e..1945d651d54b 100644 --- a/ydb/core/tx/datashard/change_exchange_impl.h +++ b/ydb/core/tx/datashard/change_exchange_impl.h @@ -6,6 +6,7 @@ namespace NKikimr { namespace NDataShard { +IActor* CreateIncrRestoreChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId); IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId); IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId& streamPathId); diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 93e6d68a3b8d..b0954c5e154c 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -523,6 +523,8 @@ class TChangeExchageSplit: public TActorBootstrapped { return Register(new TCdcWorker(SelfId(), pathId, DataShard.TabletId, DstDataShards)); case EWorkerType::AsyncIndex: Y_ABORT("unreachable"); + case EWorkerType::IncrRestore: + Y_ABORT("unreachable"); } } diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index d1c5e8c40331..2d40cb328d03 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -59,6 +59,8 @@ class TChangeSender: public TActor { return Register(CreateAsyncIndexChangeSender(DataShard, userTableId, pathId)); case ESenderType::CdcStream: return Register(CreateCdcStreamChangeSender(DataShard, pathId)); + case ESenderType::IncrRestore: + return Register(CreateIncrRestoreChangeSender(DataShard, userTableId, pathId)); } } @@ -107,6 +109,7 @@ class TChangeSender: public TActor { auto it = Senders.find(msg.PathId); if (it != Senders.end()) { + Y_ABORT("Trying to create multiple senders"); Y_ABORT_UNLESS(it->second.UserTableId == msg.UserTableId); Y_ABORT_UNLESS(it->second.Type == msg.Type); LOG_W("Trying to add duplicate sender" diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1a361379f343..1e099bb4eb6e 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -62,6 +62,10 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedGet()->ToString()); auto records = MakeHolder(); @@ -198,6 +203,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped& tagMap) + const TPathId& indexTablePathId, const TMap& tagMap, bool noLease = false) : Parent(parent) , DataShard(dataShard) , ShardId(shardId) @@ -294,6 +300,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList)); Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); - Become(&TThis::StateResolveIndex); + if (!IncrRestore) { + Become(&TThis::StateResolveIndex); + } else { + Become(&TThis::StateResolveIndexTable); + } } STATEFN(StateResolveIndex) { @@ -599,7 +611,9 @@ class TAsyncIndexChangeSenderMain const auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, IndexTablePathId)) { + if (IncrRestore && !CheckTableId(entry, PathId)) { + return; + } else if (!IncrRestore && !CheckTableId(entry, IndexTablePathId)) { return; } @@ -611,6 +625,7 @@ class TAsyncIndexChangeSenderMain return; } + // FIXME(+active) TagMap.clear(); TVector keyColumnTypes; @@ -680,7 +695,9 @@ class TAsyncIndexChangeSenderMain auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, IndexTablePathId)) { + if (IncrRestore && !CheckTableId(entry, PathId)) { + return; + } else if (!IncrRestore && !CheckTableId(entry, IndexTablePathId)) { return; } @@ -700,6 +717,10 @@ class TAsyncIndexChangeSenderMain KeyDesc = std::move(entry.KeyDescription); CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + if (IncrRestore) { + Send(DataShard.ActorId, new TEvents::TEvWakeup); + } + Become(&TThis::StateMain); } @@ -718,7 +739,7 @@ class TAsyncIndexChangeSenderMain } IActor* CreateSender(ui64 partitionId) const override { - return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap); + return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IncrRestore ? PathId : IndexTablePathId, TagMap, IncrRestore); } void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { @@ -774,12 +795,13 @@ class TAsyncIndexChangeSenderMain return NKikimrServices::TActivity::CHANGE_SENDER_ASYNC_INDEX_ACTOR_MAIN; } - explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) + explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId, bool incrRestore = false) : TActorBootstrapped() , TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId) , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) + , IncrRestore(incrRestore) { } @@ -820,11 +842,15 @@ class TAsyncIndexChangeSenderMain TPathId IndexTablePathId; ui64 IndexTableVersion; THolder KeyDesc; - + bool IncrRestore; }; // TAsyncIndexChangeSenderMain IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) { return new TAsyncIndexChangeSenderMain(dataShard, userTableId, indexPathId); } +IActor* CreateIncrRestoreChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId) { + return new TAsyncIndexChangeSenderMain(dataShard, userTableId, restoreTargetPathId, true); +} + } diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp index 5dfc4e16881d..a883e512d60f 100644 --- a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -3,7 +3,8 @@ #include "datashard_active_transaction.h" #include "datashard_impl.h" #include "export_iface.h" -#include "export_scan.h" +#include "cdc_stream_scan.h" + #include #include #include @@ -25,273 +26,16 @@ using namespace NExportScan; /// -class TDirectReplicationScan: private NActors::IActorCallback, public NTable::IScan { - enum EStateBits { - ES_REGISTERED = 0, // Actor is registered - ES_INITIALIZED, // Seek(...) was called - ES_UPLOADER_READY, - ES_BUFFER_SENT, - ES_NO_MORE_DATA, - - ES_COUNT, - }; - - struct TStats: public IBuffer::TStats { - TStats() - : IBuffer::TStats() - { - auto counters = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "store_to_yt"); - - MonRows = counters->GetCounter("Rows", true); - MonBytesRead = counters->GetCounter("BytesRead", true); - MonBytesSent = counters->GetCounter("BytesSent", true); - } - - void Aggr(ui64 rows, ui64 bytesRead, ui64 bytesSent) { - Rows += rows; - BytesRead += bytesRead; - BytesSent += bytesSent; - - *MonRows += rows; - *MonBytesRead += bytesRead; - *MonBytesSent += bytesSent; - } - - void Aggr(const IBuffer::TStats& stats) { - Aggr(stats.Rows, stats.BytesRead, stats.BytesSent); - } - - TString ToString() const { - return TStringBuilder() - << "Stats { " - << " Rows: " << Rows - << " BytesRead: " << BytesRead - << " BytesSent: " << BytesSent - << " }"; - } - - private: - ::NMonitoring::TDynamicCounters::TCounterPtr MonRows; - ::NMonitoring::TDynamicCounters::TCounterPtr MonBytesRead; - ::NMonitoring::TDynamicCounters::TCounterPtr MonBytesSent; - }; - - bool IsReady() const { - return State.Test(ES_REGISTERED) && State.Test(ES_INITIALIZED); - } - - void MaybeReady() { - if (IsReady()) { - Send(Uploader, new TEvExportScan::TEvReady()); - } - } - - EScan MaybeSendBuffer() { - const bool noMoreData = State.Test(ES_NO_MORE_DATA); - - if (!noMoreData /* && !Buffer->IsFilled() */) { - return EScan::Feed; - } - - if (!State.Test(ES_UPLOADER_READY) || State.Test(ES_BUFFER_SENT)) { - Spent->Alter(false); - return EScan::Sleep; - } - - // IBuffer::TStats stats; - // THolder ev{Buffer->PrepareEvent(noMoreData, stats)}; - - // if (!ev) { - // Success = false; - // Error = Buffer->GetError(); - // return EScan::Final; - // } - - // Send(Uploader, std::move(ev)); - // State.Set(ES_BUFFER_SENT); - // Stats->Aggr(stats); - - if (noMoreData) { - Spent->Alter(false); - return EScan::Final; // FIXME: tmp - // return EScan::Sleep; - } - - return EScan::Feed; - } - - void Handle(TEvExportScan::TEvReset::TPtr&) { - Y_ABORT_UNLESS(IsReady()); - - EXPORT_LOG_D("Handle TEvExportScan::TEvReset" - << ": self# " << SelfId()); - - Stats.Reset(new TStats); - State.Reset(ES_UPLOADER_READY).Reset(ES_BUFFER_SENT).Reset(ES_NO_MORE_DATA); - Spent->Alter(true); - Driver->Touch(EScan::Reset); - } - - void Handle(TEvExportScan::TEvFeed::TPtr&) { - Y_ABORT_UNLESS(IsReady()); - - EXPORT_LOG_D("Handle TEvExportScan::TEvFeed" - << ": self# " << SelfId()); - - State.Set(ES_UPLOADER_READY).Reset(ES_BUFFER_SENT); - Spent->Alter(true); - if (EScan::Feed == MaybeSendBuffer()) { - Driver->Touch(EScan::Feed); - } - } - - void Handle(TEvExportScan::TEvFinish::TPtr& ev) { - Y_ABORT_UNLESS(IsReady()); - - EXPORT_LOG_D("Handle TEvExportScan::TEvFinish" - << ": self# " << SelfId() - << ", msg# " << ev->Get()->ToString()); - - Success = ev->Get()->Success; - Error = ev->Get()->Error; - Driver->Touch(EScan::Final); - } - -public: - static constexpr TStringBuf LogPrefix() { - return "scanner"sv; - } - - explicit TDirectReplicationScan(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) - : IActorCallback(static_cast(&TDirectReplicationScan::StateWork), NKikimrServices::TActivity::EXPORT_SCAN_ACTOR) - , Config(incrBackup) - , Stats(new TStats) - , Driver(nullptr) - , Success(false) - { - } - - void Describe(IOutputStream& o) const noexcept override { - o << "ExportScan { " - << "Uploader: " << Uploader - << Stats->ToString() << " " - << "Success: " << Success - << "Error: " << Error - << " }"; - } - - auto CreateWriterFactory() { - return [=]() -> IActor* { - return NBackup::NImpl::CreateLocalTableWriter( - PathIdFromPathId(Config.GetDstPathId()), - NBackup::NImpl::EWriterType::Restore); - }; - } - - IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr scheme) noexcept override { - TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); - - Driver = driver; - Scheme = std::move(scheme); - Spent = new TSpent(TAppData::TimeProvider.Get()); - // Buffer->ColumnsOrder(Scheme->Tags()); - - return {EScan::Feed, {}}; - } - - void Registered(TActorSystem* /* sys */, const TActorId& /* selfId */) override { - // Uploader = sys->Register(CreateUploaderFn(), TMailboxType::HTSwap, AppData()->BatchPoolId); - // - auto* workerActor = NKikimr::NReplication::NService::CreateWorker( - SelfId(), - SelfId(), - CreateWriterFactory()); - - Worker = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(workerActor); - - - State.Set(ES_REGISTERED); - MaybeReady(); - } - - EScan Seek(TLead& lead, ui64) noexcept override { - lead.To(Scheme->Tags(), {}, NTable::ESeek::Lower); - // Buffer->Clear(); - - State.Set(ES_INITIALIZED); - MaybeReady(); - - Spent->Alter(true); - return EScan::Feed; - } - - EScan Feed(TArrayRef, const TRow& /* row */) noexcept override { - // if (!Buffer->Collect(row)) { - // Success = false; - // Error = Buffer->GetError(); - // EXPORT_LOG_E("Error read data from table: " << Error); - // return EScan::Final; - // } - - return MaybeSendBuffer(); - } - - EScan Exhausted() noexcept override { - State.Set(ES_NO_MORE_DATA); - return MaybeSendBuffer(); - } - - TAutoPtr Finish(EAbort abort) noexcept override { - auto outcome = EExportOutcome::Success; - if (abort != EAbort::None) { - outcome = EExportOutcome::Aborted; - } else if (!Success) { - outcome = EExportOutcome::Error; - } - - PassAway(); - return new TExportScanProduct(outcome, Error, Stats->BytesRead, Stats->Rows); - } - - void PassAway() override { - if (const auto& actorId = std::exchange(Uploader, {})) { - Send(actorId, new TEvents::TEvPoisonPill()); - } - - IActorCallback::PassAway(); - } - - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvExportScan::TEvReset, Handle); - hFunc(TEvExportScan::TEvFeed, Handle); - hFunc(TEvExportScan::TEvFinish, Handle); - } - } - -private: - const ::NKikimrSchemeOp::TRestoreIncrementalBackup Config; - TActorId Uploader; - TActorId Worker; - THolder Stats; - - IDriver* Driver; - TIntrusiveConstPtr Scheme; - TAutoPtr Spent; - - TBitMap State; - bool Success; - TString Error; - -}; // TExportScan - -NTable::IScan* CreateDirectReplicationScan(const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) { - return new TDirectReplicationScan(incrBackup); +THolder CreateDirectReplicationScan(TDataShard& self, const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) { + TPathId tablePathId = PathIdFromPathId(incrBackup.GetSrcPathId()); + TPathId dstTablePathId = PathIdFromPathId(incrBackup.GetDstPathId()); + return self.CreateVolatileStreamScan(tablePathId, dstTablePathId); } /// class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { + THolder AddSender; protected: bool IsRelevant(TActiveTransaction* tx) const { return tx->GetSchemeTx().HasRestoreIncrementalBackupSrc(); @@ -335,6 +79,9 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; Y_ABORT_UNLESS(txc.DB.GetScheme().GetTableInfo(localTableId)); + Y_ABORT_UNLESS(restoreSrc.HasDstPathId()); + // const TPathId dstTableId = PathIdFromPathId(restoreSrc.GetDstPathId()); + auto* appData = AppData(ctx); const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; std::shared_ptr<::NKikimr::NDataShard::IExport> exp; // TODO: decouple from export @@ -355,7 +102,8 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { // }; // THolder buffer{exp->CreateBuffer()}; - THolder scan{CreateDirectReplicationScan(restoreSrc)}; + // + THolder scan{CreateDirectReplicationScan(DataShard, restoreSrc)}; // FIXME: @@ -379,6 +127,16 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { .SetReadPrio(TScanOptions::EReadPrio::Low) )); + // AddSender.Reset(new TEvChangeExchange::TEvAddSender( + // TTableId(DataShard.GetPathOwnerId(), tableId), + // TEvChangeExchange::ESenderType::IncrRestore, + // dstTableId + // )); + + // if (AddSender) { + // ctx.Send(DataShard.GetChangeSender(), AddSender.Release()); + // } + return true; } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b291ef0ca3ce..4173f69a9041 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2126,6 +2126,10 @@ class TDataShard return LogThrottlers[type]; }; + THolder CreateVolatileStreamScan( + TPathId tablePathId, + const TPathId& streamPathId); + private: /// class TLoanReturnTracker { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 14ec42f43168..537c6829333e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -487,7 +487,7 @@ class TDone: public TSubOperationState { TString DebugHint() const override { return TStringBuilder() << "TDone" - << " opId# " << OperationId; + << " opId# " << OperationId << " "; } public: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index a61c10987a48..44b92ca7ec4d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -140,22 +140,26 @@ class TConfigurePartsAtTable: public TSubOperationState { protected: // FIXME - void FillNotice(const TPathId& pathId, NKikimrTxDataShard::TFlatSchemeTransaction& tx, TOperationContext& context) const { + void FillNotice( + const TPathId& pathId, + NKikimrTxDataShard::TFlatSchemeTransaction& tx, + TOperationContext& context) const + { Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); auto path = context.SS->PathsById.at(pathId); Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); auto table = context.SS->Tables.at(pathId); - auto& notice = *tx.MutableRestoreIncrementalBackupSrc(); - PathIdFromPathId(pathId, notice.MutableSrcPathId()); + tx.MutableRestoreIncrementalBackupSrc()->CopyFrom(RestoreOp); // TODO: copy op to notice } public: - explicit TConfigurePartsAtTable(TOperationId id) + explicit TConfigurePartsAtTable(TOperationId id, const NKikimrSchemeOp::TRestoreIncrementalBackup& restoreOp) : OperationId(id) + , RestoreOp(restoreOp) { IgnoreMessages(DebugHint(), {}); } @@ -206,7 +210,7 @@ class TConfigurePartsAtTable: public TSubOperationState { private: const TOperationId OperationId; - + const NKikimrSchemeOp::TRestoreIncrementalBackup RestoreOp; }; // TConfigurePartsAtTable class TProposeAtTable: public TSubOperationState { @@ -379,7 +383,7 @@ class TNewRestoreFromAtTable: public TSubOperation { switch (state) { case TTxState::Waiting: case TTxState::ConfigureParts: - return MakeHolder(OperationId); + return MakeHolder(OperationId, Transaction.GetRestoreIncrementalBackup()); case TTxState::Propose: return MakeHolder(OperationId); case TTxState::ProposedWaitParts: @@ -406,12 +410,16 @@ class TNewRestoreFromAtTable: public TSubOperation { const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetRestoreIncrementalBackup(); const auto& tableName = op.GetSrcTableName(); + const auto& dstTableName = op.GetDstTableName(); // LOG_N("TNewRestoreFromAtTable Propose" // << ": opId# " << OperationId // << ", stream# " << workingDir << "/" << tableName << "/" << streamName); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + auto result = MakeHolder( + NKikimrScheme::StatusAccepted, + ui64(OperationId.GetTxId()), + context.SS->TabletID()); const auto workingDirPath = TPath::Resolve(workingDir, context.SS); // { @@ -459,6 +467,7 @@ class TNewRestoreFromAtTable: public TSubOperation { // return result; // } // } + const auto dstTablePath = workingDirPath.Child(dstTableName); TString errStr; if (!context.SS->CheckApplyIf(Transaction, errStr)) { @@ -593,6 +602,9 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); outTx.MutableRestoreIncrementalBackup()->CopyFrom(restoreOp); + auto& restoreOp = *outTx.MutableRestoreIncrementalBackup(); + PathIdFromPathId(srcTablePath.Base()->PathId, restoreOp.MutableSrcPathId()); + PathIdFromPathId(dstTablePath.Base()->PathId, restoreOp.MutableDstPathId()); result.push_back(MakeSubOperation(NextPartId(opId, result), outTx)); } From 9647d53a45d668f2652eb26380aebca2927ac389 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 9 Sep 2024 00:09:47 +0000 Subject: [PATCH 6/6] WIP --- .../change_sender_common_ops.h | 17 +++++++++ ydb/core/tx/datashard/cdc_stream_scan.cpp | 36 ++++++++++++++++--- ydb/core/tx/datashard/change_exchange.h | 5 +++ .../datashard/change_sender_async_index.cpp | 33 +++++++++++++---- ...te_restore_incremental_backup_src_unit.cpp | 17 +++++++-- ydb/core/tx/datashard/datashard.h | 14 ++++++++ .../datashard/datashard_change_receiving.cpp | 21 ++++++++--- ydb/core/tx/datashard/datashard_impl.h | 9 ++++- 8 files changed, 133 insertions(+), 19 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index c90cb766cfde..d482bd5585da 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -137,6 +137,9 @@ class TBaseChangeSender { auto it = Senders.find(partitionId); if (it != Senders.end()) { senders.emplace(partitionId, std::move(it->second)); + if (it->second.Ready) { + --ReadySenders; + } Senders.erase(it); } else { LazyCreateSender(senders, partitionId); @@ -208,6 +211,8 @@ class TBaseChangeSender { THashSet registrations; bool needToResolve = false; + // int tmp = PendingSent.size(); + while (it != PendingSent.end()) { if (Enqueued && Enqueued.begin()->Order <= it->first) { break; @@ -258,6 +263,8 @@ class TBaseChangeSender { it = PendingSent.erase(it); } + // Y_ABORT_S("something strange: " << sendTo.size() << " " << tmp); + for (const auto partitionId : registrations) { RegisterSender(partitionId); } @@ -279,6 +286,7 @@ class TBaseChangeSender { Y_ABORT_UNLESS(sender.Ready); sender.Ready = false; + ReadySenders--; sender.Pending.reserve(sender.Prepared.size()); for (const auto& record : sender.Prepared) { @@ -527,6 +535,7 @@ class TBaseChangeSender { auto& sender = it->second; sender.Ready = true; + ReadySenders++; if (sender.Pending) { RemoveRecords(std::exchange(sender.Pending, {})); @@ -551,6 +560,9 @@ class TBaseChangeSender { } ReEnqueueRecords(it->second); + if (it->second.Ready) { + --ReadySenders; + } Senders.erase(it); GonePartitions.push_back(partitionId); @@ -576,6 +588,10 @@ class TBaseChangeSender { , MemUsage(0) {} + bool AllReady() { + return ReadySenders == Senders.size(); + } + void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { const auto& cgi = ev->Get()->Cgi(); if (const auto& str = cgi.Get("partitionId")) { @@ -776,6 +792,7 @@ class TBaseChangeSender { ui64 MemUsage; THashMap Senders; // ui64 is partition id + ui64 ReadySenders = 0; TSet Enqueued; TSet PendingBody; TMap PendingSent; // ui64 is order diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 272efbe483e6..aad223f943f9 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -482,6 +482,7 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco hFunc(TEvents::TEvWakeup, Start); hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords); + hFunc(TEvChangeExchange::TEvAllSent, Handle); // IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease); default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } @@ -491,6 +492,10 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco Driver->Touch(EScan::Feed); } + void Handle(TEvChangeExchange::TEvAllSent::TPtr&) { + Driver->Touch(EScan::Final); + } + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { // LOG_D("Handltypename e " << ev->Get()->ToString()); @@ -502,7 +507,7 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco records.emplace_back(it->second); } - Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::make_shared>(std::move(records)))); + Send(ev->Sender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::make_shared>(std::move(records)))); } void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { @@ -572,7 +577,7 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); } - auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::CdcDataChange) + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex) .WithOrder(++Order) .WithGroup(0) .WithStep(ReadVersion.Step) @@ -595,6 +600,10 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco // Self->MaybeActivateChangeSender(TlsActivationContext->AsActorContext()); // Self->EnqueueChangeRecords(std::move(changeRecords), reservationCookie); + if (NoMoreData) { + Send(ChangeSender, new TEvChangeExchange::TEvNoMoreData()); + } + return NoMoreData ? EScan::Sleep : EScan::Feed; } @@ -733,6 +742,10 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco EScan Exhausted() noexcept override { NoMoreData = true; + if (!Buffer && IncrRestore) { + return EScan::Sleep; + } + if (!Buffer) { return EScan::Final; } @@ -741,6 +754,10 @@ class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeReco } TAutoPtr Finish(EAbort abort) noexcept override { + if (IncrRestore) { + Send(DataShard.ActorId, new TEvDataShard::TEvRestoreFinished{TxId}); + } + if (abort != EAbort::None) { Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); } else { @@ -942,14 +959,23 @@ void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TA Execute(new TTxCdcStreamScanProgress(this, ev), ctx); } +void TDataShard::Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, const TActorContext& ctx) { + RestoreFinished = true; + + TOperation::TPtr op = Pipeline.FindOp(ev->Get()->TxId); + if (op) { + ForwardEventToOperation(ev, op, ctx); + } +} + THolder TDataShard::CreateVolatileStreamScan( TPathId tablePathId, - const TPathId& streamPathId) + const TPathId& streamPathId, + ui64 txId) { - const ui64 localTxId = NextTieBreakerIndex++; return MakeHolder( this, - localTxId, + txId, // why not tie breaker? tablePathId, streamPathId); } diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 309f356e33e7..7e0ffc1b0d76 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -33,6 +33,9 @@ struct TEvChangeExchange { // Split/merge EvSplitAck, + EvNoMoreData, + EvAllSent, + EvEnd, }; @@ -44,6 +47,8 @@ struct TEvChangeExchange { struct TEvStatus: public TEventPB {}; struct TEvActivateSender: public TEventPB {}; struct TEvActivateSenderAck: public TEventPB {}; + struct TEvNoMoreData: public TEventLocal {}; + struct TEvAllSent: public TEventLocal {}; /// Local events enum class ESenderType { diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1e099bb4eb6e..d56a721ee7f0 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -63,7 +63,11 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped(); + handshake->Record.SetOrigin(DataShard.TabletId); + handshake->Record.SetGeneration(DataShard.Generation); + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true)); + Become(&TThis::StateHandshake); return; } Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie); @@ -126,7 +130,6 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedGet()->ToString()); auto records = MakeHolder(); @@ -159,8 +162,9 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedGetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvNoMoreData, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvChangeExchange::TEvNoMoreData::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + NoMoreData = true; + + if (AllReady()) { + Send(DataShard.ActorId, new TEvChangeExchange::TEvAllSent()); + } } void Resolve() override { @@ -761,6 +777,10 @@ class TAsyncIndexChangeSenderMain void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); + + if (NoMoreData && AllReady()) { + Send(DataShard.ActorId, new TEvChangeExchange::TEvAllSent()); + } } void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { @@ -835,6 +855,7 @@ class TAsyncIndexChangeSenderMain const TDataShardId DataShard; const TTableId UserTableId; mutable TMaybe LogPrefix; + bool NoMoreData = false; THashMap MainColumnToTag; TMap TagMap; // from main to index diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp index a883e512d60f..e2211a3957b5 100644 --- a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -26,10 +26,10 @@ using namespace NExportScan; /// -THolder CreateDirectReplicationScan(TDataShard& self, const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup) { +THolder CreateDirectReplicationScan(TDataShard& self, const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup, ui64 txId) { TPathId tablePathId = PathIdFromPathId(incrBackup.GetSrcPathId()); TPathId dstTablePathId = PathIdFromPathId(incrBackup.GetDstPathId()); - return self.CreateVolatileStreamScan(tablePathId, dstTablePathId); + return self.CreateVolatileStreamScan(tablePathId, dstTablePathId, txId); } /// @@ -70,6 +70,8 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { TActiveTransaction* tx = dynamic_cast(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + Y_ABORT_UNLESS(!DataShard.RestoreStarted, "uh-oh"); + Y_ABORT_UNLESS(tx->GetSchemeTx().HasRestoreIncrementalBackupSrc()); const auto& restoreSrc = tx->GetSchemeTx().GetRestoreIncrementalBackupSrc(); @@ -103,7 +105,7 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { // THolder buffer{exp->CreateBuffer()}; // - THolder scan{CreateDirectReplicationScan(DataShard, restoreSrc)}; + THolder scan{CreateDirectReplicationScan(DataShard, restoreSrc, op->GetTxId())}; // FIXME: @@ -120,6 +122,8 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { readAheadHi = readAheadHiOverride; } + DataShard.RestoreStarted = true; + tx->SetScanTask(DataShard.QueueScan(localTableId, scan.Release(), op->GetTxId(), TScanOptions() .SetResourceBroker(taskName, taskPrio) @@ -266,8 +270,15 @@ class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext&) override final { } + + void Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, TOperation::TPtr op, const TActorContext& ctx) { + Y_UNUSED(ev, op, ctx); + ResetWaiting(op); + } + void ProcessEvent(TAutoPtr& ev, TOperation::TPtr op, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { + OHFunc(TEvDataShard::TEvRestoreFinished, Handle); // OHFunc(TEvCancel, Handle); } Y_UNUSED(op, ctx); diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 8c305a3a5ceb..60422aef8f37 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -332,6 +332,8 @@ struct TEvDataShard { EvSampleKRequest, EvSampleKResponse, + EvRestoreFinished, + EvEnd }; @@ -1716,6 +1718,18 @@ struct TEvDataShard { Record.SetErrorDescription(error); } }; + + struct TEvRestoreFinished + : public TEventLocal + { + TEvRestoreFinished(ui64 txId) + : TxId(txId) + { } + + ui64 TxId; + // todo restore tx id + persist + }; }; IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index 01daef6e42c4..6734a7e3064f 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -305,7 +305,14 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { return false; } - txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId())); + if (!MvccReadWriteVersion) { + auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); + Y_DEBUG_ABORT_UNLESS(readVersion == writeVersion); + MvccReadWriteVersion = writeVersion; + Pipeline.AddCommittingOp(*MvccReadWriteVersion); + } + + txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, *MvccReadWriteVersion); Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB); tableInfo.Stats.UpdateTime = TAppData::TimeProvider->Now(); AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_OK); @@ -314,8 +321,9 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { } public: - explicit TTxApplyChangeRecords(TDataShard* self, TEvChangeExchange::TEvApplyRecords::TPtr ev) + explicit TTxApplyChangeRecords(TDataShard* self, TPipeline& pipeline, TEvChangeExchange::TEvApplyRecords::TPtr ev) : TTransactionBase(self) + , Pipeline(pipeline) , Ev(std::move(ev)) , Status(new TEvChangeExchange::TEvStatus) { @@ -392,6 +400,10 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { void Complete(const TActorContext& ctx) override { Y_ABORT_UNLESS(Status); + if (MvccReadWriteVersion) { + Pipeline.RemoveCommittingOp(*MvccReadWriteVersion); + } + if (Status->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_OK) { Self->IncCounter(COUNTER_CHANGE_EXCHANGE_SUCCESSFUL_APPLY); } else { @@ -402,6 +414,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { } private: + TPipeline& Pipeline; TEvChangeExchange::TEvApplyRecords::TPtr Ev; THolder Status; @@ -410,7 +423,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { TVector Key; TVector Value; - + std::optional MvccReadWriteVersion; }; // TTxApplyChangeRecords void TDataShard::StartCollectingChangeExchangeHandshakes(const TActorContext& ctx) { @@ -446,7 +459,7 @@ void TDataShard::Handle(TEvChangeExchange::TEvApplyRecords::TPtr& ev, const TAct << ": origin# " << ev->Get()->Record.GetOrigin() << ", generation# " << ev->Get()->Record.GetGeneration() << ", at tablet# " << TabletID()); - Execute(new TTxApplyChangeRecords(this, ev), ctx); + Execute(new TTxApplyChangeRecords(this, Pipeline, ev), ctx); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 4173f69a9041..b13baec04abf 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1392,6 +1392,8 @@ class TDataShard void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -2128,7 +2130,8 @@ class TDataShard THolder CreateVolatileStreamScan( TPathId tablePathId, - const TPathId& streamPathId); + const TPathId& streamPathId, + ui64 txId); private: /// @@ -2967,6 +2970,9 @@ class TDataShard ui32 StatisticsScanTableId = 0; ui64 StatisticsScanId = 0; + bool RestoreStarted = false; + bool RestoreFinished = false; + public: auto& GetLockChangeRecords() { return LockChangeRecords; @@ -3158,6 +3164,7 @@ class TDataShard HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle); HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle); HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); + HFunc(TEvDataShard::TEvRestoreFinished, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());