/
copy_db_poc.py
150 lines (116 loc) · 4.26 KB
/
copy_db_poc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
import argparse
import copy
import os
import sys
import structlog
import traceback
from uuid import uuid4
import sqlalchemy
from sqlalchemy import create_engine, select, event
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy.engine import Engine
# Structured logger shared by every function in this module.
logger = structlog.get_logger(__name__)

# Connection URLs used by main() when the DB_IN / DB_OUT environment
# variables are unset. NOTE(review): the credentials look like placeholders.
DEFAULT_DB_IN = "postgresql+psycopg2://user:password@127.0.0.1:5432/dbin"
DEFAULT_DB_OUT = "mysql+mysqldb://user:password@127.0.0.1:3306/dbout"

# Prepended to a source table's name to build the destination table's name.
TABLE_PREFIX = "dbin_"

# Default value of copy_table()'s ``batch_size`` parameter.
DEFAULT_BATCH_SIZE = 1_000
def setup_fixtures(in_engine: Engine) -> None:
    """Install test fixtures in the source database.

    Drops the ``users`` table if it exists, recreates it, and inserts one row.

    :param in_engine: engine connected to the source database. The ``id``
        column uses the PostgreSQL ``UUID`` type, so this is presumably meant
        to run against PostgreSQL only.
    """
    # Explicit import: ``import sqlalchemy`` is not guaranteed to have loaded
    # the postgresql dialect module.
    from sqlalchemy.dialects import postgresql

    metadata_in = MetaData()
    users = Table(
        "users",
        metadata_in,
        Column(
            "id",
            postgresql.UUID(as_uuid=True),
            primary_key=True,
            default=uuid4,
        ),
        Column("num", Integer),
        Column("full_name", String),
    )
    metadata_in.drop_all(in_engine)
    metadata_in.create_all(in_engine)

    # Engine.begin() commits on success and always releases the connection.
    # A bare connect() leaks the connection if execute() raises. Under
    # SQLAlchemy 2.0 semantics it also rolls the insert back on close.
    with in_engine.begin() as conn:
        conn.execute(users.insert().values(num=2, full_name="Louis de Funès"))
def get_generic_type(type):
    """Return a backend-agnostic equivalent of a reflected column type.

    A PostgreSQL ``UUID`` becomes ``String(36)``. Any other type is converted
    with ``TypeEngine.as_generic()``. Bounded string types that end up without
    a length get 512, because a length is required for VARCHAR. Types with no
    generic equivalent are logged and returned unchanged.

    :param type: SQLAlchemy type instance produced by reflection. The name
        shadows the builtin but is kept for keyword-argument callers.
    :returns: a SQLAlchemy type instance.
    """
    # Explicit import: ``import sqlalchemy`` is not guaranteed to have loaded
    # the postgresql dialect module.
    from sqlalchemy.dialects import postgresql

    if isinstance(type, postgresql.UUID):
        # Room for the canonical 36-character text form of a UUID.
        return String(length=36)
    try:
        new = type.as_generic()
    except NotImplementedError:
        # No generic equivalent: keep the dialect-specific type as-is.
        logger.warning("no generic type", column_type=repr(type), exc_info=True)
        return type
    # Length is required for VARCHAR
    # (turns out we use the same length as Airbyte).
    # Text/UnicodeText subclass String but must stay unbounded: PostgreSQL,
    # for one, rejects TEXT(n).
    if (
        isinstance(new, String)
        and not isinstance(new, sqlalchemy.Text)
        and not new.length
    ):
        new.length = 512
    return new
