Merge branch 'release/5.1' into feature/merge-rel-5.1-develop
Manually resolved conflicts for files:
cdap-app-fabric/src/test/java/co/cask/cdap/internal/app/services/http/AppFabricTestBase.java
cdap-app-fabric/src/test/java/co/cask/cdap/internal/app/services/http/handlers/ArtifactHttpHandlerTest.java
pom.xml
rohitsinha54 committed Nov 7, 2018
2 parents 3a94694 + 596d84f commit 4807aa5
Showing 79 changed files with 2,549 additions and 489 deletions.
67 changes: 43 additions & 24 deletions cdap-api/src/main/java/co/cask/cdap/api/plugin/PluginClass.java
@@ -17,6 +17,7 @@
package co.cask.cdap.api.plugin;

import co.cask.cdap.api.annotation.Beta;
import co.cask.cdap.api.annotation.Plugin;

import java.util.Collections;
import java.util.HashMap;
@@ -40,33 +41,21 @@ public class PluginClass {
private final Set<String> endpoints;
private final Requirements requirements;

// Package-private no-arg constructor for GSON deserialization; should not be made public. VisibleForTesting
PluginClass() {
this.type = Plugin.DEFAULT_TYPE;
this.name = null;
this.description = "";
this.className = null;
this.configFieldName = null;
this.properties = Collections.emptyMap();
this.endpoints = Collections.emptySet();
this.requirements = Requirements.EMPTY;
}

public PluginClass(String type, String name, String description, String className,
@Nullable String configfieldName, Map<String, PluginPropertyField> properties,
Set<String> endpoints, Requirements requirements) {
if (type == null) {
throw new IllegalArgumentException("Plugin class type cannot be null");
}
if (name == null) {
throw new IllegalArgumentException("Plugin class name cannot be null");
}
if (description == null) {
throw new IllegalArgumentException("Plugin class description cannot be null");
}
if (className == null) {
throw new IllegalArgumentException("Plugin class className cannot be null");
}
if (properties == null) {
throw new IllegalArgumentException("Plugin class properties cannot be null");
}

if (endpoints == null) {
throw new IllegalArgumentException("Plugin endpoints cannot be null");
}

if (requirements == null) {
throw new IllegalArgumentException("Plugin requirements cannot be null");
}

this.type = type;
this.name = name;
this.description = description;
@@ -90,6 +79,36 @@ public PluginClass(String type, String name, String description, String classNam
Requirements.EMPTY);
}

/**
* Validates the {@link PluginClass}.
*
* @throws IllegalArgumentException if any of the required fields are invalid
*/
public void validate() {
if (name == null) {
throw new IllegalArgumentException("Plugin class name cannot be null");
}
if (className == null) {
throw new IllegalArgumentException("Plugin class className cannot be null");
}
if (type == null) {
throw new IllegalArgumentException("Plugin class type cannot be null");
}
if (description == null) {
throw new IllegalArgumentException("Plugin class description cannot be null");
}
if (properties == null) {
throw new IllegalArgumentException("Plugin class properties cannot be null");
}
if (endpoints == null) {
throw new IllegalArgumentException("Plugin class endpoints cannot be null");
}
if (requirements == null) {
throw new IllegalArgumentException("Plugin class requirements cannot be null");
}
}

/**
* Returns the type name of the plugin.
*/
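
For context, a minimal usage sketch (not part of this commit's diff; the JSON literal below is hypothetical): because the no-arg constructor now supplies defaults, plain Gson can deserialize legacy plugin JSON that lacks the requirements field, and callers invoke validate() explicitly afterwards.

// A minimal sketch, assuming legacy JSON written before the requirements field existed.
Gson gson = new Gson();
String legacyJson = "{\"type\":\"sparkprogram\",\"name\":\"wordcount\","
    + "\"description\":\"desc\",\"className\":\"className\"}";
// Gson invokes the package-private no-arg constructor, so requirements
// defaults to Requirements.EMPTY instead of null.
PluginClass pluginClass = gson.fromJson(legacyJson, PluginClass.class);
// Explicit validation replaces the null checks formerly done in the constructor.
pluginClass.validate();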
58 changes: 58 additions & 0 deletions cdap-api/src/test/java/co/cask/cdap/api/plugin/PluginClassTest.java
@@ -0,0 +1,58 @@
/*
* Copyright © 2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.cdap.api.plugin;


import com.google.gson.Gson;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

/**
* Test for {@link PluginClass}
*/
public class PluginClassTest {
private static final Gson GSON = new Gson();

@Test
public void testOldFormat() {
// Test that an old JSON representation which does not have a requirements field can be converted to PluginClass
// by normal GSON without any custom deserializer (CDAP-14515)
PluginClass pluginClass = new PluginClass("sparkprogram", "wordcount", "desc", "className", "config",
Collections.emptyMap(), Collections.emptySet(), null);
PluginClass deserializedPluginClass = GSON.fromJson(GSON.toJson(pluginClass), PluginClass.class);
Assert.assertEquals("wordcount", deserializedPluginClass.getName());
Assert.assertEquals("sparkprogram", deserializedPluginClass.getType());
Assert.assertEquals("desc", deserializedPluginClass.getDescription());
Assert.assertEquals("className", deserializedPluginClass.getClassName());
Assert.assertTrue(deserializedPluginClass.getRequirements().isEmpty());
}

@Test
public void testValidate() {
PluginClass invalidPluginClass = new PluginClass();
String pluginClassJSON = GSON.toJson(invalidPluginClass);
try {
PluginClass pluginClass = GSON.fromJson(pluginClassJSON, PluginClass.class);
pluginClass.validate();
Assert.fail("Should have failed to convert an invalid json");
} catch (IllegalArgumentException e) {
// expected
}
}
}
@@ -161,7 +161,7 @@ public void testBadInputInWorkflow() throws Exception {
Assert.fail("Should have thrown Exception because Workflow is configured with schedules having same name.");
} catch (Exception ex) {
Assert.assertEquals("Duplicate schedule name for schedule: 'DailySchedule'",
ex.getCause().getCause().getMessage());
ex.getCause().getMessage());
}

// try deploying an app containing a schedule for a non-existent workflow
@@ -35,7 +35,6 @@
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.PluginClassDeserializer;
import co.cask.cdap.common.http.AbstractBodyConsumer;
import co.cask.cdap.common.id.Id;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
@@ -132,7 +131,6 @@ public class ArtifactHttpHandler extends AbstractHttpHandler {
new TypeToken<List<ArtifactSummaryProperties>>() { }.getType();
private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
.registerTypeAdapter(PluginClass.class, new PluginClassDeserializer())
.create();
private static final Type PLUGINS_TYPE = new TypeToken<Set<PluginClass>>() { }.getType();

@@ -700,10 +698,11 @@ public BodyConsumer addArtifact(HttpRequest request, HttpResponder responder,
} else {
try {
additionalPluginClasses = GSON.fromJson(pluginClasses, PLUGINS_TYPE);
additionalPluginClasses.forEach(PluginClass::validate);
} catch (JsonParseException e) {
responder.sendString(HttpResponseStatus.BAD_REQUEST, String.format(
"%s header '%s' is invalid: %s", PLUGINS_HEADER, pluginClasses, e.getMessage()));
return null;
throw new BadRequestException(String.format("%s header '%s' is invalid.", PLUGINS_HEADER, pluginClasses), e);
} catch (IllegalArgumentException e) {
throw new BadRequestException(String.format("Invalid PluginClasses '%s'.", pluginClasses), e);
}
}

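As a hypothetical illustration of the new failure path (the header value and "com.example.Driver" below are invented, not from this commit): an entry that parses as JSON but is missing a required field now gets past Gson, fails PluginClass::validate, and is rethrown as a BadRequestException, so the caller still receives a 400.

// Sketch only, reusing the handler's GSON and PLUGINS_TYPE constants.
String pluginClasses = "[{\"type\":\"jdbc\",\"className\":\"com.example.Driver\"}]";
Set<PluginClass> additionalPluginClasses = GSON.fromJson(pluginClasses, PLUGINS_TYPE);
// Parses fine, but the entry has no name, so validate() throws
// IllegalArgumentException("Plugin class name cannot be null"),
// which the handler wraps in a BadRequestException.
additionalPluginClasses.forEach(PluginClass::validate);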
@@ -19,10 +19,8 @@
import co.cask.cdap.api.artifact.ArtifactInfo;
import co.cask.cdap.api.artifact.ArtifactScope;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.plugin.PluginClass;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.PluginClassDeserializer;
import co.cask.cdap.common.http.DefaultHttpRequestConfig;
import co.cask.cdap.common.internal.remote.RemoteClient;
import co.cask.cdap.common.io.Locations;
@@ -55,7 +53,6 @@
public final class RemoteArtifactManager extends AbstractArtifactManager {
private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
.registerTypeAdapter(PluginClass.class, new PluginClassDeserializer())
.create();
private static final Type ARTIFACT_INFO_LIST_TYPE = new TypeToken<List<ArtifactInfo>>() { }.getType();
private final LocationFactory locationFactory;
@@ -23,7 +23,6 @@
import co.cask.cdap.api.plugin.PluginSelector;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.PluginClassDeserializer;
import co.cask.cdap.common.http.DefaultHttpRequestConfig;
import co.cask.cdap.common.internal.remote.RemoteClient;
import co.cask.cdap.common.io.Locations;
@@ -41,7 +40,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -62,9 +60,7 @@
*/
public class RemotePluginFinder implements PluginFinder {

private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(PluginClass.class, new PluginClassDeserializer())
.create();
private static final Gson GSON = new Gson();
private static final Type PLUGIN_INFO_LIST_TYPE = new TypeToken<List<PluginInfo>>() { }.getType();

private final RemoteClient remoteClient;
@@ -87,7 +87,7 @@ Configuration getHConf() {
}

/**
* Updates the {@link Configuration} of this class with the given paramters.
* Updates the {@link Configuration} of this class with the given parameters.
*
* @param context the context for the MapReduce program
* @param conf the CDAP configuration
@@ -23,7 +23,6 @@
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduce;
@@ -50,10 +49,8 @@
import co.cask.cdap.internal.app.runtime.LocalizationUtils;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.batch.dataset.UnsupportedOutputFormat;
import co.cask.cdap.internal.app.runtime.batch.dataset.input.MapperInput;
import co.cask.cdap.internal.app.runtime.batch.dataset.input.MultipleInputs;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputs;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsMainOutputWrapper;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.ProvidedOutput;
import co.cask.cdap.internal.app.runtime.batch.distributed.ContainerLauncherGenerator;
@@ -121,7 +118,6 @@
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -747,35 +743,8 @@ private void setOutputsIfNeeded(Job job) throws ClassNotFoundException {
List<ProvidedOutput> outputsMap = context.getOutputs();
fixOutputPermissions(job, outputsMap);
LOG.debug("Using as output for MapReduce Job: {}", outputsMap);
OutputFormatProvider rootOutputFormatProvider;
if (outputsMap.isEmpty()) {
// user is not going through our APIs to add output; propagate the job's output format
rootOutputFormatProvider =
new BasicOutputFormatProvider(job.getOutputFormatClass().getName(), Collections.<String, String>emptyMap());
} else if (outputsMap.size() == 1) {
// If only one output is configured through the context, then set it as the root OutputFormat
rootOutputFormatProvider = outputsMap.get(0).getOutputFormatProvider();
} else {
// multiple output formats configured via the context. We should use a RecordWriter that doesn't support writing
// as the root output format in this case to disallow writing directly on the context.
// the OutputCommitter is effectively a no-op, as it runs as the RootOutputCommitter in MultipleOutputsCommitter
rootOutputFormatProvider =
new BasicOutputFormatProvider(UnsupportedOutputFormat.class.getName(), Collections.<String, String>emptyMap());
}

MultipleOutputsMainOutputWrapper.setRootOutputFormat(job,
rootOutputFormatProvider.getOutputFormatClassName(),
rootOutputFormatProvider.getOutputFormatConfiguration());
MultipleOutputsMainOutputWrapper.setOutputs(job, outputsMap);
job.setOutputFormatClass(MultipleOutputsMainOutputWrapper.class);

for (ProvidedOutput output : outputsMap) {
String outputName = output.getOutput().getAlias();
String outputFormatClassName = output.getOutputFormatClassName();
Map<String, String> outputConfig = output.getOutputFormatConfiguration();
MultipleOutputs.addNamedOutput(job, outputName, outputFormatClassName,
job.getOutputKeyClass(), job.getOutputValueClass(), outputConfig);

}
}

private void fixOutputPermissions(JobContext job, List<ProvidedOutput> outputs) {
