### Automated Model Renaming and File Update in dbt Projects

*Do you have a lot of legacy models that don't follow best naming practices?*

This script streamlines the process of updating model names and related files within a dbt (Data Build Tool) project. 



- **Prefix Enforcement:** Identifies models in a specific layer that do not adhere to the intended prefix convention.
- **Model Renaming:** Automatically renames models to align with the latest best practices.
- **File Synchronization:** Updates the names and contents of corresponding `.yml` and `.md` files (if they exist).
- **Reference Management:** Adjusts references in downstream models to ensure they point to the newly renamed models.

**Important Considerations:**

- The script only processes `.yml` files, not `.yaml` files.
- It assumes `.yml` files share the same name and directory as their associated SQL files.
- Downstream models may still use old aliases, which aren't updated automatically. These instances will be flagged in the output for manual correction.

In [None]:
import json
import pandas as pd
import os
import re

In [None]:
# Load the JSON file
repo_path = '/Users/mikaelthorup/repos/lunar/lunar-way-hubble-transformations'
manifest_path = f'{repo_path}/target/prod/manifest.json'
all_model_path = f'{repo_path}/models'
project_identifier = 'model.lw_go_events'
schema_prefix = 'e_'
schema_name = 'public_events'
test_sample_size = None # Set to None to use all data / Set to a number to use a sample of the data
old_model_views_to_keep = ['transaction_posted','user_suspended','userid_to_marketuserid']

In [None]:
with open(manifest_path, 'r') as file:
    manifest_data = json.load(file)

# Prepare data for the DataFrame
table_data = []
for key, value in manifest_data.get('nodes', {}).items():
    if value.get('unique_id').startswith(project_identifier):
        table_data.append({
            'unique_id': value.get('unique_id'),
            'old_sql_path': value.get('original_file_path'),
            'old_model_name': value.get('name'),
            'schema_name': value.get('schema')
        })

# Create the DataFrame
df_all = pd.DataFrame(table_data, columns=['unique_id','old_model_name','old_sql_path', 'schema_name'])
df = df_all.copy()
if test_sample_size:
    df = df.head(test_sample_size)

df = df[~df['old_model_name'].str.startswith(schema_prefix) & (df['schema_name'] == schema_name)]
df.reset_index(drop=True, inplace=True)
df['new_model_name'] = df['old_model_name'].apply(lambda x: schema_prefix + x)
df['new_sql_path'] = df['old_sql_path'].apply(lambda x: x.rsplit('/', 1)[0] + '/' + schema_prefix + x.rsplit('/', 1)[1])
df['old_yml_path'] = df['old_sql_path'].apply(lambda x: x[:-3] + 'yml')
df['old_yml_path'] = df['old_yml_path'].apply(lambda x: x if os.path.exists(repo_path + '/' + x) else None)
df['new_yml_path'] = df['new_sql_path'].apply(lambda x: x[:-3] + 'yml')
df['new_yml_path'] = df.apply(lambda row: None if pd.isnull(row['old_yml_path']) else row['new_yml_path'], axis=1)
df['old_md_path'] = df['old_sql_path'].apply(lambda x: x[:-3] + 'md')
df['old_md_path'] = df['old_md_path'].apply(lambda x: x if os.path.exists(repo_path + '/' + x) else None)
df['new_md_path'] = df['new_sql_path'].apply(lambda x: x[:-3] + 'md')
df['new_md_path'] = df.apply(lambda row: None if pd.isnull(row['old_md_path']) else row['new_md_path'], axis=1)
df['keep_old_model'] = df['old_model_name'].apply(lambda x: x in old_model_views_to_keep)
# Display the DataFrame
df = df[[
    'old_model_name', 'new_model_name',
    'old_sql_path', 'new_sql_path',
    'old_yml_path', 'new_yml_path',
    'old_md_path', 'new_md_path',
    'unique_id', 'schema_name',
    'keep_old_model']]

In [None]:
# Convert the dictionary into a list of tuples
data_tuples = [(old_model_name, child_model_name) for old_model_name, child_model_name in manifest_data['child_map'].items()]

# Create a DataFrame from the list of tuples
df_child_map = pd.DataFrame(data_tuples, columns=['old_model_name', 'child_model_name'])
df_child_map = df_child_map[df_child_map['old_model_name'].isin(df['unique_id'])]

df_child_map = df_child_map.explode('child_model_name')
df_child_map = df_child_map.dropna()

