This repository has been archived by the owner on Dec 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 28
/
DynamoDBLogParser.rb
166 lines (151 loc) · 5.38 KB
/
DynamoDBLogParser.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# encoding: utf-8
#
#Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#
require 'java'
require 'json'
require 'bigdecimal'
require 'activesupport/json_encoder'
require 'base64'
require "logstash-input-dynamodb_jars"
java_import "com.fasterxml.jackson.databind.ObjectMapper"
java_import "com.amazonaws.services.dynamodbv2.model.AttributeValue"
java_import "com.amazonaws.dynamodb.bootstrap.AttributeValueMixIn"
module Logstash
  module Inputs
    module DynamoDB
      # Turns DynamoDB scan items and stream records into JSON strings.
      # The shape of the output depends on the log format and the stream view
      # type handed to the constructor.
      class DynamoDBLogParser
        # Byte count charged for a number (N) key value when estimating key
        # size; the actual digits of the number are not inspected.
        MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21

        # view_type::  stream view type; compared against "keys_only" and the
        #              LogStash::Inputs::DynamoDB::VT_* constants.
        # log_format:: output format; compared against the
        #              LogStash::Inputs::DynamoDB::LF_* constants.
        # key_schema:: enumerable of key attribute names.
        # region::     stored as "awsRegion" in every scan event.
        def initialize(view_type, log_format, key_schema, region)
          @view_type = view_type
          @log_format = log_format
          @key_schema = key_schema
          @mapper = ObjectMapper.new
          @mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
          @mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn)
          # Global ActiveSupport switch (affects the whole process, not only
          # this parser).
          ActiveSupport.encode_big_decimal_as_string = false
          # Fields shared by every synthetic event built in parse_scan.
          @hash_template = {
            "eventID" => "0",
            "eventName" => "INSERT",
            "eventVersion" => "1.0",
            "eventSource" => "aws:dynamodb",
            "awsRegion" => region
          }
        end

        # Wraps a scanned item in a stream-record-like envelope and renders it.
        #
        # log::            the scanned item; must answer get(attribute_name)
        #                  and be serializable by the Jackson mapper.
        # new_image_size:: size of the item, added to the key size unless the
        #                  view type is "keys_only".
        #
        # Returns whatever parse_view_type returns for the built event.
        def parse_scan(log, new_image_size)
          data_hash = JSON.parse(@mapper.writeValueAsString(log))
          size_bytes = calculate_key_size_in_bytes(log)

          keys = @key_schema.each_with_object({}) do |name, acc|
            acc[name] = data_hash[name]
          end
          dynamodb = { "keys" => keys }
          unless @view_type == "keys_only"
            size_bytes += new_image_size
            dynamodb["newImage"] = data_hash
          end
          dynamodb["sequenceNumber"] = "0"
          dynamodb["sizeBytes"] = size_bytes
          dynamodb["streamViewType"] = @view_type.upcase

          # merge builds a fresh event, so the shared template is never
          # modified and does not retain the last scanned item.
          parse_view_type(@hash_template.merge("dynamodb" => dynamodb))
        end

        # Renders a stream record: the record is serialized with the Jackson
        # mapper and its "internalObject" entry is passed to parse_view_type.
        def parse_stream(log)
          parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
        end

        private

        # Sums, for every key attribute, the UTF-8 byte size of its name plus
        # the size of its value in +record+.
        def calculate_key_size_in_bytes(record)
          @key_schema.inject(0) do |total, name|
            total + name.bytesize + key_value_size_in_bytes(record.get(name))
          end
        end

        # Size of one key attribute value; 0 when none of B, S or N is set.
        def key_value_size_in_bytes(value)
          binary = value.getB
          unless binary.nil?
            # The AWS SDK's AttributeValue#getB returns a java.nio.ByteBuffer,
            # which Base64.decode64 cannot unpack, so such buffers are
            # measured through #remaining. Base64 text is decoded as before.
            return binary.remaining if binary.respond_to?(:remaining)
            return Base64.decode64(binary).bytesize
          end

          string = value.getS
          # bytesize, not length: the result is a byte count, and multi-byte
          # UTF-8 characters would otherwise be undercounted.
          return string.bytesize unless string.nil?

          value.getN.nil? ? 0 : MAX_NUMBER_OF_BYTES_FOR_NUMBER
        end

        # Picks the part of the event selected by the view type and renders it.
        # With the plain log format the whole event is dumped as JSON.
        # NOTE(review): any view type other than keys_only / old_image /
        # new_image falls through the case and yields nil for non-plain
        # formats - confirm callers only pair those view types with plain.
        def parse_view_type(hash)
          return hash.to_json if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN

          case @view_type
          when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
            parse_format(hash["dynamodb"]["keys"])
          when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
            parse_format(hash["dynamodb"]["oldImage"])
          when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
            parse_format(hash["dynamodb"]["newImage"])
          end
        end

        # Renders an attribute map either untouched (DynamoDB format) or
        # converted to plain JSON values.
        def parse_format(hash)
          return hash.to_json if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB

          dynamodb_to_json(hash)
        end

        # JSON string of the attribute map after stripping type descriptors.
        def dynamodb_to_json(hash)
          formatAttributeValueMap(hash).to_json
        end

        # Replaces, in place, every {type => value} entry of +hash+ with its
        # plain value. With the LF_JSON_NO_BIN format, binary ("B") and binary
        # set ("BS") attributes are removed instead.
        #
        # Returns the same hash, or nil when +hash+ is nil (the selected image
        # is absent from the record), so the caller renders "null" just like
        # the DynamoDB format does.
        def formatAttributeValueMap(hash)
          return nil if hash.nil?

          drop_binary = @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN
          # Iterate over a snapshot of the keys so entries can be deleted.
          hash.keys.each do |name|
            type, value = hash[name].first
            if drop_binary && (type == "BS" || type == "B")
              hash.delete(name)
            else
              hash[name] = formatAttributeValue(type, value)
            end
          end
          hash
        end

        # Converts one typed DynamoDB value to a plain Ruby value.
        #
        # key::   the type descriptor ("M", "L", "NS", "SS", "BS", "N",
        #         "NULL", or anything else, which is passed through).
        # value:: the value stored under that descriptor; lists and sets are
        #         converted in place.
        def formatAttributeValue(key, value)
          case key
          when "M"
            formatAttributeValueMap(value)
          when "L"
            value.map! { |item| formatAttributeValue(item.keys.first, item.values.first) }
          when "NS", "SS", "BS"
            # key[0] is the element type of the set ("N", "S" or "B").
            value.map! { |item| formatAttributeValue(key[0], item) }
          when "N"
            # Kernel#BigDecimal works on every Ruby version; BigDecimal.new
            # was removed in bigdecimal 2.0.
            BigDecimal(value)
          when "NULL"
            nil
          else
            value
          end
        end
      end
    end
  end
end