Skip to content

Commit

Permalink
Merge pull request #37 from Microsoft/fbeltrao/output-binding
Browse files Browse the repository at this point in the history
Initial output support
  • Loading branch information
ryancrawcour committed Mar 31, 2019
2 parents 85be3ca + 0c29b45 commit c23275d
Show file tree
Hide file tree
Showing 48 changed files with 1,313 additions and 230 deletions.
4 changes: 0 additions & 4 deletions samples/dotnet/ConsoleProducer/ConsoleProducer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-RC2" />
<PackageReference Include="librdkafka.redist" Version="1.0.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0.0-RC2" />
<PackageReference Include="Google.Protobuf" Version="3.7.0" />
</ItemGroup>
Expand All @@ -16,7 +15,4 @@
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Compile Remove="DeviceTelemetry.cs" />
</ItemGroup>
</Project>
43 changes: 0 additions & 43 deletions samples/dotnet/ConsoleProducer/DeviceTelemetry.cs

This file was deleted.

4 changes: 2 additions & 2 deletions samples/dotnet/ConsoleProducer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public static async Task Main(string[] args)
{
ITopicProducer producer;

// producer = new StringTopicProducer("broker:9092", "stringTopic", 100);
producer = new ProtobufTopicProducer("broker:9092", "protoUser", 100);
producer = new StringTopicProducer("localhost:9092", "stringTopic", 100);
// producer = new ProtobufTopicProducer("broker:9092", "protoUser", 100);
await producer.StartAsync();
}
catch (Exception ex)
Expand Down
1 change: 1 addition & 0 deletions samples/dotnet/ConsoleProducer/ProtobufTopicProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)

};


var builder = new ProducerBuilder<string, User>(config)
.SetValueSerializer(new ProtobufSerializer<User>());

Expand Down
20 changes: 10 additions & 10 deletions samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ public static class AvroGenericTriggers
]
}";

