Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zookeeper put api #5949

Merged
merged 3 commits into from Sep 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Charsets;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
Expand All @@ -28,13 +29,17 @@
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -75,6 +80,63 @@ public String getData(
return null;
}

@DELETE
@Path("/zk/delete")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delete api seems a bit dangerous. Should we protect it to only allow deleting certain paths? Seems this api will allow deleting the entire cluster?

@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Get content of the znode")
kishoreg marked this conversation as resolved.
Show resolved Hide resolved
@ApiResponses(value = { //
kishoreg marked this conversation as resolved.
Show resolved Hide resolved
@ApiResponse(code = 200, message = "Success"), //
@ApiResponse(code = 404, message = "ZK Path not found"), //
@ApiResponse(code = 204, message = "No Content"), //
@ApiResponse(code = 500, message = "Internal server error")})
public SuccessResponse delete(
@ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {

path = validateAndNormalizeZKPath(path);

boolean success = pinotHelixResourceManager.deleteZKPath(path);
if(success) {
return new SuccessResponse("Successfully deleted path: " + path);
} else {
throw new ControllerApplicationException(LOGGER, "Failed to delete path: " + path,
Response.Status.INTERNAL_SERVER_ERROR);
}
}

@PUT
@Path("/zk/put")
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Get content of the znode")
@ApiResponses(value = { //
@ApiResponse(code = 200, message = "Success"), //
@ApiResponse(code = 404, message = "ZK Path not found"), //
@ApiResponse(code = 204, message = "No Content"), //
@ApiResponse(code = 500, message = "Internal server error")})
public SuccessResponse putData(
@ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path,
@ApiParam(value = "Content", required = true) @QueryParam("data") @DefaultValue("") String content,
@ApiParam(value = "expectedVersion", required = true, defaultValue = "-1") @QueryParam("expectedVersion") @DefaultValue("-1") String expectedVersion,
@ApiParam(value = "accessOption", required = true, defaultValue = "1") @QueryParam("accessOption") @DefaultValue("1") String accessOption) {
path = validateAndNormalizeZKPath(path);
ZNRecord record = null;
if (content != null) {
record = (ZNRecord) _znRecordSerializer.deserialize(content.getBytes(Charsets.UTF_8));
}
try {
boolean result = pinotHelixResourceManager
kishoreg marked this conversation as resolved.
Show resolved Hide resolved
.setZKData(path, record, Integer.parseInt(expectedVersion), Integer.parseInt(accessOption));
if (result) {
return new SuccessResponse("Successfully Updated path: " + path);
} else {
throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
Response.Status.INTERNAL_SERVER_ERROR);
}
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

@GET
@Path("/zk/ls")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Expand Up @@ -1191,6 +1191,14 @@ void validateTableTenantConfig(TableConfig tableConfig) {
}
}

public boolean setZKData(String path, ZNRecord record, int expectedVersion, int accessOption) {
return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
}

public boolean deleteZKPath(String path) {
return _helixDataAccessor.getBaseDataAccessor().remove(path, -1);
}

public ZNRecord readZKData(String path) {
return _helixDataAccessor.getBaseDataAccessor().get(path, null, -1);
}
Expand Down Expand Up @@ -1779,7 +1787,8 @@ private void sendRoutingTableRebuildMessage(String tableNameWithType) {
_helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType);
LOGGER
.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType);
} else {
LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType);
}
Expand Down Expand Up @@ -2418,9 +2427,9 @@ private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> s
}
Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
} while (System.currentTimeMillis() < endTimeMs);
throw new TimeoutException(String.format(
"Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
tableNameWithType, segmentsToCheck));
throw new TimeoutException(String
.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
tableNameWithType, segmentsToCheck));
}

private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
Expand Down