PartitionIteratingOperation should be non blocking #4889

Closed
pveentjer opened this issue Mar 24, 2015 · 0 comments


Currently the PartitionIteratingOperation occupies a generic operation thread for as long as the operations for the partitions are running. This means the generic thread pool can be exhausted, which causes a lot of problems, including important tasks that can't be executed because no generic thread is free.

See the code below for how it can be done. The catch is that with the current approach it is very easy to check whether an operation is still running: you look at the operation runners of the generic threads. With the non-blocking approach this no longer works, because no thread is held while the partition operations run, so the Operation needs to become a TraceableOperation. However, you don't want to ask the service that is configured for the PartitionIteratingOperation, because that is the service of its child operations.

It is probably best to introduce a PartitionIteratingService where the system can look up TraceableOperations.
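
To make the lookup idea concrete, here is a minimal stand-alone sketch, separate from the proposed class that follows. It assumes the service is little more than a registry keyed by trace identifier. `TraceableOperationRegistry` and its methods are hypothetical names chosen for illustration; they are not existing Hazelcast APIs, and a real service would probably also need the caller address and call id.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry of running traceable operations; not a Hazelcast class. */
final class TraceableOperationRegistry {

    // trace identifier -> the operation that is still waiting for partition results
    private final ConcurrentMap<Object, Object> running = new ConcurrentHashMap<>();

    /** Registers an operation before it fans out; returns false if the id is already in use. */
    boolean register(Object traceId, Object operation) {
        return running.putIfAbsent(traceId, operation) == null;
    }

    /** Removes the operation once its combined response has been sent. */
    void deregister(Object traceId) {
        running.remove(traceId);
    }

    /** Answers the "is this operation still running?" question without inspecting threads. */
    boolean isRunning(Object traceId) {
        return running.containsKey(traceId);
    }

    public static void main(String[] args) {
        TraceableOperationRegistry registry = new TraceableOperationRegistry();
        registry.register("map-size-1", new Object());
        System.out.println(registry.isRunning("map-size-1")); // true
        registry.deregister("map-size-1");
        System.out.println(registry.isRunning("map-size-1")); // false
    }
}
```

Under this assumption the operation would register itself at the start of `run()` and deregister in the handler that sends the final response, so the check no longer depends on which thread is doing the work.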

/*
 * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hazelcast.spi.impl.operationservice.impl.operations;

import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationAccessor;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.ResponseHandler;
import com.hazelcast.spi.TraceableOperation;
import com.hazelcast.spi.impl.SpiDataSerializerHook;
import com.hazelcast.spi.impl.operationservice.impl.responses.NormalResponse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

//todo: this can be a long running operation, so we need to register ourselves.

/**
 * An {@link com.hazelcast.spi.Operation} that executes another operation on a set of partitions. For example it can
 * be used to implement the map.size, since each partition needs to be asked for its size for that given map.
 *
 * Internally it uses a {@link com.hazelcast.spi.OperationFactory} to create an operation for each partition.
 *
 * This operation is non-blocking; it does not directly return a result. The advantage is that no thread is consumed
 * waiting for all operations to complete.
 *
 * It works like this: on each of the desired partitions an Operation is created using the {@link OperationFactory}, and
 * on each of these Operations a PartitionResponseHandler is registered. Each PartitionResponseHandler adds its result
 * to the 'results' map, and when all results have arrived, a response is sent to the caller.
 */
public final class PartitionIteratingOperation
        extends AbstractOperation implements IdentifiedDataSerializable, TraceableOperation {

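    // Parameterized to avoid the raw type and the unchecked warning it causes at the call site.
    private static final AtomicIntegerFieldUpdater<PartitionIteratingOperation> pendingUpdater
            = AtomicIntegerFieldUpdater.newUpdater(PartitionIteratingOperation.class, "pending");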

    private String traceId;
    private List<Integer> partitions;
    private OperationFactory operationFactory;
    private Map<Integer, Object> results;
    private volatile int pending;

    public PartitionIteratingOperation() {
    }

    public PartitionIteratingOperation(List<Integer> partitions, OperationFactory operationFactory, String traceId) {
        this.partitions = partitions == null ? Collections.<Integer>emptyList() : partitions;
        this.operationFactory = operationFactory;
        this.traceId = traceId;
    }

    @Override
    public Object getTraceIdentifier() {
        return traceId;
    }

    @Override
    public void run() throws Exception {
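        // Note: if 'partitions' is empty, no handler ever fires and no response is sent to the caller.
        pending = partitions.size();
        // Must be thread-safe: the PartitionResponseHandlers that write to the results map are called from different
        // threads. Note: ConcurrentHashMap rejects null values, so a null partition result would need a placeholder.
        results = new ConcurrentHashMap<Integer, Object>(partitions.size());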

        NodeEngine nodeEngine = getNodeEngine();
        OperationService operationService = nodeEngine.getOperationService();

        for (int partitionId : partitions) {
            Operation operation = operationFactory.createOperation();
            operation.setNodeEngine(nodeEngine)
                    .setPartitionId(partitionId)
                    .setReplicaIndex(getReplicaIndex())
                    .setResponseHandler(new PartitionResponseHandler(operation))
                    .setServiceName(getServiceName())
                    .setService(getService())
                    .setCallerUuid(getCallerUuid());
            OperationAccessor.setCallerAddress(operation, getCallerAddress());
            operationService.executeOperation(operation);
        }
    }

    @Override
    public boolean returnsResponse() {
        return false;
    }

    @Override
    public Object getResponse() {
        return new PartitionResponse(results);
    }

    @Override
    public int getFactoryId() {
        return SpiDataSerializerHook.F_ID;
    }

    @Override
    public int getId() {
        return SpiDataSerializerHook.PARTITION_ITERATOR;
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeUTF(traceId);
        int pCount = partitions.size();
        out.writeInt(pCount);
        for (Integer partition : partitions) {
            out.writeInt(partition);
        }
        out.writeObject(operationFactory);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        traceId = in.readUTF();
        int pCount = in.readInt();
        partitions = new ArrayList<Integer>(pCount);
        for (int i = 0; i < pCount; i++) {
            partitions.add(in.readInt());
        }
        operationFactory = in.readObject();
    }

    private class PartitionResponseHandler implements ResponseHandler {
        private final Operation op;

        public PartitionResponseHandler(Operation op) {
            this.op = op;
        }

        @Override
        public void sendResponse(Object result) {
            if (result instanceof NormalResponse) {
                result = ((NormalResponse) result).getValue();
            }

            results.put(op.getPartitionId(), result);

            if (pendingUpdater.decrementAndGet(PartitionIteratingOperation.this) != 0) {
                return;
            }

            // All partition results have arrived, so send the combined response back to the caller.
            PartitionResponse partitionResponse = new PartitionResponse(results);
            NormalResponse normalResponse = new NormalResponse(partitionResponse, getCallId(), 0, isUrgent());
            getResponseHandler().sendResponse(normalResponse);
        }

        @Override
        public boolean isLocal() {
            return false;
        }
    }

    // To make serialization of HashMap faster.
    public static final class PartitionResponse implements IdentifiedDataSerializable {

        private Map<Integer, Object> results;

        public PartitionResponse() {
        }

        public PartitionResponse(Map<Integer, Object> results) {
            this.results = results != null ? results : Collections.<Integer, Object>emptyMap();
        }

        public Map<? extends Integer, ?> asMap() {
            return results;
        }

        @Override
        public int getFactoryId() {
            return SpiDataSerializerHook.F_ID;
        }

        @Override
        public int getId() {
            return SpiDataSerializerHook.PARTITION_RESPONSE;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            int length = results != null ? results.size() : 0;
            out.writeInt(length);
            if (length > 0) {
                for (Map.Entry<Integer, Object> entry : results.entrySet()) {
                    out.writeInt(entry.getKey());
                    out.writeObject(entry.getValue());
                }
            }
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            int length = in.readInt();
            if (length > 0) {
                results = new HashMap<Integer, Object>(length);
                for (int i = 0; i < length; i++) {
                    int pid = in.readInt();
                    Object value = in.readObject();
                    results.put(pid, value);
                }
            } else {
                results = Collections.emptyMap();
            }
        }
    }
}
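
The completion logic in PartitionResponseHandler can be tried outside Hazelcast. The following is a minimal stand-alone sketch under stated assumptions, not the proposed implementation: `NonBlockingFanOut`, its `run` method and the executor-based tasks are hypothetical stand-ins for the operation service, and an `AtomicInteger` replaces the `AtomicIntegerFieldUpdater` for brevity. It only illustrates the pattern, in which the caller returns immediately and whichever task finishes last delivers the combined result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical stand-alone model of the fan-out; none of these names are Hazelcast APIs. */
final class NonBlockingFanOut {

    /** Submits one task per partition and returns at once; onComplete runs on the last finishing thread. */
    static void run(List<Integer> partitions, IntFunction<Object> task,
                    ExecutorService executor, Consumer<Map<Integer, Object>> onComplete) {
        if (partitions.isEmpty()) {
            // Without this guard nothing would ever call onComplete.
            onComplete.accept(Collections.<Integer, Object>emptyMap());
            return;
        }
        Map<Integer, Object> results = new ConcurrentHashMap<>(partitions.size());
        AtomicInteger pending = new AtomicInteger(partitions.size());
        for (int partitionId : partitions) {
            executor.execute(() -> {
                Object result;
                try {
                    // Assumption: tasks return non-null values (ConcurrentHashMap rejects null).
                    result = task.apply(partitionId);
                } catch (RuntimeException e) {
                    // Store the failure as the result so the countdown still reaches zero.
                    result = e;
                }
                results.put(partitionId, result);
                if (pending.decrementAndGet() == 0) {
                    onComplete.accept(results);
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletableFuture<Map<Integer, Object>> done = new CompletableFuture<>();
        run(Arrays.asList(0, 1, 2, 3), id -> id * 10, executor, done::complete);
        // Only this demo blocks, to print the outcome; run() itself returned immediately.
        System.out.println(new TreeMap<>(done.get(5, TimeUnit.SECONDS))); // {0=0, 1=10, 2=20, 3=30}
        executor.shutdown();
    }
}
```

The pool in the demo has two threads but four partitions, and no thread is parked waiting for the others, which is the property the issue asks for.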