[FunctionName(nameof(PageViews))]
public static void PageViews(
[KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData kafkaEvent,
ILogger logger)
{
if (kafkaEvent.Value is GenericRecord genericRecord)
{
logger.LogInformation($"{GenericToJson(genericRecord)}");
}
}
//[FunctionName(nameof(PageViews))]
//public static void PageViews(
// [KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData kafkaEvent,
// ILogger logger)
//{
// if (kafkaEvent.Value is GenericRecord genericRecord)
// {
// logger.LogInformation($"{GenericToJson(genericRecord)}");
// }
//}

public static string GenericToJson(GenericRecord record)
{
Expand Down
40 changes: 20 additions & 20 deletions samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@ namespace KafkaFunctionSample
/// </summary>
public static class AvroSpecificTriggers
{
[FunctionName(nameof(User))]
public static void User(
[KafkaTrigger("LocalBroker", "users", ValueType=typeof(UserRecord), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
}
}
//[FunctionName(nameof(User))]
//public static void User(
// [KafkaTrigger("LocalBroker", "users", ValueType=typeof(UserRecord), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
// ILogger logger)
//{
// foreach (var kafkaEvent in kafkaEvents)
// {
// logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
// }
//}

[FunctionName(nameof(PageViewsFemale))]
public static void PageViewsFemale(
[KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType=typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
ILogger logger)
{
foreach (var ke in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(ke.Value)}");
}
}
//[FunctionName(nameof(PageViewsFemale))]
//public static void PageViewsFemale(
// [KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType=typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
// ILogger logger)
//{
// foreach (var ke in kafkaEvents)
// {
// logger.LogInformation($"{JsonConvert.SerializeObject(ke.Value)}");
// }
//}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<AzureFunctionsVersion>v2</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-RC2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.26" />
<PackageReference Include="Google.Protobuf" Version="3.7.0" />
</ItemGroup>
Expand Down
39 changes: 39 additions & 0 deletions samples/dotnet/KafkaFunctionSample/ProduceStringTopic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Kafka;

namespace KafkaFunctionSample
{
public static class ProduceStringTopic
{
[FunctionName("ProduceStringTopic")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[Kafka("stringTopicTenPartitions", BrokerList = "LocalBroker")] IAsyncCollector<KafkaEventData> events,
ILogger log)
{
try
{
var kafkaEvent = new KafkaEventData()
{
Value = await new StreamReader(req.Body).ReadToEndAsync(),
};

await events.AddAsync(kafkaEvent);
}
catch (Exception ex)
{
throw new Exception("Are you sure the topic 'stringTopicTenPartitions' exists? To created using Confluent Docker quickstart run this command: 'docker-compose exec broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 10 --topic stringTopicTenPartitions'", ex);
}

return new OkResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider
private readonly IConverterManager converterManager;
private readonly INameResolver nameResolver;
private readonly IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration;
private readonly IKafkaProducerManager kafkaProducerManager;

public KafkaExtensionConfigProvider(
IConfiguration config,
IOptions<KafkaOptions> options,
ILoggerFactory loggerFactory,
IConverterManager converterManager,
INameResolver nameResolver,
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration)
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration,
IKafkaProducerManager kafkaProducerManager)
{
this.config = config;
this.options = options;
this.loggerFactory = loggerFactory;
this.converterManager = converterManager;
this.nameResolver = nameResolver;
this.configuration = configuration;
this.kafkaProducerManager = kafkaProducerManager;
}

public void Initialize(ExtensionConfigContext context)
Expand All @@ -60,6 +63,15 @@ public void Initialize(ExtensionConfigContext context)
var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(this.config, this.options, this.converterManager, this.nameResolver, this.loggerFactory);
context.AddBindingRule<KafkaTriggerAttribute>()
.BindToTrigger(triggerBindingProvider);

// register output binding
context.AddBindingRule<KafkaAttribute>()
.BindToCollector(BuildCollectorFromAttribute);
}

private IAsyncCollector<KafkaEventData> BuildCollectorFromAttribute(KafkaAttribute attribute)
{
return new KafkaAsyncCollector(attribute.Topic, this.kafkaProducerManager.Resolve(attribute));
}

private ISpecificRecord ConvertKafkaEventData2AvroSpecific(KafkaEventData kafkaEventData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<Kafk
configure(options);
});

builder.Services.AddSingleton<IKafkaProducerManager, KafkaProducerManager>();

return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public class KafkaEventData
public DateTime Timestamp { get; set; }
public object Value { get; set; }

public KafkaEventData()
{
}

public KafkaEventData(byte[] bytes)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,21 +337,28 @@ private async Task SafeCloseConsumerAsync()
{
return;
}

// Stop subscriber thread
this.cancellationTokenSource.Cancel();

// Stop function executor
await this.functionExecutor?.CloseAsync(TimeSpan.FromMilliseconds(TimeToWaitForRunningProcessToEnd));
try
{
this.isClosed = true;

// Wait for subscriber thread to end
await this.subscriberFinished?.WaitAsync(TimeToWaitForRunningProcessToEnd);
// Stop subscriber thread
this.cancellationTokenSource.Cancel();

this.isClosed = true;
// Stop function executor
await this.functionExecutor?.CloseAsync(TimeSpan.FromMilliseconds(TimeToWaitForRunningProcessToEnd));

this.consumer?.Unsubscribe();
this.consumer?.Dispose();
this.functionExecutor?.Dispose();
// Wait for subscriber thread to end
await this.subscriberFinished?.WaitAsync(TimeToWaitForRunningProcessToEnd);

this.consumer?.Unsubscribe();
this.consumer?.Dispose();
this.functionExecutor?.Dispose();
}
catch (Exception ex)
{
this.logger.LogError(ex, "Failed to close Kafka listener");
}
}

protected virtual void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.Kafka</RootNamespace>
<PackageId>Microsoft.Azure.WebJobs.Extensions.Kafka</PackageId>
<Description>Microsoft Azure WebJobs SDK Kafka Extension</Description>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
Expand Down Expand Up @@ -35,4 +36,12 @@
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Folder Include="Config\" />
<Folder Include="Listeners\" />
<Folder Include="Properties\" />
<Folder Include="Trigger\" />
<Folder Include="Serialization\" />
<Folder Include="Output\" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
public interface IKafkaProducer
{
Task ProduceAsync(string topic, KafkaEventData item, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Manages <see cref="IKafkaProducer"/>
/// </summary>
public interface IKafkaProducerManager
{
IKafkaProducer Resolve(KafkaAttribute attribute);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
public class KafkaAsyncCollector : IAsyncCollector<KafkaEventData>
{
private readonly string topic;
private readonly IKafkaProducer producer;

public KafkaAsyncCollector(string topic, IKafkaProducer producer)
{
this.topic = topic;
this.producer = producer;
}

public async Task AddAsync(KafkaEventData item, CancellationToken cancellationToken = default)
{
await this.producer.ProduceAsync(this.topic, item, cancellationToken);
}

public Task FlushAsync(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
}
Loading

0 comments on commit c23275d

Please sign in to comment.