libcloud-plugin: remove plugin context and use new constants
franku committed Aug 30, 2020
1 parent 7286a3d commit d244fd9
Showing 4 changed files with 83 additions and 97 deletions.
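
For orientation, here is a minimal before/after sketch of the calling convention this commit switches to: plugin entry points no longer receive a context handle, and return codes, message classes, and plugin variables become module-level constants imported from bareosfd instead of dictionary lookups in bareos_fd_consts. The method and constant names are taken from the diff below; the snippet itself is illustrative and only runs inside the Bareos file daemon's embedded Python interpreter, which provides the bareosfd module and the plugin helpers.

# Illustrative sketch only; bareosfd and the libcloud plugin helpers are
# supplied by the Bareos file daemon and are not importable standalone.
import bareosfd
from bareosfd import bRC_OK, bVarAccurate, M_INFO
from bareos_libcloud_api.debug import jobmessage

# Old convention (removed by this commit): every entry point took a
# `context` handle and constants were dictionary lookups, e.g.
#
#     def start_backup_job(self, context):
#         accurate = bareosfd.GetValue(context, bareos_fd_consts.bVariable["bVarAccurate"])
#         jobmessage("M_INFO", "Start backup")
#         return bRCs["bRC_OK"]

# New convention: no context argument, plain constants from bareosfd.
class ExamplePlugin(object):
    def start_backup_job(self):
        accurate = bareosfd.GetValue(bVarAccurate)  # plugin variable id is now a constant
        jobmessage(M_INFO, "Start backup, accurate=%s" % accurate)
        return bRC_OK
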
133 changes: 65 additions & 68 deletions core/src/plugins/filed/python/libcloud/BareosFdPluginLibcloud.py
@@ -22,14 +22,13 @@

import BareosFdPluginBaseclass
import bareosfd
import bareos_fd_consts
from bareosfd import *
import ConfigParser as configparser
import datetime
import dateutil.parser
from bareos_libcloud_api.bucket_explorer import TASK_TYPE
from bareos_libcloud_api.debug import debugmessage
from bareos_libcloud_api.debug import jobmessage
from bareos_libcloud_api.debug import set_plugin_context
import io
import itertools
import libcloud
@@ -41,7 +40,6 @@
from BareosLibcloudApi import ERROR
from BareosLibcloudApi import ABORT
from BareosLibcloudApi import BareosLibcloudApi
from bareos_fd_consts import bRCs, bCFs, bIOPS, bJobMessageType, bFileType
from libcloud.storage.types import Provider
from libcloud.storage.types import ObjectDoesNotExistError
from sys import version_info
@@ -69,16 +67,15 @@ def read(self, n=None):


class BareosFdPluginLibcloud(BareosFdPluginBaseclass.BareosFdPluginBaseclass):
def __init__(self, context, plugindef):
set_plugin_context(context)
def __init__(self, plugindef):
debugmessage(
100, "BareosFdPluginLibcloud called with plugindef: %s" % (plugindef,)
)

super(BareosFdPluginLibcloud, self).__init__(context, plugindef)
super(BareosFdPluginLibcloud, self).parse_plugin_definition(context, plugindef)
super(BareosFdPluginLibcloud, self).__init__(plugindef)
super(BareosFdPluginLibcloud, self).parse_plugin_definition(plugindef)
self.options["treat_download_errors_as_warnings"] = False
self.__parse_options(context)
self.__parse_options()

self.last_run = datetime.datetime.fromtimestamp(self.since)
self.last_run = self.last_run.replace(tzinfo=None)
@@ -92,25 +89,25 @@ def __init__(self, context, plugindef):
self.active = True
self.api = None

def __parse_options(self, context):
accurate = bareos_fd_consts.bVariable["bVarAccurate"]
accurate = bareosfd.GetValue(context, accurate)
def __parse_options(self):
accurate = bVarAccurate
accurate = bareosfd.GetValue(accurate)
if accurate is None or accurate == 0:
self.options["accurate"] = False
else:
self.options["accurate"] = True

def parse_plugin_definition(self, context, plugindef):
def parse_plugin_definition(self, plugindef):
debugmessage(100, "parse_plugin_definition()")
config_filename = self.options.get("config_file")
if config_filename:
if self.__parse_config_file(context, config_filename):
return bRCs["bRC_OK"]
if self.__parse_config_file(config_filename):
return bRC_OK
debugmessage(100, "Could not load configfile %s" % (config_filename))
jobmessage("M_FATAL", "Could not load configfile %s" % (config_filename))
return bRCs["bRC_Error"]
jobmessage(M_FATAL, "Could not load configfile %s" % (config_filename))
return bRC_Error

def __parse_config_file(self, context, config_filename):
def __parse_config_file(self, config_filename):
"""
Parse the config file given in the config_file plugin option
"""
@@ -131,9 +128,9 @@ def __parse_config_file(self, context, config_filename):
)
return False

return self.__check_config(context, config_filename)
return self.__check_config(config_filename)

def __check_config(self, context, config_filename):
def __check_config(self, config_filename):
"""
Check the configuration and set or override options if necessary,
considering mandatory: username and password in the [credentials] section
@@ -216,23 +213,23 @@ def __check_config(self, config_filename):

return True

def start_backup_job(self, context):
def start_backup_job(self):
jobmessage(
"M_INFO",
M_INFO,
"Start backup, try to connect to %s:%s"
% (self.options["host"], self.options["port"]),
)

if BareosLibcloudApi.probe_driver(self.options) == "failed":
jobmessage(
"M_FATAL",
M_FATAL,
"Could not connect to libcloud driver: %s:%s"
% (self.options["host"], self.options["port"]),
)
return bRCs["bRC_Error"]
return bRC_Error

jobmessage(
"M_INFO",
M_INFO,
"Connected, last backup: %s (ts: %s)" % (self.last_run, self.since),
)

@@ -245,25 +242,25 @@ def start_backup_job(self, context):
debugmessage(100, "BareosLibcloudApi started")
except Exception as e:
debugmessage(100, "Error: %s" % e)
jobmessage("M_FATAL", "Starting BareosLibcloudApi failed: %s" % e)
return bRCs["bRC_Cancel"]
jobmessage(M_FATAL, "Starting BareosLibcloudApi failed: %s" % e)
return bRC_Cancel

return bRCs["bRC_OK"]
return bRC_OK

def end_backup_job(self, context):
def end_backup_job(self):
if self.active:
self.__shutdown()
return bRCs["bRC_OK"]
return bRC_OK

def check_file(self, context, fname):
def check_file(self, fname):
# All existing files/objects are passed to bareos
# If bareos has not seen one, it does not exist anymore
return bRCs["bRC_Error"]
return bRC_Error

def __shutdown(self):
self.active = False
jobmessage(
"M_INFO",
M_INFO,
"BareosFdPluginLibcloud finished with %d files"
% (self.number_of_objects_to_backup),
)
@@ -275,7 +272,7 @@ def __shutdown(self):
self.api.shutdown()
debugmessage(100, "BareosLibcloudApi is shut down")

def start_backup_file(self, context, savepkt):
def start_backup_file(self, savepkt):
error = False
while self.active:
worker_result = self.api.check_worker_messages()
@@ -301,10 +298,10 @@ def start_backup_file(self, context, savepkt):
self.__shutdown()
savepkt.fname = "" # dummy value
if error:
jobmessage("M_FATAL", "Shutdown after worker error")
return bRCs["bRC_Cancel"]
jobmessage(M_FATAL, "Shutdown after worker error")
return bRC_Cancel
else:
return bRCs["bRC_Skip"]
return bRC_Skip

filename = FilenameConverter.BucketToBackup(
"%s/%s"
@@ -320,76 +317,76 @@

savepkt.statp = statp
savepkt.fname = filename
savepkt.type = bareos_fd_consts.bFileType["FT_REG"]
savepkt.type = FT_REG

if self.current_backup_task["type"] == TASK_TYPE.DOWNLOADED:
self.FILE = self.current_backup_task["data"]
elif self.current_backup_task["type"] == TASK_TYPE.TEMP_FILE:
try:
self.FILE = io.open(self.current_backup_task["tmpfile"], "rb")
except Exception as e:
jobmessage("M_FATAL", "Could not open temporary file for reading.")
jobmessage(M_FATAL, "Could not open temporary file for reading.")
self.__shutdown()
return bRCs["bRC_Error"]
return bRC_Error
elif self.current_backup_task["type"] == TASK_TYPE.STREAM:
try:
self.FILE = IterStringIO(self.current_backup_task["data"].as_stream())
except ObjectDoesNotExistError:
if self.options["treat_download_errors_as_warnings"]:
jobmessage(
"M_WARNING",
M_WARNING,
"Skipped file %s because it does not exist anymore"
% (self.current_backup_task["name"]),
)
return bRCs["bRC_Skip"]
return bRC_Skip
else:
jobmessage(
"M_ERROR",
M_ERROR,
"File %s does not exist anymore"
% (self.current_backup_task["name"]),
)
return bRCs["bRC_Error"]
return bRC_Error

else:
raise Exception(value='Wrong argument for current_backup_task["type"]')

return bRCs["bRC_OK"]
return bRC_OK

def create_file(self, context, restorepkt):
def create_file(self, restorepkt):
debugmessage(
100, "create_file() entry point in Python called with %s\n" % (restorepkt)
)
FNAME = FilenameConverter.BackupToBucket(restorepkt.ofname)
dirname = os.path.dirname(FNAME)
if not os.path.exists(dirname):
jobmessage("M_INFO", "Directory %s does not exist, creating it\n" % dirname)
jobmessage(M_INFO, "Directory %s does not exist, creating it\n" % dirname)
os.makedirs(dirname)
if restorepkt.type == bFileType["FT_REG"]:
restorepkt.create_status = bCFs["CF_EXTRACT"]
return bRCs["bRC_OK"]
if restorepkt.type == FT_REG:
restorepkt.create_status = CF_EXTRACT
return bRC_OK

def plugin_io(self, context, IOP):
def plugin_io(self, IOP):
if self.current_backup_task is None:
return bRCs["bRC_Error"]
if IOP.func == bIOPS["IO_OPEN"]:
return bRC_Error
if IOP.func == IO_OPEN:
# Only used by the 'restore' path
if IOP.flags & (os.O_CREAT | os.O_WRONLY):
self.FILE = open(FilenameConverter.BackupToBucket(IOP.fname), "wb")
return bRCs["bRC_OK"]
return bRC_OK

elif IOP.func == bIOPS["IO_READ"]:
elif IOP.func == IO_READ:
IOP.buf = bytearray(IOP.count)
IOP.io_errno = 0
if self.FILE is None:
return bRCs["bRC_Error"]
return bRC_Error
try:
buf = self.FILE.read(IOP.count)
IOP.buf[:] = buf
IOP.status = len(buf)
return bRCs["bRC_OK"]
return bRC_OK
except IOError as e:
jobmessage(
"M_ERROR",
M_ERROR,
"Cannot read from %s/%s: %s"
% (
self.current_backup_task["bucket"],
@@ -399,20 +396,20 @@ def plugin_io(self, context, IOP):
)
IOP.status = 0
if self.options["treat_download_errors_as_warnings"]:
return bRCs["bRC_Skip"]
return bRC_Skip
else:
return bRCs["bRC_Error"]
return bRC_Error

elif IOP.func == bIOPS["IO_WRITE"]:
elif IOP.func == IO_WRITE:
try:
self.FILE.write(IOP.buf)
IOP.status = IOP.count
IOP.io_errno = 0
except IOError as msg:
IOP.io_errno = -1
jobmessage("M_ERROR", "Failed to write data: %s" % (msg,))
return bRCs["bRC_OK"]
elif IOP.func == bIOPS["IO_CLOSE"]:
jobmessage(M_ERROR, "Failed to write data: %s" % (msg,))
return bRC_OK
elif IOP.func == IO_CLOSE:
if self.FILE:
self.FILE.close()
if "type" in self.current_backup_task:
@@ -430,13 +427,13 @@
"Could not remove temporary file: %s"
% (self.current_backup_task["tmpfile"]),
)
return bRCs["bRC_OK"]
return bRC_OK

return bRCs["bRC_OK"]
return bRC_OK

def end_backup_file(self, context):
def end_backup_file(self):
if self.current_backup_task is not None:
self.number_of_objects_to_backup += 1
return bRCs["bRC_More"]
return bRC_More
else:
return bRCs["bRC_OK"]
return bRC_OK
17 changes: 9 additions & 8 deletions core/src/plugins/filed/python/libcloud/BareosLibcloudApi.py
@@ -1,3 +1,4 @@
from bareosfd import M_INFO, M_ERROR
from bareos_libcloud_api.bucket_explorer import BucketExplorer
from bareos_libcloud_api.bucket_explorer import TASK_TYPE
from bareos_libcloud_api.debug import debugmessage, jobmessage
@@ -43,7 +44,7 @@ def __init__(self, options, last_run, tmp_dir_path):

self.__create_tmp_dir()

jobmessage("M_INFO", "Initialize BucketExplorer")
jobmessage(M_INFO, "Initialize BucketExplorer")

self.bucket_explorer = BucketExplorer(
options,
@@ -53,7 +54,7 @@ def __init__(self, options, last_run, tmp_dir_path):
self.number_of_worker,
)

jobmessage("M_INFO", "Initialize %d Workers" % self.number_of_worker)
jobmessage(M_INFO, "Initialize %d Workers" % self.number_of_worker)

self.worker = [
Worker(
@@ -67,10 +68,10 @@ def __init__(self, options, last_run, tmp_dir_path):
for i in range(self.number_of_worker)
]

jobmessage("M_INFO", "Start BucketExplorer")
jobmessage(M_INFO, "Start BucketExplorer")
self.bucket_explorer.start()

jobmessage("M_INFO", "Start Workers")
jobmessage(M_INFO, "Start Workers")
for w in self.worker:
w.start()

@@ -85,9 +86,9 @@ def check_worker_messages(self):
try:
message = self.message_queue.get_nowait()
if message.type == MESSAGE_TYPE.INFO_MESSAGE:
jobmessage("M_INFO", message.message_string)
jobmessage(M_INFO, message.message_string)
elif message.type == MESSAGE_TYPE.ERROR_MESSAGE:
jobmessage("M_ERROR", message.message_string)
jobmessage(M_ERROR, message.message_string)
elif message.type == MESSAGE_TYPE.READY_MESSAGE:
if message.worker_id == 0:
self.count_bucket_explorer_ready += 1
@@ -100,7 +101,7 @@
else:
raise Exception(value="Unknown message type")
except Exception as e:
jobmessage("M_INFO", "check_worker_messages exception: %s" % e)
jobmessage(M_INFO, "check_worker_messages exception: %s" % e)
return SUCCESS

def get_next_task(self):
@@ -149,7 +150,7 @@ def shutdown(self):
except:
pass

jobmessage("M_INFO", "Finished shutdown of worker processes")
jobmessage(M_INFO, "Finished shutdown of worker processes")

def __create_tmp_dir(self):
debugmessage(100, "Try to create temporary directory: %s" % (self.tmp_dir_path))
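
The second file follows the same pattern: BareosLibcloudApi.py now imports M_INFO and M_ERROR from bareosfd once at the top of the module and passes them to jobmessage as constants rather than strings. A short hedged sketch of that usage follows; the jobmessage helper and the message-class constants come from the diff above, while the report() wrapper is a hypothetical name used only for illustration.

# Sketch of the new message-class usage; report() is hypothetical.
from bareosfd import M_INFO, M_ERROR
from bareos_libcloud_api.debug import jobmessage

def report(is_error, text):
    # Forward worker output to the Bareos job log using the
    # module-level message-class constants instead of string names.
    jobmessage(M_ERROR if is_error else M_INFO, text)
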
