In [1]:
import re
from collections import namedtuple

In [2]:
from google.cloud.bigquery import Client, Dataset, Table

In [3]:
from bigquery_erd.models import Column, Table, Relation
from bigquery_erd.main import intermediary_to_schema, intermediary_to_markdown

In [17]:
# ID of the dataset whose tables are listed and fetched below
# (the value looks like `<project>.<dataset>` -- confirm against the target project).
# Alternative dataset ID, kept commented out so it can be switched in by hand:
# _DATASET_ID = "test-project-jjagusch.test_dataset"
_DATASET_ID = "futbin-scraper-294615.futbin"

In [18]:
def get_tables(self, dataset):
    """Return a list with ``self.get_table(item)`` for every item in ``self.list_tables(dataset)``.

    Written as a method (takes ``self``) so it can be attached to a client class.
    One ``get_table`` call is made per listed table, in listing order.
    """
    fetched = []
    for listed in self.list_tables(dataset):
        fetched.append(self.get_table(listed))
    return fetched

# Monkey-patch: attach the helper to the imported Client class so that it can
# be called as `client.get_tables(...)` on any Client instance.
Client.get_tables = get_tables

In [19]:
# No project or credentials are passed explicitly; presumably they are resolved
# from the environment -- confirm before running on a fresh machine.
client = Client()



In [26]:
# Fetch every table of the configured dataset via the `get_tables` helper
# patched onto Client (one `get_table` call per listed table).
tables = client.get_tables(_DATASET_ID)

In [21]:
# Flat record describing one (possibly nested) schema field.
BQColumn = namedtuple("BQColumn", ("name", "field_type", "mode", "description"))

def walk_columns(fields, name_prefix=""):
    """Yield a ``BQColumn`` for every field in ``fields``, depth first.

    Each field is yielded before its nested fields. A nested field's name is
    the dotted path from the top-level field, e.g. ``parent.child``.

    Args:
        fields: iterable of objects exposing ``name``, ``field_type``,
            ``mode``, ``description`` and ``fields`` (the nested fields;
            a falsy value means "no children").
        name_prefix: dotted path of the enclosing field; empty at top level.

    Yields:
        BQColumn: one entry per field, parents before their children.
    """
    for field in fields:
        name = ".".join((name_prefix, field.name)) if name_prefix else field.name
        yield BQColumn(name, field.field_type, field.mode, field.description)
        if field.fields:
            # Delegate to the recursive generator instead of re-yielding by
            # hand, which previously rebound the outer loop variable.
            yield from walk_columns(field.fields, name)

In [28]:
_PATTERN = re.compile(r"^->\s([?*+1 ]:[?*+1 ]\s)?(.*\.)?(.*)\.(.*)$")
_DEFAULT_CARDINALITY = ("*", "1")

def _process_relation(column_description, right_dataset, right_table):
    if not column_description:
        return
    result = re.search(_PATTERN, column_description)
    if not result:
        return
    cardinality = result.group(1)
    cardinality = tuple(cardinality.strip().split(":")) if cardinality else _DEFAULT_CARDINALITY
    left_dataset = result.group(2)
    left_dataset = left_dataset.strip(".") if left_dataset else right_dataset
    left_table = result.group(3)
    return Relation(
        left_col=f"{left_dataset}.{left_table}",
        right_col=f"{right_dataset}.{right_table}",
        left_cardinality=cardinality[1],
        right_cardinality=cardinality[0],
    )

In [29]:
def _process_column_type(column):
    mode = column.mode
    if mode:
        return f"{mode}({column.field_type})"
    return column.field_type

def _process_column_is_key(column):
    """Report whether ``column`` is a key column.

    Currently a stub: the argument is ignored and ``False`` is always returned.
    """
    return False

def _process_table(table):
    """Convert a fetched table into a ``Table`` holding one ``Column`` per flattened field.

    The result is named ``<dataset_id>.<table_id>``. Its columns come from
    ``walk_columns(table.schema)``, so nested fields appear with dotted names.
    """
    # NOTE: `Table` here is bigquery_erd.models.Table -- that import comes after
    # the google.cloud.bigquery one and shadows it.
    columns = []
    for col in walk_columns(table.schema):
        columns.append(
            Column(col.name, _process_column_type(col), _process_column_is_key(col))
        )
    return Table(f"{table.dataset_id}.{table.table_id}", columns)

def _process_tables(tables):
    tables_ = [_process_table(table) for table in tables]
    relations = [_process_relation(col.description, table.dataset_id, table.table_id) for table in tables for col in walk_columns(table.schema)]
    relations = [relation for relation in relations if relation]
    return tables_, relations

In [35]:
# Pass the converted tables and relations of every table whose ID ends in
# "latest", plus the output name "test.png", to intermediary_to_schema.
# The selection must stay a list: _process_tables iterates it twice.
intermediary_to_schema(
    *_process_tables([t for t in tables if t.table_id.endswith("latest")]),
    "test.png"
)

In [36]:
# Pass the converted tables and relations of every table whose ID ends in
# "latest", plus the output name "test.md", to intermediary_to_markdown.
# The selection must stay a list: _process_tables iterates it twice.
intermediary_to_markdown(
    *_process_tables([t for t in tables if t.table_id.endswith("latest")]),
    "test.md"
)