Skip to content

Commit

Permalink
Added support for StopSourceCallback. Added multiple threading models…
Browse files Browse the repository at this point in the history
… for \@source.
  • Loading branch information
3miliano committed Mar 22, 2012
1 parent 52b3b72 commit f299a26
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,18 +469,34 @@ protected DefinedClass getInterceptingMessageProcessorClass(ExecutableElement ex
}


protected DefinedClass getMessageSourceClass(ExecutableElement executableElement) {
protected DefinedClass getMessageSourceClass(ExecutableElement executableElement, boolean runnable) {
String beanDefinitionParserName = context.getNameUtils().generateClassName(executableElement, NamingContants.MESSAGE_SOURCE_CLASS_NAME_SUFFIX);
Package pkg = context.getCodeModel()._package(context.getNameUtils().getPackageName(beanDefinitionParserName) + NamingContants.MESSAGE_SOURCE_NAMESPACE);
DefinedClass clazz = pkg._class(context.getNameUtils().getClassName(beanDefinitionParserName), new Class[]{
MuleContextAware.class,
Startable.class,
Stoppable.class,
Runnable.class,
Initialisable.class,
MessageSource.class,
SourceCallback.class,
FlowConstructAware.class});

Class[] inherits;
if (runnable) {
inherits = new Class[]{
MuleContextAware.class,
Startable.class,
Stoppable.class,
Runnable.class,
Initialisable.class,
MessageSource.class,
SourceCallback.class,
FlowConstructAware.class};
} else {
inherits = new Class[]{
MuleContextAware.class,
Startable.class,
Stoppable.class,
Initialisable.class,
MessageSource.class,
SourceCallback.class,
FlowConstructAware.class};

}

DefinedClass clazz = pkg._class(context.getNameUtils().getClassName(beanDefinitionParserName), inherits);

return clazz;
}
Expand Down Expand Up @@ -594,13 +610,13 @@ protected Method generateInitialiseMethod(DefinedClass messageProcessorClass, Ma
TryStatement tryLookUp = ifNoObject._then()._try();
tryLookUp.body().assign(object, muleContext.invoke("getRegistry").invoke("lookupObject").arg(ExpressionFactory.dotclass(pojoClass)));
Conditional ifObjectNoFound = tryLookUp.body()._if(Op.eq(object, ExpressionFactory._null()));
if(shouldAutoCreate) {
if (shouldAutoCreate) {
ifObjectNoFound._then().assign(object, ExpressionFactory._new(pojoClass));
ifObjectNoFound._then().add(muleContext.invoke("getRegistry").invoke("registerObject").arg(pojoClass.dotclass().invoke("getName")).arg(object));
} else {
ifObjectNoFound._then()._throw(ExpressionFactory._new(ref(InitialisationException.class)).
arg(ref(MessageFactory.class).staticInvoke("createStaticMessage").
arg("Cannot find object")).arg(ExpressionFactory._this()));
arg(ref(MessageFactory.class).staticInvoke("createStaticMessage").
arg("Cannot find object")).arg(ExpressionFactory._this()));
}
CatchBlock catchBlock = tryLookUp._catch(ref(RegistrationException.class));
Variable exception = catchBlock.param("e");
Expand All @@ -617,9 +633,9 @@ protected Method generateInitialiseMethod(DefinedClass messageProcessorClass, Ma
Conditional ifObjectIsString = initialise.body()._if(Op._instanceof(object, ref(String.class)));
ifObjectIsString._then().assign(object, muleContext.invoke("getRegistry").invoke("lookupObject").arg(ExpressionFactory.cast(ref(String.class), object)));
ifObjectIsString._then()._if(Op.eq(object, ExpressionFactory._null()))._then().
_throw(ExpressionFactory._new(ref(InitialisationException.class)).
arg(ref(MessageFactory.class).staticInvoke("createStaticMessage").
arg("Cannot find object by config name")).arg(ExpressionFactory._this()));
_throw(ExpressionFactory._new(ref(InitialisationException.class)).
arg(ref(MessageFactory.class).staticInvoke("createStaticMessage").
arg("Cannot find object by config name")).arg(ExpressionFactory._this()));

if (fields != null) {
for (String fieldName : fields.keySet()) {
Expand Down Expand Up @@ -969,7 +985,7 @@ protected void generateSourceCallbackProcessMethodWithNoPayload(DefinedClass mes
CatchBlock catchException = tryBlock._catch(ref(Exception.class));
Variable exception = catchException.param("e");
catchException.body()._throw(exception);

process.body()._return(ExpressionFactory._null());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.mule.api.annotations.Connector;
import org.mule.api.annotations.Module;
import org.mule.api.annotations.Source;
import org.mule.api.annotations.SourceThreadingModel;
import org.mule.api.callback.SourceCallback;
import org.mule.api.callback.StopSourceCallback;
import org.mule.config.i18n.CoreMessages;
import org.mule.devkit.generation.AbstractMessageGenerator;
import org.mule.devkit.generation.DevKitTypeElement;
Expand Down Expand Up @@ -72,7 +74,8 @@ protected void doGenerate(DevKitTypeElement typeElement) {

private void generateMessageSource(DevKitTypeElement typeElement, ExecutableElement executableElement) {
// get class
DefinedClass messageSourceClass = getMessageSourceClass(executableElement);
Source sourceAnnotation = executableElement.getAnnotation(Source.class);
DefinedClass messageSourceClass = getMessageSourceClass(executableElement, sourceAnnotation.threadingModel() == SourceThreadingModel.SINGLE_THREAD);

messageSourceClass.javadoc().add(messageSourceClass.name() + " wraps ");
messageSourceClass.javadoc().add("{@link " + ((TypeElement) executableElement.getEnclosingElement()).getQualifiedName().toString() + "#");
Expand Down Expand Up @@ -105,8 +108,17 @@ private void generateMessageSource(DevKitTypeElement typeElement, ExecutableElem
FieldVariable muleContext = generateFieldForMuleContext(messageSourceClass);
FieldVariable flowConstruct = generateFieldForFlowConstruct(messageSourceClass);
FieldVariable messageProcessor = generateFieldForMessageProcessorListener(messageSourceClass);
FieldVariable thread = messageSourceClass.field(Modifier.PRIVATE, ref(Thread.class), "thread");
thread.javadoc().add("Thread under which this message source will execute");

FieldVariable stopSourceCallback = null;
if( executableElement.getReturnType().toString().contains("StopSourceCallback") ) {
stopSourceCallback = messageSourceClass.field(Modifier.PRIVATE, ref(StopSourceCallback.class), "stopSourceCallback");
}

FieldVariable thread = null;
if (sourceAnnotation.threadingModel() == SourceThreadingModel.SINGLE_THREAD) {
thread = messageSourceClass.field(Modifier.PRIVATE, ref(Thread.class), "thread");
thread.javadoc().add("Thread under which this message source will execute");
}

// add initialise
generateInitialiseMethod(messageSourceClass, fields, typeElement, muleContext, null, null, object, null, !typeElement.needsConfig());
Expand Down Expand Up @@ -140,41 +152,61 @@ private void generateMessageSource(DevKitTypeElement typeElement, ExecutableElem
generateSourceCallbackProcessWithPropertiesMethod(messageSourceClass, messageProcessor, muleContext, flowConstruct);
generateSourceCallbackProcessMethodWithNoPayload(messageSourceClass, messageProcessor, muleContext, flowConstruct);

// add start method
generateStartMethod(messageSourceClass, thread);
if (sourceAnnotation.threadingModel() == SourceThreadingModel.SINGLE_THREAD) {
// add start method
generateSingleThreadStartMethod(messageSourceClass, thread);
// add stop method
generateSingleThreadStopMethod(messageSourceClass, thread);
} else {
// get pool object if poolable
if (typeElement.isPoolable()) {
DefinedClass poolObjectClass = context.getClassForRole(context.getNameUtils().generatePoolObjectRoleKey(typeElement));

// add stop method
generateStopMethod(messageSourceClass, thread);
// add start method method
generateNoThreadStartMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, poolObjectClass, flowConstruct, stopSourceCallback);
} else {
// add start method method
generateNoThreadStartMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, flowConstruct, stopSourceCallback);
}
// add stop method
generateNoThreadStopMethod(messageSourceClass, stopSourceCallback, executableElement);
}

// get pool object if poolable
if (typeElement.isPoolable()) {
DefinedClass poolObjectClass = context.getClassForRole(context.getNameUtils().generatePoolObjectRoleKey(typeElement));
if (sourceAnnotation.threadingModel() == SourceThreadingModel.SINGLE_THREAD) {
// get pool object if poolable
if (typeElement.isPoolable()) {
DefinedClass poolObjectClass = context.getClassForRole(context.getNameUtils().generatePoolObjectRoleKey(typeElement));

// add run method
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, poolObjectClass, flowConstruct);
} else {
// add run method
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, flowConstruct);
// add run method
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, poolObjectClass, flowConstruct, stopSourceCallback);
} else {
// add run method
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, flowConstruct, stopSourceCallback);
}
}
}

private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, FieldVariable flowConstruct) {
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, null, flowConstruct);
private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, FieldVariable flowConstruct, FieldVariable stopSourceCallback) {
generateRunMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, null, flowConstruct, stopSourceCallback);
}


private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, DefinedClass poolObjectClass, FieldVariable flowConstruct) {
private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, DefinedClass poolObjectClass, FieldVariable flowConstruct, FieldVariable stopSourceCallback) {
String methodName = executableElement.getSimpleName().toString();
Source sourceAnnotation = executableElement.getAnnotation(Source.class);
Method run = messageSourceClass.method(Modifier.PUBLIC, context.getCodeModel().VOID, "run");
run.javadoc().add("Implementation {@link Runnable#run()} that will invoke the method on the pojo that this message source wraps.");

DefinedClass moduleObjectClass = context.getClassForRole(context.getNameUtils().generateModuleObjectRoleKey((TypeElement)executableElement.getEnclosingElement()));
Variable moduleObject = run.body().decl(moduleObjectClass, "castedModuleObject", ExpressionFactory._null());
generateSourceExecution(run.body(), executableElement, fields, connectFields, object, muleContext, poolObjectClass, flowConstruct, methodName, sourceAnnotation, stopSourceCallback);
}

