Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Catalog2Dnx: improve throughput #335

Merged
merged 1 commit into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
224 changes: 192 additions & 32 deletions src/Catalog/Dnx/DnxCatalogCollector.cs

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/Catalog/Dnx/DnxConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using NuGet.Services.Metadata.Catalog.Persistence;

namespace NuGet.Services.Metadata.Catalog.Dnx
{
internal static class DnxConstants
{
internal const string ApplicationOctetStreamContentType = "application/octet-stream";
internal const string DefaultCacheControl = "max-age=120";

internal static readonly IReadOnlyDictionary<string, string> RequiredBlobProperties = new Dictionary<string, string>()
{
{ StorageConstants.CacheControl, DefaultCacheControl },
{ StorageConstants.ContentType, ApplicationOctetStreamContentType }
};
}
}
85 changes: 73 additions & 12 deletions src/Catalog/Dnx/DnxMaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public DnxMaker(StorageFactory storageFactory)
public async Task<DnxEntry> AddPackageAsync(
Stream nupkgStream,
string nuspec,
string id,
string version,
string packageId,
string normalizedPackageVersion,
CancellationToken cancellationToken)
{
if (nupkgStream == null)
Expand All @@ -40,23 +40,57 @@ public DnxMaker(StorageFactory storageFactory)
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(nuspec));
}

if (string.IsNullOrEmpty(id))
if (string.IsNullOrEmpty(packageId))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(id));
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageId));
}

if (string.IsNullOrEmpty(version))
if (string.IsNullOrEmpty(normalizedPackageVersion))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(version));
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(normalizedPackageVersion));
}

cancellationToken.ThrowIfCancellationRequested();

var storage = _storageFactory.Create(id);
var normalizedVersion = NuGetVersionUtility.NormalizeVersion(version);
var storage = _storageFactory.Create(packageId);
var nuspecUri = await SaveNuspecAsync(storage, packageId, normalizedPackageVersion, nuspec, cancellationToken);
var nupkgUri = await SaveNupkgAsync(nupkgStream, storage, packageId, normalizedPackageVersion, cancellationToken);

return new DnxEntry(nupkgUri, nuspecUri);
}

public async Task<DnxEntry> AddPackageAsync(
IStorage sourceStorage,
string nuspec,
string packageId,
string normalizedPackageVersion,
CancellationToken cancellationToken)
{
if (sourceStorage == null)
{
throw new ArgumentNullException(nameof(sourceStorage));
}

if (string.IsNullOrEmpty(nuspec))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(nuspec));
}

var nuspecUri = await SaveNuspecAsync(storage, id, normalizedVersion, nuspec, cancellationToken);
var nupkgUri = await SaveNupkgAsync(nupkgStream, storage, id, normalizedVersion, cancellationToken);
if (string.IsNullOrEmpty(packageId))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageId));
}

if (string.IsNullOrEmpty(normalizedPackageVersion))
{
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(normalizedPackageVersion));
}

cancellationToken.ThrowIfCancellationRequested();

var destinationStorage = _storageFactory.Create(packageId);
var nuspecUri = await SaveNuspecAsync(destinationStorage, packageId, normalizedPackageVersion, nuspec, cancellationToken);
var nupkgUri = await CopyNupkgAsync(sourceStorage, destinationStorage, packageId, normalizedPackageVersion, cancellationToken);

