Skip to content

Commit

Permalink
Auto-sync from Azure-Kusto-Service
Browse files Browse the repository at this point in the history
  • Loading branch information
Kusto Build System committed Feb 20, 2024
1 parent 287efb8 commit 1994be5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 49 deletions.
90 changes: 51 additions & 39 deletions src/Ingestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ internal class Ingestor
private readonly string m_connectToStorageWithUserAuth = null;
private readonly string m_connectToStorageLoginUri = null;
private readonly string m_connectToStorageWithManagedIdentity = null;
private FixedWindowThrottlerPolicy m_fixedWindowThrottlerPolicy;
private FixedWindowThrottlerPolicy m_ingestionFixedWindowThrottlerPolicy;
private FixedWindowThrottlerPolicy m_listingFixedWindowThrottlerPolicy;

private object m_objectsListingLock = new object();
SemaphoreSlim m_listingLockOrNull;

// Ingestion results for queued ingest flow when confirmation is required
private object m_ingestionResultsLock = new object();
Expand Down Expand Up @@ -94,6 +95,9 @@ internal class Ingestor
private Disposer m_disposer;
private IPersistentStorageFactory m_persistentStorageFactory;
private BlobPersistentStorageFactory2 m_blob;
private const int c_maxBlocksCapacity = 10000;
private const int c_delayOnThrottlingMs = 50;
private readonly TimeSpan c_ingestionRateTime = TimeSpan.FromSeconds(1);
#endif
#endregion // Data members

Expand Down Expand Up @@ -172,7 +176,10 @@ private Ingestor(ExtendedCommandLineArgs args, AdditionalArguments additionalArg

m_ingestionProperties.IgnoreSizeLimit = m_args.NoSizeLimit;

m_fixedWindowThrottlerPolicy = new FixedWindowThrottlerPolicy(args.IngestionRateCount, TimeSpan.FromSeconds(args.IngestionRateTime));
m_ingestionFixedWindowThrottlerPolicy = new FixedWindowThrottlerPolicy(args.IngestionRateCount, c_ingestionRateTime);
m_listingFixedWindowThrottlerPolicy = new FixedWindowThrottlerPolicy(args.ListingRateCount, c_ingestionRateTime);

m_listingLockOrNull = m_objectsCountQuota > 0 ? new SemaphoreSlim(initialCount: 1, maxCount: 1) : null;
InitPSLFields();
}

Expand Down Expand Up @@ -237,7 +244,7 @@ private void Reset()

return new Ingestor(args, additionalArgs, ingestionProperties, logger);
}
#endregion Construction and initialization
#endregion Construction and initialization

internal void RunQueuedIngest(KustoConnectionStringBuilder kcsb)
{
Expand Down Expand Up @@ -265,13 +272,13 @@ record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem,
{
ingestBlock = new ActionBlock<DataSource>(
record => IngestSingle(record, m_objectsCountQuota, ingestClient, m_bFileSystem, false, m_ingestionProperties, m_ingestWithManagedIdentity),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });

// ListFiles calls PSL EnumerateFiles which accepts a pattern but BlobPersistentStorageFactory2 doesn't use the full pattern
// but only its prefix, therefore we still have to filter ourselves.
filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
file => FilterFiles(file, m_patternRegex, m_objectsCountQuota, ingestBlock),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, ingestBlock),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
filterObjectsBlock.Completion.ContinueWith(delegate { ingestBlock.Complete(); });

listObjectsBlock = new ActionBlock<string>(
Expand Down Expand Up @@ -415,7 +422,7 @@ record => LogSingleObject(record),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });

filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
file => FilterFiles(file, m_patternRegex, m_objectsCountQuota, simulatedIngestBlock),
file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, simulatedIngestBlock),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
filterObjectsBlock.Completion.ContinueWith(delegate { simulatedIngestBlock.Complete(); });

Expand Down Expand Up @@ -472,11 +479,11 @@ record => UploadFiles(record, tempContainer),
{
uploadOrAccumulateBlock = new ActionBlock<DataSource>(
record => AccumulateObjects(record),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });

filterObjectsBlock = new ActionBlock<IPersistentStorageFile>(
file => FilterFiles(file, m_patternRegex, m_objectsCountQuota, uploadOrAccumulateBlock),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount });
file => FilterFilesAsync(file, m_patternRegex, m_objectsCountQuota, uploadOrAccumulateBlock),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExtendedEnvironment.RestrictedProcessorCount, BoundedCapacity = c_maxBlocksCapacity, EnsureOrdered = false });
filterObjectsBlock.Completion.ContinueWith(delegate { uploadOrAccumulateBlock.Complete(); });

listObjectsBlock = new ActionBlock<string>(
Expand Down Expand Up @@ -666,7 +673,7 @@ private void EnableStorageUserAuthIfNeeded(ref string sourcePath, out IKustoToke
}
}

