<a href="https://colab.research.google.com/github/datacommonsorg/tools/blob/master/notebooks/Your_Data_Commons_Load_Data_Workflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright 2024 Google LLC.
SPDX-License-Identifier: Apache-2.0

In [None]:
# Install dependencies

%%capture
!pip install google-cloud-storage
!pip install google-cloud-run

In [None]:
# User parameters
#
# Every value in this cell is read by the workflow functions defined further
# down in the notebook.

# Replace the values in this section with yours.
# Running without replacing them can lead to errors, since you won't have permissions to these resources.

# Path to your local folder with the files to be uploaded to GCS.
# Note that colab works best with Google Drive, so upload your data to a drive and use that path here.
# This is only needed when working within colab. With standalone scripts, you can use local folder paths.
google_drive_folder_path = '/content/drive/MyDrive/demo-data'

# Your GCP project ID.
# This notebook uses this project ID for all GCP resources - GCS, Cloud SQL, Cloud Run.
# If your resources are spread across different projects, you will need to specify different project IDs accordingly.
gcp_project_id = 'datcom-website-dev'

# The GCS bucket where your data will be uploaded.
gcs_bucket_name = 'customdc-data'
# The folder under the GCS bucket where your data will be uploaded.
gcs_folder_path = 'load-data-workflow-demo'

# The name of your Cloud SQL DB where your data will be imported.
# Note that DB names can only have alpha-numeric characters.
# Otherwise the import fails with obscure messages.
cloud_sql_db_name = 'demodb'

# The region where your cloud run jobs and services are deployed.
cloud_run_region = 'us-central1'

# Cloud Run resources.
# The script does not create a new job or service; it runs the existing job and deploys the existing service.
# You can programmatically create them as well, but you will need to use different APIs to do so.

# Name of the cloud run job used for loading data.
load_data_cloud_run_job_name = 'demo-job'
# Name of the cloud run service for your datacommons instance.
datacommons_cloud_run_service_name = 'demo-service'
# The name of the docker image that will be deployed to the cloud run service.
datacommons_service_image = 'gcr.io/datcom-ci/datacommons-services:latest'

# Full gs:// location of the upload folder, derived from the bucket and folder above.
# It is passed to the load job and to the service as both INPUT_DIR and OUTPUT_DIR.
full_gcs_path = f'gs://{gcs_bucket_name}/{gcs_folder_path}'
# Bare expression so the notebook displays the resolved path for a quick sanity check.
full_gcs_path

'gs://customdc-data/load-data-workflow-demo'

In [None]:
# Imports

import os
import glob
from datetime import datetime
from google.colab import auth
from google.cloud import storage
from google.cloud import run_v2
from google.colab import drive

In [None]:
# Authenticate user, mount google drive and instantiate GCP objects

# Authenticate first: the GCS client below is created after this call so it
# runs with the signed-in user's credentials.
auth.authenticate_user()

# Makes the Drive contents available under /content/drive, which is where
# `google_drive_folder_path` points.
drive.mount('/content/drive')

# Module-level GCS handles shared by the upload step.
gcs_client = storage.Client(project=gcp_project_id)
gcs_bucket = gcs_client.bucket(gcs_bucket_name)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Workflow functions

def upload_local_directory_to_gcs():
  """Uploads every file under `google_drive_folder_path` to the GCS folder.

  The local folder is walked recursively and each file is uploaded to
  `gs://{gcs_bucket_name}/{gcs_folder_path}/<path relative to the local folder>`,
  so the sub-folder layout is preserved in the bucket. One line is printed per
  uploaded file.

  Reads the module-level `google_drive_folder_path`, `gcs_folder_path`,
  `gcs_bucket_name` and `gcs_bucket`.

  Raises:
    AssertionError: if `google_drive_folder_path` is not an existing directory.
  """
  assert os.path.isdir(google_drive_folder_path), f'Not a directory: {google_drive_folder_path}'

  # '**' only descends into sub-folders when recursive=True; without it the
  # pattern behaves like '*' and nested files are silently skipped.
  pattern = os.path.join(google_drive_folder_path, '**')
  # GCS object names always use '/', independent of the local path separator.
  remote_prefix = gcs_folder_path.rstrip('/')

  for local_file in glob.glob(pattern, recursive=True):
    if not os.path.isfile(local_file):
      continue  # the glob also yields directories (including the root itself)
    # relpath is robust to a trailing separator in the configured folder path.
    relative_path = os.path.relpath(local_file, google_drive_folder_path).replace(os.sep, '/')
    remote_path = f'{remote_prefix}/{relative_path}' if remote_prefix else relative_path
    blob = gcs_bucket.blob(remote_path)
    blob.upload_from_filename(local_file)
    print(f'Uploaded {local_file} to gs://{gcs_bucket_name}/{remote_path}')

