forked from anubia/py_pg_tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replicator.py
133 lines (110 loc) · 5.17 KB
/
replicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
from const.const import Messenger as Msg
from const.const import Queries
from date_tools.date_tools import DateTools
from logger.logger import Logger
class Replicator:
    '''
    Target:
        - clone an existing PostgreSQL database under a new name, using the
          connection held by a "connecter" object.
    '''

    # An object with connection parameters to connect to PostgreSQL
    connecter = None
    logger = None  # Logger to show and log some messages
    new_dbname = ''  # Name of the copy
    original_dbname = ''  # Name of the original database

    def __init__(self, connecter=None, new_dbname='', original_dbname='',
                 logger=None):
        '''
        Target:
            - store the connection, the logger and both database names,
              checking that the names are usable before anything is cloned.
        Parameters:
            - connecter: object exposing "cursor", "server", "user" and
              "port", plus the helper methods used by "replicate_pg_db".
            - new_dbname: name of the copy; must be non-empty and must not
              exist yet on the server.
            - original_dbname: name of the database to copy; must be
              non-empty and must exist on the server.
            - logger: logger to use; a default Logger is created if omitted.
        Every failed check is reported through "logger.stop_exe"
        (presumably terminating the program - confirm in the Logger class).
        '''
        if logger:
            self.logger = logger
        else:
            self.logger = Logger()

        if connecter:
            self.connecter = connecter
        else:
            self.logger.stop_exe(Msg.NO_CONNECTION_PARAMS)

        # Reject an empty name before asking the server about it: there is
        # no point in querying for a database called ''
        if not new_dbname:
            self.logger.stop_exe(Msg.NO_NEW_DBNAME)
        # Do not replicate if the name of the copy already exists
        self.connecter.cursor.execute(Queries.PG_DB_EXISTS, (new_dbname, ))
        if self.connecter.cursor.fetchone():
            msg = Msg.DB_ALREADY_EXISTS.format(dbname=new_dbname)
            self.logger.stop_exe(msg)
        self.new_dbname = new_dbname

        # Same order for the source: an empty name has to be reported as
        # "missing name", not as "database does not exist"
        if not original_dbname:
            self.logger.stop_exe(Msg.NO_ORIGINAL_DBNAME)
        # The source database must exist in PostgreSQL
        self.connecter.cursor.execute(Queries.PG_DB_EXISTS,
                                      (original_dbname, ))
        if not self.connecter.cursor.fetchone():
            msg = Msg.DB_DOES_NOT_EXIST.format(dbname=original_dbname)
            self.logger.stop_exe(msg)
        self.original_dbname = original_dbname

        msg = Msg.REPLICATOR_VARS.format(server=self.connecter.server,
                                         user=self.connecter.user,
                                         port=self.connecter.port,
                                         original_dbname=self.original_dbname,
                                         new_dbname=self.new_dbname)
        self.logger.debug(Msg.REPLICATOR_VARS_INTRO)
        self.logger.debug(msg)

    def replicate_pg_db(self):
        '''
        Target:
            - clone a specified database in PostgreSQL.
        Steps:
            - stop if the query for other backends on the source database
              returns a row.
            - if the source database allows connections, disallow them
              while the clone statement runs and allow them again
              afterwards, whether the clone succeeded or not.
            - log the duration of the process.
        Any exception is logged at debug level and reported through
        "logger.stop_exe" with the generic failure message.
        '''
        try:
            # NOTE(review): database names and the user are interpolated
            # into the SQL text with str.format; confirm they always come
            # from trusted configuration.
            pg_pid = self.connecter.get_pid_str()
            formatted_sql = Queries.BACKEND_PG_DB_EXISTS.format(
                pg_pid=pg_pid, target_db=self.original_dbname)
            self.connecter.cursor.execute(formatted_sql)
            if self.connecter.cursor.fetchone():
                msg = Msg.ACTIVE_CONNS_ERROR.format(
                    dbname=self.original_dbname)
                self.logger.stop_exe(msg)

            formatted_query_clone_pg_db = Queries.CLONE_PG_DB.format(
                dbname=self.new_dbname, original_dbname=self.original_dbname,
                user=self.connecter.user)

            msg = Msg.BEGINNING_REPLICATOR.format(
                original_dbname=self.original_dbname)
            self.logger.highlight('info', msg, 'white')

            # Get the database's "datallowconn" value
            datallowconn = self.connecter.get_datallowconn(
                self.original_dbname)

            # If datallowconn is allowed, change it temporarily
            if datallowconn:
                # Disallow connections to the database during the process
                result = self.connecter.disallow_db_conn(self.original_dbname)
                if not result:
                    msg = Msg.DISALLOW_CONN_TO_PG_DB_FAIL.format(
                        dbname=self.original_dbname)
                    self.logger.highlight('warning', msg, 'yellow')

            try:
                start_time = DateTools.get_current_datetime()
                # Replicate the database
                self.connecter.cursor.execute(formatted_query_clone_pg_db)
                end_time = DateTools.get_current_datetime()
                # Get the process' duration
                diff = DateTools.get_diff_datetimes(start_time, end_time)
            finally:
                # If datallowconn was allowed, leave it as it was. This runs
                # on failure too, so a failed clone does not leave the
                # source database closed to connections.
                if datallowconn:
                    result = self.connecter.allow_db_conn(
                        self.original_dbname)
                    if not result:
                        msg = Msg.ALLOW_CONN_TO_PG_DB_FAIL.format(
                            dbname=self.original_dbname)
                        self.logger.highlight('warning', msg, 'yellow')

            msg = Msg.REPLICATE_DB_DONE.format(
                new_dbname=self.new_dbname,
                original_dbname=self.original_dbname, diff=diff)
            self.logger.highlight('info', msg, 'green')
            self.logger.highlight('info', Msg.REPLICATOR_DONE, 'green',
                                  effect='bold')

        except Exception as e:
            self.logger.debug('Error en la función "clone_pg_db": '
                              '{}.'.format(str(e)))
            self.logger.stop_exe(Msg.REPLICATE_DB_FAIL)