-
Notifications
You must be signed in to change notification settings - Fork 64
/
java_client.ex
151 lines (126 loc) · 4.93 KB
/
java_client.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
defmodule RigOutboundGateway.Kinesis.JavaClient do
@moduledoc """
Manages the external Java-based Kinesis client application.
In Java land this would've been named AmazonKinesisJavaClientManager.
"""
use Rig.Config, :custom_validation
use GenServer
alias Rig.EventStream.KinesisToFilter
alias RigMetrics.EventsMetrics
alias RigOutboundGateway.Kinesis.LogStream
require Logger
@jinterface_version "1.8.1"
@metrics_source_label "kinesis"
@restart_delay_ms 20_000
@doc """
Starts the Java-client manager process.

`opts` is handed to `GenServer.start_link/3` unchanged; the server itself is
always initialised with `:ok`.
"""
def start_link(opts), do: GenServer.start_link(__MODULE__, :ok, opts)
# Confex callback
#
# Turns the raw keyword configuration into the map used throughout this module.
# Raises a KeyError if any mandatory key is missing.
defp validate_config!(config) do
  # Whether the OTP jar actually exists on disk is verified later, in init/1,
  # as System.cwd doesn't point to the umbrella root at this point.
  configured_otp_jar = Keyword.get(config, :otp_jar)

  otp_jar =
    if is_nil(configured_otp_jar) do
      # Not configured: fall back to the JInterface jar inside the Erlang installation.
      Path.join(:code.root_dir(), "lib/jinterface-#{@jinterface_version}/priv/OtpErlang.jar")
    else
      configured_otp_jar
    end

  required = &Keyword.fetch!(config, &1)

  %{
    enabled?: required.(:enabled?),
    client_jar: required.(:client_jar),
    otp_jar: otp_jar,
    # the key must be present, but a nil/false value is normalised to ""
    log_level: required.(:log_level) || "",
    kinesis_app_name: required.(:kinesis_app_name),
    kinesis_aws_region: required.(:kinesis_aws_region),
    kinesis_stream: required.(:kinesis_stream),
    kinesis_endpoint: required.(:kinesis_endpoint),
    dynamodb_endpoint: required.(:dynamodb_endpoint)
  }
end
@impl GenServer
def init(:ok) do
  settings = config()

  if settings.enabled? do
    ensure_otp_jar!(settings.otp_jar)
    # The Java client is started from handle_info/2, not from within init/1.
    send(self(), :run_java_client)
  end

  {:ok, %{}}
end

# Makes sure the JInterface jar file exists at `otp_jar`.
#
# Uses an assertive match: if the file is missing, the right-hand side evaluates
# to the hint text, so the resulting MatchError carries that text as its term.
defp ensure_otp_jar!(otp_jar) do
  missing_hint =
    "Could not find OtpErlang.jar for JInterface #{@jinterface_version} at #{otp_jar}. Does your Erlang distribution come with Java support enabled?"

  true = File.exists?(otp_jar) || missing_hint
