Skip to content

Commit

Permalink
Remove obsolete Cloud Functions-related code. (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorpeshansky committed Jun 6, 2019
1 parent bd8c4e7 commit 975357e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 244 deletions.
108 changes: 10 additions & 98 deletions lib/fluent/plugin/out_google_cloud.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,6 @@ module ServiceConstants
resource_type: 'gae_app',
metadata_attributes: %w(gae_backend_name gae_backend_version)
}.freeze
CLOUDFUNCTIONS_CONSTANTS = {
service: 'cloudfunctions.googleapis.com',
resource_type: 'cloud_function',
stream_severity_map: {
'stdout' => 'INFO',
'stderr' => 'ERROR'
}
}.freeze
COMPUTE_CONSTANTS = {
service: 'compute.googleapis.com',
resource_type: 'gce_instance'
Expand Down Expand Up @@ -541,21 +533,9 @@ def configure(conf)
@resource ||= determine_agent_level_monitored_resource_via_legacy

# Set regexp that we should match tags against later on. Using a list
# instead of a map to ensure order. For example, tags will be matched
# against Cloud Functions first, then GKE.
# instead of a map to ensure order.
@tag_regexp_list = []
if @resource.type == GKE_CONSTANTS[:resource_type]
# We only support Cloud Functions logs for GKE right now.
if fetch_gce_metadata('instance/attributes/'
).split.include?('gcf_region')
# Fetch this info and store it to avoid recurring
# metadata server calls.
@gcf_region = fetch_gce_metadata('instance/attributes/gcf_region')
@tag_regexp_list << [
CLOUDFUNCTIONS_CONSTANTS[:resource_type],
@compiled_cloudfunctions_tag_regexp
]
end
@tag_regexp_list << [
GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
]
Expand Down Expand Up @@ -667,8 +647,7 @@ def write(chunk)
end
end

ts_secs, ts_nanos = compute_timestamp(
entry_level_resource.type, record, time)
ts_secs, ts_nanos = compute_timestamp(record, time)
severity = compute_severity(
entry_level_resource.type, record, entry_level_common_labels)

Expand Down Expand Up @@ -1087,14 +1066,6 @@ def set_regexp_patterns
@compiled_kubernetes_tag_regexp = Regexp.new(@kubernetes_tag_regexp) if
@kubernetes_tag_regexp

@compiled_cloudfunctions_tag_regexp =
/\.(?<encoded_function_name>.+)\.\d+-[^-]+_default_worker$/
@compiled_cloudfunctions_log_regexp = /^
(?:\[(?<severity>.)\])?
\[(?<timestamp>.{24})\]
(?:\[(?<execution_id>[^\]]+)\])?
[ ](?<text>.*)$/x

