Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
added more elastic changes. Moved around some classes. Changed requir…
Browse files Browse the repository at this point in the history
…e_jar to all pull from single file
  • Loading branch information
Brandon Tom committed Aug 21, 2015
1 parent f43499b commit 7837c27
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 251 deletions.
248 changes: 125 additions & 123 deletions lib/logstash/inputs/DynamoDBLogParser.rb
Expand Up @@ -19,146 +19,148 @@
require 'bigdecimal'
require 'activesupport/json_encoder'
require 'base64'
begin
require 'jar-dependencies'
require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
require_jar( 'com.amazonaws', 'dynamodb-import-export-tool', '1.0.0' )
end

require "logstash-input-dynamodb_jars"
java_import "com.fasterxml.jackson.databind.ObjectMapper"
java_import "com.amazonaws.services.dynamodbv2.model.AttributeValue"
java_import "com.amazonaws.dynamodb.bootstrap.AttributeValueMixIn"

class DynamoDBLogParser
module Logstash
module Inputs
module DynamoDB
class DynamoDBLogParser

MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;
MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;

def initialize(view_type, log_format, key_schema, region)
@view_type = view_type
@log_format = log_format
@mapper ||= ObjectMapper.new()
@mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
@mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
@key_schema = key_schema
ActiveSupport.encode_big_decimal_as_string = false
@hash_template = Hash.new
@hash_template["eventID"] = "0"
@hash_template["eventName"] = "INSERT"
@hash_template["eventVersion"] = "1.0"
@hash_template["eventSource"] = "aws:dynamodb"
@hash_template["awsRegion"] = region
end
def initialize(view_type, log_format, key_schema, region)
@view_type = view_type
@log_format = log_format
@mapper ||= ObjectMapper.new()
@mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
@mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
@key_schema = key_schema
ActiveSupport.encode_big_decimal_as_string = false
@hash_template = Hash.new
@hash_template["eventID"] = "0"
@hash_template["eventName"] = "INSERT"
@hash_template["eventVersion"] = "1.0"
@hash_template["eventSource"] = "aws:dynamodb"
@hash_template["awsRegion"] = region
end

public
def parse_scan(log, new_image_size)
data_hash = JSON.parse(@mapper.writeValueAsString(log))
public
def parse_scan(log, new_image_size)
data_hash = JSON.parse(@mapper.writeValueAsString(log))

@hash_template["dynamodb"] = Hash.new
@hash_template["dynamodb"]["keys"] = Hash.new
size_bytes = calculate_key_size_in_bytes(log)
@key_schema.each { |x|
@hash_template["dynamodb"]["keys"][x] = data_hash[x]
}
unless @view_type == "keys_only"
size_bytes += new_image_size
@hash_template["dynamodb"]["newImage"] = data_hash
end
@hash_template["dynamodb"]["sequenceNumber"] = "0"
@hash_template["dynamodb"]["sizeBytes"] = size_bytes
@hash_template["dynamodb"]["streamViewType"] = @view_type.upcase
@hash_template["dynamodb"] = Hash.new
@hash_template["dynamodb"]["keys"] = Hash.new
size_bytes = calculate_key_size_in_bytes(log)
@key_schema.each { |x|
@hash_template["dynamodb"]["keys"][x] = data_hash[x]
}
unless @view_type == "keys_only"
size_bytes += new_image_size
@hash_template["dynamodb"]["newImage"] = data_hash
end
@hash_template["dynamodb"]["sequenceNumber"] = "0"
@hash_template["dynamodb"]["sizeBytes"] = size_bytes
@hash_template["dynamodb"]["streamViewType"] = @view_type.upcase

return parse_view_type(@hash_template)
end
return parse_view_type(@hash_template)
end

public
def parse_stream(log)
return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
end
public
def parse_stream(log)
return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
end

private
def calculate_key_size_in_bytes(record)
key_size = 0
@key_schema.each { |x|
key_size += x.length
value = record.get(x)
if not value.getB().nil?
b = value.getB();
key_size += Base64.decode64(b).length
elsif not value.getS().nil?
s = value.getS();
key_size += s.length;
elsif not value.getN().nil?
key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
end
}
return key_size
end
private
def calculate_key_size_in_bytes(record)
key_size = 0
@key_schema.each { |x|
key_size += x.length
value = record.get(x)
if !(value.getB().nil?)
b = value.getB();
key_size += Base64.decode64(b).length
elsif !(value.getS().nil?)
s = value.getS();
key_size += s.length;
elsif !(value.getN().nil?)
key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
end
}
return key_size
end

