Skip to content

Commit

Permalink
implement multiget
Browse files Browse the repository at this point in the history
handle insert conflicts
  • Loading branch information
jillesvangurp committed Jan 2, 2024
1 parent c1b6210 commit d6c859e
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 74 deletions.
136 changes: 111 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,94 @@ content of the categories field to enable tag search.
// do some crud
val doc1 = MyModel("Number 1", "a first document", categories = listOf("foo"))
store.create(doc1)

store.getById(doc1.id)?.let {
println("Retrieved ${it.title}")
}
// update by id
store.update(doc1.id) {
it.copy(title = "Number One")

// you can only create the same id once
try {
store.create(doc1)
} catch (e: GenericDatabaseException) {
// fails
println("id already exists")
}
// or just pass in the document
store.update(doc1) {
it.copy(title = "Numero Uno")

// you can force the conflict to be handled
// by overwriting the original
store.create(
doc1.copy(title = "Numero Uno"),
onConflictUpdate = true)

// now it is changed
store.getById(doc1.id)?.let {
println("Changed title ${it.title}")
}

// This is a better way to do updates
// update by id and modify the retrieved original
store.update(doc1.id) { original ->
// modify the original
original.copy(title = "Number One")
}
// Retrieve it again

// you can also do it like this
store.update(doc1) { original ->
// still fetches the original
original.copy(title = "This is the one")
}

store.getById(doc1.id)?.let {
println("Retrieved ${it.title}")
println("Now it is ${it.title}")
}

// delete a document
store.delete(doc1)
println("now it's gone: ${store.getById(doc1.id)}")

```

This prints:

```text
Retrieved Number 1
id already exists
Changed title Numero Uno
Now it is This is the one
now it's gone: null
```

### Multi-get

You can get multiple documents in one go like this:

```kotlin
// create two documents; each needs its own id because
// creating the same id twice fails with a conflict
store.create(MyModel(id = "1", title = "Foo"))
store.create(MyModel(id = "2", title = "Bar"))
// fetch both documents with a single query
val docs =
    store.multiGetById(listOf("1", "2"))
println(docs.map { it.title })
```

This prints:

```text
```

### Bulk inserting documents

Inserting documents one at a time is not
very efficient. If you have large amounts
of documents, you should use bulkInsert.

This works with lists or flows. If you use
scrolling searches, which return a flow,
you can efficiently update large amounts
of documents in one go.

```kotlin

// you can also do bulk inserts using flows or lists
flow {
repeat(200) { index ->
Expand All @@ -140,48 +212,62 @@ flow {
// bulk insert 40 at a time
store.bulkInsert(flow = f, chunkSize = 40)
}
```

### Querying

Querying is a bit limited in pg-docstore; it's meant to
be used in combination with things like opensearch or
elasticsearch.

However, there are a few ways in which you can get documents
out of the store.

```kotlin

// and of course we can query in all sorts of ways
// documents are sorted by recency
println("five most recent documents: ${
store.documentsByRecency(limit = 5).map { it.title }
}")
// we can scroll through the entire table
// we can also scroll through the entire table
// and count the number of documents in the flow
println(
"Total documents: ${
store.documentsByRecencyScrolling().count()
// returns a Flow<T>
store.documentsByRecencyScrolling(
// fetch pages of 5 rows
fetchSize = 5
).count()
}"
)
// of course you can also do a select count(*) ...
println(store.count())

// and we can restrict the search using tags
// The search can be restricted using tags
// you need to set up the tagExtractor function
// in the DocStore params for this to work
println(
"Just the bulk tagged documents: ${
store
.documentsByRecencyScrolling(
// filters on the extracted tags
tags = listOf("bulk")
)
.count()
store.documentsByRecencyScrolling(
// filters on the extracted tags
tags = listOf("bulk")
).count()
}"
)

// delete a document
store.delete(doc1)
println("now it's gone: ${store.getById(doc1.id)}")
```

This prints:

```text
Retrieved Number 1
Retrieved Numero Uno
five most recent documents: [Bulk 199, Bulk 198, Bulk 197, Bulk 196, Bulk 195]
Total documents: 201
201
Just the bulk tagged documents: 200
now it's gone: null
```

### Text search
While no substitute for a proper search engine, postgres has some
text search facilities.

```kotlin

Expand Down
81 changes: 65 additions & 16 deletions src/main/kotlin/com/tryformation/pgdocstore/DocStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ class DocStore<T : Any>(

suspend fun create(
doc: T,
timestamp: Instant = Clock.System.now()
timestamp: Instant = Clock.System.now(),
onConflictUpdate:Boolean=false
) {
create(idExtractor.invoke(doc), doc, timestamp)
create(idExtractor.invoke(doc), doc, timestamp,onConflictUpdate)
}

suspend fun create(id: String, doc: T, timestamp: Instant = Clock.System.now()) {
suspend fun create(id: String, doc: T, timestamp: Instant = Clock.System.now(),onConflictUpdate:Boolean=false) {
val txt = json.encodeToString(serializationStrategy, doc)
val tags = tagExtractor.invoke(doc)
val text = textExtractor.invoke(doc)
Expand All @@ -105,6 +106,16 @@ class DocStore<T : Any>(
"""
INSERT INTO $tableName (id, created_at, updated_at, json, tags, text)
VALUES (?,?,?,?,?,to_tsvector(coalesce(?)))
${
if(onConflictUpdate) """
ON CONFLICT (id) DO UPDATE SET
json = EXCLUDED.json,
tags = EXCLUDED.tags,
text = EXCLUDED.text,
updated_at = EXCLUDED.updated_at
""".trimIndent()
else ""
}
""".trimIndent(), listOf(id, timestamp, timestamp, txt, tags, text)
)
}
Expand All @@ -125,6 +136,23 @@ class DocStore<T : Any>(
}
}

/**
 * Retrieves the documents stored under the given [ids] with a single query.
 *
 * Ids that have no matching row, and rows whose json column is null, are left out,
 * so the result may contain fewer elements than [ids]. The query has no ORDER BY,
 * so do not rely on the result following the order of [ids].
 *
 * @param ids ids of the documents to fetch; an empty list returns an empty result
 * without querying the database
 * @return the deserialized documents that were found
 */
suspend fun multiGetById(ids: List<String>): List<T> {
    // an empty list would render "id in ()", which postgres rejects as a syntax error
    if (ids.isEmpty()) return emptyList()

    return connection.sendPreparedStatement(
        // little hack with question marks because jasync doesn't handle lists in prepared statements
        query = """
            select ${DocStoreEntry::json.name} from $tableName where id in (${ids.joinToString(",") { "?" }})
        """.trimIndent(),
        values = ids,
        // release because number of question marks may vary ...
        release = true
    ).rows.mapNotNull { row ->
        row.getString(DocStoreEntry::json.name)?.let { str ->
            json.decodeFromString(serializationStrategy, str)
        }
    }
}

suspend fun getEntryById(id: String): DocStoreEntry? {
return connection.sendPreparedStatement(
"""
Expand All @@ -140,6 +168,22 @@ class DocStore<T : Any>(
}
}

/**
 * Retrieves the full [DocStoreEntry] rows stored under the given [ids] with a single query.
 *
 * Ids that have no matching row are left out, so the result may contain fewer
 * elements than [ids]. The query has no ORDER BY, so do not rely on the result
 * following the order of [ids].
 *
 * @param ids ids of the entries to fetch; an empty list returns an empty result
 * without querying the database
 * @return the entries that were found
 */
suspend fun multiGetEntryById(ids: List<String>): List<DocStoreEntry> {
    // an empty list would render "id in ()", which postgres rejects as a syntax error
    if (ids.isEmpty()) return emptyList()

    return connection.sendPreparedStatement(
        // little hack with question marks because jasync doesn't handle lists in prepared statements
        query = """
            select * from $tableName where id in (${ids.joinToString(",") { "?" }})
        """.trimIndent(),
        values = ids,
        // release because number of question marks may vary ...
        release = true
    ).rows.map { row ->
        row.docStoreEntry
    }
}

suspend fun update(
doc: T,
timestamp: Instant = Clock.System.now(),
Expand Down Expand Up @@ -253,7 +297,7 @@ class DocStore<T : Any>(
}
}

suspend fun insertList(chunk: List<Pair<String, T>>) {
suspend fun insertList(chunk: List<Pair<String, T>>,timestamp: Instant = Clock.System.now()) {
// Base SQL for INSERT
val baseSql = """
INSERT INTO $tableName (id, json, tags, created_at, updated_at, text)
Expand All @@ -266,27 +310,23 @@ class DocStore<T : Any>(
ON CONFLICT (id) DO UPDATE SET
json = EXCLUDED.json,
tags = EXCLUDED.tags,
text = EXCLUDED.text,
updated_at = EXCLUDED.updated_at
""".trimIndent()
text = EXCLUDED.text,
updated_at = EXCLUDED.created_at
""".trimIndent() // uses rejected created_at as the update timestamp

val sql = "$baseSql $values $conflictAction"

// Flatten the list of lists into a single list of parameters
val params = chunk.map { (id, doc) ->
val now = Clock.System.now()
val now = timestamp
val tags = tagExtractor.invoke(doc)
val text = textExtractor.invoke(doc)
listOf(id, json.encodeToString(serializationStrategy, doc), tags, now, now, text)
}.flatten()

// Create and execute the PreparedStatement

try {
connection.sendPreparedStatement(sql, params)
} catch (e: Exception) {
e.printStackTrace()
}
connection.sendPreparedStatement(sql, params)
}

suspend fun count(): Long {
Expand Down Expand Up @@ -330,9 +370,13 @@ class DocStore<T : Any>(
tags: List<String> = emptyList(),
orTags: Boolean = false,
query: String? = null,
fetchSize: Int = 100,
): Flow<DocStoreEntry> {
val q = constructQuery(tags, query, orTags)
return queryFlow(q, tags + listOfNotNull(query)) { row ->
return queryFlow(
query = constructQuery(tags, query, orTags),
params = tags + listOfNotNull(query),
fetchSize = fetchSize
) { row ->
row.docStoreEntry
}
}
Expand All @@ -341,9 +385,14 @@ class DocStore<T : Any>(
tags: List<String> = emptyList(),
orTags: Boolean = false,
query: String? = null,
fetchSize: Int = 100,
): Flow<T> {
val q = constructQuery(tags, query, orTags)
return queryFlow(q, tags + listOfNotNull(query)) { row ->
return queryFlow(
query = q,
params = tags + listOfNotNull(query),
fetchSize = fetchSize
) { row ->
json.decodeFromString(
serializationStrategy,
row.getString(DocStoreEntry::json.name) ?: error("empty json")
Expand Down
4 changes: 3 additions & 1 deletion src/test/kotlin/com/tryformation/pgdocstore/BulkTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ class BulkTest : DbTestBase() {
ds.entriesByRecency(limit = 33).size shouldBe 33
ds.entriesByRecency(limit = 33).map { it.id }.distinct().size shouldBe 33

stored.map { it.copy(property = it.property.reversed()) }.let {
stored.map { it.copy(title = it.title.reversed()) }.let {
ds.bulkInsert(it)
}
ds.count() shouldBe 200
ds.multiGetById(stored.map { it.id }.take(13)).size shouldBe 13
ds.multiGetEntryById(stored.map { it.id }.take(13)).size shouldBe 13
}
}
Loading

0 comments on commit d6c859e

Please sign in to comment.