Skip to content

Commit

Permalink
Rewrite FunctionStateListener to use receiveAsyc (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 84a8040 commit 0c113d7
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 149 deletions.
Expand Up @@ -68,7 +68,7 @@ public static void main(String[] args) throws Exception {
} }


final Worker worker = new Worker(workerConfig); final Worker worker = new Worker(workerConfig);
worker.start(); worker.startAsync();
} }


} }
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.worker;

import java.io.IOException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest;
import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest;

@Slf4j
public class FunctionStateConsumer
implements java.util.function.Consumer<Message>, Function<Throwable, Void>, AutoCloseable {

private final FunctionStateManager functionStateManager;
private final Consumer consumer;

public FunctionStateConsumer(FunctionStateManager functionStateManager,
Consumer consumer)
throws PulsarClientException {
this.functionStateManager = functionStateManager;
this.consumer = consumer;
}

public void start() {
receiveOne();
}

private void receiveOne() {
consumer.receiveAsync()
.thenAccept(this)
.exceptionally(this);
}

@Override
public void close() {
log.info("Stopping function state consumer");
try {
consumer.close();
} catch (PulsarClientException e) {
log.error("Failed to stop function state consumer", e);
}
log.info("Stopped function state consumer");
}

@Override
public void accept(Message msg) {
ServiceRequest serviceRequest;
try {
serviceRequest = (ServiceRequest) Utils.getObject(msg.getData());
} catch (IOException | ClassNotFoundException e) {
log.error("Received bad service request at message {}", msg.getMessageId(), e);
// TODO: find a better way to handle bad request
throw new RuntimeException(e);
}
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}

switch(serviceRequest.getRequestType()) {
case UPDATE:
this.functionStateManager.processUpdate((UpdateRequest) serviceRequest);
break;
case DELETE:
this.functionStateManager.proccessDeregister((DeregisterRequest) serviceRequest);
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}

// ack msg
consumer.acknowledgeAsync(msg);
// receive next request
receiveOne();
}

@Override
public Void apply(Throwable cause) {
log.error("Failed to retrieve messages from function state topic", cause);
// TODO: find a better way to handle consumer functions
throw new RuntimeException(cause);
}
}
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.runtime.worker; package org.apache.pulsar.functions.runtime.worker;


