Skip to content

Commit

Permalink
Fixes for restore objects in python-fd
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco van Wieringen committed Dec 11, 2014
1 parent 2abb036 commit cb61a58
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 40 deletions.
47 changes: 32 additions & 15 deletions src/dird/fd_cmds.c
Expand Up @@ -640,15 +640,19 @@ struct OBJ_CTX {

static int restore_object_handler(void *ctx, int num_fields, char **row)
{
BSOCK *fd;
bool is_compressed;
OBJ_CTX *octx = (OBJ_CTX *)ctx;
JCR *jcr = octx->jcr;
BSOCK *fd;

fd = jcr->file_bsock;
if (jcr->is_job_canceled()) {
return 1;
}
/* Old File Daemon doesn't handle restore objects */

/*
* Old File Daemon doesn't handle restore objects
*/
if (jcr->FDVersion < FD_VERSION_3) {
Jmsg(jcr, M_WARNING, 0, _("Client \"%s\" may not be used to restore "
"this job. Please upgrade your client.\n"),
Expand All @@ -660,7 +664,9 @@ static int restore_object_handler(void *ctx, int num_fields, char **row)
fd->fsend("restoreobject JobId=%s %s,%s,%s,%s,%s,%s\n",
row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
} else {
/* bash spaces from PluginName */
/*
* bash spaces from PluginName
*/
bash_spaces(row[9]);
fd->fsend("restoreobject JobId=%s %s,%s,%s,%s,%s,%s,%s\n",
row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[9]);
Expand All @@ -672,22 +678,27 @@ static int restore_object_handler(void *ctx, int num_fields, char **row)

Dmsg1(010, "Send obj: %s\n", fd->msg);

// fd->msglen = str_to_uint64(row[1]); /* object length */
// Dmsg1(000, "obj size: %lld\n", (uint64_t)fd->msglen);

/* object */
db_unescape_object(jcr, jcr->db,
db_unescape_object(jcr,
jcr->db,
row[8], /* Object */
str_to_uint64(row[1]), /* Object length */
&fd->msg, &fd->msglen);
&fd->msg,
&fd->msglen);
fd->send(); /* send object */
octx->count++;

if (debug_level) {
for (int i=0; i < fd->msglen; i++)
if (!fd->msg[i])
/*
* Don't try to print compressed objects.
*/
is_compressed = str_to_uint64(row[5]) > 0;
if (debug_level >= 100 && !is_compressed) {
for (int i = 0; i < fd->msglen; i++) {
if (!fd->msg[i]) {
fd->msg[i] = ' ';
Dmsg1(000, "Send obj: %s\n", fd->msg);
}
}

Dmsg1(100, "Send obj: %s\n", fd->msg);
}

return 0;
Expand All @@ -711,6 +722,7 @@ bool send_plugin_options(JCR *jcr)
return false;
}
}

return true;
}

Expand All @@ -729,11 +741,15 @@ bool send_restore_objects(JCR *jcr)

/* restore_object_handler is called for each file found */

/* send restore objects for all jobs involved */
/*
* Send restore objects for all jobs involved
*/
Mmsg(query, get_restore_objects, jcr->JobIds, FT_RESTORE_FIRST);
db_sql_query(jcr->db, query.c_str(), restore_object_handler, (void *)&octx);

/* send config objects for the current restore job */
/*
* Send config objects for the current restore job
*/
Mmsg(query, get_restore_objects,
edit_uint64(jcr->JobId, ed1), FT_PLUGIN_CONFIG_FILLED);
db_sql_query(jcr->db, query.c_str(), restore_object_handler, (void *)&octx);
Expand All @@ -750,6 +766,7 @@ bool send_restore_objects(JCR *jcr)
return false;
}
}

return true;
}

