Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/src/main/java/com/metamx/druid/BaseNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.index.v1.serde.ComplexMetricRegistererer;
import com.metamx.druid.index.v1.serde.Registererer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkClientConfig;
Expand Down Expand Up @@ -174,7 +174,7 @@ public T registerJacksonSubtype(NamedType... namedTypes)
}

@SuppressWarnings("unchecked")
public T registerComplexMetric(ComplexMetricRegistererer registererer)
public T registerHandler(Registererer registererer)
{
registererer.register();
return (T) this;
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/com/metamx/druid/http/BrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void initializeDiscovery() throws Exception

final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig.getZkHosts(), lifecycle
serviceDiscoveryConfig, lifecycle
);

final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.initialization;

import org.skife.config.Config;
import org.skife.config.Default;

/**
*/
public abstract class CuratorConfig
{
@Config("druid.zk.service.host")
public abstract String getZkHosts();

@Config("druid.zk.service.sessionTimeoutMs")
@Default("15000")
public abstract int getZkSessionTimeoutMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class Initialization
"druid.zk.paths.announcementsPath",
"druid.zk.paths.servedSegmentsPath",
"druid.zk.paths.loadQueuePath",
"druid.zk.paths.masterPath"};
"druid.zk.paths.masterPath"
};
public static final String DEFAULT_ZPATH = "/druid";

public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle)
Expand Down Expand Up @@ -119,10 +120,12 @@ public static ZKPhoneBook createPhoneBook(
}


/** Load properties.
/**
* Load properties.
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper.
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host=none then do not load properties from zookeeper.
*
* @return Properties ready to use.
*/
public synchronized static Properties loadProperties()
Expand All @@ -139,7 +142,9 @@ public synchronized static Properties loadProperties()

final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties");
if (stream == null) {
log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now.");
log.info(
"runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."
);
} else {
log.info("Loading properties from runtime.properties");
try {
Expand Down Expand Up @@ -202,7 +207,7 @@ public String getZkHosts()
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination.");
}
// validate properties now that all levels of precedence are loaded
if (! validateResolveProps(tmp_props)) {
if (!validateResolveProps(tmp_props)) {
log.error("Properties failed to validate, cannot continue");
throw new RuntimeException("Properties failed to validate");
}
Expand Down Expand Up @@ -231,14 +236,19 @@ public static Server makeJettyServer(ServerConfig config)
}

public static CuratorFramework makeCuratorFrameworkClient(
String zkHosts,
CuratorConfig curatorConfig,
Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(zkHosts)
.retryPolicy(new ExponentialBackoffRetry(1000, 30))
.connectString(curatorConfig.getZkHosts())
.retryPolicy(
new ExponentialBackoffRetry(
1000,
30
)
)
.build();

lifecycle.addHandler(
Expand Down Expand Up @@ -353,12 +363,15 @@ public static String makePropPath(String basePath)
return String.format("%s/%s", basePath, PROP_SUBPATH);
}

/** Validate and Resolve Properties.
/**
* Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
* Check validity so that if druid.zk.paths.*Path props are set, all are set,
* if none set, then construct defaults relative to druid.zk.paths.base and add these
* to the properties chain.
*
* @param props
*
* @return true if valid zpath properties.
*/
public static boolean validateResolveProps(Properties props)
Expand All @@ -374,7 +387,9 @@ public static boolean validateResolveProps(Properties props)

final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath");

if (!zpathValidateFailed) System.out.println("Effective zpath prefix=" + zpathEffective);
if (!zpathValidateFailed) {
System.out.println("Effective zpath prefix=" + zpathEffective);
}

// validate druid.zk.paths.*Path properties
//
Expand Down Expand Up @@ -403,22 +418,25 @@ public static boolean validateResolveProps(Properties props)
}
}
if (zpathOverridesNotAbs) {
System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
"the znode path must start with '/' (slash) ; problem overrides:");
System.err.println(
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
"the znode path must start with '/' (slash) ; problem overrides:"
);
System.err.print(sbErrors.toString());
}
if (zpathOverrideCount > 0) {
if (zpathOverrideCount < SUB_PATH_PROPS.length + 1) {
if (zpathOverrideCount < SUB_PATH_PROPS.length) {
zpathValidateFailed = true;
System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
"all must be overridden together; missing overrides:");
System.err.println(
"When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
"all must be overridden together; missing overrides:"
);
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
String val = props.getProperty(SUB_PATH_PROPS[i]);
if (val == null) {
System.err.println(" " + SUB_PATH_PROPS[i]);
}
}
if (propertiesZpathOverride == null) System.err.println(" " + "druid.zk.paths.propertiesPath");
} else { // proper overrides
// do not prefix with property druid.zk.paths.base
; // fallthru
Expand All @@ -435,13 +453,16 @@ public static boolean validateResolveProps(Properties props)
}
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
}
return ! zpathValidateFailed;
return !zpathValidateFailed;
}

/** Check znode zpath base for proper slash, no trailing slash.
* @param zpathBase znode base path, if null then this method does nothing.
/**
* Check znode zpath base for proper slash, no trailing slash.
*
* @param zpathBase znode base path, if null then this method does nothing.
* @param errorMsgPrefix error context to use if errors are emitted, should indicate
* where the zpathBase value came from.
*
* @return true if validate failed.
*/
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@

/**
*/
public abstract class ServiceDiscoveryConfig
public abstract class ServiceDiscoveryConfig extends CuratorConfig
{
@Config("druid.service")
public abstract String getServiceName();

@Config("druid.port")
public abstract int getPort();

@Config("druid.zk.service.host")
public abstract String getZkHosts();

@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Map;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package com.metamx.druid.index.v1.serde;

/**
* This is a "factory" interface for registering complex metrics in the system. It exists because I'm unaware of
* This is a "factory" interface for registering handlers in the system. It exists because I'm unaware of
* another way to register the complex serdes in the MR jobs that run on Hadoop. As such, instances of this interface
* must be instantiatable via a no argument default constructor (the MR jobs on Hadoop use reflection to instantiate
* instances).
*
* The name is not a typo, I felt that it needed an extra "er" to make the pronunciation that much more difficult.
*/
public interface ComplexMetricRegistererer
public interface Registererer
{
public void register();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
/**
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
* is sorted, supports binary search index lookups. If input is not sorted, only supports array-like index lookups.
*
* <p/>
* V1 Storage Format:
*
* <p/>
* byte 1: version (0x1)
* byte 2 == 0x1 => allowReverseLookup
* bytes 3-6 => numBytesUsed
Expand Down Expand Up @@ -253,6 +253,9 @@ public String fromByteBuffer(ByteBuffer buffer, int numBytes)
@Override
public byte[] toBytes(String val)
{
if (val == null) {
return new byte[]{};
}
return val.getBytes(Charsets.UTF_8);
}

Expand Down
6 changes: 5 additions & 1 deletion merger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-merger</artifactId>
Expand Down Expand Up @@ -178,6 +178,10 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected MergeTask(final String dataSource, final List<DataSegment> segments)
@Override
public boolean apply(@Nullable DataSegment segment)
{
return segment == null || !segment.getDataSource().equals(dataSource);
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
}
}
)
Expand Down
Loading