Skip to content

Commit

Permalink
chore: updated to use semistructured api
Browse files Browse the repository at this point in the history
Signed-off-by: Maximillian Arruda <dearrudam@gmail.com>
  • Loading branch information
dearrudam committed Mar 12, 2024
1 parent 4a56b26 commit f0ee8fb
Show file tree
Hide file tree
Showing 25 changed files with 277 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package org.eclipse.jnosql.databases.dynamodb.communication;

import org.eclipse.jnosql.communication.Settings;
import org.eclipse.jnosql.communication.document.DocumentDeleteQuery;
import org.eclipse.jnosql.communication.document.DocumentEntity;
import org.eclipse.jnosql.communication.document.DocumentQuery;
import org.eclipse.jnosql.communication.semistructured.CommunicationEntity;
import org.eclipse.jnosql.communication.semistructured.DeleteQuery;
import org.eclipse.jnosql.communication.semistructured.SelectQuery;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand Down Expand Up @@ -60,11 +60,11 @@
import static java.util.Objects.requireNonNull;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.entityAttributeName;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.toAttributeValue;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.toDocumentEntity;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.toCommunicationEntity;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.toItem;
import static org.eclipse.jnosql.databases.dynamodb.communication.DynamoDBConverter.toItemUpdate;

public class DefaultDynamoDBDocumentManager implements DynamoDBDocumentManager {
public class DefaultDynamoDBDatabaseManager implements DynamoDBDatabaseManager {

private final String database;

Expand All @@ -76,7 +76,7 @@ public class DefaultDynamoDBDocumentManager implements DynamoDBDocumentManager {

private final ConcurrentHashMap<String, DescribeTableResponse> tables = new ConcurrentHashMap<>();

public DefaultDynamoDBDocumentManager(String database, DynamoDbClient dynamoDbClient, Settings settings) {
public DefaultDynamoDBDatabaseManager(String database, DynamoDbClient dynamoDbClient, Settings settings) {
this.settings = settings;
this.database = database;
this.dynamoDbClient = dynamoDbClient;
Expand All @@ -96,7 +96,7 @@ public String name() {
}

@Override
public DocumentEntity insert(DocumentEntity documentEntity) {
public CommunicationEntity insert(CommunicationEntity documentEntity) {
requireNonNull(documentEntity, "documentEntity is required");
dynamoDbClient().putItem(PutItemRequest.builder()
.tableName(createTableIfNeeded(documentEntity.name()).table().tableName())
Expand Down Expand Up @@ -189,23 +189,23 @@ private String getEntityAttributeName() {
}

@Override
public DocumentEntity insert(DocumentEntity documentEntity, Duration ttl) {
public CommunicationEntity insert(CommunicationEntity documentEntity, Duration ttl) {
requireNonNull(documentEntity, "documentEntity is required");
requireNonNull(ttl, "ttl is required");
documentEntity.add(getTTLAttributeName(documentEntity.name()).get(), Instant.now().plus(ttl).truncatedTo(ChronoUnit.SECONDS));
return insert(documentEntity);
}

@Override
public Iterable<DocumentEntity> insert(Iterable<DocumentEntity> entities) {
public Iterable<CommunicationEntity> insert(Iterable<CommunicationEntity> entities) {
requireNonNull(entities, "entities are required");
return StreamSupport.stream(entities.spliterator(), false)
.map(this::insert)
.toList();
}

@Override
public Iterable<DocumentEntity> insert(Iterable<DocumentEntity> entities, Duration ttl) {
public Iterable<CommunicationEntity> insert(Iterable<CommunicationEntity> entities, Duration ttl) {
requireNonNull(entities, "entities is required");
requireNonNull(ttl, "ttl is required");
return StreamSupport.stream(entities.spliterator(), false)
Expand All @@ -214,7 +214,7 @@ public Iterable<DocumentEntity> insert(Iterable<DocumentEntity> entities, Durati
}

@Override
public DocumentEntity update(DocumentEntity documentEntity) {
public CommunicationEntity update(CommunicationEntity documentEntity) {
requireNonNull(documentEntity, "entity is required");
Map<String, AttributeValue> itemKey = getItemKey(documentEntity);
Map<String, AttributeValueUpdate> attributeUpdates = asItemToUpdate(documentEntity);
Expand All @@ -227,7 +227,7 @@ public DocumentEntity update(DocumentEntity documentEntity) {
return documentEntity;
}

private Map<String, AttributeValue> getItemKey(DocumentEntity documentEntity) {
private Map<String, AttributeValue> getItemKey(CommunicationEntity documentEntity) {
DescribeTableResponse describeTableResponse = this.tables.computeIfAbsent(documentEntity.name(), this::getDescribeTableResponse);
Map<String, AttributeValue> itemKey = describeTableResponse
.table()
Expand All @@ -243,47 +243,48 @@ private Map<String, AttributeValue> getItemKey(DocumentEntity documentEntity) {
return itemKey;
}

private Map<String, AttributeValueUpdate> asItemToUpdate(DocumentEntity documentEntity) {
private Map<String, AttributeValueUpdate> asItemToUpdate(CommunicationEntity documentEntity) {
return toItemUpdate(this::resolveEntityNameAttributeName, documentEntity);
}

@Override
public Iterable<DocumentEntity> update(Iterable<DocumentEntity> entities) {
public Iterable<CommunicationEntity> update(Iterable<CommunicationEntity> entities) {
requireNonNull(entities, "entities is required");
return StreamSupport.stream(entities.spliterator(), false)
.map(this::update)
.toList();
}

@Override
public void delete(DocumentDeleteQuery documentDeleteQuery) {
Objects.requireNonNull(documentDeleteQuery, "documentDeleteQuery is required");
public void delete(DeleteQuery deleteQuery) {
Objects.requireNonNull(deleteQuery, "deleteQuery is required");

List<String> primaryKeys = getDescribeTableResponse(documentDeleteQuery.name())
List<String> primaryKeys = getDescribeTableResponse(deleteQuery.name())
.table()
.keySchema()
.stream()
.map(KeySchemaElement::attributeName).toList();

DocumentQuery.DocumentQueryBuilder selectQueryBuilder = DocumentQuery.builder()

var selectQueryBuilder = SelectQuery.builder()
.select(primaryKeys.toArray(new String[0]))
.from(documentDeleteQuery.name());
.from(deleteQuery.name());

documentDeleteQuery.condition().ifPresent(selectQueryBuilder::where);
deleteQuery.condition().ifPresent(selectQueryBuilder::where);

select(selectQueryBuilder.build()).forEach(
documentEntity ->
dynamoDbClient().deleteItem(DeleteItemRequest.builder()
.tableName(documentDeleteQuery.name())
.tableName(deleteQuery.name())
.key(getItemKey(documentEntity))
.build()));
}

@Override
public Stream<DocumentEntity> select(DocumentQuery documentQuery) {
Objects.requireNonNull(documentQuery, "documentQuery is required");
public Stream<CommunicationEntity> select(SelectQuery query) {
Objects.requireNonNull(query, "query is required");
DynamoDBQuery dynamoDBQuery = DynamoDBQuery
.builderOf(documentQuery.name(), getEntityAttributeName(), documentQuery)
.builderOf(query.name(), getEntityAttributeName(), query)
.get();

ScanRequest.Builder selectRequest = ScanRequest.builder()
Expand All @@ -298,7 +299,7 @@ public Stream<DocumentEntity> select(DocumentQuery documentQuery) {
return StreamSupport
.stream(dynamoDbClient().scanPaginator(selectRequest.build()).spliterator(), false)
.flatMap(scanResponse -> scanResponse.items().stream()
.map(item -> toDocumentEntity(this::resolveEntityNameAttributeName, item)));
.map(item -> toCommunicationEntity(this::resolveEntityNameAttributeName, item)));
}

@Override
Expand All @@ -319,29 +320,24 @@ public void close() {
}

@Override
public Stream<DocumentEntity> partiQL(String query) {
return partiQL(query,new Object[0]);
}

@Override
public Stream<DocumentEntity> partiQL(String query, Object... params) {
public Stream<CommunicationEntity> partiQL(String query, Object... params) {
Objects.requireNonNull(query, "query is required");
List<AttributeValue> parameters = Stream.of(params).map(DynamoDBConverter::toAttributeValue).toList();
ExecuteStatementResponse executeStatementResponse = dynamoDbClient()
.executeStatement(ExecuteStatementRequest.builder()
.statement(query)
.parameters(parameters)
.build());
List<DocumentEntity> result = new LinkedList<>();
executeStatementResponse.items().forEach(item -> result.add(toDocumentEntity(this::resolveEntityNameAttributeName, item)));
List<CommunicationEntity> result = new LinkedList<>();
executeStatementResponse.items().forEach(item -> result.add(toCommunicationEntity(this::resolveEntityNameAttributeName, item)));
while (executeStatementResponse.nextToken() != null) {
executeStatementResponse = dynamoDbClient()
.executeStatement(ExecuteStatementRequest.builder()
.statement(query)
.parameters(parameters)
.nextToken(executeStatementResponse.nextToken())
.build());
executeStatementResponse.items().forEach(item -> result.add(toDocumentEntity(this::resolveEntityNameAttributeName, item)));
executeStatementResponse.items().forEach(item -> result.add(toCommunicationEntity(this::resolveEntityNameAttributeName, item)));
}
return result.stream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package org.eclipse.jnosql.databases.dynamodb.communication;

import org.eclipse.jnosql.communication.document.Document;
import org.eclipse.jnosql.communication.document.DocumentEntity;
import org.eclipse.jnosql.communication.driver.ValueUtil;
import org.eclipse.jnosql.communication.ValueUtil;
import org.eclipse.jnosql.communication.semistructured.CommunicationEntity;
import org.eclipse.jnosql.communication.semistructured.Element;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand Down Expand Up @@ -62,7 +62,7 @@ private static Object convertValue(Object value) {
case L:
return attributeValue.l().stream().map(DynamoDBConverter::convertValue).toList();
case M:
return attributeValue.m().entrySet().stream().map(e -> Document.of(e.getKey(), convertValue(e.getValue()))).toList();
return attributeValue.m().entrySet().stream().map(e -> Element.of(e.getKey(), convertValue(e.getValue()))).toList();
case NUL:
return null;
case BOOL:
Expand All @@ -75,10 +75,10 @@ private static Object convertValue(Object value) {
return value;
}

static Map<String, Object> getMap(UnaryOperator<String> entityNameResolver, DocumentEntity entity) {
static Map<String, Object> getMap(UnaryOperator<String> entityNameResolver, CommunicationEntity entity) {
var nameResolver = Optional.ofNullable(entityNameResolver).orElse(UnaryOperator.identity());
Map<String, Object> jsonObject = new HashMap<>();
entity.documents().forEach(feedJSON(jsonObject));
entity.elements().forEach(feedJSON(jsonObject));
jsonObject.put(entityAttributeName(nameResolver), entity.name());
return jsonObject;
}
Expand All @@ -88,11 +88,11 @@ public static String entityAttributeName(UnaryOperator<String> nameResolver) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static Consumer<Document> feedJSON(Map<String, Object> jsonObject) {
private static Consumer<Element> feedJSON(Map<String, Object> jsonObject) {
return d -> {
Object value = ValueUtil.convert(d.value());
if (value instanceof Document subDocument) {
jsonObject.put(d.name(), singletonMap(subDocument.name(), subDocument.get()));
if (value instanceof Element subElement) {
jsonObject.put(d.name(), singletonMap(subElement.name(), subElement.get()));
} else if (isSudDocument(value)) {
Map<String, Object> subDocument = getMap(value);
jsonObject.put(d.name(), subDocument);
Expand All @@ -114,17 +114,17 @@ private static Map<String, Object> getMap(Object value) {

private static boolean isSudDocument(Object value) {
return value instanceof Iterable && StreamSupport.stream(Iterable.class.cast(value).spliterator(), false).
allMatch(org.eclipse.jnosql.communication.document.Document.class::isInstance);
allMatch(Element.class::isInstance);
}

private static boolean isSudDocumentList(Object value) {
return value instanceof Iterable && StreamSupport.stream(Iterable.class.cast(value).spliterator(), false).
allMatch(d -> d instanceof Iterable && isSudDocument(d));
}

public static Map<String, AttributeValue> toItem(UnaryOperator<String> entityNameResolver, DocumentEntity documentEntity) {
public static Map<String, AttributeValue> toItem(UnaryOperator<String> entityNameResolver, CommunicationEntity entity) {
UnaryOperator<String> resolver = Optional.ofNullable(entityNameResolver).orElse(UnaryOperator.identity());
Map<String, Object> documentAttributes = getMap(resolver, documentEntity);
Map<String, Object> documentAttributes = getMap(resolver, entity);
return toItem(documentAttributes);
}

Expand Down Expand Up @@ -161,15 +161,15 @@ public static AttributeValue toAttributeValue(Object value) {
if (value instanceof InputStream input) {
return AttributeValue.builder().b(SdkBytes.fromInputStream(input)).build();
}
if (value instanceof Document document) {
return toAttributeValue(getMap(document));
if (value instanceof Element element) {
return toAttributeValue(getMap(element));
}
return AttributeValue.builder().s(String.valueOf(value)).build();
}

public static Map<String, AttributeValueUpdate> toItemUpdate(UnaryOperator<String> entityNameResolver, DocumentEntity documentEntity) {
public static Map<String, AttributeValueUpdate> toItemUpdate(UnaryOperator<String> entityNameResolver, CommunicationEntity entity) {
UnaryOperator<String> resolver = Optional.ofNullable(entityNameResolver).orElse(UnaryOperator.identity());
Map<String, Object> documentAttributes = getMap(resolver, documentEntity);
Map<String, Object> documentAttributes = getMap(resolver, entity);
return toItemUpdate(documentAttributes);
}

Expand All @@ -189,7 +189,7 @@ public static AttributeValueUpdate toAttributeValueUpdate(Object value) {
}


public static DocumentEntity toDocumentEntity(UnaryOperator<String> entityNameResolver, Map<String, AttributeValue> item) {
public static CommunicationEntity toCommunicationEntity(UnaryOperator<String> entityNameResolver, Map<String, AttributeValue> item) {
if (item == null) {
return null;
}
Expand All @@ -199,11 +199,11 @@ public static DocumentEntity toDocumentEntity(UnaryOperator<String> entityNameRe
UnaryOperator<String> resolver = Optional.ofNullable(entityNameResolver).orElse(UnaryOperator.identity());
String entityAttribute = resolver.apply(ENTITY);
var entityName = item.containsKey(entityAttribute) ? item.get(entityAttribute).s() : entityAttribute;
List<Document> documents = item.entrySet()
var elements = item.entrySet()
.stream()
.filter(entry -> !Objects.equals(entityAttribute, entry.getKey()))
.map(entry -> Document.of(entry.getKey(), convertValue(entry.getValue())))
.map(entry -> Element.of(entry.getKey(), convertValue(entry.getValue())))
.toList();
return DocumentEntity.of(entityName, documents);
return CommunicationEntity.of(entityName, elements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,27 @@

package org.eclipse.jnosql.databases.dynamodb.communication;

import org.eclipse.jnosql.communication.document.DocumentEntity;
import org.eclipse.jnosql.communication.document.DocumentManager;
import org.eclipse.jnosql.communication.semistructured.CommunicationEntity;
import org.eclipse.jnosql.communication.semistructured.DatabaseManager;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

import java.util.stream.Stream;

/**
* A document manager interface for DynamoDB database operations.
*/
public interface DynamoDBDocumentManager extends DocumentManager {

/**
* DynamoDB supports a limited subset of
* <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html">PartiQL</a>.
* This method executes a PartiQL query and returns a stream of DocumentEntity objects.
*
* @param query the PartiQL query
* @return a {@link Stream} of {@link DocumentEntity} representing the query result
* @throws NullPointerException when the query is null
*/
Stream<DocumentEntity> partiQL(String query);
public interface DynamoDBDatabaseManager extends DatabaseManager {

/**
* DynamoDB supports a limited subset of <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html">PartiQL</a>.
* This method executes a PartiQL query with parameters and returns a stream of DocumentEntity objects.
* This method executes a PartiQL query with parameters and returns a stream of CommunicationEntity objects.
* <p>Example query: {@code SELECT * FROM users WHERE status = ?}</p>
*
* @param query the PartiQL query
* @return a {@link Stream} of {@link DocumentEntity} representing the query result
* @return a {@link Stream} of {@link CommunicationEntity} representing the query result
* @throws NullPointerException when the query is null
*/
Stream<DocumentEntity> partiQL(String query, Object... params);
Stream<CommunicationEntity> partiQL(String query, Object... params);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
package org.eclipse.jnosql.databases.dynamodb.communication;

import org.eclipse.jnosql.communication.Settings;
import org.eclipse.jnosql.communication.document.DocumentManagerFactory;
import org.eclipse.jnosql.communication.semistructured.DatabaseManagerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

import java.util.Optional;

public class DynamoDBDocumentManagerFactory implements DocumentManagerFactory {
public class DynamoDBDatabaseManagerFactory implements DatabaseManagerFactory {

private final DynamoDbClient dynamoDB;
private final Settings settings;

public DynamoDBDocumentManagerFactory(DynamoDbClient dynamoDB, Settings settings) {
public DynamoDBDatabaseManagerFactory(DynamoDbClient dynamoDB, Settings settings) {
this.dynamoDB = dynamoDB;
this.settings = settings;
}

@Override
public DynamoDBDocumentManager apply(String database) {
return new DefaultDynamoDBDocumentManager(database, dynamoDB, settings);
public DynamoDBDatabaseManager apply(String database) {
return new DefaultDynamoDBDatabaseManager(database, dynamoDB, settings);
}

@Override
Expand Down

0 comments on commit f0ee8fb

Please sign in to comment.