return new DnxEntry(nupkgUri, nuspecUri);
}
Expand Down Expand Up @@ -111,8 +145,9 @@ private async Task<Uri> SaveNuspecAsync(Storage storage, string id, string versi
{
var relativeAddress = GetRelativeAddressNuspec(id, version);
var nuspecUri = new Uri(storage.BaseAddress, relativeAddress);
var content = new StringStorageContent(nuspec, "text/xml", DnxConstants.DefaultCacheControl);

await storage.SaveAsync(nuspecUri, new StringStorageContent(nuspec, "text/xml", "max-age=120"), cancellationToken);
await storage.SaveAsync(nuspecUri, content, cancellationToken);

return nuspecUri;
}
Expand Down Expand Up @@ -195,10 +230,36 @@ private StorageContent CreateContent(IEnumerable<string> versions)
private async Task<Uri> SaveNupkgAsync(Stream nupkgStream, Storage storage, string id, string version, CancellationToken cancellationToken)
{
Uri nupkgUri = new Uri(storage.BaseAddress, GetRelativeAddressNupkg(id, version));
await storage.SaveAsync(nupkgUri, new StreamStorageContent(nupkgStream, "application/octet-stream", "max-age=120"), cancellationToken);
var content = new StreamStorageContent(
nupkgStream,
DnxConstants.ApplicationOctetStreamContentType,
DnxConstants.DefaultCacheControl);

await storage.SaveAsync(nupkgUri, content, cancellationToken);

return nupkgUri;
}

private async Task<Uri> CopyNupkgAsync(
IStorage sourceStorage,
Storage destinationStorage,
string id, string version, CancellationToken cancellationToken)
{
var packageFileName = PackageUtility.GetPackageFileName(id, version);
var sourceUri = sourceStorage.ResolveUri(packageFileName);
var destinationRelativeUri = GetRelativeAddressNupkg(id, version);
var destinationUri = destinationStorage.ResolveUri(destinationRelativeUri);

await sourceStorage.CopyAsync(
sourceUri,
destinationStorage,
destinationUri,
DnxConstants.RequiredBlobProperties,
cancellationToken);

return destinationUri;
}

private async Task DeleteNuspecAsync(Storage storage, string id, string version, CancellationToken cancellationToken)
{
string relativeAddress = GetRelativeAddressNuspec(id, version);
Expand Down
4 changes: 2 additions & 2 deletions src/Catalog/Helpers/PackageUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NuGet.Services.Metadata.Catalog.Helpers
{
public static class PackageUtility
{
public static string GetPackageFileNameLowercase(string packageId, string packageVersion)
public static string GetPackageFileName(string packageId, string packageVersion)
{
if (string.IsNullOrEmpty(packageId))
{
Expand All @@ -19,7 +19,7 @@ public static string GetPackageFileNameLowercase(string packageId, string packag
throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(packageVersion));
}

return $"{packageId.ToLowerInvariant()}.{packageVersion.ToLowerInvariant()}.nupkg";
return $"{packageId}.{packageVersion}.nupkg";
}
}
}
6 changes: 6 additions & 0 deletions src/Catalog/NuGet.Services.Metadata.Catalog.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
<Reference Include="Microsoft.WindowsAzure.Storage, Version=8.2.1.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\WindowsAzure.Storage.8.2.1\lib\net45\Microsoft.WindowsAzure.Storage.dll</HintPath>
</Reference>
<Reference Include="Microsoft.WindowsAzure.Storage.DataMovement, Version=0.6.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.Azure.Storage.DataMovement.0.6.0\lib\net45\Microsoft.WindowsAzure.Storage.DataMovement.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<HintPath>..\..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
<Private>True</Private>
Expand Down Expand Up @@ -173,6 +176,7 @@
<Compile Include="CommitCollector.cs" />
<Compile Include="CommitMetadata.cs" />
<Compile Include="Constants.cs" />
<Compile Include="Dnx\DnxConstants.cs" />
<Compile Include="Dnx\DnxEntry.cs" />
<Compile Include="Helpers\AsyncExtensions.cs" />
<Compile Include="Helpers\PackageUtility.cs" />
Expand All @@ -192,6 +196,8 @@
<Compile Include="Persistence\IAzureStorage.cs" />
<Compile Include="Persistence\ICloudBlockBlob.cs" />
<Compile Include="Persistence\NamedStorageFactory.cs" />
<Compile Include="Persistence\OptimisticConcurrencyControlToken.cs" />
<Compile Include="Persistence\StorageConstants.cs" />
<Compile Include="ReadOnlyGraph.cs" />
<Compile Include="Persistence\StorageListItem.cs" />
<Compile Include="Registration\FlatContainerPackagePathProvider.cs" />
Expand Down
33 changes: 23 additions & 10 deletions src/Catalog/PackageCatalogItemCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,30 @@ public sealed class PackageCatalogItemCreator : IPackageCatalogItemCreator
return item;
}

private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDetails packageItem, CancellationToken cancellationToken)
private async Task<PackageCatalogItem> GetPackageViaStorageAsync(
FeedPackageDetails packageItem,
CancellationToken cancellationToken)
{
PackageCatalogItem item = null;
var packageFileName = PackageUtility.GetPackageFileNameLowercase(packageItem.PackageId, packageItem.PackageVersion);
var blob = await _storage.GetCloudBlockBlobReferenceAsync(packageFileName);
var packageId = packageItem.PackageId.ToLowerInvariant();
var packageVersion = packageItem.PackageVersion.ToLowerInvariant();
var packageFileName = PackageUtility.GetPackageFileName(packageId, packageVersion);
var blobUri = _storage.ResolveUri(packageFileName);
var blob = await _storage.GetCloudBlockBlobReferenceAsync(blobUri);

if (blob == null)
if (!await blob.ExistsAsync(cancellationToken))
{
_telemetryService.TrackMetric(
TelemetryConstants.NonExistentBlob,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));

return item;
}