private void ListFiles(string sourcePath, string sourceVirtualDirectory,int filesToTake, ITargetBlock<IPersistentStorageFile> targetBlock)
private void ListFiles(string sourcePath, string sourceVirtualDirectory, int filesToTake, ITargetBlock<IPersistentStorageFile> targetBlock)
{
#if !OPEN_SOURCE_COMPILATION
try
Expand All @@ -686,7 +693,11 @@ private void ListFiles(string sourcePath, string sourceVirtualDirectory,int file

ExtendedParallel.ForEach(sourceFiles, m_directIngestParallelRequests, r =>
{
targetBlock.SendAsync(r);
while (!m_listingFixedWindowThrottlerPolicy.ShouldInvoke())
{
Task.Delay(TimeSpan.FromMilliseconds(c_delayOnThrottlingMs)).ConfigureAwait(false).ResultEx();
}
targetBlock.SendAsync(r).ConfigureAwait(false).ResultEx();
Interlocked.Increment(ref m_objectsListed);
});

Expand All @@ -698,39 +709,40 @@ private void ListFiles(string sourcePath, string sourceVirtualDirectory,int file
#endif
}

private void FilterFiles(IPersistentStorageFile cloudFile, Regex patternRegex, int filesToTake, ITargetBlock<DataSource> targetBlock)
private async Task FilterFilesAsync(IPersistentStorageFile cloudFile, Regex patternRegex, int filesToTake, ITargetBlock<DataSource> targetBlock)
{
try
{
if (cloudFile != null && (patternRegex == null || patternRegex.IsMatch(cloudFile.GetFileName())))
{
// we are taking this lock here in order to not process more items than specified.
lock (m_objectsListingLock)
// Semaphore is used in order to not process more items than specified, if not specified - it is null.
m_listingLockOrNull?.Wait();

if (filesToTake >= 0 && filesToTake <= Interlocked.Read(ref m_objectsAccepted))
{
if (filesToTake >= 0 && filesToTake <= Interlocked.Read(ref m_objectsAccepted))
{
// we're done, don't need new stuff
return;
}

long size = Utilities.EstimateFileSize(cloudFile, m_estimatedCompressionRatio);
DateTime? creationTime = Utilities.InferFileCreationTimeUtc(cloudFile, m_creationTimeInNamePattern);

targetBlock.SendAsync(new DataSource
{
CloudFileUri = $"{cloudFile.GetUnsecureUri()}",
SafeCloudFileUri = cloudFile.GetFileUri(),
SizeInBytes = size,
CreationTimeUtc = creationTime
});

Interlocked.Increment(ref m_objectsAccepted);
m_listingLockOrNull?.Release();
// We're done, don't need new stuff
return;
}

m_listingLockOrNull?.Release();
long size = await Utilities.EstimateFileSizeAsync(cloudFile, m_estimatedCompressionRatio);
DateTime? creationTime = await Utilities.InferFileCreationTimeUtcAsync(cloudFile, m_creationTimeInNamePattern);

await targetBlock.SendAsync(new DataSource
{
CloudFileUri = $"{cloudFile.GetUnsecureUri()}",
SafeCloudFileUri = cloudFile.GetFileUri(),
SizeInBytes = size,
CreationTimeUtc = creationTime
});

Interlocked.Increment(ref m_objectsAccepted);
}
}
catch (Exception ex)
{
m_logger.LogError($"FilterFiles failed: {ex.Message}");
m_logger.LogError($"FilterFilesAsync failed on blob '{cloudFile.GetFileUri()}', error: {ex.Message}");
}
}

Expand Down Expand Up @@ -764,9 +776,9 @@ private void FilterFiles(IPersistentStorageFile cloudFile, Regex patternRegex, i
ingestionProperties = baseIngestionProperties;
}

while (!await m_fixedWindowThrottlerPolicy.ShouldInvokeAsync().ConfigureAwait(false))
while (!await m_ingestionFixedWindowThrottlerPolicy.ShouldInvokeAsync().ConfigureAwait(false))
{
await Task.Delay(TimeSpan.FromMilliseconds(50)).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(c_delayOnThrottlingMs)).ConfigureAwait(false);
}

if (fromFileSystem)
Expand Down Expand Up @@ -800,7 +812,7 @@ private void FilterFiles(IPersistentStorageFile cloudFile, Regex patternRegex, i
}
catch (Exception ex)
{
m_logger.LogError($"IngestSingle failed: {ex.MessageEx(true)}");
m_logger.LogError($"IngestSingle failed on blob '{storageObject.SafeCloudFileUri}', error: {ex.MessageEx(true)}");
}
}

