issue-5: add auto failover support
junrao committed Dec 30, 2014
1 parent 571e2f8 commit e88ef8a
Showing 45 changed files with 2,030 additions and 607 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -1,6 +1,5 @@
schema-registry
===============

Schema registry for Kafka

Quickstart
@@ -13,7 +12,7 @@ Quickstart
./bin/kafka-server-start.sh config/server.properties

3. Start the REST server by running io.confluent.kafka.schemaregistry.rest.Main
mvn exec:java
mvn exec:java -Dexec.mainClass="io.confluent.kafka.schemaregistry.rest.Main" -Dexec.args="config/schema-registry.properties"

4. Register a schema
curl -v -H "Content-Type: application/vnd.schemaregistry.v1+json" -X POST -i http://localhost:8080/topics/Kafka/value/versions -d '
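
The registration payload in step 4 is cut off in this hunk. As a purely illustrative sketch, the same call could be made with plain JDK HTTP classes; the endpoint, port, and media type come from the curl command above, while the body shape ({"schema": "<Avro schema as a JSON-escaped string>"}) is an assumption about the v1 request format, not something shown in the truncated diff.

// Illustrative sketch only: the JSON body shape below is assumed, since the
// curl payload is cut off in this diff; endpoint and media type come from the
// curl command in the Quickstart.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class RegisterSchemaQuickstart {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://localhost:8080/topics/Kafka/value/versions");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
    conn.setDoOutput(true);

    // Assumed v1 body shape: a JSON object whose "schema" field holds the Avro
    // schema, itself serialized as a JSON string.
    String body = "{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}";
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }

    System.out.println("HTTP " + conn.getResponseCode());
    try (Scanner in = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) {
      while (in.hasNextLine()) {
        // On success the response should carry the version assigned to the schema.
        System.out.println(in.nextLine());
      }
    }
  }
}
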
15 changes: 15 additions & 0 deletions config/schema-registry.properties
@@ -0,0 +1,15 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

kafkastore.connection.url=localhost:2181
23 changes: 19 additions & 4 deletions src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.kafka.schemaregistry.rest;

import org.eclipse.jetty.server.Server;
@@ -6,7 +21,7 @@

import java.io.IOException;

import io.confluent.rest.ConfigurationException;
import io.confluent.rest.RestConfigException;

public class Main {

@@ -18,14 +33,14 @@ public class Main {
public static void main(String[] args) throws IOException {

try {
SchemaRegistryRestConfiguration config =
new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null));
SchemaRegistryConfig config =
new SchemaRegistryConfig((args.length > 0 ? args[0] : null));
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
log.info("Server started, listening for requests...");
server.join();
} catch (ConfigurationException e) {
} catch (RestConfigException e) {
log.error("Server configuration failed: ", e);
System.exit(1);
} catch (Exception e) {
70 changes: 70 additions & 0 deletions src/main/java/io/confluent/kafka/schemaregistry/rest/RegisterSchemaForwardingAgent.java
@@ -0,0 +1,70 @@
/**
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.kafka.schemaregistry.rest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.utils.RestUtils;

/**
* An agent responsible for forwarding an incoming schema registration request to another HTTP server
*/
public class RegisterSchemaForwardingAgent {

private static final Logger log = LoggerFactory.getLogger(RegisterSchemaForwardingAgent.class);
private final Map<String, String> requestProperties;
private final String topic;
private final boolean isKey;
private final RegisterSchemaRequest registerSchemaRequest;

public RegisterSchemaForwardingAgent(Map<String, String> requestProperties, String topic,
boolean isKey, RegisterSchemaRequest registerSchemaRequest) {
this.requestProperties = requestProperties;
this.topic = topic;
this.isKey = isKey;
this.registerSchemaRequest = registerSchemaRequest;
}

/**
* Forward the request
*
* @param host host to forward the request to
* @param port port to forward the request to
* @return The version id of the schema if registration is successful. Otherwise, throw a
* WebApplicationException.
*/
public int forward(String host, int port) throws SchemaRegistryException {
String baseUrl = String.format("http://%s:%d", host, port);
log.debug(String.format("Forwarding registering schema request %s to %s",
registerSchemaRequest, baseUrl));
try {
int version = RestUtils.registerSchema(baseUrl, requestProperties, registerSchemaRequest,
topic, isKey);
return version;
} catch (IOException e) {
throw new SchemaRegistryException(
String.format("Unexpected error while forwarding the registering schema request %s to %s",
registerSchemaRequest, baseUrl),
e);
}
}
}
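
A rough usage sketch, not part of this commit: a node that is not currently the master could delegate a registration through this agent. Only the constructor and forward() signatures come from the class above; the master host/port and the single forwarded header are illustrative assumptions.

// Illustrative only: masterHost/masterPort would come from whatever failover or
// election mechanism picks the current master; this class just performs the hop.
import java.util.Collections;
import java.util.Map;

import io.confluent.kafka.schemaregistry.rest.RegisterSchemaForwardingAgent;
import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;

public class ForwardingExample {

  public static int forwardToMaster(RegisterSchemaRequest request,
                                    String masterHost, int masterPort)
      throws SchemaRegistryException {
    // Assumed header set; in the REST resource the caller's request properties
    // would be passed through instead.
    Map<String, String> headers =
        Collections.singletonMap("Content-Type", "application/vnd.schemaregistry.v1+json");

    RegisterSchemaForwardingAgent agent =
        new RegisterSchemaForwardingAgent(headers, "Kafka", false /* isKey */, request);

    // Returns the version assigned by the master, or throws SchemaRegistryException
    // if the forwarded request fails with an I/O error.
    return agent.forward(masterHost, masterPort);
  }
}
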
117 changes: 117 additions & 0 deletions src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
@@ -0,0 +1,117 @@
/**
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.kafka.schemaregistry.rest;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import io.confluent.common.config.ConfigDef;
import io.confluent.common.config.ConfigException;
import io.confluent.rest.RestConfig;
import io.confluent.rest.RestConfigException;

import static io.confluent.common.config.ConfigDef.Range.atLeast;

public class SchemaRegistryConfig extends RestConfig {

public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url";
protected static final String KAFKASTORE_CONNECTION_URL_DOC =
"Zookeeper url for the Kafka cluster";

/**
* <code>kafkastore.zk.session.timeout.ms</code>
*/
public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG
= "kafkastore.zk.session.timeout.ms";
protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC =
"Zookeeper session timeout";

/**
* <code>kafkastore.topic</code>
*/
public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic";
public static final String DEFAULT_KAFKASTORE_TOPIC = "_schemas";
protected static final String KAFKASTORE_TOPIC_DOC =
"The durable single partition topic that acts" +
"as the durable log for the data";

/**
* <code>kafkastore.timeout.ms</code>
*/
public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms";
protected static final String KAFKASTORE_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";

/**
* <code>kafkastore.commit.interval.ms</code>
*/
public static final String KAFKASTORE_COMMIT_INTERVAL_MS_CONFIG = "kafkastore.commit.interval.ms";
protected static final String KAFKASTORE_COMMIT_INTERVAL_MS_DOC =
"The interval to commit offsets while consuming the Kafka topic";
public static final int OFFSET_COMMIT_OFF = -1;
// TODO: turn off offset commit by default for now since we only have an in-memory store
private static final int KAFKASTORE_COMMIT_INTERVAL_MS_DEFAULT = OFFSET_COMMIT_OFF;

/**
* <code>advertised.host</code>
*/
public static final String ADVERTISED_HOST_CONFIG = "advertised.host";
protected static final String ADVERTISED_HOST_DOC = "The host name advertised in Zookeeper";

static {
config
.defineOverride(RESPONSE_MEDIATYPE_PREFERRED_CONFIG, ConfigDef.Type.LIST,
Versions.PREFERRED_RESPONSE_TYPES, ConfigDef.Importance.HIGH,
RESPONSE_MEDIATYPE_PREFERRED_CONFIG_DOC)
.defineOverride(RESPONSE_MEDIATYPE_DEFAULT_CONFIG, ConfigDef.Type.STRING,
Versions.SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT, ConfigDef.Importance.HIGH,
RESPONSE_MEDIATYPE_DEFAULT_CONFIG_DOC)
.define(KAFKASTORE_CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
KAFKASTORE_CONNECTION_URL_DOC)
.define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 10000, atLeast(0),
ConfigDef.Importance.LOW, KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC)
.define(KAFKASTORE_TOPIC_CONFIG, ConfigDef.Type.STRING, DEFAULT_KAFKASTORE_TOPIC,
ConfigDef.Importance.HIGH, KAFKASTORE_TOPIC_DOC)
.define(KAFKASTORE_TIMEOUT_CONFIG, ConfigDef.Type.INT, 500, atLeast(0),
ConfigDef.Importance.MEDIUM, KAFKASTORE_TIMEOUT_DOC)
.define(KAFKASTORE_COMMIT_INTERVAL_MS_CONFIG, ConfigDef.Type.INT,
KAFKASTORE_COMMIT_INTERVAL_MS_DEFAULT, ConfigDef.Importance.MEDIUM,
KAFKASTORE_COMMIT_INTERVAL_MS_DOC)
.define(ADVERTISED_HOST_CONFIG, ConfigDef.Type.STRING, getDefaultHost(),
ConfigDef.Importance.LOW, ADVERTISED_HOST_DOC);
}

private static String getDefaultHost() {
try {
return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
throw new ConfigException("Unknown local hostname", e);
}
}

public SchemaRegistryConfig(Map<? extends Object, ? extends Object> props) {
super(props);
}

public SchemaRegistryConfig(String propsFile) throws RestConfigException {
this(getPropsFromFile(propsFile));
}

public static void main(String[] args) {
System.out.println(config.toHtmlTable());
}
}
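
A minimal sketch, not from this commit, of constructing this config in code rather than from a properties file. Only constants defined in the class above are used; kafkastore.connection.url is the only key without a default, and the remaining keys (timeouts, commit interval, advertised.host) fall back to the defaults registered in the static block.

// Minimal sketch: building SchemaRegistryConfig programmatically. A
// java.util.Properties instance satisfies the Map-based constructor shown above.
import java.util.Properties;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;

public class SchemaRegistryConfigExample {

  public static SchemaRegistryConfig localZkConfig() {
    Properties props = new Properties();
    // Same value as config/schema-registry.properties; required, no default.
    props.put(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, "localhost:2181");
    // Optional: defaults to "_schemas" anyway, shown here for clarity.
    props.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG,
              SchemaRegistryConfig.DEFAULT_KAFKASTORE_TOPIC);
    return new SchemaRegistryConfig(props);
  }
}
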
src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.kafka.schemaregistry.rest;

import org.slf4j.Logger;
@@ -11,40 +26,30 @@
import io.confluent.kafka.schemaregistry.rest.resources.SchemasResource;
import io.confluent.kafka.schemaregistry.rest.resources.TopicsResource;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreConfig;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaSerializer;
import io.confluent.rest.Application;
import io.confluent.rest.ConfigurationException;
import io.confluent.rest.RestConfigException;

public class SchemaRegistryRestApplication extends Application<SchemaRegistryRestConfiguration> {
public class SchemaRegistryRestApplication extends Application<SchemaRegistryConfig> {

private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class);
private SchemaRegistry schemaRegistry = null;

public SchemaRegistryRestApplication() throws ConfigurationException {
this(new Properties());
public SchemaRegistryRestApplication(Properties props) throws RestConfigException {
this(new SchemaRegistryConfig(props));
}

public SchemaRegistryRestApplication(Properties props) throws ConfigurationException {
this(new SchemaRegistryRestConfiguration(props));
}

public SchemaRegistryRestApplication(SchemaRegistryRestConfiguration config) {
public SchemaRegistryRestApplication(SchemaRegistryConfig config) {
this.config = config;
}

@Override
public void setupResources(Configurable<?> config, SchemaRegistryRestConfiguration appConfig) {
Properties props = new Properties();
props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, appConfig.zookeeperConnect);
props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, appConfig.kafkastoreTopic);
SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props);

SchemaRegistry schemaRegistry = null;
public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRegistryConfig) {
try {
schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig, new SchemaSerializer());
schemaRegistry.init();
} catch (SchemaRegistryException e) {
log.error("Error starting the schema registry", e);
System.exit(1);
@@ -55,7 +60,12 @@ public void setupResources(Configurable<?> config, SchemaRegistryRestConfigurati
}

@Override
public SchemaRegistryRestConfiguration configure() throws ConfigurationException {
public SchemaRegistryConfig configure() {
return config;
}

// for testing purposes only
public SchemaRegistry schemaRegistry() {
return schemaRegistry;
}
}
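
For completeness, a hedged sketch of embedding the application, for example from a test. Everything referenced is visible in this diff: the Properties-based constructor, createServer() as used in Main, and the test-only schemaRegistry() accessor; the assumption is that setupResources has run (and so the registry is initialized) once the server has started.

// Hedged sketch (not part of this commit): starting the registry embedded.
import java.util.Properties;

import org.eclipse.jetty.server.Server;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;

public class EmbeddedRegistryExample {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG, "localhost:2181");

    SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(props);
    Server server = app.createServer();
    server.start();

    // Exposed "for testing purposes only" in the class above; assumed to be
    // non-null once setupResources has been invoked during server startup.
    SchemaRegistry registry = app.schemaRegistry();
    System.out.println("Embedded schema registry started: " + registry);

    server.join();
  }
}
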
