Skip to content

Commit

Permalink
droplet: implement access to volume by iterating
Browse files Browse the repository at this point in the history
Before dpl_opendir has been used to access all chunks of a volume.
Unfortenatly, it has a bug and returns only the first 1000 entries.

As we know, that chunks are always named as 4 digit numbers from 0000 to 9999
we now iterate through them. We stop, if a chunk does not exist.

This commit also fixes the check_remote function.
Before, if droplet have already be initialized
but connection to backend stops operating,
check_remote still return true. This is now fixed.
  • Loading branch information
joergsteffens committed Oct 1, 2018
1 parent 1396403 commit 2826932
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 47 deletions.
210 changes: 163 additions & 47 deletions src/stored/backends/droplet_device.c
Expand Up @@ -186,12 +186,14 @@ static dpl_status_t chunked_volume_truncate_callback(dpl_dirent_t *dirent, dpl_c
if (*dirent->name >= '0' && *dirent->name <= '9') {

// FIXME: only for testing
/*
if (bstrcmp(dirent->name, "0007")) {
status = DPL_EIO;
Emsg2(M_ERROR, 0, "FAKE: Failed to delete chunk file %s using dpl_unlink(): ERR=%s.\n",
dirent->fqn.path, dpl_status_str(status));
return status;
}
*/

status = dpl_unlink(ctx, dirent->name);

Expand Down Expand Up @@ -300,54 +302,179 @@ bool droplet_device::walk_directory(const char *dirname, t_dpl_walk_directory_ca
}



/*
* Callback for getting the total size of a chunked volume.
*/
static dpl_status_t chunked_volume_size_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
{
dpl_status_t status = DPL_SUCCESS;
ssize_t *volumesize = (ssize_t *)data;

*volumesize = *volumesize + sysmd->size;

return status;
}

/*
* Callback for truncating a chunked volume.
*
* Checks is connection to backend storage system is possible.
*
* Returns true - if connection can be established
* false - otherwise
* @return DPL_SUCCESS on success, on error: a dpl_status_t value that represents the error.
*/
static dpl_status_t chunked_volume_truncate_callback(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data)
{
dpl_status_t status = DPL_SUCCESS;

status = dpl_unlink(ctx, chunkpath);

switch (status) {
case DPL_SUCCESS:
break;
default:
/* no error message here, as error will be set by calling function. */
return status;
}

return status;
}


/*
* Generic function that walks a dirname and calls the callback
* function for each entry it finds in that directory.
*
* FIXME: currently, check_remote() returns true,
* after an initial connection could be made,
* even if the system is now no more reachable.
* Seams to be some caching effect.
* @return: true - if no error occured
* false - if an error has occured. Sets dev_errno and errmsg to the first error.
*/
bool droplet_device::check_remote()
bool droplet_device::walk_chunks(const char *dirname, t_dpl_walk_chunks_call_back callback, void *data)
{
bool retval = false;
bool retval = true;
dpl_status_t status;
dpl_status_t callback_status;
dpl_sysmd_t *sysmd = NULL;
POOL_MEM path(PM_NAME);

if (!m_ctx) {
if (!initialize()) {
return false;
sysmd = dpl_sysmd_dup(&m_sysmd);
bool found = true;
int i = 0;
while ((i < m_max_chunks) && (found) && (retval)) {
path.bsprintf("%s/%04d", dirname, i);

status = dpl_getattr(m_ctx, /* context */
path.c_str(), /* locator */
NULL, /* metadata */
sysmd); /* sysmd */

switch (status) {
case DPL_SUCCESS:
Dmsg1(100, "chunk %s exists. Calling callback.\n", path.c_str());
callback_status = callback(sysmd, m_ctx, path.c_str(), data);
if (callback_status == DPL_SUCCESS) {
i++;
} else {
Mmsg2(errmsg, _("Operation failed on chunk %s: ERR=%s."),
path.c_str(), dpl_status_str(callback_status));
dev_errno = droplet_errno_to_system_errno(callback_status);
/* exit loop */
retval = false;
}
break;
default:
Dmsg1(100, "chunk %s does not exists. Exiting.\n", path.c_str());
found = false;
break;
}
}

if (sysmd) {
dpl_sysmd_free(sysmd);
sysmd = NULL;
}

return retval;
}

/**
* Check if a specific path exists.
* It uses dpl_getattr() for this.
* However, dpl_getattr() results wrong results in a couple of situations,
* espescially directoy names should not be checked using a prepended "/".
*
* Results in detail:
*
* path | "name" | "name/" | target reachable | target not reachable | target not reachable | wrong credentials
* | exists | exists | | (already initialized) | (not initialized) |
* -------------------------------------------------------------------------------------------------------------------
* "" | - | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_FAILURE | DPL_EPERM
* "/" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
*
* "name" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "name/" | - | - | DPL_ENOENT | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name/" | - | - | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
*
* "name" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name" | yes | - | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "name/" | yes | - | DPL_ENOTDIR | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name/" | yes | - | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
*
* "name" | - | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name" | - | yes | DPL_ENOENT (!) | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "name/" | - | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name/" | - | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
*
* "name" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "name/" | yes | yes | DPL_SUCCESS | DPL_FAILURE | DPL_FAILURE | DPL_EPERM
* "/name/" | yes | yes | DPL_SUCCESS | DPL_SUCCESS (!) | DPL_SUCCESS (!) | DPL_SUCCESS (!)
*
* Best test for
* directories as "dir/"
* files as "file" or "/file".
*
* Returns DPL_SUCCESS - if path exists and can be accessed
* DPL_* errorcode - otherwise
*/
dpl_status_t droplet_device::check_path(const char *path)
{
dpl_status_t status;
dpl_sysmd_t *sysmd = NULL;

sysmd = dpl_sysmd_dup(&m_sysmd);
status = dpl_getattr(m_ctx, /* context */
"", /* locator */
path, /* locator */
NULL, /* metadata */
sysmd); /* sysmd */
Dmsg4(100, "check_path(device=%s, bucket=%s, path=%s): %s\n", prt_name, m_ctx->cur_bucket, path, dpl_status_str(status));
dpl_sysmd_free(sysmd);

switch (status) {
case DPL_SUCCESS:
Dmsg0(100, "check_remote: ok\n");
retval = true;
break;
case DPL_ENOENT:
case DPL_FAILURE:
default:
Dmsg0(100, "check_remote: failed\n");
break;
return status;
}



/**
* Checks if the connection to the backend storage system is possible.
*
* Returns true - if connection can be established
* false - otherwise
*/
bool droplet_device::check_remote()
{
if (!m_ctx) {
if (!initialize()) {
return false;
}
}

if (sysmd) {
dpl_sysmd_free(sysmd);
sysmd = NULL;
if (check_path("/") != DPL_SUCCESS) {
Dmsg1(100, "check_remote(%s): failed\n", prt_name);
return false;
}

return retval;
Dmsg1(100, "check_remote(%s): ok\n", prt_name);

return true;
}


Expand All @@ -356,22 +483,14 @@ bool droplet_device::remote_chunked_volume_exists()
{
bool retval = false;
dpl_status_t status;
dpl_sysmd_t *sysmd = NULL;
POOL_MEM chunk_dir(PM_FNAME);

if (!check_remote()) {
return false;
}

Mmsg(chunk_dir, "/%s", getVolCatName());

Dmsg1(100, "checking remote_chunked_volume_exists %s\n", chunk_dir.c_str());

sysmd = dpl_sysmd_dup(&m_sysmd);
status = dpl_getattr(m_ctx, /* context */
chunk_dir.c_str(), /* locator */
NULL, /* metadata */
sysmd); /* sysmd */
Mmsg(chunk_dir, "%s/", getVolCatName());
status = check_path(chunk_dir.c_str());

switch (status) {
case DPL_SUCCESS:
Expand All @@ -385,11 +504,6 @@ bool droplet_device::remote_chunked_volume_exists()
break;
}

if (sysmd) {
dpl_sysmd_free(sysmd);
sysmd = NULL;
}

return retval;
}

Expand Down Expand Up @@ -472,7 +586,7 @@ bool droplet_device::flush_remote_chunk(chunk_io_request *request)
case DPL_SUCCESS:
break;
default:
Mmsg2(errmsg, _("Failed to create direcory %s using dpl_mkdir(): ERR=%s.\n"),
Mmsg2(errmsg, _("Failed to create directory %s using dpl_mkdir(): ERR=%s.\n"),
chunk_dir.c_str(), dpl_status_str(status));
dev_errno = droplet_errno_to_system_errno(status);
goto bail_out;
Expand Down Expand Up @@ -631,7 +745,8 @@ bool droplet_device::truncate_remote_chunked_volume(DCR *dcr)
POOL_MEM chunk_dir(PM_FNAME);

Mmsg(chunk_dir, "/%s", getVolCatName());
if (!walk_directory(chunk_dir.c_str(), chunked_volume_truncate_callback, NULL)) {
//if (!walk_directory(chunk_dir.c_str(), chunked_volume_truncate_callback, NULL)) {
if (!walk_chunks(chunk_dir.c_str(), chunked_volume_truncate_callback, NULL)) {
/* errno already set in walk_directory. */
return false;
}
Expand Down Expand Up @@ -928,7 +1043,7 @@ int droplet_device::d_ioctl(int fd, ioctl_req_t request, char *op)
*/
ssize_t droplet_device::chunked_remote_volume_size()
{
dpl_status_t status;
//dpl_status_t status;
ssize_t volumesize = 0;
dpl_sysmd_t *sysmd = NULL;
POOL_MEM chunk_dir(PM_FNAME);
Expand Down Expand Up @@ -971,7 +1086,8 @@ ssize_t droplet_device::chunked_remote_volume_size()
}
#endif

if (!walk_directory(chunk_dir.c_str(), chunked_volume_size_callback, &volumesize)) {
//if (!walk_directory(chunk_dir.c_str(), chunked_volume_size_callback, &volumesize)) {
if (!walk_chunks(chunk_dir.c_str(), chunked_volume_size_callback, &volumesize)) {
/* errno is already set in walk_directory */
volumesize = -1;
goto bail_out;
Expand Down
6 changes: 6 additions & 0 deletions src/stored/backends/droplet_device.h
Expand Up @@ -39,6 +39,8 @@
*/
typedef dpl_status_t (*t_dpl_walk_directory_call_back)(dpl_dirent_t *dirent, dpl_ctx_t *ctx,
const char *dirname, void *data);
typedef dpl_status_t (*t_dpl_walk_chunks_call_back)(dpl_sysmd_t *sysmd, dpl_ctx_t *ctx, const char *chunkpath, void *data);




Expand All @@ -47,6 +49,8 @@ class droplet_device: public chunked_device {
/*
* Private Members
*/
/* maximun number of chunks in a volume (0000 to 9999) */
const int m_max_chunks = 10000;
char *m_configstring;
const char *m_profile;
const char *m_location;
Expand All @@ -60,6 +64,7 @@ class droplet_device: public chunked_device {
* Private Methods
*/
bool initialize();
dpl_status_t check_path(const char *path);

/*
* Interface from chunked_device
Expand All @@ -72,6 +77,7 @@ class droplet_device: public chunked_device {
bool truncate_remote_chunked_volume(DCR *dcr);

bool walk_directory(const char *dirname, t_dpl_walk_directory_call_back callback, void *data);
bool walk_chunks(const char *dirname, t_dpl_walk_chunks_call_back callback, void *data);

public:
/*
Expand Down

0 comments on commit 2826932

Please sign in to comment.