Skip to content
This repository has been archived by the owner on May 9, 2023. It is now read-only.

GeneralClass refactoring and usage changes #406

Merged
merged 3 commits into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 89 additions & 110 deletions general_conf/generalops.py
Original file line number Diff line number Diff line change
@@ -1,128 +1,107 @@
import configparser
from os.path import isfile
import humanfriendly
import humanfriendly # type: ignore
import logging
from general_conf import path_config
from general_conf import path_config # type: ignore
from typing import Dict
logger = logging.getLogger(__name__)


class GeneralClass:

def __init__(self, config=path_config.config_path_file):

def __init__(self, config: str = path_config.config_path_file) -> None:
if isfile(config):
con = configparser.ConfigParser()
con.read(config)

DB = con['MySQL']
self.mysql = DB['mysql']
self.mycnf = DB['mycnf']
self.mysqladmin = DB['mysqladmin']
self.mysql_user = DB['mysql_user']
self.mysql_password = DB['mysql_password']
if 'mysql_socket' in DB:
self.mysql_socket = DB['mysql_socket']
if 'mysql_host' in DB:
self.mysql_host = DB['mysql_host']
if 'mysql_port' in DB:
self.mysql_port = DB['mysql_port']
self.data_dir = DB['datadir']
self.con = configparser.ConfigParser()
self.con.read(config)
else:
logger.critical("Missing config file : {}".format(path_config.config_path_file))

LOG = con['Logging']
if 'log' in LOG:
self.log_level = LOG['log']
if 'log_file_max_bytes' in LOG:
self.log_file_max_bytes = LOG['log_file_max_bytes']
if 'log_file_backup_count' in LOG:
self.log_file_backup_count = LOG['log_file_backup_count']
def mysql_options(self) -> Dict[str, str]:
section = 'MySQL'
return {'mysql': self.con.get(section, 'mysql'),
'mycnf': self.con.get(section, 'mycnf'),
'mysqladmin': self.con.get(section, 'mysqladmin'),
'mysql_user': self.con.get(section, 'mysql_user'),
'mysql_password': self.con.get(section, 'mysql_password'),
'mysql_socket': self.con.get(section, 'mysql_socket'),
'mysql_host': self.con.get(section, 'mysql_host'),
'mysql_port': self.con.get(section, 'mysql_port'),
'data_dir': self.con.get(section, 'data_dir')}

BCK = con['Backup']
self.pid_dir = BCK['pid_dir'] if 'pid_dir' in BCK else '/tmp/'
self.tmpdir = BCK['tmp_dir']
if 'pid_runtime_warning' in BCK:
self.pid_runtime_warning = humanfriendly.parse_timespan(
BCK['pid_runtime_warning'])
self.backupdir = BCK['backup_dir']
self.full_dir = self.backupdir + '/full'
self.inc_dir = self.backupdir + '/inc'
self.backup_tool = BCK['backup_tool']
if 'prepare_tool' in BCK:
self.prepare_tool = BCK['prepare_tool']
if 'xtra_backup' in BCK:
self.xtra_backup = BCK['xtra_backup']
if 'xtra_prepare_options' in BCK:
self.xtra_prepare_options = BCK['xtra_prepare_options']
if 'xtra_options' in BCK:
self.xtra_options = BCK['xtra_options']
if 'full_backup_interval' in BCK:
self.full_backup_interval = humanfriendly.parse_timespan(
BCK['full_backup_interval'])
else:
self.full_backup_interval = 86400
if 'archive_dir' in BCK:
self.archive_dir = BCK['archive_dir']
if 'prepare_archive' in BCK:
self.prepare_archive = BCK['prepare_archive']
if 'move_archive' in BCK:
self.move_archive = BCK['move_archive']
# backward compatible with old config 'max_archive_size' and newer 'archive_max_size'
if 'max_archive_size' in BCK:
self.archive_max_size = humanfriendly.parse_size(BCK['max_archive_size'])
elif 'archive_max_size' in BCK:
self.archive_max_size = humanfriendly.parse_size(BCK['archive_max_size'])
# backward compatible with old config 'max_archive_duration' and newer 'archive_max_duration'
if 'max_archive_duration' in BCK:
self.archive_max_duration = humanfriendly.parse_timespan(BCK['max_archive_duration'])
elif 'archive_max_duration' in BCK:
self.archive_max_duration = humanfriendly.parse_timespan(BCK['archive_max_duration'])
if 'partial_list' in BCK:
self.partial_list = BCK['partial_list']
def logging_options(self) -> Dict[str, str]:
section = 'Logging'
return {'log_level': self.con.get(section, 'log_level'),
'log_file_max_bytes': self.con.get(section, 'log_file_max_bytes'),
'log_file_backup_count': self.con.get(section, 'log_file_backup_count')}

def compression_options(self) -> Dict[str, str]:
section = 'Compress'
return {'compress': self.con.get(section, 'compress'),
'compress_chunk_size': self.con.get(section, 'compress_chunk_size'),
'compress_threads': self.con.get(section, 'compress_threads'),
'decompress': self.con.get(section, 'decompress'),
'remove_original': self.con.get(section, 'remove_original')}

