# **Meridian with Cortex for Marketing**

## Introduction

Welcome to the Meridian with Cortex for Marketing notebook. This *example* notebook showcases the integration between Cortex for Marketing and Meridian.

**❗️IMPORTANT❗️** The default configuration parameters and sample data for Meridian are intended for demo purposes only and should not be deployed for production use. Meridian configuration parameters should be chosen with great care as they will influence the behaviour of the model and results. Please consult Meridian documentation for guidance on how to setup the model configuration for your unique business needs and goals. See [Meridian modeling](https://developers.google.com/meridian/docs/basics/about-the-project). If needed consult with an official [Google Meridian partner](https://developers.google.com/meridian/partners) and/or your Google Ads representative. For any questions or issues related to the Google Cloud Cortex Framework itself, see the [Cortex Framework Support](https://cloud.google.com/cortex/docs/support) page.

This notebook is intended for automated execution in Colab Enterprise, but can also be used as a starting point for your own experiments.

The overall execution flow of the code in the notebook is:
1. Install packages (Meridian) and imports
2. Load helper functions for GCS and BigQuery
3. Load execution configuration from config file in GCS (`configuration/cortex_meridian_config.json`)
4. Load Cortex data from Cortex Data Foundation view in BigQuery
5. Configure Meridian model and map Cortex Marketing Cross Media and SAP or Oracle EBS sales data to Meridian data model
6. Run Meridian sampling and output summary report to GCS (`reporting` folder)
7. Run budget optimizer and output results report to GCS (`reporting` folder)
8. Save model to GCS (`models` folder)
9. Save CSV results to GCS (`csv` folder)
10. Generate overview report and save to GCS (`reporting` folder)

### Pre-reqs

*   Knowledge of Cortex for Marketing deployed on GCP (see https://cloud.google.com/cortex/docs/overview) with relevant marketing and sales data in BQ
*   Access to Cortex for Marketing data in BigQuery
*   Knowledge of Meridian (see https://developers.google.com/meridian)

### Compatibility

This notebook has been tested with Cortex Framework version {{TESTED_CORTEX_VERSION}} and Meridian version {{TESTED_MERIDIAN_VERSION}}.

## Setup

In [None]:
# @title Install packages
!pip install --upgrade "google-meridian[colab,and-cuda]=={{TESTED_MERIDIAN_VERSION}}"

In [None]:
import subprocess

def get_package_version(package_name):
  try:
    output = subprocess.check_output(['pip', 'show', package_name], text=True)
    for line in output.splitlines():
      if line.startswith('Version: '):
        return line.split(': ')[1]
    return None  # Package not found or version not available
  except subprocess.CalledProcessError:
    return None #pip show command failed.

package_name = "google-meridian"
version = get_package_version(package_name)

if version:
  print(f"Meridian version: {version}")
else:
  print(f"Could not find version information for {package_name}")

In [None]:
# @title Imports
import datetime
import json
import os
import pprint
from pprint import pprint
import sys

import google.auth
from google.cloud import bigquery
from google.cloud import storage

import IPython

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp
import arviz as az

import jinja2

from meridian import constants
from meridian.data import load
from meridian.data import test_utils
from meridian.model import model
from meridian.model import spec
from meridian.model import prior_distribution
from meridian.analysis import optimizer
from meridian.analysis import analyzer
from meridian.analysis import visualizer
from meridian.analysis import summarizer
from meridian.analysis import formatter

In [None]:
# @title Output system specs

# Check RAM, CPUs and if GPU is available
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU')))

## Helper functions

In [None]:
# @title GCS helper functions

def check_files_in_gcs_folder(bucket_name, folder_path, file_names, project_id):
    """
    Checks if specific files exist within a given folder in a GCS bucket.

    Args:
        bucket_name: The name of the GCS bucket.
        folder_path: The path to the folder within the bucket (e.g., "configuration/").
        file_names: A list of file names to check for (e.g., ["config1.txt", "config2.json"]).
        project_id: The ID of the Google Cloud project. If None, the default project will be used.

    Returns:
        A dictionary where keys are file names, and values are booleans indicating
        whether the file exists (True) or not (False).
    """
    try:
        storage_client = storage.Client(project=project_id)
        bucket = storage_client.bucket(bucket_name)

        file_existence = {}
        for file_name in file_names:
            blob_path = os.path.join(folder_path, file_name)  # Construct the full blob path
            blob = bucket.blob(blob_path)
            file_existence[file_name] = blob.exists()

        return file_existence

    except google.auth.exceptions.DefaultCredentialsError:
        print("Google Cloud credentials not found.")
        return {}  # Return an empty dictionary to indicate failure
    except Exception as e:
        print(f"An error occurred: {e}")
        return {} # Return an empty dictionary to indicate failure



def upload_file_to_gcs(bucket_name, file_path, destination_blob_name, project_id):
    """
    Uploads a file to a Google Cloud Storage bucket.

    Args:
        bucket_name: The name of the GCS bucket.
        file_path: The path to the file to upload.
        destination_blob_name: The name of the blob in the bucket where the
            file will be uploaded.
        project_id: The ID of the Google Cloud project. If None, the default
            project will be used.
    """
    try:
        storage_client = storage.Client(project=project_id)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)

        blob.upload_from_filename(file_path)

        print(f"File {file_path} uploaded to gs://{bucket_name}/{destination_blob_name}")

    except google.auth.exceptions.DefaultCredentialsError:
        print("Google Cloud credentials not found. Please set up Application Default Credentials.")
    except Exception as e:
        print(f"An error occurred: {e}")

def load_json_from_gcs(bucket_name, file_path, project_id):
    """
    Loads JSON data from a GCS file into a Python dictionary.

    Args:
        bucket_name: The name of the GCS bucket.
        file_path: The full path to the JSON file within the bucket (e.g., "configuration/data.json").
        project_id: The ID of the Google Cloud project. If None, the default project will be used.

    Returns:
        A Python dictionary containing the JSON data, or None if an error occurs.
    """
    try:
        storage_client = storage.Client(project=project_id)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_path)

        if not blob.exists():
            print(f"File '{file_path}' does not exist in bucket '{bucket_name}'.")
            return None

        # Download the JSON file as a string
        json_string = blob.download_as_string()

        # Parse the JSON string into a Python dictionary
        json_data = json.loads(json_string)

        return json_data

    except google.auth.exceptions.DefaultCredentialsError:
        print("Google Cloud credentials not found. Please set up Application Default Credentials.")
        return None
    except json.JSONDecodeError:
        print(f"Error decoding JSON from file '{file_path}'.")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

### BigQuery helper functions

In [None]:
def check_bigquery_columns(project_id, dataset_id, table_or_view_id, column_names):
    """
    Checks if a list of column names exists in a BigQuery table or view.

    Args:
        project_id (str): The ID of the BigQuery project.
        dataset_id (str): The ID of the BigQuery dataset.
        table_or_view_id (str): The ID of the BigQuery table or view.
        column_names (list[str]): A list of column names to check for.

    Returns:
        tuple: A tuple containing two lists:
            - The list of columns that exist in the table/view.
            - The list of columns that do not exist in the table/view.

    Raises:
        google.cloud.exceptions.NotFound: If the table or view does not exist.
    """

    client = bigquery.Client(project=project_id)
    table_ref = client.dataset(dataset_id).table(table_or_view_id)

    try:
        table = client.get_table(table_ref)
    except Exception as e:
        raise e

    existing_columns = [field.name for field in table.schema]
    found_columns = []
    missing_columns = []

    for column_name in column_names:
        if column_name in existing_columns:
            found_columns.append(column_name)
        else:
            missing_columns.append(column_name)

    return found_columns, missing_columns

In [None]:
def load_cortex_bigquery_meridian_data(project_id, dataset_id, table_id):
    """
    Loads data from a BigQuery table into a Pandas DataFrame.

    Meridian requirement:
        Time column values must be formatted in yyyy-mm-dd date format.

    Args:
        project_id: The ID of the Google Cloud project.
        dataset_id: The ID of the BigQuery dataset.
        table_id: The ID of the BigQuery table.

    Returns:
        A Pandas DataFrame containing the data, or None if an error occurs.
    """
    try:
        # Construct the full table ID
        table_full_id = f"{project_id}.{dataset_id}.{table_id}"

        # Initialize the BigQuery client
        client = bigquery.Client(project=project_id)

        # Construct the query
        query = f"SELECT * FROM `{table_full_id}` ORDER BY time, geo ASC"

        # Run the query and load the results into a DataFrame
        query_job = client.query(query)  # Make an API request.
        results = query_job.result().to_dataframe() # Waits for query to finish and converts to a Pandas DataFrame.

        print(f"Loaded data from BigQuery into Pandas DataFrame. Found {len(results)} rows.")
        return results

    except google.auth.exceptions.DefaultCredentialsError:
        print("Google Cloud credentials not found.")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

## Load configuration

In [None]:
# @title Set variables
# 1. Project ID Configuration
gcs_project_id = os.environ.get('GCLOUD_PROJECT') or os.environ.get('GOOGLE_CLOUD_PROJECT')

if gcs_project_id:
    print(f"Current Project ID (from environment): {gcs_project_id}")
else:
    print("Project ID not found in environment variables.")
    gcs_project_id = ""  # Set to empty string to avoid errors later

# 2. GCS Bucket and Folder Paths
gcs_bucket_name_suffix =  "{{BUCKET_SUFFIX}}" # default value is "cortex-meridian"
gcs_bucket_name = f"{gcs_project_id}-{gcs_bucket_name_suffix}"
gcs_bucket = f"gs://{gcs_bucket_name}"

gcs_config_folder_path = "configuration"
gcs_config_file_name = "cortex_meridian_config.json"
gcs_config_file_path = os.path.join(gcs_config_folder_path, gcs_config_file_name)

gcs_reports_folder_path = "reporting"
gcs_csv_folder_path = "csv"
gcs_models_folder_path = "models"

# 3. Timestamp Generation
now = datetime.datetime.now()
timestamp_str = now.strftime("%Y%m%d-%H%M%S")
print(f"Timestamp: {timestamp_str} will be added to all files")

# 4. Local File Paths
local_reports_path = '/content/reporting'
local_csv_path = '/content/csv'
local_models_path = '/content/models'

# 5. Filename Definitions
optimization_report_filename = f'optimization_output_{timestamp_str}.html'
summary_report_filename = f'summary_output_{timestamp_str}.html'
overview_report_template_filename = 'cortex_meridian_overview.html.jinja'
overview_report_filename = 'overview.html'
nonoptimized_data_results_filename = f"nonoptimized_results_{timestamp_str}.csv"
optimized_data_results_file_name = f"optimized_results_{timestamp_str}.csv"
model_filename = f"mmm_{timestamp_str}.pkl"


In [None]:
# @title Local folders
folder_paths = [local_reports_path, local_csv_path, local_models_path]
for folder_path in folder_paths:
        try:
            if not os.path.exists(folder_path):
                os.makedirs(folder_path)
                print(f"Folder '{folder_path}' created successfully.")
            else:
                print(f"Folder '{folder_path}' already exists.")
        except OSError as e:
            print(f"Error creating folder '{folder_path}': {e}")
            sys.exit(1)

In [None]:
# @title Validate config file exists and load

file_check = check_files_in_gcs_folder(gcs_bucket_name, gcs_config_folder_path, [gcs_config_file_name], gcs_project_id)

if file_check[gcs_config_file_name]:
    print("File exists")

    cortex_meridian_config = load_json_from_gcs(gcs_bucket_name, gcs_config_file_path, gcs_project_id)

    if cortex_meridian_config:
        print("Cortex JSON config data loaded successfully:")
        pprint(cortex_meridian_config)
    else:
        print("Failed to load Cortex JSON config data.")
        sys.exit(1)
else:
    print("File does not exist")
    sys.exit(1)

In [None]:
# @title Validate columns from configuration exists in view
fields_to_columns = [
    cortex_meridian_config["column_mappings"]["time"],
    cortex_meridian_config["column_mappings"]["geo"],
    cortex_meridian_config["column_mappings"]["controls"],
    cortex_meridian_config["column_mappings"]["population"],
    cortex_meridian_config["column_mappings"]["kpi"],
    cortex_meridian_config["column_mappings"]["revenue_per_kpi"],
    cortex_meridian_config["column_mappings"]["media"],
    cortex_meridian_config["column_mappings"]["media_spend"]
]

#Build final list of columns
columns_to_check = []
for item in fields_to_columns:
    if isinstance(item, list):
        columns_to_check.extend([x for x in item if x is not None])  # Extend with non-None elements
    elif item is not None and item != "":
        columns_to_check.append(item)  # Append if not None

print(f"Column names read from config: {columns_to_check} will now check view for matching columns")

try:
    found, missing = check_bigquery_columns(cortex_meridian_config["cortex_bq_project_id"], cortex_meridian_config["cortex_meridian_marketing_data_set_id"], cortex_meridian_config["cortex_meridian_marketing_view_name"], columns_to_check)
    print(f"Found columns: {found}")
    print(f"Missing columns: {missing}")
    if len(missing) > 0:
        raise Exception(f"Missing columns: {missing}")
    else:
        print("All columns found in view.")
except Exception as e:
    print(f"An error occurred: {e}")



## Load data from Cortex

In [None]:
# @title Load from BigQuery into dataframe
project_id = cortex_meridian_config["cortex_bq_project_id"]
dataset_id = cortex_meridian_config["cortex_meridian_marketing_data_set_id"]
table_id = cortex_meridian_config["cortex_meridian_marketing_view_name"]

cortex_df = load_cortex_bigquery_meridian_data(project_id, dataset_id, table_id)
cortex_df['time'] = cortex_df['time'].astype(str)

print(f"Data frame size: {len(cortex_df)}")
cortex_df

In [None]:
# @title Output dataframe info
cortex_df.info()

## Meridian model

**‼ IMPORTANT ‼**: The Meridian modelling parameters read from `cortex_meridian_config.json` should be chosen with great care! **NEVER** make actual changes to your marketing spend before reading and understanding all details about [how to do Meridian modelling](https://developers.google.com/meridian/docs/basics/about-the-project). If needed consult with an [official Google Meridian partner](https://developers.google.com/meridian/partners) and/or your Google Ads representative.

### Configure model

In [None]:
# @title Columns to channel mapping

revenue_per_kpi = cortex_meridian_config["column_mappings"]["revenue_per_kpi"]

base_columns = {
    "time": cortex_meridian_config["column_mappings"]["time"],
    "geo": cortex_meridian_config["column_mappings"]["geo"],
    "population": cortex_meridian_config["column_mappings"]["population"],
    "controls": cortex_meridian_config["column_mappings"]["controls"],
    "media": cortex_meridian_config["column_mappings"]["media"],
    "media_spend": cortex_meridian_config["column_mappings"]["media_spend"],
    "kpi": cortex_meridian_config["column_mappings"]["kpi"]
}

if revenue_per_kpi and not len(revenue_per_kpi) == 0:
    base_columns["revenue_per_kpi"] = revenue_per_kpi
else:
  print("revenue_per_kpi not set will not be added to coord_to_columns")

coord_to_columns = load.CoordToColumns(**base_columns)

# Process media array and spend array
correct_media_to_channel = {}
correct_media_spend_to_channel = {}
channel_index = 0
for media_name in cortex_meridian_config["column_mappings"]["media"]:
    channel_name = cortex_meridian_config["channel_names"][channel_index]
    correct_media_to_channel[media_name] = channel_name
    channel_index += 1

channel_index = 0

for media_spend_name in cortex_meridian_config["column_mappings"]["media_spend"]:
  channel_name = cortex_meridian_config["channel_names"][channel_index]
  correct_media_spend_to_channel[media_spend_name] = channel_name
  channel_index += 1

print("Media to channel mapping:", correct_media_to_channel)
print("Media to channel spend mapping:", correct_media_spend_to_channel)
print("coord_to_columns:", coord_to_columns)

In [None]:
# @title Load dataframe
loader = load.DataFrameDataLoader(
    df=cortex_df,
    kpi_type=cortex_meridian_config["data_processing"]["kpi_type"],
    coord_to_columns=coord_to_columns,
    media_to_channel=correct_media_to_channel,
    media_spend_to_channel=correct_media_spend_to_channel,
)
data = loader.load()

In [None]:
# @title Model specs
roi_mu = cortex_meridian_config["data_processing"]["roi_mu"] # Mu for ROI prior for each media channel.
roi_sigma = cortex_meridian_config["data_processing"]["roi_sigma"] # Sigma for ROI prior for each media channel.
prior = prior_distribution.PriorDistribution(
    roi_m=tfp.distributions.LogNormal(roi_mu, roi_sigma, name=constants.ROI_M)
)
model_spec = spec.ModelSpec(prior=prior)

mmm = model.Meridian(input_data=data, model_spec=model_spec)

### Sampling

In [None]:
# @title Sample prior and posterior
mmm.sample_prior(cortex_meridian_config["data_processing"]["sample"]["prior"])
mmm.sample_posterior(
    n_chains=cortex_meridian_config["data_processing"]["sample"]["posterior"]["n_chains"],
    n_adapt=cortex_meridian_config["data_processing"]["sample"]["posterior"]["n_adapt"],
    n_burnin=cortex_meridian_config["data_processing"]["sample"]["posterior"]["n_burnin"],
    n_keep=cortex_meridian_config["data_processing"]["sample"]["posterior"]["n_keep"])

In [None]:
# @title Model fit
model_fit = visualizer.ModelFit(mmm)
model_fit.plot_model_fit()

In [None]:
# @title Summarizer
mmm_summarizer = summarizer.Summarizer(mmm)

In [None]:
# @title Define start/end for report
earliest_date = cortex_df['time'].min()
latest_date = cortex_df['time'].max()

print("Earliest date:", earliest_date)
print("Latest date:", latest_date)

In [None]:
# @title Generate summary report
start_date = earliest_date
end_date = latest_date
mmm_summarizer.output_model_results_summary(summary_report_filename, local_reports_path, start_date, end_date)

In [None]:
# @title Save summary report to GCS
upload_file_to_gcs(gcs_bucket_name, f"{local_reports_path}/{summary_report_filename}",f"{gcs_reports_folder_path}/{summary_report_filename}", gcs_project_id)

### Optimization

In [None]:
# @title Budget optimizer
budget_optimizer = optimizer.BudgetOptimizer(mmm)
optimization_results = budget_optimizer.optimize()

In [None]:
# @title Export optimization report with optimized spend allocations and ROI
optimization_results.output_optimization_summary(optimization_report_filename, local_reports_path)

In [None]:
# @title Save optimization report to GCS
upload_file_to_gcs(gcs_bucket_name, f"{local_reports_path}/{optimization_report_filename}",f"{gcs_reports_folder_path}/{optimization_report_filename}", gcs_project_id)

In [None]:
# @title Output non-optimized summary
optimization_results.nonoptimized_data

In [None]:
# @title Save non-optimized summary as CSV local and upload to GCS

# Convert xarray Dataset to Pandas DataFrame for saving
dataset_nonoptimized_data_results = optimization_results.nonoptimized_data
nonoptimized_data_results_df = dataset_nonoptimized_data_results.to_dataframe()

# Save the DataFrame to a CSV file
nonoptimized_data_results_df.to_csv(f"{local_csv_path}/{nonoptimized_data_results_filename}")

#Upload GCS
upload_file_to_gcs(gcs_bucket_name, f"{local_csv_path}/{nonoptimized_data_results_filename}",f"{gcs_csv_folder_path}/{nonoptimized_data_results_filename}", gcs_project_id)

In [None]:
# @title Output optimized summary
optimization_results.optimized_data

In [None]:
# @title Save optimized summary as CSV local and upload to GCS
# Convert xarray Dataset to Pandas DataFrame
dataset_optimized_data_results = optimization_results.optimized_data
optimized_data_results_df = dataset_optimized_data_results.to_dataframe()

# Save the DataFrame to a CSV file
optimized_data_results_df.to_csv(f"{local_csv_path}/{optimized_data_results_file_name}")


#Upload GCS
upload_file_to_gcs(gcs_bucket_name, f"{local_csv_path}/{optimized_data_results_file_name}",f"{gcs_csv_folder_path}/{optimized_data_results_file_name}", gcs_project_id)

In [None]:
# @title Output model object
model.save_mmm(mmm, f"{local_models_path}/{model_filename}")

In [None]:
# @title Save model object to GCS
upload_file_to_gcs(gcs_bucket_name, f"{local_models_path}/{model_filename}",f"{gcs_models_folder_path}/{model_filename}", gcs_project_id)

## Create/update overview HTML page

In [None]:
# @title Get list of reports and CSV files from GCS
from google.cloud import storage
from google.auth import default
from datetime import datetime, timedelta

def remove_before_slash(filepath):
  """Removes everything before the last '/' in a filepath.

  Args:
    filepath: The input filepath string.

  Returns:
    The filepath string with everything before the last '/' removed,
    or the original string if no '/' is found.
  """
  last_slash_index = filepath.rfind('/')
  if last_slash_index != -1:
    return filepath[last_slash_index + 1:]
  else:
    return filepath

def list_gcs_files(bucket_name, folder_path, includeFileExtension):
    """
    Lists files in a Google Cloud Storage (GCS) bucket/folder, sorted by filename descending,
    and returns their name, creation date, and authenticated URL.

    Args:
        bucket_name (str): The name of the GCS bucket.
        folder_path (str, optional): The path to the folder within the bucket. Defaults to None (root).

    Returns:
        list: A list of dictionaries, where each dictionary represents a file and contains
              'name', 'created', and 'authenticated_url' keys. Returns an empty list if no files are found.
              Returns None if there are errors with GCS.
    """
    try:
        credentials, project = default()
        storage_client = storage.Client(credentials=credentials, project=project)
        bucket = storage_client.bucket(bucket_name)

        blobs = bucket.list_blobs(prefix=folder_path) if folder_path else bucket.list_blobs()

        file_list = []
        for blob in blobs:
            if not blob.name.endswith('/'): # Skip folders
              if blob.name.endswith(f'{includeFileExtension}'): # Apply file extension filter
                file_list.append({
                    'name': remove_before_slash(blob.name),
                    'created': blob.time_created,
                    'authenticated_url': f"https://storage.mtls.cloud.google.com/{project_id}-{gcs_bucket_name_suffix}/{blob.name}",
                })

        # Sort by filename in descending order
        sorted_files = sorted(file_list, key=lambda x: x['name'], reverse=True)

        return sorted_files

    except Exception as e:
        print(f"Error accessing GCS: {e}")
        return None

def print_files_info(files):
    """
    Prints information about a list of files.

    Args:
        files: A list of dictionaries, where each dictionary represents a file
               and contains 'name', 'created', and 'authenticated_url' keys.
               Can also be None.
    """
    if files is not None:
        if files:
            for file_info in files:
                print(f"Name: {file_info['name']}")
                print(f"Created: {file_info['created']}")
                print(f"Authenticated URL: {file_info['authenticated_url']}")
                print("-" * 20)
        else:
            print("No files found.")

print("Loading all files on GCS to update overview HTML page")
report_files = list_gcs_files(gcs_bucket_name, f"{gcs_reports_folder_path}", ".html")
csv_files = list_gcs_files(gcs_bucket_name, f"{gcs_csv_folder_path}", ".csv")

print_files_info(report_files)
print_files_info(csv_files)

In [None]:
# @title Load template from GCS to current directory
from google.cloud import storage
import os

def download_blob(bucket_name, blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "/path/to/your/local/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    blob = bucket.blob(blob_name)
    try:
        blob.download_to_filename(destination_file_name)

        print(
            "Downloaded storage object {} from bucket {} to local file {}.".format(
                blob_name, bucket_name, destination_file_name
            )
        )
    except Exception as e:
        print(f"Error downloading {blob_name}: {e}")
        return False

    return True

download_blob(gcs_bucket_name, f"{gcs_reports_folder_path}/{overview_report_template_filename}", f"/content/{overview_report_template_filename}")



In [None]:
# @title Execute Jinja on template
from jinja2 import Environment, FileSystemLoader
import datetime

# Set up the Jinja2 environment
env = Environment(loader=FileSystemLoader('/content'))  # Load templates from the content directory root
template = env.get_template(f"{overview_report_template_filename}")

#Set last update
now = datetime.datetime.now()
last_update = now.strftime("%Y-%m-%d %H:%M:%S")

# Render the template with the data
output = template.render(report_files=report_files, csv_files=csv_files, last_update=last_update)

# Print the output
print(output)

# To save to a file:
with open(overview_report_filename, 'w') as f:
    f.write(output)

In [None]:
# @title View overview page in notebook (commented out for normal executions)
#from IPython.display import HTML
#HTML(filename=f"/content/{overview_report_filename}")

In [None]:
# @title Save overview page to GCS
upload_file_to_gcs(gcs_bucket_name, f"{overview_report_filename}",f"{gcs_reports_folder_path}/{overview_report_filename}", gcs_project_id)