Skip to content

Commit

Permalink
Zookeeper put api (#5949)
Browse files Browse the repository at this point in the history
* Adding api to edit ZK path

* Adding delete api

* Addressing comments
  • Loading branch information
kishoreg committed Sep 13, 2020
1 parent cadd61c commit 83598ce
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Charsets;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
Expand All @@ -28,13 +29,17 @@
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -75,6 +80,63 @@ public String getData(
return null;
}

@DELETE
@Path("/zk/delete")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Delete the znode at this path")
@ApiResponses(value = { //
@ApiResponse(code = 200, message = "Success"), //
@ApiResponse(code = 404, message = "ZK Path not found"), //
@ApiResponse(code = 204, message = "No Content"), //
@ApiResponse(code = 500, message = "Internal server error")})
public SuccessResponse delete(
@ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {

path = validateAndNormalizeZKPath(path);

boolean success = pinotHelixResourceManager.deleteZKPath(path);
if (success) {
return new SuccessResponse("Successfully deleted path: " + path);
} else {
throw new ControllerApplicationException(LOGGER, "Failed to delete path: " + path,
Response.Status.INTERNAL_SERVER_ERROR);
}
}

@PUT
@Path("/zk/put")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Update the content of the node")
@ApiResponses(value = { //
@ApiResponse(code = 200, message = "Success"), //
@ApiResponse(code = 404, message = "ZK Path not found"), //
@ApiResponse(code = 204, message = "No Content"), //
@ApiResponse(code = 500, message = "Internal server error")})
public SuccessResponse putData(
@ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path,
@ApiParam(value = "Content", required = true) @QueryParam("data") @DefaultValue("") String content,
@ApiParam(value = "expectedVersion", required = true, defaultValue = "-1") @QueryParam("expectedVersion") @DefaultValue("-1") String expectedVersion,
@ApiParam(value = "accessOption", required = true, defaultValue = "1") @QueryParam("accessOption") @DefaultValue("1") String accessOption) {
path = validateAndNormalizeZKPath(path);
ZNRecord record = null;
if (content != null) {
record = (ZNRecord) _znRecordSerializer.deserialize(content.getBytes(Charsets.UTF_8));
}
try {
boolean result = pinotHelixResourceManager
.setZKData(path, record, Integer.parseInt(expectedVersion), Integer.parseInt(accessOption));
if (result) {
return new SuccessResponse("Successfully Updated path: " + path);
} else {
throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
Response.Status.INTERNAL_SERVER_ERROR);
}
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@GET
@Path("/zk/ls")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Expand Up @@ -1226,6 +1226,14 @@ void validateTableTenantConfig(TableConfig tableConfig) {
}
}

public boolean setZKData(String path, ZNRecord record, int expectedVersion, int accessOption) {
return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
}

public boolean deleteZKPath(String path) {
return _helixDataAccessor.getBaseDataAccessor().remove(path, -1);
}

public ZNRecord readZKData(String path) {
return _helixDataAccessor.getBaseDataAccessor().get(path, null, -1);
}
Expand Down Expand Up @@ -1814,7 +1822,8 @@ private void sendRoutingTableRebuildMessage(String tableNameWithType) {
_helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType);
LOGGER
.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType);
} else {
LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType);
}
Expand Down Expand Up @@ -2474,9 +2483,9 @@ private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> s
}
Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
} while (System.currentTimeMillis() < endTimeMs);
throw new TimeoutException(String.format(
"Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
tableNameWithType, segmentsToCheck));
throw new TimeoutException(String
.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
tableNameWithType, segmentsToCheck));
}

private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
Expand Down

0 comments on commit 83598ce

Please sign in to comment.