Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2569,6 +2569,34 @@ pramen {
}
```

### (experimental) DynamoDB database
Here is how you can use a DynamoDB database for storing bookkeeping information:

```hocon
pramen {
bookkeeping.enabled = "true"

bookkeeping.dynamodb {
region = "af-south-1"
table.prefix = "pramen_uat"
}
}
```

DynamoDB tables are automatically created if they don't exist with default options. Use the prefix to create multiple
Pramen bookeeping environments per AWS account.

Note that the Pramen project that uses DynamoDB for bookeeping needs to add DynamoDB as a dependency if it is not provided
by the Spark cluster (e.g. EMR).

```xml
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
```
Comment on lines +2592 to +2598
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Make the Maven example copy-pasteable.

${aws.sdk.version} is defined in this repository’s POM, not in a downstream user’s project, so the snippet on Line 2596 will fail as pasted. Either inline the version here or show the matching <properties> block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 2592 - 2598, The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.


### Hadoop (CSV+JSON)
This is less recommended way, and is quite slow. But the advantage is that you don't need a database.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.api

import java.time.LocalDate

trait MetadataManager {
trait MetadataManager extends AutoCloseable {
/**
* Get metadata value for a given table, date and key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
*/
package za.co.absa.pramen.api.lock

trait TokenLockFactory {
trait TokenLockFactory extends AutoCloseable {
def getLock(token: String): TokenLock

override def close(): Unit = {}
}
7 changes: 7 additions & 0 deletions pramen/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@
<artifactId>channel_scala_${scala.compat.version}</artifactId>
</dependency>

<!-- AWS SDK for DynamoDB -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<scope>provided</scope>
</dependency>
Comment thread
coderabbitai[bot] marked this conversation as resolved.

<!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.metastore.Metastore

trait AppContext {
trait AppContext extends AutoCloseable {
val appConfig: AppConfig

def bookkeeper: Bookkeeper
Expand All @@ -31,6 +31,4 @@ trait AppContext {
def journal: Journal

def metastore: Metastore

def close(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ case class BookkeeperConfig(
bookkeepingJdbcConfig: Option[JdbcConfig],
deltaDatabase: Option[String],
deltaTablePrefix: Option[String],
temporaryDirectory: Option[String]
temporaryDirectory: Option[String],
dynamoDbRegion: Option[String],
dynamoDbTableArn: Option[String],
dynamoDbTablePrefix: Option[String]
)

object BookkeeperConfig {
Expand All @@ -44,6 +47,9 @@ object BookkeeperConfig {
val BOOKKEEPING_DB_NAME = "pramen.bookkeeping.mongodb.database"
val BOOKKEEPING_DELTA_DB_NAME = "pramen.bookkeeping.delta.database"
val BOOKKEEPING_DELTA_TABLE_PREFIX = "pramen.bookkeeping.delta.table.prefix"
val BOOKKEEPING_DYNAMODB_REGION = "pramen.bookkeeping.dynamodb.region"
val BOOKKEEPING_DYNAMODB_TABLE_ARN = "pramen.bookkeeping.dynamodb.table.arn"
val BOOKKEEPING_DYNAMODB_TABLE_PREFIX = "pramen.bookkeeping.dynamodb.table.prefix"
val BOOKKEEPING_TEMPORARY_DIRECTORY_KEY = "pramen.temporary.directory"

def fromConfig(conf: Config, allowLocalBookkepingStorage: Boolean = false): BookkeeperConfig = {
Expand All @@ -56,14 +62,17 @@ object BookkeeperConfig {
val temporaryDirectory = ConfigUtils.getOptionString(conf, BOOKKEEPING_TEMPORARY_DIRECTORY_KEY)
val deltaDatabase = ConfigUtils.getOptionString(conf, BOOKKEEPING_DELTA_DB_NAME)
val deltaTablePrefix = ConfigUtils.getOptionString(conf, BOOKKEEPING_DELTA_TABLE_PREFIX)
val dynamoDbRegion = ConfigUtils.getOptionString(conf, BOOKKEEPING_DYNAMODB_REGION)
val dynamoDbTableArn = ConfigUtils.getOptionString(conf, BOOKKEEPING_DYNAMODB_TABLE_ARN)
val dynamoDbTablePrefix = ConfigUtils.getOptionString(conf, BOOKKEEPING_DYNAMODB_TABLE_PREFIX)

if (bookkeepingEnabled && bookkeepingJdbcConfig.isEmpty && bookkeepingHadoopFormat == HadoopFormat.Delta) {
if (bookkeepingLocation.isEmpty && deltaTablePrefix.isEmpty) {
throw new RuntimeException(s"In order to use Delta Lake for bookkeeping, either $BOOKKEEPING_LOCATION or $BOOKKEEPING_DELTA_TABLE_PREFIX must be defined. " +
s"Preferably $BOOKKEEPING_DELTA_DB_NAME should be defined as well for managed Delta Lake tables.")
}
} else {
if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty) {
if (bookkeepingEnabled && bookkeepingConnectionString.isEmpty && bookkeepingLocation.isEmpty && bookkeepingJdbcConfig.isEmpty && dynamoDbRegion.isEmpty) {
if (allowLocalBookkepingStorage) {
log.warn("Bookkeeping configuration is missing. Using the default SQLite database 'pramen.sqlite'")
return BookkeeperConfig(
Expand All @@ -78,17 +87,24 @@ object BookkeeperConfig {
)),
None,
None,
temporaryDirectory
temporaryDirectory,
None,
None,
None
)
} else {
throw new RuntimeException(s"One of the following should be defined: $BOOKKEEPING_PARENT.jdbc.url, $BOOKKEEPING_CONNECTION_STRING or $BOOKKEEPING_LOCATION" +
throw new RuntimeException(s"One of the following should be defined: $BOOKKEEPING_PARENT.jdbc.url, $BOOKKEEPING_CONNECTION_STRING, $BOOKKEEPING_DYNAMODB_REGION, or $BOOKKEEPING_LOCATION" +
s" when bookkeeping is enabled. You can disable bookkeeping by setting $BOOKKEEPING_ENABLED = false.")
}
}

if (bookkeepingConnectionString.isDefined && bookkeepingDbName.isEmpty) {
throw new RuntimeException(s"Database name is not defined. Please, define $BOOKKEEPING_DB_NAME.")
}

if (dynamoDbRegion.isDefined && dynamoDbTablePrefix.isEmpty) {
log.warn(s"DynamoDB table prefix is not defined. Using default prefix 'pramen'. You can define it with $BOOKKEEPING_DYNAMODB_TABLE_PREFIX.")
}
}

BookkeeperConfig(
Expand All @@ -100,7 +116,10 @@ object BookkeeperConfig {
bookkeepingJdbcConfig,
deltaDatabase,
deltaTablePrefix,
temporaryDirectory
temporaryDirectory,
dynamoDbRegion,
dynamoDbTableArn,
dynamoDbTablePrefix
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.app.config.{BookkeeperConfig, HadoopFormat, Runtim
import za.co.absa.pramen.core.bookkeeper.model.DataAvailability
import za.co.absa.pramen.core.journal._
import za.co.absa.pramen.core.lock._
import za.co.absa.pramen.core.metadata.{MetadataManagerJdbc, MetadataManagerNull}
import za.co.absa.pramen.core.metadata.{MetadataManagerDynamoDb, MetadataManagerJdbc, MetadataManagerNull}
import za.co.absa.pramen.core.model.DataChunk
import za.co.absa.pramen.core.mongo.MongoDbConnection
import za.co.absa.pramen.core.rdb.PramenDb
Expand Down Expand Up @@ -90,6 +90,7 @@ object Bookkeeper {
}

val hasBookkeepingJdbc = bookkeepingConfig.bookkeepingJdbcConfig.exists(_.primaryUrl.isDefined)
val hasBookkeepingDynamoDb = bookkeepingConfig.dynamoDbRegion.isDefined

val dbOpt = if (hasBookkeepingJdbc) {
val jdbcConfig = bookkeepingConfig.bookkeepingJdbcConfig.get
Expand All @@ -101,6 +102,14 @@ object Bookkeeper {
if (hasBookkeepingJdbc) {
log.info(s"Using RDB for lock management.")
new TokenLockFactoryJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else if (hasBookkeepingDynamoDb) {
val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)
log.info(s"Using DynamoDB for lock management in region '${bookkeepingConfig.dynamoDbRegion.get}' with table prefix '$tablePrefix'")
TokenLockFactoryDynamoDb.builder
.withRegion(bookkeepingConfig.dynamoDbRegion.get)
.withTablePrefix(tablePrefix)
.withTableArn(bookkeepingConfig.dynamoDbTableArn)
.build()
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -129,6 +138,15 @@ object Bookkeeper {
new BookkeeperNull()
} else if (hasBookkeepingJdbc) {
BookkeeperJdbc.fromPramenDb(dbOpt.get, batchId)
} else if (hasBookkeepingDynamoDb) {
val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)
log.info(s"Using DynamoDB for bookkeeping in region '${bookkeepingConfig.dynamoDbRegion.get}' with table prefix '$tablePrefix'")
BookkeeperDynamoDb.builder
.withRegion(bookkeepingConfig.dynamoDbRegion.get)
.withBatchId(batchId)
.withTablePrefix(tablePrefix)
.withTableArn(bookkeepingConfig.dynamoDbTableArn)
.build()
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -161,6 +179,14 @@ object Bookkeeper {
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep journal of executed jobs.")
new JournalJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else if (hasBookkeepingDynamoDb) {
val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(JournalDynamoDB.DEFAULT_TABLE_PREFIX)
log.info(s"Using DynamoDB for journal in region '${bookkeepingConfig.dynamoDbRegion.get}' with table prefix '$tablePrefix'")
JournalDynamoDB.builder
.withRegion(bookkeepingConfig.dynamoDbRegion.get)
.withTablePrefix(tablePrefix)
.withTableArn(bookkeepingConfig.dynamoDbTableArn)
.build()
} else {
mongoDbConnection match {
case Some(connection) =>
Expand Down Expand Up @@ -194,6 +220,14 @@ object Bookkeeper {
} else if (hasBookkeepingJdbc) {
log.info(s"Using RDB to keep custom metadata.")
new MetadataManagerJdbc(dbOpt.get.slickDb, dbOpt.get.slickProfile)
} else if (hasBookkeepingDynamoDb) {
val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(MetadataManagerDynamoDb.DEFAULT_TABLE_PREFIX)
log.info(s"Using DynamoDB for metadata in region '${bookkeepingConfig.dynamoDbRegion.get}' with table prefix '$tablePrefix'")
MetadataManagerDynamoDb.builder
.withRegion(bookkeepingConfig.dynamoDbRegion.get)
.withTablePrefix(tablePrefix)
.withTableArn(bookkeepingConfig.dynamoDbTableArn)
.build()
} else {
log.info(s"The custom metadata management is not supported.")
new MetadataManagerNull(isPersistenceEnabled = true)
Expand All @@ -203,6 +237,9 @@ object Bookkeeper {
override def close(): Unit = {
mongoDbConnection.foreach(_.close())
dbOpt.foreach(_.close())
tokenFactory.close()
journal.close()
metadataManager.close()
}
}

Expand Down
Loading
Loading