Skip to content
Permalink
Browse files
Journaled Event implementation based on MongoDB
    * initial commit
  • Loading branch information
Alexei Krainiouk authored and Alexei Krainiouk committed Jan 4, 2019
1 parent 56efda4 commit b5f522148e1256997444f3f7e93553f82a0a35e7
Showing 13 changed files with 996 additions and 0 deletions.
@@ -0,0 +1,43 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The SF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.aries.events.mongo</groupId>
<artifactId>org.apache.aries.events.mongo</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events.api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.8.2</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The SF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package org.apache.aries.events.mongo;

import org.slf4j.Logger;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.slf4j.LoggerFactory.getLogger;

/**
* A factory that keeps previously created instances in a cache so that
* they will get reused if requested repeatedly.
* Currently there is no cache size limit implemented so this implementation
* is only good for the use case with limited parameter space.
* @param <K> key type. Serves as cache key as well as an input parameter for the
* factory method. Must provide sensible implementations for
* equals and hashCode methods
* @param <V> result type.
*/
public final class CachingFactory<K, V extends AutoCloseable> implements Closeable {


public static <K2, V2 extends AutoCloseable> CachingFactory<K2, V2> cachingFactory(Function<K2, V2> create) {
return new CachingFactory<K2, V2>(create);
}

/**
* Find or created a value for the specified key
* @param arg key instance
* @return either an existing (cached) value of newly created one.
*/
public synchronized V get(K arg) {
return cache.computeIfAbsent(arg, create);
}

/**
* Clears all cached instances properly disposing them.
*/
public synchronized void clear() {
cache.values().stream()
.forEach(CachingFactory::safeClose);
cache.clear();
}

/**
* Closing this factory properly disposing all cached instances
*/
@Override
public void close() {
clear();
}

//*********************************************
// Private
//*********************************************

private static final Logger LOG = getLogger(CachingFactory.class);
private final Map<K, V> cache = new HashMap<K, V>();
private final Function<K, V> create;

private CachingFactory(Function<K, V> create) {
this.create = create;
}

private static void safeClose(AutoCloseable closable) {
try {
closable.close();
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
}
}


}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The SF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package org.apache.aries.events.mongo;

import com.mongodb.client.MongoCollection;
import org.bson.Document;

import static com.mongodb.client.model.Filters.lte;
import static com.mongodb.client.model.Indexes.descending;
import static org.apache.aries.events.mongo.Common.Fields.INDEX;

/**
* Common string definitions
*/
@SuppressWarnings({"HardCodedStringLiteral", "InterfaceNeverImplemented"})
interface Common {

String DEFAULT_DB_NAME = "aem-replication";

/** MongoDB field names */
interface Fields {
String INDEX = "i";
String TIME_STAMP = "t";
String PAYLOAD = "d";
String PROPS = "p";
}

/**
* Returns the next available index in the collection
* @param col collection to check. The collection must contain
* log messages published by a Publisher instance
* @return the index that should be assigned to the next message when
* it gets published
*/
static long upcomingIndex(MongoCollection<Document> col) {
Document doc = col.find(lte(INDEX, Long.MAX_VALUE))
.sort(descending(INDEX))
.first();
if (doc != null) {
long latestAvailable = doc.getLong(INDEX);
return latestAvailable + 1L;
} else {
return 0L;
}
}

}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The SF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package org.apache.aries.events.mongo;

import org.apache.aries.events.api.Message;

public interface MessageReceiver extends AutoCloseable {

/** returns data entry for the specified offset.
* If necessary waits until data is available.
* If data entry at the specified offset has
* been evicted, throws NoSuchElement exception
* @param index an offset to the desired entry
* @return requested data entry together with the
* offset for the next data entry
*/
Message receive(long index) throws InterruptedException;

/** returns the index of the earliest available
* data entry. It also causes the receiver to
* pre-fetch and cache a batch of earliest available
* entries thus giving the user a chance to consume
* them and catch up before they get evicted
* @return an index of the first available data entry or
* 0 if the log is empty
*/
long earliestIndex();

/** returns the index of the next available data
* entry.
* The returned index points to the entry yet to
* be inserted into the log.
* @return index of the data entry that will be
* inserted next. 0 if the log is empty.
*/
long latestIndex();

@Override
void close();

}

0 comments on commit b5f5221

Please sign in to comment.