In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# HPA Workload Recommender - Export Metrics




## Overview
This Colab can be used to export GKE metrics from Cloud Monitoring and Pod startup times from Cloud Asset Inventory.

## Getting Started

### Install required packages

In [1]:
! pip3 install --upgrade --quiet click google-auth urllib3 requests pandas pyarrow

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/200.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/200.9 kB[0m [31m675.9 kB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.4/200.9 kB[0m [31m757.9 kB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━[0m [32m163.8/200.9 kB[0m [31m1.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.9/200.9 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m126.3/126.3 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m59.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m39.9/39.9 MB[0m [31m40.0 MB/s[0m eta [36m0:00:

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [3]:
import sys

# Only act when the google.colab module is loaded, i.e. when running on Colab.
if "google.colab" in sys.modules:

    import IPython

    # Shut the kernel down with the restart flag set so the freshly installed
    # packages are picked up by the next cells.
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [2]:
import sys

# Only act when the google.colab module is loaded, i.e. when running on Colab.
if "google.colab" in sys.modules:

    from google.colab import auth

    # Starts Colab's user authentication flow for this notebook session.
    auth.authenticate_user()



In [10]:
# Workload to export: these values are passed to main() in the last code cell.
PROJECT_ID = "gke-opt-demo"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
CLUSTER_NAME = "online-shop"  # @param {type:"string"}
NAMESPACE = "default"  # @param {type:"string"}
CONTROLLER_NAME = "frontend"  # @param {type:"string"}

# Query Period (timestamps with a trailing 'Z'; passed as the interval start/end of the metrics query)
ANALYSIS_START_DATETIME = '2024-08-16T00:00:00Z'  # @param {type:"string"}
ANALYSIS_END_DATETIME = '2024-09-16T00:00:00Z'  # @param {type:"string"}

# Retrieve the project number
# The shell capture returns a list of output lines; the first line is kept.
# NOTE(review): indexing [0] raises IndexError when gcloud prints nothing
# (e.g. unknown project or not authenticated yet) — confirm the cell order.
# NOTE(review): PROJECT_NUMBER is not referenced by the other cells shown here.
PROJECT_NUMBER = !gcloud projects list --filter="PROJECT_ID:'{PROJECT_ID}'" --format='value(PROJECT_NUMBER)'
PROJECT_NUMBER = PROJECT_NUMBER[0]

In [11]:
# Make PROJECT_ID the active project of the gcloud CLI (core/project property).
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [15]:
# Interactive login that sets up Application Default Credentials.
# NOTE(review): presumably these are the credentials google.auth.default()
# picks up in the export functions below — confirm for your environment.
# NOTE(review): the command prompts for confirmation and prints a sign-in URL;
# consider clearing this cell's output before sharing the notebook.
!gcloud auth application-default login


You are running on a Google Compute Engine virtual machine.
The service credentials associated with this virtual machine
will automatically be used by Application Default
Credentials, so it is not necessary to use this command.

If you decide to proceed anyway, your user credentials may be visible
to others with access to this virtual machine. Are you sure you want
to authenticate with your personal account?

Do you want to continue (Y/n)?  Y

Go to the following link in your browser, and complete the sign-in prompts:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fapplicationdefaultauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=29qJ8E4BsKNNLUEpksrlVdsYA3wKtB&prompt=consent&token_

In [27]:
#@title Imports
import pandas as pd
from pathlib import Path
import zipfile
import json
import logging
from google.auth import default
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import os
from datetime import datetime
import uuid

## Export Pod Startup Time from Asset Inventory

In [33]:
#@title Get GKE Pod startup time
def fetch_and_process_assets(project_id, location, cluster_name, controller_name, namespace):
    """
    Fetches Kubernetes Pod assets from the Cloud Asset API and returns them as a DataFrame.

    Every `k8s.io/Pod` asset of the project is listed page by page; only Pods whose
    asset name contains the path built from the cluster, namespace and controller
    name are kept, and each kept Pod contributes one row per container.

    Parameters:
    - project_id (str): Project whose assets are listed. When empty, the project
      reported by Application Default Credentials is used instead.
    - location (str): Cluster location used in the asset name match.
    - cluster_name (str): Cluster name used in the asset name match.
    - controller_name (str): Controller name; Pod names are expected to begin with it.
    - namespace (str): Namespace used in the asset name match.

    Returns:
    - pandas.DataFrame: Rows produced by extract_container_info. Empty when
      authentication fails, when any page request fails, or when nothing matches.
    """

    # Initialize the API client
    try:
        # Do not overwrite the caller's project_id with the ADC default project:
        # the two can differ, which would silently query the wrong project.
        credentials, default_project = default()
        credentials.refresh(Request())
        service = build('cloudasset', 'v1', credentials=credentials)
    except Exception as e:
        print(f"Failed to authenticate and initialize the Google Cloud Asset API: {e}")
        return pd.DataFrame()

    project_id = project_id or default_project

    # The pattern does not depend on the asset, so build it once.
    # It is matched as a substring: Pod names carry a generated suffix such as
    # '-68f5d8498d-67bzg' after the controller name.
    # NOTE(review): presumably the asset name also has a service prefix before
    # 'projects/', which is why this is not a startswith() check — confirm.
    # NOTE(review): a controller whose name merely starts with controller_name
    # (e.g. 'frontend-v2' for 'frontend') matches as well.
    expected_name = (
        f'projects/{project_id}/locations/{location}/clusters/{cluster_name}'
        f'/k8s/namespaces/{namespace}/pods/{controller_name}'
    )

    asset_data = []
    next_page_token = None

    # Paginate through all pages of results
    while True:
        try:
            response = service.assets().list(
                parent=f"projects/{project_id}",
                assetTypes=["k8s.io/Pod"],
                contentType="RESOURCE",
                pageToken=next_page_token
            ).execute()
        except Exception as e:
            # A failed page discards everything collected so far.
            print(f"Error fetching asset inventory: {e}")
            return pd.DataFrame()

        for asset in response.get('assets', []):
            if expected_name not in asset.get('name', ''):
                continue

            resource = asset.get('resource', {}).get('data', {})

            # One entry per container, including the readinessProbe flag and
            # the Pod's status condition timestamps.
            asset_data.extend(
                extract_container_info(resource, location, cluster_name, namespace, controller_name, project_id)
            )

        # A missing or empty token means this was the last page.
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break

    # Convert the list of asset data into a DataFrame
    return pd.DataFrame(asset_data)


def extract_container_info(resource, location, cluster_name, namespace, controller_name, project_id):
    """
    Builds one record per container of a Pod resource.

    Each record combines the workload metadata passed in, the container's name,
    whether the container spec declares a readinessProbe, and the Pod-level
    status condition timestamps returned by extract_status_conditions.

    Parameters:
    - resource (dict): The resource data from the asset API response.
    - location (str): Location stored on every record.
    - cluster_name (str): Cluster name stored on every record.
    - namespace (str): Namespace stored on every record.
    - controller_name (str): Controller name stored on every record.
    - project_id (str): Project ID stored on every record.

    Returns:
    - list: One dictionary per container; empty when the resource has no
      'spec' or no 'status' section.
    """
    # Both sections are needed: 'spec' for the containers, 'status' for the conditions.
    if not ('spec' in resource and 'status' in resource):
        return []

    # Fields shared by every container of this Pod.
    workload_metadata = {
        'project_id': project_id,
        'location': location,
        'cluster_name': cluster_name,
        'namespace': namespace,
        'controller_name': controller_name,
    }

    records = []
    for container_spec in resource.get('spec', {}).get('containers', []):
        record = dict(workload_metadata)
        record['container_name'] = container_spec.get('name', 'Unknown')
        record['readiness_probe_exists'] = 'readinessProbe' in container_spec

        # The conditions belong to the Pod, so every container gets the same timestamps.
        pod_conditions = resource.get('status', {}).get('conditions', [])
        record.update(extract_status_conditions(pod_conditions))

        records.append(record)

    return records


def extract_status_conditions(conditions):
    """
    Extracts the lastTransitionTime of the 'PodScheduled' and 'Ready' conditions.

    Parameters:
    - conditions (list): A list of status condition dictionaries. None is
      treated as an empty list.

    Returns:
    - dict: 'PodScheduled_lastTransitionTime' and 'Ready_lastTransitionTime',
      each set to the condition's lastTransitionTime, or 'Unknown' when the
      condition or its timestamp is absent.
    """
    # Condition type -> key of the returned dictionary.
    tracked_conditions = {
        'PodScheduled': 'PodScheduled_lastTransitionTime',
        'Ready': 'Ready_lastTransitionTime',
    }
    status_conditions = {key: 'Unknown' for key in tracked_conditions.values()}

    for condition in conditions or []:
        # .get() so an entry without a 'type' is skipped instead of raising KeyError.
        key = tracked_conditions.get(condition.get('type'))
        if key is not None:
            status_conditions[key] = condition.get('lastTransitionTime', 'Unknown')

    return status_conditions


## Export GKE Metrics from Cloud Monitoring

Export the following [GKE metrics](https://cloud.google.com/monitoring/api/metrics_kubernetes#kubernetes) from Cloud Monitoring and save each one to its own file:
- kubernetes.io/container/cpu/core_usage_time
- kubernetes.io/container/memory/used_bytes
- kubernetes.io/container/cpu/request_cores
- kubernetes.io/container/memory/request_bytes


In [34]:
#@title Save Files
def save_dataframes(output_dir: Path, format: str, metrics_data: dict, prefix: str, zip_files: bool):
    """
    Save all dataframes (metrics and asset) to disk in the specified format, and optionally zip the files.

    Each dataframe is written to `<output_dir>/<prefix>_<name>.<format>`. A failure
    to write one dataframe is reported and does not stop the others. The zip
    archive, when requested, is created next to `output_dir` (in its parent
    directory) and contains every file found under `output_dir`.

    Args:
    - output_dir (Path): Existing directory where files will be saved.
    - format (str): File format, either 'csv' or 'parquet'.
    - metrics_data (dict): Dictionary mapping a name to its dataframe.
    - prefix (str): Unique prefix for all files.
    - zip_files (bool): If True, zip the files after saving.

    Raises:
    - ValueError: If `format` is not 'csv' or 'parquet'. Previously nothing was
      written in that case while a "Saved ..." message was still printed.
    """
    writers = {
        'csv': lambda df, path: df.to_csv(path, index=False),
        'parquet': lambda df, path: df.to_parquet(path, index=False),
    }
    if format not in writers:
        raise ValueError(
            f"Unsupported format '{format}'. Expected one of: {', '.join(writers)}"
        )
    write = writers[format]

    # Accept plain strings as well as Path objects.
    output_dir = Path(output_dir)

    # Save all metric dataframes
    for metric_name, df in metrics_data.items():
        file_path = output_dir / f"{prefix}_{metric_name}.{format}"
        try:
            write(df, file_path)
            print(f"Saved {metric_name} metrics to {file_path}")
        except Exception as e:
            print(f"Failed to save {metric_name} metrics to {file_path}. Error: {e}")

    # Optionally zip the files
    if zip_files:
        zip_file_path = output_dir.parent / f"{prefix}_metrics_and_assets.zip"
        try:
            with zipfile.ZipFile(zip_file_path, 'w') as zipf:
                for root, _, files in os.walk(output_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Store paths relative to output_dir so the archive has no leading folders.
                        zipf.write(file_path, os.path.relpath(file_path, output_dir))
            print(f"Zipped files into {zip_file_path}")
        except Exception as e:
            print(f"Failed to zip files into {zip_file_path}. Error: {e}")


In [32]:
#@title Get GKE Metrics

# Exclude namespaces that should not be included in the metrics gathering.
# build_filter_string adds one `NOT resource.labels.namespace_name = "..."`
# clause per entry to every query filter.
EXCLUDED_NAMESPACES = [
    "kube-system", "istio-system", "gatekeeper-system", "gke-system",
    "gmp-system", "gke-gmp-system", "gke-managed-filestorecsi", "gke-mcs"
]

# Fields to be used in the groupBy in the API query.
# Passed as `aggregation_groupByFields` to the timeSeries.list request.
GROUP_BY_FIELDS = [
    "resource.labels.project_id",
    "resource.labels.location",
    "resource.labels.cluster_name",
    "resource.labels.namespace_name",
    "resource.labels.container_name",
    "resource.labels.pod_name",
    "metadata.system_labels.top_level_controller_name",
    "metadata.system_labels.top_level_controller_type"

]

def build_filter_string(
    metric: str,
    project_id: str = '',
    location: str = '',
    cluster_name: str = '',
    namespace: str = '',
    container_name: str = '',
    controller_name: str = '',
    controller_type: str = ''
) -> str:
    """
    Builds the filter expression for a `k8s_container` metric query.

    The metric type and resource type clauses are always present. Every other
    argument adds an equality clause only when it is non-empty. Queries for a
    'memory/used_bytes' metric are restricted to the "non-evictable" memory
    type, and every namespace in EXCLUDED_NAMESPACES is excluded.

    Parameters:
    - metric (str): The metric type to be used in the filter.
    - project_id (str): The project ID for the filter.
    - location (str): The location for the filter.
    - cluster_name (str): The cluster name for the filter.
    - namespace (str): The namespace for the filter.
    - container_name (str): The container name for the filter.
    - controller_name (str): The top-level controller name for the filter.
    - controller_type (str): The top-level controller type for the filter.

    Returns:
    - str: All clauses joined with ' AND '.
    """
    clauses = [
        f'metric.type = "{metric}"',
        'resource.type = "k8s_container"',
    ]

    if 'memory/used_bytes' in metric.lower():
        clauses.append('metric.label.memory_type = "non-evictable"')

    # (filter field, requested value) pairs, in the order they appear in the filter.
    optional_labels = (
        ('resource.labels.project_id', project_id),
        ('resource.labels.location', location),
        ('resource.labels.cluster_name', cluster_name),
        ('resource.labels.namespace_name', namespace),
        ('resource.labels.container_name', container_name),
        ('metadata.system_labels.top_level_controller_name', controller_name),
        ('metadata.system_labels.top_level_controller_type', controller_type),
    )
    clauses.extend(
        f'{field} = "{value}"' for field, value in optional_labels if value
    )

    # Exclude unwanted namespaces
    clauses.append(' AND '.join(
        f'NOT resource.labels.namespace_name = "{excluded}"' for excluded in EXCLUDED_NAMESPACES
    ))

    return ' AND '.join(clauses)

def fetch_metrics_from_api(
    project_id, location, cluster_name, namespace, container_name,
    controller_name, controller_type, metric, start_time, end_time,
    per_series_aligner, cross_series_reducer):
    """
    Fetches one metric from the Cloud Monitoring API and flattens it into a DataFrame.

    The time series are requested with a 60s alignment period, grouped by
    GROUP_BY_FIELDS, and read page by page. Each returned point becomes one row
    carrying the metric type, the resource labels and the top-level controller
    labels of its series.

    Parameters:
    - project_id (str): Project to query. When empty, the project reported by
      Application Default Credentials is used instead.
    - location, cluster_name, namespace, container_name, controller_name,
      controller_type (str): Optional filter values; see build_filter_string.
    - metric (str): Metric type to fetch.
    - start_time, end_time (str): Bounds of the query interval.
    - per_series_aligner (str): Aligner applied to each series.
    - cross_series_reducer (str): Reducer applied across series.

    Returns:
    - pandas.DataFrame: One row per point; an empty DataFrame when
      authentication fails, the request fails, or no data is returned.
    """
    # Initialize the API client
    try:
        # Do not overwrite the caller's project_id with the ADC default project:
        # the two can differ, which would silently query the wrong project.
        credentials, default_project = default()
        credentials.refresh(Request())
        service = build('monitoring', 'v3', credentials=credentials)
    except Exception as e:
        print(f"Failed to authenticate and initialize the Google Cloud Monitoring API: {e}")
        # Always return a DataFrame so callers can rely on `.empty`.
        return pd.DataFrame()

    project_id = project_id or default_project

    filter_ = build_filter_string(
        metric=metric,
        project_id=project_id,
        location=location,
        cluster_name=cluster_name,
        namespace=namespace,
        container_name=container_name,
        controller_name=controller_name,
        controller_type=controller_type
    )

    print(f"Fetching data for metric: {metric} ...")

    try:
        all_time_series_data = []
        time_series_api = service.projects().timeSeries()
        request = time_series_api.list(
            name=f"projects/{project_id}",
            aggregation_alignmentPeriod="60s",
            aggregation_crossSeriesReducer=cross_series_reducer,
            aggregation_groupByFields=GROUP_BY_FIELDS,
            aggregation_perSeriesAligner=per_series_aligner,
            filter=filter_,
            interval_endTime=end_time,
            interval_startTime=start_time
        )

        # list_next() returns None once the response carries no nextPageToken.
        while request is not None:
            response = request.execute()
            all_time_series_data.extend(response.get('timeSeries', []))
            request = time_series_api.list_next(previous_request=request, previous_response=response)

        # Nothing to flatten: report it here instead of handing an empty list
        # to json_normalize together with record_path/meta.
        if not all_time_series_data:
            print(f"No data found for metric: {metric}")
            return pd.DataFrame()

        df = pd.json_normalize(
            all_time_series_data,
            record_path='points',
            meta=[
                ['metric', 'type'],
                ['resource', 'type'],
                ['resource', 'labels', 'project_id'],
                ['resource', 'labels', 'location'],
                ['resource', 'labels', 'cluster_name'],
                ['resource', 'labels', 'namespace_name'],
                ['resource', 'labels', 'container_name'],
                ['resource', 'labels', 'pod_name'],
                ['metadata', 'systemLabels', 'top_level_controller_name'],
                ['metadata', 'systemLabels', 'top_level_controller_type']
            ],
            # Series lacking one of the meta keys are kept rather than raising.
            errors='ignore'
        )

        if df.empty:
            print(f"No data found for metric: {metric}")
            return pd.DataFrame()

        print("Successfully fetched.")
        return df

    except Exception as e:
        print(f"Error fetching metrics: {e}")
        return pd.DataFrame()


def fetch_all_metrics(
    project_id, location, cluster_name, namespace, container_name,
    controller_name, controller_type, start_time, end_time, metrics_info):
    """
    Fetch all required metrics as per the metrics info configuration.

    Parameters:
    - project_id, location, cluster_name, namespace, container_name,
      controller_name, controller_type (str): Workload selection, forwarded to
      fetch_metrics_from_api.
    - start_time, end_time (str): Bounds of the query interval.
    - metrics_info (dict): Maps an output key to a dict with 'metric_type' and
      optional 'aligner' (default "ALIGN_MEAN") and 'reducer' (default
      "REDUCE_MEAN").

    Returns:
    - dict: Output key -> DataFrame for every metric that returned data, plus
      'pod_startup' with the Asset Inventory result (which may be empty).
    """
    print(f"Starting to fetch metrics for the following configuration: "
               f"Project ID: {project_id}, Location: {location}, "
               f"Cluster Name: {cluster_name}, Namespace: {namespace}, "
               f"Controller Name: {controller_name}")

    all_metrics_data = {}

    for key, info in metrics_info.items():

        metric_type = info["metric_type"]
        aligner = info.get("aligner", "ALIGN_MEAN")
        reducer = info.get("reducer", "REDUCE_MEAN")

        metric_data = fetch_metrics_from_api(
            project_id, location, cluster_name, namespace, container_name,
            controller_name, controller_type, metric_type, start_time, end_time,
            aligner, reducer
        )

        # Treat a missing result (None) the same as an empty one instead of
        # failing on `.empty`.
        if metric_data is not None and not metric_data.empty:
            all_metrics_data[key] = metric_data
        else:
            print(f"No data found for {metric_type}.")

    # Fetch pod startup time from Asset Inventory.
    # This does not depend on the metric, so it runs once after the loop
    # rather than once per metric.
    all_metrics_data['pod_startup'] = fetch_and_process_assets(
        project_id,
        location,
        cluster_name,
        controller_name,
        namespace
    )

    print("Completed fetching all metrics.")
    return all_metrics_data


In [35]:
#@title Save GKE Metrics to File
def main(project_id, location, cluster_name, namespace, controller_name,
         start_time, end_time, format, zip_files, output_dir):
    """
    Fetch GKE metrics and the Pod asset inventory for one workload and save them.

    Each dataset is written to its own file inside a new
    `<output_dir>/<YYYYMMDD>_<4 hex chars>` directory; the files are optionally
    zipped into one archive.

    Parameters:
    - project_id, location, cluster_name, namespace, controller_name (str):
      Workload to export. The controller type is fixed to 'Deployment'.
    - start_time, end_time (str): Bounds of the query interval.
    - format (str): Output file format, 'csv' or 'parquet'.
    - zip_files (bool): If True, zip the saved files.
    - output_dir (str): Base directory for the output; the current directory
      is used when empty.
    """
    # Fail fast: an unsupported format would otherwise only surface after the
    # (slow) fetch has completed.
    if format not in ('csv', 'parquet'):
        print(f"Unsupported format '{format}'. Please use 'csv' or 'parquet'.")
        return

    unique_prefix = f"{datetime.now().strftime('%Y%m%d')}_{uuid.uuid4().hex[:4]}"

    # Always work with a Path: the fallback must not be the plain string '.',
    # which has neither .exists() nor the '/' operator.
    storage_dir = Path(output_dir) if output_dir else Path('.')

    # Full path for output files; parents=True also creates storage_dir.
    output_dir = storage_dir / unique_prefix
    output_dir.mkdir(parents=True, exist_ok=True)

    # Metric Info Configuration for Fetching Multiple Metrics
    metrics_info = {
        "cpu_usage": {
            "metric_type": "kubernetes.io/container/cpu/core_usage_time",
            "aligner": "ALIGN_RATE",
            "reducer": "REDUCE_MEAN",
        },
        "memory_usage": {
            "metric_type": "kubernetes.io/container/memory/used_bytes",
            "aligner": "ALIGN_MAX",
            "reducer": "REDUCE_MAX",
        },
        "cpu_request": {
            "metric_type": "kubernetes.io/container/cpu/request_cores",
            "aligner": "ALIGN_MEAN",
            "reducer": "REDUCE_MEAN",
        },
        "memory_request": {
            "metric_type": "kubernetes.io/container/memory/request_bytes",
            "aligner": "ALIGN_MEAN",
            "reducer": "REDUCE_MEAN",
        }
    }

    # Try block for fetching both metrics and asset inventory data
    try:
        all_metrics_data = fetch_all_metrics(
            project_id=project_id,
            location=location,
            cluster_name=cluster_name,
            namespace=namespace,
            container_name='',
            controller_name=controller_name,
            controller_type='Deployment',
            start_time=start_time,
            end_time=end_time,
            metrics_info=metrics_info
        )
        if not all_metrics_data:
            print("No metrics data found. Please ensure the parameters are correct.")
            return

    except Exception as e:
        print(f"Error fetching data: {e}. Please check your input parameters.")
        return

    # Save all dataframes (metrics and assets), and optionally zip them
    save_dataframes(output_dir, format, all_metrics_data, unique_prefix, zip_files)

if __name__ == '__main__':
    # Export the workload configured in the parameter cell.
    # The last three arguments are format ('csv' or 'parquet'), zip_files and output_dir.
    main(PROJECT_ID, LOCATION, CLUSTER_NAME, NAMESPACE, CONTROLLER_NAME, ANALYSIS_START_DATETIME, ANALYSIS_END_DATETIME, 'parquet', True, 'output')


Starting to fetch metrics for the following configuration: Project ID: gke-opt-demo, Location: us-central1, Cluster Name: online-shop, Namespace: default, Controller Name: frontend
Fetching data for metric: kubernetes.io/container/cpu/core_usage_time ...
Successfully fetched.
Fetching data for metric: kubernetes.io/container/memory/used_bytes ...
Successfully fetched.
Fetching data for metric: kubernetes.io/container/cpu/request_cores ...
Successfully fetched.
Fetching data for metric: kubernetes.io/container/memory/request_bytes ...
Successfully fetched.
Completed fetching all metrics.
Saved cpu_usage metrics to output/20240917_1687/20240917_1687_cpu_usage.parquet
Saved pod_startup metrics to output/20240917_1687/20240917_1687_pod_startup.parquet
Saved memory_usage metrics to output/20240917_1687/20240917_1687_memory_usage.parquet
Saved cpu_request metrics to output/20240917_1687/20240917_1687_cpu_request.parquet
Saved memory_request metrics to output/20240917_1687/20240917_1687_memor

# Download the .zip file

In [37]:
#download .zip file


output
