Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTel instrumentation #54

Merged
merged 30 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c5217d0
Add path_patterns arg to perform_request
estolfo May 26, 2023
aa7b203
Add otel tracer and create span
estolfo May 26, 2023
b8df168
Add OpenTelemetry instrumentation
estolfo Jun 28, 2023
1b7a4d1
Add more tests
estolfo Jun 28, 2023
faae679
Update Ruby version check
estolfo Jun 28, 2023
0e1a7fb
Only capture db.statement for search endpoints
estolfo Jun 28, 2023
0ed2a9f
Add sanitization with ENV config
estolfo Jun 28, 2023
4cec224
Further refine sanitization
estolfo Jun 29, 2023
f58a82b
Handle case when endpoint and path templates values are nil
estolfo Jul 3, 2023
ca8dcfc
Updates to tests
estolfo Jul 3, 2023
6e6922e
Add github action for testing with OTel
estolfo Jul 3, 2023
c8aa213
Use ENV variable to set hosts in new client
estolfo Jul 3, 2023
128debe
Use ENV variable for test port
estolfo Jul 3, 2023
39d5db3
otel span attr server.port should be int
estolfo Jul 3, 2023
2c090d5
Test when named capture has an underscore
estolfo Jul 3, 2023
53f8b97
Add ENV variable for enabling/disabling otel instrumentation
estolfo Jul 4, 2023
0812ce2
Use opts instead of method args
estolfo Jul 4, 2023
f1a0c0b
Pass set variables to perform_method instead
estolfo Jul 11, 2023
0b0b3e2
Convert array of path params to string
estolfo Sep 6, 2023
05022db
Minor update
estolfo Sep 6, 2023
d1bfcc0
Add test for when no endpoint or path params are provided
estolfo Sep 21, 2023
4bcbe8e
Add option to provide a tracer provider when creating a client
estolfo Sep 25, 2023
2cbc27d
Fix typo
estolfo Sep 25, 2023
d653d91
db.system span attribute should always be elasticsearch
estolfo Sep 26, 2023
5470c77
Adjust names of environment variables
estolfo Sep 26, 2023
779c83e
Add some more tests for all body strategy options
estolfo Sep 26, 2023
a45f52a
Remove extra line
estolfo Sep 26, 2023
b04c310
Remove remnants of older approach
estolfo Sep 26, 2023
350e409
Add note for Open Telemetry wrapper class
estolfo Sep 27, 2023
7958acc
Add inline documentation
estolfo Sep 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .github/workflows/otel.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: opentelemetry
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
test-otel:
name: 'Test Open Telemetry'
env:
TEST_ES_SERVER: http://localhost:9250
PORT: 9250
TEST_WITH_OTEL: true
strategy:
fail-fast: false
matrix:
ruby: ['3.2', 'jruby-9.4']
es_version: ['8.9-SNAPSHOT']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Increase system limits
run: |
sudo swapoff -a
sudo sysctl -w vm.swappiness=1
sudo sysctl -w fs.file-max=262144
sudo sysctl -w vm.max_map_count=262144
- uses: elastic/elastic-github-actions/elasticsearch@master
with:
stack-version: ${{ matrix.es_version }}
security-enabled: false
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- name: Build and test with Rake
run: |
sudo apt-get update
sudo apt-get install libcurl4-openssl-dev
ruby -v
bundle install
- name: unit tests
run: bundle exec rake test:unit
- name: specs
run: bundle exec rake test:spec
- name: integration tests
run: bundle exec rake test:integration
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ group :development, :test do
else
gem 'pry-byebug'
end
if RUBY_VERSION >= '3.0'
gem 'opentelemetry-sdk', require: false
end
end
1 change: 1 addition & 0 deletions lib/elastic/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@
require 'elastic/transport/transport/http/faraday'
require 'elastic/transport/client'
require 'elastic/transport/redacted'
require 'elastic/transport/opentelemetry'