@compiled_http_latency_regexp =
/^\s*(?<seconds>\d+)(?<decimal>\.\d+)?\s*s\s*$/
end
Expand Down Expand Up @@ -1364,23 +1335,6 @@ def determine_group_level_monitored_resource_and_labels(tag,

# Once the resource type is settled down, determine the labels.
case resource.type
# Cloud Functions.
when CLOUDFUNCTIONS_CONSTANTS[:resource_type]
resource.labels.merge!(
'region' => @gcf_region,
'function_name' => decode_cloudfunctions_function_name(
matched_regexp_group['encoded_function_name'])
)
instance_id = resource.labels.delete('instance_id')
common_labels.merge!(
"#{GKE_CONSTANTS[:service]}/instance_id" => instance_id,
"#{COMPUTE_CONSTANTS[:service]}/resource_id" => instance_id,
"#{GKE_CONSTANTS[:service]}/cluster_name" =>
resource.labels.delete('cluster_name'),
"#{COMPUTE_CONSTANTS[:service]}/zone" =>
resource.labels.delete('zone')
)

# GKE container.
when GKE_CONSTANTS[:resource_type]
if matched_regexp_group
Expand Down Expand Up @@ -1475,17 +1429,6 @@ def determine_entry_level_monitored_resource_and_labels(
common_labels = group_level_common_labels.dup

case resource.type
# Cloud Functions.
when CLOUDFUNCTIONS_CONSTANTS[:resource_type]
if record.key?('log')
@cloudfunctions_log_match =
@compiled_cloudfunctions_log_regexp.match(record['log'])
common_labels['execution_id'] =
@cloudfunctions_log_match['execution_id'] if
@cloudfunctions_log_match &&
@cloudfunctions_log_match['execution_id']
end

# GKE container.
when GKE_CONSTANTS[:resource_type]
# Move the stdout/stderr annotation from the record into a label.
Expand Down Expand Up @@ -1627,7 +1570,7 @@ def time_or_nil(ts_secs, ts_nanos)
nil
end

def compute_timestamp(resource_type, record, time)
def compute_timestamp(record, time)
current_time = Time.now
if record.key?('timestamp') &&
record['timestamp'].is_a?(Hash) &&
Expand Down Expand Up @@ -1655,12 +1598,6 @@ def compute_timestamp(resource_type, record, time)
'timestampSeconds and timestampNanos instead.'
end
timestamp = time_or_nil(ts_secs, ts_nanos)
elsif resource_type == CLOUDFUNCTIONS_CONSTANTS[:resource_type] &&
@cloudfunctions_log_match
timestamp = DateTime.parse(
@cloudfunctions_log_match['timestamp']).to_time
ts_secs = timestamp.tv_sec
ts_nanos = timestamp.tv_nsec
elsif record.key?('time')
# k8s ISO8601 timestamp
begin
Expand Down Expand Up @@ -1718,14 +1655,7 @@ def compute_timestamp(resource_type, record, time)
end

def compute_severity(resource_type, record, entry_level_common_labels)
if resource_type == CLOUDFUNCTIONS_CONSTANTS[:resource_type]
if @cloudfunctions_log_match && @cloudfunctions_log_match['severity']
return parse_severity(@cloudfunctions_log_match['severity'])
elsif record.key?('stream')
return CLOUDFUNCTIONS_CONSTANTS[:stream_severity_map].fetch(
record.delete('stream'), 'DEFAULT')
end
elsif record.key?('severity')
if record.key?('severity')
return parse_severity(record.delete('severity'))
elsif resource_type == GKE_CONSTANTS[:resource_type]
stream = entry_level_common_labels["#{GKE_CONSTANTS[:service]}/stream"]
Expand Down Expand Up @@ -1946,14 +1876,6 @@ def parse_latency(latency)
end
end

def decode_cloudfunctions_function_name(function_name)
function_name.gsub(/c\.[a-z]/) { |s| s.upcase[-1] }
.gsub('u.u', '_')
.gsub('d.d', '$')
.gsub('a.a', '@')
.gsub('p.p', '.')
end

def format(tag, time, record)
Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s
end
Expand Down Expand Up @@ -2035,19 +1957,11 @@ def set_payload(resource_type, record, entry, is_json)
# Only one of {text_payload, json_payload} will be set.
text_payload = nil
json_payload = nil
# If this is a Cloud Functions log that matched the expected regexp,
# use text payload. Otherwise, use JSON if we found valid JSON, or text
# payload in the following cases:
# 1. This is a Cloud Functions log and the 'log' key is available
# 2. This is an unstructured Container log and the 'log' key is available
# 3. The only remaining key is 'message'
if resource_type == CLOUDFUNCTIONS_CONSTANTS[:resource_type] &&
@cloudfunctions_log_match
text_payload = @cloudfunctions_log_match['text']
elsif resource_type == CLOUDFUNCTIONS_CONSTANTS[:resource_type] &&
record.key?('log')
text_payload = record['log']
elsif is_json
# Use JSON if we found valid JSON, or text payload in the following
# cases:
# 1. This is an unstructured Container log and the 'log' key is available
# 2. The only remaining key is 'message'
if is_json
json_payload = record
elsif [GKE_CONSTANTS[:resource_type],
DOCKER_CONSTANTS[:resource_type]].include?(resource_type) &&
Expand Down Expand Up @@ -2076,9 +1990,7 @@ def set_payload(resource_type, record, entry, is_json)
end

def log_name(tag, resource)
if resource.type == CLOUDFUNCTIONS_CONSTANTS[:resource_type]
tag = 'cloud-functions'
elsif resource.type == APPENGINE_CONSTANTS[:resource_type]
if resource.type == APPENGINE_CONSTANTS[:resource_type]
# Add a prefix to Managed VM logs to prevent namespace collisions.
tag = "#{APPENGINE_CONSTANTS[:service]}/#{tag}"
elsif resource.type == GKE_CONSTANTS[:resource_type]
Expand Down
90 changes: 0 additions & 90 deletions test/plugin/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1223,70 +1223,6 @@ def test_json_container_log_metadata_from_tag
end
end

def test_cloudfunctions_log
setup_gce_metadata_stubs
setup_cloudfunctions_metadata_stubs
[1, 2, 3, 5, 11, 50].each do |n|
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CLOUDFUNCTIONS_TAG)
# The test driver doesn't clear its buffer of entries after running, so
# do it manually here.
d.instance_variable_get('@entries').clear
@logs_sent = []
n.times { |i| d.emit(cloudfunctions_log_entry(i)) }
d.run
end
verify_log_entries(n, CLOUDFUNCTIONS_PARAMS) do |entry, i|
verify_default_log_entry_text(entry['textPayload'], i, entry)
assert_equal 'DEBUG', entry['severity'],
"Test with #{n} logs failed. \n#{entry}"
end
end
end

def test_cloudfunctions_logs_text_not_matched
setup_gce_metadata_stubs
setup_cloudfunctions_metadata_stubs
[1, 2, 3, 5, 11, 50].each do |n|
@logs_sent = []
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CLOUDFUNCTIONS_TAG)
# The test driver doesn't clear its buffer of entries after running, so
# do it manually here.
d.instance_variable_get('@entries').clear
n.times { |i| d.emit(cloudfunctions_log_entry_text_not_matched(i)) }
d.run
end
verify_log_entries(
n, CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS) do |entry|
assert_equal 'INFO', entry['severity'],
"Test with #{n} logs failed. \n#{entry}"
end
end
end

