Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ psycopg2-binary = "^2.9.5"
sqlalchemy-utils = "^0.38.3"
mimesis = "^6.1.1"
typer = "^0.7.0"
pyyaml = "^6.0"


[tool.poetry.group.dev.dependencies]
black = "^22.10.0"
isort = "^5.10.1"
pylint = "^2.15.8"
mypy = "^0.991"
types-pyyaml = "^6.0.12.4"

[build-system]
requires = ["poetry-core"]
Expand Down
17 changes: 15 additions & 2 deletions sqlsynthgen/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from subprocess import CalledProcessError, run
from sys import stderr
from types import ModuleType
from typing import Any, Optional

import typer
import yaml

from sqlsynthgen.create import create_db_data, create_db_tables
from sqlsynthgen.make import make_generators_from_tables
Expand All @@ -22,6 +24,13 @@ def import_file(file_path: str) -> ModuleType:
return import_module(module_path)


def read_yaml_file(path: str) -> Any:
    """Read a YAML file into a Python object, given a path.

    Args:
      path: Location of the YAML file to read.

    Returns:
      The parsed document, typically a dictionary. An empty document, which
      the YAML loader reports as None, is returned as an empty dictionary.
    """
    with open(path, "r", encoding="utf8") as yaml_file:
        config = yaml.safe_load(yaml_file)
    # The caller in this module treats the result as a mapping and calls
    # `.get` on it, so an empty file must not surface as None.
    return {} if config is None else config


