Skip to content

Commit

Permalink
Code cleanup, optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
dbarkol committed Dec 13, 2018
1 parent dbcadcc commit 6a181c2
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 52 deletions.
1 change: 0 additions & 1 deletion FillCoordinates/Program.cs
Expand Up @@ -86,7 +86,6 @@ private static void InsertCoordinates(string partitionKey)
// This function will add 100 entities at a time. It will
// also update the partition key for each 1000 records so that
// they can be read at once.

var totalCoordinates = 0;
var partitionIndex = 0;
for (var c = 0; c < 1000; c++)
Expand Down
8 changes: 5 additions & 3 deletions ForecastGenerator/ForecastGenerator.csproj
Expand Up @@ -4,9 +4,11 @@
<AzureFunctionsVersion>v2</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="1.6.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.1" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.23" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="1.7.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="3.0.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.24" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Models\Models.csproj" />
Expand Down
104 changes: 84 additions & 20 deletions ForecastGenerator/SendCoordinates.cs
Expand Up @@ -2,13 +2,13 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.Azure.ServiceBus;
using Models;
using Newtonsoft.Json;

Expand All @@ -18,48 +18,55 @@ public static class SendCoordinates
{
#region Data Members

// Connection strings for service bus and storage queues
private static readonly string StorageConnectionString = Environment.GetEnvironmentVariable("StorageConnectionString");
private static readonly string ServiceBusConnectionString = Environment.GetEnvironmentVariable("ServiceBusConnectionString");

#endregion

#region Orchestrators

[FunctionName("SendCoordinates_Orchestrator")]
[FunctionName("SendCoordinatesOrchestrator")]
public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext ctx)
{
// List of queues that we want to fill
string[] queueNumbers = {"1", "2", "3", "4", "5"};
// The number of queues that will be filled with
// forcasts requests.
const int numberOfQueues = 5;

// Kick of a task for each queue
// Kick off a task for each queue
var parallelTasks = new List<Task>();
foreach (var n in queueNumbers)
for (var n = 0; n < numberOfQueues; n++)
{
var t = ctx.CallSubOrchestratorAsync("Queue_Orchestrator", n);
var t = ctx.CallSubOrchestratorAsync("QueueOrchestrator", n+1);
parallelTasks.Add(t);
}

await Task.WhenAll(parallelTasks);
}

