
Parse DynamoDB Stream events? #1212

Closed
phstc opened this issue Jun 10, 2016 · 25 comments · Fixed by #2460
Labels
feature-request A feature should be added or improved.

Comments

@phstc

phstc commented Jun 10, 2016

Hey

Is there a way to parse DynamoDB Stream events using the SDK? I receive JSON from a DynamoDB stream in a worker process, and I would like to parse the NewImage into a "normal" hash, without the type descriptors (S, M, etc.) as root keys.

The JSON I receive:

{
  "eventID":"",
  "eventName":"MODIFY",
  "eventVersion":"1.1",
  "eventSource":"aws:dynamodb",
  "awsRegion":"us-east-1",
  "dynamodb":{
    "ApproximateCreationDateTime":123,
    "Keys":{
      "object_id":{
        "S":"123"
      }
    },
    "NewImage":{
      "name":{
        "S":"NAME"
      },
      "hash_value":{
        "M":{
          "sub_hash_value":{
            "M":{
              "second_name":{
                "S":"name"
              },

And I would like to parse the NewImage to:

{ 
  "object_id": "123", 
  "name": "name", 
  "hash_value": { 
    "sub_hash_value":  { "second_name": "name" } 
  }
}

Is there any helper in the SDK that parses it?

@phstc
Author

phstc commented Jun 11, 2016

Is there any helper in the SDK that parses it?

Something similar to https://github.com/awslabs/logstash-input-dynamodb/blob/master/lib/logstash/inputs/DynamoDBLogParser.rb#L121-L161

@trevorrowe
Member

There is not a public interface for converting DynamoDB stream events to vanilla Ruby hashes and types. I think this would be a helpful interface though. We can track this as a feature request.

@trevorrowe trevorrowe added feature-request A feature should be added or improved. Version 2 labels Jun 15, 2016
@trevorrowe
Member

I've moved this to our public backlog.

@phstc
Author

phstc commented Jun 16, 2016

Thanks @trevorrowe

awood45 added a commit that referenced this issue Jun 21, 2016
@cjyclaire
Contributor

@phstc could you provide more information about the usage scenario? I'm going to work on this feature request, so more detail would help.

For example, how did you receive this JSON output? Are you integrating a DynamoDB stream with Lambda, or does it come from the GetRecords API of DynamoDB Streams?

Cheers!

@phstc
Author

phstc commented Jun 27, 2016

Hey @cjyclaire

I'm going to work on this feature request

🍻

I have a stream connected to a Lambda that enqueues DynamoDB records to an SQS queue. Then I process these messages using a Ruby worker, which needs this event parser.

But in the Logstash snippet I used as an example above, they are receiving the JSON from the DynamoDB stream directly.

@cjyclaire
Contributor

@phstc , good to know, thanks!

@trevorrowe
Member

Currently Aws::DynamoDBStreams::Client#get_records returns the DynamoDB attribute values as Ruby struct objects that have the union of possible types, e.g. #<struct s=nil ss=nil n=123 ns=nil ...>. The Aws::DynamoDB::Client has a plugin that automatically converts these attribute value structs to vanilla Ruby values (e.g. 123 from the previous example). It is possible to update this method to behave like the Aws::DynamoDB::Client operations, but it would have to be an opt-in feature to preserve backwards compatibility.
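
As a rough plain-Ruby illustration of that union struct and the conversion (the struct and method names here are stand-ins of my own, not the SDK's actual classes):

```ruby
# Stand-in for the SDK's attribute-value union struct; the real class
# is generated SDK code, this is only illustrative.
AttributeValue = Struct.new(:s, :n, :bool, :m, :l, keyword_init: true)

# Collapse the union down to whichever member is populated, roughly what
# the SimpleAttributes conversion does (the real plugin uses BigDecimal
# for numbers; Integer() keeps the sketch short).
def simplify(attr)
  return attr.s unless attr.s.nil?
  return Integer(attr.n) unless attr.n.nil?
  attr.bool
end

simplify(AttributeValue.new(n: '123'))  # => 123
simplify(AttributeValue.new(s: 'abc'))  # => "abc"
```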

Adding a utility to parse the JSON document passed to the Lambda function would be pretty straightforward. We have two options for this interface:

  • Return a vanilla Ruby hash as a result of a JSON parse, converting only the attribute values.
  • Return a struct object in the same format as #get_records with the attribute values converted to simple Ruby objects.

Thoughts?

@cjyclaire
Contributor

Agreed on preserving backwards compatibility.

I'd prefer the 2nd option, which treats the "structs-to-vanilla-hash" conversion as an opt-in feature. That way, consistency in the response is maintained and the vanilla-hash enhancement is still available.

@phstc
Author

phstc commented Jun 28, 2016

Return a vanilla Ruby hash as a result of a JSON parse, converting only the attribute values.

I kind of prefer the 1st option, so I can freely use the hash in the same way I use hashes returned from the DynamoDB queries, scans etc.

@cjyclaire
Contributor

That's true; in terms of usage, the 1st option does make more sense.

@trevorrowe
Member

I kind of prefer the 1st option, so I can freely use the hash in the same way I use hashes returned from the DynamoDB queries, scans etc.

@phstc Can you clarify what you mean? Aws::DynamoDB::Client today does not return a hash object in response to #get_item, #query or #scan. These responses are primarily composed of nested struct objects. Only when you reach the attribute values of the response does it convert down to a hash.

I think maybe the options I proposed above were poorly phrased. Given the following, completely made-up, JSON document as sent to a lambda function:

{ 
   "awsRegion": "us-east-1",
   "dynamodb": { 
      "ApproximateCreationDateTime": 123456789,
      "Keys": { 
         "id" : { 
            "S": "abc"
         }
      },
      "NewImage": { 
         "id" : { 
            "S": "abc"
         },
         "name" : { 
            "S": "new-name"
         },
         "size" : { 
            "N": 456
         },
         "enabled" : { 
            "BOOL": true
         }
      },
      "OldImage": { 
         "id" : { 
            "S": "abc"
         },
         "name" : { 
            "S": "old-name"
         },
         "size" : { 
            "N": 123
         },
         "enabled" : { 
            "BOOL": false
         }
      },
      "SequenceNumber": "seq-num",
      "SizeBytes": 1245,
      "StreamViewType": "view-type"
   },
   "eventID": "event-id",
   "eventName": "event-name",
   "eventSource": "event-source",
   "eventVersion": "event-version"
}

Option 1: Minimal conversion of attribute values

Parse the JSON document and convert only the attribute values to their Ruby equivalents. Other values, like ApproximateCreationDateTime, would not be converted and would remain numeric instead of becoming a Time object.

{ 
  "awsRegion" => "us-east-1",
  "dynamodb" => { 
     "ApproximateCreationDateTime" => 123456789,
     "Keys" => { 
       "id" => "abc"
     },
     "NewImage" => { 
       "id" => "abc",
       "name" => "new-name",
       "size" => 456,
       "enabled" => true
     },
     "OldImage" => { 
       "id" => "abc",
       "name" => "old-name",
       "size" => 123,
       "enabled" => false
     },
     "SequenceNumber" => "seq-num",
     "SizeBytes" => 1245,
     "StreamViewType" => "view-type"
  },
  "eventID" => "event-id",
  "eventName" => "event-name",
  "eventSource" => "event-source",
  "eventVersion" => "event-version"
}

Option 2: Convert to SDK struct types + attribute values

Convert the JSON into the types defined by the SDK. Attribute values would be represented with vanilla Ruby hashes/values, not their struct type. Note that non-attribute values would be converted (like the approximate_creation_date_time below). Also, struct members use snake_case instead of the JSON document's casing.

#<struct Aws::DynamoDBStreams::Types::Record
 event_id="event-id",
 event_name="event-name",
 event_version="event-version",
 event_source="event-source",
 aws_region="us-east-1",
 dynamodb=
  #<struct Aws::DynamoDBStreams::Types::StreamRecord
   approximate_creation_date_time=1973-11-29 13:33:09 -0800,
   keys={"id"=>"abc"},
   new_image={"id"=>"abc","name"=>"new-name", "size"=>456, "enabled"=>true},
   old_image={"id"=>"abc","name"=>"old-name", "size"=>123, "enabled"=>false},
   sequence_number="seq-num",
   size_bytes=1245,
   stream_view_type="view-type">>

Thoughts?

@phstc
Author

phstc commented Jun 28, 2016

@trevorrowe sure, I was thinking about response.items, which returns an array of hashes. But I understand your point.

For my specific use case, all I honestly need is something that converts NewImage into a vanilla Ruby hash. But I have the entire event payload as well, so something that converts either the whole event or just NewImage is good enough for me. Regarding the 1st or 2nd option, both work for me. The second option feels more like the aws-sdk-ruby way.

@RedaBenh

You can use this quick-and-dirty method to convert (it handles only the basic types):


def convert_dynamodb_stream_to_hash(db_record)
  records = {}
  db_record.each do |k, v|
    val = nil
    v.each do |key, value|
      val = case key.to_s
            when 'S' then value.to_s
            when 'N' then value.to_i
            when 'L' then value.map { |i| i.values }.flatten
            when 'M' then convert_dynamodb_stream_to_hash(value)
            else value
            end
    end
    records[k] = val
  end
  records
end

@alexperto

In case someone else needs it: based on @RedaBenh's example, I created a class for parsing the whole DynamoDB event:

https://gist.github.com/alexperto/eb88db235d66bda85979fe08d047b18f

@mullermp
Contributor

Reopening - deprecating usage of Feature Requests backlog markdown file.

@mullermp mullermp reopened this Oct 21, 2019
@mullermp mullermp removed the v2 label Oct 21, 2019
@jmonsanto

Is this still in the pipeline? I'm really having problems converting DynamoDB JSON to plain Ruby hashes in the Lambda. Also, the examples given cannot handle deeply nested structures.

@mullermp
Contributor

mullermp commented Oct 2, 2020

This isn't actively being worked on. I'm happy to take any pull requests. I imagine that the implementation would be very similar to the DynamoDB simple attributes plugin (https://github.com/aws/aws-sdk-ruby/blob/master/gems/aws-sdk-dynamodb/lib/aws-sdk-dynamodb/plugins/simple_attributes.rb). If I'm understanding the problem correctly, the output of DynamoDB Streams is similar to that of DynamoDB, and you'd want to use the unmarshaller to convert to simple types.
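
To make the idea concrete, here is a hedged, standalone sketch of what a SimpleAttributes-style unmarshaller does to a stream image, written as plain recursive functions rather than reusing the SDK plugin internals. The function names are my own, and only the common type descriptors are handled:

```ruby
require 'set'
require 'bigdecimal'

# Convert one DynamoDB attribute value ({ 'S' => ... }, { 'M' => ... }, etc.)
# into a plain Ruby value, recursing through maps and lists.
def unmarshal_attribute(attr)
  type, value = attr.first
  case type
  when 'S', 'BOOL' then value
  when 'NULL'      then nil
  when 'N'         then BigDecimal(value) # the SDK plugin also uses BigDecimal
  when 'M'         then value.transform_values { |v| unmarshal_attribute(v) }
  when 'L'         then value.map { |v| unmarshal_attribute(v) }
  when 'SS', 'BS'  then Set.new(value)
  when 'NS'        then Set.new(value.map { |n| BigDecimal(n) })
  else value # e.g. 'B' binary payloads are left as-is in this sketch
  end
end

# Apply the conversion to a whole NewImage / OldImage map.
def unmarshal_image(image)
  image.transform_values { |v| unmarshal_attribute(v) }
end
```

For example, `unmarshal_image({ 'name' => { 'S' => 'new-name' }, 'size' => { 'N' => '456' } })` yields a plain hash mapping "name" to "new-name" and "size" to the BigDecimal 456.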

@watmin

watmin commented Dec 27, 2020

Just dug through the SDK code and came up with this; it needs some polish but got me unblocked.

def extract_item(image)
  shape_ref = Aws::DynamoDB::ClientApi::Shapes::ShapeRef.new(shape: Aws::DynamoDB::ClientApi::GetItemOutput)
  parser = Aws::Json::Parser.new(shape_ref)
  translator = Aws::DynamoDB::Plugins::SimpleAttributes::ValueTranslator.new(shape_ref, :unmarshal)
  translator.apply(parser.parse(JSON.generate('Item' => image))).item
end

Demo

Creating a complex demo item:

ddb.put_item(table_name: 'Demo', item: { pk: 'demo', sk: 'watwat3', some: 'data', nested: { array: [1,2,3,'four'], hash: { complex: true, set: Set.new([1,1,1,1]) } } })

Extracting the new image from each of the context's records:

[136] pry(main)> context['Records'].map { |record| extract_item(record['dynamodb']['NewImage']) }
=> [{"some"=>"data",
  "sk"=>"watwat3",
  "pk"=>"demo",
  "nested"=>
   {"array"=>[0.1e1, 0.2e1, 0.3e1, "four"],
    "hash"=>{"set"=>#<Set: {0.1e1}>, "complex"=>true}}}]
[137] pry(main)>

DDB Trigger context:

{
    "Records": [
        {
            "eventID": "bc69ab5e64ef472f0a4b330fa3bfe29f",
            "eventName": "MODIFY",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-west-2",
            "dynamodb": {
                "ApproximateCreationDateTime": 1609037147,
                "Keys": {
                    "sk": {
                        "S": "watwat3"
                    },
                    "pk": {
                        "S": "demo"
                    }
                },
                "NewImage": {
                    "some": {
                        "S": "data"
                    },
                    "sk": {
                        "S": "watwat3"
                    },
                    "pk": {
                        "S": "demo"
                    },
                    "nested": {
                        "M": {
                            "array": {
                                "L": [
                                    {
                                        "N": "1"
                                    },
                                    {
                                        "N": "2"
                                    },
                                    {
                                        "N": "3"
                                    },
                                    {
                                        "S": "four"
                                    }
                                ]
                            },
                            "hash": {
                                "M": {
                                    "set": {
                                        "NS": [
                                            "1"
                                        ]
                                    },
                                    "complex": {
                                        "BOOL": true
                                    }
                                }
                            }
                        }
                    }
                },
                "OldImage": {
                    "some": {
                        "S": "data"
                    },
                    "sk": {
                        "S": "watwat3"
                    },
                    "pk": {
                        "S": "demo"
                    }
                },
                "SequenceNumber": "78600000000028145232611",
                "SizeBytes": 116,
                "StreamViewType": "NEW_AND_OLD_IMAGES"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-west-2:<not you>:table/Demo/stream/2020-12-27T01:31:08.439"
        }
    ]
}

@saluzafa

saluzafa commented Jan 6, 2021

Hello,

I was facing the same problem, I needed to parse DynamoDB stream records, so I created these gems: aws-sdk-dynamodb-attribute-deserializer and aws-sdk-dynamodbstreams-event-parser.

Test file:

require 'aws-sdk-dynamodb-attribute-deserializer'
require 'aws-sdk-dynamodbstreams-event-parser'

test_item = {
  'string' => {
    'S' => 'test string'
  },
  'list' => {
    'L' => [{ 'S' => 's1' }, { 'S' => 's2' }, { 'N' => '5' }]
  },
  'stringset' => {
    'SS' => ['ss1']
  },
  'numberset' => {
    'NS' => ['123']
  },
  'map' => {
    'M' => {
      'country_code' => { 'S' => 'FR' },
      'sublist' => { 'L' => [{ 'N' => '123' }] },
      'ns' => { 'NS' => ['123'] },
      'submap' => {
        'M' => {
          'subkey' => { 'S' => 'substring' },
          'sublist' => { 'L' => [{ 'S' => 'test' }] },
          'submap2' => { 'M' => { 'submap2_key1' => { 'S' => 'test' } } },
          'sublist2' => { 'L' => [{ 'M' => { 'submap2_key1' => { 'S' => 'test' } } }] }
        }
      }
    }
  },
  'boolean_true' => {
    'BOOL' => true
  },
  'boolean_false' => {
    'BOOL' => false
  },
  'binary' => { 'B' => 'QftNXxB13kBXD2x5ZmYrDQ==' },
  'binaryset' => { 'BS' => ['QftNXxB13kBXD2x5ZmYrDQ==', '9ubi62eYsx0H/MK6uQQgDA=='] }
}

raw_event = {
  'eventID' => '64f3f4c0c43db06b5c00418a42a6fff2',
  'eventName' => 'INSERT',
  'eventVersion' => '1.1',
  'eventSource' => 'aws:dynamodb',
  'awsRegion' => 'eu-west-3',
  'dynamodb' => {
    'ApproximateCreationDateTime' => 1609926587,
    'Keys' => {
      'string' => {
        'S' => 'test string'
      },
      'number' => {
        'N' => '123456'
      }
    },
    'NewImage' => test_item,
    'SequenceNumber' => '82061200000000005864545644',
    'SizeBytes' => 405,
    'StreamViewType' => 'NEW_AND_OLD_IMAGES'
  },
  'eventSourceARN' => 'arn:aws:dynamodb:eu-west-3:xxxxxxxx:table/xxxxxxxxx/stream/1970-01-06T00:00:00.000'
}

raw_event_str = raw_event.to_json

puts 'Parse item from raw DynamoDB attributes'
pp Aws::DynamoDB::AttributeDeserializer.call(test_item)
puts '-----------------------------------------------'

puts 'Parse event from JSON object'
event_parser = Aws::DynamoDBStreams::EventParser.new
event = event_parser.from(raw_event)
pp event
puts '-----------------------------------------------'

puts 'Parse event from JSON-encoded string'
event = event_parser.parse(raw_event_str)
pp event

Test file results:

Parse item from raw DynamoDB attributes
{"string"=>"test string",
 "list"=>["s1", "s2", "5"],
 "stringset"=>#<Set: {"ss1"}>,
 "numberset"=>#<Set: {"123"}>,
 "map"=>
  {"country_code"=>"FR",
   "sublist"=>["123"],
   "ns"=>#<Set: {"123"}>,
   "submap"=>
    {"subkey"=>"substring",
     "sublist"=>["test"],
     "submap2"=>{"submap2_key1"=>"test"},
     "sublist2"=>[{"submap2_key1"=>"test"}]}},
 "boolean_true"=>true,
 "boolean_false"=>false,
 "binary"=>"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
 "binaryset"=>
  #<Set: {"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
   "\xF6\xE6\xE2\xEBg\x98\xB3\x1D\a\xFC\xC2\xBA\xB9\x04 \f"}>}
-----------------------------------------------
Parse event from JSON object
#<struct Aws::DynamoDBStreams::EventParser::EventStruct
 event_id="64f3f4c0c43db06b5c00418a42a6fff2",
 event_name="INSERT",
 event_version="1.1",
 event_source="aws:dynamodb",
 event_source_arn=
  "arn:aws:dynamodb:eu-west-3:xxxxxxxx:table/xxxxxxxxx/stream/1970-01-06T00:00:00.000",
 aws_region="eu-west-3",
 dynamodb=
  #<struct Aws::DynamoDBStreams::EventParser::StreamRecordStruct
   approximate_creation_date_time=1609926587,
   keys={"string"=>"test string", "number"=>"123456"},
   new_image=
    {"string"=>"test string",
     "list"=>["s1", "s2", "5"],
     "stringset"=>#<Set: {"ss1"}>,
     "numberset"=>#<Set: {"123"}>,
     "map"=>
      {"country_code"=>"FR",
       "sublist"=>["123"],
       "ns"=>#<Set: {"123"}>,
       "submap"=>
        {"subkey"=>"substring",
         "sublist"=>["test"],
         "submap2"=>{"submap2_key1"=>"test"},
         "sublist2"=>[{"submap2_key1"=>"test"}]}},
     "boolean_true"=>true,
     "boolean_false"=>false,
     "binary"=>"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
     "binaryset"=>
      #<Set: {"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
       "\xF6\xE6\xE2\xEBg\x98\xB3\x1D\a\xFC\xC2\xBA\xB9\x04 \f"}>},
   old_image=nil,
   sequence_number="82061200000000005864545644",
   size_bytes=405,
   stream_view_type="NEW_AND_OLD_IMAGES">,
 user_identity=nil>
-----------------------------------------------
Parse event from JSON-encoded string
#<struct Aws::DynamoDBStreams::EventParser::EventStruct
 event_id="64f3f4c0c43db06b5c00418a42a6fff2",
 event_name="INSERT",
 event_version="1.1",
 event_source="aws:dynamodb",
 event_source_arn=
  "arn:aws:dynamodb:eu-west-3:xxxxxxxx:table/xxxxxxxxx/stream/1970-01-06T00:00:00.000",
 aws_region="eu-west-3",
 dynamodb=
  #<struct Aws::DynamoDBStreams::EventParser::StreamRecordStruct
   approximate_creation_date_time=1609926587,
   keys={"string"=>"test string", "number"=>"123456"},
   new_image=
    {"string"=>"test string",
     "list"=>["s1", "s2", "5"],
     "stringset"=>#<Set: {"ss1"}>,
     "numberset"=>#<Set: {"123"}>,
     "map"=>
      {"country_code"=>"FR",
       "sublist"=>["123"],
       "ns"=>#<Set: {"123"}>,
       "submap"=>
        {"subkey"=>"substring",
         "sublist"=>["test"],
         "submap2"=>{"submap2_key1"=>"test"},
         "sublist2"=>[{"submap2_key1"=>"test"}]}},
     "boolean_true"=>true,
     "boolean_false"=>false,
     "binary"=>"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
     "binaryset"=>
      #<Set: {"A\xFBM_\x10u\xDE@W\x0Flyff+\r",
       "\xF6\xE6\xE2\xEBg\x98\xB3\x1D\a\xFC\xC2\xBA\xB9\x04 \f"}>},
   old_image=nil,
   sequence_number="82061200000000005864545644",
   size_bytes=405,
   stream_view_type="NEW_AND_OLD_IMAGES">,
 user_identity=nil>

Cheers!

@mullermp
Contributor

mullermp commented Jan 7, 2021

Thank you both for providing solutions. I've set aside some time to look at this and I'm currently investigating how this should fit in the SDK. This could either be a customization (utility class) for parsing, or it could be a plugin that directly modifies the response records. If it were a plugin, it would have to be turned off by default for backwards compatibility.

@saluzafa

saluzafa commented Jan 8, 2021

Hello @mullermp

The best solution would be a utility class for parsing, I think :)

Cheers!

@mullermp
Contributor

mullermp commented Jan 8, 2021

@saluzafa Yes, there will be a utility class, but I will also add a plugin. There are two different inputs here - those using lambda to get an event (ruby hash), and those using get_records in the SDK (nested structs with hashes). Both have different input types for the same data, but we need the same output.


@mullermp
Contributor

mullermp commented Jan 11, 2021

We've added a utility that you can use to parse Lambda events into simple attributes:

event = Aws::DynamoDBStreams::AttributeTranslator.from_event(event)
puts event.records.first.dynamodb.new_image

If you're using the SDK with get_records (outside of Lambda), you can use the simple_attributes: true option on your DynamoDBStreams client.

client = Aws::DynamoDBStreams::Client.new(simple_attributes: true)
resp = client.get_records(...)
puts resp.records.first.dynamodb.new_image


9 participants