In [None]:
# first import all the relevant dbt packages

# Question: what's the difference between
#     from dbt import project
# and
#     import dbt.project
# apart from the fact that you reference them differently?

import os

import dbt
import dbt.clients.system
import dbt.compilation
import dbt.config
import dbt.contracts.project
import dbt.flags
import dbt.parser
import dbt.project
import dbt.utils
from dbt.node_types import NodeType


In [None]:
# Build the list of fully qualified model names (fqns) that configs are checked against.
project = dbt.project.read_project("dbt_project.yml")

model_paths = [
    'models/adwords/adwords_ads.sql',
    'models/snowplow/base/snowplow_events.sql',
]

# One fqn per model file, in the same order as model_paths.
models = [dbt.parser.get_fqn(model_path, project) for model_path in model_paths]
model1, model2 = models
models

# TODO: use the get_nodes function to generate a list of all fqns (couldn't get this to work)

In [None]:
# Build a list of PQNs (partially qualified names?) -- one per model path
# configured in the project yml file.
# "config" is used loosely in the names below; it is not really the right word.

# Reserved keys: these are settings, not path segments.
ModelConfigKeys = [
    "schema",
    "enabled",
    "materialized",
    "dist",
    "sort",
    "sql_where",
    "unique_key",
    "sort_type",
    "bind",
    "vars",
]

# Result list: each entry is a path-like list of keys, cut off at the first reserved key.
model_config_pqns = []

# The function below walks every path in the config,
# stops as soon as it meets one of the reserved keys above,
# and records the keys walked so far in model_config_pqns.

def get_model_config_pqn(pqn, config_models, found=None):
    """Collect the key paths (PQNs) under which model settings are declared.

    Walks the nested ``config_models`` mapping. A key whose value is a dict
    and which is not listed in ``ModelConfigKeys`` is treated as a path
    segment and descended into; any other key is treated as a setting, and
    the path walked so far is recorded.

    Args:
        pqn: list of keys walked so far; pass ``[]`` for the top level.
        config_models: mapping to walk.
        found: accumulator shared by the recursive calls. Callers normally
            omit it, in which case a fresh list is created for this call.

    Returns:
        List of paths (each a list of keys) in first-seen order, without
        duplicates. Settings found at the top level (empty path) are not
        recorded.
    """
    # The accumulator cannot be a plain `found = []` in the body: every
    # recursive call would start again from an empty list. Creating it once
    # here and passing it down keeps the function free of module-level state,
    # so repeated calls do not see each other's results.
    if found is None:
        found = []

    for key, value in config_models.items():
        if isinstance(value, dict) and key not in ModelConfigKeys:
            # Not a reserved key and there is another level below: keep walking.
            get_model_config_pqn(pqn + [key], value, found)
        elif pqn and pqn not in found:
            # A setting was reached: record the path once, skipping the empty path.
            found.append(pqn)

    return found
            
# Look the models section up with a default, like the other project lookups in this
# notebook, so a project without a `models` entry yields an empty list instead of an error.
model_config_pqns = get_model_config_pqn([], project.get('models', {}))
model_config_pqns


In [None]:
def check_config_pqns(model_config_pqns, model_fqns):
    """Print, for each configured path, whether it matches at least one model.

    Args:
        model_config_pqns: list of partially qualified names (each a list of
            name parts) to validate.
        model_fqns: list of fully qualified model names (each a list of name
            parts) to validate against.

    Exactly one line is printed per pqn. A pqn counts as valid when
    ``is_a_in_x(pqn, fqn)`` is true for at least one fqn; when no fqn matches
    (including when ``model_fqns`` is empty) it is reported as not pointing to
    a model. Returns None.
    """
    for pqn in model_config_pqns:
        # Decide once per pqn, after looking at the fqns, rather than
        # reporting a failure for every individual fqn that does not match.
        if any(is_a_in_x(pqn, fqn) for fqn in model_fqns):
            print(":WOO: Your config " + str(pqn) + " is valid")
        else:
            print("Your config " + str(pqn) + " doesn't point to a model")
# `models` is the list of fqns built earlier in this notebook; no `model_fqns` variable exists.
# NOTE: check_config_pqns calls is_a_in_x, so the cell defining is_a_in_x must have been run first.
check_config_pqns(model_config_pqns, models)

In [None]:
# NOTE: despite its name, `results` holds a dbt Compiler built from the project;
# further down it is used for its get_all_projects() method.
results = dbt.compilation.Compiler(project)

