Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #905 from ZupIT/wf-listener-publish-implementation
Browse files Browse the repository at this point in the history
add customized WorkflowStatusListener
  • Loading branch information
kishorebanala committed Mar 8, 2019
2 parents c3e1ff3 + 4f007b9 commit 6b0647c
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netflix.conductor.contribs;

import com.google.inject.AbstractModule;
import com.netflix.conductor.contribs.listener.DynoQueueStatusPublisher;
import com.netflix.conductor.core.execution.WorkflowStatusListener;

public class DynoQueueStatusPublisherModule extends AbstractModule {
@Override
protected void configure() {
bind(WorkflowStatusListener.class).to(DynoQueueStatusPublisher.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.contribs.listener;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.WorkflowStatusListener;
import com.netflix.conductor.dao.QueueDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.Collections;

/**
* Publishes a @see Message containing a @see WorkflowSummary to a DynoQueue on a workflow completion or termination event.
*/
public class DynoQueueStatusPublisher implements WorkflowStatusListener {

private static final Logger LOGGER = LoggerFactory.getLogger(DynoQueueStatusPublisher.class);
private final QueueDAO queueDAO;
private final ObjectMapper objectMapper;
private final Configuration config;
private final String successStatusQueue;
private final String failureStatusQueue;

@Inject
public DynoQueueStatusPublisher(QueueDAO queueDAO, ObjectMapper objectMapper, Configuration config) {
this.queueDAO = queueDAO;
this.objectMapper = objectMapper;
this.config = config;
this.successStatusQueue = config.getProperty("workflowstatuslistener.publisher.success.queue", "_callbackSuccessQueue");
this.failureStatusQueue = config.getProperty("workflowstatuslistener.publisher.failure.queue", "_callbackFailureQueue");
}

@Override
public void onWorkflowCompleted(Workflow workflow) {
LOGGER.info("Publishing callback of workflow {} on completion ", workflow.getWorkflowId());
queueDAO.push(successStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}

@Override
public void onWorkflowTerminated(Workflow workflow) {
LOGGER.info("Publishing callback of workflow {} on termination", workflow.getWorkflowId());
queueDAO.push(failureStatusQueue, Collections.singletonList(workflowToMessage(workflow)));
}

private Message workflowToMessage(Workflow workflow) {
String jsonWfSummary;
WorkflowSummary summary = new WorkflowSummary(workflow);
try {
jsonWfSummary = objectMapper.writeValueAsString(summary);
} catch (JsonProcessingException e) {
LOGGER.error("Failed to convert WorkflowSummary: {} to String. Exception: {}", summary, e);
throw new RuntimeException(e);
}
return new Message(workflow.getWorkflowId(), jsonWfSummary, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.matcher.Matchers;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
Expand All @@ -34,9 +35,12 @@
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -56,10 +60,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.inject.AbstractModule;
import com.google.inject.ProvisionException;
import com.google.inject.util.Modules;
import com.netflix.conductor.cassandra.CassandraModule;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.contribs.http.HttpTask;
Expand All @@ -14,13 +15,7 @@
import com.netflix.conductor.dao.RedisWorkflowModule;
import com.netflix.conductor.elasticsearch.ElasticSearchModule;
import com.netflix.conductor.mysql.MySQLWorkflowModule;
import com.netflix.conductor.server.DynomiteClusterModule;
import com.netflix.conductor.server.JerseyModule;
import com.netflix.conductor.server.LocalRedisModule;
import com.netflix.conductor.server.RedisClusterModule;
import com.netflix.conductor.server.RedisSentinelModule;
import com.netflix.conductor.server.ServerModule;
import com.netflix.conductor.server.SwaggerModule;
import com.netflix.conductor.server.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +25,8 @@
import java.util.Arrays;
import java.util.List;

import static java.util.Collections.singletonList;

// TODO Investigate whether this should really be a ThrowingProvider.
public class ModulesProvider implements Provider<List<AbstractModule>> {
private static final Logger logger = LoggerFactory.getLogger(ModulesProvider.class);
Expand All @@ -47,12 +44,8 @@ public ModulesProvider(Configuration configuration) {

@Override
public List<AbstractModule> get() {
List<AbstractModule> modulesToLoad = new ArrayList<>();

modulesToLoad.addAll(selectModulesToLoad());
modulesToLoad.addAll(configuration.getAdditionalModules());

return modulesToLoad;
AbstractModule resolvedModule = (AbstractModule) Modules.override(selectModulesToLoad()).with(configuration.getAdditionalModules());
return singletonList(resolvedModule);
}

private List<AbstractModule> selectModulesToLoad() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.tests.listener;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
import com.netflix.conductor.contribs.DynoQueueStatusPublisherModule;
import com.netflix.conductor.tests.utils.TestModule;
import org.junit.runners.BlockJUnit4ClassRunner;

public class StatusPublisherTestRunner extends BlockJUnit4ClassRunner {

private Injector dependenciesInjector;

static {
System.setProperty("EC2_REGION", "us-east-1");
System.setProperty("EC2_AVAILABILITY_ZONE", "us-east-1c");
}


public StatusPublisherTestRunner(Class<?> klass) throws Exception {
super(klass);
System.setProperty("workflow.namespace.prefix", "conductor" + System.getProperty("user.name"));
dependenciesInjector = Guice.createInjector(Modules.override(new TestModule()).with(new DynoQueueStatusPublisherModule()));
}

@Override
protected Object createTest() throws Exception {
Object test = super.createTest();
dependenciesInjector.injectMembers(test);
return test;
}


}

0 comments on commit 6b0647c

Please sign in to comment.