
Duplicated records #33

Closed
benoittgt opened this issue Jan 3, 2017 · 17 comments

@benoittgt
Contributor

Hello @IanMeyers and others

I'm getting random duplicate rows. Over the last day:

"id","count"
2535282,2
2543816,2
2543817,2
2549680,2
2549679,2
2535281,2
2555470,2
2565819,2

I can see the duplicate entries in the S3 file but not when reading the Kinesis stream content.

Trying to investigate.

@benoittgt
Contributor Author

For example, I have this S3 file:

2535270|2017-01-02 09:23:26|115424|19|native|696|app|89351|114020006
2535271|2017-01-02 09:23:27|193647|19|native|2887|app|191442|124000005
2535272|2017-01-02 09:23:30|115424|19|native|696|app|89351|114020006
2535273|2017-01-02 09:23:30|83137|19|native|696|app|62830|114020006
2535274|2017-01-02 09:23:38|120666|19|native|696|app|134184|114020006
2535276|2017-01-02 09:23:41|193734|19|native|696|app|62414|114020006
2535275|2017-01-02 09:23:41|66597|17|native|696|app|64801|114020006
2535277|2017-01-02 09:23:41|58405|19|native|696|app|64047|114020006
2535278|2017-01-02 09:23:45|68352|17|native|696|app|72727|114020006
2535279|2017-01-02 09:23:46|169893|19|native|696|app|63385|114020006
2535280|2017-01-02 09:23:47|133019|19|native|696|app|91773|114020006
2535281|2017-01-02 09:23:47|110088|17|native|696|app|63424|114020006 <<<
2535282|2017-01-02 09:23:47|68352|17|native|696|app|72727|114020006
2535281|2017-01-02 09:23:47|110088|17|native|696|app|63424|114020006 <<< 
2535282|2017-01-02 09:23:47|68352|17|native|696|app|72727|114020006
2535283|2017-01-02 09:23:48|166255|19|native|696|app|62825|114020006
2535284|2017-01-02 09:23:49|166255|19|native|696|app|62825|114020006
2535285|2017-01-02 09:23:49|142708|19|native|696|app|115740|114020006
2535286|2017-01-02 09:23:49|50296|19|native|696|app|29697|114020006
2535287|2017-01-02 09:23:52|193734|19|native|696|app|62414|114020006
2535288|2017-01-02 09:23:52|80034|19|native|696|app|73788|114020006
2535289|2017-01-02 09:23:53|88816|19|native|696|app|100378|114020006
2535290|2017-01-02 09:23:53|193734|19|native|696|app|62414|114020006
2535291|2017-01-02 09:23:59|124061|19|native|696|app|62475|114020006
2535292|2017-01-02 09:24:01|115424|19|native|696|app|89351|114020006
2535293|2017-01-02 09:24:04|50490|19|native|696|app|53979|114020006
2535294|2017-01-02 09:24:10|69294|19|native|696|app|74189|114020006
2535295|2017-01-02 09:24:15|50111|17|native|696|app|43511|114020006
2535296|2017-01-02 09:24:17|115424|19|native|696|app|89351|114020006
2535298|2017-01-02 09:24:17|56702|19|native|696|app|43923|114020006
2535297|2017-01-02 09:24:17|69063|19|native|696|app|72232|114020006
2535299|2017-01-02 09:24:22|188513|19|native|696|app|174704|114020006
2535300|2017-01-02 09:24:23|188513|19|native|696|app|174704|114020006

The lines starting with 2535281 and 2535282 are inserted twice in S3.

@IanMeyers
Contributor

Unfortunately, because Kinesis provides an at-least-once delivery semantic, you are unable to suppress duplicates unless you track IDs using a secondary mechanism (which can then move you to 'at most once' delivery semantics, which opens the possibility of data loss). It is best to leave the duplicate records in your delivery destination in S3, and deduplicate them within the analysis system you are using.

@benoittgt
Contributor Author

Thanks a lot for your answer @IanMeyers. That's much clearer.

What do you mean by "deduplicate them within the analysis system you are using"?

Thanks again Ian

@benoittgt
Contributor Author

benoittgt commented Jan 3, 2017

I will probably follow Brent Nash's approach, even though I haven't yet found a way to trigger all of this with this Lambda and Firehose.

We first load telemetry data into a temporary staging table and then perform a modified merge operation to remove duplicates. We use a SELECT DISTINCT query and then LEFT JOIN our staging table against our destination table on event_id to get rid of duplicate records that may have been introduced by Amazon Kinesis-related retries or during backfills of old data. We also make sure to load our data in sort key order to reduce or even eliminate the need for VACUUM operations.

From : https://aws.amazon.com/blogs/big-data/building-an-event-based-analytics-pipeline-for-amazon-game-studios-breakaway/

Closing the issue because it's not related to the lambda itself.

@IanMeyers
Contributor

Yes, Brent's approach is one correct way to merge new data that may contain duplicates. Thanks!

@ghost

ghost commented Jan 3, 2017

Hey @benoittgt ,

I'm on vacation in the mountains at the moment, so my internet is spotty, but let me try to share a few details.

As Ian mentioned, Kinesis has "at least once" semantics, so you can get the occasional duplicate. In my experience, they're mostly due to producer retries. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html for more details.

In the system(s) I've built, we do what Ian mentioned: we archive all events (including duplicates) to S3 as sort of the "raw" record of what was received. Deduplication happens when we move data from Kinesis or S3 into destination data stores like Redshift or Elasticsearch Service.

The basic gist of it is that the producer (i.e. the thing sending data to Kinesis) generates a unique v4 UUID (an "event_id") and sends it as part of every event. This is the field that can be used to deduplicate.
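
For reference, a minimal sketch of what that producer side could look like, assuming the aws-sdk v2 Kinesis client and the uuid package; the region, stream name and payload fields are placeholders for illustration, not anything from this project:

'use strict';
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');

const kinesis = new AWS.Kinesis({ region: 'eu-west-1' }); // placeholder region

function sendEvent(payload) {
  // attach a unique event_id to every event; a retried send carries the same
  // event_id, so downstream consumers can deduplicate on it
  const event = Object.assign({ event_id: uuidv4() }, payload);
  return kinesis.putRecord({
    StreamName: 'my-event-stream',     // placeholder stream name
    PartitionKey: event.event_id,
    Data: JSON.stringify(event)
  }).promise();
}

sendEvent({ type: 'click', created_at: new Date().toISOString() })
  .catch(console.error);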

When loading into Redshift, we use a temporary staging table and then perform a LEFT JOIN against the destination table(s), only taking event_ids that don't already exist in the destination table(s).

In the case of Elasticsearch Service, you can do a similar thing by using the event_id as the id in the standard index/type/id mapping ESS provides (duplicates just overwrite the existing document with the same record, which has the effect of deduping).
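
As an illustration, a rough sketch of that Elasticsearch-side dedup, using the legacy elasticsearch npm client; the host, index and type names are made up:

'use strict';
const elasticsearch = require('elasticsearch');
const client = new elasticsearch.Client({ host: 'https://my-ess-domain:443' }); // placeholder host

function indexEvent(event) {
  // using event_id as the document id means a duplicate event overwrites
  // the existing document instead of creating a second one
  return client.index({
    index: 'events',   // placeholder index name
    type: 'event',     // placeholder type name
    id: event.event_id,
    body: event
  });
}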

You can come up with similar mechanisms for other data stores as well.

Hope that helps and let me know if you need any further details.

@benoittgt
Contributor Author

Thanks a lot @brentnash. Sorry to bother you during your vacation.

"Deduplication happens when we move data from Kinesis or S3 into destination data stores"

Maybe it's a secret, but how do you move it? A Lambda with a cron job? Firehose?

I'm using Firehose for the moment to copy from S3 to Redshift. I will probably have Firehose COPY the data into a staging table and then do as you describe (remove duplicates, reorder, then merge) with a Lambda scheduled by a cron job. I would also have to think about how to properly clean the staging table after a merge without disturbing the COPY command invoked by Firehose. I don't know yet.

@benoittgt
Contributor Author

I got another answer at http://disq.us/p/1eyg90w

I will add this mechanism tomorrow and will publish a blog post about the implementation next month.

Thanks again to both of you.

@benoittgt
Contributor Author

@brentnash The only issue I will have with keeping this Lambda while Firehose copies into Redshift is that the staging table constantly receives data. I will probably use two staging tables and:

  1. rename both tables so there is always one table (staging_table) able to receive data from the Firehose COPY command, while the other one (processing_table) holds the previously received records being processed
  2. process and merge the data into the final table
  3. empty "processing_table" and swap the names again

That looks quite complicated for a few duplicates, but I think it is the best thing to do.

@benoittgt
Contributor Author

I have something similar to:

-- Swap the staging table: rename the table Firehose loads into and
-- recreate an empty one so new data keeps flowing in.
BEGIN;
ALTER TABLE active_connections_temp
RENAME TO active_connections_process;
CREATE TABLE active_connections_temp (LIKE active_connections_process);
COMMIT;

-- Merge only the rows whose id is not already in the final table,
-- then drop the processed staging table.
BEGIN;
INSERT INTO active_connections_final
SELECT DISTINCT active_connections_process.*
FROM active_connections_process
LEFT JOIN active_connections_final ON active_connections_final.id = active_connections_process.id
WHERE active_connections_final.id IS NULL
ORDER BY active_connections_process.id;
DROP TABLE active_connections_process;
COMMIT;

@ghost

ghost commented Jan 9, 2017

The other thing to consider is that you'll get relatively few duplicates while running normally, but the two places I see large numbers of duplicates are:

  1. Lambda/KCL failure - if your consumer reading from your Kinesis stream happens to process part of a batch, transmit it and die/crash before it checkpoints, you can end up getting a chunk of duplicates when it starts up again.

  2. Backfilling - if you ever have the need to replay old data (recovering from a crash or user error for instance), having a system that can drop duplicates makes this process MUCH easier. You can just replay data from the last N hours or days and not have to worry about introducing even more duplicates.

That being said, you're right, I don't think Firehose is a great fit for this at the moment. It's a lot of effort for potentially not much gain like you said. If you really want to use Firehose to write all the way to Redshift, then you're either going to have to do something complicated like you mentioned above or you'll have to live with the dupes. I'll try to find the actual SQL I use tomorrow to compare against what you posted above, but yours looks about like what I'd expect with the SELECT DISTINCT and the LEFT JOIN on id (which I assume is your unique ID).

If you're not concerned about #2 I mentioned above (i.e. you only care about duplicates that happen within a limited time window), you could also consider using the new Firehose embedded Lambda function feature (https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) to do some clever deduping. Maybe you could have it store the last hour's or day's worth of unique event IDs it has seen in a DynamoDB table and try to dedupe that way. I haven't tried it... just an idea.
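
To make the idea concrete, here is an untested sketch of such a Firehose transformation Lambda that remembers recently seen event_ids in a hypothetical DynamoDB table (hash key event_id, with a TTL) and drops records it has already seen; all names are placeholders:

'use strict';
const AWS = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();
const SEEN_TABLE = 'recently_seen_event_ids'; // placeholder table name

exports.handler = function (event, context, callback) {
  const results = event.records.map(function (record) {
    const payload = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
    return dynamo.put({
      TableName: SEEN_TABLE,
      Item: {
        event_id: payload.event_id,
        // expire entries after 24h so only a limited window is kept
        ttl: Math.floor(Date.now() / 1000) + 24 * 60 * 60
      },
      // the write only succeeds if this event_id has not been seen yet
      ConditionExpression: 'attribute_not_exists(event_id)'
    }).promise()
      .then(function () {
        return { recordId: record.recordId, result: 'Ok', data: record.data };
      })
      .catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // already seen within the window -> drop the duplicate
          return { recordId: record.recordId, result: 'Dropped', data: record.data };
        }
        return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data };
      });
  });

  Promise.all(results)
    .then(function (records) { callback(null, { records: records }); })
    .catch(callback);
};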

Good luck!

@benoittgt
Contributor Author

Thanks @brentnash for your answer. We now have a Lambda running in production with a staging table.

The code looks like:

'use strict';
const config = require('./redshift_config_from_env');
// postgres connection string: pg://user:password@host/database
const redshiftConn = `pg://${config.user}:${config.password}@${config.host}/${config.database}`;
const pgp = require('pg-promise')();

// Swap the staging table: rename the table Firehose loads into and recreate
// an empty one so Firehose can keep inserting while we process the old rows.
var tableCopyQuery = function(tableName) {
  return `
      ALTER TABLE ${tableName}_temp
      RENAME TO ${tableName}_process;
      CREATE TABLE ${tableName}_temp (LIKE ${tableName}_process);`;
};
// Merge only the rows whose id is not already in the destination table,
// then drop the processed staging table.
var insertQuery = function(tableName) {
  return `
      INSERT INTO ${tableName}
      SELECT DISTINCT ${tableName}_process.*
        FROM ${tableName}_process
        LEFT JOIN ${tableName} USING (id)
       WHERE ${tableName}.id IS NULL
       ORDER BY ${tableName}_process.id;
      DROP TABLE ${tableName}_process;`;
};

exports.handler = function(event, context) {
  const client = pgp(redshiftConn);

  return client.tx(function (t) {
    return t.batch([
      t.none(tableCopyQuery('user_stats')),
      t.none(insertQuery('user_stats')),
      t.none(tableCopyQuery('admin_stats')),
      t.none(insertQuery('admin_stats'))
    ]);
  })
    .then(function () {
      return context.succeed(`Successfully merged.`);
    })
    .catch(function (error) {
      return context.fail(`Failed to run queries : ${JSON.stringify(error)}`);
    });
};

For the moment it's working, but I will wait a few days to be sure.

@ghost

ghost commented Jan 10, 2017

Hey @benoittgt,

Just to follow up, I checked my merge SQL and it looks pretty similar to yours. The only differences I see are:

  1. We dedupe on the combination of "id" and "timestamp" rather than just "id" to cover the unlikely scenario that an ID repeats. This is probably unnecessary in most cases.

  2. We use time-series tables in Redshift to keep down the size (and need for vacuums) on our destination tables so we actually end up doing multiple merges (staging to November events, staging to December events, etc.). Probably not necessary in your use case unless you decide to go the time-series route.

One other thought is that you may want to check what the PRIMARY KEY/SORT KEY/DIST KEY are set to on your staging table. If your "id" is not part of those, your merges may take longer than necessary. Though since you're using CREATE TABLE ... LIKE ... you might not have a choice, since you'll inherit those values from your parent table.
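
For illustration only, a hedged sketch of the kind of DDL this implies: if the destination (parent) table declares its keys on id, the staging tables created with CREATE TABLE ... LIKE inherit them, which keeps the merge join cheap. The column list and connection string here are invented, not the real schema:

'use strict';
const pgp = require('pg-promise')();
const db = pgp(process.env.REDSHIFT_URL); // placeholder connection string

// declare DISTKEY/SORTKEY on the parent table; staging tables made with
// CREATE TABLE ... LIKE inherit them
const createFinalTable = `
  CREATE TABLE active_connections_final (
    id         BIGINT    NOT NULL,
    created_at TIMESTAMP NOT NULL
    -- ...remaining columns...
  )
  DISTKEY (id)
  SORTKEY (id);`;

db.none(createFinalTable)
  .then(() => console.log('table created'))
  .catch(console.error)
  .finally(() => pgp.end());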

Glad to hear it seems to be working for you!

@benoittgt
Contributor Author

Hello @brentnash

  1. The id is a UUID generated by the backend, so it's unique.
  2. I hadn't thought about time-series tables; we're not using them, so the merges stay simple :)

Thanks a lot! Two days in and it's still working as intended.

@benoittgt
Contributor Author

We eventually had occasional issues with other insert queries failing with a Redshift "Serializable isolation violation on table" error. We ended up removing the Redshift insert from Firehose and letting it handle only the S3 delivery. The COPY from S3 into a temp table and the insert into the final table are done in a single transaction by a Lambda.

Also, every insert transaction locks the Redshift table before doing anything. It works quite well (except for #37).
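
As a rough sketch of that final setup (not the actual production code), assuming pg-promise as in the earlier Lambda: one transaction that locks the destination table, COPYs the new S3 files into a temporary staging table, and merges without duplicates. The table names, manifest path and IAM role are placeholders:

'use strict';
const pgp = require('pg-promise')();
const db = pgp(process.env.REDSHIFT_URL); // placeholder connection string

function loadAndMerge(manifestPath, iamRole) {
  return db.tx(function (t) {
    return t.batch([
      // take an explicit table lock so concurrent loaders serialize instead
      // of failing with "Serializable isolation violation"
      t.none('LOCK active_connections_final;'),
      t.none('CREATE TEMP TABLE active_connections_stage (LIKE active_connections_final);'),
      t.none(`COPY active_connections_stage
                FROM '${manifestPath}'
                IAM_ROLE '${iamRole}'
                MANIFEST
                DELIMITER '|';`),
      t.none(`INSERT INTO active_connections_final
              SELECT DISTINCT s.*
                FROM active_connections_stage s
                LEFT JOIN active_connections_final f USING (id)
               WHERE f.id IS NULL
               ORDER BY s.id;`)
    ]);
  });
}

// e.g. loadAndMerge('s3://my-bucket/manifests/latest.manifest', 'arn:aws:iam::<account-id>:role/redshift-copy');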

It took some time, but we finally have a solution that can be easily debugged and is very efficient.

Thanks again for the help.

@benoittgt
Contributor Author

@ghost

ghost commented Apr 16, 2017 via email
