Skip to content

Commit

Permalink
Adding more flexible events processing approach using Guava's EventBus.
Browse files Browse the repository at this point in the history
  • Loading branch information
phillip-michailov committed Jan 26, 2016
1 parent 12eeaee commit 6299106
Show file tree
Hide file tree
Showing 14 changed files with 538 additions and 45 deletions.
10 changes: 10 additions & 0 deletions README.md
Expand Up @@ -68,6 +68,16 @@ try (Connection connection = ...;
}
```

In order to get events about loading process use `HierarchicalInfileObjectLoader.subscribe()` and otherwise `HierarchicalInfileObjectLoader.unsubscribe()` to stop receiving events.
Listener should have have public method that accepts appropriate event as argument and marked by `@Subscribe` annotation.
```java
public class Listener {
@Subscribe public void handle(SaveEntityEvent event) { /* handler code for SaveEntityEvent events */ }
@Subscribe public void handle(FlushEvent event) { /* handler code for FlushEvent events */ }
}
```
Refer to [documentation](https://github.com/google/guava/wiki/EventBusExplained) to get more information.

# What license is jPile released under?

jPile is released on the MIT license which is available in `license.txt` to read.
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Expand Up @@ -33,7 +33,6 @@
<findbugs-maven-version>3.0.1</findbugs-maven-version>
</properties>


<scm>
<url>git@github.com:opower/jpile.git</url>
<connection>scm:git:git@github.com:opower/jpile.git</connection>
Expand Down
1 change: 1 addition & 0 deletions release_notes.txt
Expand Up @@ -3,6 +3,7 @@ New in 1.8.0
Minor fixes for performance test
Now uses Spring 3.2.6
Added `HierarchicalInfileObjectLoader#setIgnoredClasses`; deprecated `HierarchicalInfileObjectLoader#setClassesToIgnore`
Added more flexible events processing approach using Guava's EventBus

New in 1.7.11
Added greater precision when persisting float values
Expand Down
@@ -0,0 +1,39 @@
package com.opower.persistence.jpile.infile.events;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Base class for events.
*
* @author phillip-michailov
*/
public class EventBase {

private final Object source;
private final EventFirePoint firePoint;

/**
* Constructor.
*
* @param source object that fired this event.
* @param firePoint point in code path where this event occurred.
*/
public EventBase(Object source, EventFirePoint firePoint) {
this.source = checkNotNull(source, "Source cannot be null");
this.firePoint = checkNotNull(firePoint, "Fire point cannot be null");
}

/**
* @return object that fired this event.
*/
public Object getSource() {
return this.source;
}

/**
* @return point in code path where this event occurred.
*/
public EventFirePoint getFirePoint() {
return this.firePoint;
}
}
@@ -0,0 +1,11 @@
package com.opower.persistence.jpile.infile.events;

