Scheduled task remains cancelled after migration #10603

Closed
tkountis opened this issue May 17, 2017 · 0 comments

I'm trying to use Hazelcast's scheduled executor service (IScheduledExecutorService) to execute some periodic tasks. I'm using Hazelcast 3.8.1.
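For readers unfamiliar with fixed-rate scheduling, here is a minimal local sketch using the JDK's `java.util.concurrent.ScheduledExecutorService`. It is only a single-JVM analog and says nothing about how Hazelcast distributes or migrates tasks; the class name `FixedRateSketch` and the helper `runPeriodically` are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Hazelcast or the test application.
final class FixedRateSketch {

    /**
     * Runs task at a fixed rate until it has completed `runs` times, then cancels it.
     * Returns true if the runs completed within timeoutMillis.
     */
    static boolean runPeriodically(Runnable task, int runs, long initialDelayMillis,
                                   long periodMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                task.run();
                done.countDown();
            }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
            boolean completed = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            // Once cancelled, a periodic task does not run again on this scheduler.
            future.cancel(false);
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // First run after 50 ms, then every 100 ms, stopping after three runs.
        boolean ok = runPeriodically(() -> System.out.println("--> 1"), 3, 50, 100, 5_000);
        System.out.println("completed=" + ok);
    }
}
```

The point relevant to this report is the last step: on a plain JDK scheduler, a periodic task that has been cancelled stays cancelled, so a task that reports `isCancelled()` will not fire again.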

I start one node and then the other, and the tasks are distributed between both nodes and properly executed.

If I shut down the first node, the second one starts executing the periodic tasks that were previously on the first node.

The problem is that if I stop the second node instead of the first, its tasks are not rescheduled on the first one. This happens even if I have more nodes: if I shut down the last node to receive tasks, those tasks are lost.

The shutdown is always done with Ctrl+C.
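Ctrl+C sends SIGINT to the process, and the JVM then runs any registered shutdown hooks before exiting, which is the mechanism the test application relies on. A minimal sketch of that mechanism, with a hypothetical class `ShutdownHookSketch` and helper `register` that are not part of the application:

```java
// Hypothetical helper for illustration only.
final class ShutdownHookSketch {

    /** Registers cleanup as a JVM shutdown hook and returns the hook thread. */
    static Thread register(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "graceful-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // In the test application, the safety checks and instance.shutdown()
        // take the place of this println.
        register(() -> System.out.println("cleanup ran"));
        // Hooks also run on normal exit, so returning from main triggers the hook too.
        System.out.println("exiting");
    }
}
```

Hooks run on Ctrl+C and on normal exit, but not when the process is killed forcibly (for example with `kill -9`), so this only describes the graceful path used in the report.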

I've created a test application from the Hazelcast sample code and some snippets I found on the web. I start two instances of this app.

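import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.hazelcast.scheduledexecutor.IScheduledExecutorService;
import com.hazelcast.scheduledexecutor.IScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.hazelcast.scheduledexecutor.TaskUtils.named;

public class MasterMember {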

/**
 * Logger for this class.
 */
private static final Logger logger = LoggerFactory.getLogger(MasterMember.class);

public static void main(String[] args) throws Exception {

    Config config = new Config();
    config.setProperty("hazelcast.logging.type", "slf4j");
    config.getScheduledExecutorConfig("scheduler")
            .setPoolSize(16)
            .setCapacity(100)
            .setDurability(1);

    final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);

    Runtime.getRuntime().addShutdownHook(new Thread() {

        HazelcastInstance threadInstance = instance;

        @Override
        public void run() {
            logger.info("Application shutdown");

            for (int i = 0; i < 12; i++) {
                logger.info("Verifying whether it is safe to close this instance");
                boolean isSafe = getResultsForAllInstances(hzi -> {
                    if (hzi.getLifecycleService().isRunning()) {
                        return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS);
                    }
                    return true;
                });

                if (isSafe) {
                    logger.info("Verifying whether cluster is safe.");
                    isSafe = getResultsForAllInstances(hzi -> {
                        if (hzi.getLifecycleService().isRunning()) {
                            return hzi.getPartitionService().isClusterSafe();
                        }
                        return true;
                    });

                    if (isSafe) {
                        System.out.println("is safe.");
                        break;
                    }
                }

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting; the instance is shut down below.
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            threadInstance.shutdown();

        }

        private boolean getResultsForAllInstances(
                Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) {

            return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true,
                    (old, next) -> old && next);
        }
    });

    new Thread(() -> {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and skip scheduling.
            Thread.currentThread().interrupt();
            return;
        }

        IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
        scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS);
    }).start();

    new Thread(() -> {

        try {
            // delays init
            Thread.sleep(20000);

            while (true) {

                IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
                final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures =
                        scheduler.getAllScheduledFutures();

                // Log the state and statistics of every scheduled task, grouped by owning member
                for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) {
                    for (final IScheduledFuture<Object> objectIScheduledFuture : entry) {
                        logger.info(
                                "TaskStats: name {} isDone() {} isCancelled() {} total runs {} delay (sec) {} other statistics {} ",
                                objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(),
                                objectIScheduledFuture.isCancelled(),
                                objectIScheduledFuture.getStats().getTotalRuns(),
                                objectIScheduledFuture.getDelay(TimeUnit.SECONDS),
                                objectIScheduledFuture.getStats());
                    }
                }

                Thread.sleep(15000);

            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the monitoring thread exit.
            Thread.currentThread().interrupt();
        }

    }).start();

    while (true) {
        Thread.sleep(1000);
    }
    // Hazelcast.shutdownAll();
}
}
public class EchoTask implements Runnable, Serializable {

/**
 * serialVersionUID
 */
private static final long serialVersionUID = 5505122140975508363L;

// Static so the logger is not serialized together with the task.
private static final Logger logger = LoggerFactory.getLogger(EchoTask.class);

private final String msg;

public EchoTask(String msg) {
    this.msg = msg;
}

@Override
public void run() {
    logger.info("--> " + msg);
}
}
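
The polling loop in the shutdown hook above (check the local member, then the cluster, retry up to 12 times with a pause) can be sketched in isolation as follows. This is a plain-Java illustration under assumptions: `SafeShutdownSketch`, `allPass`, and `awaitSafe` are hypothetical names, and the Hazelcast calls are replaced by stand-in suppliers. Note that `allMatch` returns the same result as the `reduce(true, (old, next) -> old && next)` form but stops at the first failing element, whereas the original evaluates the function for every instance.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Hypothetical helper; the suppliers stand in for the Hazelcast safety checks.
final class SafeShutdownSketch {

    /** True only if check holds for every element; stops at the first failure. */
    static <T> boolean allPass(List<T> items, Predicate<T> check) {
        return items.stream().allMatch(check);
    }

    /**
     * Polls until both checks pass or the attempts run out.
     * Returns true if both checks passed, false on timeout or interruption.
     */
    static boolean awaitSafe(BooleanSupplier localSafe, BooleanSupplier clusterSafe,
                             int attempts, long pauseMillis) {
        for (int i = 0; i < attempts; i++) {
            // clusterSafe is only evaluated once localSafe has passed.
            if (localSafe.getAsBoolean() && clusterSafe.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in local check that starts passing on the third poll.
        boolean safe = awaitSafe(() -> ++calls[0] >= 3, () -> true, 12, 10);
        System.out.println("safe=" + safe + " after " + calls[0] + " polls");
    }
}
```

Returning a boolean lets the caller decide what to do when the cluster never becomes safe; the test application above shuts the instance down in either case.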