Data inconsistency/loss following CONCURRENT_MAP_REPLACE_IF_SAME / java.lang.ClassCastException #462

Closed
JamieMcNaught opened this issue Apr 16, 2013 · 2 comments

@JamieMcNaught
commented Apr 16, 2013

Originally reported on the Hazelcast forums; however, I've now been asked to file it on this issue tracker as well (so apologies for the cross-posting):
https://groups.google.com/forum/?fromgroups=#!topic/hazelcast/5G5zhyBUC2U

I have an issue where, following the exception and log entries below, the cluster I am working with loses data. The approximate procedure is:

  1. Bring up two processes (port 51941 and 51946), each with an embedded Hazelcast instance and five threads modifying a shared map.
  2. An additional node is brought up to join the cluster, but does not modify the map (port 51951).
  3. After approx 15 seconds the new node from step 2 is killed.
  4. Steps 2 and 3 are repeated with port number increasing by 5 each time.

Eventually an exception is thrown and the Java code running on the first two nodes notices that the data is inconsistent (an update appears to have been lost).

Would anyone expect the procedure above to lose data?
Could anyone give an explanation of the error message below?

I'm in the process of cleaning up the Java code for this and will post it tomorrow, assuming you don't tell me this is a known issue! This is on 64-bit Linux (RHEL 5.5).
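
In the meantime, here is a minimal, self-contained sketch of the update loop each of the mutator threads runs (class name, key and value below are illustrative only; the real code follows in a later comment). The three-argument replace in this loop is, as far as I can tell, the call that shows up in the log below as CONCURRENT_MAP_REPLACE_IF_SAME:

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class ReplaceLoopSketch {
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(new Config());
        IMap<String, String> table = instance.getMap("testTable");

        String key = "42";             // row being updated
        String entry = "0=0000001|";   // token appended by one thread for one iteration

        // Optimistic read-modify-write: retry until our conditional write wins.
        boolean committed = false;
        while (!committed) {
            String oldRow = table.get(key);
            String newRow = (oldRow == null ? "|" : oldRow) + entry;
            if (oldRow == null) {
                // First writer for this row: insert only if the key is still absent.
                committed = (table.putIfAbsent(key, newRow) == null);
            } else {
                // Conditional replace of the exact value we read.
                committed = table.replace(key, oldRow, newRow);
            }
        }

        System.out.println("Committed row " + key + " = " + table.get(key));
        instance.getLifecycleService().shutdown();
    }
}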

Thanks,

Jamie

Apr 16, 2013 11:22:16 PM com.hazelcast.impl.ConcurrentMapManager
WARNING: [10.1.2.48]:51941 [testCluster] Target[Address[10.1.2.48]:51966] is dead! Hazelcast will retry CONCURRENT_MAP_REPLACE_IF_SAME
Apr 16, 2013 11:22:16 PM com.hazelcast.impl.ConcurrentMapManager
WARNING: [10.1.2.48]:51941 [testCluster] Store thrown exception for CONCURRENT_MAP_REPLACE_IF_SAME
java.lang.ClassCastException: java.lang.String cannot be cast to com.hazelcast.impl.concurrentmap.MultiData
at com.hazelcast.impl.ConcurrentMapManager$ReplaceOperationHandler$ReplaceTask.doMapStoreOperation(ConcurrentMapManager.java:2891)
at com.hazelcast.impl.ConcurrentMapManager$AbstractMapStoreOperation.run(ConcurrentMapManager.java:3860)
at com.hazelcast.impl.executor.ParallelExecutorService$ParallelExecutorImpl$ExecutionSegment.run(ParallelExecutorService.java:212)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
at com.hazelcast.impl.ExecutorThreadFactory$1.run(ExecutorThreadFactory.java:38)

@JamieMcNaught

commented Apr 19, 2013

Hi again,

Please find below the driver script and Java code which demonstrate the issue described above. (The script assumes Robustness007.java has already been compiled into the working directory, since the current directory is on the classpath.)

This is almost 100% reproducible.

#!/bin/sh
#
#  To check for errors, grep the output log files:
#  grep incon 1.log 2.log
#  there should be no lines returned, but currently most test runs produce errors.
#  Removing the for loop (which starts up additional nodes) stops the errors occurring.
#

#HAZELCAST_VER=2.4.1
HAZELCAST_VER=2.5
HAZELCAST=/shared/distcache/hazelcast/lib/${HAZELCAST_VER}/community/
#HAZELCAST=/shared/distcache/hazelcast/lib/2.4.1/community/
#OUT_DIR=/tmp/

java  -cp ${HAZELCAST}/hazelcast-all-${HAZELCAST_VER}.jar:.  Robustness007 0 > ${OUT_DIR}1.log 2>&1&
echo $!
sleep 5
java  -cp ${HAZELCAST}/hazelcast-all-${HAZELCAST_VER}.jar:.  Robustness007 5 > ${OUT_DIR}2.log 2>&1&
echo $!

sleep 5
for ID in 10 15 20 25 30 35 40 45 50 55 60
do
    echo "java  -cp ${HAZELCAST}/hazelcast-all-${HAZELCAST_VER}.jar:. Robustness007 ${ID}"
    java  -cp ${HAZELCAST}/hazelcast-all-${HAZELCAST_VER}.jar:.  Robustness007 ${ID} > ${OUT_DIR}${ID}.log 2>&1&
    PID=$!
    sleep 15
    kill ${PID}
done
/**
 *  Test code for Hazelcast robustness.
 *
 *  Approx procedure (see the driver script test.sh):
 *  1) Bring up two processes (port 51941 and 51946), each with an embedded
 *     Hazelcast instance and five threads modifying a shared map.
 *  2) An additional node is brought up to join the cluster, but does not modify
 *     the map (port 51951).
 *  3) After approx 15 seconds the new node from step 2 is killed.
 *  4) Steps 2 and 3 are repeated with port number increasing by 5 each time.
 *
 *
 */
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MembershipListener;
import com.hazelcast.core.MembershipEvent;

import com.hazelcast.core.Cluster;
import java.util.Map;
import java.util.Queue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.lang.Runtime;

class MutatorThread implements Runnable {
    private final int threadId;
    private final long count;
    private final IMap<String, String> table;
    private long[] lastCounter;
    private final int primes[] = {2,   3,   5,   7,  11,  13,  17,  19,  23,  29, 
                                 31,  37,  41,  43,  47,  53,  59,  61,  67,  71, 
                                 73,  79,  83,  89,  97, 101, 103, 107, 109, 113, 
                                127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 
                                179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 
                                233, 239, 241, 251, 257, 263, 269, 271, 277, 281};

    MutatorThread(int threadId, long count, IMap<String, String> table) {
        this.threadId = threadId;
        this.count = count;
        this.table = table;
        this.lastCounter = new long[10000];
    }

    public void run() {
        int rowIndex = 0;

        for(long i = 0; (i<this.count) && (!Thread.currentThread().isInterrupted()) ;i++) {

            rowIndex = (rowIndex + primes[threadId]) % 10000; 

            boolean committed = false;
            while(!committed) {
                String key = String.valueOf(rowIndex);
                String oldrow = table.get(key);
                String newrow = oldrow;
                if(newrow == null)
                    newrow = "|";
                newrow = newrow + Integer.toString(threadId)+"="+String.format("%07d", i)+"|";

                // Check to see if we can find the number we added on our last iteration.
                if (lastCounter[rowIndex]!=0) {
                    if(oldrow==null) {
                        System.out.println("Oldrow null for threadid="+Integer.toString(threadId)+" rowIndex="+Integer.toString(rowIndex)+" lastCounter="+Long.toString(lastCounter[rowIndex]));
                    } else {
                        String expected = Integer.toString(threadId)+"="+String.format("%07d", lastCounter[rowIndex]);
                        int index = oldrow.indexOf(expected);
                        if(index == -1) {
                            System.out.println("Row inconsistency found (rowIndex="+rowIndex+"). Searching of \""+expected+"\" in \""+oldrow+"\""); 
                        }
                    }
                }

                if(oldrow==null) {
                    committed = (table.putIfAbsent(key,newrow) == null);
                } else {
                    committed = table.replace(key,oldrow,newrow);
                }
            }
            lastCounter[rowIndex] = i; 
        }
        System.out.println("Thread "+this.threadId+" done.");   
    }
}

public class Robustness007 {

    private static final String tableName = "testTable";
    private static IMap<String, String> imap;
    private static int totalIterations = 1 * 300 * 1000;
    private static HazelcastInstance instance;

    public static void main(String[] args) throws java.lang.InterruptedException{

        int startIdx = Integer.parseInt(args[0]);

        Config config = new Config();
        config.setProperty("hazelcast.icmp.enabled","true");
        config.getNetworkConfig().setPort(51941+startIdx);
        config.getGroupConfig().setName("testCluster");
        System.out.println("Backup Count = "+config.getMapConfig(tableName).getBackupCount());

        try 
        {
            instance = Hazelcast.newHazelcastInstance(config);
            imap = instance.<String,String>getMap(tableName);
        } 
        catch (Exception e)
        {
            System.out.println("Failed to create store, exiting.");
            return;
        }
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("\nExiting.");
                instance.getLifecycleService().shutdown();
            }
        });

        // Only actually execute the mutator threads for processes 1 & 2.
        // Other processes connect, but do not try to modify the data.
        if(startIdx < 10) {
            List<Thread> threads = new ArrayList<Thread>();
            List<MutatorThread> containers = new ArrayList<MutatorThread>();
            for(int i = startIdx; i < startIdx+5 ; i++) {
                MutatorThread run1 = new MutatorThread(i, totalIterations, imap);
                Thread t1 = new Thread(run1);
                t1.start();
                threads.add(t1);
                containers.add(run1);
            }

            // Wait on the threads.
            for(int i = startIdx; i < (startIdx+5) ; i++) {
                System.out.println("Waiting on thread "+i);
                threads.get(i-startIdx).join();
                System.out.println("Thread "+i+" finished");
            }

            // Display all the data loaded (useful for checking what went missing).
            for(int i = 0; i < 10000; i++) {
                String row = imap.get(String.valueOf(i));
                if(row == null)
                    System.out.println("Row = "+i+" = null");
                else
                    System.out.println("Row = "+i+" = "+row);
            }
        }
        // Wait for second process to finish updating before exiting.
        for(int i=0; i<100; i++) {
            Thread.sleep(1000);
            System.out.print('.');
        }
    }
}
@JamieMcNaught

commented Apr 30, 2013

Thanks @mdogan - I've re-run our tests against this and it is now working.
