Skip to content

Commit

Permalink
DBI tweaks for concurrent writes to SQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
Eliot McIntire committed Feb 14, 2020
1 parent 089eb65 commit c4d454f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
20 changes: 13 additions & 7 deletions R/DBI.R
Expand Up @@ -25,17 +25,23 @@ createCache <- function(cachePath, drv = getOption("reproducible.drv", RSQLite::
checkPath(CacheStorageDir(cachePath), create = TRUE)
if (useDBI()) {
if (is.null(conn)) {
conn <- dbConnectAll(drv, cachePath = cachePath)
on.exit(dbDisconnect(conn))
conn <- dbConnectAll(drv, cachePath = cachePath)
on.exit(dbDisconnect(conn))
}
}
dt <- .emptyCacheTable

retry(retries = 250, exponentialDecayBase = 1.01, quote(
dbWriteTable(conn, CacheDBTableName(cachePath, drv), dt, overwrite = TRUE,
field.types = c(cacheId = "text", tagKey = "text",
tagValue = "text", createdDate = "text")))
)
# Some tough to find cases where stalls on dbWriteTable -- this *may* prevent some
a <- retry(retries = 250, exponentialDecayBase = 1.01,
quote(dbListTables(conn)))

if (isTRUE(!CacheDBTableName(cachePath, drv) %in% a))
#retry(retries = 5, exponentialDecayBase = 1.5, quote(
try(dbWriteTable(conn, CacheDBTableName(cachePath, drv), dt, overwrite = FALSE,
field.types = c(cacheId = "text", tagKey = "text",
tagValue = "text", createdDate = "text")), silent = TRUE)
#)

}

#' @rdname cacheTools
Expand Down
3 changes: 2 additions & 1 deletion R/cache.R
Expand Up @@ -558,7 +558,8 @@ setMethod(
dbTabName = dbTabNam,
outputHash = outputHash,
.con = conn)
res <- retry(quote(dbSendQuery(conn, qry)))
res <- retry(retries = 15, exponentialDecayBase = 1.01,
quote(dbSendQuery(conn, qry)))
isInRepo <- setDT(dbFetch(res))
dbClearResult(res)
} else {
Expand Down
4 changes: 4 additions & 0 deletions tests/testthat/test-cluster.R
Expand Up @@ -28,6 +28,7 @@ test_that("test parallel collisions", {
if (interactive()) {
of <- tmpfile[3]
cl <- makeCluster(N, outfile = of)
print(paste("log file is", of))
} else {
cl <- makeCluster(N)
}
Expand All @@ -40,6 +41,9 @@ test_that("test parallel collisions", {
# })
numToRun <- 40
skip_on_os("mac")
# THere is a creating Cache at the same time problem -- haven't resolved
# Just make cache first and it seems fine
Cache(rnorm, 1, cacheRepo = tmpdir)
a <- try(clusterMap(cl = cl, fun, seq(numToRun), cacheRepo = tmpdir, .scheduling = "dynamic"),
silent = FALSE)
expect_false(is(a, "try-error"))
Expand Down

0 comments on commit c4d454f

Please sign in to comment.