Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial output support #37

Merged
merged 1 commit into from
Mar 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions samples/dotnet/ConsoleProducer/ConsoleProducer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-RC2" />
<PackageReference Include="librdkafka.redist" Version="1.0.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0.0-RC2" />
<PackageReference Include="Google.Protobuf" Version="3.7.0" />
</ItemGroup>
Expand All @@ -16,7 +15,4 @@
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Compile Remove="DeviceTelemetry.cs" />
</ItemGroup>
</Project>
43 changes: 0 additions & 43 deletions samples/dotnet/ConsoleProducer/DeviceTelemetry.cs

This file was deleted.

4 changes: 2 additions & 2 deletions samples/dotnet/ConsoleProducer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public static async Task Main(string[] args)
{
ITopicProducer producer;

// producer = new StringTopicProducer("broker:9092", "stringTopic", 100);
producer = new ProtobufTopicProducer("broker:9092", "protoUser", 100);
producer = new StringTopicProducer("localhost:9092", "stringTopic", 100);
// producer = new ProtobufTopicProducer("broker:9092", "protoUser", 100);
await producer.StartAsync();
}
catch (Exception ex)
Expand Down
1 change: 1 addition & 0 deletions samples/dotnet/ConsoleProducer/ProtobufTopicProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)

};


var builder = new ProducerBuilder<string, User>(config)
.SetValueSerializer(new ProtobufSerializer<User>());

Expand Down
20 changes: 10 additions & 10 deletions samples/dotnet/KafkaFunctionSample/AvroGenericTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ public static class AvroGenericTriggers
]
}";

