
Elasticsearch Ddb To ES Lambda - Not updating removed or edited items. #4913

Closed

Rafcin opened this issue Jul 23, 2020 · 25 comments
Labels
graphql-transformer-v1 (Issue related to GraphQL Transformer v1), pending-triage (Issue is pending triage), @searchable (Issues related to the @searchable GraphQL directive)

Comments

Rafcin commented Jul 23, 2020

Describe the bug
I noticed that when I remove items manually from DynamoDB, the corresponding items in the ES index are not updated at all and are left there. Adding items, however, seems to work just fine.

To Reproduce
I'm not sure how to reproduce it. I was using the latest Amplify release from about a month ago, if that helps, but I'm unsure of the exact version I used.

If you would like logs or anything else, just let me know which logs and from where, and I will get them. As far as I can tell, I looked around and couldn't find any errors related to the deletion of an item.

Expected behavior
When items are removed from DynamoDB, the change should automatically propagate to the ES index and update or remove the corresponding document.
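
For reference, a minimal sketch of the bulk 'delete' action line that the generated streaming function (pasted further down in this thread) builds for a DynamoDB REMOVE record; the index name and document id here are placeholders, not values from this project.

import json

# Illustrative only: for a DynamoDB Streams REMOVE event, the streaming Lambda
# shown later in this thread emits a bulk 'delete' action line like this one.
doc_es_index_name = 'restaurant'   # placeholder index derived from the table name
doc_id = 'some-item-id'            # placeholder value of the item's primary key

action = {'delete': {'_index': doc_es_index_name, '_type': 'doc', '_id': doc_id}}
print(json.dumps(action))
# {"delete": {"_index": "restaurant", "_type": "doc", "_id": "some-item-id"}}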

@manueliglesias transferred this issue from aws-amplify/amplify-js Jul 23, 2020
@nikhname added the @searchable, pending-triage, and graphql-transformer-v1 labels Jul 23, 2020

Rafcin commented Aug 5, 2020

From the people I have spoken with so far, no one knows why this is the case.


Rafcin commented Aug 5, 2020

I've attached the generated Python script below.

import base64
import datetime
import json
import logging
import os
import time
import traceback
from urllib.parse import urlparse, quote

from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.endpoint import BotocoreHTTPSession
from botocore.session import Session
from boto3.dynamodb.types import TypeDeserializer


# The following parameters are required to configure the ES cluster
ES_ENDPOINT = os.environ['ES_ENDPOINT']
ES_REGION = os.environ['ES_REGION']
DEBUG = True if os.environ['DEBUG'] == "1" else False
ES_USE_EXTERNAL_VERSIONING = True if os.environ['ES_USE_EXTERNAL_VERSIONING'] == "true" else False

# ElasticSearch 6 deprecated having multiple mapping types in an index. Default to doc.
DOC_TYPE = 'doc'
ES_MAX_RETRIES = 3              # Max number of retries for exponential backoff

logger = logging.getLogger()
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
logger.info("Streaming to ElasticSearch")

# custom encoder changes
# - sets to lists
class DDBTypesEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)

# Subclass of boto's TypeDeserializer for DynamoDB to adjust for DynamoDB Stream format.
class StreamTypeDeserializer(TypeDeserializer):
    def _deserialize_n(self, value):
        return float(value)

    def _deserialize_b(self, value):
        return value  # Already in Base64


class ES_Exception(Exception):
    '''Capture status_code from request'''
    status_code = 0
    payload = ''

    def __init__(self, status_code, payload):
        self.status_code = status_code
        self.payload = payload
        Exception.__init__(
            self, 'ES_Exception: status_code={}, payload={}'.format(status_code, payload))


# Low-level POST data to Amazon Elasticsearch Service generating a Sigv4 signed request
def post_data_to_es(payload, region, creds, host, path, method='POST', proto='https://'):
    '''Post data to ES endpoint with SigV4 signed http headers'''
    req = AWSRequest(method=method, url=proto + host +
                    quote(path), data=payload, headers={'Host': host, 'Content-Type': 'application/json'})
    SigV4Auth(creds, 'es', region).add_auth(req)
    http_session = BotocoreHTTPSession()
    res = http_session.send(req.prepare())
    if res.status_code >= 200 and res.status_code <= 299:
        return res._content
    else:
        raise ES_Exception(res.status_code, res._content)


# High-level POST data to Amazon Elasticsearch Service with exponential backoff
# according to suggested algorithm: http://docs.aws.amazon.com/general/latest/gr/api-retries.html
def post_to_es(payload):
    '''Post data to ES cluster with exponential backoff'''

    # Get aws_region and credentials to post signed URL to ES
    es_region = ES_REGION or os.environ['AWS_REGION']
    session = Session({'region': es_region})
    creds = get_credentials(session)
    es_url = urlparse(ES_ENDPOINT)
    # Extract the domain name in ES_ENDPOINT
    es_endpoint = es_url.netloc or es_url.path

    # Post data with exponential backoff
    retries = 0
    while retries < ES_MAX_RETRIES:
        if retries > 0:
            seconds = (2 ** retries) * .1
            logger.debug('Waiting for %.1f seconds', seconds)
            time.sleep(seconds)

        try:
            es_ret_str = post_data_to_es(
                payload, es_region, creds, es_endpoint, '/_bulk')
            logger.debug('Return from ES: %s', es_ret_str)
            es_ret = json.loads(es_ret_str)

            if es_ret['errors']:
                logger.error(
                    'ES post unsuccessful, errors present, took=%sms', es_ret['took'])
                # Filter errors
                es_errors = [item for item in es_ret['items']
                            if item.get('index').get('error')]
                logger.error('List of items with errors: %s',
                            json.dumps(es_errors))
            else:
                logger.info('ES post successful, took=%sms', es_ret['took'])
            break  # Sending to ES was ok, break retry loop
        except ES_Exception as e:
            if (e.status_code >= 500) and (e.status_code <= 599):
                retries += 1  # Candidate for retry
            else:
                raise  # Stop retrying, re-raise exception


# Extracts the DynamoDB table from an ARN
# ex: arn:aws:dynamodb:eu-west-1:123456789012:table/table-name/stream/2015-11-13T09:23:17.104 should return 'table-name'
def get_table_name_from_arn(arn):
    return arn.split(':')[5].split('/')[1]


# Compute a compound doc index from the key(s) of the object in lexicographic order: "k1=key_val1|k2=key_val2"
def compute_doc_index(keys_raw, deserializer, formatIndex=False):
    index = []
    for key in sorted(keys_raw):
        if formatIndex:
            index.append('{}={}'.format(
                key, deserializer.deserialize(keys_raw[key])))
        else:
            index.append(deserializer.deserialize(keys_raw[key]))
    return '|'.join(index)

def _lambda_handler(event, context):
    logger.debug('Event: %s', event)
    records = event['Records']
    now = datetime.datetime.utcnow()

    ddb_deserializer = StreamTypeDeserializer()
    es_actions = []  # Items to be added/updated/removed from ES - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0

    for record in records:
        # Handle both native DynamoDB Streams or Streams data from Kinesis (for manual replay)
        logger.debug('Record: %s', record)
        if record.get('eventSource') == 'aws:dynamodb':
            ddb = record['dynamodb']
            ddb_table_name = get_table_name_from_arn(record['eventSourceARN'])
            doc_seq = ddb['SequenceNumber']
        elif record.get('eventSource') == 'aws:kinesis':
            ddb = json.loads(base64.b64decode(record['kinesis']['data']))
            ddb_table_name = ddb['SourceTable']
            doc_seq = record['kinesis']['sequenceNumber']
        else:
            logger.error('Ignoring non-DynamoDB event sources: %s',
                        record.get('eventSource'))
            continue

        # Compute DynamoDB table, type and index for item
        doc_table = ddb_table_name.lower()
        doc_type = DOC_TYPE
        doc_table_parts = doc_table.split('-')
        doc_es_index_name = doc_table_parts[0] if len(doc_table_parts) > 0 else doc_table

        # Dispatch according to event TYPE
        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        logger.debug('doc_table=%s, event_name=%s, seq=%s',
                    doc_table, event_name, doc_seq)

        # Treat events from a Kinesis stream as INSERTs
        if event_name == 'AWS:KINESIS:RECORD':
            event_name = 'INSERT'

        is_ddb_insert_or_update = (event_name == 'INSERT') or (event_name == 'MODIFY')
        is_ddb_delete = event_name == 'REMOVE'
        image_name = 'NewImage' if is_ddb_insert_or_update else 'OldImage'

        if image_name not in ddb:
            logger.warning(
                'Cannot process stream if it does not contain ' + image_name)
            continue
        logger.debug(image_name + ': %s', ddb[image_name])
        # Deserialize DynamoDB type to Python types
        doc_fields = ddb_deserializer.deserialize({'M': ddb[image_name]})
        
        # Sync enabled APIs do soft delete. We need to delete the record in ES if _deleted field is set
        if ES_USE_EXTERNAL_VERSIONING and event_name == 'MODIFY' and '_deleted' in doc_fields and doc_fields['_deleted']:
            is_ddb_insert_or_update = False
            is_ddb_delete = True

        # Update counters
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            logger.warning('Unsupported event_name: %s', event_name)

        logger.debug('Deserialized doc_fields: %s', doc_fields)

        if ('Keys' in ddb):
            doc_id = compute_doc_index(ddb['Keys'], ddb_deserializer)
        else:
            logger.error('Cannot find keys in ddb record')

        # If DynamoDB INSERT or MODIFY, send 'index' to ES
        if is_ddb_insert_or_update:
            # Generate ES payload for item
            action = {'index': {'_index': doc_es_index_name,
                                '_type': doc_type,
                                '_id': doc_id}}
            # Add external versioning if necessary
            if ES_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['index'].update([
                    ('version_type', 'external'),
                    ('_version', doc_fields['_version'])
                ])
                doc_fields.pop('_ttl', None)
                doc_fields.pop('_version', None)
            # Append ES Action line with 'index' directive
            es_actions.append(json.dumps(action))
            # Append JSON payload
            es_actions.append(json.dumps(doc_fields, cls=DDBTypesEncoder))
            # migration step remove old key if it exists
            if ('id' in doc_fields) and (event_name == 'MODIFY') :
                action = {'delete': {'_index': doc_es_index_name, '_type': doc_type,
                    '_id': compute_doc_index(ddb['Keys'], ddb_deserializer, True)}}
                es_actions.append(json.dumps(action))
        # If DynamoDB REMOVE, send 'delete' to ES
        elif is_ddb_delete:
            action = {'delete': {'_index': doc_es_index_name,
                                '_type': doc_type, '_id': doc_id}}
            if ES_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['delete'].update([
                    ('version_type', 'external'),
                    ('_version', doc_fields['_version'])
                ])
            # Action line with 'delete' directive
            es_actions.append(json.dumps(action))

    # Prepare bulk payload
    es_actions.append('')  # Add one empty line to force final \n
    es_payload = '\n'.join(es_actions)
    logger.info('Posting to ES: inserts=%s updates=%s deletes=%s, total_lines=%s, bytes_total=%s',
                cnt_insert, cnt_modify, cnt_remove, len(es_actions) - 1, len(es_payload))
    post_to_es(es_payload)  # Post to ES with exponential backoff


# Global lambda handler - catches all exceptions to avoid dead letter in the DynamoDB Stream
def lambda_handler(event, context):
    try:
        return _lambda_handler(event, context)
    except Exception:
        logger.error(traceback.format_exc())


SwaySway commented Aug 5, 2020

Hello @Rafcin, are you using @key in your schema?


Rafcin commented Aug 5, 2020

My schema is structured so that the main object that holds everything does not use a key. It has a connection, but only the smaller objects inside it use keys. The main object in this case is a restaurant that stores menu items; the menu items have reviews, and those use @key. In the end, it's the main object that I'm trying to update.


SwaySway commented Aug 7, 2020

I tried to repro with the following schema

type Todo
  @model
  @searchable
  @key(name: "nameCreatedAt", fields: ["name", "createdAt"], queryField: "byNameCreatedAt")
{
  id: ID!
  name: String!
  description: String
  createdAt: AWSDateTime
  updatedAt: AWSDateTime
} 

However, it is deleting records for me. Did you use the ddb_to_es.py script to backfill your data?
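
If it helps to take the API out of the picture, a direct delete against the Amplify-created table should also produce a REMOVE record on the stream. A sketch with boto3, assuming the usual "<Type>-<API_ID>-<env>" table naming; the table name and key value below are placeholders.

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Todo-XXXXXXXXXXXX-dev')  # placeholder table name

# Deleting by the primary key emits a REMOVE record on the table's stream,
# which the DdbToEsFn streaming Lambda should turn into a bulk 'delete'.
table.delete_item(Key={'id': 'some-todo-id'})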

@SwaySway added the pending-response (Issue is pending response from the issue author) label Aug 7, 2020

Rafcin commented Aug 7, 2020

I haven't modified ddb_to_es.py in any way. For me it hasn't removed any records; it simply adds records and never updates or deletes them when a change occurs in DDB. The only difference I see with the schema above is that you use queryField, which I didn't add.

Here is a chunk of my Schema:

type Restaurant
@model 
@searchable
@auth(
  rules: 
    [
      { allow: owner }
      { allow: public, provider: iam, operations: [read] }
      { allow: private, provider: iam }
      { allow: groups, groups: ["User"], operations: [read] }
      { allow: groups, groups: ["RestaurantUser"], operations: [read,create,update] }
      { allow: groups, groups: ["Admin"], operations: [read,create,update,delete] }
    ]
)
{
  id: ID!
  restaurantName: String!
  restaurantAbout: String!
  
  restaurantAddress: String!
  restaurantRoute: String!
  restaurantLocality: String
  restaurantZip: String!
  
  restaurantPhone: String!
  restaurantWebsite: String
  restaurantEmail: String!

  restaurantOwner: [RestaurantOwner!]!

  location: Location!

  restaurantOpeningHours: [String]

  restaurantAmenities: [RestaurantAmenities!]!
  restaurantPaymentOptions: [RestaurantPaymentOptions!]!

  restaurantRating: Float

  restaurantTuesdaySpecials: [RestaurantSpecial] @connection(keyName: "byRestaurant", fields: ["id"])
  restaurantTuesdaySpecialsBlurb: String

  restaurantImageHeaders: [String!]!

  restaurantPostmates: String
  restaurantDoorDash: String
  restaurantUberEats: String
  restaurantGrubhub: String

  restaurantPlaceID: String!
}

type Location @aws_iam @aws_cognito_user_pools{
 lat: Float
 lon: Float
}

input LocationInput {
  lat: Float
  lon: Float
}

type RestaurantAmenities {
  value: String!
  label: String!
}

type RestaurantPaymentOptions {
  value: String!
  label: String!
}

type RestaurantOwner {
  name: String!
  ownerid: String!
  group: String!
}

type RestaurantSpecial 
@model 
@key(name: "byRestaurant", fields: ["restaurantSpecialID"])
@auth(
  rules: 
    [
      { allow: owner }
      { allow: public, provider: iam, operations: [read] }
      { allow: private, provider: iam }
      { allow: groups, groups: ["User"], operations: [read,create,update,delete] }
      { allow: groups, groups: ["RestaurantUser"], operations: [read,create,update,delete] }
      { allow: groups, groups: ["Admin"], operations: [read,create,update,delete] }
    ]
){
  id: ID!
  restaurantSpecialID: ID! 
  restaurantName: String!
  title: String!
  description: String!
  price: String!
  picture: String!
  reviews: [SpecialReview] @connection(keyName: "byRestaurantSpecial", fields: ["id"])
  restaurant: Restaurant @connection(fields: ["restaurantSpecialID"])
}
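
As a side note on how the document ids are derived for a schema like this: the streaming script computes the ES _id from the table keys with compute_doc_index, once in plain form and once in the "key=value" form used by the migration delete. A small sketch below (the key value is a placeholder); the two outputs match the two _id shapes that show up in the bulk response later in this thread.

from boto3.dynamodb.types import TypeDeserializer

# compute_doc_index as defined in the generated script above.
def compute_doc_index(keys_raw, deserializer, formatIndex=False):
    index = []
    for key in sorted(keys_raw):
        if formatIndex:
            index.append('{}={}'.format(key, deserializer.deserialize(keys_raw[key])))
        else:
            index.append(deserializer.deserialize(keys_raw[key]))
    return '|'.join(index)

keys = {'id': {'S': 'some-restaurant-id'}}  # placeholder key value
deserializer = TypeDeserializer()
print(compute_doc_index(keys, deserializer))        # -> some-restaurant-id
print(compute_doc_index(keys, deserializer, True))  # -> id=some-restaurant-id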


idanlo commented Aug 12, 2020

Same for me: deleting an item directly from the DynamoDB console does not affect the Elasticsearch index.

@yonatanganot

We are having this issue as well.
@SwaySway, any help would be appreciated.


SwaySway commented Aug 12, 2020

I tried to repro with version 4.27 using the schema provided by @Rafcin.
I did remove the reviews portion, as the SpecialReview type was not included, but the script was still able to delete an item for me.

@idanlo @yonatanganot
If the delete is failing, could you provide the logs for the Lambda? The name of the Lambda follows this format:

DdbToEsFn-<YOUR_API_ID>-<ENV>

If the information is sensitive, you can also send the logs to amplify-cli@amazon.com.
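
If pulling logs from the CloudWatch console is tedious, here is a quick boto3 sketch for fetching recent error lines from the DdbToEsFn log group; the API id and environment in the group name are placeholders.

import boto3

logs = boto3.client('logs')
log_group = '/aws/lambda/DdbToEsFn-YOURAPIID-dev'  # substitute your API id and env

# Fetch recent log events that contain "ERROR".
resp = logs.filter_log_events(logGroupName=log_group, filterPattern='ERROR', limit=50)
for event in resp['events']:
    print(event['timestamp'], event['message'])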


Rafcin commented Aug 12, 2020 via email


Rafcin commented Aug 13, 2020

Sorry it's late... I broke a few auth things... anyways I forwarded the logs to the email above!

@SwaySway

Hello @Rafcin,
I noticed the logs did not have any errors. Are there any logs where the script failed to delete an item?
If you used this script (ddb_to_es.py) before, we recently fixed a bug in it to support the use of secondary indexes.

re #3786


Rafcin commented Aug 17, 2020

Is this the new Python script? I can switch to it and see if it works. As far as I could tell, I never saw an error, but I can check again.


Rafcin commented Aug 17, 2020

I just triggered a change to an object in DynamoDB, and in CloudWatch the only Lambda error was:

[ERROR]	2020-08-17T18:50:38.946Z	193e9b01-76af-4d6a-9dae-022966564add	Traceback (most recent call last):
  File "/var/task/python_streaming_function.py", line 257, in lambda_handler
    return _lambda_handler(event, context)
  File "/var/task/python_streaming_function.py", line 251, in _lambda_handler
    post_to_es(es_payload)  # Post to ES with exponential backoff
  File "/var/task/python_streaming_function.py", line 106, in post_to_es
    es_errors = [item for item in es_ret['items']
  File "/var/task/python_streaming_function.py", line 107, in <listcomp>
    if item.get('index').get('error')]
AttributeError: 'NoneType' object has no attribute 'get'
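
That traceback points at the bulk-response error filter in post_to_es, which assumes every item in es_ret['items'] is keyed by 'index'. For REMOVE events the response item is keyed by 'delete', so item.get('index') returns None and the chained .get('error') fails. A more defensive version of the filter, as a sketch rather than the shipped fix, would check whichever action key is present:

# Sketch of a more defensive error filter for post_to_es: bulk response items
# are keyed by the action type ('index', 'delete', 'create', 'update'), so look
# up whichever key is present instead of assuming 'index'.
def collect_bulk_errors(es_ret):
    errors = []
    for item in es_ret.get('items', []):
        for action in ('index', 'delete', 'create', 'update'):
            result = item.get(action)
            if result and result.get('error'):
                errors.append(item)
                break
    return errors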


Rafcin commented Aug 17, 2020

What should be used in place of the older handler? The function requires a handler to be set.


Rafcin commented Aug 19, 2020

Sorry, it was late and I had put ddb_to_es.py in the streaming function...
Anyway, I updated the streaming function and it's throwing some new errors.

[DEBUG]	2020-08-19T20:08:52.635Z	6309177d-8d7e-4b3e-aca5-26abcfcbbc71	Return from ES: b'
{
    "took": 10,
    "errors": true,
    "items": [
        {
            "index": {
                "_index": "restaurant",
                "_type": "doc",
                "_id": "TacoTuesdayRestaurant_ZZQyFMn4SJlAIRhygkummKsQHbWfjFc3dPgLvkKg",
                "status": 409,
                "error": {
                    "type": "version_conflict_engine_exception",
                    "reason": "[doc][TacoTuesdayRestaurant_ZZQyFMn4SJlAIRhygkummKsQHbWfjFc3dPgLvkKg]: version conflict, current version [1] is higher or equal to the one provided [1]",
                    "index_uuid": "MjgE7XpzS-uIG8qJB3IalQ",
                    "shard": "0",
                    "index": "restaurant"
                }
            }
        },
        {
            "delete": {
                "_index": "restaurant",
                "_type": "doc",
                "_id": "id=TacoTuesdayRestaurant_ZZQyFMn4SJlAIRhygkummKsQHbWfjFc3dPgLvkKg",
                "_version": 1,
                "result": "not_found",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 8,
                "_primary_term": 3,
                "status": 404
            }
        }
    ]
}
'

[ERROR]	2020-08-19T20:08:52.635Z	6309177d-8d7e-4b3e-aca5-26abcfcbbc71	ES post unsuccessful, errors present, took=10ms 


[ERROR]	2020-08-19T20:08:52.635Z	6309177d-8d7e-4b3e-aca5-26abcfcbbc71	Traceback (most recent call last):
  File "/var/task/python_streaming_function.py", line 258, in lambda_handler
    return _lambda_handler(event, context)
  File "/var/task/python_streaming_function.py", line 252, in _lambda_handler
    post_to_es(es_payload)  # Post to ES with exponential backoff
  File "/var/task/python_streaming_function.py", line 107, in post_to_es
    es_errors = [item for item in es_ret['items']
  File "/var/task/python_streaming_function.py", line 108, in <listcomp>
    if item.get('index').get('error')]
AttributeError: 'NoneType' object has no attribute 'get'
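
For context on the 409: with version_type set to external, Elasticsearch only accepts an index operation whose supplied _version is strictly greater than the version it already stores, which is why re-sending version 1 for an existing document produces version_conflict_engine_exception. A sketch of the action the script builds when ES_USE_EXTERNAL_VERSIONING is enabled, using the id from the log above:

import json

# With external versioning, Elasticsearch rejects this action unless '_version'
# is greater than the version currently stored for the document; re-sending the
# same version (1, as in the log above) returns a 409 version conflict.
doc_id = 'TacoTuesdayRestaurant_ZZQyFMn4SJlAIRhygkummKsQHbWfjFc3dPgLvkKg'
action = {'index': {'_index': 'restaurant', '_type': 'doc', '_id': doc_id,
                    'version_type': 'external', '_version': 1}}
print(json.dumps(action))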


Rafcin commented Sep 2, 2020

So far none of the changes have fixed it.

@SwaySway removed their assignment Oct 13, 2020
@SwaySway

Hello @Rafcin,
Are you still experiencing issues with this?
I noticed the error in the logs you provided was due to a version mismatch:
version_conflict_engine_exception

Do you have datastore features enabled?

@SwaySway removed the pending-response label Oct 23, 2020

r0zar commented Nov 13, 2020

Is anyone still dealing with this issue?


Rafcin commented Nov 13, 2020

Hi, sorry, I commented on this issue somewhere else. I ended up going into one of the templates and disabling versioning, and it seems to work as it should.


Rafcin commented Nov 14, 2020

Actually, sorry, I just checked again: an array of strings I changed did not update at all.


Rafcin commented Nov 19, 2020

I think the issue is resolved. I looked at the Lambda functions in both my dev branch and production, and they are exactly the same. It seems to work after version conflict detection was disabled. I did some tests, and the reason some images don't update after a change is the aggressive caching I have set up for the production branch, which has now been fixed.

@r0zar I would suggest disabling conflict detection. If I remember correctly, I had to go into my templates as well to forcefully remove it, because every time I ran amplify push it complained.
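
One quick way to confirm which mode a deployed streaming function is in, assuming it still reads ES_USE_EXTERNAL_VERSIONING as in the script earlier in this thread; the function name below is a placeholder for your API id and environment.

import boto3

lambda_client = boto3.client('lambda')
cfg = lambda_client.get_function_configuration(FunctionName='DdbToEsFn-YOURAPIID-dev')
env_vars = cfg.get('Environment', {}).get('Variables', {})
print('ES_USE_EXTERNAL_VERSIONING =', env_vars.get('ES_USE_EXTERNAL_VERSIONING'))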

@idobleicher

I might have an idea why this happened to you; I had the same issue. What I did was change the name of the table, so the old name was still indexed in Kibana, which made me think the table data wasn't being updated. What I had forgotten to do was create a new index in Kibana, so I kept seeing the old data and thought the table wasn't updating, when in fact I just needed to create the new index. :)

I hope this helps. In the picture below I am about to remove the old index, assetsearch, and keep the new one; the new one has the updated data correctly.
[image]


SwaySway commented Dec 8, 2020

Closing this issue, as this is happening due to conflict versioning being enabled.

@SwaySway closed this as completed Dec 8, 2020
@github-actions

This issue has been automatically locked since there hasn't been any recent activity after it was closed. Please open a new issue for related bugs.

Looking for a help forum? We recommend joining the Amplify Community Discord server *-help channels for those types of questions.

@github-actions bot locked as resolved and limited conversation to collaborators May 25, 2021