Skip to content

Commit

Permalink
Merge 62636f1 into 4c8b15f
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemJiang committed Aug 22, 2018
2 parents 4c8b15f + 62636f1 commit f7bc10e
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 9 deletions.
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.saga.omega.context.annotations;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
* Indicates the annotated method will start a TCC .
*/
@Retention(RUNTIME)
@Target(METHOD)
public @interface TccStart {
/**
* TCC timeout, in seconds. <br>
* Default value is 0, which means never timeout.
*
* @return
*/
int timeout() default 0;
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.saga.omega.transaction.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
/**
* Indicates the annotated method will start a sub-transaction. <br>
* A <code>@Participate</code> method should satisfy below requirements:
* <ol>
* <li>all parameters are serialized</li>
* <li>is idempotent</li>
* <li>the object instance which @Participate method resides in should be stateless</li>
* </ol>
*/
public @interface Participate {
/**
* Confirm method name.<br>
* A confirm method should satisfy below requirements:
* <ol>
* <li>has same parameter list as @Participate method's</li>
* <li>all parameters are serialized</li>
* <li>is idempotent</li>
* <li>be in the same class as @Participate method is in</li>
* </ol>
*
* @return
*/
String confirmMethod() default "";

/**
* Cancel method name.<br>
* A cancel method should satisfy below requirements:
* <ol>
* <li>has same parameter list as @Participate method's</li>
* <li>all parameters are serialized</li>
* <li>is idempotent</li>
* <li>be in the same class as @Participate method is in</li>
* </ol>
*
* @return
*/
String cancelMethod() default "";

}
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.saga.omega.transaction.tcc;

import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;

public interface TccEventService {

void onConnected();

void onDisconnected();

void close();

String target();

AlphaResponse participate(ParticipatedEvent participateEvent);

AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);

AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);

AlphaResponse send(TxEvent event);

}
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.saga.omega.transaction.tcc;

import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;

import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Aspect
public class TccParticipatorAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final OmegaContext context;
private final TccEventService tccEventService;

public TccParticipatorAspect(TccEventService tccEventService, OmegaContext context) {
this.context = context;
this.tccEventService = tccEventService;
}

@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Participate * *(..)) && @annotation(participate)")
Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
String localTxId = context.localTxId();
String cancelMethod = participate.cancelMethod();
String confirmMethod = participate.confirmMethod();

context.newLocalTxId();
LOG.debug("Updated context {} for participate method {} ", context, method.toString());

try {
Object result = joinPoint.proceed();
// Send the participate message back
tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod, confirmMethod,
TransactionStatus.Succeed));
LOG.debug("Participate Transaction with context {} has finished.", context);
return result;
} catch (Throwable throwable) {
// Now we don't handle the error message
tccEventService.participate(new ParticipatedEvent(context.globalTxId(), context.localTxId(), localTxId, cancelMethod,
confirmMethod, TransactionStatus.Failed));
LOG.error("Participate Transaction with context {} failed.", context, throwable);
throw throwable;
} finally {
context.setLocalTxId(localTxId);
}
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.saga.omega.transaction.tcc;

import javax.transaction.TransactionalException;

import org.apache.servicecomb.saga.common.TransactionStatus;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;

public class TccStartAnnotationProcessor implements EventAwareInterceptor {

private final OmegaContext omegaContext;
private final TccEventService eventService;

TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService eventService) {
this.omegaContext = omegaContext;
this.eventService = eventService;
}

@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
int retries, Object... message) {
try {
return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
}

@Override
public void postIntercept(String parentTxId, String compensationMethod) {
eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
TransactionStatus.Succeed));
}

@Override
public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
// Send the cancel event
// Do we need to wait for the alpha finish all the transaction
eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
TransactionStatus.Failed));
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.saga.omega.transaction.tcc;

import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;

import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Aspect
public class TccStartAspect {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final TccStartAnnotationProcessor tccStartAnnotationProcessor;

private final OmegaContext context;

public TccStartAspect(TccEventService tccEventServicer, OmegaContext context) {
this.context = context;
this.tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, tccEventServicer);
}

@Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart * *(..)) && @annotation(tccStart)")
Object advise(ProceedingJoinPoint joinPoint, TccStart tccStart) throws Throwable {
initializeOmegaContext();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();

tccStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), tccStart.timeout(), "", 0);
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());

try {
Object result = joinPoint.proceed();

tccStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString());
LOG.debug("Transaction with context {} has finished.", context);

return result;
} catch (Throwable throwable) {
// We don't need to handle the OmegaException here
if (!(throwable instanceof OmegaException)) {
tccStartAnnotationProcessor.onError(context.globalTxId(), method.toString(), throwable);
LOG.error("Transaction {} failed.", context.globalTxId());
}
throw throwable;
} finally {
context.clear();
}
}

private void initializeOmegaContext() {
context.setLocalTxId(context.newGlobalTxId());
}
}

0 comments on commit f7bc10e

Please sign in to comment.