This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathTableRowCopyMessageProcessor.cs
More file actions
87 lines (78 loc) · 3.47 KB
/
TableRowCopyMessageProcessor.cs
File metadata and controls
87 lines (78 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Table;
namespace Knapcode.ExplorePackages.Worker.TableCopy
{
/// <summary>
/// Processes a <see cref="TableRowCopyMessage{T}"/> by reading the requested rows of a single partition
/// from the source table and handing them to <c>InsertOrReplaceEntitiesAsync</c> on the destination table.
/// </summary>
/// <typeparam name="T">The table entity type of the rows being copied.</typeparam>
public class TableRowCopyMessageProcessor<T> : IMessageProcessor<TableRowCopyMessage<T>> where T : ITableEntity, new()
{
    private readonly ServiceClientFactory _serviceClientFactory;
    private readonly ILogger<TableRowCopyMessageProcessor<T>> _logger;

    /// <summary>
    /// Initializes a new instance of the processor.
    /// </summary>
    /// <param name="serviceClientFactory">Factory used to obtain the storage account holding both tables.</param>
    /// <param name="logger">Logger used to report empty messages and missing row keys.</param>
    public TableRowCopyMessageProcessor(ServiceClientFactory serviceClientFactory, ILogger<TableRowCopyMessageProcessor<T>> logger)
    {
        _serviceClientFactory = serviceClientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Copies the rows identified by the message's partition key and row keys from the source table to the
    /// destination table.
    /// </summary>
    /// <param name="message">The message naming the source table, destination table, partition key and row keys.</param>
    /// <param name="dequeueCount">The number of times the message has been dequeued. Not used by this processor.</param>
    /// <returns>A task that completes when the rows have been written to the destination table.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="message"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown when at least one requested row key is not found in the source partition.
    /// </exception>
    public async Task ProcessAsync(TableRowCopyMessage<T> message, int dequeueCount)
    {
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // De-duplicate once, with an explicit ordinal comparer, so that both the query bounds and the
        // "did we find everything" check below are based on the same set of keys.
        var rowKeys = message.RowKeys == null
            ? new HashSet<string>(StringComparer.Ordinal)
            : new HashSet<string>(message.RowKeys, StringComparer.Ordinal);

        if (rowKeys.Count == 0)
        {
            // Without any row keys there is no range to query and nothing to copy. Previously this case
            // failed with "Sequence contains no elements", which is not actionable.
            _logger.LogWarning(
                "The table row copy message for partition key {PartitionKey} has no row keys. Nothing will be copied.",
                message.PartitionKey);
            return;
        }

        // Both tables come from the same storage account, so a single table client is enough.
        var tableClient = _serviceClientFactory
            .GetStorageAccount()
            .CreateCloudTableClient();
        var sourceTable = tableClient.GetTableReference(message.SourceTableName);
        var destinationTable = tableClient.GetTableReference(message.DestinationTableName);

        var query = BuildRowKeyRangeQuery(message.PartitionKey, rowKeys);

        var rows = new List<T>(rowKeys.Count);
        TableContinuationToken continuationToken = null;
        do
        {
            var segment = await sourceTable.ExecuteQuerySegmentedAsync(query, continuationToken);
            foreach (var row in segment.Results)
            {
                // The query covers the whole [min, max] row key range, which can contain rows that were
                // not requested. Keep only the requested ones.
                if (rowKeys.Contains(row.RowKey))
                {
                    rows.Add(row);
                }
            }

            continuationToken = segment.ContinuationToken;
        }
        while (continuationToken != null);

        if (rows.Count != rowKeys.Count)
        {
            _logger.LogError(
                "In partition key {PartitionKey}, some row keys were not found: {MissingRowKeys}",
                message.PartitionKey,
                rowKeys.Except(rows.Select(x => x.RowKey)).ToList());
            throw new InvalidOperationException("When copying rows, some rows were not found.");
        }

        await destinationTable.InsertOrReplaceEntitiesAsync(rows);
    }

    /// <summary>
    /// Builds a query matching every row in the given partition whose row key lies between the smallest
    /// and largest requested row key (inclusive, compared ordinally).
    /// </summary>
    /// <param name="partitionKey">The partition key to match exactly.</param>
    /// <param name="rowKeys">The non-empty set of requested row keys.</param>
    /// <returns>The table query with the partition and row key range filter applied.</returns>
    private static TableQuery<T> BuildRowKeyRangeQuery(string partitionKey, HashSet<string> rowKeys)
    {
        var sortedRowKeys = rowKeys.OrderBy(x => x, StringComparer.Ordinal).ToList();
        var minRowKey = sortedRowKeys[0];
        var maxRowKey = sortedRowKeys[sortedRowKeys.Count - 1];

        var partitionFilter = TableQuery.GenerateFilterCondition(
            StorageUtility.PartitionKey,
            QueryComparisons.Equal,
            partitionKey);
        var lowerBoundFilter = TableQuery.GenerateFilterCondition(
            StorageUtility.RowKey,
            QueryComparisons.GreaterThanOrEqual,
            minRowKey);
        var upperBoundFilter = TableQuery.GenerateFilterCondition(
            StorageUtility.RowKey,
            QueryComparisons.LessThanOrEqual,
            maxRowKey);

        return new TableQuery<T>
        {
            FilterString = TableQuery.CombineFilters(
                partitionFilter,
                TableOperators.And,
                TableQuery.CombineFilters(
                    lowerBoundFilter,
                    TableOperators.And,
                    upperBoundFilter)),
            TakeCount = StorageUtility.MaxTakeCount,
        };
    }
}
}