[FunctionName("Queue_Orchestrator")]
public static async Task SendMessagesToQueue(
[FunctionName("QueueOrchestrator")]
public static async Task QueueOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext ctx)
{
// Kick off the tasks to send messages to the queue
var queueNumber = ctx.GetInput<string>();
var parallelTasks = new List<Task>();

for (var i = 0; i < 100; i++)
{
// Pass the details about which queue and partition to use
// when sending the messages.
var partitionKey = $"{queueNumber}-{i}";
var queueName = $"queue{queueNumber}";

var details =
new ForecastMessageDetails
{
PartitionKey = partitionKey,
QueueName = $"queue{queueNumber}"
QueueName = queueName
};
Task t = ctx.CallActivityAsync("GenerateForecastActivity", JsonConvert.SerializeObject(details));

var t = ctx.CallActivityAsync("GenerateForecastStorageQueueActivity", JsonConvert.SerializeObject(details));
parallelTasks.Add(t);
}

Expand All @@ -68,21 +75,35 @@ public static async Task Run([OrchestrationTrigger] DurableOrchestrationContext

#endregion

#region Activities
#region Activitie Functions

[FunctionName("GenerateForecastActivity")]
public static async Task GenerateForecast([ActivityTrigger] string details)
/// <summary>
/// This activity function retrieves a list of coordinates from a table and
/// creates a message for each of them in a storage queue.
/// </summary>
/// <param name="details">Details about the partition key and queue to work with</param>
/// <returns></returns>
[FunctionName("GenerateForecastStorageQueueActivity")]
public static async Task GenerateForecastStorageQueueActivity([ActivityTrigger] string details)
{
// Deserialize the forecast details so that we can determine which partition
// key and queue to work with.
var messageDetails = JsonConvert.DeserializeObject<ForecastMessageDetails>(details);

// Get a reference to the table that will contain all the coordinates.
var storageAccount = CloudStorageAccount.Parse(StorageConnectionString);
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference("datapoints");
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(messageDetails.QueueName);
await table.CreateIfNotExistsAsync();

List<CoordinateEntity> coordinates = await GetCoordinatesFromStorage(messageDetails.PartitionKey, table);
// Get a reference to the storage queue that we will send messages to.
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(messageDetails.QueueName);
await queue.CreateIfNotExistsAsync();

// Get the coordinates from the table and add each one as a separate
// message to the queue.
var coordinates = await GetCoordinatesFromStorage(messageDetails.PartitionKey, table);
foreach (var c in coordinates)
{
var forecastRequest = new List<Coordinates> { new Coordinates { Latitude = c.Latitude, Longitude = c.Longitude } };
Expand All @@ -92,17 +113,60 @@ public static async Task GenerateForecast([ActivityTrigger] string details)
}
}

/// <summary>
/// This activity functon retrieves a list of coordinates from a table and
/// creates a message for each of them in a service bus queue.
/// </summary>
/// <param name="details">Details about the partition key and queue to work with</param>
/// <returns></returns>
[FunctionName("GenerateForecastServiceBusActivity")]
public static async Task GenerateForecastServiceBusActivity([ActivityTrigger] string details)
{
// Deserialize the forecast details so that we can determine which partion
// key and queue to work with.
var messageDetails = JsonConvert.DeserializeObject<ForecastMessageDetails>(details);

// Get a reference to the table that will contain all the coordinates.
var storageAccount = CloudStorageAccount.Parse(StorageConnectionString);
var tableClient = storageAccount.CreateCloudTableClient();
var table = tableClient.GetTableReference("datapoints");
await table.CreateIfNotExistsAsync();

// Get a reference to the service bus queue that we will send the messages to.
var queueClient = new Microsoft.Azure.ServiceBus.QueueClient(ServiceBusConnectionString, messageDetails.QueueName);

// Get the coordinates from the table and add each one as a separate message
// to the service bus queue.
var coordinates = await GetCoordinatesFromStorage(messageDetails.PartitionKey, table);
var messagesToSend = new List<Message>();
foreach (var c in coordinates)
{
var forecastRequest = new List<Coordinates> { new Coordinates { Latitude = c.Latitude, Longitude = c.Longitude } };
var serializedMessage = JsonConvert.SerializeObject(forecastRequest);
var message = new Message(Encoding.UTF8.GetBytes(serializedMessage));
messagesToSend.Add(message);
}

// Service bus allows you to add messages in a batch.
await queueClient.SendAsync(messagesToSend);
}

#endregion

#region Private Methods

private static async Task<List<CoordinateEntity>> GetCoordinatesFromStorage(string partitionKey, CloudTable table)
{
// Assumption: Each partition will contain a maximum of 1000 records.

// Format the query for retrieving records by a
// partition key. The assumption here is that there
// are no more than 1000 records for each partition.
var query = new TableQuery<CoordinateEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey));

List<CoordinateEntity> allEntities = new List<CoordinateEntity>();

// Retreive the records and add them to the collection
var allEntities = new List<CoordinateEntity>();
TableContinuationToken tableContinuationToken = null;
do
{
Expand Down
11 changes: 4 additions & 7 deletions ForecastGenerator/host.json
@@ -1,15 +1,12 @@
{
"version": "2.0",
"logging": {
"applicationInsights": {
"sampling": {
"isEnabled": false
}
}
},
"extensions": {
"http": {
"routePrefix": ""
},
"durableTask": {
"PartitionCount": "16",
"azureStorageConnectionStringName": "DurableStorageConnectionString"
}
}
}
11 changes: 9 additions & 2 deletions ForecastProcessor/ForecastProcessor.csproj
Expand Up @@ -5,13 +5,20 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="3.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="3.0.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.0" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.24" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.2" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Models\Models.csproj" />
</ItemGroup>
<ItemGroup>
<Reference Include="Microsoft.Azure.WebJobs.EventHubs">
<HintPath>..\..\..\..\..\..\Users\davidbarkol\.nuget\packages\microsoft.azure.webjobs.extensions.eventhubs\3.0.1\lib\netstandard2.0\Microsoft.Azure.WebJobs.EventHubs.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand Down
46 changes: 28 additions & 18 deletions ForecastProcessor/GetForecasts.cs
Expand Up @@ -2,17 +2,18 @@
using System.Threading.Tasks;
using System.Net.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using Models;
using Newtonsoft.Json;

namespace ForecastProcessor
{
public static class GetForecasts
{
#region Private Data Members

// It is important that we instatiate this outside the scope of the function so
// It is important that we instantiate this outside the scope of the function so
// that it can be reused with each invocation.
private static readonly HttpClient Client = new HttpClient();

Expand All @@ -21,35 +22,44 @@ public static class GetForecasts

#endregion


[FunctionName("GetForecasts")]
public static async Task Run(
[QueueTrigger("%QueueName%", Connection = "QueueConnectionString")]Coordinates[] items, // collection of coordinates in the request
[CosmosDB(
databaseName: "weatherDatabase",
collectionName: "weatherCollection",
ConnectionStringSetting = "CosmosDBConnectionString")] IAsyncCollector<Forecast> documents, // output binding to CosmosDB
[QueueTrigger("%QueueName%", Connection = "QueueConnectionString")]Coordinates[] items,
[EventHub("%EventHubName%", Connection = "EventHubsConnectionString")] IAsyncCollector<Forecast> results,
ILogger log)
{
log.LogInformation("GetForecasts triggered");

// Iterate through the collection of coordinates, retrieve
// the forecast and then store it in a new document.
foreach (var coordinates in items)
{
// the forecast and then pass it along.
foreach (var c in items)
{
// Format the API request with the coordinates and API key
var apiRequest =
$"https://api.weather.com/v1/geocode/{coordinates.Latitude}/{coordinates.Longitude}/forecast/fifteenminute.json?language=en-US&units=e&apiKey={ApiKey}";
$"https://api.weather.com/v1/geocode/{c.Latitude}/{c.Longitude}/forecast/fifteenminute.json?language=en-US&units=e&apiKey={ApiKey}";

// Make the forecast request and read the response
var response = await Client.GetAsync(apiRequest);
var forecast = await response.Content.ReadAsStringAsync();
log.LogInformation(forecast);

// Important note:
// Uncomment this code if you have configured Event Hubs with
// enough throughput units to handle the ingress of messages. If
// this is being scaled out to roughly 5 functions apps (not instances)
// running in parallel, the default maximum limit of units (20) will
// not enough and request will be throttled.

// Add a new document with the forecast details
await documents.AddAsync(new Forecast
{
Latitude = coordinates.Latitude,
Longitude = coordinates.Longitude,
Result = forecast
});
// Send to an event hub
//await results.AddAsync(new Forecast
//{
// Longitude = c.Longitude,
// Latitude = c.Latitude,
// Result = forecast
//});
}

}
}
}
2 changes: 1 addition & 1 deletion scaletest.sln
Expand Up @@ -9,7 +9,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ForecastProcessor", "Foreca
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ForecastGenerator", "ForecastGenerator\ForecastGenerator.csproj", "{6F381DE0-1731-4274-B507-F4B448CC9077}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FillCoordinates", "FillCoordinates\FillCoordinates.csproj", "{E5ABA0CF-5802-4FAD-9105-C205B331352F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FillCoordinates", "FillCoordinates\FillCoordinates.csproj", "{E5ABA0CF-5802-4FAD-9105-C205B331352F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down

0 comments on commit 6a181c2

Please sign in to comment.