def run_load_data_job():
  """Runs the existing Cloud Run load-data job and blocks until it finishes.

  The job identified by `gcp_project_id`, `cloud_run_region` and
  `load_data_cloud_run_job_name` is executed once (task_count=1) with
  RUN_TIMESTAMP, INPUT_DIR, OUTPUT_DIR and DB_NAME passed as container
  environment overrides. A console logs URL is printed before waiting and the
  name of the finished execution is printed afterwards.
  """
  jobs_client = run_v2.JobsClient()

  # Per-execution environment; insertion order is the order sent to Cloud Run.
  override_values = {
      'RUN_TIMESTAMP': str(datetime.now()),
      'INPUT_DIR': full_gcs_path,
      'OUTPUT_DIR': full_gcs_path,
      'DB_NAME': cloud_sql_db_name,
  }
  container_override = run_v2.RunJobRequest.Overrides.ContainerOverride(
      env=[
          run_v2.EnvVar(name=key, value=val)
          for key, val in override_values.items()
      ]
  )
  execution_overrides = run_v2.RunJobRequest.Overrides(
      task_count=1,
      container_overrides=[container_override],
  )

  run_request = run_v2.RunJobRequest(
      name=f'projects/{gcp_project_id}/locations/{cloud_run_region}/jobs/{load_data_cloud_run_job_name}',
      etag='*',  # Use * for latest revision
      validate_only=False,
      overrides=execution_overrides,
  )

  pending_execution = jobs_client.run_job(request=run_request)

  console_logs_url = f'https://console.cloud.google.com/run/jobs/details/{cloud_run_region}/{load_data_cloud_run_job_name}/logs?project={gcp_project_id}'

  print('Waiting for load data job to complete. This will take a few minutes...')
  print(f'You can monitor the logs at: {console_logs_url}')
  # result() blocks until the long-running operation has completed.
  finished_execution = pending_execution.result()
  print(f'Load data job completed: {finished_execution.name}')

def deploy_datacommons_service():
  """Redeploys the existing Data Commons Cloud Run service and waits for it.

  The service identified by `gcp_project_id`, `cloud_run_region` and
  `datacommons_cloud_run_service_name` is fetched, its first container is
  updated, and the service is sent back via an update request:

  * the container image is set to `datacommons_service_image` (when non-empty),
  * DEPLOY_TIMESTAMP, INPUT_DIR, OUTPUT_DIR and DB_NAME are set, replacing any
    existing variables with the same names; all other existing environment
    variables are kept.

  Any additional containers on the service are left unchanged. A console logs
  URL is printed before waiting; the service name and URL are printed once the
  update operation has completed.
  """
  client = run_v2.ServicesClient()

  service_path = f'projects/{gcp_project_id}/locations/{cloud_run_region}/services/{datacommons_cloud_run_service_name}'

  env = {
      'DEPLOY_TIMESTAMP': str(datetime.now()),
      'INPUT_DIR': full_gcs_path,
      'OUTPUT_DIR': full_gcs_path,
      'DB_NAME': cloud_sql_db_name
  }

  service = client.get_service(name=service_path)
  # Copy the full container list so that containers after the first one
  # (e.g. sidecars) are sent back unchanged instead of being dropped.
  containers = list(service.template.containers)
  container = containers[0]

  # Apply the image configured in the user parameters; an empty value keeps
  # whatever image the service is currently using.
  if datacommons_service_image:
    container.image = datacommons_service_image

  # Workflow-managed variables first, then every pre-existing variable that
  # the workflow does not manage.
  env_vars = [
      run_v2.EnvVar(name=var_name, value=var_value)
      for var_name, var_value in env.items()
  ]
  env_vars.extend(
      env_var for env_var in container.env if env_var.name not in env
  )

  container.env = env_vars
  containers[0] = container
  service.template.containers = containers

  request = run_v2.UpdateServiceRequest(
      service=service,
      validate_only=False,
      allow_missing=False,
  )

  operation = client.update_service(request=request)

  logs_url = f'https://console.cloud.google.com/run/detail/{cloud_run_region}/{datacommons_cloud_run_service_name}/logs?project={gcp_project_id}'

  print('Waiting for service to get deployed. This will take a few minutes...')
  print(f'You can monitor the logs at: {logs_url}')
  # result() blocks until the long-running update operation has completed.
  response = operation.result()
  print(f'Service deployed: {response.name}')
  print(f'Service URL: {response.urls[-1]}')

In [None]:
# Execute workflow
#
# Each entry pairs the banner printed before a step with the function that
# performs it; the steps run strictly in the order listed.
workflow_steps = (
    ("\n==== STEP 1 of 3: Uploading data to GCS ====\n", upload_local_directory_to_gcs),
    ("\n==== STEP 2 of 3: Loading data in your datacommons store ====\n", run_load_data_job),
    ("\n==== STEP 3 of 3: Deploying your datacommons service ====\n", deploy_datacommons_service),
)

for step_banner, run_step in workflow_steps:
  print(step_banner)
  run_step()

print("\n==== DONE ====")


==== STEP 1 of 3: Uploading data to GCS ====

Uploaded /content/drive/MyDrive/demo-data/gender_wage_gap.csv to gs://customdc-data/load-data-workflow-demo/gender_wage_gap.csv
Uploaded /content/drive/MyDrive/demo-data/config.json to gs://customdc-data/load-data-workflow-demo/config.json
Uploaded /content/drive/MyDrive/demo-data/average_annual_wage.csv to gs://customdc-data/load-data-workflow-demo/average_annual_wage.csv

==== STEP 2 of 3: Loading data in your datacommons store ====

Waiting for load data job to complete. This will take a few minutes...
You can monitor the logs at: https://console.cloud.google.com/run/jobs/details/us-central1/demo-job/logs?project=datcom-website-dev
Load data job completed: projects/datcom-website-dev/locations/us-central1/jobs/demo-job/executions/demo-job-2fhrs

==== STEP 3 of 3: Deploying your datacommons service ====

Waiting for service to get deployed. This will take a few minutes...
You can monitor the logs at: https://console.cloud.google.com/run/