Permalink
Browse files

Initial code commit

  • Loading branch information...
tyrotoxin committed May 4, 2018
1 parent 369e7f1 commit db360b4d1dfda5d65215cd1349c79ca67ccfa9a5
Showing 390 changed files with 21,055 additions and 0 deletions.
@@ -0,0 +1,4 @@
.vs/
bin/
obj/
.user
@@ -0,0 +1,8 @@
using System.Reflection;
[assembly: AssemblyCompany("D-ASYNC LLC")]
[assembly: AssemblyCopyright("Copyright © 2018")]
[assembly: AssemblyTitle("Dasync.FabricConnector.AzureStorage")]
[assembly: AssemblyProduct("Dasync.FabricConnector.AzureStorage")]
[assembly: AssemblyVersion("0.1.0.0")]
[assembly: AssemblyFileVersion("0.1.0.0")]
@@ -0,0 +1,222 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Dasync.AzureStorage;
using Dasync.EETypes;
using Dasync.EETypes.Descriptors;
using Dasync.EETypes.Fabric;
using Dasync.EETypes.Intents;
using Dasync.Serialization;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Dasync.FabricConnector.AzureStorage
{
public class AzureStorageFabricConnector : IFabricConnector
{
private static readonly List<string> RoutineRecordPropertiesForPolling =
new List<string>
{
nameof(RoutineRecord.Status),
nameof(RoutineRecord.Result)
};
private readonly ServiceId _serviceId;
private readonly INumericIdGenerator _idGenerator;
private readonly ICloudQueue _transitionsQueue;
private readonly ICloudTable _routinesTable;
private readonly ISerializer _serializer;
public AzureStorageFabricConnector(
ServiceId serviceId,
INumericIdGenerator idGenerator,
ICloudQueue transitionsQueue,
ICloudTable routinesTable,
ISerializer serializer)
{
_serviceId = serviceId;
_idGenerator = idGenerator;
_transitionsQueue = transitionsQueue;
_routinesTable = routinesTable;
_serializer = serializer;
}
public async Task<ActiveRoutineInfo> ScheduleRoutineAsync(
ExecuteRoutineIntent intent, CancellationToken ct)
{
var pregeneratedRoutineId = intent.Id.ToString();
var transitionDescriptor = new TransitionDescriptor
{
Type = TransitionType.InvokeRoutine
};
var routineDescriptor = new RoutineDescriptor
{
IntentId = intent.Id,
MethodId = intent.MethodId,
RoutineId = pregeneratedRoutineId
};
var message = PackMessage(
("transition", transitionDescriptor),
("serviceId", intent.ServiceId),
("routine", routineDescriptor),
("parameters", intent.Parameters),
("continuation", intent.Continuation),
("caller", intent.Caller));
while (true)
{
try
{
await _transitionsQueue.AddMessageAsync(message, null, null, ct);
break;
}
catch (QueueDoesNotExistException)
{
await _transitionsQueue.CreateAsync(ct);
}
}
return new ActiveRoutineInfo
{
RoutineId = pregeneratedRoutineId
};
}
public async Task<ActiveRoutineInfo> ScheduleContinuationAsync(
ContinueRoutineIntent intent, CancellationToken ct)
{
var transitionDescriptor = new TransitionDescriptor
{
Type = TransitionType.ContinueRoutine
};
var message = PackMessage(
("transition", transitionDescriptor),
("serviceId", intent.Continuation.ServiceId),
("routine", intent.Continuation.Routine),
("result", intent.Result),
("callee", intent.Callee));
TimeSpan? delay = null;
if (intent.Continuation.ContinueAt.HasValue)
{
delay = intent.Continuation.ContinueAt.Value.ToUniversalTime() - DateTime.UtcNow;
if (delay <= TimeSpan.Zero)
delay = null;
}
while (true)
{
try
{
await _transitionsQueue.AddMessageAsync(message, null, delay, ct);
break;
}
catch (QueueDoesNotExistException)
{
await _transitionsQueue.CreateAsync(ct);
}
}
return new ActiveRoutineInfo
{
RoutineId = intent.Continuation.Routine.RoutineId,
ETag = intent.Continuation.Routine.ETag
};
}
public async Task<ActiveRoutineInfo> PollRoutineResultAsync(
ActiveRoutineInfo info, CancellationToken ct)
{
RoutineRecord routineRecord;
try
{
routineRecord = await _routinesTable.TryRetrieveAsync<RoutineRecord>(
_serviceId.ServiceName, info.RoutineId, RoutineRecordPropertiesForPolling, ct);
}
catch (TableDoesNotExistException)
{
routineRecord = null;
}
if (routineRecord != null)
{
TaskResult routineResult = null;
if (routineRecord.Status == (int)RoutineStatus.Complete &&
!string.IsNullOrEmpty(routineRecord.Result))
routineResult = _serializer.Deserialize<TaskResult>(routineRecord.Result);
info = new ActiveRoutineInfo
{
ETag = routineRecord.ETag,
RoutineId = info.RoutineId,
Result = routineResult
};
}
return info;
}
private CloudQueueMessage PackMessage(params (string key, object value)[] parts)
{
var delimiterId = _idGenerator.NewId();
if (_serializer is ITextSerializer)
{
using (var textWriter = new StringWriter())
{
using (var messageWriter = new MultipartMessageWriter(
textWriter, _serializer, delimiterId))
{
foreach (var (key, value) in parts)
if (value != null)
messageWriter.Write(key, value);
}
return new CloudQueueMessage(textWriter.ToString());
}
}
else
{
using (var memoryStream = new MemoryStream())
{
using (var messageWriter = new MultipartMessageWriter(
memoryStream, _serializer, delimiterId))
{
foreach (var (key, value) in parts)
if (value != null)
messageWriter.Write(key, value);
}
var data = memoryStream.ToArray();
var message = new CloudQueueMessage(string.Empty);
message.SetMessageContent(data);
return message;
}
}
}
}
public class AzureStorageFabricConnectorWithConfiguration
: AzureStorageFabricConnector, IFabricConnectorWithConfiguration
{
public AzureStorageFabricConnectorWithConfiguration(
ServiceId serviceId,
INumericIdGenerator idGenerator,
ICloudQueue transitionsQueue,
ICloudTable routinesTable,
ISerializer serializer,
AzureStorageFabricConnectorConfiguration originalConfiguration)
: base(serviceId, idGenerator, transitionsQueue, routinesTable, serializer)
{
Configuration = originalConfiguration;
}
public string ConnectorType => "AzureStorage";
public object Configuration { get; }
}
}
@@ -0,0 +1,15 @@
namespace Dasync.FabricConnector.AzureStorage
{
public class AzureStorageFabricConnectorConfiguration
{
public string StorageAccountName { get; set; }
public string TransitionsQueueName { get; set; }
public string RoutinesTableName { get; set; }
public string ServicesTableName { get; set; }
public string SerializerFormat { get; set; }
}
}
@@ -0,0 +1,49 @@
using System;
using Dasync.AzureStorage;
using Dasync.EETypes;
using Dasync.EETypes.Fabric;
using Dasync.Serialization;
namespace Dasync.FabricConnector.AzureStorage
{
public class AzureStorageFabricConnectorFactory : IFabricConnectorFactory
{
private readonly ISerializerFactorySelector _serializerFactorySelector;
private readonly INumericIdGenerator _idGenerator;
private readonly ICloudStorageAccountFactory _cloudStorageAccountFactory;
private readonly IStorageAccontConnectionStringResolver _storageAccontConnectionStringResolver;
public AzureStorageFabricConnectorFactory(
ISerializerFactorySelector serializerFactorySelector,
INumericIdGenerator idGenerator,
ICloudStorageAccountFactory cloudStorageAccountFactory,
IStorageAccontConnectionStringResolver storageAccontConnectionStringResolver)
{
_serializerFactorySelector = serializerFactorySelector;
_idGenerator = idGenerator;
_cloudStorageAccountFactory = cloudStorageAccountFactory;
_storageAccontConnectionStringResolver = storageAccontConnectionStringResolver;
}
public string ConnectorType => "AzureStorage";
public IFabricConnector Create(ServiceId serviceId, object configuration)
{
if (configuration == null)
throw new ArgumentNullException(nameof(configuration));
var config = (AzureStorageFabricConnectorConfiguration)configuration;
var serializerFactory = _serializerFactorySelector.Select(config.SerializerFormat);
var serializer = serializerFactory.Create();
var connectionString = _storageAccontConnectionStringResolver.Resolve(config.StorageAccountName);
var storageAccount = _cloudStorageAccountFactory.Create(connectionString);
var transitionsQueue = storageAccount.QueueClient.GetQueueReference(config.TransitionsQueueName);
var routinesTable = storageAccount.TableClient.GetTableReference(config.RoutinesTableName);
return new AzureStorageFabricConnectorWithConfiguration(
serviceId, _idGenerator, transitionsQueue, routinesTable, serializer, config);
}
}
}
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using Dasync.EETypes.Fabric;
namespace Dasync.FabricConnector.AzureStorage
{
public static class DI
{
public static readonly Dictionary<Type, Type> Bindings = new Dictionary<Type, Type>
{
[typeof(IFabricConnectorFactory)] = typeof(AzureStorageFabricConnectorFactory),
};
}
}
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{4400C0E4-7820-4655-98D7-BD91CB8DA5B5}</ProjectGuid>
<OutputType>Library</OutputType>
<FileAlignment>512</FileAlignment>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Dasync.FabricConnector.AzureStorage</RootNamespace>
<AssemblyName>Dasync.FabricConnector.AzureStorage</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FrameworkFolder>netframework4.5</FrameworkFolder>
<OutputPath>bin\$(Configuration)\$(FrameworkFolder)\</OutputPath>
<BaseIntermediateOutputPath>obj\$(Configuration)\$(FrameworkFolder)\</BaseIntermediateOutputPath>
<IntermediateOutputPath>obj\$(Configuration)\$(FrameworkFolder)\</IntermediateOutputPath>
<DocumentationFile>bin\$(Configuration)\$(FrameworkFolder)\$(AssemblyName).xml</DocumentationFile>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<DefineConstants>DEBUG;TRACE;NETFX;NETFX45</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<DefineConstants>TRACE;NETFX;NETFX45</DefineConstants>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Compile Include="AssemblyInfo.cs" />
<Compile Include="AzureStorageFabricConnector.cs" />
<Compile Include="AzureStorageFabricConnectorConfiguration.cs" />
<Compile Include="AzureStorageFabricConnectorFactory.cs" />
<Compile Include="IStorageAccontConnectionStringResolver.cs" />
<Compile Include="MultipartMessage.cs" />
<Compile Include="DI.cs" />
<Compile Include="RoutineRecord.cs" />
<Compile Include="RoutineStatus.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.ValueTuple">
<Version>4.4.0</Version>
</PackageReference>
<PackageReference Include="WindowsAzure.Storage">
<Version>7.2.1</Version>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Data\Serialization\Dasync.Serialization.Fx450.csproj">
<Project>{d52dec50-984d-4089-b09b-c272348e820f}</Project>
<Name>Dasync.Serialization.Fx450</Name>
</ProjectReference>
<ProjectReference Include="..\..\Data\ValueContainer\Dasync.ValueContainer.Fx450.csproj">
<Project>{e1077c7c-048b-46da-b8a4-b85a30d6e00f}</Project>
<Name>Dasync.ValueContainer.Fx450</Name>
</ProjectReference>
<ProjectReference Include="..\..\Engine\EETypes\Dasync.EETypes.Fx450.csproj">
<Project>{e10b9109-b721-492e-b32d-048bc8f147c6}</Project>
<Name>Dasync.EETypes.Fx450</Name>
</ProjectReference>
<ProjectReference Include="..\Storage\Dasync.AzureStorage.Fx450.csproj">
<Project>{e952af37-734c-458f-8202-8d639367b96e}</Project>
<Name>Dasync.AzureStorage.Fx450</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>
Oops, something went wrong.

0 comments on commit db360b4

Please sign in to comment.