@app.command()
def create_data(
orm_file: str = typer.Argument(...),
Expand All @@ -44,10 +53,14 @@ def create_tables(orm_file: str = typer.Argument(...)) -> None:


@app.command()
def make_generators(
    orm_file: str = typer.Argument(...),
    config_file: Optional[str] = typer.Argument(None),
) -> None:
    """Make a SQLSynthGen file of generator classes."""
    # NOTE: the docstring above is kept to one line on purpose; Typer shows it
    # as the command's help text.
    #
    # orm_file is handed to import_file to obtain the ORM module. config_file
    # is an optional YAML file of generator settings; without it an empty
    # configuration is used.
    orm_module = import_file(orm_file)
    generator_config = read_yaml_file(config_file) if config_file is not None else {}
    if generator_config is None:
        # An empty YAML document loads as None, but the generator code calls
        # `.get` on the configuration, so fall back to an empty mapping.
        generator_config = {}
    result = make_generators_from_tables(orm_module, generator_config)
    print(result)


Expand Down
175 changes: 115 additions & 60 deletions sqlsynthgen/make.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,129 @@
"""Functions to make a module of generator classes."""
import inspect
from types import ModuleType
from typing import Final
from typing import Any, Final

from mimesis.providers.base import BaseProvider
from sqlalchemy.sql import sqltypes

HEADER_TEXT: Final[str] = "\n".join(
from sqlsynthgen import providers

HEADER_TEXT: str = "\n".join(
(
'"""This file was auto-generated by sqlsynthgen but can be edited manually."""',
"from mimesis import Generic",
"from mimesis.locales import Locale",
"from sqlsynthgen.providers import BytesProvider, ColumnValueProvider",
"",
"generic = Generic(locale=Locale.EN)",
"generic.add_provider(ColumnValueProvider)",
"generic.add_provider(BytesProvider)",
"",
)
)
for entry_name, entry in inspect.getmembers(providers, inspect.isclass):
if issubclass(entry, BaseProvider) and entry.__module__ == "sqlsynthgen.providers":
HEADER_TEXT += f"\nfrom sqlsynthgen.providers import {entry_name}"
HEADER_TEXT += f"\ngeneric.add_provider({entry_name})"
HEADER_TEXT += "\n"

INDENTATION: Final[str] = " " * 4

# Maps a SQLAlchemy column type to the source text of the `generic` provider
# call that produces a value for it. _add_default_generator looks a column up
# here by the exact class of `column.type` and writes the text verbatim into
# the generated file, so a type that is not listed raises KeyError there.
# BigInteger and Integer share one entry value, as do Float and Numeric.
# NOTE(review): String and Text both map to colour names, presumably as a
# placeholder for "some short text" -- confirm this is intended.
SQL_TO_MIMESIS_MAP = {
sqltypes.BigInteger: "generic.numeric.integer_number()",
sqltypes.Boolean: "generic.development.boolean()",
sqltypes.Date: "generic.datetime.date()",
sqltypes.DateTime: "generic.datetime.datetime()",
sqltypes.Float: "generic.numeric.float_number()",
sqltypes.Integer: "generic.numeric.integer_number()",
sqltypes.LargeBinary: "generic.bytes_provider.bytes()",
sqltypes.Numeric: "generic.numeric.float_number()",
sqltypes.String: "generic.text.color()",
sqltypes.Text: "generic.text.color()",
}


def _add_custom_generators(content: str, table_config: dict) -> tuple[str, list[str]]:
    """Append the user-configured generator calls for one table to `content`.

    Each entry under the table's "custom_generators" key becomes one
    assignment line in the generated `__init__` body, of the form
    `self.a, self.b = name(key=value, ...)`.

    Args:
      content: The generator file text built so far.
      table_config: The configuration for one table. Its optional
        "custom_generators" entry is a list of mappings with the keys "name",
        "columns_assigned" (a column name or a list of them) and, optionally,
        "args" (a mapping of keyword arguments).

    Returns:
      A tuple of the extended file text and the names of the columns that the
      custom generators assign to.

    Raises:
      KeyError: If an entry lacks "name" or "columns_assigned".
    """
    # A missing key and an explicit null both mean "no custom generators".
    generators_config = table_config.get("custom_generators") or []
    columns_covered: list[str] = []
    for gen_conf in generators_config:
        name = gen_conf["name"]
        columns_assigned = gen_conf["columns_assigned"]
        # "args" may be left out entirely or given as null; both mean that the
        # generator is called without arguments.
        args = gen_conf.get("args")
        if isinstance(columns_assigned, str):
            # A single column may be given as a bare string.
            columns_assigned = [columns_assigned]

        targets = ", ".join(f"self.{column}" for column in columns_assigned)
        columns_covered.extend(columns_assigned)
        if args is None:
            arguments = ""
        else:
            # Values are written out as-is, so they end up as source code in
            # the generated file.
            arguments = ", ".join(f"{key}={value}" for key, value in args.items())
        content += f"{INDENTATION * 2}{targets} = {name}({arguments})\n"
    return content, columns_covered


def _add_default_generator(content: str, column: Any) -> str:
    """Append to the generator file `content` a default generator for `column`.

    The line that is added depends on the kind of column: a primary key gets
    `pass`, a foreign key gets a lookup of existing values in the column it
    references, and anything else gets the SQL_TO_MIMESIS_MAP entry for the
    exact type of the column.

    Args:
      content: The generator file text built so far.
      column: The table column to write a generator line for. It is read
        through `primary_key`, `foreign_keys`, `name` and `type`.

    Returns:
      The file text with one more line, indented two levels, at the end.

    Raises:
      NotImplementedError: If the column has more than one foreign key.
      ValueError: If the foreign key target is not of the three-part form
        "schema.table.column".
      KeyError: If the column type has no entry in SQL_TO_MIMESIS_MAP.
    """
    content += INDENTATION * 2
    # If it's a primary key column, we presume that primary keys are populated
    # automatically.
    if column.primary_key:
        content += "pass"
    # If it's a foreign key column, pull random values from the column it
    # references.
    elif column.foreign_keys:
        if len(column.foreign_keys) > 1:
            raise NotImplementedError(
                "Can't handle multiple foreign keys for one column."
            )
        # Read the single foreign key without popping it, so that the column's
        # foreign keys are left intact for anything that uses them afterwards.
        fkey = next(iter(column.foreign_keys))
        fk_schema, fk_table, fk_column = fkey.target_fullname.split(".")
        content += (
            f"self.{column.name} = "
            f"generic.column_value_provider.column_value(dst_db_conn, "
            f'"{fk_schema}", "{fk_table}", "{fk_column}"'
            ")"
        )
    # Otherwise generate values based on just the datatype of the column.
    else:
        provider = SQL_TO_MIMESIS_MAP[type(column.type)]
        content += f"self.{column.name} = {provider}"
    content += "\n"
    return content


def _add_generator_for_table(
    content: str, table_config: dict, table: Any
) -> tuple[str, str]:
    """Append a generator class for `table` to the generator file `content`.

    The class is named after the table with a "Generator" suffix. Its
    `__init__` body holds the custom generators from `table_config` first,
    followed by a default generator for every column they do not assign.

    Args:
      content: The generator file text built so far.
      table_config: The configuration for this table.
      table: The table to write a generator class for; its `name` and
        `columns` are read.

    Returns:
      A tuple of the extended file text and the name of the new class.

    Raises:
      NotImplementedError: If the table is configured as a vocabulary table.
    """
    class_name = table.name + "Generator"
    if table_config.get("vocabulary_table", False):
        raise NotImplementedError("Vocabulary tables currently unimplemented.")

    class_header = (
        "\n\n"
        f"class {class_name}:\n"
        f"{INDENTATION}def __init__(self, src_db_conn, dst_db_conn):\n"
    )
    content, configured_columns = _add_custom_generators(
        content + class_header, table_config
    )
    # Columns that the user config already assigns need no default generator.
    unconfigured = (col for col in table.columns if col.name not in configured_columns)
    for col in unconfigured:
        content = _add_default_generator(content, col)
    return content, class_name


def make_generators_from_tables(
tables_module: ModuleType, generator_config: dict
) -> str:
"""Creates sqlsynthgen generator classes from a sqlacodegen-generated file.

Args:
Expand All @@ -30,65 +132,18 @@ def make_generators_from_tables(tables_module: ModuleType) -> str:
Returns:
A string that is a valid Python module, once written to file.
"""

new_content = HEADER_TEXT
generator_module_name = generator_config.get("custom_generators_module", None)
if generator_module_name is not None:
new_content += f"\nfrom . import {generator_module_name}"

sorted_generators = "[\n"

sql_to_mimesis_map = {
sqltypes.BigInteger: "generic.numeric.integer_number()",
sqltypes.Boolean: "generic.development.boolean()",
sqltypes.Date: "generic.datetime.date()",
sqltypes.DateTime: "generic.datetime.datetime()",
sqltypes.Float: "generic.numeric.float_number()",
sqltypes.Integer: "generic.numeric.integer_number()",
sqltypes.LargeBinary: "generic.bytes_provider.bytes()",
sqltypes.Numeric: "generic.numeric.float_number()",
sqltypes.String: "generic.text.color()",
sqltypes.Text: "generic.text.color()",
}

for table in tables_module.Base.metadata.sorted_tables:
new_class_name = table.name + "Generator"
sorted_generators += INDENTATION + new_class_name + ",\n"
new_content += (
"\n\nclass "
+ new_class_name
+ ":\n"
+ INDENTATION
+ "def __init__(self, src_db_conn, dst_db_conn):\n"
table_config = generator_config.get("tables", {}).get(table.name, {})
new_content, new_generator_name = _add_generator_for_table(
new_content, table_config, table
)

for column in table.columns:
# We presume that primary keys are populated automatically
if column.primary_key:
new_content += f"{INDENTATION*2}pass\n"

elif column.foreign_keys:
if len(column.foreign_keys) > 1:
raise NotImplementedError("Can't handle multiple foreign keys.")
fkey = column.foreign_keys.pop()
fk_column_path = fkey.target_fullname
fk_schema, fk_table, fk_column = fk_column_path.split(".")
new_content += (
f"{INDENTATION*2}self.{column.name} = "
f"generic.column_value_provider.column_value(dst_db_conn, "
f'"{fk_schema}", "{fk_table}", "{fk_column}"'
")\n"
)

else:
new_content += (
INDENTATION * 2
+ "self."
+ column.name
+ " = "
+ sql_to_mimesis_map[type(column.type)]
+ "\n"
)

sorted_generators += f"{INDENTATION}{new_generator_name},\n"
sorted_generators += "]"

new_content += "\n\n" + "sorted_generators = " + sorted_generators + "\n"

return new_content
Loading