Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class ForkingDruidNode extends DruidNode {
Expand All @@ -56,7 +57,7 @@ public class ForkingDruidNode extends DruidNode {

private Process druidProcess = null;

private Boolean started = false;
private final AtomicBoolean started = new AtomicBoolean(false);

private final List<String> allowedPrefixes = Lists.newArrayList(
"com.metamx",
Expand Down Expand Up @@ -122,9 +123,9 @@ public ForkingDruidNode(String nodeType,
@Override
public void start() throws IOException {
synchronized (started) {
if (started == false) {
if (started.get() == false) {
druidProcess = processBuilder.start();
started = true;
started.compareAndSet(false, true);
}
log.info("Started " + getNodeType());
}
Expand All @@ -133,7 +134,7 @@ public void start() throws IOException {
@Override
public boolean isAlive() {
synchronized (started) {
return started && druidProcess != null && druidProcess.isAlive();
return started.get() && druidProcess != null && druidProcess.isAlive();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,57 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
* This class has the hooks to start and stop the external Druid Nodes
*/
public class MiniDruidCluster extends AbstractService {
private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class);

private static final String COMMON_DRUID_JVM_PROPPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";

private static final List<String> HISTORICAL_JVM_CONF = Arrays
.asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m",
COMMON_DRUID_JVM_PROPPERTIES
);

private static final List<String> COORDINATOR_JVM_CONF = Arrays
.asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m",
COMMON_DRUID_JVM_PROPPERTIES
);

private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of(
"druid.metadata.storage.type", "derby",
"druid.storage.type", "hdfs",
"druid.processing.buffer.sizeBytes", "213870912",
"druid.processing.numThreads", "2",
"druid.worker.capacity", "4"
);

private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
"druid.server.maxSize", "130000000000"
);

private static final Map<String, String> COMMON_COORDINATOR_INDEXER = ImmutableMap
.of(
"druid.indexer.logs.type", "file",
"druid.coordinator.asOverlord.enabled", "true",
"druid.coordinator.asOverlord.overlordService", "druid/overlord",
"druid.coordinator.period", "PT2S",
"druid.manager.segments.pollDuration", "PT2S"
);
private static final int MIN_PORT_NUMBER = 60000;
private static final String
COMMON_DRUID_JVM_PROPPERTIES =
"-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager "
+ "-Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";

// JVM flags for the forked historical node: large direct-memory limit for
// segment processing plus a fixed 512m heap and the shared Druid system
// properties.
// Fix: the original list passed "-Xmx512m" twice; the second occurrence was
// clearly intended to be "-Xms512m" (initial heap size), matching the
// parallel COORDINATOR_JVM_CONF declaration below.
private static final List<String>
HISTORICAL_JVM_CONF =
Arrays.asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPPERTIES);

// JVM flags for the forked coordinator/overlord node: 2g direct-memory cap
// and a fixed 512m heap (-Xms == -Xmx avoids heap resizing during tests),
// plus the shared Druid system properties.
private static final List<String>
COORDINATOR_JVM_CONF =
Arrays.asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPPERTIES);

// Runtime properties shared by every forked Druid node: Derby metadata
// store, HDFS deep storage, processing buffer/thread sizing, and local
// worker capacity for indexing tasks.
private static final Map<String, String>
COMMON_DRUID_CONF =
ImmutableMap.of("druid.metadata.storage.type",
"derby",
"druid.storage.type",
"hdfs",
"druid.processing.buffer.sizeBytes",
"213870912",
"druid.processing.numThreads",
"2",
"druid.worker.capacity",
"4");

// Historical-only properties: advertised maximum size (bytes) of segments
// this node can serve.
private static final Map<String, String>
COMMON_DRUID_HISTORICAL =
ImmutableMap.of("druid.server.maxSize", "130000000000");

// Coordinator/overlord properties: file-based task logs, coordinator acting
// as overlord in the same JVM, and short (2s) poll/coordination periods so
// tests observe segment changes quickly.
private static final Map<String, String>
COMMON_COORDINATOR_INDEXER =
ImmutableMap.of("druid.indexer.logs.type",
"file",
"druid.coordinator.asOverlord.enabled",
"true",
"druid.coordinator.asOverlord.overlordService",
"druid/overlord",
"druid.coordinator.period",
"PT2S",
"druid.manager.segments.pollDuration",
"PT2S");
// Inclusive lower bound for the random free-port search used by findPort().
// NOTE(review): 1999 lies below the typical OS ephemeral port range
// (~32768-60999 on Linux), but the random start can still land inside it,
// so a chosen port may occasionally collide with a transient client
// connection -- TODO confirm this is acceptable for CI.
private static final int MIN_PORT_NUMBER = 1999;
// Highest valid TCP port number (inclusive upper bound for the search).
private static final int MAX_PORT_NUMBER = 65535;

private final DruidNode historical;
Expand All @@ -87,36 +98,42 @@ public class MiniDruidCluster extends AbstractService {

private final File logDirectory;
private final String derbyURI;

public MiniDruidCluster(String name) {
this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null);
}
private final int brokerPort;
private final int coordinatorPort;
private final int historicalPort;
private final int derbyPort;


public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) {
super(name);
this.dataDirectory = new File(tmpDir, "druid-data");
this.logDirectory = new File(logDir);
int derbyPort = findPort(MIN_PORT_NUMBER, MAX_PORT_NUMBER);

this.dataDirectory = new File(tmpDir, name + "-data");
this.logDirectory = new File(logDir, name + "-log");
int start = ThreadLocalRandom.current().nextInt(MIN_PORT_NUMBER + 1, MAX_PORT_NUMBER - 4);
coordinatorPort = findPort(start, MAX_PORT_NUMBER);
brokerPort = findPort(coordinatorPort + 1, MAX_PORT_NUMBER);
historicalPort = findPort(brokerPort + 1, MAX_PORT_NUMBER);
derbyPort = findPort(historicalPort + 1, MAX_PORT_NUMBER);
log.info("Druid ports are set to {} {} {} {}", coordinatorPort, brokerPort, historicalPort , derbyPort);

ensureCleanDirectory(dataDirectory);

derbyURI = String
.format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
derbyPort,
dataDirectory.getAbsolutePath()
);
String segmentsCache = String
.format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
dataDirectory.getAbsolutePath()
);
derbyURI =
String.format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
derbyPort,
dataDirectory.getAbsolutePath());
String
segmentsCache =
String.format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
dataDirectory.getAbsolutePath());
String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath();

