Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2964] Fixing aws lock configs to inherit from HoodieConfig #4258

Merged
merged 1 commit into from Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,7 +42,7 @@
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.AWSLockConfiguration;
import org.apache.hudi.config.DynamoDbBasedLockConfig;
import org.apache.hudi.exception.HoodieLockException;

import org.apache.log4j.LogManager;
Expand Down Expand Up @@ -80,8 +80,8 @@ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, fina
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
this.tableName = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key());
this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key());
this.tableName = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key());
this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
if (dynamoDB == null) {
dynamoDB = getDynamoDBClient();
Expand Down Expand Up @@ -155,7 +155,7 @@ public LockItem getLock() {
}

private AmazonDynamoDB getDynamoDBClient() {
String region = this.lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key());
String region = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key());
String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
Expand All @@ -166,7 +166,7 @@ private AmazonDynamoDB getDynamoDBClient() {
}

private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) {
String billingMode = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key());
String billingMode = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key());
KeySchemaElement partitionKeyElement = new KeySchemaElement();
partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME);
partitionKeyElement.setKeyType(KeyType.HASH);
Expand All @@ -182,14 +182,14 @@ private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName
createTableRequest.setBillingMode(billingMode);
if (billingMode.equals(BillingMode.PROVISIONED.name())) {
createTableRequest.setProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key())))
.withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
.withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key())))
.withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
}
dynamoDB.createTable(createTableRequest);

LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active");
try {
TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
} catch (TableUtils.TableNeverTransitionedToStateException e) {
throw new HoodieLockException("Created dynamoDB table never transits to active", e);
} catch (InterruptedException e) {
Expand All @@ -199,14 +199,14 @@ private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName
}

private void checkRequiredProps(final LockConfiguration config) {
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
}

private String generateLogSuffixString() {
Expand Down
Expand Up @@ -18,7 +18,10 @@

package org.apache.hudi.config;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;

Expand All @@ -27,7 +30,15 @@

import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;

public class AWSLockConfiguration {
/**
* Hoodie Configs for Locks.
*/
@ConfigClassProperty(name = "DynamoDB based Locks Configurations",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configs that control DynamoDB based locking mechanisms required for concurrency control "
+ " between writers to a Hudi table. Concurrency between Hudi's own table services "
+ " are auto managed internally.")
public class DynamoDbBasedLockConfig extends HoodieConfig {

// configs for DynamoDb based locks
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb.";
Expand Down
Expand Up @@ -29,12 +29,12 @@
import java.util.Properties;
import javax.annotation.concurrent.Immutable;

import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_REGION;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME;
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;

/**
* Configurations used by the AWS credentials and AWS DynamoDB based lock.
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.config.AWSLockConfiguration;
import org.apache.hudi.config.DynamoDbBasedLockConfig;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -54,20 +54,20 @@ public class ITTestDynamoDBBasedLockProvider {
@BeforeAll
public static void setup() throws InterruptedException {
Properties properties = new Properties();
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
// properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX);
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key(), REGION);
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), REGION);
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
lockConfiguration = new LockConfiguration(properties);
dynamoDb = getDynamoClientWithLocalEndpoint();
}

@Test
public void testAcquireLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
Expand All @@ -76,7 +76,7 @@ public void testAcquireLock() {

@Test
public void testUnlock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
Expand All @@ -87,7 +87,7 @@ public void testUnlock() {

@Test
public void testReentrantLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
Expand All @@ -98,7 +98,7 @@ public void testReentrantLock() {

@Test
public void testUnlockWithoutLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
dynamoDbBasedLockProvider.unlock();
}
Expand Down