Expand Down
61 changes: 49 additions & 12 deletions src/filed/dir_cmd.c
Expand Up @@ -1022,7 +1022,9 @@ static bool restore_object_cmd(JCR *jcr)
&rop.object_type, &rop.object_compression, &FileIndex,
rop.plugin_name) != 8) {

/* Old version, no plugin_name */
/*
* Old version, no plugin_name
*/
if (sscanf(dir->msg, restoreobjcmd1, &rop.JobId, &rop.object_len,
&rop.object_full_len, &rop.object_index,
&rop.object_type, &rop.object_compression, &FileIndex) != 7) {
Expand All @@ -1039,39 +1041,72 @@ static bool restore_object_cmd(JCR *jcr)
"FI=%d plugin_name=%s\n",
rop.JobId, rop.object_len, rop.object_full_len,
rop.object_index, rop.object_type, FileIndex, rop.plugin_name);
/* Read Object name */

/*
* Read Object name
*/
if (dir->recv() < 0) {
goto bail_out;
}
Dmsg2(100, "Recv Oname object: len=%d Oname=%s\n", dir->msglen, dir->msg);
rop.object_name = bstrdup(dir->msg);

/* Read Object */
/*
* Read Object
*/
if (dir->recv() < 0) {
goto bail_out;
}
/* Transfer object from message buffer, and get new message buffer */

/*
* Transfer object from message buffer, and get new message buffer
*/
rop.object = dir->msg;
dir->msg = get_pool_memory(PM_MESSAGE);

/* If object is compressed, uncompress it */
if (rop.object_compression == 1) { /* zlib level 9 */
/*
* If object is compressed, uncompress it
*/
switch (rop.object_compression) {
case 1: { /* zlib level 9 */
int status;
int out_len = rop.object_full_len + 100;
POOLMEM *obj = get_memory(out_len);

Dmsg2(100, "Inflating from %d to %d\n", rop.object_len, rop.object_full_len);
status = Zinflate(rop.object, rop.object_len, obj, out_len);
Dmsg1(100, "Zinflate status=%d\n", status);

if (out_len != rop.object_full_len) {
Jmsg3(jcr, M_ERROR, 0, ("Decompression failed. Len wanted=%d got=%d. Object=%s\n"),
rop.object_full_len, out_len, rop.object_name);
Jmsg3(jcr, M_ERROR, 0, ("Decompression failed. Len wanted=%d got=%d. Object_name=%s\n"),
rop.object_full_len, out_len, rop.object_name);
}
free_pool_memory(rop.object); /* release compressed object */
rop.object = obj; /* new uncompressed object */

free_pool_memory(rop.object); /* Release compressed object */
rop.object = obj; /* New uncompressed object */
rop.object_len = out_len;
break;
}
default:
break;
}
Dmsg2(100, "Recv Object: len=%d Object=%s\n", rop.object_len, rop.object);
/* we still need to do this to detect a vss restore */

if (debug_level >= 100) {
POOL_MEM object_content(PM_MESSAGE);

/*
* Convert the object into a null terminated string.
*/
object_content.check_size(rop.object_len + 1);
memset(object_content.c_str(), 0, rop.object_len + 1);
memcpy(object_content.c_str(), rop.object, rop.object_len);

Dmsg2(100, "Recv Object: len=%d Object=%s\n", rop.object_len, object_content.c_str());
}

/*
* We still need to do this to detect a vss restore
*/
if (bstrcmp(rop.object_name, "job_metadata.xml")) {
Dmsg0(100, "got job metadata\n");
jcr->got_metadata = true;
Expand All @@ -1082,9 +1117,11 @@ static bool restore_object_cmd(JCR *jcr)
if (rop.object_name) {
free(rop.object_name);
}

if (rop.object) {
free_pool_memory(rop.object);
}

if (rop.plugin_name) {
free(rop.plugin_name);
}
Expand Down
74 changes: 64 additions & 10 deletions src/plugins/filed/python-fd.c
Expand Up @@ -221,13 +221,14 @@ static bRC newPlugin(bpContext *ctx)
* any other events it is interested in.
*/
bfuncs->registerBareosEvents(ctx,
6,
7,
bEventNewPluginOptions,
bEventPluginCommand,
bEventJobStart,
bEventRestoreCommand,
bEventEstimateCommand,
bEventBackupCommand);
bEventBackupCommand,
bEventRestoreObject);

return bRC_OK;
}
Expand Down Expand Up @@ -393,6 +394,23 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
*/
p_ctx->plugin_options = bstrdup((char *)value);
break;
case bEventRestoreObject: {
struct restore_object_pkt *rop;

rop = (struct restore_object_pkt *)value;

/*
* Only use the plugin definition of a restore object if we
* didn't get any other plugin definition from some other source before.
*/
if (!p_ctx->python_loaded) {
if (rop && *rop->plugin_name) {
event_dispatched = true;
retval = parse_plugin_definition(ctx, rop->plugin_name);
}
}
break;
}
default:
break;
}
Expand Down Expand Up @@ -442,9 +460,38 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
retval = PyParsePluginDefinition(ctx, value);
}
break;
case bEventRestoreObject:
retval = PyRestoreObjectData(ctx, (struct restore_object_pkt *)value);
case bEventRestoreObject: {
struct restore_object_pkt *rop;

rop = (struct restore_object_pkt *)value;
if (!rop) {
/*
* If rop == NULL this means we got the last restore object.
* No need to call into python so just return.
*/
retval = bRC_OK;
} else {
/*
* See if we already loaded the Python modules.
*/
if (!p_ctx->python_loaded && *rop->plugin_name) {
retval = PyLoadModule(ctx, rop->plugin_name);

/*
* Only try to call when the loading succeeded.
*/
if (retval == bRC_OK) {
retval = PyParsePluginDefinition(ctx, rop->plugin_name);
if (retval == bRC_OK) {
retval = PyRestoreObjectData(ctx, rop);
}
}
} else {
retval = PyRestoreObjectData(ctx, rop);
}
}
break;
}
case bEventHandleBackupFile:
retval = PyHandleBackupFile(ctx, (struct save_pkt *)value);
break;
Expand Down Expand Up @@ -1323,12 +1370,13 @@ static inline bool PySavePacketToNative(PySavePacket *pSavePkt, struct save_pkt
sp->object_len = pSavePkt->object_len;
sp->index = pSavePkt->object_index;

if (!(buf = PyByteArray_AsString(pSavePkt->object))) {
if ((buf = PyByteArray_AsString(pSavePkt->object))) {
if (p_ctx->object) {
free(p_ctx->object);
}
p_ctx->object = (char *)malloc(pSavePkt->object_len);
memcpy(p_ctx->object, buf, pSavePkt->object_len);
sp->object = p_ctx->object;
} else {
goto bail_out;
}
Expand Down Expand Up @@ -1790,7 +1838,7 @@ static inline PyRestoreObject *NativeToPyRestoreObject(struct restore_object_pkt
PyRestoreObject *pRestoreObject = PyObject_New(PyRestoreObject, &PyRestoreObjectType);

if (pRestoreObject) {
pRestoreObject->object_name = rop->object_name;
pRestoreObject->object_name = PyString_FromString(rop->object_name);
pRestoreObject->object = PyByteArray_FromStringAndSize(rop->object, rop->object_len);
pRestoreObject->plugin_name = rop->plugin_name;
pRestoreObject->object_type = rop->object_type;
Expand Down Expand Up @@ -2306,9 +2354,9 @@ static PyObject *PyRestoreObject_repr(PyRestoreObject *self)
POOL_MEM buf(PM_MESSAGE);

Mmsg(buf, "RestoreObject(object_name=\"%s\", object=\"%s\", plugin_name=\"%s\", "
"object_type=%ld, object_len=%ld, object_full_len=%ld, "
"object_index=%ld, object_compression=%ld, stream=%ld, jobid=%ld)",
self->object_name, self->object, self->plugin_name,
"object_type=%d, object_len=%d, object_full_len=%d, "
"object_index=%d, object_compression=%d, stream=%d, jobid=%u)",
PyGetStringValue(self->object_name), PyGetByteArrayValue(self->object), self->plugin_name,
self->object_type, self->object_len, self->object_full_len,
self->object_index, self->object_compression, self->stream, self->JobId);
s = PyString_FromString(buf.c_str());
Expand Down Expand Up @@ -2348,7 +2396,7 @@ static int PyRestoreObject_init(PyRestoreObject *self, PyObject *args, PyObject

if (!PyArg_ParseTupleAndKeywords(args,
kwds,
"|sosiiiiiiI",
"|oosiiiiiiI",
kwlist,
&self->object_name,
&self->object,
Expand All @@ -2371,6 +2419,12 @@ static int PyRestoreObject_init(PyRestoreObject *self, PyObject *args, PyObject
*/
static void PyRestoreObject_dealloc(PyRestoreObject *self)
{
if (self->object_name) {
Py_XDECREF(self->object_name);
}
if (self->object) {
Py_XDECREF(self->object);
}
PyObject_Del(self);
}

Expand Down
6 changes: 3 additions & 3 deletions src/plugins/filed/python-fd.h
Expand Up @@ -57,7 +57,7 @@ static plugin_argument plugin_arguments[] = {
*/
typedef struct {
PyObject_HEAD
char *object_name; /* Object name */
PyObject *object_name; /* Object name */
PyObject *object; /* Restore object data to restore */
char *plugin_name; /* Plugin name */
int32_t object_type; /* FT_xx for this file */
Expand All @@ -81,8 +81,8 @@ static PyMethodDef PyRestoreObject_methods[] = {
};

static PyMemberDef PyRestoreObject_members[] = {
{ (char *)"object_name", T_STRING, offsetof(PyRestoreObject, object_name), 0, (char *)"Object Name" },
{ (char *)"object", T_STRING, offsetof(PyRestoreObject, object), 0, (char *)"Object Content" },
{ (char *)"object_name", T_OBJECT, offsetof(PyRestoreObject, object_name), 0, (char *)"Object Name" },
{ (char *)"object", T_OBJECT, offsetof(PyRestoreObject, object), 0, (char *)"Object Content" },
{ (char *)"plugin_name", T_STRING, offsetof(PyRestoreObject, plugin_name), 0, (char *)"Plugin Name" },
{ (char *)"object_type", T_INT, offsetof(PyRestoreObject, object_type), 0, (char *)"Object Type" },
{ (char *)"object_len", T_INT, offsetof(PyRestoreObject, object_len), 0, (char *)"Object Length" },
Expand Down

0 comments on commit cb61a58

Please sign in to comment.