<a href="https://colab.research.google.com/github/brendanlooker/df-demo/blob/main/dataform.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install the Client Library

In [None]:
pip install google-cloud-dataform

In [8]:

import logging
import time

from google.cloud import dataform_v1beta1 as dataform

In [9]:
# Initialise client
# Creates the Dataform API client that every later cell uses through `df_client`.
# NOTE(review): this cell sits before the ADC login cell below; the client presumably
# needs Application Default Credentials to exist already — confirm, and on a fresh
# runtime run the authentication cells first.

df_client = dataform.DataformClient()


In [None]:
# Authenticate via ADC (Application Default Credentials)
# Shells out to the gcloud CLI, so gcloud must be available on the notebook runtime.

!gcloud auth application-default login

In [None]:
# Complete set-up: register the quota project for the ADC credentials via the gcloud CLI.
project = '' # Set GCP project (required: the value is interpolated as-is into the command below)

!gcloud auth application-default set-quota-project {project}

In [None]:
# Create a Dataform Repository

gcp_project = ''  # Set GCP project ID
location = 'europe-west4'
repo_name = 'demo_repo-df'

# Fail fast with a clear message instead of sending a malformed parent path to the API.
if not gcp_project:
    raise ValueError('Set gcp_project to your GCP project ID before running this cell')

# A repository's parent is the project/location pair.
repo_uri = f'projects/{gcp_project}/locations/{location}'

request = dataform.CreateRepositoryRequest(
    parent=repo_uri,
    # The repository is identified by parent + repository_id, so the body carries
    # no name (the short ID previously passed as `name` is not a resource name).
    repository=dataform.Repository(),
    repository_id=repo_name
)

print(request)

df_client.create_repository(request=request)

In [None]:
# Create a Dataform Workspace

gcp_project = ''  # Set GCP project ID
location = 'europe-west4'
repo_name = 'demo_repo-df'
workspace = 'demo_repo-df-dev1'

# Fail fast with a clear message instead of sending a malformed parent path to the API.
if not gcp_project:
    raise ValueError('Set gcp_project to your GCP project ID before running this cell')

# A workspace's parent is the repository it belongs to.
repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'

request = dataform.CreateWorkspaceRequest(
    parent=repo_uri,
    # The workspace is identified by parent + workspace_id. The body previously set
    # name=repo_name, i.e. the *repository's* ID, which was a copy-paste slip.
    workspace=dataform.Workspace(),
    workspace_id=workspace
)

print(request)

df_client.create_workspace(request=request)


In [None]:
# Compile a workflow (create a compilation result)

gcp_project = ''  # Set GCP project ID
bq_dataset = 'df_demo_ds'
location = 'europe-west4'
# NOTE(review): this differs from the 'demo_repo-df' repository created earlier in the
# notebook — presumably an existing Git-connected repository; confirm.
repo_name = 'dataform-demo'
branch = 'main'

# Fail fast with a clear message instead of sending a malformed parent path to the API.
if not gcp_project:
    raise ValueError('Set gcp_project to your GCP project ID before running this cell')

repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'

request = dataform.CreateCompilationResultRequest(
    parent=repo_uri,
    compilation_result=dataform.types.CompilationResult(
        git_commitish=branch,
        # Given as a plain mapping so the cell does not depend on where the installed
        # client version exposes the CodeCompilationConfig class (nested vs. top-level).
        code_compilation_config={
            'default_database': gcp_project,
            'default_schema': bq_dataset,
        },
    )
)

print(request)

response = df_client.create_compilation_result(request=request)
# The resource name of the compilation result is consumed by the execution cell.
compilation_result = response.name
logging.info(f'compiled workflow {compilation_result}')
print(compilation_result)


In [None]:
# Execute a workflow (runs the compilation result produced by the compile cell)
# Relies on gcp_project, location, repo_name and compilation_result set by that cell.

repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'

# Describe the invocation first, then wrap it in the create request.
invocation_spec = dataform.types.WorkflowInvocation(compilation_result=compilation_result)
request = dataform.CreateWorkflowInvocationRequest(
    parent=repo_uri,
    workflow_invocation=invocation_spec,
)

print(request)

response = df_client.create_workflow_invocation(request=request)
# Keep the invocation's resource name; the polling cell looks it up by this name.
workflow_invocation = response.name
logging.info(f'created workflow invocation {workflow_invocation}')
print(workflow_invocation)

In [None]:
# Get workflow state
# Polls the invocation created in the previous cell until it leaves the RUNNING state,
# then raises on a failed/cancelled run or reports success.

POLL_INTERVAL_SECONDS = 10

while True:
    request = dataform.GetWorkflowInvocationRequest(
        name=workflow_invocation
    )
    response = df_client.get_workflow_invocation(request=request)
    state = response.state.name
    logging.info(f'workflow state: {state}')
    # No logging configuration is visible in this notebook, so INFO records are hidden
    # at the default level; print as well so progress shows while waiting.
    print(f'workflow state: {state}')
    if state != 'RUNNING':
        break
    time.sleep(POLL_INTERVAL_SECONDS)

if state in ('FAILED', 'CANCELING', 'CANCELLED'):
    raise Exception(f'Error while running workflow {workflow_invocation}')
elif state == 'SUCCEEDED':
    print("Success!")
else:
    # Any other state previously ended the cell without output; make it visible.
    print(f'Workflow {workflow_invocation} stopped polling in unexpected state {state}')