Expand Down Expand Up @@ -1098,7 +1110,7 @@ private void WaitForIngestionResult(IEnumerable<IKustoIngestionResult> ingestion

m_logger.LogInfo(sb.ToString());
}
#endregion // Private helper methods
#endregion // Private helper methods

#region DataSource and DataSourcesBatch

Expand Down
12 changes: 7 additions & 5 deletions src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,14 @@ internal class ExtendedCommandLineArgs : CommandLineArgs

[CommandLineArg("repeatCount", "Number of repetitions of the whole ingest cycle (0: no repetitions, -1: repeat indefinitely)", ShortName = "repeat", Mandatory = false)]
public int RepeatCount = 0;
#endregion

[CommandLineArg("ingestionRateCount", "Number of blobs queued in one time window.", ShortName = "rateCount", Mandatory = false, DefaultValue = 500)]
#region Parameters to tune the rate of requests
[CommandLineArg("ingestionRateCount", "Number of blobs queued in one second.", ShortName = "rateCount", Mandatory = false, DefaultValue = 500)]
public uint IngestionRateCount = 500;

[CommandLineArg("ingestionRateTime", "Duration of time window in seconds.", ShortName = "rateTime", Mandatory = false, DefaultValue = 1)]
public uint IngestionRateTime = 1;
[CommandLineArg("listingRateCount", "Number of blobs listed in one second.", ShortName = "listRateCount", Mandatory = false, DefaultValue = 500)]
public uint ListingRateCount = 500;
#endregion

public string FormattedParametersSummary()
Expand Down Expand Up @@ -291,8 +293,8 @@ public string FormattedParametersSummary()
if (RepeatCount > 0) { esb.AppendLine($"-repeatCount : {RepeatCount}"); }
if (!string.Equals(DevTracing, "*")) { esb.AppendLine($"-trace : {DevTracing}"); }
esb.AppendLine($"-ingestionRateCount : {IngestionRateCount}");
esb.AppendLine($"-ingestionRateTime : {IngestionRateTime}");
esb.AppendLine($"-listingRateCount : {ListingRateCount}");

esb.Unindent();
return esb.ToString();
}
Expand Down
12 changes: 7 additions & 5 deletions src/Utilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;

#if !OPEN_SOURCE_COMPILATION
using Kusto.Cloud.Platform.AWS.PersistentStorage;
using Kusto.Cloud.Platform.Azure.Storage;
Expand Down Expand Up @@ -116,11 +118,11 @@ internal static long TryGetFileSize(string path, double estimatedCompressionRati
return 0L;
}

internal static long EstimateFileSize(IPersistentStorageFile cloudFile, double estimatedCompressionRatio)
internal static async Task<long> EstimateFileSizeAsync(IPersistentStorageFile cloudFile, double estimatedCompressionRatio)
{
if (cloudFile is IFileWithMetadata cloudFileWithMetadata)
{
IReadOnlyDictionary<string, string> metadata = cloudFileWithMetadata.GetFileMetaDataAsync().ResultEx();
IReadOnlyDictionary<string, string> metadata = await cloudFileWithMetadata.GetFileMetaDataAsync();
long? estimatedSizeBytes = GetPositiveLongProperty(metadata,
#if !OPEN_SOURCE_COMPILATION
(cloudFile is S3PersistentStorageFile) ? Constants.AwsMetadaRawDataSize :
Expand All @@ -142,7 +144,7 @@ internal static long EstimateFileSize(IPersistentStorageFile cloudFile, double e
}
}

long blobSize = cloudFile.GetLength();
long blobSize = await cloudFile.GetLengthAsync();
string blobName = cloudFile.GetFileName();

// TODO: we need to add proper handling per format
Expand All @@ -159,12 +161,12 @@ internal static long EstimateFileSize(IPersistentStorageFile cloudFile, double e
return TryParseDateTimeUtcFromString(path, fileCreationTimeFormat);
}

internal static DateTime? InferFileCreationTimeUtc(IPersistentStorageFile cloudFile, DateTimeFormatPattern blobCreationTimeFormat)
internal static async Task<DateTime?> InferFileCreationTimeUtcAsync(IPersistentStorageFile cloudFile, DateTimeFormatPattern blobCreationTimeFormat)
{
// Metadata always wins, as it is more deliberate
if (cloudFile is IFileWithMetadata cloudFileWithMetadata)
{
var metadata = cloudFileWithMetadata.GetFileMetaDataAsync().ResultEx();
var metadata = await cloudFileWithMetadata.GetFileMetaDataAsync();
DateTime? creationTimeUtc = GetDateTimeProperty(metadata,
#if !OPEN_SOURCE_COMPILATION
(cloudFile is S3PersistentStorageFile) ? Constants.AwsMetadataCreationTimeLegacy :
Expand Down

0 comments on commit 1994be5

Please sign in to comment.