In [1]:
# Import necessary libraries
import os
from google.colab import auth
from google.colab import drive
from google.cloud import bigquery

In [2]:
# Mount Google Drive
print("Mounting Google Drive...")
try:
    drive.mount('/content/drive')
except Exception as e:
    print(f"Failed to mount Google Drive: {e}")

Mounting Google Drive...
Mounted at /content/drive


In [3]:
# Authenticate manually via OAuth
try:
    auth.authenticate_user()
    print("🔐 Successfully authenticated!")
except Exception as e:
    print(f"Authentication failed: {e}")

🔐 Successfully authenticated!


In [4]:
# Define the path to save the project on Drive
DBT_DIR = '/content/drive/MyDrive/dbt_projects'
PROJECT_NAME = "dbt_my_project"
PROJECT_PATH = f"{DBT_DIR}/{PROJECT_NAME}"

In [5]:
# Create the project directory if it doesn't exist
try:
    os.makedirs(PROJECT_PATH, exist_ok=True)
    print(f"📁 dbt directory created at: {DBT_DIR}")
except Exception as e:
    print(f"Failed to create dbt directory: {e}")

📁 dbt directory created at: /content/drive/MyDrive/dbt_projects


In [6]:
# Install dbt for BigQuery
!pip install dbt-bigquery --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m114.4/114.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.8/149.8 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.9/79.9 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.6/172.6 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.8/87.8 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m946.1/946.1 kB[0m [31m37.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.1/95.1 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m442.7/442.7 kB[0m [31m29.1 MB/s[0m eta

In [7]:
# Navigate to the base directory and initialize the dbt project
%cd {DBT_DIR}
!dbt init {PROJECT_NAME}

/content/drive/MyDrive/dbt_projects
[0m19:36:58  Running with dbt=1.9.4
[0m19:36:58  [ConfigFolderDirectory]: Unable to parse logging event dictionary. Failed to parse dir field: expected string or bytes-like object, got 'PosixPath'.. Dictionary: {'dir': PosixPath('/root/.dbt')}
[0m19:36:58  Creating dbt configuration folder at 
[0m19:36:58  A project called dbt_my_project already exists here.


In [8]:
# Create the profiles.yml file at the correct location
profiles_content = f"""
meu_projeto_dbt:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: "meu-projeto-dbt"
      dataset: "meu_dataset"
      threads: 4
      location: US
"""

# Ensure the ~/.dbt directory exists
os.makedirs("/root/.dbt", exist_ok=True)

try:
    with open("/root/.dbt/profiles.yml", "w") as f:
        f.write(profiles_content)
    print("profiles.yml configured for BigQuery via OAuth")
except Exception as e:
    print(f"Failed to write profiles.yml: {e}")

profiles.yml configured for BigQuery via OAuth


In [9]:
# Save a backup of profiles.yml in the project directory
backup_path = f"{PROJECT_PATH}/profiles.yml"

try:
    with open(backup_path, "w") as f:
        f.write(profiles_content)
    print(f"Backup saved at: {backup_path}")
except Exception as e:
    print(f"Failed to write backup_profiles.yml: {e}")

Backup saved at: /content/drive/MyDrive/dbt_projects/dbt_my_project/profiles.yml


In [10]:
# Path where dbt_project.yml will be saved
dbt_project_yml_path = f"{PROJECT_PATH}/dbt_project.yml"

In [11]:
# Content for dbt_project.yml
dbt_project_content = """
name: "meu_projeto_dbt"
version: "1.0"
profile: "meu_projeto_dbt"
config-version: 2

model-paths: ["models"]
macro-paths: ["macros"]

target-path: "target"
clean-targets: ["target", "dbt_modules"]
"""


In [12]:
# Create the dbt_project.yml file in the project directory
with open(dbt_project_yml_path, "w") as f:
    f.write(dbt_project_content)

print(f"dbt_project.yml created at: {dbt_project_yml_path}")

dbt_project.yml created at: /content/drive/MyDrive/dbt_projects/dbt_my_project/dbt_project.yml


In [13]:
# Create the packages.yml file
packages_content = """
packages:
  - package: dbt-labs/dbt_utils
    version: [">=0.8.0", "<1.0.0"]
"""

with open(f"{PROJECT_PATH}/packages.yml", "w") as f:
    f.write(packages_content)

print("packages.yml created successfully")

packages.yml created successfully


In [14]:
# Install dbt packages
try:
    !cd {PROJECT_PATH} && dbt deps
except Exception as e:
    print(f"Failed to install dbt packages: {e}")

[0m19:37:03  Running with dbt=1.9.4
[0m19:37:04  Updating lock file in file path: /content/drive/MyDrive/dbt_projects/dbt_my_project/package-lock.yml
[0m19:37:04  Installing dbt-labs/dbt_utils
[0m19:37:10  Installed from version 0.9.6
[0m19:37:10  Updated version available: 1.3.0
[0m19:37:10  
[0m19:37:10  Updates available for packages: ['dbt-labs/dbt_utils']                 
Update your versions in packages.yml, then run dbt deps


In [15]:
# Test dbt connection to BigQuery
try:
    !cd {PROJECT_PATH} && dbt debug
except Exception as e:
    print(f"dbt debug failed: {e}")

[0m19:37:14  Running with dbt=1.9.4
[0m19:37:14  dbt version: 1.9.4
[0m19:37:14  python version: 3.11.12
[0m19:37:14  python path: /usr/bin/python3
[0m19:37:14  os info: Linux-6.1.123+-x86_64-with-glibc2.35
[0m19:37:16  Using profiles dir at /content/drive/MyDrive/dbt_projects/dbt_my_project
[0m19:37:16  Using profiles.yml file at /content/drive/MyDrive/dbt_projects/dbt_my_project/profiles.yml
[0m19:37:16  Using dbt_project.yml file at /content/drive/MyDrive/dbt_projects/dbt_my_project/dbt_project.yml
[0m19:37:16  adapter type: bigquery
[0m19:37:16  adapter version: 1.9.1
[0m19:37:16  Configuration:
[0m19:37:16    profiles.yml file [[32mOK found and valid[0m]
[0m19:37:16    dbt_project.yml file [[32mOK found and valid[0m]
[0m19:37:16  Required dependencies:
[0m19:37:16   - git [[32mOK found[0m]

[0m19:37:16  Connection:
[0m19:37:16    method: oauth
[0m19:37:16    database: meu-projeto-dbt
[0m19:37:16    execution_project: meu-projeto-dbt
[0m19:37:16    schema:

In [16]:
# Directory structure
folders = [
    "models/",
    "models/tests",
    "macros"
]

In [17]:
# Change to the project directory
os.chdir(PROJECT_PATH)

In [18]:
# Create folders for models, tests, and macros
for folder in folders:
    path = os.path.join(PROJECT_PATH, folder)
    os.makedirs(path, exist_ok=True)

In [19]:
# Initialize BigQuery client
client = bigquery.Client(project="meu-projeto-dbt")

In [20]:
# Define the complete table ID
table_id = "meu-projeto-dbt.sample_data.marketing_data"

In [21]:
# Retrieve the schema for the table
try:
    table = client.get_table(table_id)
except Exception as e:
    print(f"Could not retrieve table schema: {e}")

In [22]:
# Print column name and type, generate a list of table columns
all_columns = []
for schema_field in table.schema:
    print(f"{schema_field.name} ({schema_field.field_type})")
    all_columns.append(schema_field.name)

date (STRING)
tv (FLOAT)
radio (FLOAT)
ooh (FLOAT)
meta (FLOAT)
google (FLOAT)
tiktok (FLOAT)
digital (FLOAT)
sales (FLOAT)
holiday (FLOAT)


In [23]:
# Query to fetch the first 5 rows of data
query = """
SELECT * FROM `meu-projeto-dbt.sample_data.marketing_data`
LIMIT 5
"""

try:
    df = client.query(query).to_dataframe()
    display(df)
except Exception as e:
    print(f"Query failed: {e}")

Unnamed: 0,date,tv,radio,ooh,meta,google,tiktok,digital,sales,holiday
0,2023-04-27,0.0,,1084.2,3796.61,7159.81,4019.28,2106.97,7693449.97,0.0
1,2023-08-25,4055.11,,1012.66,4035.08,4607.99,2801.7,1784.53,7512609.44,0.0
2,2023-10-14,3640.98,,934.71,2275.59,4254.67,2727.94,1627.36,5274272.74,0.0
3,2023-11-17,6816.8,,752.11,3965.98,9265.49,4155.98,2098.45,49283224.1,0.0
4,2024-03-19,9435.95,,1296.58,5003.28,10658.48,8675.1,2588.44,24741935.76,0.0


In [24]:
# Function to save SQL content to a file
def write_sql(path, content):
    try:
        os.makedirs(os.path.dirname(os.path.join(PROJECT_PATH, path)), exist_ok=True)
        with open(os.path.join(PROJECT_PATH, path), "w") as f:
            f.write(content.strip())
        print(f"File written to: {path}")
    except Exception as e:
        print(f"Failed to write file {path}: {e}")

In [25]:
# Define project name and dataset for BigQuery
project_id = "meu-projeto-dbt"
dataset_id = "sample_data"
table_name = "marketing_data"

In [26]:
# Generate source file for dbt
write_sql("models/src_sample_data.yml", f"""
version: 2

sources:
  - name: sample_data
    database: {project_id}
    schema: {dataset_id}
    tables:
      - name: {table_name}
""")

print("src_sample_data.yml file created successfully.")

File written to: models/src_sample_data.yml
src_sample_data.yml file created successfully.


In [27]:
# Define Macros

# macro_remove_duplicates
write_sql(
    "macros/remove_duplicates.sql",
    """
    {% macro remove_duplicates(table) %}
      SELECT DISTINCT * FROM {{ table }}
    {% endmacro %}
    """
)
print("Macro `remove_duplicates` saved.")

File written to: macros/remove_duplicates.sql
Macro `remove_duplicates` saved.


In [28]:
# macro_remove_duplicate_dates_prioritize_fewer_nulls
write_sql(
    "macros/remove_duplicate_dates_prioritize_fewer_nulls.sql",
    """
    {% macro remove_duplicate_dates_prioritize_fewer_nulls(table, ref_col) %}
    {%- set cols = dbt_utils.get_filtered_columns_in_relation(ref(ref_col)) -%}

    ranked AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY date
                ORDER BY
                    (
                        {%- for col in cols %}
                            (CASE WHEN {{ col }} IS NULL THEN 1 ELSE 0 END){% if not loop.last %} + {% endif %}
                        {%- endfor %}
                    ) ASC
            ) AS row_num
        FROM {{ table }}
        WHERE date IS NOT NULL
    )
    {% endmacro %}
    """
)
print("Macro `remove_duplicate_dates_prioritize_fewer_nulls` saved.")

File written to: macros/remove_duplicate_dates_prioritize_fewer_nulls.sql
Macro `remove_duplicate_dates_prioritize_fewer_nulls` saved.


In [29]:
# macro_interpolate_sales
write_sql(
    "macros/interpolate_sales.sql",
    """
    {% macro interpolate_sales(table_ref) %}
    (
        SELECT
            *,
            IFNULL(
                sales,
                (
                    LAG(sales) OVER (ORDER BY date) + LEAD(sales) OVER (ORDER BY date)
                ) / 2
            ) AS interpolated_sales
        FROM {{ table_ref }}
    )
    {% endmacro %}
    """
)
print("Macro `interpolate_sales` saved.")


File written to: macros/interpolate_sales.sql
Macro `interpolate_sales` saved.


In [30]:
# macro_coalesce_all_columns
write_sql(
    "macros/coalesce_all_columns.sql",
    """
    {% macro coalesce_all_columns(columns, default_value) %}
        {% for col in columns %}
            COALESCE({{ col }}, {{ default_value }}) AS {{ col }}{{ "," if not loop.last else "" }}
        {% endfor %}
    {% endmacro %}
    """
)
print("Macro `coalesce_all_columns` saved.")

File written to: macros/coalesce_all_columns.sql
Macro `coalesce_all_columns` saved.


In [31]:
# Create tables

# Filter out columns that shouldn't receive COALESCE
investment_cols = [col for col in all_columns if col not in ["date", "holiday", "sales"]]


In [32]:
# Bronze Investment

write_sql("models/bronze_investment.sql", f"""
WITH bronze AS (
    SELECT * FROM {{{{ source('sample_data', 'marketing_data') }}}}
),

    no_duplicates AS (
        {{{{ remove_duplicates('bronze') }}}}
    )

SELECT
    date,
    {', '.join(investment_cols)}
FROM no_duplicates
""")
print("Model `bronze_investment` created.")

File written to: models/bronze_investment.sql
Model `bronze_investment` created.


In [33]:
# Silver Investment

write_sql("models/silver_investment.sql", """
WITH silver AS (
    SELECT * FROM {{ ref('bronze_investment') }}
),

    {{ remove_duplicate_dates_prioritize_fewer_nulls('silver','bronze_investment') }},

    no_duplicate_dates AS (
    SELECT * FROM ranked WHERE row_num = 1
    )

SELECT
    *
FROM no_duplicate_dates
""")
print("Model `silver_investment` created.")

File written to: models/silver_investment.sql
Model `silver_investment` created.


In [34]:
# Gold Investment

investment_select = []
for col in investment_cols:
    if col == "digital":
        investment_select.append(",\n    " + f"{col} AS display_video")
    else:
        investment_select.append(",\n    " + col)

write_sql("models/gold_investment.sql", f"""
WITH gold AS (
    SELECT * FROM {{{{ ref('silver_investment') }}}}
),

filled AS (
    SELECT
        date,
        {{{{ coalesce_all_columns({investment_cols}, 0) }}}}
    FROM gold
)

SELECT
    date {''.join(investment_select)}
FROM filled
""")
print("Model `gold_investment` created.")

File written to: models/gold_investment.sql
Model `gold_investment` created.


In [35]:
# Bronze KPI

write_sql("models/bronze_kpi.sql", """
WITH bronze AS (
    SELECT * FROM {{ source('sample_data', 'marketing_data') }}
),

    no_duplicates AS (
        {{ remove_duplicates('bronze') }}
    )

SELECT
    date,
    sales
FROM no_duplicates
""")

print("Model `bronze_kpi` created.")

File written to: models/bronze_kpi.sql
Model `bronze_kpi` created.


In [36]:
# Silver KPI

write_sql("models/silver_kpi.sql", """
WITH silver AS (
    SELECT * FROM {{ ref('bronze_kpi') }}
),

    {{ remove_duplicate_dates_prioritize_fewer_nulls('silver','bronze_kpi') }},

    no_duplicate_dates AS (
    SELECT * FROM ranked WHERE row_num = 1
)

SELECT
    *
FROM no_duplicate_dates
""")

print("Model `silver_insilver_kpivestment` created.")

File written to: models/silver_kpi.sql
Model `silver_insilver_kpivestment` created.


In [37]:
# Gold KPI

write_sql("models/gold_kpi.sql", """
with interpolated AS (
 {{ interpolate_sales(ref('silver_kpi')) }}
)

SELECT
    date,
    case
      when sales is null then interpolated_sales
      else sales
    end as sales
FROM interpolated
""")

print("Model `gold_kpi` created.")

File written to: models/gold_kpi.sql
Model `gold_kpi` created.


In [38]:
# Bronze Auxiliar

write_sql("models/bronze_auxiliar.sql", f"""
WITH bronze AS (
    SELECT * FROM {{{{ source('sample_data', 'marketing_data') }}}}
),

    no_duplicates AS (
        {{{{ remove_duplicates('bronze') }}}}
    )

SELECT
    date,
    holiday
FROM no_duplicates
""")

print("Model `bronze_auxiliar` created.")

File written to: models/bronze_auxiliar.sql
Model `bronze_auxiliar` created.


In [39]:
# Silver Auxiliar

write_sql("models/silver_auxiliar.sql", """
WITH silver AS (
    SELECT * FROM {{ ref('bronze_auxiliar') }}
),

{{ remove_duplicate_dates_prioritize_fewer_nulls('silver','bronze_auxiliar') }},

    no_duplicate_dates AS (
    SELECT * FROM ranked WHERE row_num = 1
    )

SELECT
    *
FROM no_duplicate_dates
""")

print("Model `silver_auxiliar` created.")

File written to: models/silver_auxiliar.sql
Model `silver_auxiliar` created.


In [40]:
# Gold Auxiliar

write_sql("models/gold_auxiliar.sql", """
WITH gold AS (
    SELECT * FROM {{ ref('silver_auxiliar') }}
),

filled AS (
    SELECT
        date,
        {{ coalesce_all_columns(['holiday'], 0) }}
    FROM gold
)

SELECT
    *
FROM filled
""")

print("Model `gold_auxiliar` created.")

File written to: models/gold_auxiliar.sql
Model `gold_auxiliar` created.


In [41]:
# DBT Compile
!dbt compile

[0m19:37:24  Running with dbt=1.9.4
[0m19:37:26  Registered adapter: bigquery=1.9.1
[0m19:37:27  Unable to do partial parsing because saved manifest not found. Starting full parse.
[0m19:37:29  Found 9 models, 1 source, 692 macros
[0m19:37:29  
[0m19:37:29  Concurrency: 4 threads (target='dev')
[0m19:37:29  


In [42]:
# Run DBT
!dbt run

[0m19:37:35  Running with dbt=1.9.4
[0m19:37:37  Registered adapter: bigquery=1.9.1
[0m19:37:38  Found 9 models, 1 source, 692 macros
[0m19:37:38  
[0m19:37:38  Concurrency: 4 threads (target='dev')
[0m19:37:38  
[0m19:37:39  1 of 9 START sql view model meu_dataset.bronze_auxiliar ........................ [RUN]
[0m19:37:39  2 of 9 START sql view model meu_dataset.bronze_investment ...................... [RUN]
[0m19:37:39  3 of 9 START sql view model meu_dataset.bronze_kpi ............................. [RUN]
[0m19:37:39  3 of 9 OK created sql view model meu_dataset.bronze_kpi ........................ [[32mCREATE VIEW (0 processed)[0m in 0.87s]
[0m19:37:39  1 of 9 OK created sql view model meu_dataset.bronze_auxiliar ................... [[32mCREATE VIEW (0 processed)[0m in 0.87s]
[0m19:37:39  2 of 9 OK created sql view model meu_dataset.bronze_investment ................. [[32mCREATE VIEW (0 processed)[0m in 0.87s]
[0m19:37:39  4 of 9 START sql view model meu_dataset.s

In [43]:
# Testes path
tests_path = f"{PROJECT_PATH}/models/tests"

In [44]:
# Tests for gold_investment
try:
    index = investment_cols.index('digital')
    investment_cols[index] = 'display_video'
except ValueError:
    pass  # 'digital' not in list, so no replacement needed

test_investment = """version: 2

models:
  - name: gold_investment
    tests:
"""

# Add expression_is_true tests
for col in investment_cols:
    test_investment += f"""      - dbt_utils.expression_is_true:
          expression: "{col} >= 0"
"""

# Add column-specific tests
test_investment += "    columns:\n"
for col in investment_cols:
    test_investment += f"""      - name: {col}
        tests:
          - not_null
"""

with open(f"{tests_path}/test_gold_investment.yml", "w") as f:
    f.write(test_investment)

In [45]:
# Tests for gold_kpi
test_kpi = """
version: 2

models:
  - name: gold_kpi
    tests:
      - dbt_utils.expression_is_true:
          expression: "sales >= 0"
    columns:
      - name: sales
        tests:
          - not_null
"""

with open(f"{tests_path}/test_gold_kpi.yml", "w") as f:
    f.write(test_kpi)

In [46]:
# Tests for gold_auxiliar
test_aux = """
version: 2

models:
  - name: gold_auxiliar
    tests:
      - dbt_utils.expression_is_true:
          expression: "holiday in (0, 1)"
    columns:
      - name: holiday
        tests:
          - not_null
"""

with open(f"{tests_path}/test_gold_auxiliar.yml", "w") as f:
    f.write(test_aux)



In [47]:
# Run DBT

!dbt test

[0m19:37:45  Running with dbt=1.9.4
[0m19:37:48  Registered adapter: bigquery=1.9.1
[0m19:37:50  Found 9 models, 18 data tests, 1 source, 692 macros
[0m19:37:50  
[0m19:37:50  Concurrency: 4 threads (target='dev')
[0m19:37:50  
[0m19:37:51  1 of 18 START test dbt_utils_expression_is_true_gold_auxiliar_holiday_in_0_1_ .. [RUN]
[0m19:37:51  2 of 18 START test dbt_utils_expression_is_true_gold_investment_display_video_0  [RUN]
[0m19:37:51  3 of 18 START test dbt_utils_expression_is_true_gold_investment_google_0 ....... [RUN]
[0m19:37:51  4 of 18 START test dbt_utils_expression_is_true_gold_investment_meta_0 ......... [RUN]
[0m19:37:52  1 of 18 PASS dbt_utils_expression_is_true_gold_auxiliar_holiday_in_0_1_ ........ [[32mPASS[0m in 1.35s]
[0m19:37:52  5 of 18 START test dbt_utils_expression_is_true_gold_investment_ooh_0 .......... [RUN]
[0m19:37:52  3 of 18 PASS dbt_utils_expression_is_true_gold_investment_google_0 ............. [[32mPASS[0m in 1.43s]
[0m19:37:52  2 of 18