/**
* A point in the code path where an event was fired.
* For now it can be either `before` some action or 'after' it.
*
* @author phillip-michailov
*/
public enum EventFirePoint {
BEFORE, AFTER
}
@@ -0,0 +1,59 @@
package com.opower.persistence.jpile.infile.events;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* This event indicates when infile buffer was flushed.
*
* @author phillip-michailov
*/
public class FlushEvent extends EventBase {

private final Class<?> entityClass;
private final String tableName;
private final long timestamp;

/**
* Constructor
*
* @param source object that fired this event.
* @param firePoint point in code path where this event occurred.
* @param entityClass class annotated by {@link javax.persistence.Entity}.
* @param tableName name of database table for which this event occurred.
* @param timestamp when this event occurred in nanoseconds.
*/
public FlushEvent(Object source, EventFirePoint firePoint, Class<?> entityClass, String tableName, long timestamp) {
super(source, firePoint);
this.entityClass = checkNotNull(entityClass, "Entity class cannot be null");
this.tableName = checkNotNull(tableName, "Table name cannot be null");
this.timestamp = timestamp;
}

/**
* Constructs event by capturing current {@link System#nanoTime()} as {@link #timestamp}.
*/
public FlushEvent(Object source, EventFirePoint firePoint, Class<?> entityClass, String tableName) {
this(source, firePoint, entityClass, tableName, System.nanoTime());
}

/**
* @return class annotated by {@link javax.persistence.Entity}.
*/
public Class<?> getEntityClass() {
return this.entityClass;
}

/**
* @return when this event occurred in nanoseconds.
*/
public long getTimestamp() {
return this.timestamp;
}

/**
* @return name of database table for which this event occurred.
*/
public String getTableName() {
return this.tableName;
}
}
@@ -0,0 +1,32 @@
package com.opower.persistence.jpile.infile.events;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* This event indicates that object was saved to the infile buffer.
*
* @author phillip-michailov
*/
public class SaveEntityEvent extends EventBase {

private final Object entity;

/**
* Constructor
*
* @param source object that fired this event.
* @param firePoint point in code path where this event occurred.
* @param entity entity instance that was saved to buffer.
*/
public SaveEntityEvent(Object source, EventFirePoint firePoint, Object entity) {
super(source, firePoint);
this.entity = checkNotNull(entity, "Entity cannot be null");
}

/**
* @return entity instance that was saved to buffer.
*/
public Object getEntity() {
return this.entity;
}
}
@@ -0,0 +1,42 @@
package com.opower.persistence.jpile.infile.events;

import com.google.common.eventbus.Subscribe;
import com.opower.persistence.jpile.loader.HierarchicalInfileObjectLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Adapter for {@link HierarchicalInfileObjectLoader.CallBack} for backward compatibility.
*
* @author phillip-michailov
*/
@SuppressWarnings("deprecation")
public class SaveEntityEventAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(SaveEntityEventAdapter.class);

private final HierarchicalInfileObjectLoader.CallBack callBack;

public SaveEntityEventAdapter(HierarchicalInfileObjectLoader.CallBack callBack) {
this.callBack = checkNotNull(callBack, "callback");
}

@Subscribe
@SuppressWarnings("unused")
public void dispatch(SaveEntityEvent event) {
EventFirePoint firePoint = event.getFirePoint();
Object entity = event.getEntity();
switch (firePoint) {
case BEFORE:
this.callBack.onBeforeSave(entity);
break;
case AFTER:
this.callBack.onAfterSave(entity);
break;
default:
LOGGER.warn("Unknown fire point: {}", firePoint);
}
}
}
Expand Up @@ -4,7 +4,11 @@
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.opower.persistence.jpile.infile.InfileDataBuffer;
import com.opower.persistence.jpile.infile.events.EventFirePoint;
import com.opower.persistence.jpile.infile.events.SaveEntityEvent;
import com.opower.persistence.jpile.infile.events.SaveEntityEventAdapter;
import com.opower.persistence.jpile.reflection.CachedProxy;
import com.opower.persistence.jpile.reflection.PersistenceAnnotationInspector;
import com.opower.persistence.jpile.util.JdbcUtil;
Expand Down Expand Up @@ -55,12 +59,14 @@
* @since 1.0
*/
public class HierarchicalInfileObjectLoader implements Flushable, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(HierarchicalInfileObjectLoader.class);
private static final String EVENT_BUS_IDENTIFIER = "jpile-event-bus";

private PersistenceAnnotationInspector persistenceAnnotationInspector =
CachedProxy.create(new PersistenceAnnotationInspector());

private CallBack eventCallback = new NoOpCallBack();
private EventBus eventBus = new EventBus(EVENT_BUS_IDENTIFIER);
private Connection connection;

// linked for consistent error message
Expand Down Expand Up @@ -126,9 +132,9 @@ private void persistWithCyclicCheck(Object entity, Set<Object> cyclicCheck) {
}

// Save this entity now that we know all children have been saved
callOnBeforeEvent(entity);
this.eventBus.post(new SaveEntityEvent(this, EventFirePoint.BEFORE, entity));
this.primaryObjectLoaders.get(entity.getClass()).add(entity);
callOnAfterEvent(entity);
this.eventBus.post(new SaveEntityEvent(this, EventFirePoint.AFTER, entity));

