Skip to content

Commit

Permalink
Refactoring of DiscoveryService
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and rdhabalia committed Sep 20, 2016
1 parent d93b233 commit de2c7ee
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 493 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Client;
Expand All @@ -38,7 +39,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
Expand All @@ -64,22 +64,22 @@ protected void cleanup() throws Exception {
/**
* 1. Start : Broker and Discovery service. 2. Provide started broker server as active broker to Discovery service
* 3. Call GET, PUT, POST request to discovery service that redirects to Broker service and receives response
*
*
* @throws Exception
*/
@Test
public void testRiderectUrlWithServerStarted() throws Exception {

// 1. start server
List<String> resources = Lists.newArrayList(DiscoveryService.class.getPackage().getName());
System.setProperty("zookeeperServers", "dummy-value");
System.setProperty("zooKeeperSessionTimeoutMillis", "1000");

int port = PortManager.nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
server.start(resources);

Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zooKeeperSessionTimeoutMillis", "1000");
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();

ZookeeperCacheLoader.availableActiveBrokers.add(super.brokerUrl.getHost() + ":" + super.brokerUrl.getPort());

Expand Down Expand Up @@ -122,7 +122,8 @@ public String hitBrokerService(String method, String url, BundlesData bundle) th
fail();
}

JSONObject jsonObject = new JSONObject(response.readEntity(String.class));
String s = response.readEntity(String.class);
JSONObject jsonObject = new JSONObject();
String serviceResponse = jsonObject.getString("reason");
return serviceResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public ZookeeperCacheLoader(String zookeeperServers) throws InterruptedException
// dummy constructor
}

public List<String> getAvailableActiveBrokers() {
public List<String> getAvailableBrokers() {
return this.availableActiveBrokers;
}

Expand Down
8 changes: 8 additions & 0 deletions pulsar-discovery-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-zookeeper-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

</dependencies>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;

/**
* Acts a load-balancer that receives any incoming request and discover active-available broker in round-robin manner
* and redirect request to that broker.
* <p>
* Accepts any {@value GET, PUT, POST} request and redirects to available broker-server to serve the request
* </p>
*
*/
public class DiscoveryServiceServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

private final AtomicInteger counter = new AtomicInteger();

private ZookeeperCacheLoader zkCache;

@Override
public void init(ServletConfig config) throws ServletException {
log.info("Initializing DiscoveryServiceServlet resource");

String zookeeperServers = config.getInitParameter("zookeeperServers");
String zookeeperClientFactoryClassName = config.getInitParameter("zookeeperClientFactoryClass");
if (zookeeperClientFactoryClassName == null) {
zookeeperClientFactoryClassName = ZookeeperClientFactoryImpl.class.getName();
}

log.info("zookeeperServers={} zookeeperClientFactoryClass={}", zookeeperServers,
zookeeperClientFactoryClassName);

try {
ZooKeeperClientFactory zkClientFactory = (ZooKeeperClientFactory) Class
.forName(zookeeperClientFactoryClassName).newInstance();

zkCache = new ZookeeperCacheLoader(zkClientFactory, zookeeperServers);
} catch (Throwable t) {
throw new ServletException(t);
}
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doOptions(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

@Override
protected void doTrace(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
redirect(req, resp);
}

private void redirect(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

try {
LoadReport broker = nextBroker();

URI brokerURI;
if (request.getScheme().equals("http")) {
// Use normal HTTP url
brokerURI = new URI(broker.getWebServiceUrl());
} else {
brokerURI = new URI(broker.getWebServiceUrlTls());
}

StringBuilder location = new StringBuilder();
location.append(brokerURI.getScheme()).append("://").append(brokerURI.getHost()).append(':')
.append(brokerURI.getPort()).append(request.getRequestURI());
if (request.getQueryString() != null) {
location.append('?').append(request.getQueryString());
}

log.info("Redirecting to {}", location);
response.sendRedirect(location.toString());
} catch (URISyntaxException e) {
log.warn("No broker found in zookeeper {}", e.getMessage(), e);
throw new RestException(Status.SERVICE_UNAVAILABLE, "Broker is not available");
}
}

/**
* Find next broke url in round-robin
*
* @return
*/
LoadReport nextBroker() {
List<LoadReport> availableBrokers = zkCache.getAvailableBrokers();

if (availableBrokers.isEmpty()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
} else {
int brokersCount = availableBrokers.size();
int nextIdx = Math.abs(counter.getAndIncrement()) % brokersCount;
return availableBrokers.get(nextIdx);
}
}

private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceServlet.class);
}
Loading

0 comments on commit de2c7ee

Please sign in to comment.