KYLIN-4489 Create a tool for migration cross clusters
kyotoYaho committed May 25, 2020
1 parent c5e1550 commit d0d4ff3
Showing 17 changed files with 1,595 additions and 22 deletions.
20 changes: 20 additions & 0 deletions core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -20,6 +20,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.threadlocal.InternalThreadLocal;
import org.apache.kylin.common.util.ClassUtil;
@@ -571,6 +572,25 @@ public synchronized void reloadFromSiteProperties() {
reloadKylinConfig(buildSiteProperties());
}

public static String getConfigAsString(Configuration conf) {
final StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : conf) {
sb.append(entry.getKey() + "=" + entry.getValue()).append('\n');
}
return sb.toString();
}

public static Configuration getConfigFromString(String configInStr) throws IOException {
Properties props = new Properties();
props.load(new StringReader(configInStr));

Configuration config = new Configuration();
for (Map.Entry<Object, Object> entry : props.entrySet()) {
config.set((String) entry.getKey(), (String) entry.getValue());
}
return config;
}

public KylinConfig base() {
return this;
}
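
Taken together, these two helpers let one Kylin node export its Hadoop settings as plain key=value text and another node rebuild an equivalent Configuration from it. A minimal round-trip sketch, not part of the commit (the property name is illustrative):

    Configuration local = HadoopUtil.getCurrentConfiguration();
    String asText = KylinConfig.getConfigAsString(local);         // one "key=value" per line
    Configuration rebuilt = KylinConfig.getConfigFromString(asText);
    System.out.println(rebuilt.get("fs.defaultFS"));              // matches local.get("fs.defaultFS")
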
core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -30,7 +30,6 @@

import javax.xml.bind.DatatypeConverter;

- import com.google.common.base.Strings;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -55,6 +54,7 @@
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.google.common.base.Strings;

/**
*/
@@ -175,6 +175,25 @@ public Pair<String, String> getJobServerWithState() throws IOException {
}
}

public void announceWipeCache(String entity, String event, String cacheKey) throws IOException {
String url = baseUrl + "/cache/announce/" + entity + "/" + cacheKey + "/" + event;
HttpPut request = new HttpPut(url);

try {
HttpResponse response = client.execute(request);

if (response.getStatusLine().getStatusCode() != 200) {
String msg = EntityUtils.toString(response.getEntity());
throw new IOException("Invalid response " + response.getStatusLine().getStatusCode()
+ " with announce cache wipe url " + url + "\n" + msg);
}
} catch (Exception ex) {
throw new IOException(ex);
} finally {
request.releaseConnection();
}
}

public void wipeCache(String entity, String event, String cacheKey) throws IOException {
HttpPut request;
String url;
@@ -202,8 +221,19 @@ public void wipeCache(String entity, String event, String cacheKey) throws IOException {
}

public String getKylinProperties() throws IOException {
String url = baseUrl + "/admin/config";
HttpGet request = new HttpGet(url);
return getConfiguration(baseUrl + "/admin/config", false);
}

public String getHDFSConfiguration() throws IOException {
return getConfiguration(baseUrl + "/admin/config/hdfs", true);
}

public String getHBaseConfiguration() throws IOException {
return getConfiguration(baseUrl + "/admin/config/hbase", true);
}

private String getConfiguration(String url, boolean ifAuth) throws IOException {
HttpGet request = ifAuth ? newGet(url) : new HttpGet(url);
HttpResponse response = null;
try {
response = client.execute(request);
@@ -372,7 +402,7 @@ private void checkCompatibility(String jsonRequest, String url) throws IOException {
String msg = getContent(response);
Map<String, String> kvMap = JsonUtil.readValueAsMap(msg);
String exception = kvMap.containsKey("exception") ? kvMap.get("exception") : "unknown";
- throw new IOException(exception);
+ throw new IOException("Error code: " + response.getStatusLine().getStatusCode() + "\n" + exception);
}
} finally {
post.releaseConnection();
@@ -411,7 +441,7 @@ private HttpPut newPut(String url) {
}

private HttpGet newGet(String url) {
- HttpGet get = new HttpGet();
+ HttpGet get = new HttpGet(url);
addHttpHeaders(get);
return get;
}
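
With the new endpoints wrapped above, a migration client can fetch a remote Kylin node's Hadoop and HBase settings, rebuild them locally, and broadcast a cache wipe once metadata has been copied. A hedged sketch (the user:pwd@host:port URI form and the cube name are assumptions, not from this diff):

    RestClient src = new RestClient("ADMIN:KYLIN@src-kylin:7070");
    Configuration srcHdfsConf = KylinConfig.getConfigFromString(src.getHDFSConfiguration());
    Configuration srcHbaseConf = KylinConfig.getConfigFromString(src.getHBaseConfiguration());
    src.announceWipeCache("cube", "update", "demo_cube");   // broadcast to all nodes of that cluster

Note that getHDFSConfiguration and getHBaseConfiguration pass ifAuth=true, so they go through newGet and carry the client's auth headers, while the pre-existing /admin/config call keeps its unauthenticated form.
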
core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
@@ -24,6 +24,7 @@
import java.util.Map;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.WriteConflictException;
@@ -48,6 +49,8 @@
public class DataModelManager {

private static final Logger logger = LoggerFactory.getLogger(DataModelManager.class);
public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<DataModelDesc>(
DataModelDesc.class);

public static DataModelManager getInstance(KylinConfig config) {
return config.getManager(DataModelManager.class);
core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -30,7 +30,9 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
import org.apache.kylin.metadata.TableMetadataManager;
@@ -54,7 +56,9 @@

public class ProjectManager {
private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class);

public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(
ProjectInstance.class);

public static ProjectManager getInstance(KylinConfig config) {
return config.getManager(ProjectManager.class);
}
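
Both managers now expose their JSON serializers publicly, so an external tool such as the new migration CLI can read project and model metadata straight from a ResourceStore without a fully initialized manager. A sketch assuming the two-argument getResource(path, serializer) signature; the resource paths are illustrative:

    ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
    ProjectInstance prj = store.getResource("/project/demo_project.json",
            ProjectManager.PROJECT_SERIALIZER);
    DataModelDesc model = store.getResource("/model_desc/demo_model.json",
            DataModelManager.MODELDESC_SERIALIZER);
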
@@ -155,7 +155,7 @@ public void checkCompatibility(@RequestBody CompatibilityCheckRequest request) {
}
DataModelDesc dataModelDesc = JsonUtil.readValue(request.getModelDescData(), DataModelDesc.class);
logger.info("Schema compatibility check for model {}", dataModelDesc.getName());
- modelService.checkModelCompatibility(request.getProjectName(), dataModelDesc, tableDescList);
+ modelService.checkModelCompatibility(dataModelDesc, tableDescList);
logger.info("Pass schema compatibility check for model {}", dataModelDesc.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
5 changes: 5 additions & 0 deletions pom.xml
@@ -541,6 +541,11 @@
<version>${hadoop2.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>${hadoop2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
@@ -106,6 +106,28 @@ public GeneralResponse getPublicConfig() throws IOException {
return configRes;
}

@RequestMapping(value = "/config/hdfs", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
public GeneralResponse getHDFSConfig() throws IOException {
String config = adminService.getHadoopConfigAsString();

GeneralResponse configRes = new GeneralResponse();
configRes.put("config", config);

return configRes;
}

@RequestMapping(value = "/config/hbase", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
public GeneralResponse getHBaseConfig() throws IOException {
String config = adminService.getHBaseConfigAsString();

GeneralResponse configRes = new GeneralResponse();
configRes.put("config", config);

return configRes;
}

@RequestMapping(value = "/metrics/cubes", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
public MetricsResponse cubeMetrics(MetricsRequest request) {
server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
@@ -31,10 +31,12 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OrderedProperties;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
@@ -112,4 +114,16 @@ public String getPublicConfig() throws IOException {

return KylinConfig.getInstanceFromEnv().exportToString(propertyKeys);
}

public String getHadoopConfigAsString() throws IOException {
logger.debug("Get Kylin Hadoop Config");

return KylinConfig.getConfigAsString(HadoopUtil.getCurrentConfiguration());
}

public String getHBaseConfigAsString() throws IOException {
logger.debug("Get Kylin HBase Config");

return KylinConfig.getConfigAsString(HBaseConnection.getCurrentHBaseConfiguration());
}
}
HBaseInfoUtil.java
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
@@ -28,13 +29,17 @@
import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;

public class HBaseInfoUtil {

@SuppressWarnings("unused") // used by reflection
public static HBaseResponse getHBaseInfo(String tableName, KylinConfig config) throws IOException {
if (!config.getStorageUrl().getScheme().equals("hbase"))
return null;

Connection conn = HBaseUnionUtil.getConnection(config, tableName);
return getHBaseInfo(tableName, conn);
}

public static HBaseResponse getHBaseInfo(String tableName, Connection conn) throws IOException {
HBaseResponse hr = null;
long tableSize = 0;
int regionCount = 0;
@@ -54,4 +59,12 @@ public static HBaseResponse getHBaseInfo(String tableName, KylinConfig config) throws IOException {
hr.setRegionCount(regionCount);
return hr;
}

public static boolean checkEquals(HBaseResponse hbaseR1, HBaseResponse hbaseR2) {
if (hbaseR1 == hbaseR2)
return true;
return Objects.equals(hbaseR1.getTableName(), hbaseR2.getTableName())
&& hbaseR1.getTableSize() == hbaseR2.getTableSize()
&& hbaseR1.getRegionCount() == hbaseR2.getRegionCount();
}
}
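
checkEquals gives the migration a cheap post-copy verification step: the same HTable should report the same name, size, and region count on both clusters. A hypothetical use (the connections are placeholders; note the method assumes non-null responses once the identity check fails):

    HBaseResponse onSrc = HBaseInfoUtil.getHBaseInfo(htableName, srcConnection);
    HBaseResponse onDst = HBaseInfoUtil.getHBaseInfo(htableName, dstConnection);
    if (!HBaseInfoUtil.checkEquals(onSrc, onDst)) {
        throw new IllegalStateException("HTable " + htableName + " differs between clusters");
    }
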
server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -168,11 +168,7 @@ public void checkModelCompatibility(String project, DataModelDesc dataModalDesc) {
result.raiseExceptionWhenInvalid();
}

public void checkModelCompatibility(String project, DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
ProjectInstance prjInstance = getProjectManager().getProject(project);
if (prjInstance == null) {
throw new BadRequestException("Project " + project + " does not exist");
}
public void checkModelCompatibility(DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
ModelSchemaUpdateChecker checker = new ModelSchemaUpdateChecker(getTableManager(), getCubeManager(),
getDataModelManager());

@@ -181,7 +177,7 @@ public void checkModelCompatibility(String project, DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
tableDescMap.put(tableDesc.getIdentity(), tableDesc);
}
dataModalDesc.init(getConfig(), tableDescMap);
- ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, project, false);
+ ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, null, false);
result.raiseExceptionWhenInvalid();
}

storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -28,6 +28,7 @@
import java.util.UUID;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -91,10 +92,14 @@ public HBaseResourceStore(KylinConfig kylinConfig) throws IOException {
.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
}

- Connection getConnection() throws IOException {
+ protected Connection getConnection() throws IOException {
return HBaseConnection.get(metadataUrl);
}

protected Configuration getCurrentHBaseConfiguration() {
return HBaseConnection.getCurrentHBaseConfiguration();
}

private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
StorageURL url = kylinConfig.getMetadataUrl();
if (!url.getScheme().equals("hbase"))
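
Widening getConnection to protected and adding a getCurrentHBaseConfiguration hook lets a subclass point the store at a different cluster. A sketch of the idea (this subclass is hypothetical, not part of the commit; imports elided):

    public class RemoteHBaseResourceStore extends HBaseResourceStore {
        private final Configuration remoteConf;

        public RemoteHBaseResourceStore(KylinConfig remoteKylinConfig, Configuration remoteConf)
                throws IOException {
            super(remoteKylinConfig);
            this.remoteConf = remoteConf;
        }

        @Override
        protected Configuration getCurrentHBaseConfiguration() {
            return remoteConf;   // resolve paths against the remote cluster, not the local one
        }
    }
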
storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -420,7 +420,7 @@ public void run() {
}

public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, getHDFSWorkingDirectory(config));
FileStatus newestJar = null;
for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
if (fileStatus.getPath().toString().endsWith(".jar")) {
@@ -440,16 +440,22 @@ public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
return path;
}

- public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
- Set<String> oldJarPaths) throws IOException {
+ public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+ Set<String> oldJarPaths) throws IOException {
+ String hdfsWorkingDirectory = getHDFSWorkingDirectory(KylinConfig.getInstanceFromEnv());
+ return uploadCoprocessorJar(localCoprocessorJar, fileSystem, hdfsWorkingDirectory, oldJarPaths);
+ }
+
+ public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+ String hdfsWorkingDirectory, Set<String> oldJarPaths) throws IOException {
Path uploadPath = null;
File localCoprocessorFile = new File(localCoprocessorJar);

// check existing jars
if (oldJarPaths == null) {
oldJarPaths = new HashSet<String>();
}
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, hdfsWorkingDirectory);
for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
if (isSame(localCoprocessorFile, fileStatus)) {
uploadPath = fileStatus.getPath();
@@ -511,9 +517,12 @@ private static String getBaseFileName(String localCoprocessorJar) {
return baseName;
}

- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ private static String getHDFSWorkingDirectory(KylinConfig config) {
String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- hdfsWorkingDirectory = HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, String hdfsWorkingDirectory) throws IOException {
Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
fileSystem.mkdirs(coprocessorDir);
return coprocessorDir;
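
The refactoring threads an explicit hdfsWorkingDirectory through the coprocessor-deployment path, so the jar can be pushed to a cluster other than the local one. A hypothetical call into the new overload (all variables are placeholders):

    FileSystem targetFs = FileSystem.get(targetClusterConf);
    Path deployed = DeployCoprocessorCLI.uploadCoprocessorJar(
            localCoprocessorJar, targetFs, targetHdfsWorkingDir, null);   // null: collect existing jars internally
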
11 changes: 11 additions & 0 deletions tool/pom.xml
@@ -75,6 +75,11 @@
<artifactId>hbase-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
@@ -86,6 +91,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>provided</scope>
</dependency>

<!--Spring-->
<dependency>
<groupId>org.springframework</groupId>
