EventJournal loses data if 2 nodes terminate #12300

Closed · gurbuzali opened this issue Feb 9, 2018 · 12 comments

@gurbuzali (Member) commented Feb 9, 2018

Here is a reproducer for the issue:
https://gist.github.com/gurbuzali/897f5cd15d3347e29fb5af6c4ed2e453

  • start a 4-node cluster and a client
  • start a thread that produces data for the event journal
  • terminate one instance
  • wait for some time
  • terminate a second instance
  • check the total count of events in the journal

@gurbuzali (Member, Author) commented Feb 16, 2018

I've got an even simpler reproducer. It shows that the problem is not related to unapplied backups; migration is the new suspect, I guess.

import com.hazelcast.config.Config;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.spi.properties.GroupProperty;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;

import static java.util.concurrent.TimeUnit.SECONDS;

public class EventJournalTest {

    private static int partitionCount = 12;
    private static String mapName = "map";
    private static int instanceCount = 3;
    private static int perKeyCount = 2;
    private static int totalCount = 1000;

    public static void main(String[] args) throws Exception {
        Config config = getConfig();

        // start a 3-node cluster in parallel
        HazelcastInstance[] instances = new HazelcastInstance[instanceCount];
        Arrays.parallelSetAll(instances, i -> Hazelcast.newHazelcastInstance(config));

        MapProxyImpl<Integer, Integer> map = (MapProxyImpl) instances[0].getMap(mapName);

        // write perKeyCount values per key; each write appends one event to the journal
        for (int i = 0; i < totalCount; i++) {
            for (int j = 0; j < perKeyCount; j++) {
                map.set(i, j);
            }
        }

        check(map);

        // terminate the last instance un-gracefully (no graceful migration beforehand)
        instances[instanceCount - 1].getLifecycleService().terminate();

        // give the cluster time to promote backups and recreate replicas
        SECONDS.sleep(15);

        check(map);

        // terminate a second instance
        instances[instanceCount - 2].getLifecycleService().terminate();

        SECONDS.sleep(5);

        check(map);
    }

    // reads each partition's journal from sequence 0 until it is exhausted
    // and compares the summed event count against the expected total
    private static void check(MapProxyImpl<Integer, Integer> map)
            throws ExecutionException, InterruptedException {
        int total = 0;
        for (int i = 0; i < partitionCount; i++) {
            long seq = 0;
            int readCount = 1;
            while (readCount > 0) {
                ICompletableFuture<ReadResultSet<EventJournalMapEvent<Integer, Integer>>> f =
                        map.readFromEventJournal(seq, 0, 10_000, i, null, null);
                ReadResultSet<EventJournalMapEvent<Integer, Integer>> resultSet = f.get();
                readCount = resultSet.readCount();
                seq += readCount;
            }
            System.out.println("for partition " + i + ", seq: " + seq);
            total += seq;
        }
        System.out.println("total: " + total + ", expected: " + (totalCount * perKeyCount) +
                ", mapSize: " + map.size() + " (" + perKeyCount + ")");
    }

    private static Config getConfig() {
        Config config = new Config();
        config.setProperty(GroupProperty.PARTITION_COUNT.getName(), String.valueOf(partitionCount));
        EventJournalConfig eventJournalConfig = new EventJournalConfig();
        eventJournalConfig.setEnabled(true).setMapName(mapName + "*").setCapacity(partitionCount * 200_000);
        config.addEventJournalConfig(eventJournalConfig);
        return config;
    }
}

@mdogan (Contributor) commented Feb 16, 2018

As far as I can see, EventJournal has no backups:

    @Override
    public RingbufferConfig toRingbufferConfig(EventJournalConfig config) {
        int partitionCount = nodeEngine.getPartitionService().getPartitionCount();
        return new RingbufferConfig()
                .setAsyncBackupCount(0)
                .setBackupCount(0)
                .setInMemoryFormat(InMemoryFormat.OBJECT)
                .setCapacity(config.getCapacity() / partitionCount)
                .setTimeToLiveSeconds(config.getTimeToLiveSeconds());
    }

@mmedenjak (Contributor) commented Feb 16, 2018

The event journal replica count is equal to the map replica count. So if there are 3 replicas, each replica will have a local ringbuffer and map operations will only write to the local ringbuffer.
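
A self-contained sketch of that layout (types invented for illustration, not Hazelcast internals): each replica of a partition holds its own journal ringbuffer, and a map write is applied on the primary and on each backup separately, each appending only to its local ringbuffer.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class ReplicaLayoutSketch {

    static class Replica {
        final Map<Integer, Integer> records = new HashMap<>(); // map data on this replica
        final Deque<String> journal = new ArrayDeque<>();      // this replica's ringbuffer

        void applyWrite(int key, int value) {
            records.put(key, value);
            journal.add("UPDATE key=" + key + " value=" + value);
        }
    }

    public static void main(String[] args) {
        Replica primary = new Replica();
        Replica backup = new Replica();
        // the primary operation and its backup operation both apply the write,
        // so the two journals grow in sync without ever being copied directly
        primary.applyWrite(1, 42);
        backup.applyWrite(1, 42);
        System.out.println("primary journal: " + primary.journal);
        System.out.println("backup journal:  " + backup.journal);
    }
}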

@mdogan (Contributor) commented Feb 16, 2018

Ah, map backup replicas have their own journal ringbuffers. I see.

@mdogan (Contributor) commented Feb 16, 2018

Interestingly, the map survives the crash but the journal doesn't.

Before crash:

for partition 0, seq: 162
for partition 1, seq: 174
for partition 2, seq: 152
for partition 3, seq: 150
for partition 4, seq: 168
for partition 5, seq: 170
for partition 6, seq: 150
for partition 7, seq: 170
for partition 8, seq: 156
for partition 9, seq: 188
for partition 10, seq: 172
for partition 11, seq: 188
total: 2000, expected: 2000, mapSize: 1000

After crash:

for partition 0, seq: 162
for partition 1, seq: 87
for partition 2, seq: 152
for partition 3, seq: 75
for partition 4, seq: 168
for partition 5, seq: 170
for partition 6, seq: 150
for partition 7, seq: 170
for partition 8, seq: 156
for partition 9, seq: 188
for partition 10, seq: 86
for partition 11, seq: 94
total: 1658, expected: 2000, mapSize: 1000

Also, there is no complete partition loss for the journal, but in each affected partition about half of the events are lost. In fact the counts drop to exactly half (174 → 87, 150 → 75, 172 → 86, 188 → 94), which matches one surviving event per map entry instead of perKeyCount = 2.

@mmedenjak (Contributor) commented Feb 16, 2018

It might just as well be that some map backup operation does not add the same number of entries to the backup ringbuffer as the operation on the primary replica, so we end up with a different number of entries on the primary vs. the backup.

@gurbuzali (Member, Author) commented Feb 16, 2018

@mmedenjak keep in mind that we don't see any data loss after the first termination.

@metanet (Contributor) commented Feb 16, 2018

Could it be that the event journal data is not migrated correctly? After the first crash, the 1st backup will be promoted to primary and a new backup replica will be created. If the event journal data is not replicated to the new backup replica correctly, the 2nd crash will lead to data loss.

@mdogan (Contributor) commented Feb 16, 2018

The problem is that anti-entropy and backup replication don't work for the ringbuffers backing the event journal, because these ringbuffers are created with 0 backups. That means that when a member leaves un-gracefully, the missing backup replicas for these journals are not recreated by either the migration system or anti-entropy.
Instead, when a map partition is replicated, each map record creates an ADD event in the journal (via eventJournal.writeAddEvent(..)). That's why the missing ringbuffers are rebuilt from the latest map records, and the earlier write/update events are lost.
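
A self-contained toy (not Hazelcast code) demonstrating the consequence of this mechanism: rebuilding a journal from the surviving map records produces exactly one ADD per key, so with perKeyCount = 2 the per-partition counts are halved, as in the output above.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class JournalRebuildSketch {

    public static void main(String[] args) {
        Map<Integer, Integer> records = new LinkedHashMap<>();
        Deque<String> journal = new ArrayDeque<>();

        // normal writes: two events per key, like perKeyCount = 2 in the reproducer
        for (int key = 0; key < 3; key++) {
            for (int value = 0; value < 2; value++) {
                records.put(key, value);
                journal.add((value == 0 ? "ADD" : "UPDATE") + " key=" + key);
            }
        }
        System.out.println("journal before crash: " + journal.size()); // 6

        // replica lost; the journal is rebuilt from the map records alone
        Deque<String> rebuilt = new ArrayDeque<>();
        records.forEach((key, value) -> rebuilt.add("ADD key=" + key));
        System.out.println("journal after rebuild: " + rebuilt.size()); // 3: history halved
    }
}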

@mdogan (Contributor) commented Feb 16, 2018

If we change the following method to:

    @Override
    public RingbufferConfig toRingbufferConfig(EventJournalConfig config) {
        int partitionCount = nodeEngine.getPartitionService().getPartitionCount();
        return new RingbufferConfig()
                .setAsyncBackupCount(mapAsyncBackupCount)
                .setBackupCount(mapSyncBackupCount)
                .setInMemoryFormat(InMemoryFormat.OBJECT)
                .setCapacity(config.getCapacity() / partitionCount)
                .setTimeToLiveSeconds(config.getTimeToLiveSeconds());
    }

and stop writing journal events via eventJournal.writeAddEvent(..) during map replication, then everything works as expected.
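
Inside the method above, the placeholder counts could be resolved from the map configuration (a sketch, assuming the journal's RingbufferConfig is derived per map name):

// sketch: derive the backup counts from the map's own configuration
MapConfig mapConfig = nodeEngine.getConfig().findMapConfig(config.getMapName());
int mapSyncBackupCount = mapConfig.getBackupCount();
int mapAsyncBackupCount = mapConfig.getAsyncBackupCount();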

@viliam-durina changed the title from "EventJournal loses data if 2 nodes terminates" to "EventJournal loses data if 2 nodes terminate" on Feb 16, 2018

@mmedenjak (Contributor) commented Feb 16, 2018

Good analysis, thanks @mdogan !

@mdogan (Contributor) commented Feb 19, 2018

This issue is not resolved just by setting the correct backup counts on the ringbuffer. RingbufferService manages its own replication and split-brain handling, but for the event journal the cache and map services also try to replicate and merge the journal-backed ringbuffers. This causes event-journal data loss and inconsistencies.

Map and cache should write to the event journal only when adding/updating entries due to map/cache writes, and additionally when clearing or destroying the data structures. Currently, however, map/cache replication and split-brain merge operations clear & rewrite the event journal, even though RingbufferService handles these separately.

My current difficulty is that the map/cache record-store implementations reuse the same methods both to insert/update data during map/cache write operations and to replicate data during migration and split-brain merge. Those should be separate paths, to distinguish when the event journal should be modified and when not.
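
Illustrated with invented names (not the actual record-store API), the split would look something like this: user writes append to the journal, while replication and merge writes do not, leaving journal replication to RingbufferService alone.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class RecordStoreSketch {

    private final Map<Integer, Integer> records = new HashMap<>();
    private final Deque<String> journal = new ArrayDeque<>();

    public void putFromUser(int key, int value) {        // map/cache write operation
        records.put(key, value);
        journal.add("UPDATE key=" + key);                // journal only on real writes
    }

    public void putFromReplication(int key, int value) { // migration / merge path
        records.put(key, value);                         // no journal write here
    }
}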
