GCP-3046: Add GCP Dataflow Quick Start Script #57

Merged: tedkahwaji merged 4 commits into main from teddy.kahwaji/gcp-3046 on Oct 30, 2025

Collaborator

@tedkahwaji tedkahwaji commented Oct 26, 2025

Summary

This PR adds a QuickStart script for automated GCP log forwarding to Datadog via Dataflow. The implementation follows the same pattern established by the Integration QuickStart setup, but is specifically tailored for Dataflow log forwarding configuration.

  • Uses static, predefined values as much as possible to minimize required customer inputs in the UI
  • Implements idempotent operations (checks if resources exist before creation, simpler upsert operations)

Note:
This change also includes some refactoring. It creates a new shared directory for shared logic between the Integration & Log Forwarding QuickStart scripts and generalizes many of the reporter methods to take in a workflow_type, since these will use independent workflow types.

Testing

I ran the script and verified that all steps were executed correctly. I also removed some resources to allow the script to perform the necessary upsert operations.

You can view the logs.


Workflow Steps

  1. Authentication & Scope Selection

    • (same as Integration QuickStart)
    • Collects available GCP projects and folders for configuration scope
  2. User Input Collection

    • Script polls for user selections including:
    • Default project (where all infrastructure will be created)
    • Dataflow region
    • Dataflow Prime enablement flag
    • Optional inclusion filter for log sink
    • Optional exclusion filters for log sink
  3. Pub/Sub Infrastructure Setup

    • Creates two Pub/Sub topics with pull-based subscriptions:
      • export-logs-to-datadog (main topic)
      • export-failed-logs-to-datadog (dead letter topic)
    • Idempotent: checks for existing topics/subscriptions before creation
  4. Service Account Creation

    • Creates or finds datadog-dataflow service account in the default project
    • Used to run the Dataflow job with appropriate permissions
  5. IAM Role Assignment

    • Assigns required roles to the service account:
      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.viewer
      • roles/pubsub.publisher
      • roles/pubsub.subscriber
      • roles/storage.objectAdmin
  6. Secret Management

    • Creates or retrieves Datadog API key via Datadog API
    • Stores API key in Secret Manager as gcp-dataflow-logs-api-key
    • Binds roles/secretmanager.secretAccessor to the Dataflow service account
  7. Log Sink Creation

    • Creates datadog-log-sink for each selected project and folder
    • Folder-level sinks use --include-children for aggregated log collection
    • Applies user-defined inclusion and exclusion filters
    • Grants Pub/Sub publisher permissions to each sink's writer identity
  8. Dataflow Job Deployment

    • Enables Dataflow API on the project
    • Creates pubsub-to-datadog-job using Google's Cloud Pub/Sub to Datadog template
    • Configures job parameters (subscription, Datadog intake URL, API key, dead letter topic)
    • Optionally enables Dataflow Prime if requested by user
    • Idempotent: checks for existing running jobs before creation
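
The idempotent steps above (look up a resource, create it only if it is missing) follow a pattern that can be sketched as below. This is an illustration under stated assumptions, not the script's code: `run_gcloud` is a hypothetical stand-in for the script's `gcloud` helper and is assumed to return parsed list output, and the `--filter` expression is only one plausible way to look the topic up.

```python
from typing import Callable

# Hypothetical stand-in for the script's gcloud helper: takes a command
# string and returns the parsed output as a list of dicts (an assumption).
GcloudRunner = Callable[[str], list[dict]]


def ensure_topic(run_gcloud: GcloudRunner, project_id: str, topic_id: str) -> bool:
    """Create the topic only if it does not exist. Returns True if it was created."""
    full_name = f"projects/{project_id}/topics/{topic_id}"
    existing = run_gcloud(
        f"pubsub topics list --project={project_id} --filter='name={full_name}'"
    )
    if existing:
        return False  # already present, so a rerun changes nothing
    run_gcloud(f"pubsub topics create {topic_id} --project={project_id}")
    return True
```

Running `ensure_topic` twice against the same project should issue the create command at most once, which is the property a rerun of the script relies on.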

Documents

@tedkahwaji tedkahwaji force-pushed the teddy.kahwaji/gcp-3046 branch 3 times, most recently from ad7f89c to a062a88 Compare October 26, 2025 17:00
@tedkahwaji tedkahwaji changed the title from "Teddy.kahwaji/gcp 3046" to "GCP-3046: Add GCP Dataflow Quick Start Script" Oct 26, 2025
@tedkahwaji tedkahwaji force-pushed the teddy.kahwaji/gcp-3046 branch 5 times, most recently from c9c005c to 3d78fa6 Compare October 27, 2025 11:57
@tedkahwaji tedkahwaji marked this pull request as ready for review October 27, 2025 11:58
@tedkahwaji tedkahwaji requested a review from a team as a code owner October 27, 2025 11:58
@tedkahwaji tedkahwaji requested review from mvhdd and removed request for a team October 27, 2025 11:58
@tedkahwaji tedkahwaji requested review from benjjs and gpalmz October 27, 2025 15:04
Collaborator

gpalmz commented Oct 27, 2025

image

Collaborator

@benjjs benjjs left a comment

Repo structure and such look good, I'll leave the business logic to the GCP reviewer. Glad this template works for y'all as well.

@tedkahwaji tedkahwaji force-pushed the teddy.kahwaji/gcp-3046 branch from 1240c52 to 5d27a90 Compare October 27, 2025 19:42

@ash-ddog ash-ddog left a comment

Lots of comments, but this looks pretty good for a first pass. We may make edits moving forward but I'm ok with this to start! Great work!

]

PUBSUB_TOPIC_ID: str = "export-logs-to-datadog"
PUBSUB_DEAD_LETTER_TOPIC_ID: str = "export-failed-logs-to-datadog"

Nit: I'd prob just call this export-logs-to-datadog-dlq or something. This name sounds like customers can just replay these logs to the original topic, but they actually need to be edited beforehand. See: https://cloud.google.com/architecture/stream-logs-from-google-cloud-to-splunk#replay_unprocessed_messages


PUBSUB_TOPIC_ID: str = "export-logs-to-datadog"
PUBSUB_DEAD_LETTER_TOPIC_ID: str = "export-failed-logs-to-datadog"
SECRET_MANAGER_NAME: str = "gcp-dataflow-logs-api-key"

Might I suggest we pick a prefix and use it for everything?

i.e.

if we decide on prefix export-logs-to-datadog

then from there we have:

  • export-logs-to-datadog-topic
  • export-logs-to-datadog-dlq
  • export-logs-to-datadog-api-key
  • export-logs-to-datadog-job

so on and so on. Thoughts?
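
A minimal sketch of the shared-prefix idea, assuming the prefix and suffixes listed above. `RESOURCE_PREFIX`, `resource_name`, and `DATAFLOW_JOB_NAME` are hypothetical names introduced for illustration; the other constant names come from the diff.

```python
# Hypothetical single source of truth for resource naming.
RESOURCE_PREFIX: str = "export-logs-to-datadog"


def resource_name(suffix: str) -> str:
    """Build a resource name from the shared prefix and a short suffix."""
    return f"{RESOURCE_PREFIX}-{suffix}"


PUBSUB_TOPIC_ID: str = resource_name("topic")
PUBSUB_DEAD_LETTER_TOPIC_ID: str = resource_name("dlq")
SECRET_MANAGER_NAME: str = resource_name("api-key")
DATAFLOW_JOB_NAME: str = resource_name("job")  # hypothetical constant
```

Deriving every name from one prefix keeps the resources easy to find with a single search and makes a later rename a one-line change.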

)
continue

if subscription_search[0].get("topic") != topic_full_name:

Why/how could this ever happen? 🤔

Collaborator Author

I encountered this issue while testing. I successfully ran the script from start to finish, then deleted the topic.

When I reran the script, the topic was recreated, but the subscription still pointed to the deleted topic. This check ensures that if the subscription is pointing to an invalid or deleted topic, it will be redirected to the correct one.
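
The check described above can be sketched as a small decision function. This is an assumption-laden illustration rather than the PR's code: `subscription_action` and its return labels are hypothetical, and how the subscription is actually repaired is left out. For context, Pub/Sub reports `_deleted-topic_` as the topic of a subscription whose topic was deleted, so the comparison fails after the topic is recreated.

```python
from typing import Any, Optional


def subscription_action(
    subscription: Optional[dict[str, Any]], topic_full_name: str
) -> str:
    """Decide what to do with a subscription found (or not found) by a lookup.

    Returns "create" when there is no subscription, "fix" when it points at
    a different (for example deleted) topic, and "keep" otherwise.
    """
    if subscription is None:
        return "create"
    if subscription.get("topic") != topic_full_name:
        return "fix"
    return "keep"
```

For example, `subscription_action({"topic": "_deleted-topic_"}, "projects/p/topics/export-logs-to-datadog")` yields `"fix"`, which is the rerun case described in the reply.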

step_reporter.report(
    message=f"Creating Pub/Sub topic '{topic_id}' in project '{project_id}'..."
)
gcloud(f"pubsub topics create {topic_id} --project={project_id}")

Just noting that I do not see any options to topics create or subscription create

From a quick search, that does seem right.

Comment on lines +147 to +192
def create_datadog_logs_api_key() -> str:
    """Create a Datadog logs API key."""
    response, status = dd_request(
        "GET",
        f"/api/v2/api_keys?filter={SECRET_MANAGER_NAME}",
    )
    if status != 200:
        raise RuntimeError(f"Failed to get API key: {response}")

    json_response = json.loads(response)
    data: list[dict[str, Any]] = list(
        filter(
            lambda key: key.get("attributes", {}).get("name") == SECRET_MANAGER_NAME,
            json_response.get("data", []),
        )
    )

    if len(data) > 0:
        api_key_id = data[0].get("id")
        response, status = dd_request(
            "GET",
            f"/api/v2/api_keys/{api_key_id}",
        )
        if status != 200:
            raise RuntimeError(f"Failed to get API key: {response}")

        json_response = json.loads(response)
        return json_response.get("data", {}).get("attributes", {}).get("key")

    response, status = dd_request(
        "POST",
        "/api/v2/api_keys",
        {
            "data": {
                "type": "api_keys",
                "attributes": {
                    "name": SECRET_MANAGER_NAME,
                },
            },
        },
    )
    if status != 201:
        raise RuntimeError(f"Failed to create API key: {response}")

    json_response = json.loads(response)
    return json_response.get("data", {}).get("attributes", {}).get("key")

Noticed some duplicated code so tried to refactor. But prob more important: can we name differently and change error messages?

for naming, I mean find_or_create (and the comment as well)
For error I mean Failed to search API keys:

def find_or_create_datadog_api_key() -> str:
    """Find or create a Datadog API key."""
    response, status = dd_request(
        "GET",
        f"/api/v2/api_keys?filter={SECRET_MANAGER_NAME}",
    )
    if status != 200:
        raise RuntimeError(f"Failed to search API keys: {response}")

    # next() falls back to None when no key matches, so the type is optional.
    api_key_info: dict[str, Any] | None = next(
        filter(lambda key: key.get("attributes", {}).get("name") == SECRET_MANAGER_NAME, json.loads(response).get("data", [])),
        None
    )

    if api_key_info:
        response, status = dd_request(
            "GET",
            f"/api/v2/api_keys/{api_key_info.get('id')}",
        )
        if status != 200:
            raise RuntimeError(f"Failed to get API key: {response}")
    else:
        response, status = dd_request(
            "POST",
            "/api/v2/api_keys",
            {
                "data": {
                    "type": "api_keys",
                    "attributes": {
                        "name": SECRET_MANAGER_NAME,
                    },
                },
            },
        )
        if status != 201:
            raise RuntimeError(f"Failed to create API key: {response}")

    return json.loads(response).get("data", {}).get("attributes", {}).get("key")


Also, this seems semi-sensitive. Are we sure we want to raise an error with the entire response? Any risk of leaking sensitive info there? Can we audit the entire script for this type of thing? 🙏

Collaborator Author

Ty I took the refactor.

I'm not concerned about sensitive info here, as I'm confident these error responses don’t contain anything sensitive. However, just to be safe, I removed returning the full error response in the GET call.

Also, these errors will only be visible to the customer and won't appear in our logs or systems.


> I removed returning the full error response in the GET call.

Sorry for the back and forth. If you're confident this won't print anything sensitive, we may want to leave it in? i.e. A customer with a support case can then give us the error message?
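
One middle ground between dropping the response and raising it whole is to redact known-sensitive fields first, so a customer can still hand over a useful error message. The sketch below is only an illustration: `SENSITIVE_FIELDS`, `redact`, and `safe_error_detail` are hypothetical names, and the set of field names is an assumption based on the `key` attribute read in the code above.

```python
import json
from typing import Any

# Assumed field names that may carry secrets in an API response.
SENSITIVE_FIELDS = frozenset({"key", "api_key", "application_key"})


def redact(value: Any) -> Any:
    """Return a copy of parsed JSON with sensitive fields masked."""
    if isinstance(value, dict):
        return {
            k: "<redacted>" if k in SENSITIVE_FIELDS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value


def safe_error_detail(response: str, limit: int = 500) -> str:
    """Build an error detail string that is safe to show or log."""
    try:
        detail = json.dumps(redact(json.loads(response)))
    except (TypeError, ValueError):
        # Bodies that cannot be parsed cannot be redacted, so omit them.
        detail = "<non-JSON response omitted>"
    return detail[:limit]
```

An error could then be raised as `RuntimeError(f"Failed to get API key: {safe_error_detail(response)}")`, keeping the message useful for a support case while masking the key itself.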

f"logging sinks describe {log_sink_name} {resource_container_filter}",
*["writerIdentity"],
)
if writer_identity := sink_info.get("writerIdentity"):

🐘 🤷

--quiet"
)

dataflow_job_name: str = "pubsub-to-datadog-job"

Nit: constant?


matched_dataflow_jobs = gcloud(
    f"dataflow jobs list --project={project_id} --region={region} "
    f"--filter='name:{dataflow_job_name} AND NOT (state=DONE OR state=FAILED OR state=CANCELLED OR state=DRAINED OR state=UPDATED)'"

Why use exclusions instead of state=RUNNING? What if they add another state?

Collaborator Author

The conditional below was originally checking the length of jobs with state=Done or state=Failed..., which was incorrect, so I added a NOT to fix it. I agree that just checking for running is much simpler, so that's been updated.
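
An allow-list version of the filter could be built as in the sketch below. `active_job_filter` is a hypothetical helper, and the state labels are assumed to match those used in the filter expression above. Which states besides RUNNING should count as "already deployed" is a design decision this sketch leaves open.

```python
def active_job_filter(
    job_name: str, active_states: tuple[str, ...] = ("RUNNING",)
) -> str:
    """Build a --filter expression matching jobs in the given states only."""
    states = " OR ".join(f"state={state}" for state in active_states)
    return f"name:{job_name} AND ({states})"
```

With the default, `active_job_filter("pubsub-to-datadog-job")` produces `name:pubsub-to-datadog-job AND (state=RUNNING)`, so a state added later is excluded unless it is listed explicitly.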

Comment on lines +357 to +359
step_reporter.report(
    message=f"Successfully created Dataflow job '{dataflow_job_name}'"
)

Is there something we can do to ensure things are working for the customer? Or at least wait til the job is running?

Collaborator Author

Unfortunately, we can’t track that during script execution. The best we can do is verify that the resources were created successfully.

We could pause the script and wait for the job to start processing, but that would take a ton of time. I am thinking about this for the UI changes, for example pointing the user to a logs query or a Dataflow metric.


> The best we can do is verify that the resources were created successfully. We could pause the script and wait for the job to start processing, but that would take a ton of time.

Yea sorry, not suggesting we wait minutes for logs to start flowing. But, is it worth polling for maybe 10s or so to ensure the job is in the RUNNING state and not FAILED or something? Again, this is optional, just an idea.
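
The short poll suggested above might look like the following sketch. It is written under assumptions: `get_state` is a hypothetical callable that returns the job's current state label (how it queries gcloud is left out), and the labels RUNNING, FAILED, and CANCELLED are taken from the filter expression in the diff.

```python
import time
from typing import Callable

# States that mean the job will never reach RUNNING (labels assumed).
TERMINAL_FAILURE_STATES = frozenset({"FAILED", "CANCELLED"})


def wait_for_running(
    get_state: Callable[[], str],
    timeout_s: float = 10.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the job is RUNNING, has failed, or the timeout expires.

    Returns the last observed state so the caller can report it.
    """
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state == "RUNNING" or state in TERMINAL_FAILURE_STATES:
            return state
        if clock() >= deadline:
            return state  # still starting up; the caller decides what to say
        sleep(interval_s)
```

Returning the last state instead of raising lets the script distinguish "failed immediately" from "still starting", and only the first case needs to be reported as an error.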

},
)
@patch("gcp_integration_quickstart.requests.request")
@patch("gcp_shared.requests.request")

Out-of-scope but something to think about.

Has anyone started to think about a way to have integration tests for this stuff that run against a real GCP env? The amount of patching in the tests is a bit worrisome. That said, I know we try these flows a lot before release, but it would still be nice to have some automated way to ensure things continue to work against a real environment (i.e. maybe a gcloud command changes or a role is renamed or something of that sort).

Collaborator Author

Yeah, integration testing for something like this isn't trivial; there are multiple cases to consider:

  1. First-time creation
  2. Some resources created, then deleted: verify that the script can recreate & update them

If a command changes and starts causing errors, we have telemetry in place to monitor the failure rates for the script.

I'll note that I made sure not to use alpha or beta gcloud commands, since those are subject to change. I'm fairly confident that the existing gcloud commands will remain stable (backward compatibility in the case of future changes).

For now, I'm confident that verifying the exact gcloud commands are executed in the right order, along with smoke testing things end-to-end, is sufficient.
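
A unit test of the kind described above, which checks that exact gcloud commands run in the right order, might look like this sketch. `create_topics` is a toy stand-in for one script step and is not the PR's code; the `gcloud` helper is replaced by a `MagicMock` passed in as an argument.

```python
from unittest.mock import MagicMock, call


def create_topics(gcloud, project_id: str) -> None:
    """Toy stand-in for one script step (hypothetical, for illustration)."""
    for topic_id in ("export-logs-to-datadog", "export-failed-logs-to-datadog"):
        gcloud(f"pubsub topics create {topic_id} --project={project_id}")


def test_topics_created_in_order() -> None:
    gcloud = MagicMock()
    create_topics(gcloud, "my-project")
    # call_args_list preserves order, so this asserts both content and sequence.
    assert gcloud.call_args_list == [
        call("pubsub topics create export-logs-to-datadog --project=my-project"),
        call("pubsub topics create export-failed-logs-to-datadog --project=my-project"),
    ]
```

A test like this pins down command strings and ordering, but it cannot detect a command that gcloud itself no longer accepts, which is the gap the integration-test suggestion is aimed at.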


> integration testing for something like this isn't trivial

Not trivial doesn't mean not worth doing, but I hear ya. I do think it's possible. It would be:

  • Create a new project or use an existing test project
  • Run the script, assuming customer options
  • Ensure the resources are created in the project and the job is running
  • Teardown

We can talk offline about it.
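
The outline above could be wrapped in a small harness like the sketch below. Every callable is a hypothetical placeholder: how the script is launched, how resources are listed, and how teardown works against a real project are all left open here.

```python
from typing import Callable, Iterable


def run_integration_check(
    run_script: Callable[[], None],
    list_missing_resources: Callable[[], Iterable[str]],
    job_is_running: Callable[[], bool],
    teardown: Callable[[], None],
) -> list[str]:
    """Run the script, verify the results, and always tear down.

    Returns a list of problems; an empty list means the check passed.
    """
    problems: list[str] = []
    try:
        run_script()
        problems.extend(
            f"missing resource: {name}" for name in list_missing_resources()
        )
        if not job_is_running():
            problems.append("Dataflow job is not running")
    finally:
        # Teardown runs even if the script raises, so the test project
        # does not accumulate leftover resources between runs.
        teardown()
    return problems
```

Putting teardown in `finally` matters most for the failure case, since a crashed run is exactly when stale resources would otherwise skew the next run.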

@tedkahwaji tedkahwaji force-pushed the teddy.kahwaji/gcp-3046 branch from 9ba3437 to 861cc30 Compare October 30, 2025 16:15
@tedkahwaji tedkahwaji merged commit 679fcdc into main Oct 30, 2025
2 checks passed
@tedkahwaji tedkahwaji deleted the teddy.kahwaji/gcp-3046 branch October 30, 2025 17:48
4 participants