Skip to content

Commit

Permalink
#325: Add handeling for concurrency modification exception of product…
Browse files Browse the repository at this point in the history
… type sync.
  • Loading branch information
heshamMassoud committed Dec 3, 2018
1 parent 6bb20cc commit 13822fe
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 29 deletions.
110 changes: 81 additions & 29 deletions src/main/java/com/commercetools/sync/producttypes/ProductTypeSync.java
Expand Up @@ -14,6 +14,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -32,9 +33,10 @@
* This class syncs product type drafts with the corresponding product types in the CTP project.
*/
public class ProductTypeSync extends BaseSync<ProductTypeDraft, ProductTypeSyncStatistics, ProductTypeSyncOptions> {
private static final String CTP_PRODUCT_TYPE_FETCH_FAILED = "Failed to fetch existing product types of keys '%s'.";
private static final String CTP_PRODUCT_TYPE_UPDATE_FAILED = "Failed to update product type of key '%s'.";
private static final String CTP_PRODUCT_TYPE_CREATE_FAILED = "Failed to create product type of key '%s'.";
private static final String CTP_PRODUCT_TYPE_FETCH_FAILED = "Failed to fetch existing product types with keys:"
+ " '%s'.";
private static final String CTP_PRODUCT_TYPE_UPDATE_FAILED = "Failed to update product type with key: '%s'."
+ " Reason: %s";
private static final String PRODUCT_TYPE_DRAFT_HAS_NO_KEY = "Failed to process product type draft without key.";
private static final String PRODUCT_TYPE_DRAFT_IS_NULL = "Failed to process null product type draft.";

Expand Down Expand Up @@ -191,7 +193,7 @@ private CompletionStage<Void> syncBatch(
final ProductType oldProductType = oldProductTypeMap.get(newProductType.getKey());

return ofNullable(oldProductType)
.map(productType -> updateProductType(oldProductType, newProductType))
.map(productType -> buildActionsAndUpdate(oldProductType, newProductType))
.orElseGet(() -> applyCallbackAndCreate(newProductType));
})
.map(CompletionStage::toCompletableFuture)
Expand All @@ -206,22 +208,40 @@ private CompletionStage<Void> syncBatch(
* @return a {@link CompletionStage} which contains an empty result after execution of the create.
*/
@Nonnull
private CompletionStage<Void> applyCallbackAndCreate(
private CompletionStage<Optional<ProductType>> applyCallbackAndCreate(
@Nonnull final ProductTypeDraft productTypeDraft) {

return syncOptions
.applyBeforeCreateCallBack(productTypeDraft)
.map(draft -> productTypeService
.createProductType(draft)
.thenAccept(productTypeOptional -> {
.thenApply(productTypeOptional -> {
if (productTypeOptional.isPresent()) {
statistics.incrementCreated();
} else {
statistics.incrementFailed();
}
return productTypeOptional;
})
)
.orElse(CompletableFuture.completedFuture(null));
.orElse(CompletableFuture.completedFuture(Optional.empty()));
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") // https://github.com/findbugsproject/findbugs/issues/79
private CompletionStage<Optional<ProductType>> buildActionsAndUpdate(
@Nonnull final ProductType oldProductType,
@Nonnull final ProductTypeDraft newProductType) {

final List<UpdateAction<ProductType>> updateActions = buildActions(oldProductType, newProductType, syncOptions);

final List<UpdateAction<ProductType>> updateActionsAfterCallback =
syncOptions.applyBeforeUpdateCallBack(updateActions, newProductType, oldProductType);

if (!updateActionsAfterCallback.isEmpty()) {
return updateProductType(oldProductType, newProductType, updateActionsAfterCallback);
}

return completedFuture(null);
}

/**
Expand All @@ -238,29 +258,61 @@ private CompletionStage<Void> applyCallbackAndCreate(
* @param newProductType draft containing data that could differ from data in {@code oldProductType}.
* @return a {@link CompletionStage} which contains an empty result after execution of the update.
*/
@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") // https://github.com/findbugsproject/findbugs/issues/79
private CompletionStage<Void> updateProductType(@Nonnull final ProductType oldProductType,
@Nonnull final ProductTypeDraft newProductType) {

final List<UpdateAction<ProductType>> updateActions = buildActions(oldProductType, newProductType, syncOptions);

final List<UpdateAction<ProductType>> updateActionsAfterCallback = syncOptions.applyBeforeUpdateCallBack(
updateActions,
newProductType,
oldProductType
);

if (!updateActionsAfterCallback.isEmpty()) {
return productTypeService.updateProductType(oldProductType, updateActionsAfterCallback)
.thenAccept(updatedProductType -> statistics.incrementUpdated())
.exceptionally(exception -> {
final String errorMessage = format(CTP_PRODUCT_TYPE_UPDATE_FAILED, newProductType.getKey());
handleError(errorMessage, exception, 1);
private CompletionStage<Optional<ProductType>> updateProductType(@Nonnull final ProductType oldProductType,
@Nonnull final ProductTypeDraft newProductType,
@Nonnull final List<UpdateAction<ProductType>> updateActions) {

return productTypeService
.updateProductType(oldProductType, updateActions)
.handle(ImmutablePair::new)
.thenCompose(updateResponse -> {
final ProductType updatedProductType = updateResponse.getKey();
final Throwable sphereException = updateResponse.getValue();
if (sphereException != null) {
return executeSupplierIfConcurrentModificationException(
sphereException,
() -> fetchAndUpdate(oldProductType, newProductType),
() -> {
final String errorMessage =
format(CTP_PRODUCT_TYPE_UPDATE_FAILED, newProductType.getKey(),
sphereException.getMessage());
handleError(errorMessage, sphereException, 1);
return CompletableFuture.completedFuture(Optional.empty());
});
} else {
statistics.incrementUpdated();
return CompletableFuture.completedFuture(Optional.of(updatedProductType));
}
});
}

return null;
private CompletionStage<Optional<ProductType>> fetchAndUpdate(@Nonnull final ProductType oldProductType,
@Nonnull final ProductTypeDraft newProductType) {
final String key = oldProductType.getKey();
return productTypeService
.fetchProductType(key)
.handle(ImmutablePair::new)
.thenCompose(fetchResponse -> {
final Optional<ProductType> fetchedProductTypeOptional = fetchResponse.getKey();
final Throwable exception = fetchResponse.getValue();

if (exception != null) {
final String errorMessage = format(CTP_PRODUCT_TYPE_UPDATE_FAILED, key,
"Failed to fetch from CTP while retrying after concurrency modification.");
handleError(errorMessage, exception, 1);
return CompletableFuture.completedFuture(null);
}

return fetchedProductTypeOptional
.map(fetchedProductType -> buildActionsAndUpdate(fetchedProductType, newProductType))
.orElseGet(() -> {
final String errorMessage =
format(CTP_PRODUCT_TYPE_UPDATE_FAILED, key,
"Not found when attempting to fetch while retrying "
+ "after concurrency modification.");
handleError(errorMessage, null, 1);
return CompletableFuture.completedFuture(null);
});
}

return completedFuture(null);
});
}
}
@@ -1,11 +1,13 @@
package com.commercetools.sync.services;

import com.commercetools.sync.products.AttributeMetaData;
import io.sphere.sdk.client.SphereClient;
import io.sphere.sdk.commands.UpdateAction;
import io.sphere.sdk.producttypes.ProductType;
import io.sphere.sdk.producttypes.ProductTypeDraft;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -105,4 +107,16 @@ CompletionStage<Optional<Map<String, AttributeMetaData>>> fetchCachedProductAttr
@Nonnull
CompletionStage<ProductType> updateProductType(@Nonnull final ProductType productType,
@Nonnull final List<UpdateAction<ProductType>> updateActions);

/**
* Given a productType key, this method fetches a productType that matches this given key in the CTP project defined
* in a potentially injected {@link SphereClient}. If there is no matching productType an empty {@link Optional}
* will be returned in the returned future.
*
* @param key the key of the category to fetch.
* @return {@link CompletionStage}&lt;{@link Optional}&gt; in which the result of it's completion contains an
* {@link Optional} that contains the matching {@link ProductType} if exists, otherwise empty.
*/
@Nonnull
CompletionStage<Optional<ProductType>> fetchProductType(@Nullable final String key);
}
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,7 @@

import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
import static org.apache.commons.lang3.StringUtils.isBlank;

public final class ProductTypeServiceImpl
extends BaseService<ProductTypeDraft, ProductType, BaseSyncOptions> implements ProductTypeService {
Expand Down Expand Up @@ -142,4 +144,27 @@ public CompletionStage<ProductType> updateProductType(

return updateResource(productType, ProductTypeUpdateCommand::of, updateActions);
}

@Nonnull
@Override
public CompletionStage<Optional<ProductType>> fetchProductType(@Nullable final String key) {

if (isBlank(key)) {
return CompletableFuture.completedFuture(Optional.empty());
}

final ProductTypeQuery productTypeQuery =
ProductTypeQuery.of().plusPredicates(queryModel -> queryModel.key().is(key));

return syncOptions
.getCtpClient()
.execute(productTypeQuery)
.thenApply(productTypePagedQueryResult ->
productTypePagedQueryResult
.head()
.map(productType -> {
keyToIdCache.put(productType.getKey(), productType.getId());
return productType;
}));
}
}

0 comments on commit 13822fe

Please sign in to comment.