COM = con['Compress']
if 'compress' in COM:
self.compress = COM['compress']
if 'compress_chunk_size' in COM:
self.compress_chunk_size = COM['compress_chunk_size']
if 'compress_threads' in COM:
self.compress_threads = COM['compress_threads']
if 'decompress' in COM:
self.decompress = COM['decompress']
if 'remove_original' in COM:
self.remove_original_comp = COM['remove_original']
def xbstream_options(self) -> Dict[str, str]:
section = 'Xbstream'
return {'xbstream': self.con.get(section, 'xbstream'),
'stream': self.con.get(section, 'stream'),
'xbstream_options': self.con.get(section, 'xbstream_options'),
'xbs_decrypt': self.con.get(section, 'xbs_decrypt')}

ENC = con['Encrypt']
if 'xbcrypt' in ENC:
self.xbcrypt = ENC['xbcrypt']
if 'encrypt' in ENC:
self.encrypt = ENC['encrypt']
if 'encrypt_key' in ENC:
self.encrypt_key = ENC['encrypt_key']
if 'encrypt_key_file' in ENC:
self.encrypt_key_file = ENC['encrypt_key_file']
if 'encrypt_threads' in ENC:
self.encrypt_threads = ENC['encrypt_threads']
if 'encrypt_chunk_size' in ENC:
self.encrypt_chunk_size = ENC['encrypt_chunk_size']
if 'decrypt' in ENC:
self.decrypt = ENC['decrypt']
if 'remove_original' in ENC:
self.remove_original_enc = ENC['remove_original']
def command_options(self) -> Dict[str, str]:
section = 'Commands'
return {'start_mysql_command': self.con.get(section, 'start_mysql_command'),
'stop_mysql_command': self.con.get(section, 'stop_mysql_command'),
'chown_command': self.con.get(section, 'chown_command')}

XBS = con['Xbstream']
if 'xbstream' in XBS:
self.xbstream = XBS['xbstream']
if 'stream' in XBS:
self.stream = XBS['stream']
if 'xbstream_options' in XBS:
self.xbstream_options = XBS['xbstream_options']
if 'xbs_decrypt' in XBS:
self.xbs_decrypt = XBS['xbs_decrypt']
def encryption_options(self) -> Dict[str, str]:
section = 'Encrypt'
return {'xbcrypt': self.con.get(section, 'xbcrypt'),
'encrypt': self.con.get(section, 'encrypt'),
'encrypt_key': self.con.get(section, 'enrypt_key'),
'encrypt_key_file': self.con.get(section, 'encrypt_key_file'),
'encrypt_threads': self.con.get(section, 'encrypt_threads'),
'encrypt_chunk_size': self.con.get(section, 'encrypt_chunk_size'),
'decrypt': self.con.get(section, 'decrypt'),
'remove_original': self.con.get(section, 'remove_original')}

CM = con['Commands']
self.start_mysql = CM['start_mysql_command']
self.stop_mysql = CM['stop_mysql_command']
self.chown_command = CM['chown_command']
def backup_archive_options(self) -> Dict[str, str]:
section = 'Backup'
# backward compatible with old config 'max_archive_size' and newer 'archive_max_size'
archive_max_size = self.con.get(section, 'max_archive_size')
if archive_max_size:
archive_max_size = humanfriendly.parse_size(archive_max_size)
else:
archive_max_size = humanfriendly.parse_size(self.con.get(section, 'archive_max_size'))

# backward compatible with old config 'max_archive_duration' and newer 'archive_max_duration'
archive_max_duration = self.con.get(section, 'max_archive_duration')
if archive_max_duration:
archive_max_duration = humanfriendly.parse_timespan(archive_max_duration)
else:
logger.critical("Missing config file : {}".format(path_config.config_path_file))
archive_max_duration = humanfriendly.parse_timespan(self.con.get(section, 'archive_max_size'))

return {'archive_dir': self.con.get(section, 'archive_dir'),
'prepare_archive': self.con.get(section, 'prepare_archive'),
'move_archive': self.con.get(section, 'move_archive'),
'archive_max_size': archive_max_size,
'archive_max_duration': archive_max_duration
}

def backup_options(self) -> Dict[str, str]:
section = 'Backup'
return {'pid_dir': self.con.get(section, 'pid_dir', fallback='/tmp/'),
'tmp_dir': self.con.get(section, 'tmp_dir'),
'pid_runtime_warning': humanfriendly.parse_timespan(self.con.get(section, 'pid_runtime_warning')),
'backup_dir': self.con.get(section, 'backup_dir'),
'full_dir': self.con.get(section, 'backup_dir') + '/full',
'inc_dir': self.con.get(section, 'backup_dir') + '/inc',
'backup_tool': self.con.get(section, 'backup_tool'),
'prepare_tool': self.con.get(section, 'prepare_tool'),
'xtra_backup': self.con.get(section, 'xtra_backup'),
'xtra_prepare_options': self.con.get(section, 'xtra_prepare_options'),
'xtra_options': self.con.get(section, 'xtra_options'),
'full_backup_interval': humanfriendly.parse_timespan(self.con.get(section, 'full_backup_interval',
fallback=86400)),
'partial_list': self.con.get(section, 'partial_list')}
8 changes: 4 additions & 4 deletions general_conf/path_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is simply place holder for default config file path, which is used in many places.
# If you decide to change the default config path then please edit this file and reinstall tool.
from os.path import expanduser, join
home = expanduser("~")
config_path = join(home, '.autoxtrabackup')
config_path_file = join(config_path, 'autoxtrabackup.cnf')
log_file_path = join(config_path, 'autoxtrabackup.log')
home: str = expanduser("~")
config_path: str = join(home, '.autoxtrabackup')
config_path_file: str = join(config_path, 'autoxtrabackup.cnf')
log_file_path: str = join(config_path, 'autoxtrabackup.log')