Skip to content

Commit

Permalink
adding initialize routine to launch existing assignments (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent aa19226 commit d5955d8
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 21 deletions.
Expand Up @@ -22,6 +22,7 @@
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.functions.fs.FunctionConfig; import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest; import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest;
import org.apache.pulsar.functions.runtime.worker.request.MarkerRequest;
import org.apache.pulsar.functions.runtime.worker.request.RequestResult; import org.apache.pulsar.functions.runtime.worker.request.RequestResult;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest; import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager; import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
Expand All @@ -32,6 +33,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -55,6 +57,9 @@ public class FunctionMetaDataManager implements AutoCloseable {


private LinkedBlockingQueue<FunctionAction> actionQueue; private LinkedBlockingQueue<FunctionAction> actionQueue;


private boolean initializePhase = true;
private final String initializeMarkerRequestId = UUID.randomUUID().toString();

public FunctionMetaDataManager(WorkerConfig workerConfig, public FunctionMetaDataManager(WorkerConfig workerConfig,
ServiceRequestManager serviceRequestManager, ServiceRequestManager serviceRequestManager,
LinkedBlockingQueue<FunctionAction> actionQueue) { LinkedBlockingQueue<FunctionAction> actionQueue) {
Expand All @@ -63,6 +68,19 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
this.actionQueue = actionQueue; this.actionQueue = actionQueue;
} }


public boolean isInitializePhase() {
return initializePhase;
}

public void setInitializePhase(boolean initializePhase) {
this.initializePhase = initializePhase;
}

public void sendIntializationMarker() {
log.info("sending marking message...");
this.serviceRequestManager.submitRequest(new MarkerRequest(this.workerConfig.getWorkerId(), this.initializeMarkerRequestId));
}

@Override @Override
public void close() { public void close() {
serviceRequestManager.close(); serviceRequestManager.close();
Expand Down Expand Up @@ -208,7 +226,7 @@ public void proccessDeregister(DeregisterRequest deregisterRequest) {
// check if request is outdated // check if request is outdated
if (!isRequestOutdated(deregisterRequest)) { if (!isRequestOutdated(deregisterRequest)) {
// Check if this worker is suppose to run the function // Check if this worker is suppose to run the function
if (isMyRequest(deregisterRequest)) { if (shouldProcessRequest(deregisterRequest)) {
// stop running the function // stop running the function
insertStopAction(deregisterRequestFs); insertStopAction(deregisterRequestFs);
} }
Expand Down Expand Up @@ -247,7 +265,7 @@ public void processUpdate(UpdateRequest updateRequest) {
// Since this is the first time worker has seen function, just put it into internal function metadata store // Since this is the first time worker has seen function, just put it into internal function metadata store
addFunctionToFunctionMap(updateRequestFs); addFunctionToFunctionMap(updateRequestFs);
// Check if this worker is suppose to run the function // Check if this worker is suppose to run the function
if (isMyRequest(updateRequest)) { if (shouldProcessRequest(updateRequest)) {
insertStartAction(updateRequestFs); insertStartAction(updateRequestFs);
} }
completeRequest(updateRequest, true); completeRequest(updateRequest, true);
Expand All @@ -265,7 +283,7 @@ public void processUpdate(UpdateRequest updateRequest) {
// update the function metadata // update the function metadata
addFunctionToFunctionMap(updateRequestFs); addFunctionToFunctionMap(updateRequestFs);
// check if this worker should run the update // check if this worker should run the update
if (isMyRequest(updateRequest)) { if (shouldProcessRequest(updateRequest)) {
// Update the function // Update the function
insertStartAction(updateRequestFs); insertStartAction(updateRequestFs);
} }
Expand Down Expand Up @@ -299,29 +317,60 @@ private boolean isRequestOutdated(ServiceRequest serviceRequest) {
return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion(); return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
} }


private boolean isMyRequest(ServiceRequest serviceRequest) { private boolean shouldProcessRequest(ServiceRequest serviceRequest) {
return this.workerConfig.getWorkerId().equals(serviceRequest.getFunctionMetaData().getWorkerId()); return this.workerConfig.getWorkerId().equals(serviceRequest.getFunctionMetaData().getWorkerId());
} }


private boolean isSendByMe(ServiceRequest serviceRequest) {
return this.workerConfig.getWorkerId().equals(serviceRequest.getWorkerId());
}

private void insertStopAction(FunctionMetaData functionMetaData) { private void insertStopAction(FunctionMetaData functionMetaData) {
FunctionAction functionAction = new FunctionAction(); if (!this.isInitializePhase()) {
functionAction.setAction(FunctionAction.Action.STOP); FunctionAction functionAction = new FunctionAction();
functionAction.setFunctionMetaData(functionMetaData); functionAction.setAction(FunctionAction.Action.STOP);
try { functionAction.setFunctionMetaData(functionMetaData);
actionQueue.put(functionAction); try {
} catch (InterruptedException ex) { actionQueue.put(functionAction);
throw new RuntimeException("Interrupted while putting action"); } catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
} }
} }


private void insertStartAction(FunctionMetaData functionMetaData) { private void insertStartAction(FunctionMetaData functionMetaData) {
FunctionAction functionAction = new FunctionAction(); if (!this.isInitializePhase()) {
functionAction.setAction(FunctionAction.Action.START); FunctionAction functionAction = new FunctionAction();
functionAction.setFunctionMetaData(functionMetaData); functionAction.setAction(FunctionAction.Action.START);
try { functionAction.setFunctionMetaData(functionMetaData);
actionQueue.put(functionAction); try {
} catch (InterruptedException ex) { actionQueue.put(functionAction);
throw new RuntimeException("Interrupted while putting action"); } catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}
}

private boolean isMyInitializeMarkerRequest(MarkerRequest serviceRequest) {
return isSendByMe(serviceRequest) && this.initializeMarkerRequestId.equals(serviceRequest.getRequestId());
}

public void processInitializeMarker(MarkerRequest serviceRequest) {
if (isMyInitializeMarkerRequest(serviceRequest)) {
this.setInitializePhase(false);
log.info("Initializing Metadata state done!");
log.info("Launching existing assignments...");
// materialize current assignments
for (Map<String, Map<String, FunctionMetaData>> i: this.functionMap.values()) {
for (Map<String, FunctionMetaData> k : i.values()) {
for (FunctionMetaData functionMetaData : k.values()) {
// if I should run this
if (this.workerConfig.getWorkerId().equals(functionMetaData.getWorkerId())) {
insertStartAction(functionMetaData);
}
}
}
}
} }
} }
} }
Expand Up @@ -25,7 +25,9 @@
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest; import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest;
import org.apache.pulsar.functions.runtime.worker.request.MarkerRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest; import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest; import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest;


@Slf4j @Slf4j
Expand All @@ -43,9 +45,15 @@ public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManag
} }


public void start() { public void start() {
initialize();
receiveOne(); receiveOne();
} }


public void initialize() {
log.info("Initializing Metadata state...");
this.functionMetaDataManager.sendIntializationMarker();
}

private void receiveOne() { private void receiveOne() {
reader.readNextAsync() reader.readNextAsync()
.thenAccept(this) .thenAccept(this)
Expand Down Expand Up @@ -78,6 +86,9 @@ public void accept(Message msg) {
} }


switch(serviceRequest.getRequestType()) { switch(serviceRequest.getRequestType()) {
case MARKER:
this.functionMetaDataManager.processInitializeMarker((MarkerRequest) serviceRequest);
break;
case UPDATE: case UPDATE:
this.functionMetaDataManager.processUpdate((UpdateRequest) serviceRequest); this.functionMetaDataManager.processUpdate((UpdateRequest) serviceRequest);
break; break;
Expand Down
Expand Up @@ -22,11 +22,10 @@


public class DeregisterRequest extends ServiceRequest{ public class DeregisterRequest extends ServiceRequest{


private String functionName; private static final long serialVersionUID = 4041129758880596555L;


public DeregisterRequest(String workerId, FunctionMetaData functionMetaData) { public DeregisterRequest(String workerId, FunctionMetaData functionMetaData) {
super(workerId, functionMetaData, ServiceRequestType.DELETE); super(workerId, functionMetaData, ServiceRequestType.DELETE);
this.functionName = functionName;
} }


public static DeregisterRequest of(String workerId, FunctionMetaData functionMetaData) { public static DeregisterRequest of(String workerId, FunctionMetaData functionMetaData) {
Expand Down
@@ -0,0 +1,16 @@
package org.apache.pulsar.functions.runtime.worker.request;

public class MarkerRequest extends ServiceRequest{
private static final long serialVersionUID = -6074909284607487042L;

public MarkerRequest(String workerId, String requestId) {
super(workerId, null, ServiceRequestType.MARKER, requestId);
}

@Override
public String toString() {
return "MarkerRequest{"
+ super.toString()
+ "}";
}
}
Expand Up @@ -46,7 +46,8 @@ public String toString() {


public enum ServiceRequestType { public enum ServiceRequestType {
UPDATE, UPDATE,
DELETE DELETE,
MARKER
} }


public ServiceRequest(String workerId, FunctionMetaData functionMetaData, ServiceRequestType serviceRequestType) { public ServiceRequest(String workerId, FunctionMetaData functionMetaData, ServiceRequestType serviceRequestType) {
Expand Down
Expand Up @@ -22,6 +22,8 @@


public class UpdateRequest extends ServiceRequest { public class UpdateRequest extends ServiceRequest {


private static final long serialVersionUID = 4874557910196094342L;

private UpdateRequest(String workerId, FunctionMetaData functionMetaData) { private UpdateRequest(String workerId, FunctionMetaData functionMetaData) {
super(workerId, functionMetaData, ServiceRequestType.UPDATE); super(workerId, functionMetaData, ServiceRequestType.UPDATE);
} }
Expand Down

0 comments on commit d5955d8

Please sign in to comment.