Kafka Partition Key Routing #18

Open
dkinon opened this issue Sep 26, 2017 · 6 comments

@dkinon

dkinon commented Sep 26, 2017

I've got basic Kafka message routing working, but I have a requirement to route messages to a partition based on a routing key. Is there a way to achieve this currently?

@dkinon
Author

dkinon commented Sep 26, 2017

Let me know if this should be a separate issue. I tried to work around my issue by computing a key-based partition assignment in nginx Lua and then setting a variable used by kafka_partition. It looks like this won't work, because I get this error from ngx_kafka_module: nginx: [emerg] "kafka_partition" directive invalid number. Please let me know what technical blockers there are to passing a variable to the kafka_partition directive.

Also, based on my initial question, let me know if there is a better way to approach this.

@brg-liuwei
Owner

In my opinion, ngx_lua is a good way to solve this problem: create several locations, each bound to one specific partition, and use ngx_lua to route each request to the right location based on your routing key.

I'm glad to help diagnose the problem if you can paste some of your configuration here.

@dkinon
Author

dkinon commented Sep 29, 2017

@brg-liuwei thanks. The config is relatively simple at this point:

    location /events {
        access_by_lua_file /etc/nginx/conf.d/kafka.lua;
        kafka_partition 0;
        kafka_topic events;
    }

I understand what you mean by having a separate location block per partition; however, I'm unable to put a partition number into the incoming URI. So the remaining question is: how do I modify the URI of an incoming request with ngx_lua? I'm thinking a request comes into /events and hits access_by_lua_file; inside that Lua script, it determines the correct partition (based on some hash bucket function) and then points the request at a new location such as /events/<partition>. Let me know if that makes sense and if you have any suggestions on how to do this with ngx_lua.

@brg-liuwei
Owner

brg-liuwei commented Sep 30, 2017

@dkinon
Assuming your Kafka topic has 3 partitions, the following configuration may help:

    # create 3 internal locations, each bound to one existing partition
    location = /kafka_partition/0 {
        internal;
        kafka_topic events;
        kafka_partition 0;
    }

    location = /kafka_partition/1 {
        internal;
        kafka_topic events;
        kafka_partition 1;
    }

    location = /kafka_partition/2 {
        internal;
        kafka_topic events;
        kafka_partition 2;
    }

    location = /send_to_kafka {
        content_by_lua_block {
            -- get route key
            local key = ngx.var.arg_route_key or "no_route_key"

            -- calc hash
            local hash = ngx.crc32_short(ngx.md5(key))

            -- get partition (assumes there are 3 partitions in your Kafka topic)
            local partition = hash % 3

            -- send to the location which binds the specific partition
            local res = ngx.location.capture("/kafka_partition/" .. partition, {
                method = ngx.HTTP_POST, -- ngx_kafka_module only supports POST
                -- demo payload; to forward the client's own body instead, call
                -- ngx.req.read_body() first and pass ngx.req.get_body_data()
                body = "hash code:" .. hash .. ", partition:" .. partition .. ", route key:" .. key,
            })

            -- relay the subrequest's status so the client can see whether it succeeded
            ngx.status = res.status
            ngx.print(res.body)
        }
    }
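To check offline which partition a given route key will land on, the hash in the Lua block can be mirrored outside nginx. The sketch below is in Python because the Lua version only runs inside OpenResty. It assumes that ngx.md5 returns the lowercase hex digest and that ngx.crc32_short computes the standard (zlib-compatible) CRC-32; the function name kafka_partition_for is hypothetical. Pass the key exactly as it appears in the query string, since nginx's $arg_* variables are not URL-decoded.

```python
import hashlib
import zlib


def kafka_partition_for(route_key: str, num_partitions: int = 3) -> int:
    """Hypothetical helper mirroring crc32(md5_hex(key)) % num_partitions.

    Assumes ngx.md5 yields the lowercase hex digest and ngx.crc32_short
    is the standard CRC-32 that zlib.crc32 also computes.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    md5_hex = hashlib.md5(route_key.encode("utf-8")).hexdigest()
    crc = zlib.crc32(md5_hex.encode("ascii"))  # unsigned 32-bit value in Python 3
    return crc % num_partitions


if __name__ == "__main__":
    # "no_route_key" is the fallback key the Lua block uses when route_key is absent
    for key in ["helloworld", "no_route_key"]:
        print(key, "->", kafka_partition_for(key))
```

The num_partitions argument has to match the modulus used in the Lua block (3 in the example above).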

Send a message with a route_key (quote the URL so the shell does not treat ? as a glob):

    curl "http://host:port/send_to_kafka?route_key=helloworld"

This code has been tested in my dev environment and works. If you have any other questions, feel free to add more comments.

@dkinon
Author

dkinon commented Oct 4, 2017

This is working for me, thanks.

It would be nice to have a more dynamic configuration where kafka_partition could accept an nginx variable rather than a fixed number. This would simplify both the nginx locations and the Lua code. I would be open to doing the work and creating a PR if you could help me understand the technical blockers to this approach.
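Until kafka_partition accepts a variable, one way to keep the per-partition locations manageable is to generate them rather than write them by hand. A minimal sketch, assuming the /kafka_partition/<n> URI scheme and the directives from the example earlier in the thread; the function name render_partition_locations is hypothetical.

```python
def render_partition_locations(topic: str, num_partitions: int) -> str:
    """Render one internal nginx location per Kafka partition (hypothetical helper).

    The /kafka_partition/<n> URI scheme and the kafka_topic /
    kafka_partition directives follow the hand-written example.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    blocks = []
    for n in range(num_partitions):
        blocks.append(
            f"location = /kafka_partition/{n} {{\n"
            "    internal;\n"
            f"    kafka_topic {topic};\n"
            f"    kafka_partition {n};\n"
            "}\n"
        )
    return "\n".join(blocks)


if __name__ == "__main__":
    # the output could be written to a file pulled into the server block with nginx's include directive
    print(render_partition_locations("events", 3))
```

The Lua routing code still has to use the same partition count as its modulus, so both should come from a single setting.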

@brg-liuwei
Owner

@dkinon A PR would be welcome. I suggest first reading the source code and learning some basics of nginx module development; we can then discuss technical questions here or in the PR.