def copy_table(
    table: Table,
    *,
    in_engine: Engine,
    out_engine: Engine,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
    """Copy one table's structure and rows from the source to the destination.

    The destination table is named ``TABLE_PREFIX + table.name``. It is
    dropped if it exists and recreated without the source table's
    ``constraints`` set or ``indexes``. Rows are read with ``stream_results``
    (honoured only by some dialects). They are written with one executemany
    INSERT per batch of *batch_size* rows, each batch in its own transaction.

    :param table: reflected source table.
    :param in_engine: engine for the source database.
    :param out_engine: engine for the destination database.
    :param batch_size: rows per INSERT batch; values below 1 behave like 1.
    """
    log = logger.bind(table_name=table.name)

    # Shallow copy: the assignments below rebind attributes on the copy only
    # (columns are still shared with ``table``).
    out_table = copy.copy(table)
    out_table.name = f"{TABLE_PREFIX}{table.name}"
    # Do not copy constraints
    out_table.constraints = set()
    # Do not copy indexes
    out_table.indexes = set()
    out_table.drop(out_engine, checkfirst=True)
    out_table.create(out_engine)
    log.info("created_table")

    insert_stmt = out_table.insert()

    with in_engine.connect() as conn_in, out_engine.connect() as conn_out:

        def flush(rows):
            # Explicit transaction so the batch is committed. Under
            # SQLAlchemy 2.0 semantics, a plain execute() is rolled back
            # when the connection closes.
            with conn_out.begin():
                conn_out.execute(insert_stmt, rows)

        # stream_results does not work for all db dialects
        result = conn_in.execution_options(stream_results=True).execute(
            select(table)
        )
        batch = []
        inserted = 0
        for row in result:
            # Row._mapping is a str-keyed mapping in SQLAlchemy 1.4 and 2.0;
            # ``**row`` only works with 1.x legacy rows.
            batch.append(dict(row._mapping))
            if len(batch) >= batch_size:
                flush(batch)
                inserted += len(batch)
                batch = []
                log.info("inserted", n=inserted)
        if batch:
            flush(batch)
            inserted += len(batch)
        log.info("inserted", n=inserted)
def copy_db(
    in_db_url: str,
    out_db_url: str,
    *,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
    """Copy every table of the source database to the destination.

    During reflection, each column's type is replaced by a generic type (via
    ``get_generic_type``) and its reflected default is discarded. Each table
    is then copied with ``copy_table``.

    :param in_db_url: SQLAlchemy URL of the source database.
    :param out_db_url: SQLAlchemy URL of the destination database.
    :param batch_size: forwarded to ``copy_table`` for every table.
    """
    in_engine = create_engine(in_db_url, connect_args={"connect_timeout": 10})
    out_engine = create_engine(out_db_url, connect_args={"connect_timeout": 10})
    try:
        metadata = MetaData()

        @event.listens_for(metadata, "column_reflect")
        def genericize_datatypes(inspector, table, column_dict):
            """Rewrite a reflected column so it can be recreated elsewhere."""
            previously = column_dict["type"]
            # No need for default value (such as nextval(...)).
            # pop() rather than del, so a dialect that omits the key does
            # not raise KeyError.
            column_dict.pop("default", None)
            column_dict["type"] = get_generic_type(previously)
            logger.info(
                "reflected type",
                table=table.name,
                column=column_dict["name"],
                previous=previously,
                new=column_dict["type"],
            )

        metadata.reflect(bind=in_engine)
        # sorted_tables is in dependency order; reversed() visits dependent
        # tables first.
        for t in reversed(metadata.sorted_tables):
            copy_table(
                t,
                in_engine=in_engine,
                out_engine=out_engine,
                batch_size=batch_size,
            )
    finally:
        # Release pooled connections even if reflection or a copy fails.
        in_engine.dispose()
        out_engine.dispose()
def main(should_install_fixtures: bool = False) -> int:
    """Copy the database using URLs taken from the environment.

    The source URL comes from ``DB_IN`` and the destination URL from
    ``DB_OUT``. When unset, they fall back to ``DEFAULT_DB_IN`` and
    ``DEFAULT_DB_OUT``.

    :param should_install_fixtures: when true, (re)create the test fixtures
        in the source database before copying.
    :returns: process exit status. It is always 0; failures propagate as
        exceptions.
    """
    from sqlalchemy.engine import make_url

    in_db_url = os.environ.get("DB_IN", DEFAULT_DB_IN)
    out_db_url = os.environ.get("DB_OUT", DEFAULT_DB_OUT)
    # Mask passwords so credentials never end up in the logs.
    in_shown = make_url(in_db_url).render_as_string(hide_password=True)
    out_shown = make_url(out_db_url).render_as_string(hide_password=True)
    logger.info(f"copy db from={in_shown} to={out_shown}")
    if should_install_fixtures:
        # setup_fixtures() takes an Engine, not a URL.
        fixtures_engine = create_engine(
            in_db_url, connect_args={"connect_timeout": 10}
        )
        try:
            setup_fixtures(fixtures_engine)
        finally:
            fixtures_engine.dispose()
    copy_db(in_db_url=in_db_url, out_db_url=out_db_url)
    return 0
if __name__ == "__main__":
    # Command-line entry point. ``--fixtures`` (re)installs test data in the
    # source database before copying. The exit status is main()'s return value.
    cli = argparse.ArgumentParser(description="copy db to destination")
    cli.add_argument(
        "--fixtures",
        action="store_true",
        help="install fixtures",
    )
    options = cli.parse_args()
    status = main(should_install_fixtures=options.fixtures)
    sys.exit(status)