Skip to content

Commit

Permalink
Add unrecognizedProperties to schema and tableConfigs APIs (#8606)
Browse files Browse the repository at this point in the history
This PR is a follow up of #8514. It adds the "unrecognizedProperties" fields to /schemas and /tableConfigs APIs
  • Loading branch information
saurabhd336 authored May 3, 2022
1 parent d49d117 commit 4e14101
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
Expand All @@ -44,6 +46,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
Expand Down Expand Up @@ -150,11 +153,15 @@ public SuccessResponse deleteSchema(
@ApiResponse(code = 400, message = "Missing or invalid request body"),
@ApiResponse(code = 500, message = "Internal error")
})
public SuccessResponse updateSchema(
public ConfigSuccessResponse updateSchema(
@ApiParam(value = "Name of the schema", required = true) @PathParam("schemaName") String schemaName,
@ApiParam(value = "Whether to reload the table if the new schema is backward compatible") @DefaultValue("false")
@QueryParam("reload") boolean reload, FormDataMultiPart multiPart) {
return updateSchema(schemaName, getSchemaFromMultiPart(multiPart), reload);
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart);
Schema schema = schemaAndUnrecognizedProps.getLeft();
SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight());
}

@PUT
Expand All @@ -169,11 +176,20 @@ public SuccessResponse updateSchema(
@ApiResponse(code = 400, message = "Missing or invalid request body"),
@ApiResponse(code = 500, message = "Internal error")
})
public SuccessResponse updateSchema(
public ConfigSuccessResponse updateSchema(
@ApiParam(value = "Name of the schema", required = true) @PathParam("schemaName") String schemaName,
@ApiParam(value = "Whether to reload the table if the new schema is backward compatible") @DefaultValue("false")
@QueryParam("reload") boolean reload, Schema schema) {
return updateSchema(schemaName, schema, reload);
@QueryParam("reload") boolean reload, String schemaJsonString) {
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps = null;
try {
schemaAndUnrecognizedProps = JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString, Schema.class);
} catch (Exception e) {
String msg = String.format("Invalid schema config json string: %s", schemaJsonString);
throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
}
Schema schema = schemaAndUnrecognizedProps.getLeft();
SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight());
}

@POST
Expand All @@ -186,16 +202,19 @@ public SuccessResponse updateSchema(
@ApiResponse(code = 400, message = "Missing or invalid request body"),
@ApiResponse(code = 500, message = "Internal error")
})
public SuccessResponse addSchema(
public ConfigSuccessResponse addSchema(
@ApiParam(value = "Whether to override the schema if the schema exists") @DefaultValue("true")
@QueryParam("override") boolean override, FormDataMultiPart multiPart, @Context HttpHeaders httpHeaders,
@Context Request request) {
Schema schema = getSchemaFromMultiPart(multiPart);
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart);
Schema schema = schemaAndUnrecognizedProps.getLeft();
String endpointUrl = request.getRequestURL().toString();
validateSchemaName(schema.getSchemaName());
_accessControlUtils.validatePermission(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl,
_accessControlFactory.create());
return addSchema(schema, override);
SuccessResponse successResponse = addSchema(schema, override);
return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight());
}

@POST
Expand All @@ -209,15 +228,25 @@ public SuccessResponse addSchema(
@ApiResponse(code = 400, message = "Missing or invalid request body"),
@ApiResponse(code = 500, message = "Internal error")
})
public SuccessResponse addSchema(
public ConfigSuccessResponse addSchema(
@ApiParam(value = "Whether to override the schema if the schema exists") @DefaultValue("true")
@QueryParam("override") boolean override, Schema schema, @Context HttpHeaders httpHeaders,
@QueryParam("override") boolean override, String schemaJsonString, @Context HttpHeaders httpHeaders,
@Context Request request) {
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProperties = null;
try {
schemaAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString, Schema.class);
} catch (Exception e) {
String msg = String.format("Invalid schema config json string: %s", schemaJsonString);
throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
}
Schema schema = schemaAndUnrecognizedProperties.getLeft();
String endpointUrl = request.getRequestURL().toString();
validateSchemaName(schema.getSchemaName());
_accessControlUtils.validatePermission(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl,
_accessControlFactory.create());
return addSchema(schema, override);
SuccessResponse successResponse = addSchema(schema, override);
return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProperties.getRight());
}

