Skip to content

Commit

Permalink
[KYLIN-4939] Transform lookup table snapshot from segment level to cu…
Browse files Browse the repository at this point in the history
…be level
  • Loading branch information
Ted-Jiang authored and hit-lacus committed Apr 6, 2021
1 parent e44aee2 commit 25b8a88
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 0 deletions.
12 changes: 12 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
}
}

public CubeDesc updatelookupTableSnapshotGlobal(CubeDesc desc, String lookupTable, boolean global) throws IOException {
try (AutoLock lock = descMapLock.lockForWrite()) {
CubeDesc copy = desc.latestCopyForWrite();
copy.createAndSetSnapshotTableGlobal(lookupTable, global);
return updateCubeDesc(copy);
}
}

public CubeDesc copyForWrite(CubeDesc desc) {
return crud.copyForWrite(desc);
}

/**
* if there is some change need be applied after getting a cubeDesc from front-end, do it here
* @param cubeDesc
Expand Down
21 changes: 21 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -375,6 +376,26 @@ public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTab
}
}

public CubeInstance updateCubeToBeGlobal(CubeInstance cube, String lookupTable) throws IOException {
try (AutoLock lock = cubeMapLock.lockForWrite()) {
cube = cube.latestCopyForWrite();
CubeUpdate update = new CubeUpdate(cube);
String snapshotResPath = cube.getLastSegment().getSnapshotResPath(lookupTable);
Map<String, String> updateResPath = new HashMap<>();
updateResPath.put(lookupTable, snapshotResPath);
update.setUpdateTableSnapshotPath(updateResPath);
List<CubeSegment> updateCubeSegments = new ArrayList<>();

for (CubeSegment seg : update.getCubeInstance().getSegments()) {
seg.removeSnapshots(lookupTable);
updateCubeSegments.add(seg);
}
update.setToUpdateSegs(updateCubeSegments.toArray(new CubeSegment[0]));

return updateCube(update);
}
}

private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
if (update == null || update.getCubeInstance() == null)
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ public void resetSnapshots() {
snapshots = new ConcurrentHashMap<String, String>();
}

public void removeSnapshots(String lookupTable) {
snapshots.remove(lookupTable);
}

public String getSnapshotResPath(String table) {
return getSnapshots().get(table);
}
Expand Down
33 changes: 33 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.kylin.common.util.Array;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.dict.GlobalDictionaryBuilder;
import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
Expand Down Expand Up @@ -623,6 +624,12 @@ public boolean consistentWith(CubeDesc another) {
return this.calculateSignature().equals(another.calculateSignature());
}

public CubeDesc latestCopyForWrite() {
CubeDescManager mgr = CubeDescManager.getInstance(config);
CubeDesc lastest = mgr.getCubeDesc(name);
return mgr.copyForWrite(lastest);
}

public String calculateSignature() {
MessageDigest md;
try {
Expand Down Expand Up @@ -1558,6 +1565,32 @@ public boolean isGlobalSnapshotTable(String tableName) {
return desc.isGlobal();
}

public void createAndSetSnapshotTableGlobal(String tableName, boolean global) {
SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
// if desc not exist create one
if (desc == null) {
SnapshotTableDesc newDesc = new SnapshotTableDesc();
newDesc.setGlobal(global);
newDesc.setTableName(tableName);
snapshotTableDescList.add(newDesc);
} else {
desc.setGlobal(global);
}
}

public Set<String> getInMemLookupTables() {
Set<String> snapshots = Sets.newHashSet();
for (DimensionDesc dim : getDimensions()) {
TableRef table = dim.getTableRef();
if (getModel().isLookupTable(table)) {
if (!isExtSnapshotTable(table.getTableIdentity())) {
snapshots.add(table.getTableIdentity());
}
}
}
return snapshots;
}

public boolean isExtSnapshotTable(String tableName) {
SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
if (desc == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,35 @@ public JobInstance rebuildLookupSnapshot(@PathVariable String cubeName,
}
}

/**
* Force change a cube's lookup table to be global
*
*@throws IOException
*/
@RequestMapping(value = "/{cubeNames}/{tableName}/change_lookup_global", method = {
RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public List<CubeInstance> globalLookupSnapshot(@PathVariable String cubeNames, @PathVariable String tableName) {

List<CubeInstance> result = new ArrayList<>();

final CubeManager cubeMgr = cubeService.getCubeManager();
String[] changeCubes = cubeNames.toUpperCase(Locale.ROOT).split(",");
for (String cubeName : changeCubes) {
try {
checkCubeExists(cubeName);
final CubeInstance cube = cubeMgr.getCube(cubeName);
CubeInstance cubeInstance = cubeService.changeLookupSnapshotBeGlobal(cube, tableName);
logger.info("cube {} change snapshotTable {} global Success", cubeName, tableName);
result.add(cubeInstance);
} catch (Exception e) {
logger.error("cube {} change snapshotTable {} global Fail", cubeName, tableName);
logger.error(e.getLocalizedMessage(), e);
}
}
return result;
}

/**
* Delete a cube segment
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ public String getREBUILD_SNAPSHOT_OF_VIEW() {
return "Rebuild snapshot of hive view '%s' is not supported, please refresh segment of the cube";
}

public String getSNAPSHOT_GLOBAL() {
return "snapshot for lookup table '%s' is already global";
}

public String getCUBE_HAS_NOT_READY_SEGS() {
return "lookup table '%s' has not ready segment.";
}

// Model
public String getINVALID_MODEL_DEFINITION() {
return "The data model definition is invalid.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,54 @@ public Draft getCubeDraft(String cubeName, String projectName) throws IOExceptio
return null;
}

@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+ " or hasPermission(#project, 'ADMINISTRATION') or hasPermission(#project, 'MANAGEMENT')")
public CubeInstance changeLookupSnapshotBeGlobal(CubeInstance cube, String lookupTable) throws BadRequestException {
aclEvaluate.checkProjectWritePermission(cube.getProject());
Message msg = MsgPicker.getMsg();
CubeDesc cubeDesc = cube.getDescriptor();

TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());

if (tableDesc == null) {
throw new BadRequestException(String.format(Locale.ROOT, msg.getTABLE_DESC_NOT_FOUND(), lookupTable));
}

Set<String> inMemLookupTables = cubeDesc.getInMemLookupTables();
if (!inMemLookupTables.contains(lookupTable)) {
throw new BadRequestException(String.format(Locale.ROOT, msg.getTABLE_DESC_NOT_FOUND(), lookupTable));
}

if (cubeDesc.isGlobalSnapshotTable(lookupTable)) {
throw new BadRequestException(String.format(Locale.ROOT, msg.getSNAPSHOT_GLOBAL(), tableDesc.getName()));
}

try {
RealizationStatusEnum ostatus = cube.getStatus();

if (null == ostatus || !cube.getStatus().equals(RealizationStatusEnum.DISABLED)) {
throw new BadRequestException(
String.format(Locale.ROOT, msg.getENABLE_NOT_DISABLED_CUBE(), cube.getName(), ostatus));
}

int segmentsCount = cube.getSegments().size();
if (segmentsCount == 0) {
// change in persistence
getCubeDescManager().updatelookupTableSnapshotGlobal(cubeDesc, lookupTable, true);
} else if (cube.getSegments(SegmentStatusEnum.READY).size() == segmentsCount) {
// change in persistence
getCubeManager().updateCubeToBeGlobal(cube, lookupTable);
getCubeDescManager().updatelookupTableSnapshotGlobal(cubeDesc, lookupTable, true);
} else {
throw new BadRequestException(
String.format(Locale.ROOT, msg.getCUBE_HAS_NOT_READY_SEGS(), cube.getName()));
}
} catch (IOException e) {
logger.error("Failed to auto update snapshot be global " + cube.getName() + "@" + lookupTable, e);
}
return cube;
}

public List<Draft> listCubeDrafts(String cubeName, String modelName, String project, boolean exactMatch)
throws IOException {
if (null == project) {
Expand Down

0 comments on commit 25b8a88

Please sign in to comment.