Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Catalog2Dnx: improve throughput (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtivel committed Aug 22, 2018
1 parent 20df2bd commit 606ca27
Show file tree
Hide file tree
Showing 51 changed files with 1,701 additions and 341 deletions.
224 changes: 192 additions & 32 deletions src/Catalog/Dnx/DnxCatalogCollector.cs

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/Catalog/Dnx/DnxConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using NuGet.Services.Metadata.Catalog.Persistence;

namespace NuGet.Services.Metadata.Catalog.Dnx
{
internal static class DnxConstants
{
internal const string ApplicationOctetStreamContentType = "application/octet-stream";
internal const string DefaultCacheControl = "max-age=120";

internal static readonly IReadOnlyDictionary<string, string> RequiredBlobProperties = new Dictionary<string, string>()
{
{ StorageConstants.CacheControl, DefaultCacheControl },
{ StorageConstants.ContentType, ApplicationOctetStreamContentType }
};
}
}
85 changes: 73 additions & 12 deletions src/Catalog/Dnx/DnxMaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public DnxMaker(StorageFactory storageFactory)
public async Task<DnxEntry> AddPackageAsync(
Stream nupkgStream,
string nuspec,
string id,
string version,
string packageId,
string normalizedPackageVersion,
CancellationToken cancellationToken)
{
if (nupkgStream == null)
Expand All @@ -40,23 +40,57 @@ public DnxMaker(StorageFactory storageFactory)
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(nuspec));
}

if (string.IsNullOrEmpty(id))
if (string.IsNullOrEmpty(packageId))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(id));
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageId));
}

if (string.IsNullOrEmpty(version))
if (string.IsNullOrEmpty(normalizedPackageVersion))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(version));
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(normalizedPackageVersion));
}

cancellationToken.ThrowIfCancellationRequested();

var storage = _storageFactory.Create(id);
var normalizedVersion = NuGetVersionUtility.NormalizeVersion(version);
var storage = _storageFactory.Create(packageId);
var nuspecUri = await SaveNuspecAsync(storage, packageId, normalizedPackageVersion, nuspec, cancellationToken);
var nupkgUri = await SaveNupkgAsync(nupkgStream, storage, packageId, normalizedPackageVersion, cancellationToken);

return new DnxEntry(nupkgUri, nuspecUri);
}

public async Task<DnxEntry> AddPackageAsync(
IStorage sourceStorage,
string nuspec,
string packageId,
string normalizedPackageVersion,
CancellationToken cancellationToken)
{
if (sourceStorage == null)
{
throw new ArgumentNullException(nameof(sourceStorage));
}

if (string.IsNullOrEmpty(nuspec))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(nuspec));
}

var nuspecUri = await SaveNuspecAsync(storage, id, normalizedVersion, nuspec, cancellationToken);
var nupkgUri = await SaveNupkgAsync(nupkgStream, storage, id, normalizedVersion, cancellationToken);
if (string.IsNullOrEmpty(packageId))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageId));
}

if (string.IsNullOrEmpty(normalizedPackageVersion))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(normalizedPackageVersion));
}

cancellationToken.ThrowIfCancellationRequested();

var destinationStorage = _storageFactory.Create(packageId);
var nuspecUri = await SaveNuspecAsync(destinationStorage, packageId, normalizedPackageVersion, nuspec, cancellationToken);
var nupkgUri = await CopyNupkgAsync(sourceStorage, destinationStorage, packageId, normalizedPackageVersion, cancellationToken);