@POST
Expand All @@ -231,9 +260,17 @@ public SuccessResponse addSchema(
@ApiResponse(code = 500, message = "Internal error")
})
public String validateSchema(FormDataMultiPart multiPart) {
Schema schema = getSchemaFromMultiPart(multiPart);
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart);
Schema schema = schemaAndUnrecognizedProps.getLeft();
validateSchemaInternal(schema);
return schema.toPrettyJsonString();
ObjectNode response = schema.toJsonObject();
response.set("unrecognizedProperties", JsonUtils.objectToJsonNode(schemaAndUnrecognizedProps.getRight()));
try {
return JsonUtils.objectToPrettyString(response);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@POST
Expand All @@ -247,9 +284,23 @@ public String validateSchema(FormDataMultiPart multiPart) {
@ApiResponse(code = 400, message = "Missing or invalid request body"),
@ApiResponse(code = 500, message = "Internal error")
})
public String validateSchema(Schema schema) {
public String validateSchema(String schemaJsonString) {
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps = null;
try {
schemaAndUnrecognizedProps = JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString, Schema.class);
} catch (Exception e) {
String msg = String.format("Invalid schema config json string: %s", schemaJsonString);
throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e);
}
Schema schema = schemaAndUnrecognizedProps.getLeft();
validateSchemaInternal(schema);
return schema.toPrettyJsonString();
ObjectNode response = schema.toJsonObject();
response.set("unrecognizedProperties", JsonUtils.objectToJsonNode(schemaAndUnrecognizedProps.getRight()));
try {
return JsonUtils.objectToPrettyString(response);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private void validateSchemaName(String schemaName) {
Expand Down Expand Up @@ -342,7 +393,8 @@ private SuccessResponse updateSchema(String schemaName, Schema schema, boolean r
}
}

private Schema getSchemaFromMultiPart(FormDataMultiPart multiPart) {
private Pair<Schema, Map<String, Object>> getSchemaAndUnrecognizedPropertiesFromMultiPart(
FormDataMultiPart multiPart) {
try {
Map<String, List<FormDataBodyPart>> map = multiPart.getFields();
if (!PinotSegmentUploadDownloadRestletResource.validateMultiPart(map, null)) {
Expand All @@ -351,7 +403,7 @@ private Schema getSchemaFromMultiPart(FormDataMultiPart multiPart) {
}
FormDataBodyPart bodyPart = map.values().iterator().next().get(0);
try (InputStream inputStream = bodyPart.getValueAs(InputStream.class)) {
return Schema.fromInputSteam(inputStream);
return Schema.parseSchemaAndUnrecognizedPropsfromInputStream(inputStream);
} catch (IOException e) {
throw new ControllerApplicationException(LOGGER,
"Caught exception while de-serializing the schema from request body: " + e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
Expand All @@ -41,6 +43,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
Expand Down Expand Up @@ -149,14 +152,17 @@ public String getConfig(
@Path("/tableConfigs")
@ApiOperation(value = "Add the TableConfigs using the tableConfigsStr json",
notes = "Add the TableConfigs using the tableConfigsStr json")
public SuccessResponse addConfig(
public ConfigSuccessResponse addConfig(
String tableConfigsStr,
@ApiParam(value = "comma separated list of validation type(s) to skip. supported types: (ALL|TASK|UPSERT)")
@QueryParam("validationTypesToSkip") @Nullable String typesToSkip, @Context HttpHeaders httpHeaders,
@Context Request request) {
Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps;
TableConfigs tableConfigs;
try {
tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class);
tableConfigsAndUnrecognizedProps =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigsStr, TableConfigs.class);
tableConfigs = tableConfigsAndUnrecognizedProps.getLeft();
validateConfig(tableConfigs, typesToSkip);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, String.format("Invalid TableConfigs. %s", e.getMessage()),
Expand Down Expand Up @@ -214,7 +220,8 @@ public SuccessResponse addConfig(
throw e;
}

return new SuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added");
return new ConfigSuccessResponse("TableConfigs " + tableConfigs.getTableName() + " successfully added",
tableConfigsAndUnrecognizedProps.getRight());
} catch (Exception e) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_ADD_ERROR, 1L);
if (e instanceof InvalidTableConfigException) {
Expand Down Expand Up @@ -280,17 +287,20 @@ public SuccessResponse deleteConfig(
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Update the TableConfigs provided by the tableConfigsStr json",
notes = "Update the TableConfigs provided by the tableConfigsStr json")
public SuccessResponse updateConfig(
public ConfigSuccessResponse updateConfig(
@ApiParam(value = "TableConfigs name i.e. raw table name", required = true) @PathParam("tableName")
String tableName,
@ApiParam(value = "comma separated list of validation type(s) to skip. supported types: (ALL|TASK|UPSERT)")
@QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
@ApiParam(value = "Reload the table if the new schema is backward compatible") @DefaultValue("false")
@QueryParam("reload") boolean reload, String tableConfigsStr)
throws Exception {
Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps;
TableConfigs tableConfigs;
try {
tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class);
tableConfigsAndUnrecognizedProps =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigsStr, TableConfigs.class);
tableConfigs = tableConfigsAndUnrecognizedProps.getLeft();
Preconditions.checkState(tableConfigs.getTableName().equals(tableName),
"'tableName' in TableConfigs: %s must match provided tableName: %s", tableConfigs.getTableName(), tableName);

Expand Down Expand Up @@ -346,7 +356,8 @@ public SuccessResponse updateConfig(
Response.Status.INTERNAL_SERVER_ERROR, e);
}

return new SuccessResponse("TableConfigs updated for " + tableName);
return new ConfigSuccessResponse("TableConfigs updated for " + tableName,
tableConfigsAndUnrecognizedProps.getRight());
}

/**
Expand All @@ -360,14 +371,20 @@ public SuccessResponse updateConfig(
public String validateConfig(String tableConfigsStr,
@ApiParam(value = "comma separated list of validation type(s) to skip. supported types: (ALL|TASK|UPSERT)")
@QueryParam("validationTypesToSkip") @Nullable String typesToSkip) {
Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps;
TableConfigs tableConfigs;
try {
tableConfigs = JsonUtils.stringToObject(tableConfigsStr, TableConfigs.class);
tableConfigsAndUnrecognizedProps =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigsStr, TableConfigs.class);
tableConfigs = tableConfigsAndUnrecognizedProps.getLeft();
} catch (IOException e) {
throw new ControllerApplicationException(LOGGER,
String.format("Invalid TableConfigs json string: %s", tableConfigsStr), Response.Status.BAD_REQUEST, e);
}
return validateConfig(tableConfigs, typesToSkip);
TableConfigs validatedTableConfigs = validateConfig(tableConfigs, typesToSkip);
ObjectNode response = JsonUtils.objectToJsonNode(validatedTableConfigs).deepCopy();
response.set("unrecognizedProperties", JsonUtils.objectToJsonNode(tableConfigsAndUnrecognizedProps.getRight()));
return response.toString();
}

private void tuneConfig(TableConfig tableConfig, Schema schema) {
Expand All @@ -376,7 +393,7 @@ private void tuneConfig(TableConfig tableConfig, Schema schema) {
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig, _controllerConf.getDimTableMaxSize());
}

private String validateConfig(TableConfigs tableConfigs, @Nullable String typesToSkip) {
private TableConfigs validateConfig(TableConfigs tableConfigs, @Nullable String typesToSkip) {
String rawTableName = tableConfigs.getTableName();
TableConfig offlineTableConfig = tableConfigs.getOffline();
TableConfig realtimeTableConfig = tableConfigs.getRealtime();
Expand Down Expand Up @@ -410,7 +427,7 @@ private String validateConfig(TableConfigs tableConfigs, @Nullable String typesT
TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig);
}

return tableConfigs.toJsonString();
return tableConfigs;
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Invalid TableConfigs: %s. %s", rawTableName, e.getMessage()), Response.Status.BAD_REQUEST, e);
Expand Down
Loading

0 comments on commit 4e14101

Please sign in to comment.