private void generateSourceExecution(Block body, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, DefinedClass poolObjectClass, FieldVariable flowConstruct, String methodName, Source sourceAnnotation, FieldVariable stopSourceCallback) {
DefinedClass moduleObjectClass = context.getClassForRole(context.getNameUtils().generateModuleObjectRoleKey((TypeElement) executableElement.getEnclosingElement()));
Variable moduleObject = body.decl(moduleObjectClass, "castedModuleObject", ExpressionFactory._null());

Variable poolObject = null;
if (poolObjectClass != null) {
poolObject = run.body().decl(poolObjectClass, "poolObject", ExpressionFactory._null());
poolObject = body.decl(poolObjectClass, "poolObject", ExpressionFactory._null());
}

// add connection field declarations
Expand All @@ -183,22 +215,22 @@ private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElemen
Variable connection = null;
if (connectMethod != null) {
DefinedClass connectionClass = context.getClassForRole(context.getNameUtils().generateConnectorObjectRoleKey((TypeElement) connectMethod.getEnclosingElement()));
connection = run.body().decl(connectionClass, "connection", ExpressionFactory._null());
connection = body.decl(connectionClass, "connection", ExpressionFactory._null());

for (VariableElement variable : connectMethod.getParameters()) {
String fieldName = variable.getSimpleName().toString();

Type type = ref(connectFields.get(fieldName).getVariableElement().asType()).boxify();
String name = "transformed" + StringUtils.capitalize(fieldName);

Variable transformed = run.body().decl(type, name, ExpressionFactory._null());
Variable transformed = body.decl(type, name, ExpressionFactory._null());
connectionParameters.put(fieldName, transformed);
}
}

TryStatement callSource = run.body()._try();
TryStatement callSource = body._try();

if( sourceAnnotation.primaryNodeOnly() ) {
if (sourceAnnotation.primaryNodeOnly()) {
WhileLoop ifNotPrimary = callSource.body()._while(Op.not(muleContext.invoke("isPrimaryPollingInstance")));
ifNotPrimary.body().add(ref(Thread.class).staticInvoke("sleep").arg(ExpressionFactory.lit(5000)));
}
Expand Down Expand Up @@ -297,9 +329,13 @@ private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElemen
methodCall.arg(parameters.get(i));
}

callSource.body().add(methodCall);
if( executableElement.getReturnType().toString().contains("StopSourceCallback") ) {
callSource.body().assign(stopSourceCallback, methodCall);
} else {
callSource.body().add(methodCall);
}

if( sourceAnnotation.primaryNodeOnly() ) {
if (sourceAnnotation.primaryNodeOnly()) {
// catch interrupted exception and do nothing
callSource._catch(ref(InterruptedException.class));
}
Expand Down Expand Up @@ -340,7 +376,7 @@ private void generateRunMethod(DefinedClass messageSourceClass, ExecutableElemen
}
}

private void generateStartMethod(DefinedClass messageSourceClass, FieldVariable thread) {
private void generateSingleThreadStartMethod(DefinedClass messageSourceClass, FieldVariable thread) {
Method start = messageSourceClass.method(Modifier.PUBLIC, context.getCodeModel().VOID, "start");
start.javadoc().add("Method to be called when Mule instance gets started.");
start._throws(ref(MuleException.class));
Expand All @@ -354,13 +390,49 @@ private void generateStartMethod(DefinedClass messageSourceClass, FieldVariable
}


private void generateStopMethod(DefinedClass messageSourceClass, FieldVariable thread) {
private void generateSingleThreadStopMethod(DefinedClass messageSourceClass, FieldVariable thread) {
Method stop = messageSourceClass.method(Modifier.PUBLIC, context.getCodeModel().VOID, "stop");
stop.javadoc().add("Method to be called when Mule instance gets stopped.");
stop._throws(ref(MuleException.class));

stop.body().add(thread.invoke("interrupt"));
}

private void generateNoThreadStartMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, FieldVariable flowConstruct, FieldVariable stopSourceCallback) {
generateNoThreadStartMethod(messageSourceClass, executableElement, fields, connectFields, object, muleContext, null, flowConstruct, stopSourceCallback);
}

private void generateNoThreadStartMethod(DefinedClass messageSourceClass, ExecutableElement executableElement, Map<String, FieldVariableElement> fields, Map<String, FieldVariableElement> connectFields, FieldVariable object, FieldVariable muleContext, DefinedClass poolObjectClass, FieldVariable flowConstruct, FieldVariable stopSourceCallback) {
String methodName = executableElement.getSimpleName().toString();
Source sourceAnnotation = executableElement.getAnnotation(Source.class);
Method start = messageSourceClass.method(Modifier.PUBLIC, context.getCodeModel().VOID, "start");
start.javadoc().add("Method to be called when Mule instance gets started.");
start._throws(ref(MuleException.class));

generateSourceExecution(start.body(), executableElement, fields, connectFields, object, muleContext, poolObjectClass, flowConstruct, methodName, sourceAnnotation, stopSourceCallback);
}


private void generateNoThreadStopMethod(DefinedClass messageSourceClass, FieldVariable stopSourceCallback, ExecutableElement executableElement) {
String methodName = executableElement.getSimpleName().toString();
Method stop = messageSourceClass.method(Modifier.PUBLIC, context.getCodeModel().VOID, "stop");
stop.javadoc().add("Method to be called when Mule instance gets stopped.");
stop._throws(ref(MuleException.class));

if( stopSourceCallback != null ) {
Conditional ifStopCallbackNotNull = stop.body()._if(Op.ne(stopSourceCallback, ExpressionFactory._null()));
TryStatement tryToStop = ifStopCallbackNotNull._then()._try();
tryToStop.body().add(stopSourceCallback.invoke("stop"));
CatchBlock catchException = tryToStop._catch(ref(Exception.class));
Variable e = catchException.param("e");

Invocation messagingException = ExpressionFactory._new(ref(MessagingException.class));
messagingException.arg(ref(CoreMessages.class).staticInvoke("failedToStop").arg(methodName));
messagingException.arg(ExpressionFactory.cast(ref(MuleEvent.class), ExpressionFactory._null()));
messagingException.arg(e);

catchException.body()._throw(messagingException);
}
}

}
Loading

0 comments on commit f299a26

Please sign in to comment.