Skip to content

Commit

Permalink
eventapis-10 Ability to register custom rollback functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
zeldal committed Aug 18, 2017
1 parent adcddf8 commit 4c0f540
Show file tree
Hide file tree
Showing 31 changed files with 102 additions and 94 deletions.
@@ -1,11 +1,6 @@
package com.kloia.eventapis.view;
package com.kloia.eventapis.api;

import com.google.common.reflect.TypeToken;
import com.kloia.eventapis.common.PublishedEvent;
import lombok.Getter;
import lombok.NonNull;

import java.lang.reflect.ParameterizedType;

/**
* Created by zeldalozdemir on 21/02/2017.
Expand Down
@@ -1,7 +1,5 @@
package com.kloia.eventapis.cassandra;

import com.kloia.eventapis.exception.EventStoreException;

public class ConcurrentEventException extends Exception {
private Exception exception;

Expand Down
Expand Up @@ -3,8 +3,6 @@
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.exception.EventStoreException;

import java.util.UUID;

public class DefaultConcurrencyResolver implements ConcurrencyResolver<ConcurrentEventException> {
@Override
public void tryMore() throws ConcurrentEventException {
Expand Down
@@ -1,12 +1,8 @@
package com.kloia.eventapis.common;

import com.datastax.driver.core.querybuilder.Clause;
import com.kloia.eventapis.cassandra.ConcurrencyResolver;
import com.kloia.eventapis.cassandra.ConcurrentEventException;
import com.kloia.eventapis.cassandra.EntityEvent;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.view.Entity;
import com.kloia.eventapis.view.EntityFunctionSpec;

import java.util.List;
import java.util.Optional;
Expand Down
@@ -1,8 +1,8 @@
package com.kloia.eventapis.common;


import com.kloia.eventapis.api.Views;
import com.fasterxml.jackson.annotation.JsonView;
import com.kloia.eventapis.api.Views;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down
@@ -1,32 +1,63 @@
package com.kloia.eventapis.view;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.RollbackSpec;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.EntityEvent;
import com.kloia.eventapis.common.PublishedEvent;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.pojos.Operation;
import com.kloia.eventapis.pojos.TransactionState;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.ParameterizedType;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class AggregateListener {
ViewQuery viewQuery;
EventRepository eventRepository;
SnapshotRepository snapshotRepository;
private ObjectMapper objectMapper;
private final Map<String, Map.Entry<Class<PublishedEvent>,RollbackSpec>> rollbackSpecMap;

public AggregateListener(ViewQuery viewQuery, EventRepository eventRepository, SnapshotRepository snapshotRepository, List<RollbackSpec> rollbackSpecs) {
public AggregateListener(ViewQuery viewQuery, EventRepository eventRepository, SnapshotRepository snapshotRepository, List<RollbackSpec> rollbackSpecs, ObjectMapper objectMapper) {
this.viewQuery = viewQuery;
this.eventRepository = eventRepository;
this.snapshotRepository = snapshotRepository;
this.objectMapper = objectMapper;
rollbackSpecMap = new HashMap<>();
rollbackSpecs.forEach(rollbackSpec -> {
ParameterizedType type = (ParameterizedType) TypeToken.of(rollbackSpec.getClass()).getSupertype(RollbackSpec.class).getType();
try {
Class<PublishedEvent> publishedEventClass = (Class<PublishedEvent>) Class.forName(type.getActualTypeArguments()[0].getTypeName());
rollbackSpecMap.put(publishedEventClass.getSimpleName(), new AbstractMap.SimpleEntry<>(publishedEventClass, rollbackSpec));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
});
}

public void listenOperations(ConsumerRecord<String, Operation> data) {
try {
log.info("Incoming Message: " + data.key()+ " "+ data.value());
if (data.value().getTransactionState() == TransactionState.TXN_FAILED) {
eventRepository.markFail(data.key());
List<EntityEvent> entityEvents = eventRepository.markFail(data.key());
entityEvents.forEach(entityEvent -> {
try {
Map.Entry<Class<PublishedEvent>, RollbackSpec> specEntry = rollbackSpecMap.get(entityEvent.getEventType());
if(specEntry != null)
specEntry.getValue().rollback(new EntityEventWrapper<>(specEntry.getKey(),objectMapper,entityEvent).getEventData());
} catch (EventStoreException e) {
log.warn(e.getMessage(),e);
}
});
snapshotRepository.save(viewQuery.queryByOpId(data.key()));
}else if (data.value().getTransactionState() == TransactionState.TXN_SUCCEDEED) {
snapshotRepository.save(viewQuery.queryByOpId(data.key()));
Expand Down
@@ -1,7 +1,6 @@
package com.kloia.eventapis.view;

import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.view.EntityEventWrapper;

/**
* Created by zeldalozdemir on 21/02/2017.
Expand Down
2 changes: 0 additions & 2 deletions kafka-tester/src/main/java/Eventapis.java
@@ -1,12 +1,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;

import java.util.Collection;
import java.util.HashMap;
Expand Down
@@ -1,9 +1,9 @@
package com.kloia.sample.commands;

import com.kloia.eventapis.api.CommandHandler;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.command.CreateOrderCommandDto;
import com.kloia.sample.dto.event.OrderCreatedEvent;
import com.kloia.sample.model.Order;
Expand Down
@@ -1,12 +1,12 @@
package com.kloia.sample.commands;

import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.api.CommandHandler;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.api.RollbackSpec;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.view.RollbackSpec;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.command.ProcessOrderCommandDto;
import com.kloia.sample.dto.event.ReserveStockEvent;
import com.kloia.sample.model.Order;
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.IUserContext;
import com.kloia.eventapis.api.RollbackSpec;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.CassandraEventRecorder;
import com.kloia.eventapis.cassandra.CassandraSession;
Expand All @@ -14,7 +15,6 @@
import com.kloia.eventapis.spring.configuration.EventApisConfiguration;
import com.kloia.eventapis.view.AggregateListener;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.view.RollbackSpec;
import com.kloia.sample.model.Order;
import com.kloia.sample.repository.OrderRepository;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

@Configuration
@Slf4j
Expand All @@ -43,7 +42,7 @@ public class Components {
@Bean
AggregateListener snapshotRecorder(ViewQuery<Order> orderViewRepository, EventRepository orderEventRepository, OrderRepository orderRepository,
Optional<List<RollbackSpec>> rollbackSpecs) {
return new AggregateListener(orderViewRepository, orderEventRepository, orderRepository, rollbackSpecs.orElseGet(ArrayList::new));
return new AggregateListener(orderViewRepository, orderEventRepository, orderRepository, rollbackSpecs.orElseGet(ArrayList::new), objectMapper);
}

@Bean
Expand Down
@@ -1,13 +1,13 @@
package com.kloia.sample.controller.event;

import com.kloia.eventapis.api.EventHandler;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.ConcurrentEventException;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.exception.EventPulisherException;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.event.OrderPaidEvent;
import com.kloia.sample.dto.event.PaymentSuccessEvent;
import com.kloia.sample.model.Order;
Expand Down
@@ -1,13 +1,13 @@
package com.kloia.sample.controller.event;

import com.kloia.eventapis.api.EventHandler;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.ConcurrentEventException;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.exception.EventPulisherException;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.event.PaymentProcessEvent;
import com.kloia.sample.dto.event.StockReservedEvent;
import com.kloia.sample.model.Order;
Expand Down
@@ -1,6 +1,5 @@
package com.kloia.sample.dto.command;

import com.kloia.sample.model.PaymentInformation;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down
@@ -1,9 +1,9 @@
package com.kloia.sample.dto.event;

import com.fasterxml.jackson.annotation.JsonView;
import com.kloia.eventapis.api.Views;
import com.kloia.eventapis.common.EventType;
import com.kloia.eventapis.common.PublishedEvent;
import com.kloia.eventapis.api.Views;
import com.kloia.sample.model.PaymentInformation;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down
Expand Up @@ -2,7 +2,6 @@

import com.kloia.eventapis.view.Entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.IUserContext;
import com.kloia.eventapis.api.RollbackSpec;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.CassandraEventRecorder;
import com.kloia.eventapis.cassandra.CassandraSession;
Expand All @@ -14,10 +15,8 @@
import com.kloia.eventapis.spring.configuration.EventApisConfiguration;
import com.kloia.eventapis.view.AggregateListener;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.view.RollbackSpec;
import com.kloia.sample.model.Payment;
import com.kloia.sample.repository.PaymentRepository;
import com.sun.javafx.scene.control.skin.VirtualFlow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand All @@ -43,7 +42,7 @@ public class Components {
@Bean
AggregateListener snapshotRecorder(ViewQuery<Payment> paymentViewRepository, EventRepository paymentEventRepository, PaymentRepository paymentRepository,
Optional<List<RollbackSpec>> rollbackSpecs) {
return new AggregateListener(paymentViewRepository, paymentEventRepository, paymentRepository, rollbackSpecs.orElseGet(ArrayList::new));
return new AggregateListener(paymentViewRepository, paymentEventRepository, paymentRepository, rollbackSpecs.orElseGet(ArrayList::new),objectMapper);
}

@Bean
Expand Down
@@ -1,12 +1,6 @@
package com.kloia.sample.controller;

import com.kloia.eventapis.exception.EventPulisherException;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.sample.controller.event.DoPaymentEventHandler;
import com.kloia.sample.dto.event.PaymentProcessEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;


Expand Down
Expand Up @@ -2,8 +2,6 @@

import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.common.EventRecorder;
import com.kloia.sample.controller.event.DoPaymentEventHandler;
import com.kloia.sample.model.Payment;
import com.kloia.sample.repository.PaymentRepository;
import lombok.extern.slf4j.Slf4j;
Expand Down
@@ -1,13 +1,13 @@
package com.kloia.sample.controller.event;

import com.kloia.eventapis.api.EventHandler;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.ConcurrentEventException;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.exception.EventPulisherException;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.exception.EventStoreException;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.event.PaymentProcessEvent;
import com.kloia.sample.dto.event.PaymentSuccessEvent;
import com.kloia.sample.model.Payment;
Expand Down
@@ -1,9 +1,9 @@
package com.kloia.sample.dto.event;

import com.fasterxml.jackson.annotation.JsonView;
import com.kloia.eventapis.api.Views;
import com.kloia.eventapis.common.EventType;
import com.kloia.eventapis.common.PublishedEvent;
import com.kloia.eventapis.api.Views;
import lombok.Data;

@Data
Expand Down
4 changes: 2 additions & 2 deletions samples/pom.xml
Expand Up @@ -156,11 +156,11 @@
<artifactId>querydsl-jpa</artifactId>
<version>${querydsl.version}</version>
</dependency>
<!-- <dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>-->
</dependency>
</dependencies>
<build>
<resources>
Expand Down
@@ -1,10 +1,10 @@
package com.kloia.sample.commands;

import com.kloia.eventapis.api.CommandHandler;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.common.EventKey;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.sample.dto.command.CreateStockCommandDto;
import com.kloia.sample.dto.event.StockCreatedEvent;
import com.kloia.sample.model.Stock;
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kloia.eventapis.api.EventRepository;
import com.kloia.eventapis.api.IUserContext;
import com.kloia.eventapis.api.RollbackSpec;
import com.kloia.eventapis.api.ViewQuery;
import com.kloia.eventapis.cassandra.CassandraEventRecorder;
import com.kloia.eventapis.cassandra.CassandraSession;
Expand All @@ -14,7 +15,6 @@
import com.kloia.eventapis.spring.configuration.EventApisConfiguration;
import com.kloia.eventapis.view.AggregateListener;
import com.kloia.eventapis.view.EntityFunctionSpec;
import com.kloia.eventapis.view.RollbackSpec;
import com.kloia.sample.model.Stock;
import com.kloia.sample.repository.StockRepository;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

@Configuration
@Slf4j
Expand All @@ -44,7 +43,7 @@ public class Components {
@Bean
AggregateListener snapshotRecorder(ViewQuery<Stock> stockViewRepository, EventRepository stockEventRepository, StockRepository stockRepository,
Optional<List<RollbackSpec>> rollbackSpecs){
return new AggregateListener(stockViewRepository,stockEventRepository, stockRepository, rollbackSpecs.orElseGet(ArrayList::new) );
return new AggregateListener(stockViewRepository,stockEventRepository, stockRepository, rollbackSpecs.orElseGet(ArrayList::new), objectMapper );
}

@Bean
Expand Down

0 comments on commit 4c0f540

Please sign in to comment.