return new DnxEntry(nupkgUri, nuspecUri);
}
Expand Down Expand Up @@ -111,8 +145,9 @@ private async Task<Uri> SaveNuspecAsync(Storage storage, string id, string versi
{
var relativeAddress = GetRelativeAddressNuspec(id, version);
var nuspecUri = new Uri(storage.BaseAddress, relativeAddress);
var content = new StringStorageContent(nuspec, "text/xml", DnxConstants.DefaultCacheControl);

await storage.SaveAsync(nuspecUri, new StringStorageContent(nuspec, "text/xml", "max-age=120"), cancellationToken);
await storage.SaveAsync(nuspecUri, content, cancellationToken);

return nuspecUri;
}
Expand Down Expand Up @@ -195,10 +230,36 @@ private StorageContent CreateContent(IEnumerable<string> versions)
private async Task<Uri> SaveNupkgAsync(Stream nupkgStream, Storage storage, string id, string version, CancellationToken cancellationToken)
{
Uri nupkgUri = new Uri(storage.BaseAddress, GetRelativeAddressNupkg(id, version));
await storage.SaveAsync(nupkgUri, new StreamStorageContent(nupkgStream, "application/octet-stream", "max-age=120"), cancellationToken);
var content = new StreamStorageContent(
nupkgStream,
DnxConstants.ApplicationOctetStreamContentType,
DnxConstants.DefaultCacheControl);

await storage.SaveAsync(nupkgUri, content, cancellationToken);

return nupkgUri;
}

private async Task<Uri> CopyNupkgAsync(
IStorage sourceStorage,
Storage destinationStorage,
string id, string version, CancellationToken cancellationToken)
{
var packageFileName = PackageUtility.GetPackageFileName(id, version);
var sourceUri = sourceStorage.ResolveUri(packageFileName);
var destinationRelativeUri = GetRelativeAddressNupkg(id, version);
var destinationUri = destinationStorage.ResolveUri(destinationRelativeUri);

await sourceStorage.CopyAsync(
sourceUri,
destinationStorage,
destinationUri,
DnxConstants.RequiredBlobProperties,
cancellationToken);

return destinationUri;
}

private async Task DeleteNuspecAsync(Storage storage, string id, string version, CancellationToken cancellationToken)
{
string relativeAddress = GetRelativeAddressNuspec(id, version);
Expand Down
4 changes: 2 additions & 2 deletions src/Catalog/Helpers/PackageUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NuGet.Services.Metadata.Catalog.Helpers
{
public static class PackageUtility
{
public static string GetPackageFileNameLowercase(string packageId, string packageVersion)
public static string GetPackageFileName(string packageId, string packageVersion)
{
if (string.IsNullOrEmpty(packageId))
{
Expand All @@ -19,7 +19,7 @@ public static string GetPackageFileNameLowercase(string packageId, string packag
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageVersion));
}

return $"{packageId.ToLowerInvariant()}.{packageVersion.ToLowerInvariant()}.nupkg";
return $"{packageId}.{packageVersion}.nupkg";
}
}
}
6 changes: 6 additions & 0 deletions src/Catalog/NuGet.Services.Metadata.Catalog.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
<Reference Include="Microsoft.WindowsAzure.Storage, Version=8.2.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\WindowsAzure.Storage.8.2.1\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath>
</Reference>
<Reference Include="Microsoft.WindowsAzure.Storage.DataMovement, Version=0.6.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Azure.Storage.DataMovement.0.6.0\lib\net45\Microsoft.WindowsAzure.Storage.DataMovement.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
Expand Down Expand Up @@ -173,6 +176,7 @@
<Compile Include="CommitCollector.cs" />
<Compile Include="CommitMetadata.cs" />
<Compile Include="Constants.cs" />
<Compile Include="Dnx\DnxConstants.cs" />
<Compile Include="Dnx\DnxEntry.cs" />
<Compile Include="Helpers\AsyncExtensions.cs" />
<Compile Include="Helpers\PackageUtility.cs" />
Expand All @@ -192,6 +196,8 @@
<Compile Include="Persistence\IAzureStorage.cs" />
<Compile Include="Persistence\ICloudBlockBlob.cs" />
<Compile Include="Persistence\NamedStorageFactory.cs" />
<Compile Include="Persistence\OptimisticConcurrencyControlToken.cs" />
<Compile Include="Persistence\StorageConstants.cs" />
<Compile Include="ReadOnlyGraph.cs" />
<Compile Include="Persistence\StorageListItem.cs" />
<Compile Include="Registration\FlatContainerPackagePathProvider.cs" />
Expand Down
33 changes: 23 additions & 10 deletions src/Catalog/PackageCatalogItemCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,30 @@ public sealed class PackageCatalogItemCreator : IPackageCatalogItemCreator
return item;
}

