Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Workflow/Task Summary Input/Output Json Serialization (#2128)
Browse files Browse the repository at this point in the history
* Added configuration for SummaryInputOutputJSONSerialization and updated Workflow/TaskSummary classes and the server's ModulesProvider to leverage the configuration, which allows the summary input/output strings to be serialized as JSON strings as opposed to the default configuration of using Java Map toString()

* Updated Task/Workflow Summary Input/Output Json String configuration management and added in tests for the additional features as well as a regression test for expected default behavior

* Minor whitespace fix in WorkflowSummary

* Another tiny whitespace tweak for WorkflowSummary

* Minor whitespace changes to SummaryUtil
  • Loading branch information
dsmith-globys committed Mar 16, 2021
1 parent a88e274 commit ac987e0
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.utils.SummaryUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
Expand Down Expand Up @@ -94,17 +96,17 @@ public class TaskSummary {
@ProtoField(id = 19)
private int workflowPriority;

public TaskSummary() {
}
public TaskSummary() {
}

public TaskSummary(Task task) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdf.setTimeZone(gmt);

this.taskId = task.getTaskId();
this.taskDefName = task.getTaskDefName();
this.taskType = task.getTaskType();
this.taskId = task.getTaskId();
this.taskDefName = task.getTaskDefName();
this.taskType = task.getTaskType();
this.workflowId = task.getWorkflowInstanceId();
this.workflowType = task.getWorkflowType();
this.workflowPriority = task.getWorkflowPriority();
Expand All @@ -117,11 +119,11 @@ public TaskSummary(Task task) {
this.reasonForIncompletion = task.getReasonForIncompletion();
this.queueWaitTime = task.getQueueWaitTime();
if (task.getInputData() != null) {
this.input = task.getInputData().toString();
this.input = SummaryUtil.serializeInputOutput(task.getInputData());
}

if (task.getOutputData() != null) {
this.output = task.getOutputData().toString();
this.output = SummaryUtil.serializeInputOutput(task.getOutputData());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.common.utils.SummaryUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
Expand Down Expand Up @@ -90,8 +92,8 @@ public class WorkflowSummary {
private int priority;

public WorkflowSummary() {

}

public WorkflowSummary(Workflow workflow) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
Expand All @@ -113,11 +115,11 @@ public WorkflowSummary(Workflow workflow) {
}
this.status = workflow.getStatus();
if(workflow.getInput() != null){
this.input = workflow.getInput().toString();
this.input = SummaryUtil.serializeInputOutput(workflow.getInput());
}
if(workflow.getOutput() != null){
this.output = SummaryUtil.serializeInputOutput(workflow.getOutput());
}
if(workflow.getOutput() != null){
this.output = workflow.getOutput().toString();
}
this.reasonForIncompletion = workflow.getReasonForIncompletion();
if(workflow.getEndTime() > 0){
this.executionTime = workflow.getEndTime() - workflow.getStartTime();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.common.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SummaryUtil {
private static final Logger logger = LoggerFactory.getLogger(SummaryUtil.class);
private static final ObjectMapper objectMapper = new JsonMapperProvider().get();
private static boolean isSummaryInputOutputJsonSerializationEnabled = false;

/**
* Serializes the Workflow or Task's Input/Output object by Java's toString (default), or by
* a Json ObjectMapper (@see Configuration.isSummaryInputOutputJsonSerializationEnabled)
* @param object the Input or Output Object to serialize
* @return the serialized string of the Input or Output object
*/
public static String serializeInputOutput(Map<String, Object> object) {
if (isSummaryInputOutputJsonSerializationEnabled == false) {
return object.toString();
}

try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
logger.error("The provided value ({}) could not be serialized as Json", object.toString(), e);
throw new RuntimeException(e);
}
}

/**
* @param isEnabled (default) false for Java toString, true for Json serialized string
*/
public static void setSummaryInputOutputJsonSerializationEnabled(boolean isEnabled) {
isSummaryInputOutputJsonSerializationEnabled = isEnabled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2021 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.common.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SummaryUtilTest {
private Map<String, Object> testObject;

public SummaryUtilTest() {
Map<String, Object> child = new HashMap<String, Object>();
child.put("testStr", "childTestStr");

Map<String, Object> obj = new HashMap<String, Object>();
obj.put("testStr", "stringValue");
obj.put("testArray", new ArrayList<Integer>(Arrays.asList(1,2,3)));
obj.put("testObj", child);
obj.put("testNull", null);

this.testObject = obj;
}

@Test
public void testSerializeInputOutput_defaultToString() throws Exception {
SummaryUtil.setSummaryInputOutputJsonSerializationEnabled(false);
String serialized = SummaryUtil.serializeInputOutput(this.testObject);

assertEquals("The Java.toString() Serialization should match the serialized Test Object",
this.testObject.toString(), serialized);
}

@Test
public void testSerializeInputOutput_jsonSerializationEnabled() throws Exception {
ObjectMapper objectMapper = new JsonMapperProvider().get();
SummaryUtil.setSummaryInputOutputJsonSerializationEnabled(true);
String serialized = SummaryUtil.serializeInputOutput(this.testObject);

assertEquals("The ObjectMapper Json Serialization should match the serialized Test Object",
objectMapper.writeValueAsString(this.testObject), serialized);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public interface Configuration {
String WORKFLOW_ARCHIVAL_DELAY_QUEUE_WORKER_THREAD_COUNT_PROPERTY_NAME = "workflow.archival.delay.queue.worker.thread.count";
int WORKFLOW_ARCHIVAL_DELAY_QUEUE_WORKER_THREAD_COUNT_DEFAULT_VALUE = 5;

String SUMMARY_INPUT_OUTPUT_JSON_SERIALIZATION_ENABLED_PROPERTY_NAME = "summary.input.output.json.serialization.enabled";
boolean SUMMARY_INPUT_OUTPUT_JSON_SERIALIZATION_ENABLED_DEFAULT_VALUE = false;

String OWNER_EMAIL_MANDATORY_NAME = "workflow.owner.email.mandatory";
boolean OWNER_EMAIL_MANDATORY_DEFAULT_VALUE = true;

Expand Down Expand Up @@ -369,6 +372,14 @@ default int getWorkflowArchivalDelayQueueWorkerThreadCount() {
return getIntProperty(WORKFLOW_ARCHIVAL_DELAY_QUEUE_WORKER_THREAD_COUNT_PROPERTY_NAME, WORKFLOW_ARCHIVAL_DELAY_QUEUE_WORKER_THREAD_COUNT_DEFAULT_VALUE);
}

/**
* By default, this value is false, meaning that Java's default toString() method is used.
* @return if true, Workflow/Task Summary Input and Output are serialized as Json strings.
*/
default boolean isSummaryInputOutputJsonSerializationEnabled()
{
return getBooleanProperty(SUMMARY_INPUT_OUTPUT_JSON_SERIALIZATION_ENABLED_PROPERTY_NAME, SUMMARY_INPUT_OUTPUT_JSON_SERIALIZATION_ENABLED_DEFAULT_VALUE);
}

/**
* @return the number of threads to be use in Scheduler used for polling events from multiple event queues.
Expand Down
3 changes: 3 additions & 0 deletions docker/server/config/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ workflow.elasticsearch.index.name=conductor
# conductor.additional.modules=com.netflix.conductor.contribs.metrics.MetricsRegistryModule,com.netflix.conductor.contribs.metrics.LoggingMetricsModule
# com.netflix.conductor.contribs.metrics.LoggingMetricsModule.reportPeriodSeconds=15

# To enable Workflow/Task Summary Input/Output JSON Serialization, use the following:
# summary.input.output.json.serialization.enabled=true

# Load sample kitchen sink workflow
loadSample=true
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.netflix.conductor.bootstrap;

import com.netflix.conductor.common.utils.SummaryUtil;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch;
import com.netflix.conductor.grpc.server.GRPCServer;
Expand Down Expand Up @@ -54,6 +56,13 @@ public static void setupIndex(IndexDAO indexDAO) {
}
}

public static void configureSummaryUtil(Configuration configuration) {
if (configuration.isSummaryInputOutputJsonSerializationEnabled() == true) {
System.out.println("Enabling Summary Input/Output Json Serialization based on configuration");
SummaryUtil.setSummaryInputOutputJsonSerializationEnabled(true);
}
}

static void startGRPCServer(GRPCServer grpcServer) {
try {
grpcServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.elasticsearch.EmbeddedElasticSearch;
import com.netflix.conductor.elasticsearch.EmbeddedElasticSearchProvider;
Expand Down Expand Up @@ -56,6 +57,7 @@ public static void main(String[] args) throws Exception {
System.exit(3);
}

BootstrapUtil.configureSummaryUtil(serverInjector.getInstance(Configuration.class));

System.out.println("\n\n\n");
System.out.println(" _ _ ");
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/resources/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ workflow.decider.locking.leaseTimeInSeconds=60
# Setting is ignored if the value is lower or equals to 0
# workflow.event.queues.amqp.maxPriority=-1

# To enable Workflow/Task Summary Input/Output JSON Serialization, use the following:
# summary.input.output.json.serialization.enabled=true

0 comments on commit ac987e0

Please sign in to comment.