end
@doc """
Starts (and awaits) the Java-client for Amazon Kinesis.

The process output is not used to receive Kinesis messages; it is only piped
through `LogStream` into RIG's logging output. Instead, the Java code uses
JInterface to RPC into RIG directly. The `-Dexecutor` property built in
`java_args/0` names this module, so the entry point is presumably
`java_client_callback/1` (TODO confirm against the Java client). This ensures
that message boundaries are kept (think newlines in messages) and that console
log output doesn't interfere.

The call blocks until the Java process has exited. Whether the process exited
or could not be launched at all, another `:run_java_client` message is
scheduled after the restart delay, so the client is retried with a back-off
instead of crashing this server.
"""
@impl GenServer
def handle_info(:run_java_client, state) do
  conf = config()
  Logger.debug(fn -> "Starting Java-client for Kinesis.." end)

  # Connection details and Kinesis settings are handed to the JVM via its environment.
  env = [
    RIG_ERLANG_NAME: :erlang.node() |> Atom.to_string(),
    RIG_ERLANG_COOKIE: :erlang.get_cookie() |> Atom.to_string(),
    LOG_LEVEL: conf.log_level,
    KINESIS_APP_NAME: conf.kinesis_app_name,
    KINESIS_AWS_REGION: conf.kinesis_aws_region,
    KINESIS_STREAM: conf.kinesis_stream,
    KINESIS_ENDPOINT: conf.kinesis_endpoint,
    KINESIS_DYNAMODB_ENDPOINT: conf.dynamodb_endpoint
  ]

  # LogStream is used to pipe the Java logging output to RIG's logging output.
  case Porcelain.exec("java", java_args(), out: %LogStream{}, err: :out, env: env) do
    %Porcelain.Result{status: status} ->
      Logger.warn(fn ->
        "Java-client for Kinesis is dead (exit code #{status}; restart in #{@restart_delay_ms} ms)."
      end)

    {:error, reason} ->
      # Porcelain returns an error tuple rather than a result when the program
      # cannot be launched. Matching only on %Porcelain.Result{} would crash the
      # server here and skip the delayed restart below.
      Logger.error(fn ->
        "Java-client for Kinesis could not be started (#{inspect(reason)}; restart in #{@restart_delay_ms} ms)."
      end)
  end

  Process.send_after(self(), :run_java_client, @restart_delay_ms)
  {:noreply, state}
end
# Builds the argument list for the `java` executable and logs the resulting
# command line at info level before returning it.
defp java_args do
  settings = config()

  # Both the client jar and the JInterface jar have to be on the classpath.
  classpath = "#{settings.client_jar}:#{settings.otp_jar}"

  jvm_args = [
    "-Djava.util.logging.SimpleFormatter.format=%4$s: %5$s%n",
    "-Dexecutor=Elixir.RigOutboundGateway.Kinesis.JavaClient",
    "-Dclient_name=kinesis-client",
    "-cp",
    classpath,
    "com.accenture.rig.App"
  ]

  Logger.info(fn -> "Exec: java " <> Enum.join(jvm_args, " ") end)
  jvm_args
end
@doc """
Handles a single message handed over by the Java client.

The `:body` entry of `data` is decoded as JSON and passed to
`KinesisToFilter.kinesis_handler/1`. If the handler returns `:ok`, the elapsed
processing time is recorded; any other return value is logged as an error and
counted as a failed event. Exceptions raised along the way (e.g. by the JSON
decoder) are rescued, logged and counted as failed events as well.

NOTE(review): in the failure branches the return value is whatever
`EventsMetrics.count_failed_event/2` returns, which is presumably `:ok` as the
spec states. Confirm.
"""
@spec java_client_callback(data :: [{atom(), String.t()}, ...]) :: :ok
def java_client_callback(data) do
  # taken first, so the Prometheus metric covers the whole callback
  started_at = System.monotonic_time()
  conf = config()

  try do
    outcome =
      data[:body]
      |> Jason.decode!()
      |> KinesisToFilter.kinesis_handler()

    if outcome == :ok do
      # update Prometheus metric with the measured processing time
      elapsed = System.monotonic_time() - started_at
      EventsMetrics.measure_event_processing(@metrics_source_label, conf.kinesis_stream, elapsed)
    else
      details = %{error: outcome, topic: conf.kinesis_stream}
      Logger.error("Callback failed to handle message: #{inspect(details)}")
      # increase Prometheus metric for events that failed to be consumed
      EventsMetrics.count_failed_event(@metrics_source_label, conf.kinesis_stream)
    end
  rescue
    exception ->
      details = %{error: exception, topic: conf.kinesis_stream}

      Logger.error(fn ->
        {"failed to decode message: #{inspect(exception)}", [info: details]}
      end)

      # increase Prometheus metric for events that failed to be consumed
      EventsMetrics.count_failed_event(@metrics_source_label, conf.kinesis_stream)
  end
end
end