In [1]:
import json

In [2]:
def load_catalog(file):
    """Load a JSON catalog from disk, echo it to stdout and return it.

    Args:
        file: path of the JSON file to read.

    Returns:
        The parsed JSON document (whatever ``json.load`` produces for the file).

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default text encoding.
    with open(file, encoding="utf-8") as f:
        catalog = json.load(f)
    print(f"From catalog {file}:")
    print("--------------------")
    # Pretty-print with sorted keys so the echoed catalog is stable and readable.
    print(json.dumps(catalog, sort_keys=True, indent=4))
    print("--------------------")
    return catalog

# Load the three sample catalogs used by the cells below.
# Each call also prints the parsed catalog (see load_catalog).
catalog = load_catalog("../sample_files/catalog.json")
github = load_catalog("../sample_files/catalog_github.json")
stripe = load_catalog("../sample_files/catalog_stripe.json")

From catalog ../sample_files/catalog.json:
--------------------
{
    "streams": [
        {
            "json_schema": {
                "$id": "http://example.com/example.json",
                "$schema": "http://json-schema.org/draft-07/schema",
                "additionalProperties": true,
                "default": {},
                "description": "The root schema comprises the entire JSON document.",
                "examples": [
                    {
                        "Author": "Mary Cadogan",
                        "Description": "Combine a few key Christmas flavours here to make a pie that both children and adults will adore",
                        "Ingredients": [
                            {
                                "ingredient": "olive oil",
                                "nutritionfacts": {
                                    "calories": 119,
                                    "fat": "13.5g"
                                },
                          

In [14]:
def is_string(property_type) -> bool:
    """Tell whether a schema ``type`` value is, or includes, ``"string"``.

    Args:
        property_type: either a single type name or a collection of type
            names (for example ``["null", "string"]``).

    Returns:
        True when the type is exactly ``"string"`` or the collection contains it.
    """
    if isinstance(property_type, str):
        # Exact comparison: `in` on a str would be a substring test and
        # would accept names that merely contain "string".
        return property_type == "string"
    return "string" in property_type

def is_integer(property_type) -> bool:
    """Tell whether a schema ``type`` value is, or includes, ``"integer"``.

    Args:
        property_type: either a single type name or a collection of type
            names (for example ``["null", "integer"]``).

    Returns:
        True when the type is exactly ``"integer"`` or the collection contains it.
    """
    if isinstance(property_type, str):
        # Exact comparison: `in` on a str would be a substring test.
        return property_type == "integer"
    return "integer" in property_type

def is_boolean(property_type) -> bool:
    """Tell whether a schema ``type`` value is, or includes, ``"boolean"``.

    Args:
        property_type: either a single type name or a collection of type
            names (for example ``["null", "boolean"]``).

    Returns:
        True when the type is exactly ``"boolean"`` or the collection contains it.
    """
    if isinstance(property_type, str):
        # Exact comparison: `in` on a str would be a substring test.
        return property_type == "boolean"
    return "boolean" in property_type
    
def is_array(property_type) -> bool:
    """Tell whether a schema ``type`` value is, or includes, ``"array"``.

    Args:
        property_type: either a single type name or a collection of type
            names (for example ``["null", "array"]``).

    Returns:
        True when the type is exactly ``"array"`` or the collection contains it.
    """
    if isinstance(property_type, str):
        # Exact comparison: `in` on a str would be a substring test.
        return property_type == "array"
    return "array" in property_type

def is_object(property_type) -> bool:
    """Tell whether a schema ``type`` value is, or includes, ``"object"``.

    Args:
        property_type: either a single type name or a collection of type
            names (for example ``["null", "object"]``).

    Returns:
        True when the type is exactly ``"object"`` or the collection contains it.
    """
    if isinstance(property_type, str):
        # Exact comparison: `in` on a str would be a substring test.
        return property_type == "object"
    return "object" in property_type

def find_combining_schema(properties: dict):
    """Return which of ``anyOf`` / ``oneOf`` / ``allOf`` appear in ``properties``.

    The result is a (possibly empty) set of the matching keywords.
    """
    combining_keywords = {"anyOf", "oneOf", "allOf"}
    return set(properties) & combining_keywords
    
def json_extract_base_property(path: str, json_col: str, name: str, definition: dict) -> str:
    """Build the SQL expression extracting one scalar property from a JSON column.

    Args:
        path: JSON path of the parent node (joined to ``name`` with a dot).
        json_col: name of the column holding the JSON document.
        name: property name; also used as the resulting column alias.
        definition: schema definition of the property.

    Returns:
        A ``cast(json_extract_scalar(...) as <type>) as <name>`` expression for
        string, integer and boolean properties; None for anything else
        (including definitions that are not a dict or carry no ``type``).
    """
    # Callers probe arbitrary schema values (booleans, lists, description
    # strings, None); only a dict with a "type" entry can describe a column.
    if not isinstance(definition, dict) or "type" not in definition:
        return None
    property_type = definition["type"]
    if is_string(property_type):
        sql_type = "string"
    elif is_integer(property_type):
        sql_type = "int64"
    elif is_boolean(property_type):
        sql_type = "boolean"
    else:
        return None
    current = ".".join([path, name])
    return f"cast(json_extract_scalar({json_col}, '{current}') as {sql_type}) as {name}"
    
def json_extract_nested_property(path: str, json_col: str, name: str, definition: dict) -> tuple:
    """Build the SQL fragments needed to expose a nested (array/object) property.

    Args:
        path: JSON path of the parent node (joined to ``name`` with a dot).
        json_col: name of the column holding the JSON document.
        name: property name; also used as the resulting column alias.
        definition: schema definition of the property.

    Returns:
        A ``(select_expression, join_clause)`` pair:
        * arrays  -> ``json_extract_array(...)`` plus a ``cross join unnest`` clause;
        * objects -> ``json_extract(...)`` plus an empty join clause;
        * anything else (no ``type``, not a dict, other types) -> ``(None, None)``.
    """
    # Guard against None and any other non-dict value before looking up "type".
    if not isinstance(definition, dict) or "type" not in definition:
        return (None, None)
    current = ".".join([path, name])
    property_type = definition["type"]
    if is_array(property_type):
        return (
            f"json_extract_array({json_col}, '{current}') as {name}",
            f"cross join unnest({name}) as {name}"
        )
    if is_object(property_type):
        return (
            f"json_extract({json_col}, '{current}') as {name}",
            ""
        )
    return (None, None)


In [25]:
def select_table(table: str, columns="*"):
    """Return a ``select <columns> from <table>`` statement preceded by a newline."""
    return "\nselect {} from {}".format(columns, table)

def extract_node_properties(path: str, json_col: str, properties: dict) -> dict:
    """Map each scalar property of a node to its SQL extraction expression.

    Properties for which json_extract_base_property yields nothing are left out.
    An empty or missing ``properties`` gives an empty dict.
    """
    if not properties:
        return {}
    extracted = (
        (field, json_extract_base_property(path=path, json_col=json_col, name=field, definition=definition))
        for field, definition in properties.items()
    )
    return {field: sql_field for field, sql_field in extracted if sql_field}

def find_properties_object(path: str, field: str, properties) -> dict:
    """Search a schema fragment for the first nested ``properties`` object.

    Args:
        path: JSON path prefix; only forwarded to json_extract_base_property.
        field: name under which a match is reported.
        properties: schema fragment to inspect (dict, list or any scalar).

    Returns:
        * ``{field: <properties dict>}`` when a ``properties`` object is found
          (``field`` is the key of the sub-fragment where it was found);
        * ``{field: None}`` when the fragment itself is a basic scalar type;
        * None when nothing usable is found (scalars, None, empty fragments).
    """
    if isinstance(properties, dict):
        if "items" in properties:
            # Array definition: the interesting part is the element schema.
            return find_properties_object(path, field, properties["items"])
        if "properties" in properties:
            # we found a properties object
            return {field: properties['properties']}
        if "type" in properties and json_extract_base_property(path=path, json_col="", name="", definition=properties):
            # we found a basic type
            return {field: None}
        for key, value in properties.items():
            # Only dict values can be basic-type definitions; probing other
            # values (booleans, description strings, lists) is skipped so they
            # are simply searched recursively instead.
            is_base_type = isinstance(value, dict) and json_extract_base_property(path, "", key, value)
            if not is_base_type:
                child = find_properties_object(path, key, value)
                if child:
                    return child
    elif isinstance(properties, list):
        # Lists are searched element by element; they are never indexed by
        # key, even if they happen to contain strings such as "items".
        for item in properties:
            child = find_properties_object(path=path, field=field, properties=item)
            if child:
                return child
    # Scalars (str, int, bool, float, None) and fragments without a match.
    return None
    
def extract_nested_properties(path: str, json_col: str, field: str, properties: dict) -> dict:
    """Collect the nested (array/object/combined) children of a node.

    Args:
        path: JSON path of the current node.
        json_col: name of the JSON column (unused here, kept for the interface).
        field: name of the current node.
        properties: ``properties`` dict of the current node.

    Returns:
        A dict merging every non-empty result of find_properties_object for the
        node's nested properties: child name -> its ``properties`` dict, or
        None when the child is made of basic values. Properties where nothing
        is found are skipped.
    """
    result = {}
    if not properties:
        return result
    for key, definition in properties.items():
        if not isinstance(definition, dict):
            # Not a schema definition: nothing to descend into.
            continue
        # (search path, schema fragment) pairs to hand to find_properties_object.
        candidates = []
        combining = find_combining_schema(definition)
        if combining:
            # Descend into every combining keyword (anyOf / oneOf / allOf) present.
            # NOTE(review): this branch includes `field` in the path while the
            # other branches do not - confirm which form is intended.
            candidates = [(f"{path}.{field}.{key}", definition[combo]) for combo in combining]
        elif "type" not in definition:
            pass
        elif is_array(definition['type']):
            items = definition.get('items')
            if items is None:
                # Array without an element schema: nothing to search.
                continue
            item_combining = find_combining_schema(items) if isinstance(items, dict) else set()
            if item_combining:
                # Element schema is itself a combination: search each alternative.
                candidates = [(f"{path}.{key}", items[combo]) for combo in item_combining]
            else:
                candidates = [(f"{path}.{key}", items)]
        elif is_object(definition['type']):
            candidates = [(f"{path}.{key}", definition)]
        for search_path, fragment in candidates:
            found = find_properties_object(path=search_path, field=key, properties=fragment)
            # find_properties_object returns None when nothing matches;
            # dict.update(None) would raise, so only merge real results.
            if found:
                result.update(found)
    return result

def process_node(path: str, json_col: str, name: str, properties: dict, from_table: str = "", previous="with ", inject_cols="") -> dict:
    """Generate the SQL models for one schema node and, recursively, its nested children.

    Args:
        path: JSON path of the node inside ``json_col``.
        json_col: column (or unnested alias) holding the JSON to extract from.
        name: node name; used for the CTE names, the hash id column and the result key.
        properties: ``properties`` dict of the node.
        from_table: table or CTE the node's columns are selected from.
        previous: SQL emitted so far. The default ``"with "`` marks a root node;
            any other value is continued with a comma before the new CTEs.
        inject_cols: extra select-list text placed before the node's own columns
            (the recursive call passes the parent's hash id as a foreign key).

    Returns:
        A dict mapping a model name to its SQL text: one entry for this node
        and one per nested child (``<name>_<child>``), merged with whatever the
        recursive calls return.
    """
    result = {}
    # A root node starts the "with" clause; a nested node appends to the
    # parent's CTE list, hence the separating comma.
    if previous == "with ":
        prefix = previous
    else:
        prefix = previous + ","
    node_properties = extract_node_properties(path=path, json_col=json_col, properties=properties)
    node_columns = ',\n    '.join([sql for sql in node_properties.values()])
    # FIXME: use DBT macros to be cross_db compatible instead
    # NOTE(review): when node_properties is empty this renders as
    # `coalesce(cast( as string), "")` - confirm nodes always have a scalar column.
    hash_node_columns = 'coalesce(cast(' + ' as string), ""),\n      coalesce(cast('.join([column for column in node_properties.keys()]) + ' as string), "")'    
    node_sql = f"""{prefix}
{name}_node as (
  select     
    {inject_cols}
    {node_columns}
  from {from_table}
),
{name}_with_id as (
  select
    *,
    to_hex(md5(concat(
      {hash_node_columns}
    ))) as _{name}_hashid
  from {name}_node
)"""
    # SQL Query for current node's basic properties
    result[name] = node_sql + select_table(f"{name}_with_id")

    children_columns = extract_nested_properties(path=path, json_col=json_col, field=name, properties=properties)
    if children_columns:            
        for col in children_columns.keys():              
            # NOTE(review): json_extract_nested_property can return (None, None);
            # child_col would then be interpolated below as the text "None".
            child_col, join_child_table = json_extract_nested_property(path=path, json_col=json_col, name=col, definition=properties[col])
            # Same two CTEs as above, but also extracting the nested column and
            # keeping only the hash id and that column in `{name}_with_id`.
            child_sql = f"""{prefix}
{name}_node as (
  select     
    {child_col},
    {node_columns}
  from {from_table}
),
{name}_with_id as (
  select
    to_hex(md5(concat(
      {hash_node_columns}
    ))) as _{name}_hashid,
    {col}
  from {name}_node
  {join_child_table}
)"""
            if children_columns[col]:
                # The child has its own properties: recurse, reading from the
                # CTE just built and carrying the parent's hash id as a foreign key.
                children = process_node(path="$", json_col=col, name=f"{name}_{col}", properties=children_columns[col], from_table=f"{name}_with_id", previous=child_sql, inject_cols=f"_{name}_hashid as _{name}_foreign_hashid,")
                result.update(children)
            else:
                # No properties to expand: expose the nested column as-is,
                # next to the parent's hash id.
                result[f"{name}_{col}"] = child_sql + select_table(f"{name}_with_id", columns=f"""
  _{name}_hashid as _{name}_foreign_hashid,
  {col}
""")
    return result

def generate_dbt_model(catalog: dict, json_col: str, from_table: str) -> dict:
    """Generate the SQL models for every stream of a catalog.

    Args:
        catalog: catalog document with a ``streams`` list; each stream has a
            ``name`` and its schema under either ``json_schema`` or ``schema``.
        json_col: name of the column holding the raw JSON records.
        from_table: table the raw JSON records are read from.

    Returns:
        A dict mapping model name to SQL text, merged over all streams.

    Raises:
        ValueError: if a stream has neither a ``json_schema`` nor a ``schema`` entry.
        KeyError: if a stream lacks ``name`` or its schema lacks ``properties``.
    """
    result = {}
    for obj in catalog["streams"]:
        name = obj['name']
        if "json_schema" in obj:
            schema = obj['json_schema']
        elif "schema" in obj:
            schema = obj['schema']
        else:
            # Fail loudly instead of leaving the schema unbound or silently
            # reusing the one from the previous stream.
            raise ValueError(f"Stream '{name}' has neither a 'json_schema' nor a 'schema' entry")
        properties = schema['properties']
        result.update(process_node(path="$", json_col=json_col, name=name, properties=properties, from_table=from_table))
    return result

def print_result(result):
    """Print each generated model: a file-name header, then the SQL framed by dashed lines."""
    separator = "--------------------"
    for model_name, sql in result.items():
        print(f"In File {model_name}.sql:", separator, sql, separator, sep="\n")

# Generate and print the models for the first sample catalog (`catalog`).
print_result(generate_dbt_model(catalog=catalog, json_col="json_blob", from_table="`airbytesandbox.data.one_recipe_json`"))

In File one_recipe.sql:
--------------------
with 
one_recipe_node as (
  select     
    
    cast(json_extract_scalar(json_blob, '$.Name') as string) as Name,
    cast(json_extract_scalar(json_blob, '$.url') as string) as url,
    cast(json_extract_scalar(json_blob, '$.Description') as string) as Description,
    cast(json_extract_scalar(json_blob, '$.Author') as string) as Author
  from `airbytesandbox.data.one_recipe_json`
),
one_recipe_with_id as (
  select
    *,
    to_hex(md5(concat(
      coalesce(cast(Name as string), ""),
      coalesce(cast(url as string), ""),
      coalesce(cast(Description as string), ""),
      coalesce(cast(Author as string), "")
    ))) as _one_recipe_hashid
  from one_recipe_node
)
select * from one_recipe_with_id
--------------------
In File one_recipe_Ingredients.sql:
--------------------
with 
one_recipe_node as (
  select     
    json_extract_array(json_blob, '$.Ingredients') as Ingredients,
    cast(json_extract_scalar(json_blob, '$.Name') as s

In [23]:
# Generate and print the models for the Stripe sample catalog.
# NOTE(review): this cell's execution count (23) is lower than that of the cell
# defining generate_dbt_model (25); re-run the notebook top to bottom so the
# stored output matches the current code.
print_result(generate_dbt_model(catalog=stripe, json_col="json_blob", from_table="`airbytesandbox.data.stripe_json`"))

In File customers.sql:
--------------------
with 
customers_node as (
  select     
    
    cast(json_extract_scalar(json_blob, '$.delinquent') as boolean) as delinquent,
    cast(json_extract_scalar(json_blob, '$.description') as string) as description,
    cast(json_extract_scalar(json_blob, '$.livemode') as boolean) as livemode,
    cast(json_extract_scalar(json_blob, '$.default_source') as string) as default_source,
    cast(json_extract_scalar(json_blob, '$.email') as string) as email,
    cast(json_extract_scalar(json_blob, '$.default_card') as string) as default_card,
    cast(json_extract_scalar(json_blob, '$.account_balance') as int64) as account_balance,
    cast(json_extract_scalar(json_blob, '$.currency') as string) as currency,
    cast(json_extract_scalar(json_blob, '$.id') as string) as id,
    cast(json_extract_scalar(json_blob, '$.invoice_prefix') as string) as invoice_prefix,
    cast(json_extract_scalar(json_blob, '$.tax_info_verification') as string) as tax_info_ve

In [24]:
# Generate and print the models for the GitHub sample catalog.
# NOTE(review): the stored output of this cell shows a `commits_id` CTE, whereas
# process_node names that CTE `{name}_with_id`; the output looks stale
# (execution count 24 precedes the defining cell's 25) - re-run to refresh.
print_result(generate_dbt_model(catalog=github, json_col="json_blob", from_table="`airbytesandbox.data.github_json`"))

In File commits.sql:
--------------------
with 
commits_node as (
  select     
    
    cast(json_extract_scalar(json_blob, '$._sdc_repository') as string) as _sdc_repository,
    cast(json_extract_scalar(json_blob, '$.sha') as string) as sha,
    cast(json_extract_scalar(json_blob, '$.url') as string) as url,
    cast(json_extract_scalar(json_blob, '$.html_url') as string) as html_url,
    cast(json_extract_scalar(json_blob, '$.comments_url') as string) as comments_url,
    cast(json_extract_scalar(json_blob, '$.pr_number') as int64) as pr_number,
    cast(json_extract_scalar(json_blob, '$.pr_id') as string) as pr_id,
    cast(json_extract_scalar(json_blob, '$.id') as string) as id
  from `airbytesandbox.data.github_json`
),
commits_id as (
  select
    *,
    to_hex(md5(concat(
      coalesce(cast(_sdc_repository as string), ""),
      coalesce(cast(sha as string), ""),
      coalesce(cast(url as string), ""),
      coalesce(cast(html_url as string), ""),
      coalesce(cast(comment