def test_multiple_cloudfunctions_logs_tag_not_matched
setup_gce_metadata_stubs
setup_cloudfunctions_metadata_stubs
[1, 2, 3, 5, 11, 50].each do |n|
@logs_sent = []
setup_logging_stubs do
d = create_driver(APPLICATION_DEFAULT_CONFIG, CONTAINER_TAG)
# The test driver doesn't clear its buffer of entries after running, so
# do it manually here.
d.instance_variable_get('@entries').clear
n.times { |i| d.emit(cloudfunctions_log_entry(i)) }
d.run
end
verify_log_entries(n, CONTAINER_FROM_TAG_PARAMS, 'textPayload') \
do |entry, i|
assert_equal '[D][2015-09-25T12:34:56.789Z][123-0] test log entry ' \
"#{i}", entry['textPayload'],
"Test with #{n} logs failed. \n#{entry}"
end
end
end

def test_dataproc_log
setup_gce_metadata_stubs
setup_dataproc_metadata_stubs
Expand Down Expand Up @@ -2173,18 +2109,6 @@ def setup_k8s_metadata_stubs(should_respond = true)
end
end

def setup_cloudfunctions_metadata_stubs
stub_metadata_request(
'instance/attributes/',
"attribute1\ncluster-location\ncluster-name\ngcf_region\nlast_attribute")
stub_metadata_request('instance/attributes/cluster-location',
K8S_LOCATION2)
stub_metadata_request('instance/attributes/cluster-name',
K8S_CLUSTER_NAME)
stub_metadata_request('instance/attributes/gcf_region',
CLOUDFUNCTIONS_REGION)
end

