Skip to content

Commit

Permalink
1. The fix is wrong, no need to allow global policy changes to affect…
Browse files Browse the repository at this point in the history
… records exists action

2. Refactor and cleanup (use generics instead of Objects)
3. Add insert() method for both reactive and none-reactive flows and add integration tests
  • Loading branch information
roimenashe committed Jun 19, 2024
1 parent 4533700 commit fa903f9
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 197 deletions.
77 changes: 44 additions & 33 deletions src/main/java/com/aerospike/mapper/tools/AeroMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.aerospike.mapper.tools.converters.MappingConverter;
import com.aerospike.mapper.tools.utils.MapperUtils;
import com.aerospike.mapper.tools.virtuallist.VirtualList;
import reactor.core.publisher.Mono;

public class AeroMapper implements IAeroMapper {

Expand All @@ -52,50 +53,30 @@ public Builder(IAerospikeClient client) {

}
}

@Override
public void save(@NotNull Object... objects) throws AerospikeException {
for (Object thisObject : objects) {
public <T> void save(@NotNull T... objects) throws AerospikeException {
for (T thisObject : objects) {
this.save(thisObject);
}
}

@Override
public void save(@NotNull Object object, String... binNames) throws AerospikeException {
save(null, object, RecordExistsAction.REPLACE, binNames);
public <T> void save(@NotNull T object, String... binNames) throws AerospikeException {
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.REPLACE;
save(writePolicy, object, binNames);
}

@SuppressWarnings("unchecked")
@Override
public void save(@NotNull WritePolicy writePolicy, @NotNull Object object, String... binNames)
public <T> void save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames)
throws AerospikeException {
save(writePolicy, object, null, binNames);
}

@SuppressWarnings("unchecked")
private <T> void save(WritePolicy writePolicy, @NotNull T object, RecordExistsAction recordExistsAction,
String[] binNames) {
Class<T> clazz = (Class<T>) object.getClass();
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
if (writePolicy == null) {
writePolicy = new WritePolicy(entry.getWritePolicy());
if (recordExistsAction != null && (writePolicy.recordExistsAction == null || writePolicy.recordExistsAction == RecordExistsAction.UPDATE)) {
// Override the default with the passed policy. Only do this if the policy is already at the default.
// Otherwise, "save" with an INSERT_ONLY policy would fail for example.
writePolicy.recordExistsAction = recordExistsAction;
}

// #132 -- Ensure that if an overriding TTL / sendkey is passed in the policy it
// is NOT overwritten. Hence
// only if the policy is null do we override these settings.
Integer ttl = entry.getTtl();
Boolean sendKey = entry.getSendKey();

if (ttl != null) {
writePolicy.expiration = ttl;
}
if (sendKey != null) {
writePolicy.sendKey = sendKey;
}
if (writePolicy == null) {
writePolicy = generateWritePolicyFromObject(object);
}

String set = entry.getSetName();
Expand All @@ -111,8 +92,38 @@ private <T> void save(WritePolicy writePolicy, @NotNull T object, RecordExistsAc
}

@Override
public void update(@NotNull Object object, String... binNames) throws AerospikeException {
save(null, object, RecordExistsAction.UPDATE, binNames);
public <T> void insert(@NotNull T object, String... binNames) {
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
save(writePolicy, object, binNames);
}

@Override
public <T> void update(@NotNull T object, String... binNames) throws AerospikeException {
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
save(writePolicy, object, binNames);
}

@SuppressWarnings("unchecked")
private <T> WritePolicy generateWritePolicyFromObject(T object) {
Class<T> clazz = (Class<T>) object.getClass();
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);

WritePolicy writePolicy = new WritePolicy(entry.getWritePolicy());

// #132 -- Ensure that if an overriding TTL / sendKey is passed in the policy it
// is NOT overwritten. Hence, only if the policy is null do we override these settings.
Integer ttl = entry.getTtl();
Boolean sendKey = entry.getSendKey();

if (ttl != null) {
writePolicy.expiration = ttl;
}
if (sendKey != null) {
writePolicy.sendKey = sendKey;
}
return writePolicy;
}

@Override
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/com/aerospike/mapper/tools/IAeroMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
* @param objects One or two objects to save.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
void save(@NotNull Object... objects);
<T> void save(@NotNull T... objects);

/**
* Save an object in the database. This method will perform a REPLACE on the existing record so any existing
Expand All @@ -42,7 +42,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
* @param object The object to save.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
void save(@NotNull Object object, String... binNames);
<T> void save(@NotNull T object, String... binNames);

/**
* Save an object in the database with the given WritePolicy. This write policy will override any other set writePolicy so
Expand All @@ -52,7 +52,16 @@ public interface IAeroMapper extends IBaseAeroMapper {
* @param object The object to save.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
void save(@NotNull WritePolicy writePolicy, @NotNull Object object, String... binNames);
<T> void save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames);

/**
* Insert an object to the database This uses the RecordExistsAction
* of CREATE_ONLY. If bins are specified, only bins with the passed names will be inserted (or all of them if null is passed)
*
* @param object The object to insert.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
<T> void insert(@NotNull T object, String... binNames);

/**
* Updates the object in the database, merging the record with the existing record. This uses the RecordExistsAction
Expand All @@ -61,7 +70,7 @@ public interface IAeroMapper extends IBaseAeroMapper {
* @param object The object to update.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
void update(@NotNull Object object, String... binNames);
<T> void update(@NotNull T object, String... binNames);

/**
* Read a record from the repository and map it to an instance of the passed class, by providing a digest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ public interface IReactiveAeroMapper extends IBaseAeroMapper {
*/
<T> Mono<T> save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames);

/**
* Insert an object to the database This uses the RecordExistsAction
* of CREATE_ONLY. If bins are specified, only bins with the passed names will be inserted (or all of them if null is passed)
*
* @param object The object to insert.
* @throws AerospikeException an AerospikeException will be thrown in case of an error.
*/
<T> Mono<T> insert(@NotNull T object, String... binNames);

/**
* Updates the object in the database, merging the record with the existing record. This uses the RecordExistsAction
* of UPDATE. If bins are specified, only bins with the passed names will be updated (or all of them if null is passed)
Expand Down
61 changes: 37 additions & 24 deletions src/main/java/com/aerospike/mapper/tools/ReactiveAeroMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,19 @@ public <T> Flux<T> save(@NotNull T... objects) {

@Override
public <T> Mono<T> save(@NotNull T object, String... binNames) {
return save(null, object, RecordExistsAction.REPLACE, binNames);
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.REPLACE;
return save(writePolicy, object, binNames);
}

@SuppressWarnings("unchecked")
@Override
public <T> Mono<T> save(@NotNull WritePolicy writePolicy, @NotNull T object, String... binNames) {
return save(writePolicy, object, null, binNames);
}

@SuppressWarnings("unchecked")
private <T> Mono<T> save(WritePolicy writePolicy, @NotNull T object, RecordExistsAction recordExistsAction, String[] binNames) {
Class<T> clazz = (Class<T>) object.getClass();
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);
if (writePolicy == null) {
writePolicy = new WritePolicy(entry.getWritePolicy());
if (recordExistsAction != null && (writePolicy.recordExistsAction == null || writePolicy.recordExistsAction == RecordExistsAction.UPDATE)) {
// Override the default with the passed policy. Only do this if the policy is already at the default.
// Otherwise, "save" with an INSERT_ONLY policy would fail for example.
writePolicy.recordExistsAction = recordExistsAction;
}

// #132 -- Only override the TTL / send key if the policy was not passed in.
Integer ttl = entry.getTtl();
Boolean sendKey = entry.getSendKey();

if (ttl != null) {
writePolicy.expiration = ttl;
}
if (sendKey != null) {
writePolicy.sendKey = sendKey;
}
if (writePolicy == null) {
writePolicy = generateWritePolicyFromObject(object);
}

String set = entry.getSetName();
Expand All @@ -106,9 +89,39 @@ private <T> Mono<T> save(WritePolicy writePolicy, @NotNull T object, RecordExist
.map(docKey -> object);
}

@Override
public <T> Mono<T> insert(@NotNull T object, String... binNames) {
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
return save(writePolicy, object, binNames);
}

@Override
public <T> Mono<T> update(@NotNull T object, String... binNames) {
return save(null, object, RecordExistsAction.UPDATE, binNames);
WritePolicy writePolicy = generateWritePolicyFromObject(object);
writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
return save(writePolicy, object, binNames);
}

@SuppressWarnings("unchecked")
private <T> WritePolicy generateWritePolicyFromObject(T object) {
Class<T> clazz = (Class<T>) object.getClass();
ClassCacheEntry<T> entry = MapperUtils.getEntryAndValidateNamespace(clazz, this);

WritePolicy writePolicy = new WritePolicy(entry.getWritePolicy());

// #132 -- Ensure that if an overriding TTL / sendKey is passed in the policy it
// is NOT overwritten. Hence, only if the policy is null do we override these settings.
Integer ttl = entry.getTtl();
Boolean sendKey = entry.getSendKey();

if (ttl != null) {
writePolicy.expiration = ttl;
}
if (sendKey != null) {
writePolicy.sendKey = sendKey;
}
return writePolicy;
}

@Override
Expand Down
71 changes: 0 additions & 71 deletions src/test/java/com/aerospike/mapper/InsertOnlyModeTest.java

This file was deleted.

62 changes: 62 additions & 0 deletions src/test/java/com/aerospike/mapper/InsertOnlyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.aerospike.mapper;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.mapper.annotations.AerospikeKey;
import com.aerospike.mapper.annotations.AerospikeRecord;
import com.aerospike.mapper.tools.AeroMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class InsertOnlyTest extends AeroMapperBaseTest {

@AerospikeRecord(namespace = "test", set = "testSet")
public static class DataClass {
@AerospikeKey
int a;
int b;
int c;
int d;
int e;
}

@BeforeEach
public void setup() {
client.delete(null, new Key("test", "testSet", 1));
}

@Test
public void testInsertOnly() {
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.totalTimeout = 2000;
writePolicy.socketTimeout = 100;
AeroMapper mapper = new AeroMapper.Builder(client)
.withWritePolicy(writePolicy).forClasses(DataClass.class)
.build();

DataClass dataClass = new DataClass();
dataClass.a = 1;
dataClass.b = 2;
dataClass.c = 3;
dataClass.d = 4;
dataClass.e = 5;

// Insert
mapper.insert(dataClass);

Key key = new Key("test", "testSet", 1);
Record record = client.get(null, key);
assertEquals(5, record.bins.size());
assertEquals(3, record.getInt("c"));

// Try to insert again and get an exception
dataClass.c = 9;
dataClass.e = 11;
assertThrows(AerospikeException.class, () -> mapper.insert(dataClass, "a", "c", "e"));
}
}
Loading

0 comments on commit fa903f9

Please sign in to comment.