
Add table engine with auto deduplication by key for realtime #5125

Open · pikhovkin opened this issue Apr 26, 2019 · 16 comments

@pikhovkin commented Apr 26, 2019

Add a table engine with auto deduplication by key.

When inserting a row with an existing key into the table, is there a way to automatically mark the old row so that it is no longer returned by SELECT queries?

@pikhovkin pikhovkin added the feature label Apr 26, 2019

@den-crane (Contributor) commented Apr 26, 2019

create table xx (A String, X UInt64) engine=ReplacingMergeTree order by A;
insert into xx values ('a', 1);
insert into xx values ('a', 2);

-- FINAL collapses the duplicates at read time
select * from xx final;
┌─A─┬─X─┐
│ a │ 2 │
└───┴───┘

-- OPTIMIZE ... FINAL forces the merge that physically removes the duplicate
optimize table xx final;
select count() from xx;
┌─count()─┐
│       1 │
└─────────┘

@pikhovkin (Author) commented Apr 26, 2019

select * from xx final;

This is no different in speed from ordinary relational databases.

optimize table xx final;

It's useless for realtime.

@pikhovkin pikhovkin changed the title Add table engine with auto deduplication by key Add table engine with auto deduplication by key in realtime Apr 26, 2019
@pikhovkin pikhovkin changed the title Add table engine with auto deduplication by key in realtime Add table engine with auto deduplication by key for realtime Apr 26, 2019

@den-crane (Contributor) commented Apr 26, 2019

This is no different in speed from ordinary relational databases.

¯\_(ツ)_/¯

You need another realm then, with real magic.

@den-crane (Contributor) commented Apr 26, 2019

AresDB is an in-memory database.
Do you want the size of your data to be limited by your server's RAM as well?

Of course everybody dreams about this feature, but how? Disks are too slow for real-time merging during ingestion.

@zombiemonkey commented Apr 27, 2019

@pikhovkin, @den-crane is right. If you dig into the link, AresDB uses batch-based processing for late data. Being creative, you could use the StorageJoin engine, which allows real-time in-memory updates/deduplication (it is disk-backed, and you can also run normal SELECT queries against it), and combine it with a Replacing- or CollapsingMergeTree if you need volume. Basically, track the key and a timestamp, then use joinGet to determine whether a record is valid/current. You'll need to perform two inserts, but that's not a big deal for ClickHouse. Depending on your exact use case this will be reasonably fast and real-time, and background merging will automatically handle late data.
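A minimal sketch of that pattern, with made-up table and column names (they are not from this thread): the Join-engine table keeps only the latest timestamp per key, and joinGet decides at query time whether a row in the MergeTree table is still current.

-- mask table: one (key, ts) pair per key; join_any_take_last_row = 1 makes a
-- repeated INSERT for the same key overwrite the previously stored value
create table latest_version (key UInt64, ts UInt64)
engine = Join(ANY, LEFT, key) SETTINGS join_any_take_last_row = 1;

-- data table: duplicates accumulate here and are only merged away eventually
create table events (key UInt64, ts UInt64, payload String)
engine = ReplacingMergeTree(ts) order by key;

-- every "update" is two inserts: one into the mask, one into the data table
insert into latest_version values (42, 1000);
insert into events values (42, 1000, 'v1');
insert into latest_version values (42, 2000);
insert into events values (42, 2000, 'v2');

-- a row is current if its timestamp matches the one stored for its key
select * from events where joinGet('latest_version', 'ts', key) = ts;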

@zombiemonkey commented Apr 27, 2019

Check out the example of StorageJoin here. The docs probably need to be updated and don't provide the full context of what is possible with this engine.

https://github.com/yandex/ClickHouse/blob/master/dbms/tests/queries/0_stateless/00800_versatile_storage_join.sql

@pikhovkin (Author) commented Apr 29, 2019

@den-crane I am talking about the idea of reasonably fast updating of data.
Right now the update speed in ClickHouse is ~8 qps (updates only) and ~16 qps (1/3 updates, 2/3 inserts), using the ALTER ... UPDATE ... mechanism for each row, and after 100k-200k of these updates the memory starts to leak.
I need at least 70-100 qps for updates, and ideally over 150-200 qps.
I tried Vertica; it has a MERGE mechanism for upserts. The merge speed was ~55-70 qps (updates only), and Vertica works very stably.
Read my 'realtime' as near realtime.
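For context, a sketch of the per-row mutation pattern being measured here (table and column names are illustrative): each statement is an asynchronous mutation that rewrites every data part containing a matching row, which is why the throughput stays this low.

-- one mutation per updated row: ClickHouse rewrites whole parts in the background
alter table events update payload = 'new value' where key = 42;

-- mutations are asynchronous; their progress is visible in system.mutations
select mutation_id, command, is_done from system.mutations where table = 'events';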

@zombiemonkey commented Apr 29, 2019

@pikhovkin, it would be good to understand your full requirements and data volumes. Basically you're describing an OLTP or key/value store, not an OLAP store.

I noted a way above to achieve the same functionality as AresDB by using StorageJoin. This is an in-memory table that syncs to disk. You can update records by key and it will deduplicate/replace the record with the newest value for that key (you have to use INSERTs; the most recent INSERT overwrites the previous value for the key). You also have to insert the same record into a Collapsing- or ReplacingMergeTree. You can then use the key with joinGet and the 'current day' to filter out old records, and rely on background merging to take care of historical data. In this way StorageJoin acts as a mask. This only works if the data window you need to update fits in memory.

If you need the ability to update 200+ records/s in memory and on disk in 'real time', then you need a key/value store (Redis, etc.) or an OLTP database with decent storage/IO (PostgreSQL or MySQL will do this on SSDs). You could then use that as a remote/ODBC table source in ClickHouse if you need OLAP capabilities or want to join it with other, less 'real-time' data. If you can maintain a masking filter with StorageJoin or some other external dictionary, though, you can achieve tens of millions of row 'updates' per second with ClickHouse inserts.
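A rough sketch of the 'external OLTP store as a remote table' option (host, credentials, database and table names are placeholders): ClickHouse can query such a store directly at SELECT time and join it with local data.

-- read the frequently updated state straight from MySQL at query time
select e.key, e.payload, m.status
from events as e
inner join mysql('mysql-host:3306', 'mydb', 'live_state', 'user', 'password') as m
    on e.key = m.key;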

@pikhovkin (Author) commented Apr 29, 2019

It sounds very cool. When I loaded data into ClickHouse (40 million rows), it didn't collapse the duplicates for ~10-12 hours. The duplicates were only collapsed after an OPTIMIZE ... command.

@zombiemonkey commented Apr 29, 2019

Really? I just managed 5 million updates on a 'current day' table à la AresDB in under 1 second. I haven't tested multi-row vs. single-row inserts, but those can always be handled with buffered inserts or Buffer tables. You could also chain the updates together behind a materialized view if required. You will need some orchestration to 'close off' the day, and late updates/data may have to be handled with some manual use of OPTIMIZE or of creating/replacing partitions.

drop table if exists current_day_mask;
drop table if exists current_day_data;
drop table if exists mydata;

create table current_day_mask (
	key UInt64,
	epoch UInt64 
) engine = Join(ANY, LEFT, key) SETTINGS join_any_take_last_row = 1;

create table current_day_data (
	key UInt64,
	epoch UInt64,
	iscurrent UInt8 ALIAS joinGet('current_day_mask', 'epoch', key) == epoch,
	col1 UInt16,
	col2 String
) engine = ReplacingMergeTree(epoch)
partition by toDate(toDateTime(epoch/1000))
order by (key);

create view mydata as select * from current_day_data where iscurrent;

-- load 100 million keys/rows, matching the result sets shown below
insert into current_day_mask select number, 1556544940000 from system.numbers limit 100000000;
insert into current_day_data select number, 1556544940000, rand(1), 'sometext' from system.numbers limit 100000000;

SELECT 
    epoch, 
    count()
FROM mydata 
GROUP BY epoch
FORMAT PrettySpace

        epoch     count()

1556544940000   100000000

1 rows in set. Elapsed: 0.915 sec. Processed 100.00 million rows, 1.60 GB (109.30 million rows/s., 1.75 GB/s.) 

hulk :) insert into current_day_mask select number, 1556545940000 + 500 from system.numbers limit 5000000;

INSERT INTO current_day_mask SELECT 
    number, 
    1556545940000 + 500
FROM system.numbers 
LIMIT 5000000

Ok.

0 rows in set. Elapsed: 0.268 sec. Processed 5.05 million rows, 40.37 MB (18.84 million rows/s., 150.71 MB/s.) 

hulk :) insert into current_day_data select number, 1556545940000 + 500, rand(1), 'myupdatedtext' from system.numbers limit 5000000;

INSERT INTO current_day_data SELECT 
    number, 
    1556545940000 + 500, 
    rand(1), 
    'myupdatedtext'
FROM system.numbers 
LIMIT 5000000

Ok.

0 rows in set. Elapsed: 0.385 sec. Processed 5.05 million rows, 40.37 MB (13.09 million rows/s., 104.72 MB/s.) 

hulk :) select epoch, count() from mydata group by epoch format PrettySpace;

SELECT 
    epoch, 
    count()
FROM mydata 
GROUP BY epoch
FORMAT PrettySpace

        epoch    count()

1556545940500    5000000
1556544940000   95000000

2 rows in set. Elapsed: 0.997 sec. Processed 105.00 million rows, 1.68 GB (105.36 million rows/s., 1.69 GB/s.) 

@den-crane (Contributor) commented Apr 29, 2019

I tried Vertica; it has a MERGE mechanism for upserts. The merge speed was ~55-70 qps (updates only), and Vertica works very stably.

@pikhovkin
Did you try it with a 2B-row table?
Was that ~55-70 qps of individual MERGE INTO statements, or a batched update applied by a single MERGE INTO?
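For reference, the Vertica mechanism in question is MERGE (table and column names below are placeholders); the distinction being asked about is whether each updated row was issued as its own MERGE statement, or whether many rows were loaded into a staging table and applied with a single MERGE.

-- Vertica upsert: staged rows update matching keys and insert the rest
merge into target t using staging s on t.key = s.key
when matched then update set payload = s.payload
when not matched then insert (key, payload) values (s.key, s.payload);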

@den-crane (Contributor) commented Apr 29, 2019

It sounds very cool. When I loaded data into ClickHouse (40 million rows), it didn't collapse the duplicates for ~10-12 hours. The duplicates were only collapsed after an OPTIMIZE ... command.

Merges are eventual. They may never happen at all, for example because of part sizes.
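In other words, ReplacingMergeTree only deduplicates at merge time. To get deduplicated results without waiting for (or forcing) a merge, collapse at query time, reusing the xx table from the earlier example:

-- FINAL collapses duplicates at read time, even while they still exist on disk
select * from xx final;

-- or aggregate manually; with a real version column you would use argMax(value, version)
select A, max(X) as X from xx group by A;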

@pikhovkin (Author) commented Apr 30, 2019

@zombiemonkey I need to update the data periodically (every 5-15 minutes) over a 3-day window. The data contains text fields.

@den-crane I load the 40 million rows in batches of 1k rows. The rows contain ~100 columns.

@zombiemonkey commented May 6, 2019

@pikhovkin I think we've discussed the two solutions and I won't be commenting further.

  1. Use CollapsingMergeTree; with appropriately specced hardware it allows real-time ingestion and reporting. Note that you have to use GROUP BY with CollapsingMergeTree to get real-time results, and you need either versioning or something that knows the last state of each row in order to insert the correct 'sign' record (see the sketch after this list).
  2. Use StorageJoin coupled with ReplacingMergeTree and some clever management. I provided an example of this method that, with a 100 million row set, updated 5 million rows in ~700 ms on a single machine. StorageJoin is used as a mask here and enables fully real-time updates. Single-statement updates of several hundred individual keys per second should be possible; bulk updates of tens of millions of rows per second are probably possible.
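A minimal sketch of option 1 (hypothetical schema with a numeric column, following the standard sum(value * sign) pattern): an update is written as a cancelling row plus the new state, and reads collapse by sign at query time instead of waiting for merges.

create table events_c (
    key UInt64,
    views UInt64,
    sign Int8
) engine = CollapsingMergeTree(sign) order by key;

-- initial state of the row
insert into events_c values (42, 10, 1);
-- an "update": cancel the old state (-1) and write the new state (+1);
-- this requires knowing the previous state of the row
insert into events_c values (42, 10, -1), (42, 12, 1);

-- real-time read: collapse by sign at query time instead of waiting for merges
select key, sum(views * sign) as views
from events_c
group by key
having sum(sign) > 0;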