ImmutableMap.Builder<String, String> coordinatorMapBuilder = new ImmutableMap.Builder();
ImmutableMap.Builder<String, String> historicalMapBuilder = new ImmutableMap.Builder();
ImmutableMap.Builder<String, String> brokerMapBuilder = new ImmutableMap.Builder();

Map<String, String> coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
Map<String, String>
coordinatorProperties =
coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
.putAll(COMMON_COORDINATOR_INDEXER)
.put("druid.metadata.storage.connector.connectURI", derbyURI)
.put("druid.metadata.storage.connector.port", String.valueOf(derbyPort))
Expand All @@ -125,33 +142,40 @@ public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zooke
.put("druid.coordinator.startDelay", "PT1S")
.put("druid.indexer.runner", "local")
.put("druid.storage.storageDirectory", getDeepStorageDir())
.put("druid.plaintextPort", String.valueOf(coordinatorPort))
.put("druid.zk.paths.base", "/" + name)
.build();
Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF)
Map<String, String>
historicalProperties =
historicalMapBuilder.putAll(COMMON_DRUID_CONF)
.putAll(COMMON_DRUID_HISTORICAL)
.put("druid.zk.service.host", "localhost:" + zookeeperPort)
.put("druid.segmentCache.locations", segmentsCache)
.put("druid.storage.storageDirectory", getDeepStorageDir())
.put("druid.plaintextPort", String.valueOf(historicalPort))
.put("druid.zk.paths.base", "/" + name)
.build();
coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties,
COORDINATOR_JVM_CONF,
logDirectory, null
);
historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF,
logDirectory, null
);
broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF,
logDirectory, null
);
Map<String, String> brokerProperties = brokerMapBuilder.putAll(COMMON_DRUID_CONF)
.put("druid.zk.service.host", "localhost:" + zookeeperPort)
.put("druid.segmentCache.locations", segmentsCache)
.put("druid.storage.storageDirectory", getDeepStorageDir())
.put("druid.plaintextPort", String.valueOf(brokerPort))
.put("druid.zk.paths.base", "/" + name)
.build();
coordinator =
new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, logDirectory, null);
historical =
new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null);
broker = new ForkingDruidNode("broker", classpath, brokerProperties, HISTORICAL_JVM_CONF, logDirectory, null);
druidNodes = Arrays.asList(coordinator, historical, broker);

}