require 'elastic/transport/version'
29 changes: 27 additions & 2 deletions lib/elastic/transport/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,39 @@ def initialize(arguments = {}, &block)
@transport_class.new(hosts: @hosts, options: @arguments)
end
end

if defined?(::OpenTelemetry) && ENV[OpenTelemetry::ENV_VARIABLE_ENABLED] != 'false'
@otel = OpenTelemetry.new(@arguments)
end
end

# Performs a request through delegation to {#transport}.
#
def perform_request(method, path, params = {}, body = nil, headers = nil)
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {})
method = @send_get_body_as if 'GET' == method && body
validate_ca_fingerprints if @ca_fingerprint
transport.perform_request(method, path, params, body, headers)
if @otel
# If no endpoint is specified in the opts, use the HTTP method name
span_name = opts[:endpoint] || method
@otel.tracer.in_span(span_name) do |span|
span['http.request.method'] = method
span['db.system'] = 'elasticsearch'
opts[:defined_params]&.each do |k, v|
if v.respond_to?(:join)
span["db.elasticsearch.path_parts.#{k}"] = v.join(',')
else
span["db.elasticsearch.path_parts.#{k}"] = v
end
end
if body_as_json = @otel.process_body(body, opts[:endpoint])
span['db.statement'] = body_as_json
end
span['db.operation'] = opts[:endpoint] if opts[:endpoint]
transport.perform_request(method, path, params || {}, body, headers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since params is being passed with the default value of {} in the perform_request signature, I think we don't need || {} here and on line 197, right?.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With my testing, there was sometimes a case when there were no params but there was a body so the perform_request method would be called with the params arg explicitly set to nil. So once it gets to this line you get an NoMethodError: undefined method delete' for nil:NilClass` error.

I think this scenario is already possible before these changes:

$ client.perform_request('POST', '/', nil, '{"foo":"bar"}', '{"Content-Type":"application/x-ndjson"}')
NoMethodError: undefined method `delete' for nil:NilClass
from ..../elastic-transport-ruby/fork/elastic-transport-ruby/lib/elastic/transport/transport/base.rb:283:in `perform_request'

end
else
transport.perform_request(method, path, params || {}, body, headers)
end
end

private
Expand Down
157 changes: 157 additions & 0 deletions lib/elastic/transport/opentelemetry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module Elastic
module Transport
# Wrapper object for Open Telemetry objects, associated config and functionality.
#
# @api private
class OpenTelemetry
OTEL_TRACER_NAME = 'elasticsearch-api'
# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
ENV_VARIABLE_ENABLED = 'OTEL_RUBY_INSTRUMENTATION_ELASTICSEARCH_ENABLED'
# Describes how to handle search queries in the request body when assigned to
# a span attribute.
# Valid values are 'raw', 'omit', 'sanitize'. Default is 'omit'.
ENV_VARIABLE_BODY_STRATEGY = 'OTEL_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY'
DEFAULT_BODY_STRATEGY = 'omit'
# A string list of keys whose values are redacted. This is only relevant if the body strategy is
# 'sanitize'. For example, a config 'sensitive-key,other-key' will redact the values at
# 'sensitive-key' and 'other-key' in addition to the default keys.
ENV_VARIABLE_BODY_SANITIZE_KEYS = 'OTEL_RUBY_INSTRUMENTATION_ELASTICSEARCH_SEARCH_QUERY_SANITIZE_KEYS'

# A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in
# the request body may be captured for these endpoints, depending on the body capture strategy.
SEARCH_ENDPOINTS = Set[
"search",
"async_search.submit",
"msearch",
"eql.search",
"terms_enum",
"search_template",
"msearch_template",
"render_search_template",
]

# Initialize the Open Telemetry wrapper object. Takes the options originally passed to
# Client#initialize.
def initialize(opts)
@tracer = (opts[:opentelemetry_tracer_provider] || ::OpenTelemetry.tracer_provider).tracer(
OTEL_TRACER_NAME, Elastic::Transport::VERSION
)
@body_strategy = ENV[ENV_VARIABLE_BODY_STRATEGY] || DEFAULT_BODY_STRATEGY
@sanitize_keys = ENV[ENV_VARIABLE_BODY_SANITIZE_KEYS]&.split(',')&.collect! do |pattern|
Regexp.new(pattern.gsub('*', '.*'))
end
end
attr_accessor :tracer

# Process the request body. Applies the body strategy, which can be one of the following:
# 'omit': return nil
# 'sanitize': redact values at the default list of keys + any additional keys provided in
# the OTEL_RUBY_INSTRUMENTATION_ELASTICSEARCH_SEARCH_QUERY_SANITIZE_KEYS env variable.
# 'raw': return the original body, unchanged
def process_body(body, endpoint)
unless @body_strategy == 'omit' || !SEARCH_ENDPOINTS.include?(endpoint)
if @body_strategy == 'sanitize'
Sanitizer.sanitize(body, @sanitize_keys).to_json
elsif @body_strategy == 'raw'
body&.is_a?(String) ? body : body.to_json
end
end
end

# Replaces values in a hash with 'REDACTED', given a set of keys to match on.
class Sanitizer
class << self
FILTERED = 'REDACTED'
DEFAULT_KEY_PATTERNS =
%w[password passwd pwd secret *key *token* *session* *credit* *card* *auth* set-cookie].map! do |p|
Regexp.new(p.gsub('*', '.*'))
end

def sanitize(body, key_patterns = [])
patterns = DEFAULT_KEY_PATTERNS
patterns += key_patterns if key_patterns
sanitize!(DeepDup.dup(body), patterns)
end

private

def sanitize!(obj, key_patterns)
return obj unless obj.is_a?(Hash)

obj.each_pair do |k, v|
if filter_key?(key_patterns, k)
obj[k] = FILTERED
elsif v.is_a?(Hash)
sanitize!(v, key_patterns)
else
next
end
end
end

def filter_key?(key_patterns, key)
key_patterns.any? { |regex| regex.match(key) }
end
end
end

# Makes a deep copy of an Array or Hash
# NB: Not guaranteed to work well with complex objects, only simple Hash,
# Array, String, Number, etc.
class DeepDup
def initialize(obj)
@obj = obj
end

def dup
deep_dup(@obj)
end

def self.dup(obj)
new(obj).dup
end

private

def deep_dup(obj)
case obj
when Hash then hash(obj)
when Array then array(obj)
else obj.dup
end
end

def array(arr)
arr.map { |obj| deep_dup(obj) }
end

def hash(hsh)
result = hsh.dup

hsh.each_pair do |key, value|
result[key] = deep_dup(value)
end

result
end
end
end
end
end
8 changes: 8 additions & 0 deletions lib/elastic/transport/transport/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,14 @@ def connection_headers(connection)
connection.connection.headers
end
end

def capture_otel_span_attributes(connection, url)
if defined?(::OpenTelemetry)
::OpenTelemetry::Trace.current_span&.set_attribute('url.full', url)
::OpenTelemetry::Trace.current_span&.set_attribute('server.address', connection.host[:host])
::OpenTelemetry::Trace.current_span&.set_attribute('server.port', connection.host[:port].to_i)
end
end
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/elastic/transport/transport/http/curb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class Curb
# @see Transport::Base#perform_request
#
def perform_request(method, path, params={}, body=nil, headers=nil, opts={})
super do |connection, _url|
super do |connection, url|
capture_otel_span_attributes(connection, url)
connection.connection.url = connection.full_url(path, params)
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, headers)
Expand Down
1 change: 1 addition & 0 deletions lib/elastic/transport/transport/http/faraday.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Faraday
#
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {})
super do |connection, url|
capture_otel_span_attributes(connection, url)
headers = parse_headers(headers, connection)
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, headers)
Expand Down
1 change: 1 addition & 0 deletions lib/elastic/transport/transport/http/manticore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def build_client(options = {})
#
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {})
super do |connection, url|
capture_otel_span_attributes(connection, url)
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, parse_headers(headers))
params[:body] = body if body
Expand Down
Loading
Loading