Skip to content

Commit

Permalink
fd python libcloud plugin: replace `__` with `_` in private functions
Browse files Browse the repository at this point in the history
  • Loading branch information
franku committed Nov 3, 2020
1 parent 54276c4 commit e91f6a4
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 44 deletions.
20 changes: 10 additions & 10 deletions core/src/plugins/filed/python/libcloud/BareosFdPluginLibcloud.py
Expand Up @@ -97,7 +97,7 @@ def __init__(self, plugindef):
super(BareosFdPluginLibcloud, self).parse_plugin_definition(plugindef)
self.options["fail_on_download_error"] = False
self.options["job_message_after_each_number_of_objects"] = 100
self.__parse_options()
self._parse_options()

self.last_run = datetime.datetime.fromtimestamp(self.since)
self.last_run = self.last_run.replace(tzinfo=None)
Expand All @@ -111,7 +111,7 @@ def __init__(self, plugindef):
self.active = True
self.api = None

def __parse_options(self):
def _parse_options(self):
accurate = bVarAccurate
accurate = bareosfd.GetValue(accurate)
if accurate is None or accurate == 0:
Expand All @@ -123,13 +123,13 @@ def parse_plugin_definition(self, plugindef):
debugmessage(100, "parse_plugin_definition()")
config_filename = self.options.get("config_file")
if config_filename:
if self.__parse_config_file(config_filename):
if self._parse_config_file(config_filename):
return bRC_OK
debugmessage(100, "Could not load configfile %s" % (config_filename))
jobmessage(M_FATAL, "Could not load configfile %s" % (config_filename))
return bRC_Error

def __parse_config_file(self, config_filename):
def _parse_config_file(self, config_filename):
"""
Parse the config file given in the config_file plugin option
"""
Expand All @@ -150,9 +150,9 @@ def __parse_config_file(self, config_filename):
)
return False

return self.__check_config(config_filename)
return self._check_config(config_filename)

