5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

<!-- ### Added -->
### Added

- Added the possibility to define the Kafka/Kinesis topic and schema per reverse proxy endpoint. The previous solution using environment variables is deprecated but still used as a fallback; it will be removed in version 3.0. [#229](https://github.com/Accenture/reactive-interaction-gateway/issues/229)
- Added Kinesis + Localstack example [#229](https://github.com/Accenture/reactive-interaction-gateway/issues/229)

<!-- ### Changed -->

3 changes: 2 additions & 1 deletion config/rig_inbound_gateway/config.exs
@@ -124,7 +124,8 @@ config :rig, RigInboundGateway.ApiProxy.Handler.Kinesis,
kinesis_request_stream: {:system, "PROXY_KINESIS_REQUEST_STREAM", nil},
kinesis_request_region: {:system, "PROXY_KINESIS_REQUEST_REGION", "eu-west-1"},
response_timeout: {:system, :integer, "PROXY_KINESIS_RESPONSE_TIMEOUT", 5_000},
cors: {:system, "CORS", "*"}
cors: {:system, "CORS", "*"},
kinesis_endpoint: {:system, "KINESIS_ENDPOINT", ""}

config :rig, RigInboundGateway.RequestLogger.Kafka,
# The list of brokers, given by a comma-separated list of host:port items:
11 changes: 8 additions & 3 deletions docs/api-gateway.md
@@ -136,15 +136,19 @@ For fire-and-forget style requests, the endpoint configuration looks like this:
"id": "my-endpoint",
"method": "POST",
"path": "/",
"target": "kafka"
"target": "kafka",
"topic": "my-topic",
"schema": "my-avro-schema"
}]
}
},
...
}]
```

Note that the `target` field is set to `kafka` (for Kinesis use `kinesis`).
Note that the `target` field is set to `kafka` (for Kinesis use `kinesis`). The `topic` field is mandatory; the `schema` field is optional. Alternatively, as a fallback to the previous solution, you can define these values via environment variables, described by the `PROXY_KAFKA_*` and `PROXY_KINESIS_*` variables in the [Operator's Guide](./rig-ops-guide.md); see the sketch after the note below. Keep in mind that the `topic` and `schema` fields only affect publishing to the event stream and have nothing to do with event consumption.

> Beware that the fallback method is deprecated and will be removed in version 3.0.
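
For illustration, a minimal sketch of the deprecated fallback, using the variable names documented in the Operator's Guide (the values are placeholders; these variables apply globally to every endpoint that does not define its own `topic`/`schema`):

```sh
# Deprecated fallback (to be removed in version 3.0): configure the topic
# and schema globally via environment variables instead of per endpoint.
export PROXY_KAFKA_REQUEST_TOPIC=my-topic
export PROXY_KAFKA_REQUEST_AVRO=my-avro-schema
# Kinesis equivalent: the stream to publish proxied events to.
export PROXY_KINESIS_REQUEST_STREAM=my-stream
```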

The endpoint expects the following request format:

@@ -165,7 +169,7 @@ The endpoint expects the following request format:
}
```

Topic/stream configuration is handled by environment variables, described by the `PROXY_KAFKA_*` and `PROXY_KINESIS_*` variables in the [Operator's Guide](./rig-ops-guide.md).
> `target_partition` is optional; if not set, RIG produces the event to a random Kafka/Kinesis partition.
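
For example, a request body that pins the partition key via the `rig` extension might look like this (a sketch reusing the CloudEvents sample fields from this guide):

```
{
  "specversion": "0.2",
  "type": "com.github.pull.create",
  "source": "https://github.com/cloudevents/spec/pull",
  "id": "A234-1234-1234",
  "time": "2018-04-05T17:31:00Z",
  "rig": { "target_partition": "the-partition-key" },
  "data": "hello"
}
```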

### Wait for response

@@ -184,6 +188,7 @@ Configuration of such API endpoint might look like this:
"method": "POST",
"path": "/",
"target": "kafka",
"topic": "my-topic",
"response_from": "kafka"
}]
}
6 changes: 3 additions & 3 deletions docs/rig-ops-guide.md
@@ -64,12 +64,12 @@ Variable&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
`PROXY_RECV_TIMEOUT` | Timeout used when receiving a response for a forwarded/proxied request. | 5000
`PROXY_KAFKA_RESPONSE_TOPICS` | Kafka topic for acknowledging Kafka sync events from proxy by correlation ID | ["rig-proxy-response"]
`PROXY_KAFKA_RESPONSE_KAFKA_GROUP_ID` | Kafka group ID used for forwarding asynchronous HTTP responses to waiting HTTP clients. The default should be fine. | "rig-proxy-response"
`PROXY_KAFKA_REQUEST_AVRO` | Avro schema name for events published from proxy. | ""
`PROXY_KAFKA_REQUEST_TOPIC` | Kafka topic for publishing sync/async events from proxy. | ""
`PROXY_KAFKA_REQUEST_AVRO` | **DEPRECATED (will be removed in version 3.0).** Avro schema name for events published from proxy. | ""
`PROXY_KAFKA_REQUEST_TOPIC` | **DEPRECATED (will be removed in version 3.0).** Kafka topic for publishing sync/async events from proxy. | ""
`PROXY_KAFKA_RESPONSE_TIMEOUT` | In case an endpoint has `target` set to `http` and `response_from` set to `kafka`, this is the maximum delay between an HTTP request and the corresponding Kafka response message. | 5000
`PROXY_KINESIS_RESPONSE_TIMEOUT` | In case an endpoint has `target` set to `http` and `response_from` set to `kinesis`, this is the maximum delay between an HTTP request and the corresponding Kinesis response message. | 5000
`PROXY_KINESIS_REQUEST_REGION` | AWS region for Kinesis stream publishing events from proxy. | "eu-west-1"
`PROXY_KINESIS_REQUEST_STREAM` | Kinesis stream for publishing sync/async events from proxy. | nil
`PROXY_KINESIS_REQUEST_STREAM` | **DEPRECATED (will be removed in version 3.0).** Kinesis stream for publishing sync/async events from proxy. | nil
`REQUEST_LOG` | Type of loggers to use to log requests processed by API Proxy, delimited by comma. | []
`SUBMISSION_CHECK` | Select if and how submitting/publishing events should be denied. Can be either `no_check` (submissions are always allowed), `jwt_validation` (submissions are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the submissions. Such an external service is expected to accept POST requests. The CloudEvent is passed as a JSON map in the body. The original request's `Authorization` headers are reused for this request. The submission is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a submission request. | "NO_CHECK"
`SUBSCRIPTION_CHECK` | Select if and how creating subscriptions should be denied. Can be either `no_check` (subscriptions are always allowed), `jwt_validation` (subscription are allowed if at least one authorization token is valid - using JWT_SECRET_KEY - and not blacklisted), or an URL that points to an external service that decides whether to allow or deny the subscription. Such an external service is expected to accept POST requests. The subscription parameters are passed in the body. The original request's `Authorization` headers are reused for this request. The subscription is allowed if the service returns 2xx and denied otherwise; return either 401 or 403 to reject a subscription request. | "NO_CHECK"
71 changes: 71 additions & 0 deletions examples/kinesis-localstack/README.md
@@ -0,0 +1,71 @@
# Kinesis Localstack

Example showing how to use RIG with AWS Kinesis and [Localstack](https://github.com/localstack/localstack).

## Setup local Kinesis with RIG

> In case you still want to use the deprecated approach, uncomment line `examples/kinesis-localstack/docker-compose.yml:37`.

```sh
# run Localstack and RIG
docker-compose up -d

# create Kinesis streams
docker-compose exec localstack bash -c 'awslocal kinesis create-stream --stream-name RIG-outbound --shard-count 1 --region eu-west-1 && awslocal kinesis create-stream --stream-name RIG-firehose --shard-count 1 --region eu-west-1'

# check created resources in Localstack
# http://localhost:8080

# check RIG logs -> after a few moments you should see running "record workers"
# you'll see plenty of logs about CloudWatch, but that's not important in a local setup
docker logs -f reactive-interaction-gateway

# send event via AWS CLI
docker-compose exec localstack bash -c 'awslocal kinesis put-record --stream-name RIG-outbound --data "{\"specversion\":\"0.2\",\"type\":\"com.github.pull.create\",\"source\":\"https://github.com/cloudevents/spec/pull\",\"id\":\"A234-1234-1234\",\"time\":\"2018-04-05T17:31:00Z\",\"data\":\"hello\"}" --partition-key test --region eu-west-1'
```

## Setup Proxy API endpoint

### Recommended way

```sh
curl -X "POST" \
-H "Content-Type: application/json" \
-d "{\"id\":\"kinesis-service\",\"name\":\"kinesis-service\",\"version_data\":{\"default\":{\"endpoints\":[{\"id\":\"kinesis-producer-endpoint\",\"path\":\"/kinesis\",\"method\":\"POST\",\"secured\":false,\"target\":\"kinesis\",\"topic\":\"RIG-outbound\"}]}},\"proxy\":{\"use_env\":false,\"target_url\":\"localstack\",\"port\":4568}}" \
--silent \
"http://localhost:4010/v2/apis"
```
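
Pretty-printed, the endpoint object inside that registration looks as follows; the per-endpoint `topic` field is the only difference from the deprecated variant below:

```json
{
  "id": "kinesis-producer-endpoint",
  "path": "/kinesis",
  "method": "POST",
  "secured": false,
  "target": "kinesis",
  "topic": "RIG-outbound"
}
```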

### Deprecated way

> Will be removed in version 3.0.

```sh
# send event via RIG's proxy -> register API in RIG's proxy and send HTTP request
curl -X "POST" \
-H "Content-Type: application/json" \
-d "{\"id\":\"kinesis-service\",\"name\":\"kinesis-service\",\"version_data\":{\"default\":{\"endpoints\":[{\"id\":\"kinesis-producer-endpoint\",\"path\":\"/kinesis\",\"method\":\"POST\",\"secured\":false,\"target\":\"kinesis\"}]}},\"proxy\":{\"use_env\":false,\"target_url\":\"localstack\",\"port\":4568}}" \
--silent \
"http://localhost:4010/v2/apis"
```
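
For this deprecated registration to work, RIG has to learn the stream name from the environment; in this example that is the commented-out line in `docker-compose.yml` (uncommenting it is equivalent to the following sketch):

```sh
# Deprecated (removed in version 3.0): stream name via environment variable
export PROXY_KINESIS_REQUEST_STREAM=RIG-outbound
```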

## Produce messages

```sh
# setting partition key manually
curl -X "POST" \
-H "Content-Type: application/json" \
-d "{\"specversion\":\"0.2\",\"type\":\"com.github.pull.create\",\"source\":\"https://github.com/cloudevents/spec/pull\",\"id\":\"A234-1234-1234\",\"time\":\"2018-04-05T17:31:00Z\",\"rig\":{\"target_partition\":\"the-partition-key\"},\"data\":\"hello\"}" \
--silent \
"http://localhost:4000/kinesis"

# partition key not set -> will be randomized
curl -X "POST" \
-H "Content-Type: application/json" \
-d "{\"specversion\":\"0.2\",\"type\":\"com.github.pull.create\",\"source\":\"https://github.com/cloudevents/spec/pull\",\"id\":\"A234-1234-1234\",\"time\":\"2018-04-05T17:31:00Z\",\"data\":\"hello\"}" \
--silent \
"http://localhost:4000/kinesis"

# get Kinesis shard iterator and records -> should list several records; you can also monitor RIG logs to see consumed events
docker-compose exec localstack bash -c 'export SHARD_ITERATOR=$(awslocal kinesis get-shard-iterator --stream-name RIG-outbound --shard-id 0 --shard-iterator-type TRIM_HORIZON --region eu-west-1 --query ShardIterator --output text) && awslocal kinesis get-records --shard-iterator $SHARD_ITERATOR --region eu-west-1'
```
37 changes: 37 additions & 0 deletions examples/kinesis-localstack/docker-compose.yml
@@ -0,0 +1,37 @@
version: "2.1"

services:
localstack:
image: localstack/localstack
container_name: localstack
ports:
- "4567-4597:4567-4597"
- "${PORT_WEB_UI-8080}:${PORT_WEB_UI-8080}"
environment:
- SERVICES=${SERVICES-dynamodb,kinesis,cloudwatch}
- DEBUG=${DEBUG- }
- DATA_DIR=${DATA_DIR- }
- PORT_WEB_UI=${PORT_WEB_UI- }
- LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
- KINESIS_ERROR_PROBABILITY=${KINESIS_ERROR_PROBABILITY- }
- DEFAULT_REGION=eu-west-1

rig:
build:
context: ../../
dockerfile: aws.dockerfile
container_name: reactive-interaction-gateway
ports:
- 4000:4000
- 4010:4010
environment:
- LOG_LEVEL=debug
- AWS_ACCESS_KEY_ID=foobar
- AWS_SECRET_ACCESS_KEY=foobar
- KINESIS_APP_NAME=rig
- KINESIS_DYNAMODB_ENDPOINT=http://localstack:4569
- KINESIS_ENABLED=1
- KINESIS_ENDPOINT=http://localstack:4568
- AWS_CBOR_DISABLE=1
# Using env var to set stream name is deprecated and will be removed in version 3.0!
# - PROXY_KINESIS_REQUEST_STREAM=RIG-outbound
2 changes: 2 additions & 0 deletions integration_tests/kafka_tests/run.sh
@@ -55,6 +55,8 @@ export KAFKA_SOURCE_TOPICS=rig-avro
export KAFKA_SERIALIZER=avro
export KAFKA_SCHEMA_REGISTRY_HOST=localhost:8081
export KAFKA_LOG_SCHEMA=rig-request-logger-value
export PROXY_KAFKA_REQUEST_TOPIC=rig-avro
export PROXY_KAFKA_REQUEST_AVRO=rig-avro-value

section_header "Running integration test suite for Kafka & Avro"
mix test --only avro "$@"
2 changes: 2 additions & 0 deletions lib/rig_inbound_gateway/api_proxy/api.ex
@@ -9,6 +9,8 @@ defmodule RigInboundGateway.ApiProxy.Api do
optional(:transform_request_headers) => boolean,
optional(:type) => String.t(),
optional(:target) => String.t(),
optional(:topic) => String.t(),
optional(:schema) => String.t(),
id: String.t(),
# Simple matching; curly braces may be used to ignore parts of the URI.
# Example:
83 changes: 76 additions & 7 deletions lib/rig_inbound_gateway/api_proxy/handler/kafka.ex
@@ -23,6 +23,13 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
written to the "rig" extension field (following the CloudEvents v0.2 spec).
- `partition`: The targetted Kafka partition.

or, alternatively, inside the CloudEvent itself via the "rig" extension field:

    "rig": {"target_partition": "the-partition-key"}

"""

# ---
@@ -85,6 +92,45 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
@doc @help_text
def handle_http_request(conn, api, endpoint, request_path)

def handle_http_request(
conn,
_,
%{"target" => "kafka", "topic" => topic} = endpoint,
request_path
)
when byte_size(topic) > 0 do
response_from = Map.get(endpoint, "response_from", "http")
schema = Map.get(endpoint, "schema")

conn.assigns[:body]
|> Jason.decode()
|> case do
# Deprecated way to pass events:
{:ok, %{"partition" => partition, "event" => event}} ->
do_handle_http_request(conn, request_path, partition, event, response_from, topic, schema)

# Preferred way to pass events, where the partition goes into the "rig" extension:
{:ok, %{"specversion" => _, "rig" => %{"target_partition" => partition}} = event} ->
do_handle_http_request(conn, request_path, partition, event, response_from, topic, schema)

# Deprecated way to pass events, partition not set -> will be randomized:
{:ok, %{"event" => event}} ->
do_handle_http_request(conn, request_path, <<>>, event, response_from, topic, schema)

# Preferred way to pass events, partition not set -> will be randomized:
{:ok, %{"specversion" => _} = event} ->
do_handle_http_request(conn, request_path, <<>>, event, response_from, topic, schema)

{:ok, _} ->
respond_with_bad_request(conn, response_from, "the body does not look like a CloudEvent")

{:error, _} ->
respond_with_bad_request(conn, response_from, "expected a JSON encoded request body")
end
end

@deprecated "Using environemnt variables to set Kafka proxy request topic and schema is deprecated.
Set these values directly in proxy json file"
def handle_http_request(
conn,
_,
@@ -104,6 +150,14 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
{:ok, %{"specversion" => _, "rig" => %{"target_partition" => partition}} = event} ->
do_handle_http_request(conn, request_path, partition, event, response_from)

# Deprecated way to pass events, partition not set -> will be randomized:
{:ok, %{"event" => event}} ->
do_handle_http_request(conn, request_path, <<>>, event, response_from)

# Preferred way to pass events, partition not set -> will be randomized:
{:ok, %{"specversion" => _} = event} ->
do_handle_http_request(conn, request_path, <<>>, event, response_from)

{:ok, _} ->
respond_with_bad_request(conn, response_from, "the body does not look like a CloudEvent")

@@ -119,7 +173,9 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
request_path,
partition,
event,
response_from
response_from,
topic \\ nil,
schema \\ nil
) do
kafka_message =
event
Expand All @@ -136,7 +192,7 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do
})
|> Poison.encode!()

produce(partition, kafka_message)
produce(partition, kafka_message, topic, schema)

wait_for_response? =
case response_from do
@@ -218,16 +274,29 @@ defmodule RigInboundGateway.ApiProxy.Handler.Kafka do

# ---

defp produce(server \\ __MODULE__, key, plaintext) do
GenServer.call(server, {:produce, key, plaintext})
defp produce(server \\ __MODULE__, key, plaintext, topic, schema) do
GenServer.call(server, {:produce, key, plaintext, topic, schema})
end

# ---

@impl GenServer
def handle_call({:produce, key, plaintext}, _from, %{kafka_config: kafka_config} = state) do
%{request_topic: topic, request_schema: schema} = config()
res = RigKafka.produce(kafka_config, topic, schema, key, plaintext)
def handle_call(
{:produce, key, plaintext, topic, schema},
_from,
%{kafka_config: kafka_config} = state
) do
%{request_topic: request_topic, request_schema: request_schema} = config()

res =
RigKafka.produce(
kafka_config,
topic || request_topic,
schema || request_schema,
key,
plaintext
)

{:reply, res, state}
end