# keep only nodes that are models
df_child_map = df_child_map[df_child_map['child_model_name'].str.startswith(project_identifier)]
df_child_map = df_child_map.reset_index(drop=True)


df_child_map['old_model_name'] = df_child_map['old_model_name'].apply(lambda x: x[len(project_identifier)+1:])
df_child_map['new_model_name'] = df_child_map['old_model_name'].apply(lambda x: schema_prefix + x)

df_child_map = df_child_map[['child_model_name','old_model_name', 'new_model_name']]

df_child_map = df_child_map.merge(df_all[['unique_id', 'old_sql_path']], left_on='child_model_name', right_on='unique_id', how='left')
df_child_map = df_child_map.drop(columns='unique_id')
df_child_map = df_child_map.rename(columns={'old_sql_path': 'child_sql_path'})
df_child_map['is_in_df'] = df_child_map['child_model_name'].isin(df['unique_id'])
df_child_map.loc[df_child_map['is_in_df'], 'child_sql_path'] = df_child_map.loc[df_child_map['is_in_df'], 'child_sql_path'].apply(lambda x: x.rsplit('/', 1)[0] + '/' + schema_prefix + x.rsplit('/', 1)[1])
df_child_map = df_child_map[['child_model_name', 'child_sql_path', 'old_model_name', 'new_model_name','is_in_df']]
df_child_map.sort_values(by='child_model_name', inplace=True)
df_child_map.reset_index(drop=True, inplace=True)

In [None]:
for index, row in df.iterrows():
    old_path = os.path.join(repo_path, row['old_sql_path'])
    new_path = os.path.join(repo_path, row['new_sql_path'])
    os.rename(old_path, new_path)

    if row['old_yml_path']:
        old_path = os.path.join(repo_path, row['old_yml_path'])
        new_path = os.path.join(repo_path, row['new_yml_path'])
        os.rename(old_path, new_path)

    if row['old_md_path']:
        old_path = os.path.join(repo_path, row['old_md_path'])
        new_path = os.path.join(repo_path, row['new_md_path'])
        os.rename(old_path, new_path)

In [None]:
for index, row in df.iterrows():
    if row['old_yml_path']:
        new_path = os.path.join(repo_path, row['new_yml_path'])

        with open(new_path, 'r+') as file:
            content = file.read()
            content = content.replace("  - name: " + row['old_model_name'], "  - name: " + row['new_model_name'])
            file.seek(0)
            file.write(content)
            file.truncate()

In [None]:
# replace downstream refs in child models with new model names
old_ref_found = 0
for index, row in df_child_map.iterrows():
    path = os.path.join(repo_path, row['child_sql_path'])
    with open(path, 'r+') as file:
            content = file.read()
            from_str = r"\{\{.*['\"]" + re.escape(row['old_model_name']) + r"['\"].*\}\}"
            to_str = "{{ ref('" + row['new_model_name'] + "') }}"
            content = re.sub(from_str, to_str, content)
            # note the following regex might produce false positives
            old_model_re = r"(?<!{}){}".format(re.escape(schema_prefix), re.escape(row['old_model_name']))
            if re.search(old_model_re, content):
                old_ref_found += 1
                if old_ref_found == 1:
                    print("one or more files found with reference to old model name, please correct manually")
                    print("path, old_model_name")
                print(f"{path},{row['old_model_name']}")
            file.seek(0)
            file.write(content)
            file.truncate()

In [None]:
for index, row in df[df['keep_old_model']].iterrows():
    sql_text = """{{ config(
    materialized='view',
    schema='events'
)}}

-- this model is a temp model to keep the old model name.


-- THIS MODEL IS DEPRECATED (ノಠ益ಠ)ノ彡┻━┻
-- USE UPSTREAM MODEL FOR NEW QUERIES



select * from {{ ref('""" + row['new_model_name'] + """') }}"""
    yml_text = """version: 2
models:
  - name: """ + row['old_model_name'] + """
    description: |
      this model is a temp model to keep the old model name.
      It should be deleted after all consumers have shifted away.

    meta:
      domain: not_applicable
      owner: data_warehouse
      criticality: low
      contains_pii: false

    deprecation_date: 2024-10-01
    """
    with open(os.path.join(repo_path, row['old_sql_path']), 'w') as file:
        file.write(sql_text)

    with open(os.path.join(repo_path, row['old_sql_path'][:-3] + 'yml'), 'w') as file:
        file.write(yml_text)