def setup_dataproc_metadata_stubs
stub_metadata_request(
'instance/attributes/',
Expand Down Expand Up @@ -2339,20 +2263,6 @@ def k8s_node_log_entry(log)
}
end

def cloudfunctions_log_entry(i)
{
stream: 'stdout',
log: '[D][2015-09-25T12:34:56.789Z][123-0] ' + log_entry(i)
}
end

def cloudfunctions_log_entry_text_not_matched(i)
{
stream: 'stdout',
log: log_entry(i)
}
end

def dataflow_log_entry(i)
{
step: DATAFLOW_STEP_ID,
Expand Down
56 changes: 0 additions & 56 deletions test/plugin/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,6 @@ module Constants
CONTAINER_SEVERITY = 'INFO'.freeze
CONTAINER_LOCAL_RESOURCE_ID_PREFIX = 'gke_container'.freeze

# Cloud Functions specific labels.
CLOUDFUNCTIONS_FUNCTION_NAME = '$My_Function.Name-@1'.freeze
CLOUDFUNCTIONS_REGION = 'us-central1'.freeze
CLOUDFUNCTIONS_EXECUTION_ID = '123-0'.freeze
CLOUDFUNCTIONS_CLUSTER_NAME = 'cluster-1'.freeze
CLOUDFUNCTIONS_NAMESPACE_NAME = 'default'.freeze
CLOUDFUNCTIONS_POD_NAME =
'd.dc.myu.uc.functionp.pc.name-a.a1.987-c0l82'.freeze
CLOUDFUNCTIONS_CONTAINER_NAME = 'worker'.freeze

# Dataflow specific labels.
DATAFLOW_REGION = 'us-central1'.freeze
DATAFLOW_JOB_NAME = 'job_name_1'.freeze
Expand Down Expand Up @@ -709,52 +699,6 @@ module Constants
DOCKER_CONTAINER_PARAMS_NO_STREAM =
DOCKER_CONTAINER_PARAMS.merge(labels: {}).freeze

# Cloud Functions.
CLOUDFUNCTIONS_TAG = "kubernetes.#{CLOUDFUNCTIONS_POD_NAME}_" \
"#{CLOUDFUNCTIONS_NAMESPACE_NAME}_" \
"#{CLOUDFUNCTIONS_CONTAINER_NAME}".freeze

CLOUDFUNCTIONS_PARAMS = {
resource: {
type: CLOUDFUNCTIONS_CONSTANTS[:resource_type],
labels: {
'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME,
'region' => CLOUDFUNCTIONS_REGION
}
},
log_name: 'cloud-functions',
project_id: PROJECT_ID,
labels: {
'execution_id' => CLOUDFUNCTIONS_EXECUTION_ID,
"#{GKE_CONSTANTS[:service]}/instance_id" => VM_ID,
"#{GKE_CONSTANTS[:service]}/cluster_name" =>
CLOUDFUNCTIONS_CLUSTER_NAME,
"#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
"#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
"#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
}
}.freeze

CLOUDFUNCTIONS_TEXT_NOT_MATCHED_PARAMS = {
resource: {
type: CLOUDFUNCTIONS_CONSTANTS[:resource_type],
labels: {
'function_name' => CLOUDFUNCTIONS_FUNCTION_NAME,
'region' => CLOUDFUNCTIONS_REGION
}
},
log_name: 'cloud-functions',
project_id: PROJECT_ID,
labels: {
"#{GKE_CONSTANTS[:service]}/instance_id" => VM_ID,
"#{GKE_CONSTANTS[:service]}/cluster_name" =>
CLOUDFUNCTIONS_CLUSTER_NAME,
"#{COMPUTE_CONSTANTS[:service]}/resource_id" => VM_ID,
"#{COMPUTE_CONSTANTS[:service]}/resource_name" => HOSTNAME,
"#{COMPUTE_CONSTANTS[:service]}/zone" => ZONE
}
}.freeze

# Cloud Dataflow.
DATAFLOW_PARAMS = {
resource: {
Expand Down

0 comments on commit 975357e

Please sign in to comment.