private int findPort(int start, int end) {
int port = start;
while (!available(port)) {
port++;
if (port == end) {
throw new RuntimeException("can not find free port for range " + start + ":" + end);
throw new RuntimeException("can not find free port for range " + start + ":" + end);
}
}
return port;
Expand All @@ -162,7 +186,7 @@ private int findPort(int start, int end) {
*
* @param port the port to check for availability
*/
public static boolean available(int port) {
private static boolean available(int port) {
if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
throw new IllegalArgumentException("Invalid start port: " + port);
}
Expand Down Expand Up @@ -193,7 +217,7 @@ public static boolean available(int port) {
return false;
}

private static void ensureCleanDirectory(File dir){
private static void ensureCleanDirectory(File dir) {
try {
if (dir.exists()) {
// need to clean data directory to ensure that there is no interference from old runs
Expand All @@ -211,15 +235,13 @@ private static void ensureCleanDirectory(File dir){
}
}

@Override
protected void serviceStart() throws Exception {
druidNodes.stream().forEach(node -> {
@Override protected void serviceStart() {
druidNodes.forEach(node -> {
try {
node.start();
} catch (IOException e) {
log.error("Failed to start node " + node.getNodeType()
+ " Consequently will destroy the cluster");
druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> {
log.error("Failed to start node " + node.getNodeType() + " Consequently will destroy the cluster");
druidNodes.stream().filter(DruidNode::isAlive).forEach(nodeToStop -> {
try {
log.info("Stopping Node " + nodeToStop.getNodeType());
nodeToStop.close();
Expand All @@ -232,9 +254,8 @@ protected void serviceStart() throws Exception {
});
}

@Override
protected void serviceStop() throws Exception {
druidNodes.stream().forEach(node -> {
@Override protected void serviceStop() {
druidNodes.forEach(node -> {
try {
node.close();
} catch (IOException e) {
Expand All @@ -244,7 +265,6 @@ protected void serviceStop() throws Exception {
});
}


public String getMetadataURI() {
return derbyURI;
}
Expand All @@ -253,12 +273,16 @@ public String getDeepStorageDir() {
return dataDirectory.getAbsolutePath() + File.separator + "deep-storage";
}

public String getCoordinatorURI(){
return "localhost:8081";
/**
 * Returns the host:port address of the forked coordinator node. The port is
 * picked at cluster construction time rather than hard-coded, so concurrent
 * clusters do not collide.
 */
public String getCoordinatorURI() {
return "localhost:" + coordinatorPort;
}

public String getOverlordURI(){
/**
 * Returns the host:port address of the overlord. This is the coordinator
 * address because druid.coordinator.asOverlord.enabled=true runs both roles
 * in a single JVM (see COMMON_COORDINATOR_INDEXER).
 */
public String getOverlordURI() {
// Overlord and coordinator both run in same JVM.
return getCoordinatorURI();
}

/**
 * Returns the host:port address of the forked broker node, chosen at
 * cluster construction time.
 */
public String getBrokerURI() {
return String.format("localhost:%d", brokerPort);
}
}
14 changes: 7 additions & 7 deletions itests/src/test/resources/testconfiguration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -1836,7 +1836,12 @@ spark.perf.disabled.query.files=query14.q,\
cbo_query99.q,\
mv_query44.q

druid.query.files=druidmini_test1.q,\
druid.query.files= druidkafkamini_basic.q, \
druidkafkamini_avro.q, \
druidkafkamini_csv.q, \
druidkafkamini_delimited.q, \
kafka_storage_handler.q, \
druidmini_test1.q,\
druidmini_test_ts.q,\
druidmini_joins.q,\
druidmini_test_insert.q,\
Expand All @@ -1848,12 +1853,7 @@ druid.query.files=druidmini_test1.q,\
druidmini_extractTime.q,\
druidmini_test_alter.q,\
druidmini_floorTime.q, \
druidmini_masking.q, \
druidkafkamini_basic.q, \
druidkafkamini_avro.q, \
druidkafkamini_csv.q, \
druidkafkamini_delimited.q, \
kafka_storage_handler.q
druidmini_masking.q

druid.llap.local.query.files=druidmini_noop.q

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -306,6 +307,7 @@ public void initConf() throws Exception {
conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI());
conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI());
conf.set("hive.druid.broker.address.default", druidCluster.getBrokerURI());
final Path scratchDir = fs
.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
fs.mkdirs(scratchDir);
Expand Down Expand Up @@ -607,7 +609,8 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws
if (clusterType == MiniClusterType.druidKafka
|| clusterType == MiniClusterType.druidLocal) {
final String tempDir = System.getProperty("test.tmp.dir");
druidCluster = new MiniDruidCluster("mini-druid",
String randomId = UUID.randomUUID().toString();
druidCluster = new MiniDruidCluster("mini-druid-" + randomId,
logDir,
tempDir,
setup.zkPort,
Expand Down
3 changes: 2 additions & 1 deletion ql/src/test/queries/clientpositive/druidkafkamini_avro.q
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string,

ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');

!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;
--@TODO Nishant: can you explain why this is needed?
--!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;

-- Sleep for some time for ingestion tasks to ingest events
!sleep 60;
Expand Down
3 changes: 2 additions & 1 deletion ql/src/test/queries/clientpositive/druidkafkamini_basic.q
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ CREATE EXTERNAL TABLE druid_kafka_test(`__time` timestamp, page string, `user` s

ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');

!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;
--@TODO Nishant: can you explain why this is needed?
--!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor;

-- Sleep for some time for ingestion tasks to ingest events
!sleep 60;
Expand Down
Loading