# Goal: build a node for each model file, then get_fqn for each node.
def get_nodes(package_name, root_project, all_projects, root_dir,
              relative_dirs, resource_type):
    """Build one unparsed-node dict for every matching .sql file.

    Args:
        package_name: stored as ``package_name`` on every node.
        root_project: accepted for signature compatibility; not used here.
        all_projects: only validated, and only when ``dbt.flags.STRICT_MODE``
            is set.
        root_dir: directory handed to ``find_matching`` and stored as
            ``root_path`` on every node.
        relative_dirs: directories handed to ``find_matching`` to search.
        resource_type: compared against ``NodeType.Test`` and
            ``NodeType.Analysis`` to decide how each node's ``path`` is built;
            stored as ``resource_type`` on every node.

    Returns:
        List of dicts with the keys ``name``, ``root_path``, ``resource_type``,
        ``path``, ``original_file_path``, ``package_name`` and ``raw_sql``;
        one dict per file returned by ``find_matching``.
    """
    # Presumably an fnmatch-style pattern: .sql files whose name does not
    # start with '.', '#' or '~' -- confirm against find_matching.
    extension = "[!.#~]*.sql"

    if dbt.flags.STRICT_MODE:
        dbt.contracts.project.validate_list(all_projects)

    file_matches = dbt.clients.system.find_matching(
        root_dir,
        relative_dirs,
        extension)

    result = []

    for file_match in file_matches:
        relative_path = file_match.get('relative_path', '')

        file_contents = dbt.clients.system.load_file_contents(
            file_match.get('absolute_path'))

        # The node name is the file name without its extension.
        parts = dbt.utils.split_path(relative_path)
        name, _ = os.path.splitext(parts[-1])

        if resource_type == NodeType.Test:
            path = dbt.utils.get_pseudo_test_path(
                name, relative_path, 'data_test')
        elif resource_type == NodeType.Analysis:
            path = os.path.join('analysis', relative_path)
        else:
            path = relative_path

        original_file_path = os.path.join(
            file_match.get('searched_path'),
            path)

        result.append({
            'name': name,
            'root_path': root_dir,
            'resource_type': resource_type,
            'path': path,
            'original_file_path': original_file_path,
            'package_name': package_name,
            'raw_sql': file_contents
        })

    # Hand the collected nodes back; without this the caller always receives None.
    return result

# Arguments for get_nodes.
# No trailing commas here: `name = value,` binds a one-element tuple, not the value itself.
package_name = 'my_package_name'
root_project = project
all_projects = results.get_all_projects()
root_dir = project.get('project-root')
relative_dirs = project.get('source-paths', [])
resource_type = NodeType.Model

nodes = get_nodes(package_name,
                  root_project,
                  all_projects,
                  root_dir,
                  relative_dirs,
                  resource_type)
# for node in nodes:
#     print(node)

In [None]:
# Sample inputs (not used by the function itself).
A = ['a', 'b', 'e']
X = ['a', 'b', 'c', 'd']


def is_a_in_x(A, X):
    """Return True if the items of A all occur in X, in the same order.

    The items do not need to be adjacent in X, but each one must be found
    after the position where the previous one was found, so a repeated item
    in A needs a repeated item in X. An empty A is always contained.

    Args:
        A: sequence of items to look for.
        X: sequence to search; it is sliced, never modified.

    Returns:
        True when every item of A was found in order, otherwise False.
    """
    remaining = X
    for wanted in A:
        if wanted not in remaining:
            # Missing, or only present before the previous match.
            return False
        # Continue the search strictly after this match.
        remaining = remaining[remaining.index(wanted) + 1:]
    return True
        
haystack = ['a', 'b', 'c', 'd']

# in-order sub-sequences of the list: each of these should print True
for needle in (['a', 'b'], ['a', 'c'], ['b', 'd']):
    print(is_a_in_x(needle, haystack))

# a missing, a repeated and an out-of-order item: each of these should print False
for needle in (['a', 'b', 'e'], ['a', 'a', 'b'], ['b', 'a']):
    print(is_a_in_x(needle, haystack))


In [None]:

# is_a_in_x iterates over its first argument, so it must receive a sequence of name
# parts. A bare string would be compared one character at a time, so wrap it in a list;
# an existing sequence of parts is used as it is.
package_pqn = [package_name] if isinstance(package_name, str) else list(package_name)

# Print True once if the package name occurs in any model's fqn.
if any(is_a_in_x(package_pqn, model) for model in models):
    print(True)
    