Skip to content

Commit

Permalink
Adding a config file to override quickstart configs (#8059)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 24, 2022
1 parent 9f40d1b commit e7ea235
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 34 deletions.
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;

Expand All @@ -37,10 +36,6 @@ public String getAuthToken() {
return null;
}

public Map<String, Object> getConfigOverrides() {
return null;
}

public void execute()
throws Exception {
File quickstartTmpDir = new File(_dataDir.getAbsolutePath());
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
Expand All @@ -33,7 +35,6 @@
import org.slf4j.LoggerFactory;

import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
import static org.apache.pinot.tools.Quickstart.printStatus;


/**
Expand All @@ -48,7 +49,7 @@
* ingestion_job_spec.json
* </code>
*/
public class GenericQuickstart {
public class GenericQuickstart extends QuickStartBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericQuickstart.class);
private final File _schemaFile;
private final File _tableConfigFile;
Expand All @@ -57,6 +58,11 @@ public class GenericQuickstart {
private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;

public GenericQuickstart() {
this(GenericQuickstart.class.getClassLoader().getResource("examples/batch/starbucksStores").getPath(),
"starbucksStores");
}

public GenericQuickstart(String tableDirectoryPath, String tableName) {
_tableDirectory = new File(tableDirectoryPath);
_tableName = tableName;
Expand All @@ -80,13 +86,19 @@ private void startKafka() {
_kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2));
}

@Override
public List<String> types() {
return Arrays.asList("GENERIC");
}

public void execute()
throws Exception {

File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
QuickstartTableRequest request = new QuickstartTableRequest(_tableDirectory.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
startKafka();
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
Expand All @@ -34,7 +36,6 @@
import org.slf4j.LoggerFactory;

import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
import static org.apache.pinot.tools.Quickstart.printStatus;


/**
Expand All @@ -43,10 +44,14 @@
* Creates a realtime table pullRequestMergedEvents
* Starts the {@link PullRequestMergedEventsStream} to publish pullRequestMergedEvents into the topic
*/
public class GitHubEventsQuickstart {
public class GitHubEventsQuickstart extends QuickStartBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class);
private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private String _personalAccessToken;

public GitHubEventsQuickstart() {
}

private void startKafka() {
_zookeeperInstance = ZkStarter.startLocalZkServer();
Expand All @@ -60,7 +65,7 @@ private void startKafka() {
_kafkaStarter.createTopic("pullRequestMergedEvents", KafkaStarterUtils.getTopicCreationProps(2));
}

public void execute(String personalAccessToken)
private void execute(String personalAccessToken)
throws Exception {
final File quickStartDataDir =
new File(new File("githubEvents-" + System.currentTimeMillis()), "pullRequestMergedEvents");
Expand All @@ -84,7 +89,8 @@ public void execute(String personalAccessToken)
File tempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
Preconditions.checkState(tempDir.mkdirs());
QuickstartTableRequest request = new QuickstartTableRequest(quickStartDataDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
startKafka();
Expand Down Expand Up @@ -149,4 +155,20 @@ public void execute(String personalAccessToken)

printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
}

@Override
public List<String> types() {
return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS");
}

@Override
public void execute()
throws Exception {
execute(_personalAccessToken);
}

public GitHubEventsQuickstart setPersonalAccessToken(String personalAccessToken) {
_personalAccessToken = personalAccessToken;
return this;
}
}
Expand Up @@ -115,7 +115,7 @@ public void execute()
Preconditions.checkState(dataDir.mkdirs());
QuickstartTableRequest bootstrapTableRequest = prepareTableRequest(baseDir);
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(bootstrapTableRequest),
1, 1, 1, 0, dataDir);
1, 1, 1, 0, dataDir, getConfigOverrides());
printStatus(Color.YELLOW, "***** Starting Kafka *****");
startKafka();
printStatus(Color.YELLOW, "***** Starting airline data stream and publishing to Kafka *****");
Expand Down
Expand Up @@ -80,7 +80,8 @@ public void execute()

File tempDir = new File(quickstartTmpDir, "tmp");
FileUtils.forceMkdir(tempDir);
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, 0, tempDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request, dimTableRequest), 1, 1, 3, 0, tempDir, getConfigOverrides());

printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Expand Up @@ -63,7 +63,8 @@ public void execute()
FileUtils.copyURLToFile(resource, ingestionJobSpecFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Expand Up @@ -65,7 +65,8 @@ public void execute()
FileUtils.copyURLToFile(resource, ingestionJobSpecFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Collections.singletonList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
runner.startAll();
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.plugin.PluginManager;
Expand All @@ -36,7 +38,7 @@
import static org.apache.pinot.tools.Quickstart.printStatus;


public class PartialUpsertQuickStart {
public class PartialUpsertQuickStart extends QuickStartBase {
private StreamDataServerStartable _kafkaStarter;

public static void main(String[] args)
Expand All @@ -45,6 +47,11 @@ public static void main(String[] args)
new PartialUpsertQuickStart().execute();
}

@Override
public List<String> types() {
return Arrays.asList("PARTIAL-UPSERT", "PARTIAL_UPSERT");
}

// Todo: add a quick start demo
public void execute()
throws Exception {
Expand All @@ -66,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -18,15 +18,21 @@
*/
package org.apache.pinot.tools;

import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.utils.PinotConfigUtils;


public abstract class QuickStartBase {
protected File _dataDir = FileUtils.getTempDirectory();
protected String _zkExternalAddress;
protected String _configFilePath;

public QuickStartBase setDataDir(String dataDir) {
_dataDir = new File(dataDir);
Expand All @@ -38,6 +44,11 @@ public QuickStartBase setZkExternalAddress(String zkExternalAddress) {
return this;
}

public QuickStartBase setConfigFilePath(String configFilePath) {
_configFilePath = configFilePath;
return this;
}

public abstract List<String> types();

protected void waitForBootstrapToComplete(QuickstartRunner runner)
Expand All @@ -53,4 +64,13 @@ public static void printStatus(Quickstart.Color color, String message) {

public abstract void execute()
throws Exception;

protected Map<String, Object> getConfigOverrides() {
try {
return StringUtils.isEmpty(_configFilePath) ? ImmutableMap.of()
: PinotConfigUtils.readConfigFromFile(_configFilePath);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
}
}
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
Expand Down Expand Up @@ -67,10 +66,6 @@ public String getAuthToken() {
return null;
}

public Map<String, Object> getConfigOverrides() {
return null;
}

public static String prettyPrintResponse(JsonNode response) {
StringBuilder responseBuilder = new StringBuilder();

Expand Down
Expand Up @@ -74,7 +74,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -64,8 +63,8 @@ public static void main(String[] args)
}

public Map<String, Object> getConfigOverrides() {
Map<String, Object> properties = new HashMap<>();
properties.put("controller.task.scheduler.enabled", true);
Map<String, Object> properties = super.getConfigOverrides();
properties.putIfAbsent("controller.task.scheduler.enabled", true);
return properties;
}

Expand Down
Expand Up @@ -73,7 +73,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(baseDir.getAbsolutePath());
QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -74,7 +74,8 @@ public void execute()
FileUtils.copyURLToFile(resource, tableConfigFile);

QuickstartTableRequest request = new QuickstartTableRequest(bootstrapTableDir.getAbsolutePath());
final QuickstartRunner runner = new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir);
final QuickstartRunner runner
= new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, dataDir, getConfigOverrides());

printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
Expand Down
Expand Up @@ -68,7 +68,7 @@ public String description() {
public boolean execute()
throws Exception {
PluginManager.get().init();
new GitHubEventsQuickstart().execute(_personalAccessToken);
new GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).execute();
return true;
}
}
Expand Up @@ -47,6 +47,10 @@ public class QuickStartCommand extends AbstractBaseAdminCommand implements Comma
description = "URL for an external Zookeeper instance instead of using the default embedded instance")
private String _zkExternalAddress;

@CommandLine.Option(names = {"-configFile", "-configFilePath"}, required = false,
description = "Config file path to override default pinot configs")
private String _configFilePath;

@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false,
description = "Print this message.")
private boolean _help = false;
Expand Down Expand Up @@ -130,6 +134,10 @@ public boolean execute() throws Exception {
quickstart.setZkExternalAddress(_zkExternalAddress);
}

if (_configFilePath != null) {
quickstart.setConfigFilePath(_configFilePath);
}

quickstart.execute();
return true;
}
Expand Down

0 comments on commit e7ea235

Please sign in to comment.