def __check_config(self, config_filename):
def _check_config(self, config_filename):
"""
Check the configuration and set or override options if necessary,
considering mandatory: username and password in the [credentials] section
Expand Down Expand Up @@ -276,15 +276,15 @@ def start_backup_job(self):

def end_backup_job(self):
if self.active:
self.__shutdown()
self._shutdown()
return bRC_OK

def check_file(self, fname):
# All existing files/objects are passed to bareos
# If bareos has not seen one, it does not exist anymore
return bRC_Error

def __shutdown(self):
def _shutdown(self):
self.active = False
jobmessage(
M_INFO,
Expand Down Expand Up @@ -320,7 +320,7 @@ def start_backup_file(self, savepkt):
sleep(0.01)

if not self.active:
self.__shutdown()
self._shutdown()
savepkt.fname = "" # dummy value
if error:
jobmessage(M_FATAL, "Shutdown after worker error")
Expand All @@ -347,7 +347,7 @@ def start_backup_file(self, savepkt):
self.FILE = io.open(self.current_backup_task["tmpfile"], "rb")
except Exception as e:
jobmessage(M_FATAL, "Could not open temporary file for reading.")
self.__shutdown()
self._shutdown()
return bRC_Error
elif self.current_backup_task["type"] == TASK_TYPE.STREAM:
try:
Expand Down
10 changes: 5 additions & 5 deletions core/src/plugins/filed/python/libcloud/BareosLibcloudApi.py
Expand Up @@ -43,7 +43,7 @@ def __init__(self, options, last_run, tmp_dir_path):
self.downloaded_objects_queue = Queue(queue_size)
self.message_queue = Queue()

self.__create_tmp_dir()
self._create_tmp_dir()

jobmessage(M_INFO, "Initialize BucketExplorer")

Expand Down Expand Up @@ -151,18 +151,18 @@ def shutdown(self):
self.bucket_explorer.terminate()

try:
self.__remove_tmp_dir()
self._remove_tmp_dir()
except:
pass

jobmessage(M_INFO, "Finished shutdown of worker processes")

def __create_tmp_dir(self):
def _create_tmp_dir(self):
debugmessage(100, "Try to create temporary directory: %s" % (self.tmp_dir_path))
os.makedirs(self.tmp_dir_path)
debugmessage(100, "Created temporary directory: %s" % (self.tmp_dir_path))

def __remove_tmp_dir(self):
def _remove_tmp_dir(self):
debugmessage(100, "Try to remove leftover files from: %s" % (self.tmp_dir_path))
try:
files = glob.glob(self.tmp_dir_path + "/*")
Expand All @@ -186,6 +186,6 @@ def __remove_tmp_dir(self):

def __del__(self):
try:
self.__remove_tmp_dir()
self._remove_tmp_dir()
except:
pass
Expand Up @@ -74,7 +74,7 @@ def run_process(self):

while not self.shutdown_event.is_set():
try:
self.__iterate_over_buckets()
self._iterate_over_buckets()
self.shutdown_event.set()
except Exception as e:
self.error_message("Error while iterating buckets (%s)" % str(e))
Expand All @@ -84,7 +84,7 @@ def run_process(self):
for _ in range(self.number_of_workers):
self.discovered_objects_queue.put(None)

def __iterate_over_buckets(self):
def _iterate_over_buckets(self):
for bucket in self.driver.iterate_containers():
if self.shutdown_event.is_set():
break
Expand All @@ -99,11 +99,11 @@ def __iterate_over_buckets(self):

self.info_message('Exploring bucket "%s"' % (bucket.name,))

self.__generate_tasks_for_bucket_objects(
self._generate_tasks_for_bucket_objects(
self.driver.iterate_container_objects(bucket)
)

def __generate_tasks_for_bucket_objects(self, object_iterator):
def _generate_tasks_for_bucket_objects(self, object_iterator):
for obj in object_iterator:
if self.shutdown_event.is_set():
break
Expand Down
Expand Up @@ -68,7 +68,7 @@ def info_message(self, message):
self.message_queue.put(InfoMessage(self.worker_id, message))

def error_message(self, message, exception=None):
s = self.__format_exception_string(exception)
s = self._format_exception_string(exception)
self.message_queue.put(ErrorMessage(self.worker_id, message + s))

def debug_message(self, level, message):
Expand All @@ -90,5 +90,5 @@ def queue_try_put(self, queue, obj):
continue

@staticmethod
def __format_exception_string(exception):
def _format_exception_string(exception):
return (" (%s)" % str(exception)) if exception != None else ""
Expand Up @@ -70,23 +70,23 @@ def run_process(self):

status = CONTINUE
while status == CONTINUE and not self.shutdown_event.is_set():
status = self.__iterate_input_queue()
status = self._iterate_input_queue()
if status == FINISH_ON_ERROR:
self.abort_message()

def __iterate_input_queue(self):
def _iterate_input_queue(self):
while not self.input_queue.empty():
if self.shutdown_event.is_set():
return FINISH_ON_REQUEST
task = self.input_queue.get()
if task == None: # poison pill
return FINISH_ON_REQUEST
if self.__run_download_task(task) == FINISH_ON_ERROR:
if self._run_download_task(task) == FINISH_ON_ERROR:
return FINISH_ON_ERROR
# try again
return CONTINUE

def __run_download_task(self, task):
def _run_download_task(self, task):
try:
obj = self.driver.get_object(task["bucket"], task["name"])

Expand All @@ -96,34 +96,34 @@ def __run_download_task(self, task):
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except requests.exceptions.ConnectionError as e:
self.error_message(
"Connection error while getting file object, %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except Exception as e:
self.error_message(
"Exception while getting file object, %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()

action = CONTINUE

if task["size"] < 1024 * 10:
action = self.__download_object_into_queue(task, obj)
action = self._download_object_into_queue(task, obj)
elif task["size"] < self.options["prefetch_size"]:
action = self.__download_object_into_tempfile(task, obj)
action = self._download_object_into_tempfile(task, obj)
else: # files larger than self.options["prefetch_size"]
action = self.__put_stream_object_into_queue(task, obj)
action = self._put_stream_object_into_queue(task, obj)

return action

def __download_object_into_queue(self, task, obj):
def _download_object_into_queue(self, task, obj):
try:
self.debug_message(
110, "Put complete file %s into queue" % (task_object_full_name(task)),
Expand All @@ -142,7 +142,7 @@ def __download_object_into_queue(self, task, obj):
task["size"],
),
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()

task["data"] = io.BytesIO(content)
task["type"] = TASK_TYPE.DOWNLOADED
Expand All @@ -154,9 +154,9 @@ def __download_object_into_queue(self, task, obj):
self.error_message(
"Could not download file %s" % (task_object_full_name(task)), e
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()

def __download_object_into_tempfile(self, task, obj):
def _download_object_into_tempfile(self, task, obj):
try:
self.debug_message(
110, "Prefetch object to temp file %s" % (task_object_full_name(task)),
Expand All @@ -174,37 +174,37 @@ def __download_object_into_tempfile(self, task, obj):
"Error downloading object, skipping: %s"
% (task_object_full_name(task))
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except OSError as e:
silentremove(tmpfilename)
self.error_message(
"Could not download to temporary file %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except ObjectDoesNotExistError as e:
silentremove(tmpfilename)
self.error_message("Could not open object, skipping: %s" % e.object_name)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except LibcloudError as e:
silentremove(tmpfilename)
self.error_message(
"Error downloading object, skipping: %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except Exception as e:
silentremove(tmpfilename)
self.error_message(
"Error using temporary file, skipping: %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()

def __put_stream_object_into_queue(self, task, obj):
def _put_stream_object_into_queue(self, task, obj):
try:
self.debug_message(
110,
Expand All @@ -221,16 +221,16 @@ def __put_stream_object_into_queue(self, task, obj):
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()
except Exception as e:
self.error_message(
"Error preparing stream object, skipping: %s"
% (task_object_full_name(task)),
e,
)
return self.__fail_job_or_continue_running()
return self._fail_job_or_continue_running()

def __fail_job_or_continue_running(self):
def _fail_job_or_continue_running(self):
if self.fail_on_download_error:
return FINISH_ON_ERROR
return CONTINUE

0 comments on commit e91f6a4

Please sign in to comment.