private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDetails packageItem, CancellationToken cancellationToken)
private async Task<PackageCatalogItem> GetPackageViaStorageAsync(
FeedPackageDetails packageItem,
CancellationToken cancellationToken)
{
PackageCatalogItem item = null;
var packageFileName = PackageUtility.GetPackageFileNameLowercase(packageItem.PackageId, packageItem.PackageVersion);
var blob = await _storage.GetCloudBlockBlobReferenceAsync(packageFileName);
var packageId = packageItem.PackageId.ToLowerInvariant();
var packageVersion = packageItem.PackageVersion.ToLowerInvariant();
var packageFileName = PackageUtility.GetPackageFileName(packageId, packageVersion);
var blobUri = _storage.ResolveUri(packageFileName);
var blob = await _storage.GetCloudBlockBlobReferenceAsync(blobUri);

if (blob == null)
if (!await blob.ExistsAsync(cancellationToken))
{
_telemetryService.TrackMetric(
TelemetryConstants.NonExistentBlob,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));

return item;
}

using (_telemetryService.TrackDuration(
TelemetryConstants.PackageBlobReadSeconds,
GetProperties(packageItem, blob: null)))
GetProperties(packageId, packageVersion, blob: null)))
{
await blob.FetchAttributesAsync(cancellationToken);

Expand Down Expand Up @@ -146,7 +151,7 @@ private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDeta
_telemetryService.TrackMetric(
TelemetryConstants.BlobModified,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));
}
}
}
Expand All @@ -155,7 +160,7 @@ private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDeta
_telemetryService.TrackMetric(
TelemetryConstants.NonExistentPackageHash,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));
}
}

Expand Down Expand Up @@ -223,11 +228,19 @@ private async Task<PackageCatalogItem> GetPackageViaHttpAsync(FeedPackageDetails
}

private static Dictionary<string, string> GetProperties(FeedPackageDetails packageItem, ICloudBlockBlob blob)
{
return GetProperties(
packageItem.PackageId.ToLowerInvariant(),
packageItem.PackageVersion.ToLowerInvariant(),
blob);
}

