Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 136 additions & 127 deletions data_diff/__main__.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,138 @@
from multiprocessing.sharedctypes import Value
import sys
import time
import logging
from itertools import islice

from .diff_tables import TableSegment, TableDiffer
from .database import connect_to_uri
from .parse_time import parse_time_before_now, UNITS_STR, ParseError

import click

LOG_FORMAT = "[%(asctime)s] %(levelname)s - %(message)s"
DATE_FORMAT = "%H:%M:%S"


@click.command()
@click.argument("db1_uri")
@click.argument("table1_name")
@click.argument("db2_uri")
@click.argument("table2_name")
@click.option("-k", "--key-column", default="id", help="Name of primary key column")
@click.option("-t", "--update-column", default=None, help="Name of updated_at/last_updated column")
@click.option("-c", "--columns", default=[], multiple=True, help="Names of extra columns to compare")
@click.option("-l", "--limit", default=None, help="Maximum number of differences to find")
@click.option("--bisection-factor", default=32, help="Segments per iteration")
@click.option("--bisection-threshold", default=1024**2, help="Minimal bisection threshold")
@click.option(
"--min-age",
default=None,
help="Considers only rows older than specified. "
"Example: --min-age=5min ignores rows from the last 5 minutes. "
f"\nValid units: {UNITS_STR}",
)
@click.option("--max-age", default=None, help="Considers only rows younger than specified. See --min-age.")
@click.option("-s", "--stats", is_flag=True, help="Print stats instead of a detailed diff")
@click.option("-d", "--debug", is_flag=True, help="Print debug info")
@click.option("-v", "--verbose", is_flag=True, help="Print extra info")
@click.option("-i", "--interactive", is_flag=True, help="Confirm queries, implies --debug")
@click.option("-j", "--threads", default=None, help="Number of threads to use. 1 means no threading. Auto if not specified.")
def main(
db1_uri,
table1_name,
db2_uri,
table2_name,
key_column,
update_column,
columns,
limit,
bisection_factor,
bisection_threshold,
min_age,
max_age,
stats,
debug,
verbose,
interactive,
threads,
):
if limit and stats:
print("Error: cannot specify a limit when using the -s/--stats switch")
return
if interactive:
debug = True

if debug:
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
elif verbose:
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)

if threads is not None:
threads = int(threads)
if threads < 1:
logging.error("Error: threads must be >= 1")
return

db1 = connect_to_uri(db1_uri, threads)
db2 = connect_to_uri(db2_uri, threads)

if interactive:
db1.enable_interactive()
db2.enable_interactive()

start = time.time()

try:
options = dict(
min_time=min_age and parse_time_before_now(min_age), max_time=max_age and parse_time_before_now(max_age)
)
except ParseError as e:
logging.error("Error while parsing age expression: %s" % e)
return

table1 = TableSegment(db1, (table1_name,), key_column, update_column, columns, **options)
table2 = TableSegment(db2, (table2_name,), key_column, update_column, columns, **options)

differ = TableDiffer(
bisection_factor=bisection_factor,
bisection_threshold=bisection_threshold,
debug=debug,
threaded=threads != 1,
max_threadpool_size=threads,
)
diff_iter = differ.diff_tables(table1, table2)

if limit:
diff_iter = islice(diff_iter, int(limit))

if stats:
diff = list(diff_iter)
from multiprocessing.sharedctypes import Value
import sys
import time
import logging
from itertools import islice

from .diff_tables import TableSegment, TableDiffer
from .database import connect_to_uri
from .parse_time import parse_time_before_now, UNITS_STR, ParseError

import rich
import click

LOG_FORMAT = "[%(asctime)s] %(levelname)s - %(message)s"
DATE_FORMAT = "%H:%M:%S"

COLOR_SCHEME = {
"+": "green",
"-": "red",
}