using (_telemetryService.TrackDuration(
TelemetryConstants.PackageBlobReadSeconds,
GetProperties(packageItem, blob: null)))
GetProperties(packageId, packageVersion, blob: null)))
{
await blob.FetchAttributesAsync(cancellationToken);

Expand Down Expand Up @@ -146,7 +151,7 @@ private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDeta
_telemetryService.TrackMetric(
TelemetryConstants.BlobModified,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));
}
}
}
Expand All @@ -155,7 +160,7 @@ private async Task<PackageCatalogItem> GetPackageViaStorageAsync(FeedPackageDeta
_telemetryService.TrackMetric(
TelemetryConstants.NonExistentPackageHash,
metric: 1,
properties: GetProperties(packageItem, blob));
properties: GetProperties(packageId, packageVersion, blob));
}
}

Expand Down Expand Up @@ -223,11 +228,19 @@ private async Task<PackageCatalogItem> GetPackageViaHttpAsync(FeedPackageDetails
}

private static Dictionary<string, string> GetProperties(FeedPackageDetails packageItem, ICloudBlockBlob blob)
{
return GetProperties(
packageItem.PackageId.ToLowerInvariant(),
packageItem.PackageVersion.ToLowerInvariant(),
blob);
}

private static Dictionary<string, string> GetProperties(string packageId, string packageVersion, ICloudBlockBlob blob)
{
var properties = new Dictionary<string, string>()
{
{ TelemetryConstants.Id, packageItem.PackageId.ToLowerInvariant() },
{ TelemetryConstants.Version, packageItem.PackageVersion.ToLowerInvariant() }
{ TelemetryConstants.Id, packageId },
{ TelemetryConstants.Version, packageVersion }
};

if (blob != null)
Expand Down
34 changes: 22 additions & 12 deletions src/Catalog/Persistence/AggregateStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,38 @@ namespace NuGet.Services.Metadata.Catalog.Persistence
public class AggregateStorage : Storage
{
public delegate StorageContent WriteSecondaryStorageContentInterceptor(
Uri primaryStorageBaseUri,
Uri primaryResourceUri,
Uri primaryStorageBaseUri,
Uri primaryResourceUri,
Uri secondaryStorageBaseUri,
Uri secondaryResourceUri,
Uri secondaryResourceUri,
StorageContent content);

private readonly Storage _primaryStorage;
private readonly Storage[] _secondaryStorage;
private readonly WriteSecondaryStorageContentInterceptor _writeSecondaryStorageContentInterceptor;

public AggregateStorage(Uri baseAddress, Storage primaryStorage, Storage[] secondaryStorage,
WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor)
: base(baseAddress)
{
_primaryStorage = primaryStorage;
_secondaryStorage = secondaryStorage;
_writeSecondaryStorageContentInterceptor = writeSecondaryStorageContentInterceptor;

BaseAddress = _primaryStorage.BaseAddress;
}

protected override Task OnSave(Uri resourceUri, StorageContent content, CancellationToken cancellationToken)

protected override Task OnCopyAsync(
Uri sourceUri,
IStorage destinationStorage,
Uri destinationUri,
IReadOnlyDictionary<string, string> destinationProperties,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

protected override Task OnSaveAsync(Uri resourceUri, StorageContent content, CancellationToken cancellationToken)
{
var tasks = new List<Task>();
tasks.Add(_primaryStorage.SaveAsync(resourceUri, content, cancellationToken));
Expand All @@ -46,9 +56,9 @@ protected override Task OnSave(Uri resourceUri, StorageContent content, Cancella
if (_writeSecondaryStorageContentInterceptor != null)
{
secondaryContent = _writeSecondaryStorageContentInterceptor(
_primaryStorage.BaseAddress,
resourceUri,
storage.BaseAddress,
_primaryStorage.BaseAddress,
resourceUri,
storage.BaseAddress,
secondaryResourceUri, content);
}

Expand All @@ -58,12 +68,12 @@ protected override Task OnSave(Uri resourceUri, StorageContent content, Cancella
return Task.WhenAll(tasks);
}

protected override Task<StorageContent> OnLoad(Uri resourceUri, CancellationToken cancellationToken)
protected override Task<StorageContent> OnLoadAsync(Uri resourceUri, CancellationToken cancellationToken)
{
return _primaryStorage.LoadAsync(resourceUri, cancellationToken);
}

protected override Task OnDelete(Uri resourceUri, CancellationToken cancellationToken)
protected override Task OnDeleteAsync(Uri resourceUri, CancellationToken cancellationToken)
{
var tasks = new List<Task>();
tasks.Add(_primaryStorage.DeleteAsync(resourceUri, cancellationToken));
Expand Down
19 changes: 15 additions & 4 deletions src/Catalog/Persistence/AggregateStorageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,31 @@ public class AggregateStorageFactory : StorageFactory
{
private readonly AggregateStorage.WriteSecondaryStorageContentInterceptor _writeSecondaryStorageContentInterceptor;

public AggregateStorageFactory(StorageFactory primaryStorageFactory, StorageFactory[] secondaryStorageFactories)
: this(primaryStorageFactory, secondaryStorageFactories, null)
public AggregateStorageFactory(
StorageFactory primaryStorageFactory,
StorageFactory[] secondaryStorageFactories,
bool verbose)
: this(
primaryStorageFactory,
secondaryStorageFactories,
writeSecondaryStorageContentInterceptor: null,
verbose: verbose)
{
}

public AggregateStorageFactory(StorageFactory primaryStorageFactory, StorageFactory[] secondaryStorageFactories,
AggregateStorage.WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor)
public AggregateStorageFactory(
StorageFactory primaryStorageFactory,
StorageFactory[] secondaryStorageFactories,
AggregateStorage.WriteSecondaryStorageContentInterceptor writeSecondaryStorageContentInterceptor,
bool verbose)
{
PrimaryStorageFactory = primaryStorageFactory;
SecondaryStorageFactories = secondaryStorageFactories;
_writeSecondaryStorageContentInterceptor = writeSecondaryStorageContentInterceptor;

BaseAddress = PrimaryStorageFactory.BaseAddress;
DestinationAddress = PrimaryStorageFactory.DestinationAddress;
Verbose = verbose;
}

public override Storage Create(string name = null)
Expand Down
10 changes: 5 additions & 5 deletions src/Catalog/Persistence/AzureCloudBlockBlob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public sealed class AzureCloudBlockBlob : ICloudBlockBlob

public AzureCloudBlockBlob(CloudBlockBlob blob)
{
if (blob == null)
{
throw new ArgumentNullException(nameof(blob));
}
_blob = blob ?? throw new ArgumentNullException(nameof(blob));
}

_blob = blob;
public Task<bool> ExistsAsync(CancellationToken cancellationToken)
{
return _blob.ExistsAsync();
}

public async Task FetchAttributesAsync(CancellationToken cancellationToken)
Expand Down