import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.fs.FunctionConfig; import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest; import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest;
import org.apache.pulsar.functions.runtime.worker.request.RequestResult; import org.apache.pulsar.functions.runtime.worker.request.RequestResult;
Expand All @@ -39,7 +38,7 @@
/** /**
* A manager manages function states. * A manager manages function states.
*/ */
public class FunctionStateManager { public class FunctionStateManager implements AutoCloseable {


private static final Logger LOG = LoggerFactory.getLogger(FunctionStateManager.class); private static final Logger LOG = LoggerFactory.getLogger(FunctionStateManager.class);


Expand All @@ -53,15 +52,17 @@ public class FunctionStateManager {


private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;


public FunctionStateManager(WorkerConfig workerConfig) throws PulsarClientException { public FunctionStateManager(WorkerConfig workerConfig,
this(workerConfig, new ServiceRequestManager(workerConfig)); ServiceRequestManager serviceRequestManager) {
}

public FunctionStateManager(WorkerConfig workerConfig, ServiceRequestManager serviceRequestManager) {
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
this.serviceRequestManager = serviceRequestManager; this.serviceRequestManager = serviceRequestManager;
} }


@Override
public void close() {
serviceRequestManager.close();
}

public FunctionState getFunction(String tenant, String namespace, String functionName) { public FunctionState getFunction(String tenant, String namespace, String functionName) {
return this.functionStateMap.get(tenant).get(namespace).get(functionName); return this.functionStateMap.get(tenant).get(namespace).get(functionName);
} }
Expand Down
Expand Up @@ -18,42 +18,84 @@
*/ */
package org.apache.pulsar.functions.runtime.worker; package org.apache.pulsar.functions.runtime.worker;


import com.google.common.util.concurrent.AbstractService;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.runtime.worker.rest.FunctionStateListener; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.runtime.worker.rest.WorkerServer; import org.apache.pulsar.functions.runtime.worker.rest.WorkerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class Worker { @Slf4j

public class Worker extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);


private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private final FunctionStateManager functionStateManager; private PulsarClient client;
private FunctionStateManager functionStateManager;
private FunctionStateConsumer functionStateConsumer;
private Thread serverThread;


public Worker(WorkerConfig workerConfig) throws PulsarClientException { public Worker(WorkerConfig workerConfig) {
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
this.functionStateManager = new FunctionStateManager(workerConfig);
} }


public void start() throws InterruptedException, PulsarClientException { @Override
LOG.info("Start worker {}...", workerConfig.getWorkerId()); protected void doStart() {
try {
this.client = PulsarClient.create(workerConfig.getPulsarServiceUrl());
ServiceRequestManager reqMgr = new ServiceRequestManager(
client.createProducer(workerConfig.getFunctionMetadataTopic()));
this.functionStateManager = new FunctionStateManager(
workerConfig, reqMgr);


final WorkerServer workerServer = new WorkerServer(this.workerConfig, this.functionStateManager); ConsumerConfiguration consumerConf = new ConsumerConfiguration();
consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
this.functionStateConsumer = new FunctionStateConsumer(
functionStateManager,
client.subscribe(
workerConfig.getFunctionMetadataTopic(),
workerConfig.getFunctionMetadataTopicSubscription(),
consumerConf));


FunctionStateListener functionStateListener = new FunctionStateListener(this.workerConfig, this.functionStateManager); log.info("Start worker {}...", workerConfig.getWorkerId());
} catch (PulsarClientException e) {
log.error("Failed to create pulsar client to {}",
workerConfig.getPulsarServiceUrl(), e);
throw new RuntimeException(e);
}


Thread serverThread = new Thread(workerServer); WorkerServer server = new WorkerServer(workerConfig, functionStateManager);
serverThread.setName(workerServer.getThreadName()); this.serverThread = new Thread(server, server.getThreadName());
Thread listenerThread = new Thread(functionStateListener);
listenerThread.setName(functionStateListener.getThreadName());


LOG.info("Start worker server on port {}...", workerConfig.getWorkerPort()); log.info("Start worker server on port {}...", workerConfig.getWorkerPort());
serverThread.start(); serverThread.start();
LOG.info("Start worker metadata listener..."); log.info("Start worker function state consumer ...");
listenerThread.start(); functionStateConsumer.start();
}


serverThread.join(); @Override
listenerThread.join(); protected void doStop() {
if (null != serverThread) {
serverThread.interrupt();
try {
serverThread.join();
} catch (InterruptedException e) {
log.warn("Worker server thread is interrupted", e);
}
}
if (null != functionStateConsumer) {
functionStateConsumer.close();
}
if (null != functionStateManager) {
functionStateManager.close();
}
if (null != client) {
try {
client.close();
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar client", e);
}
}
} }
} }
Expand Up @@ -18,51 +18,45 @@
*/ */
package org.apache.pulsar.functions.runtime.worker.request; package org.apache.pulsar.functions.runtime.worker.request;


import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.functions.runtime.worker.Utils; import org.apache.pulsar.functions.runtime.worker.Utils;
import org.apache.pulsar.functions.runtime.worker.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


public class ServiceRequestManager { @Slf4j
public class ServiceRequestManager implements AutoCloseable {


private static final Logger LOG = LoggerFactory.getLogger(ServiceRequestManager.class); private final Producer producer;


PulsarClient client; public ServiceRequestManager(Producer producer) throws PulsarClientException {
Producer producer; this.producer = producer;

public ServiceRequestManager(WorkerConfig workerConfig) throws PulsarClientException {
String pulsarBrokerRootUrl = workerConfig.getPulsarServiceUrl();
client = PulsarClient.create(pulsarBrokerRootUrl);
String topic = workerConfig.getFunctionMetadataTopic();

producer = client.createProducer(topic);
} }


public CompletableFuture<MessageId> submitRequest(ServiceRequest serviceRequest) { public CompletableFuture<MessageId> submitRequest(ServiceRequest serviceRequest) {
LOG.debug("Submitting Service Request: {}", serviceRequest); if (log.isDebugEnabled()) {
log.debug("Submitting Service Request: {}", serviceRequest);
}
byte[] bytes; byte[] bytes;
try { try {
bytes = Utils.toByteArray(serviceRequest); bytes = Utils.toByteArray(serviceRequest);
} catch (IOException e) { } catch (IOException e) {
LOG.error("error serializing request: " + serviceRequest); log.error("error serializing request {}", serviceRequest, e);
throw new RuntimeException(e); return FutureUtil.failedFuture(e);
} }

return producer.sendAsync(bytes);
CompletableFuture<MessageId> messageIdCompletableFuture = send(bytes);

return messageIdCompletableFuture;
} }


public CompletableFuture<MessageId> send(byte[] message) { @Override
CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message); public void close() {

try {
return messageIdCompletableFuture; this.producer.close();
} catch (PulsarClientException e) {
log.warn("Failed to close producer for service request manager", e);
}
} }
} }

0 comments on commit 0c113d7

Please sign in to comment.