[FunctionName(nameof(PageViews))]
public static void PageViews(
[KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData kafkaEvent,
ILogger logger)
{
if (kafkaEvent.Value is GenericRecord genericRecord)
{
logger.LogInformation($"{GenericToJson(genericRecord)}");
}
}
//[FunctionName(nameof(PageViews))]
//public static void PageViews(
// [KafkaTrigger("LocalBroker", "pageviews", AvroSchema = PageViewsSchema, ConsumerGroup = "azfunc")] KafkaEventData kafkaEvent,
// ILogger logger)
//{
// if (kafkaEvent.Value is GenericRecord genericRecord)
// {
// logger.LogInformation($"{GenericToJson(genericRecord)}");
// }
//}

public static string GenericToJson(GenericRecord record)
{
Expand Down
40 changes: 20 additions & 20 deletions samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@ namespace KafkaFunctionSample
/// </summary>
public static class AvroSpecificTriggers
{
[FunctionName(nameof(User))]
public static void User(
[KafkaTrigger("LocalBroker", "users", ValueType=typeof(UserRecord), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
}
}
//[FunctionName(nameof(User))]
//public static void User(
// [KafkaTrigger("LocalBroker", "users", ValueType=typeof(UserRecord), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
// ILogger logger)
//{
// foreach (var kafkaEvent in kafkaEvents)
// {
// logger.LogInformation($"{JsonConvert.SerializeObject(kafkaEvent.Value)}");
// }
//}

[FunctionName(nameof(PageViewsFemale))]
public static void PageViewsFemale(
[KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType=typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
ILogger logger)
{
foreach (var ke in kafkaEvents)
{
logger.LogInformation($"{JsonConvert.SerializeObject(ke.Value)}");
}
}
//[FunctionName(nameof(PageViewsFemale))]
//public static void PageViewsFemale(
// [KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType=typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
// ILogger logger)
//{
// foreach (var ke in kafkaEvents)
// {
// logger.LogInformation($"{JsonConvert.SerializeObject(ke.Value)}");
// }
//}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<AzureFunctionsVersion>v2</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.0.0-RC2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.26" />
<PackageReference Include="Google.Protobuf" Version="3.7.0" />
</ItemGroup>
Expand Down
39 changes: 39 additions & 0 deletions samples/dotnet/KafkaFunctionSample/ProduceStringTopic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Kafka;

namespace KafkaFunctionSample
{
public static class ProduceStringTopic
{
    /// <summary>
    /// HTTP-triggered sample that publishes the raw HTTP request body as a string
    /// event to the Kafka topic "stringTopicTenPartitions" via the Kafka output binding.
    /// </summary>
    /// <param name="req">Incoming HTTP request; its body becomes the Kafka event value.</param>
    /// <param name="events">Kafka output collector bound to the "stringTopicTenPartitions" topic on "LocalBroker".</param>
    /// <param name="log">Function logger (currently unused by the sample).</param>
    /// <returns>200 OK once the event has been handed to the collector.</returns>
    [FunctionName("ProduceStringTopic")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
        [Kafka("stringTopicTenPartitions", BrokerList = "LocalBroker")] IAsyncCollector<KafkaEventData> events,
        ILogger log)
    {
        try
        {
            // Dispose the reader so the request body stream is released
            // (the original allocated the StreamReader without disposing it).
            string body;
            using (var reader = new StreamReader(req.Body))
            {
                body = await reader.ReadToEndAsync();
            }

            var kafkaEvent = new KafkaEventData()
            {
                Value = body,
            };

            await events.AddAsync(kafkaEvent);
        }
        catch (Exception ex)
        {
            // In the sample environment the most likely failure is a missing topic,
            // so wrap the error with a hint on how to create it.
            throw new Exception("Are you sure the topic 'stringTopicTenPartitions' exists? To create it using the Confluent Docker quickstart, run this command: 'docker-compose exec broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 10 --topic stringTopicTenPartitions'", ex);
        }

        return new OkResult();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider
private readonly IConverterManager converterManager;
private readonly INameResolver nameResolver;
private readonly IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration;
private readonly IKafkaProducerManager kafkaProducerManager;

public KafkaExtensionConfigProvider(
IConfiguration config,
IOptions<KafkaOptions> options,
ILoggerFactory loggerFactory,
IConverterManager converterManager,
INameResolver nameResolver,
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration)
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration,
IKafkaProducerManager kafkaProducerManager)
{
this.config = config;
this.options = options;
this.loggerFactory = loggerFactory;
this.converterManager = converterManager;
this.nameResolver = nameResolver;
this.configuration = configuration;
this.kafkaProducerManager = kafkaProducerManager;
}

public void Initialize(ExtensionConfigContext context)
Expand All @@ -60,6 +63,15 @@ public void Initialize(ExtensionConfigContext context)
var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(this.config, this.options, this.converterManager, this.nameResolver, this.loggerFactory);
context.AddBindingRule<KafkaTriggerAttribute>()
.BindToTrigger(triggerBindingProvider);

// register output binding
context.AddBindingRule<KafkaAttribute>()
.BindToCollector(BuildCollectorFromAttribute);
}

/// <summary>
/// Builds the output-binding collector for a <see cref="KafkaAttribute"/>:
/// resolves (or reuses) a producer via the injected <c>kafkaProducerManager</c>
/// and wraps it in a <see cref="KafkaAsyncCollector"/> targeting the attribute's topic.
/// </summary>
private IAsyncCollector<KafkaEventData> BuildCollectorFromAttribute(KafkaAttribute attribute)
{
    return new KafkaAsyncCollector(attribute.Topic, this.kafkaProducerManager.Resolve(attribute));
}

private ISpecificRecord ConvertKafkaEventData2AvroSpecific(KafkaEventData kafkaEventData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<Kafk
configure(options);
});

builder.Services.AddSingleton<IKafkaProducerManager, KafkaProducerManager>();

return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public class KafkaEventData
public DateTime Timestamp { get; set; }
public object Value { get; set; }

public KafkaEventData()
{
}

public KafkaEventData(byte[] bytes)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,21 +337,28 @@ private async Task SafeCloseConsumerAsync()
{
return;
}

// Stop subscriber thread
this.cancellationTokenSource.Cancel();

// Stop function executor
await this.functionExecutor?.CloseAsync(TimeSpan.FromMilliseconds(TimeToWaitForRunningProcessToEnd));
try
{
this.isClosed = true;

// Wait for subscriber thread to end
await this.subscriberFinished?.WaitAsync(TimeToWaitForRunningProcessToEnd);
// Stop subscriber thread
this.cancellationTokenSource.Cancel();

this.isClosed = true;
// Stop function executor
await this.functionExecutor?.CloseAsync(TimeSpan.FromMilliseconds(TimeToWaitForRunningProcessToEnd));

this.consumer?.Unsubscribe();
this.consumer?.Dispose();
this.functionExecutor?.Dispose();
// Wait for subscriber thread to end
await this.subscriberFinished?.WaitAsync(TimeToWaitForRunningProcessToEnd);

this.consumer?.Unsubscribe();
this.consumer?.Dispose();
this.functionExecutor?.Dispose();
}
catch (Exception ex)
{
this.logger.LogError(ex, "Failed to close Kafka listener");
}
}

protected virtual void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.Kafka</RootNamespace>
<PackageId>Microsoft.Azure.WebJobs.Extensions.Kafka</PackageId>
<Description>Microsoft Azure WebJobs SDK Kafka Extension</Description>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
Expand Down Expand Up @@ -35,4 +36,12 @@
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Folder Include="Config\" />
<Folder Include="Listeners\" />
<Folder Include="Properties\" />
<Folder Include="Trigger\" />
<Folder Include="Serialization\" />
<Folder Include="Output\" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Produces events to a Kafka topic on behalf of the output binding.
/// </summary>
public interface IKafkaProducer
{
    /// <summary>
    /// Publishes <paramref name="item"/> to <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Destination Kafka topic name.</param>
    /// <param name="item">Event to publish.</param>
    /// <param name="cancellationToken">Token used to cancel the publish operation.</param>
    Task ProduceAsync(string topic, KafkaEventData item, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Manages <see cref="IKafkaProducer"/> instances for the Kafka output binding.
/// </summary>
public interface IKafkaProducerManager
{
    /// <summary>
    /// Returns the producer to use for the given output binding attribute
    /// (implementations may cache/reuse producers per broker configuration —
    /// NOTE(review): caching behavior not visible here, confirm in implementation).
    /// </summary>
    IKafkaProducer Resolve(KafkaAttribute attribute);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
public class KafkaAsyncCollector : IAsyncCollector<KafkaEventData>
{
    private readonly string topic;
    private readonly IKafkaProducer producer;

    /// <summary>
    /// Creates a collector that forwards each collected event to <paramref name="producer"/>.
    /// </summary>
    /// <param name="topic">Destination Kafka topic name.</param>
    /// <param name="producer">Producer used to publish events.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="topic"/> or <paramref name="producer"/> is null.</exception>
    public KafkaAsyncCollector(string topic, IKafkaProducer producer)
    {
        // Fail fast on mis-wiring instead of a deferred NullReferenceException at publish time.
        this.topic = topic ?? throw new ArgumentNullException(nameof(topic));
        this.producer = producer ?? throw new ArgumentNullException(nameof(producer));
    }

    /// <summary>
    /// Publishes <paramref name="item"/> to the configured topic immediately.
    /// </summary>
    public async Task AddAsync(KafkaEventData item, CancellationToken cancellationToken = default)
    {
        await this.producer.ProduceAsync(this.topic, item, cancellationToken);
    }

    /// <summary>
    /// No-op: events are produced eagerly in <see cref="AddAsync"/>, so there is nothing buffered to flush.
    /// </summary>
    public Task FlushAsync(CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }
}
}
Loading