SoftStoneDevelop/KafkaExchanger

KafkaExchanger

KafkaExchanger generates message processing services for Kafka brokers, to simplify communication in a microservices environment. A generated service can be either stateful (backed by any storage) or stateless.

Usage with Protobuf keys and values:

syntax = "proto3";
option csharp_namespace = "protobuff";
package protobuffKeys;

message SimpleKey
{
    int32 Id = 1;
}

syntax = "proto3";
option csharp_namespace = "protobuff";
package protobuffValues;

message SimpleValue
{
    int32 Id = 1;
    Priority Priority = 2;
    string Message = 3;
}

enum Priority
{
    Priority_UNSPECIFIED = 0;
    Priority_WHITE = 1;
    Priority_YELLOW = 2;
    Priority_RED = 3;
}

Declare partial classes with attributes:

[RequestAwaiter(useLogger: false),
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input0
  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue))//output0
]
public partial class TestProtobuffAwaiter
{

}

Pass the configuration and a producer pool to the Start method. That is all you need to do.

using var producerPool = new ProducerPoolProtoProto(3, "localhost:9194, localhost:9294, localhost:9394");
using var awaitService = new TestProtobuffAwaiter();
// configKafka is the service configuration; its construction is not shown in this example
awaitService.Start(configKafka, producerPool);

using var response = await awaitService.Produce(
  new protobuff.SimpleKey() { Id = 459 },
  new protobuff.SimpleValue() { Id = 459, Priority = protobuff.Priority.Unspecified, Message = "Hello world!" }
  );
//response.Input0Message is TestProtobuffAwaiter.Input0Message
//where response.Input0Message.Key is protobuff.SimpleKey
//and response.Input0Message.Value is protobuff.SimpleValue
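
The call above follows a request/reply shape: a message is produced, and the awaited result carries the message that came back on the input topic. The sketch below illustrates that general pattern in memory, assuming replies are matched to requests by a correlation id. It is not KafkaExchanger's generated code, and `InMemoryAwaiter`, `OnReply`, and the `Guid` correlation id are hypothetical names chosen for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a request/reply awaiter.
// It is NOT the code that KafkaExchanger generates.
public sealed class InMemoryAwaiter
{
    // Requests that are still waiting for a reply, keyed by correlation id.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<string>> _pending = new();

    // Registers the request and returns a task that completes when the reply arrives.
    public Task<string> Produce(Guid correlationId, string payload)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(correlationId, tcs))
            throw new InvalidOperationException("Duplicate correlation id.");

        // A real service would publish `payload` to an output topic here.
        return tcs.Task;
    }

    // Called when a reply is consumed from an input topic.
    // Returns false if no request is waiting for this correlation id.
    public bool OnReply(Guid correlationId, string reply)
    {
        return _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(reply);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var awaiter = new InMemoryAwaiter();
        var id = Guid.NewGuid();

        var pending = awaiter.Produce(id, "Hello world!");
        awaiter.OnReply(id, "Hello back!"); // simulates the consumer side

        Console.WriteLine(await pending); // prints "Hello back!"
    }
}
```

Keeping the pending requests in a dictionary lets many requests be in flight at once, each completed independently when its own reply arrives.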
            

You can declare multiple input and output topics (two of each in this example):

[RequestAwaiter(useLogger: false),
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input0
  Input(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//input1

  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue)),//output0
  Output(keyType: typeof(protobuff.SimpleKey), valueType: typeof(protobuff.SimpleValue))//output1
]
public partial class TestProtobuffAwaiter
{

}

using var producerPool = new ProducerPoolProtoProto(3, "localhost:9194, localhost:9294, localhost:9394");
using var awaitService = new TestProtobuffAwaiter();
awaitService.Start(configKafka, producerPool, producerPool);

using var response = await awaitService.Produce(
  //to output0
  new protobuff.SimpleKey() { Id = 459 },
  new protobuff.SimpleValue() { Id = 459, Priority = protobuff.Priority.Unspecified, Message = "Hello world!" },

  //to output1
  new protobuff.SimpleKey() { Id = 123 },
  new protobuff.SimpleValue() { Id = 123, Priority = protobuff.Priority.Unspecified, Message = "Hello world! 2" }
  );

//response.Input0Message is TestProtobuffAwaiter.Input0Message
//response.Input1Message is TestProtobuffAwaiter.Input1Message