@click.command()
@click.argument("db1_uri")
@click.argument("table1_name")
@click.argument("db2_uri")
@click.argument("table2_name")
@click.option("-k", "--key-column", default="id", help="Name of primary key column")
@click.option("-t", "--update-column", default=None, help="Name of updated_at/last_updated column")
@click.option("-c", "--columns", default=[], multiple=True, help="Names of extra columns to compare")
@click.option("-l", "--limit", default=None, help="Maximum number of differences to find")
@click.option("--bisection-factor", default=32, help="Segments per iteration")
@click.option("--bisection-threshold", default=1024**2, help="Minimal bisection threshold")
@click.option(
"--min-age",
default=None,
help="Considers only rows older than specified. "
"Example: --min-age=5min ignores rows from the last 5 minutes. "
f"\nValid units: {UNITS_STR}",
)
@click.option("--max-age", default=None, help="Considers only rows younger than specified. See --min-age.")
@click.option("-s", "--stats", is_flag=True, help="Print stats instead of a detailed diff")
@click.option("-d", "--debug", is_flag=True, help="Print debug info")
@click.option("-v", "--verbose", is_flag=True, help="Print extra info")
@click.option("-i", "--interactive", is_flag=True, help="Confirm queries, implies --debug")
@click.option(
"-j", "--threads", default=None, help="Number of threads to use. 1 means no threading. Auto if not specified."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the default 1 thread? What does auto mean? I think it should be 1 so that the output is nice and expected in the default case. It's also far safer on a relational database where much more could really have an impact (especially with the current queries).

Sorry I know this is not strictly related here, but I must've missed this on the other PR. I thought I read for this when I reviewed the threading PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss it on slack.

)
def main(
db1_uri,
table1_name,
db2_uri,
table2_name,
key_column,
update_column,
columns,
limit,
bisection_factor,
bisection_threshold,
min_age,
max_age,
stats,
debug,
verbose,
interactive,
threads,
):
if limit and stats:
print("Error: cannot specify a limit when using the -s/--stats switch")
return
if interactive:
debug = True

if debug:
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
elif verbose:
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)

if threads is not None:
threads = int(threads)
if threads < 1:
logging.error("Error: threads must be >= 1")
return

db1 = connect_to_uri(db1_uri, threads)
db2 = connect_to_uri(db2_uri, threads)

if interactive:
db1.enable_interactive()
db2.enable_interactive()

start = time.time()

try:
options = dict(
min_time=min_age and parse_time_before_now(min_age), max_time=max_age and parse_time_before_now(max_age)
)
except ParseError as e:
logging.error("Error while parsing age expression: %s" % e)
return

table1 = TableSegment(db1, (table1_name,), key_column, update_column, columns, **options)
table2 = TableSegment(db2, (table2_name,), key_column, update_column, columns, **options)

differ = TableDiffer(
bisection_factor=bisection_factor,
bisection_threshold=bisection_threshold,
debug=debug,
threaded=threads != 1,
max_threadpool_size=threads,
)
diff_iter = differ.diff_tables(table1, table2)

if limit:
diff_iter = islice(diff_iter, int(limit))

if stats:
diff = list(diff_iter)
unique_diff_count = len({i[0] for _, i in diff})
percent = 100 * unique_diff_count / table1.count
print(f"Diff-Total: {len(diff)} changed rows out of {table1.count}")
print(f"Diff-Percent: {percent:.4f}%")
plus = len([1 for op, _ in diff if op == "+"])
minus = len([1 for op, _ in diff if op == "-"])
print(f"Diff-Split: +{plus} -{minus}")
else:
for op, key in diff_iter:
print(op, key)
sys.stdout.flush()

end = time.time()

logging.info(f"Duration: {end-start:.2f} seconds.")


if __name__ == "__main__":
main()
print(f"Diff-Total: {len(diff)} changed rows out of {table1.count}")
print(f"Diff-Percent: {percent:.4f}%")
plus = len([1 for op, _ in diff if op == "+"])
minus = len([1 for op, _ in diff if op == "-"])
print(f"Diff-Split: +{plus} -{minus}")
else:
for op, key in diff_iter:
color = COLOR_SCHEME[op]
rich.print(f"[{color}]{op} {key!r}[/{color}]")
sys.stdout.flush()

end = time.time()

logging.info(f"Duration: {end-start:.2f} seconds.")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ python = "^3.7"
runtype = "^0.2.4"
dsnparse = "*"
click = "^8.1"
rich = "^10.16.2"


preql = { version = "^0.2.13", optional = true }
psycopg2 = { version = "*", optional = true }
Expand Down