// Get generated id
Object id = PersistenceAnnotationInspector.getIdValue(this.persistenceAnnotationInspector, entity);
Expand Down Expand Up @@ -156,14 +162,6 @@ private void persistWithCyclicCheck(Object entity, Set<Object> cyclicCheck) {
}
}

private void callOnBeforeEvent(Object entity) {
this.eventCallback.onBeforeSave(entity);
}

private void callOnAfterEvent(Object entity) {
this.eventCallback.onAfterSave(entity);
}

private void initForClass(Class<?> aClass) {
findParentDependents(aClass);
findChildDependents(aClass);
Expand All @@ -174,7 +172,7 @@ private void createObjectLoader(Class<?> aClass) {
if (this.primaryObjectLoaders.containsKey(aClass)) {
return;
}
SingleInfileObjectLoader<Object> primaryLoader = new SingleInfileObjectLoaderBuilder<>(aClass)
SingleInfileObjectLoader<Object> primaryLoader = new SingleInfileObjectLoaderBuilder<>(aClass, this.eventBus)
.withBuffer(newInfileDataBuffer())
.withDefaultTableName()
.withJdbcConnection(this.connection)
Expand All @@ -187,7 +185,7 @@ private void createObjectLoader(Class<?> aClass) {
for (SecondaryTable secondaryTable : this.persistenceAnnotationInspector.findSecondaryTables(aClass)) {
if (!this.secondaryClassesToIgnore.contains(secondaryTable.name())) {
SingleInfileObjectLoader<Object> secondaryLoader
= new SingleInfileObjectLoaderBuilder<>(aClass)
= new SingleInfileObjectLoaderBuilder<>(aClass, this.eventBus)
.withBuffer(newInfileDataBuffer())
.withDefaultTableName()
.usingSecondaryTable(secondaryTable)
Expand Down Expand Up @@ -340,8 +338,67 @@ public void setSecondaryClassesToIgnore(Set<String> secondaryClassesToIgnore) {
this.secondaryClassesToIgnore = secondaryClassesToIgnore;
}

/**
* This method is decommissioned.
* <br/>
* Use {@link #subscribe(Object)} method by passing listener object which has public method that
* marked by {@link com.google.common.eventbus.Subscribe} annotation and accepts {@link SaveEntityEvent}
* as argument. Example:
* <pre>
* {@code public class Listener {
* @literal @Subscribe public void handle(SaveEntityEvent event) { }
* }}
* </pre>
*
* @see #subscribe
* @see #unsubscribe
*
* @deprecated
*/
@Deprecated
public void setEventCallback(CallBack eventCallback) {
this.eventCallback = eventCallback;
subscribe(new SaveEntityEventAdapter(eventCallback));
}

/**
* Subscribes listener to the events stream.
* <br/>
* Listener object should have public method that marked by {@link com.google.common.eventbus.Subscribe} annotation
* and accepts needed event type as argument. Example:
* <pre>
* {@code public class Listener {
* @literal @Subscribe public void handle(SaveEntityEvent event) { }
* @literal @Subscribe public void handle(FlushEvent event) { }
* }}
* </pre>
*
* @see EventBus#register(Object)
* @since 1.8.0
*/
public void subscribe(Object listener) {
this.eventBus.register(listener);
}

/**
* Un-subscribes listener from the events stream.
*
* @see EventBus#unregister(Object)
* @since 1.8.0
*/
public void unsubscribe(Object listener) {
this.eventBus.unregister(listener);
}

/**
* Sets event bus implementation.
*
* @see com.google.common.eventbus.EventBus
* @see com.google.common.eventbus.AsyncEventBus
*
* @since 1.8.0
*/
public void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
}

/**
Expand All @@ -354,7 +411,10 @@ public void setUseReplace(boolean useReplace) {

/**
* An event interface that can be used to do perform actions before and after persisting objects
*
* @deprecated
*/
@Deprecated
public interface CallBack {
/**
* Gets called before saving an object
Expand Down

This file was deleted.

0 comments on commit 6299106

Please sign in to comment.