Skip to content

Commit

Permalink
Merge 20e9afd into 8ff04bd
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemJiang committed Aug 28, 2018
2 parents 8ff04bd + 20e9afd commit 9a87756
Show file tree
Hide file tree
Showing 16 changed files with 749 additions and 17 deletions.
6 changes: 5 additions & 1 deletion omega/omega-connector/omega-connector-grpc/pom.xml
Expand Up @@ -18,7 +18,6 @@
~
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
Expand Down Expand Up @@ -58,6 +57,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand Down
Expand Up @@ -59,11 +59,12 @@ public void onNext(GrpcCompensateCommand command) {

@Override
public void onError(Throwable t) {
LOG.error("failed to process grpc compensate command.", t);
LOG.error("Failed to process grpc compensate command.", t);
errorHandler.run();
}

@Override
public void onCompleted() {
// Do nothing here
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.saga.omega.connector.grpc;

import java.lang.invoke.MethodHandles;

import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.stub.StreamObserver;

public class GrpcCoordinateStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MessageHandler messageHandler;


public GrpcCoordinateStreamObserver(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}

@Override
public void onNext(GrpcTccCoordinateCommand command) {
LOG.info("Received coordinate command, global tx id: {}, local tx id: {}, call method: {}",
command.getGlobalTxId(), command.getLocalTxId(), command.getMethod());
messageHandler.onReceive(command.getGlobalTxId(), command.getLocalTxId(), command.getParentTxId(), command.getMethod());
}

@Override
public void onError(Throwable t) {
//TODO need to find a way to handle the error
LOG.error("Failed to process grpc coordinate command.", t);
}

@Override
public void onCompleted() {
// Do nothing here
}
}
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.saga.omega.connector.grpc;

import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.tcc.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.tcc.TccEventService;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;

import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinatedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;

import io.grpc.ManagedChannel;

public class GrpcTccEventService implements TccEventService {
private final GrpcServiceConfig serviceConfig;
private final String target;
private final TccEventServiceBlockingStub tccBlockingEventService;
private final TccEventServiceStub tccAsyncEventService;
private final GrpcCoordinateStreamObserver observer;

public GrpcTccEventService(ServiceConfig serviceConfig,
ManagedChannel channel,
String address,
MessageHandler handler
) {
this.target = address;
tccBlockingEventService = TccEventServiceGrpc.newBlockingStub(channel);
tccAsyncEventService = TccEventServiceGrpc.newStub(channel);
this.serviceConfig = serviceConfig(serviceConfig.serviceName(), serviceConfig.instanceId());
observer = new GrpcCoordinateStreamObserver(handler);
}

@Override
public void onConnected() {
tccAsyncEventService.onConnected(serviceConfig, observer);
}

@Override
public void onDisconnected() {
tccBlockingEventService.onDisconnected(serviceConfig);
}

@Override
public void close() {
// do nothing here
}

@Override
public String target() {
return target;
}

@Override
public AlphaResponse participate(ParticipatedEvent participateEvent) {
GrpcAck grpcAck = tccBlockingEventService.participate(convertTo(participateEvent));
return new AlphaResponse(grpcAck.getAborted());
}

@Override
public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) {
GrpcAck grpcAck = tccBlockingEventService.onTccTransactionStarted(convertTo(tccStartEvent));
return new AlphaResponse(grpcAck.getAborted());
}


@Override
public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) {
GrpcAck grpcAck = tccBlockingEventService.onTccTransactionEnded(convertTo(tccEndEvent));
return new AlphaResponse(grpcAck.getAborted());

}

@Override
public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) {
GrpcAck grpcAck = tccBlockingEventService.onTccCoordinated(convertTo(coordinatedEvent));
return new AlphaResponse(grpcAck.getAborted());
}

private GrpcTccCoordinatedEvent convertTo(CoordinatedEvent coordinatedEvent) {
return GrpcTccCoordinatedEvent.newBuilder()
.setServiceName(serviceConfig.getServiceName())
.setInstanceId(serviceConfig.getInstanceId())
.setGlobalTxId(coordinatedEvent.getGlobalTxId())
.setLocalTxId(coordinatedEvent.getLocalTxId())
.setParentTxId(coordinatedEvent.getParentTxId())
.setMethodName(coordinatedEvent.getMethodName())
.setStatus(coordinatedEvent.getStatus().toString())
.build();
}

private GrpcServiceConfig serviceConfig(String serviceName, String instanceId) {
return GrpcServiceConfig.newBuilder()
.setServiceName(serviceName)
.setInstanceId(instanceId)
.build();
}

private GrpcTccTransactionStartedEvent convertTo(TccStartedEvent tccStartEvent) {
return GrpcTccTransactionStartedEvent.newBuilder()
.setServiceName(serviceConfig.getServiceName())
.setInstanceId(serviceConfig.getInstanceId())
.setGlobalTxId(tccStartEvent.getGlobalTxId())
.setLocalTxId(tccStartEvent.getLocalTxId())
.build();
}

private GrpcTccTransactionEndedEvent convertTo(TccEndedEvent tccEndEvent) {
return GrpcTccTransactionEndedEvent.newBuilder()
.setServiceName(serviceConfig.getServiceName())
.setInstanceId(serviceConfig.getInstanceId())
.setGlobalTxId(tccEndEvent.getGlobalTxId())
.setLocalTxId(tccEndEvent.getLocalTxId())
.setStatus(tccEndEvent.getStatus().toString())
.build();
}

private GrpcTccParticipatedEvent convertTo(ParticipatedEvent participateEvent) {
return GrpcTccParticipatedEvent.newBuilder()
.setServiceName(serviceConfig.getServiceName())
.setInstanceId(serviceConfig.getInstanceId())
.setGlobalTxId(participateEvent.getGlobalTxId())
.setLocalTxId(participateEvent.getLocalTxId())
.setParentTxId(participateEvent.getParentTxId())
.setCancelMethod(participateEvent.getCancelMethod())
.setConfirmMethod(participateEvent.getConfirmMethod())
.setStatus(participateEvent.getStatus().toString())
.build();
}
}

0 comments on commit 9a87756

Please sign in to comment.