diff --git a/lib/logstash/inputs/DynamoDBLogParser.rb b/lib/logstash/inputs/DynamoDBLogParser.rb
index b7393d1..48da24a 100644
--- a/lib/logstash/inputs/DynamoDBLogParser.rb
+++ b/lib/logstash/inputs/DynamoDBLogParser.rb
@@ -19,146 +19,148 @@ require 'bigdecimal'
 require 'activesupport/json_encoder'
 require 'base64'
-begin
-  require 'jar-dependencies'
-  require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
-  require_jar( 'com.amazonaws', 'dynamodb-import-export-tool', '1.0.0' )
-end
+
+require "logstash-input-dynamodb_jars"
 java_import "com.fasterxml.jackson.databind.ObjectMapper"
 java_import "com.amazonaws.services.dynamodbv2.model.AttributeValue"
 java_import "com.amazonaws.dynamodb.bootstrap.AttributeValueMixIn"
 
-class DynamoDBLogParser
+module Logstash
+  module Inputs
+    module DynamoDB
+      class DynamoDBLogParser
 
-  MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;
+        MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;
 
-  def initialize(view_type, log_format, key_schema, region)
-    @view_type = view_type
-    @log_format = log_format
-    @mapper ||= ObjectMapper.new()
-    @mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
-    @mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
-    @key_schema = key_schema
-    ActiveSupport.encode_big_decimal_as_string = false
-    @hash_template = Hash.new
-    @hash_template["eventID"] = "0"
-    @hash_template["eventName"] = "INSERT"
-    @hash_template["eventVersion"] = "1.0"
-    @hash_template["eventSource"] = "aws:dynamodb"
-    @hash_template["awsRegion"] = region
-  end
+        def initialize(view_type, log_format, key_schema, region)
+          @view_type = view_type
+          @log_format = log_format
+          @mapper ||= ObjectMapper.new()
+          @mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
+          @mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
+          @key_schema = key_schema
+          ActiveSupport.encode_big_decimal_as_string = false
+          @hash_template = Hash.new
+          @hash_template["eventID"] = "0"
+          @hash_template["eventName"] = "INSERT"
+          @hash_template["eventVersion"] = "1.0"
+          @hash_template["eventSource"] = "aws:dynamodb"
+          @hash_template["awsRegion"] = region
+        end
 
-  public
-  def parse_scan(log, new_image_size)
-    data_hash = JSON.parse(@mapper.writeValueAsString(log))
+        public
+        def parse_scan(log, new_image_size)
+          data_hash = JSON.parse(@mapper.writeValueAsString(log))
 
-    @hash_template["dynamodb"] = Hash.new
-    @hash_template["dynamodb"]["keys"] = Hash.new
-    size_bytes = calculate_key_size_in_bytes(log)
-    @key_schema.each { |x|
-      @hash_template["dynamodb"]["keys"][x] = data_hash[x]
-    }
-    unless @view_type == "keys_only"
-      size_bytes += new_image_size
-      @hash_template["dynamodb"]["newImage"] = data_hash
-    end
-    @hash_template["dynamodb"]["sequenceNumber"] = "0"
-    @hash_template["dynamodb"]["sizeBytes"] = size_bytes
-    @hash_template["dynamodb"]["streamViewType"] = @view_type.upcase
+          @hash_template["dynamodb"] = Hash.new
+          @hash_template["dynamodb"]["keys"] = Hash.new
+          size_bytes = calculate_key_size_in_bytes(log)
+          @key_schema.each { |x|
+            @hash_template["dynamodb"]["keys"][x] = data_hash[x]
+          }
+          unless @view_type == "keys_only"
+            size_bytes += new_image_size
+            @hash_template["dynamodb"]["newImage"] = data_hash
+          end
+          @hash_template["dynamodb"]["sequenceNumber"] = "0"
+          @hash_template["dynamodb"]["sizeBytes"] = size_bytes
+          @hash_template["dynamodb"]["streamViewType"] = @view_type.upcase
 
-    return parse_view_type(@hash_template)
-  end
+          return parse_view_type(@hash_template)
+        end
 
-  public
-  def parse_stream(log)
-    return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
-  end
+        public
+        def parse_stream(log)
+          return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
+        end
 
-  private
-  def calculate_key_size_in_bytes(record)
-    key_size = 0
-    @key_schema.each { |x|
-      key_size += x.length
-      value = record.get(x)
-      if not value.getB().nil?
-        b = value.getB();
-        key_size += Base64.decode64(b).length
-      elsif not value.getS().nil?
-        s = value.getS();
-        key_size += s.length;
-      elsif not value.getN().nil?
-        key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
-      end
-    }
-    return key_size
-  end
+        private
+        def calculate_key_size_in_bytes(record)
+          key_size = 0
+          @key_schema.each { |x|
+            key_size += x.length
+            value = record.get(x)
+            if !(value.getB().nil?)
+              b = value.getB();
+              key_size += Base64.decode64(b).length
+            elsif !(value.getS().nil?)
+              s = value.getS();
+              key_size += s.length;
+            elsif !(value.getN().nil?)
+              key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
+            end
+          }
+          return key_size
+        end
 
-  private
-  def parse_view_type(hash)
-    if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
-      return hash.to_json
-    end
-    case @view_type
-    when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
-      return parse_format(hash["dynamodb"]["keys"])
-    when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
-      return parse_format(hash["dynamodb"]["oldImage"])
-    when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
-      return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
-    end
-  end
+        private
+        def parse_view_type(hash)
+          if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
+            return hash.to_json
+          end
+          case @view_type
+          when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
+            return parse_format(hash["dynamodb"]["keys"])
+          when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
+            return parse_format(hash["dynamodb"]["oldImage"])
+          when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
+            return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
+          end
+        end
 
-  private
-  def parse_format(hash)
-    if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
-      return hash.to_json
-    else
-      return dynamodb_to_json(hash)
-    end
-  end
+        private
+        def parse_format(hash)
+          if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
+            return hash.to_json
+          else
+            return dynamodb_to_json(hash)
+          end
+        end
 
-  private
-  def dynamodb_to_json(hash)
-    return formatAttributeValueMap(hash).to_json
-  end
+        private
+        def dynamodb_to_json(hash)
+          return formatAttributeValueMap(hash).to_json
+        end
 
-  private
-  def formatAttributeValueMap(hash)
-    keys_to_delete = []
-    hash.each do |k, v|
-      dynamodb_key = v.keys.first
-      dynamodb_value = v.values.first
-      if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
-        keys_to_delete.push(k) # remove binary values and binary sets
-        next
-      end
-      hash[k] = formatAttributeValue(v.keys.first, v.values.first)
-    end
-    keys_to_delete.each {|key| hash.delete(key)}
-    return hash
-  end
+        private
+        def formatAttributeValueMap(hash)
+          keys_to_delete = []
+          hash.each do |k, v|
+            dynamodb_key = v.keys.first
+            dynamodb_value = v.values.first
+            if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
+              keys_to_delete.push(k) # remove binary values and binary sets
+              next
+            end
+            hash[k] = formatAttributeValue(v.keys.first, v.values.first)
+          end
+          keys_to_delete.each {|key| hash.delete(key)}
+          return hash
+        end
 
-  private
-  def formatAttributeValue(key, value)
-    case key
-    when "M"
-      formatAttributeValueMap(value)
-    when "L"
-      value.map! do |v|
-        v = formatAttributeValue(v.keys.first, v.values.first)
-      end
-    when "NS","SS","BS"
-      value.map! do |v|
-        v = formatAttributeValue(key[0], v)
-      end
-      return value
-    when "N"
-      return BigDecimal.new(value)
-    when "NULL"
-      return nil
-    else
-      return value
-    end
-  end
+        private
+        def formatAttributeValue(key, value)
+          case key
+          when "M"
+            formatAttributeValueMap(value)
+          when "L"
+            value.map! do |v|
+              v = formatAttributeValue(v.keys.first, v.values.first)
+            end
+          when "NS","SS","BS"
+            value.map! do |v|
+              v = formatAttributeValue(key[0], v)
+            end
+          when "N"
+            BigDecimal.new(value)
+          when "NULL"
+            nil
+          else
+            value
+          end
+        end
+      end
+    end
+  end
 end
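Reviewer note: only the constant path moves here; `parse_scan` and `parse_stream` keep their signatures. Watch the two near-identical namespaces now in play: the helper classes live under `Logstash::Inputs::DynamoDB`, while the plugin class and its `VT_*`/`LF_*` constants remain under `LogStash::Inputs::DynamoDB` (capital S). A minimal call-site sketch, where the key schema and region values are illustrative assumptions rather than anything in this patch:

```ruby
# Sketch of a call site after the namespacing change. "Id" and "us-west-1"
# are hypothetical; the plugin derives them from its config and endpoint.
parser = Logstash::Inputs::DynamoDB::DynamoDBLogParser.new(
  LogStash::Inputs::DynamoDB::VT_KEYS_ONLY,  # view_type
  LogStash::Inputs::DynamoDB::LF_PLAIN,      # log_format
  ["Id"],                                    # key schema attribute names (assumed)
  "us-west-1"                                # region resolved from the endpoint
)
json_event = parser.parse_scan(scan_record, 48)  # scan_record: SDK item; 48: image size in bytes
```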
diff --git a/lib/logstash/inputs/LogStashRecordProcessor.rb b/lib/logstash/inputs/LogStashRecordProcessor.rb
index c21970e..3c0f7f0 100644
--- a/lib/logstash/inputs/LogStashRecordProcessor.rb
+++ b/lib/logstash/inputs/LogStashRecordProcessor.rb
@@ -15,51 +15,54 @@
 #limitations under the License.
 #
 require "java"
-begin
-  require 'jar-dependencies'
-  require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' )
-  require_jar( 'log4j', 'log4j', '1.2.17' )
-end
+
+require "logstash-input-dynamodb_jars"
 
 java_import "com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason"
 java_import "java.lang.IllegalStateException"
 java_import "org.apache.log4j.LogManager"
 
-class LogStashRecordProcessor
-  include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
+module Logstash
+  module Inputs
+    module DynamoDB
+      class LogStashRecordProcessor
+        include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
 
-  attr_accessor :queue, :shard_id
+        attr_accessor :queue, :shard_id
 
-  def initialize(queue)
-    # Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
-    # No good way to overload methods in JRuby, so deciding which was supposed to be called here.
-    if (queue.is_a? String)
-      @shard_id = queue
-      return
-    else
-      @queue ||= queue
-      @logger ||= LogStash::Inputs::DynamoDB.logger
-    end
-  end
+        def initialize(queue)
+          # Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
+          # No good way to overload methods in JRuby, so deciding which was supposed to be called here.
+          if (queue.is_a? String)
+            @shard_id = queue
+            return
+          else
+            @queue ||= queue
+            @logger ||= LogStash::Inputs::DynamoDB.logger
+          end
+        end
 
-  def process_records(records, checkpointer)
-    @logger.debug("Processing batch of " + records.size().to_s + " records")
-    records.each do |record|
-      @queue.push(record)
-    end
-    #checkpoint once all of the records have been consumed
-    checkpointer.checkpoint()
-  end
+        def process_records(records, checkpointer)
+          @logger.debug("Processing batch of " + records.size().to_s + " records")
+          records.each do |record|
+            @queue.push(record)
+          end
+          #checkpoint once all of the records have been consumed
+          checkpointer.checkpoint()
+        end
 
-  def shutdown(checkpointer, reason)
-    case reason
-    when ShutdownReason::TERMINATE
-      checkpointer.checkpoint()
-    when ShutdownReason::ZOMBIE
-    else
-      raise RuntimeError, "Invalid shutdown reason."
-    end
-    unless @shard_id.nil?
-      @logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
+        def shutdown(checkpointer, reason)
+          case reason
+          when ShutdownReason::TERMINATE
+            checkpointer.checkpoint()
+          when ShutdownReason::ZOMBIE
+          else
+            raise RuntimeError, "Invalid shutdown reason."
+          end
+          unless @shard_id.nil?
+            @logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
+          end
+        end
+      end
     end
   end
 end
diff --git a/lib/logstash/inputs/LogStashRecordProcessorFactory.rb b/lib/logstash/inputs/LogStashRecordProcessorFactory.rb
index 0733024..e44d18a 100644
--- a/lib/logstash/inputs/LogStashRecordProcessorFactory.rb
+++ b/lib/logstash/inputs/LogStashRecordProcessorFactory.rb
@@ -16,24 +16,28 @@
 #
 require 'java'
 require_relative "LogStashRecordProcessor"
-begin
-  require 'jar-dependencies'
-  require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' )
-end
+
+require "logstash-input-dynamodb_jars"
 
 module KCL
   include_package "com.amazonaws.services.kinesis.clientlibrary.interfaces"
 end
 
-class LogStashRecordProcessorFactory
-  include KCL::IRecordProcessorFactory
+module Logstash
+  module Inputs
+    module DynamoDB
+      class LogStashRecordProcessorFactory
+        include KCL::IRecordProcessorFactory
 
-  def initialize(queue)
-    @queue ||= queue
-  end
+        def initialize(queue)
+          @queue ||= queue
+        end
 
-  def create_processor
-    return LogStashRecordProcessor.new(@queue)
-  end
+        def create_processor
+          return Logstash::Inputs::DynamoDB::LogStashRecordProcessor.new(@queue)
+        end
+      end
+    end
+  end
 end
diff --git a/lib/logstash/inputs/dynamodb.rb b/lib/logstash/inputs/dynamodb.rb
index 65907d9..97efa15 100644
--- a/lib/logstash/inputs/dynamodb.rb
+++ b/lib/logstash/inputs/dynamodb.rb
@@ -22,26 +22,9 @@
 require_relative "LogStashRecordProcessorFactory"
 require_relative "DynamoDBLogParser"
 
-#Java Dependencies: KCL, Streams Adapter, Log4j, Preview AWS SDK
-require 'java'
-begin
-  require 'jar-dependencies'
-  require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' )
-  require_jar( 'log4j', 'log4j', '1.2.17' )
-  require_jar( 'com.google.guava', 'guava', '15.0' )
-  require_jar( 'com.amazonaws', 'aws-java-sdk-core', '1.10.10' )
-  require_jar( 'com.amazonaws', 'aws-java-sdk-cloudwatch', '1.10.8' )
-  require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
-  require_jar( 'com.amazonaws', 'aws-java-sdk-kinesis', '1.10.8' )
-  require_jar( 'com.amazonaws', 'dynamodb-streams-kinesis-adapter', '1.0.0' )
-  require_jar( 'com.amazonaws', 'dynamodb-import-export-tool', '1.0.0' )
-  require_jar( 'commons-logging', 'commons-logging', '1.1.3')
-  require_jar( 'org.apache.httpcomponents', 'httpclient', '4.3.6')
-  require_jar( 'org.apache.httpcomponents', 'httpcore', '4.3.3')
-  require_jar( 'com.fasterxml.jackson.core', 'jackson-databind', '2.5.3' )
-  require_jar( 'joda-time', 'joda-time', '2.8.1')
-end
+require "logstash-input-dynamodb_jars"
+require 'java'
+
 java_import "com.amazonaws.AmazonClientException"
 java_import "org.apache.log4j.LogManager"
 java_import "org.apache.log4j.Level"
@@ -145,7 +128,7 @@ class LogStash::Inputs::DynamoDB < LogStash::Inputs::Base
 
   public
   def build_credentials
-    if not @aws_access_key_id.to_s.empty? and not @aws_secret_access_key.to_s.empty?
+    if !@aws_access_key_id.to_s.empty? and !@aws_secret_access_key.to_s.empty?
       @logger.info("Using static credentials: " + @aws_access_key_id + ", " + @aws_secret_access_key)
       basic = AmazonCredentials::BasicAWSCredentials.new(@aws_access_key_id, @aws_secret_access_key)
       return AmazonCredentials::StaticCredentialsProvider.new(basic)
@@ -168,7 +151,7 @@ def register
     if @perform_scan and @view_type == VT_OLD_IMAGE
       raise(LogStash::ConfigurationError, "Cannot perform scan with view type: " + @view_type + " configuration")
     end
-    if @view_type == VT_ALL_IMAGES and (not @log_format == LF_PLAIN)
+    if @view_type == VT_ALL_IMAGES and !(@log_format == LF_PLAIN)
       raise(LogStash::ConfigurationError, "Cannot show view_type: " + @view_type + ", with log_format: " + @log_format)
     end
 
@@ -190,7 +173,7 @@ def register
     end
 
     region = RegionUtils.getRegionByEndpoint(@endpoint)
-    @parser ||= DynamoDBLogParser.new(@view_type, @log_format, @key_schema, region)
+    @parser ||= Logstash::Inputs::DynamoDB::DynamoDBLogParser.new(@view_type, @log_format, @key_schema, region)
 
     if @perform_stream
       setup_stream
@@ -232,7 +215,7 @@ def setup_stream
     dynamodb_streams_client = AmazonDynamoDB::AmazonDynamoDBStreamsClient.new(@credentials, @client_configuration)
     adapter = Java::ComAmazonawsServicesDynamodbv2Streamsadapter::AmazonDynamoDBStreamsAdapterClient.new(@credentials)
 
-    if not @streams_endpoint.nil?
+    if !@streams_endpoint.nil?
       adapter.setEndpoint(@streams_endpoint)
       dynamodb_streams_client.setEndpoint(@streams_endpoint)
       @logger.info("DynamoDB Streams endpoint: " + @streams_endpoint)
@@ -263,7 +246,7 @@ def setup_stream
       stream_status = stream_description.getStreamStatus()
     end # while not active
 
-    if not stream_status == "ENABLED"
+    if !(stream_status == "ENABLED")
       raise(LogStash::PluginLoadingError, "No streams are enabled")
     end # if not active
     @logger.info("Stream Id: " + stream_arn)
@@ -283,7 +266,7 @@ def setup_stream
       kclMetricsLogger.setAdditivity(false)
       kclMetricsLogger.setLevel(Level::OFF)
     end # if @publish_metrics
-    @worker = KCL::Worker.new(LogStashRecordProcessorFactory.new(@queue), kcl_config, adapter, @dynamodb_client, cloudwatch_client)
+    @worker = KCL::Worker.new(Logstash::Inputs::DynamoDB::LogStashRecordProcessorFactory.new(@queue), kcl_config, adapter, @dynamodb_client, cloudwatch_client)
   end # def setup_stream
 
   private
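Each of the four files above now loads one shared bootstrap instead of maintaining its own `require_jar` list, so every class resolves the same jar versions. The sketch below is an assumption about the shape of the `lib/logstash-input-dynamodb_jars.rb` file that jar-dependencies generates at gem build time (coordinates copied from the lists deleted above); the real file is machine-written and may list more jars:

```ruby
# lib/logstash-input-dynamodb_jars.rb -- assumed shape of the bootstrap that
# jar-dependencies generates at build time; the real file is machine-written.
begin
  require 'jar_dependencies'
rescue LoadError
  # without jar-dependencies, a JRuby runtime could still require the
  # vendored jar files directly from the gem's lib/ tree
end

if defined?(Jars)
  require_jar('com.amazonaws', 'amazon-kinesis-client', '1.6.0')
  require_jar('com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10')
  require_jar('com.amazonaws', 'dynamodb-streams-kinesis-adapter', '1.0.0')
  require_jar('com.amazonaws', 'dynamodb-import-export-tool', '1.0.0')
  require_jar('log4j', 'log4j', '1.2.17')
end
```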
diff --git a/logstash-input-dynamodb.gemspec b/logstash-input-dynamodb.gemspec
index b901f46..db48e70 100644
--- a/logstash-input-dynamodb.gemspec
+++ b/logstash-input-dynamodb.gemspec
@@ -8,6 +8,7 @@ Gem::Specification.new do |s|
   s.email = 'dynamodb-interest@amazon.com'
   s.homepage = "https://github.com/logstash-plugins/logstash-input-dynamodb"
   s.require_paths = ["lib"]
+  s.platform = 'java'
 
   # Files
   s.files = `git ls-files`.split($\)
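Declaring `s.platform = 'java'` marks the built gem as JRuby-only, which is what lets the vendored jars and the generated `_jars.rb` bootstrap ship inside it. For jar-dependencies to vendor those jars at build time, the gemspec would typically also carry requirement markers of the following form; these lines are an assumption for illustration, since the hunk above only adds the platform flag:

```ruby
# Assumed companion declarations: jar-dependencies reads `s.requirements`
# markers of this form and vendors the jars when the java-platform gem is built.
s.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.6.0'"
s.requirements << "jar 'com.amazonaws:dynamodb-streams-kinesis-adapter', '1.0.0'"
s.add_runtime_dependency 'jar-dependencies'
```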
diff --git a/spec/inputs/dynamodb_spec.rb b/spec/inputs/dynamodb_spec.rb
index 82020ca..a1e6811 100644
--- a/spec/inputs/dynamodb_spec.rb
+++ b/spec/inputs/dynamodb_spec.rb
@@ -14,15 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 #
-require "logstash/devutils/rspec/spec_helper"
-require "logstash/inputs/dynamodb"
-require "rspec/expectations"
-require "rspec/mocks"
-require "mocha"
-
-RSpec.configure do |config|
-  config.mock_with :mocha
-end
+require "spec/spec_helper"
 
 class LogStash::Inputs::TestDynamoDB < LogStash::Inputs::DynamoDB
   default :codec, 'json'
@@ -36,7 +28,7 @@ def queue_event(event, logstash_queue, host)
     super(event, logstash_queue, host)
     # Add additional item to plugin's queue to ensure run() flushes queue before shutting down.
     # Queue the event and then shutdown, otherwise the threads would run forever
-    if shutdown_count === 0
+    if shutdown_count == 0
       @shutdown_count += 1
       @queue << "additional event stuck in queue during shutdown"
       raise LogStash::ShutdownSignal
@@ -61,10 +53,6 @@ def parse_stream(msg)
 end
 
 describe 'inputs/dynamodb' do
-  let (:empty_config) {{}}
-  let (:tablename) {{'table_name' => 'test tablename', 'view_type' => "new_and_old_images", "endpoint" => "some endpoint"}}
-  let (:invalid_aws_credentials_config) {{'table_name' => 'test tablename', "endpoint" => "some endpoint", 'aws_access_key_id' => 'invalid', 'aws_secret_access_key' => 'invalid_also', 'view_type' => "new_and_old_images", "streams_endpoint" => "some streams endpoint"}}
-  let (:invalid_aws_credentials_config_no_endpoints) {{'table_name' => 'test tablename', 'aws_access_key_id' => 'invalid', 'aws_secret_access_key' => 'invalid_also', 'view_type' => "new_and_old_images"}}
   let (:dynamodb_client) {mock("AmazonDynamoDB::AmazonDynamoDBClient")}
   let (:dynamodb_streams_client) {mock("AmazonDynamoDB::AmazonDynamoDBStreamsClient")}
   let (:adapter) {mock("AmazonDynamoDB::AmazonDynamoDBStreamsAdapterClient")}
@@ -75,7 +63,7 @@ def allow_invalid_credentials(stream_status = "ENABLED", error_to_raise = nil)
     AmazonDynamoDB::AmazonDynamoDBClient.expects(:new).returns(dynamodb_client)
     AmazonDynamoDB::AmazonDynamoDBStreamsClient.expects(:new).returns(dynamodb_streams_client)
     AmazonDynamoDB::AmazonDynamoDBStreamsAdapterClient.expects(:new).returns(adapter)
-    DynamoDBLogParser.expects(:new).returns(TestParser.new())
+    Logstash::Inputs::DynamoDB::DynamoDBLogParser.expects(:new).returns(TestParser.new())
     RegionUtils.expects(:getRegionByEndpoint).with("some endpoint").returns("some region")
 
     mock_table_description = stub
@@ -110,18 +98,21 @@ def allow_invalid_credentials(stream_status = "ENABLED", error_to_raise = nil)
   end
 
   it "should need endpoint" do
-    tablename.delete("endpoint")
-    expect {LogStash::Plugin.lookup("input", "dynamodb").new(tablename)}.to raise_error(LogStash::ConfigurationError)
+    config = tablename
+    config.delete("endpoint")
+    expect {LogStash::Plugin.lookup("input", "dynamodb").new(config)}.to raise_error(LogStash::ConfigurationError)
   end
 
   it "should need table_name config" do
-    tablename.delete("table_name")
-    expect {LogStash::Plugin.lookup("input", "dynamodb").new(tablename)}.to raise_error(LogStash::ConfigurationError)
+    config = tablename
+    config.delete("table_name")
+    expect {LogStash::Plugin.lookup("input", "dynamodb").new(config)}.to raise_error(LogStash::ConfigurationError)
   end
 
   it "should need view_type config" do
-    tablename.delete("view_type")
-    expect {LogStash::Plugin.lookup("input", "dynamodb").new(tablename)}.to raise_error(LogStash::ConfigurationError)
+    config = tablename
+    config.delete("view_type")
+    expect {LogStash::Plugin.lookup("input", "dynamodb").new(config)}.to raise_error(LogStash::ConfigurationError)
   end
 
   it "should use default AWS credentials " do
diff --git a/spec/log_parser_spec.rb b/spec/log_parser_spec.rb
index f2a3e69..4b9e7bd 100644
--- a/spec/log_parser_spec.rb
+++ b/spec/log_parser_spec.rb
@@ -14,19 +14,9 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 #
-require "logstash/devutils/rspec/spec_helper"
-require "logstash/inputs/DynamoDBLogParser"
-require "logstash/inputs/dynamodb"
-require "rspec/expectations"
-require "rspec/mocks"
-require "mocha"
-require "java"
+require "spec/spec_helper"
 
-RSpec.configure do |config|
-  config.mock_with :mocha
-end
-
-class DynamoDBLogParserTest < DynamoDBLogParser
+class Logstash::Inputs::DynamoDB::DynamoDBLogParserTest < Logstash::Inputs::DynamoDB::DynamoDBLogParser
 
   private
   def calculate_key_size_in_bytes(record)
@@ -37,12 +27,6 @@ def calculate_key_size_in_bytes(record)
 
 describe "inputs/LogParser" do
   let (:object_mapper) {mock("ObjectMapper")}
-  let (:key_schema) {["TBCZDPHPXUTOTYGP", "some bin key"]}
-  let (:sample_scan_result) {{"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, "some bin key" => {"B" => "actualbinval"}}}
-  let (:sample_stream_result) {{"internalObject" => {"eventID" => "0","eventName" => "INSERT","eventVersion" => "1.0", \
-    "eventSource" => "aws:dynamodb","awsRegion" => "us-west-1","dynamodb" => {"keys" => {"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, \
-    "some bin key" => {"B" => "actualbinval"}}, "newImage" => {"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, \
-    "some bin key" => {"B" => "actualbinval"}},"sequenceNumber" => "0","sizeBytes" => 48,"streamViewType" => LogStash::Inputs::DynamoDB::VT_ALL_IMAGES.upcase}}}}
 
   before(:each) do
     Java::comFasterxmlJacksonDatabind::ObjectMapper.expects(:new).returns(object_mapper)
@@ -63,7 +47,7 @@ def expect_parse_scan()
   it "should parse a scan and parse a stream the same way" do
     expect_parse_stream
     expect_parse_scan
-    parser = DynamoDBLogParserTest.new(LogStash::Inputs::DynamoDB::VT_ALL_IMAGES, LogStash::Inputs::DynamoDB::LF_PLAIN, key_schema, "us-west-1")
+    parser = Logstash::Inputs::DynamoDB::DynamoDBLogParserTest.new(LogStash::Inputs::DynamoDB::VT_ALL_IMAGES, LogStash::Inputs::DynamoDB::LF_PLAIN, key_schema, "us-west-1")
     scan_after_parse = parser.parse_scan(sample_scan_result, 38)
     stream_after_parse = parser.parse_stream(sample_stream_result)
     expect(scan_after_parse).to eq(stream_after_parse)
@@ -71,7 +55,7 @@ def expect_parse_scan()
 
   it "should drop binary values when parsing into a json with the correct configuration" do
     expect_parse_scan
-    parser = DynamoDBLogParserTest.new(LogStash::Inputs::DynamoDB::VT_NEW_IMAGE, LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN, key_schema, "us-west-1")
+    parser = Logstash::Inputs::DynamoDB::DynamoDBLogParserTest.new(LogStash::Inputs::DynamoDB::VT_NEW_IMAGE, LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN, key_schema, "us-west-1")
     result = parser.parse_scan(sample_scan_result, 38)
     expect(result).to eq({"TBCZDPHPXUTOTYGP" => "sampleString"}.to_json)
   end
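The spec keeps its test seam of subclassing the parser so that the private byte-size helper never touches real Java AttributeValue objects; only the constant paths grow. Condensed, the pattern is the following, where the returned `33` is a hypothetical stand-in since the real override body is elided in the hunk above:

```ruby
# Test seam: subclass the parser and stub the private size helper so specs
# can feed plain Ruby hashes instead of AWS SDK objects. 33 is arbitrary.
class Logstash::Inputs::DynamoDB::DynamoDBLogParserTest < Logstash::Inputs::DynamoDB::DynamoDBLogParser
  private
  def calculate_key_size_in_bytes(record)
    33
  end
end
```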
diff --git a/spec/record_processor_and_factory_spec.rb b/spec/record_processor_and_factory_spec.rb
index 696509e..7ab3e5d 100644
--- a/spec/record_processor_and_factory_spec.rb
+++ b/spec/record_processor_and_factory_spec.rb
@@ -14,24 +14,16 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 #
-require "logstash/devutils/rspec/spec_helper"
-require "rspec/expectations"
-require "rspec/mocks"
-require "mocha"
-
-RSpec.configure do |config|
-  config.mock_with :mocha
-end
-
+require "spec/spec_helper"
 
 describe 'inputs/LogStashRecordProcessor' do
   before(:each) do
     @queue = SizedQueue.new(20)
-    @processor = LogStashRecordProcessor.new(@queue)
+    @processor = Logstash::Inputs::DynamoDB::LogStashRecordProcessor.new(@queue)
   end
 
   it "should call setShardId when being called with a String" do
-    processor_with_shard = LogStashRecordProcessor.new("test shardId")
+    processor_with_shard = Logstash::Inputs::DynamoDB::LogStashRecordProcessor.new("test shardId")
     expect(processor_with_shard.shard_id).to eq("test shardId")
   end
 
@@ -70,9 +62,9 @@
 
   it "should create a new factory correctly and create a new LogStashRecordProcessor when called upon" do
     queue = SizedQueue.new(20)
-    factory = LogStashRecordProcessorFactory.new(queue)
+    factory = Logstash::Inputs::DynamoDB::LogStashRecordProcessorFactory.new(queue)
     processor = factory.create_processor
-    expect(processor).to be_an_instance_of(LogStashRecordProcessor)
+    expect(processor).to be_an_instance_of(Logstash::Inputs::DynamoDB::LogStashRecordProcessor)
   end
 end
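For orientation, this is the behavior those specs pin down, written against the new constant path. A mocha-style sketch, assuming it runs inside the specs' RSpec/mocha setup from spec_helper:

```ruby
# process_records should push each record onto the plugin queue and
# checkpoint exactly once afterwards; shutdown(TERMINATE) checkpoints too.
processor = Logstash::Inputs::DynamoDB::LogStashRecordProcessor.new(SizedQueue.new(20))
checkpointer = mock("IRecordProcessorCheckpointer")  # mocha mock, as in the specs
checkpointer.expects(:checkpoint)
processor.process_records(["some record"], checkpointer)
expect(processor.queue.pop).to eq("some record")
```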
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 6839d5f..633ea95 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -94,3 +94,41 @@
     Kernel.srand config.seed
 =end
 end
+
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/inputs/DynamoDBLogParser"
+require "logstash/inputs/dynamodb"
+require "rspec/expectations"
+require "rspec/mocks"
+require "mocha"
+require "java"
+
+RSpec.configure do |config|
+  config.mock_with :mocha
+end
+
+def empty_config
+  {}
+end
+
+def tablename
+  {'table_name' => 'test tablename', 'view_type' => "new_and_old_images", "endpoint" => "some endpoint"}
+end
+def invalid_aws_credentials_config
+  {'table_name' => 'test tablename', "endpoint" => "some endpoint", 'aws_access_key_id' => 'invalid', 'aws_secret_access_key' => 'invalid_also', 'view_type' => "new_and_old_images", "streams_endpoint" => "some streams endpoint"}
+end
+def invalid_aws_credentials_config_no_endpoints
+  {'table_name' => 'test tablename', 'aws_access_key_id' => 'invalid', 'aws_secret_access_key' => 'invalid_also', 'view_type' => "new_and_old_images"}
+end
+def key_schema
+  ["TBCZDPHPXUTOTYGP", "some bin key"]
+end
+def sample_scan_result
+  {"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, "some bin key" => {"B" => "actualbinval"}}
+end
+def sample_stream_result
+  {"internalObject" => {"eventID" => "0","eventName" => "INSERT","eventVersion" => "1.0", \
+    "eventSource" => "aws:dynamodb","awsRegion" => "us-west-1","dynamodb" => {"keys" => {"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, \
+    "some bin key" => {"B" => "actualbinval"}}, "newImage" => {"TBCZDPHPXUTOTYGP" => {"S" => "sampleString"}, \
+    "some bin key" => {"B" => "actualbinval"}},"sequenceNumber" => "0","sizeBytes" => 48,"streamViewType" => LogStash::Inputs::DynamoDB::VT_ALL_IMAGES.upcase}}}
+end
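Moving the fixtures from per-file `let` bindings into plain methods in spec/spec_helper.rb shares them across all three spec files through a single `require "spec/spec_helper"`. Because each helper builds a fresh hash on every call, an example can freely mutate its local copy (as the `config.delete` specs above do) without leaking state into later calls. A hypothetical spec showing the consequence:

```ruby
# Hypothetical spec: fixtures now come from spec/spec_helper.rb methods,
# so every call returns a fresh hash that is safe to mutate locally.
require "spec/spec_helper"

describe "spec_helper fixtures" do
  it "hands out a fresh config hash on every call" do
    config = tablename
    config.delete("endpoint")                                    # mutate the local copy
    expect(tablename).to include("endpoint" => "some endpoint")  # next call is unaffected
  end
end
```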