private static Dictionary<string, string> GetProperties(string packageId, string packageVersion, ICloudBlockBlob blob)
{
var properties = new Dictionary<string, string>()
{
{ TelemetryConstants.Id, packageItem.PackageId.ToLowerInvariant() },
{ TelemetryConstants.Version, packageItem.PackageVersion.ToLowerInvariant() }
{ TelemetryConstants.Id, packageId },
{ TelemetryConstants.Version, packageVersion }
};

if (blob != null)
Expand Down
34 changes: 22 additions & 12 deletions src/Catalog/Persistence/AggregateStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,38 @@ namespace NuGet.Services.Metadata.Catalog.Persistence
public class AggregateStorage : Storage
{
public delegate StorageContent WriteSecondaryStorageContentInterceptor(
Uri primaryStorageBaseUri,
Uri primaryResourceUri,
Uri primaryStorageBaseUri,
Uri primaryResourceUri,
Uri secondaryStorageBaseUri,
Uri secondaryResourceUri,
Uri secondaryResourceUri,
StorageContent content);

private readonly Storage _primaryStorage;
private readonly Storage[] _secondaryStorage;
private readonly WriteSecondaryStorageContentInterceptor _writeSecondaryStorageContentInterceptor;

public AggregateStorage(Uri baseAddress, Storage primaryStorage, Storage[] secondaryStorage,
WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor)
: base(baseAddress)
{
_primaryStorage = primaryStorage;
_secondaryStorage = secondaryStorage;
_writeSecondaryStorageContentInterceptor = writeSecondaryStorageContentInterceptor;

BaseAddress = _primaryStorage.BaseAddress;
}

protected override Task OnSave(Uri resourceUri, StorageContent content, CancellationToken cancellationToken)

protected override Task OnCopyAsync(
Uri sourceUri,
IStorage destinationStorage,
Uri destinationUri,
IReadOnlyDictionary<string, string> destinationProperties,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

protected override Task OnSaveAsync(Uri resourceUri, StorageContent content, CancellationToken cancellationToken)
{
var tasks = new List<Task>();
tasks.Add(_primaryStorage.SaveAsync(resourceUri, content, cancellationToken));
Expand All @@ -46,9 +56,9 @@ protected override Task OnSave(Uri resourceUri, StorageContent content, Cancella
if (_writeSecondaryStorageContentInterceptor != null)
{
secondaryContent = _writeSecondaryStorageContentInterceptor(
_primaryStorage.BaseAddress,
resourceUri,
storage.BaseAddress,
_primaryStorage.BaseAddress,
resourceUri,
storage.BaseAddress,
secondaryResourceUri, content);
}

Expand All @@ -58,12 +68,12 @@ protected override Task OnSave(Uri resourceUri, StorageContent content, Cancella
return Task.WhenAll(tasks);
}

protected override Task<StorageContent> OnLoad(Uri resourceUri, CancellationToken cancellationToken)
protected override Task<StorageContent> OnLoadAsync(Uri resourceUri, CancellationToken cancellationToken)
{
return _primaryStorage.LoadAsync(resourceUri, cancellationToken);
}

protected override Task OnDelete(Uri resourceUri, CancellationToken cancellationToken)
protected override Task OnDeleteAsync(Uri resourceUri, CancellationToken cancellationToken)
{
var tasks = new List<Task>();
tasks.Add(_primaryStorage.DeleteAsync(resourceUri, cancellationToken));
Expand Down
19 changes: 15 additions & 4 deletions src/Catalog/Persistence/AggregateStorageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@ public class AggregateStorageFactory : StorageFactory
{
private readonly AggregateStorage.WriteSecondaryStorageContentInterceptor _writeSecondaryStorageContentInterceptor;

public AggregateStorageFactory(StorageFactory primaryStorageFactory, StorageFactory[] secondaryStorageFactories)
: this(primaryStorageFactory, secondaryStorageFactories, null)
public AggregateStorageFactory(
StorageFactory primaryStorageFactory,
StorageFactory[] secondaryStorageFactories,
bool verbose)
: this(
primaryStorageFactory,
secondaryStorageFactories,
writeSecondaryStorageContentInterceptor: null,
verbose: verbose)
{
}

public AggregateStorageFactory(StorageFactory primaryStorageFactory, StorageFactory[] secondaryStorageFactories,
AggregateStorage.WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor)
public AggregateStorageFactory(
StorageFactory primaryStorageFactory,
StorageFactory[] secondaryStorageFactories,
AggregateStorage.WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor,
bool verbose)
{
PrimaryStorageFactory = primaryStorageFactory;
SecondaryStorageFactories = secondaryStorageFactories;
_writeSecondaryStorageContentInterceptor = writeSecondaryStorageContentInterceptor;

BaseAddress = PrimaryStorageFactory.BaseAddress;
DestinationAddress = PrimaryStorageFactory.DestinationAddress;
Verbose = verbose;
}

public override Storage Create(string name = null)
Expand Down
10 changes: 5 additions & 5 deletions src/Catalog/Persistence/AzureCloudBlockBlob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public sealed class AzureCloudBlockBlob : ICloudBlockBlob

public AzureCloudBlockBlob(CloudBlockBlob blob)
{
if (blob == null)
{
throw new ArgumentNullException(nameof(blob));
}
_blob = blob ?? throw new ArgumentNullException(nameof(blob));
}

_blob = blob;
public Task<bool> ExistsAsync(CancellationToken cancellationToken)
{
return _blob.ExistsAsync();
}

public async Task FetchAttributesAsync(CancellationToken cancellationToken)
Expand Down

0 comments on commit 606ca27

Please sign in to comment.