private
def parse_view_type(hash)
if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
return hash.to_json
end
case @view_type
when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
return parse_format(hash["dynamodb"]["keys"])
when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
return parse_format(hash["dynamodb"]["oldImage"])
when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
end
end
private
def parse_view_type(hash)
if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
return hash.to_json
end
case @view_type
when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
return parse_format(hash["dynamodb"]["keys"])
when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
return parse_format(hash["dynamodb"]["oldImage"])
when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
end
end

private
def parse_format(hash)
if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
return hash.to_json
else
return dynamodb_to_json(hash)
end
end
private
def parse_format(hash)
if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
return hash.to_json
else
return dynamodb_to_json(hash)
end
end

private
def dynamodb_to_json(hash)
return formatAttributeValueMap(hash).to_json
end
private
def dynamodb_to_json(hash)
return formatAttributeValueMap(hash).to_json
end

private
def formatAttributeValueMap(hash)
keys_to_delete = []
hash.each do |k, v|
dynamodb_key = v.keys.first
dynamodb_value = v.values.first
if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
keys_to_delete.push(k) # remove binary values and binary sets
next
end
hash[k] = formatAttributeValue(v.keys.first, v.values.first)
end
keys_to_delete.each {|key| hash.delete(key)}
return hash
end
private
def formatAttributeValueMap(hash)
keys_to_delete = []
hash.each do |k, v|
dynamodb_key = v.keys.first
dynamodb_value = v.values.first
if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
keys_to_delete.push(k) # remove binary values and binary sets
next
end
hash[k] = formatAttributeValue(v.keys.first, v.values.first)
end
keys_to_delete.each {|key| hash.delete(key)}
return hash
end

private
def formatAttributeValue(key, value)
case key
when "M"
formatAttributeValueMap(value)
when "L"
value.map! do |v|
v = formatAttributeValue(v.keys.first, v.values.first)
end
when "NS","SS","BS"
value.map! do |v|
v = formatAttributeValue(key[0], v)
end
when "N"
BigDecimal.new(value)
when "NULL"
nil
else
value
end
end

private
def formatAttributeValue(key, value)
case key
when "M"
formatAttributeValueMap(value)
when "L"
value.map! do |v|
v = formatAttributeValue(v.keys.first, v.values.first)
end
when "NS","SS","BS"
value.map! do |v|
v = formatAttributeValue(key[0], v)
end
return value
when "N"
return BigDecimal.new(value)
when "NULL"
return nil
else
return value
end
end

end
77 changes: 40 additions & 37 deletions lib/logstash/inputs/LogStashRecordProcessor.rb
Expand Up @@ -15,51 +15,54 @@
#limitations under the License.
#
require "java"
begin
require 'jar-dependencies'
require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' )
require_jar( 'log4j', 'log4j', '1.2.17' )
end

require "logstash-input-dynamodb_jars"
java_import "com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason"
java_import "java.lang.IllegalStateException"
java_import "org.apache.log4j.LogManager"

class LogStashRecordProcessor
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
module Logstash
module Inputs
module DynamoDB
class LogStashRecordProcessor
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor

attr_accessor :queue, :shard_id
attr_accessor :queue, :shard_id

def initialize(queue)
# Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
# No good way to overload methods in JRuby, so deciding which was supposed to be called here.
if (queue.is_a? String)
@shard_id = queue
return
else
@queue ||= queue
@logger ||= LogStash::Inputs::DynamoDB.logger
end
end
def initialize(queue)
# Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
# No good way to overload methods in JRuby, so deciding which was supposed to be called here.
if (queue.is_a? String)
@shard_id = queue
return
else
@queue ||= queue
@logger ||= LogStash::Inputs::DynamoDB.logger
end
end

def process_records(records, checkpointer)
@logger.debug("Processing batch of " + records.size().to_s + " records")
records.each do |record|
@queue.push(record)
end
#checkpoint once all of the records have been consumed
checkpointer.checkpoint()
end
def process_records(records, checkpointer)
@logger.debug("Processing batch of " + records.size().to_s + " records")
records.each do |record|
@queue.push(record)
end
#checkpoint once all of the records have been consumed
checkpointer.checkpoint()
end

def shutdown(checkpointer, reason)
case reason
when ShutdownReason::TERMINATE
checkpointer.checkpoint()
when ShutdownReason::ZOMBIE
else
raise RuntimeError, "Invalid shutdown reason."
end
unless @shard_id.nil?
@logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
def shutdown(checkpointer, reason)
case reason
when ShutdownReason::TERMINATE
checkpointer.checkpoint()
when ShutdownReason::ZOMBIE
else
raise RuntimeError, "Invalid shutdown reason."
end
unless @shard_id.nil?
@logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
end
end
end
end
end
end

0 comments on commit 7837c27

Please sign in to comment.