<a href="https://colab.research.google.com/github/KierFR-clean/temporal_script/blob/main/temporalscript.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install libraries

In [None]:
!pip install google-cloud-bigquery pandas


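Authenticate to Google Cloud, either with a service account key or with your own Google account (the equivalent of a gcloud CLI login). The next cell signs in with your Google account.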

In [None]:
# Authenticate this Colab runtime as the signed-in Google user.
# NOTE(review): presumably this is what lets bigquery.Client() pick up
# credentials later in the notebook — confirm against the Colab docs.
from google.colab import auth
auth.authenticate_user()


Alternatively, authenticate using a service account key file:

In [None]:
import os

from google.colab import files

# Upload the service account key (JSON). The returned mapping is keyed by
# the name of each uploaded file.
uploaded = files.upload()
if not uploaded:
    raise RuntimeError("No service account key file was uploaded.")

# Point the Google client libraries at the file that was actually uploaded
# instead of a hard-coded name that may not exist in the runtime.
# NOTE(review): assumes Colab stores the upload in the current working
# directory under the returned name — confirm.
key_filename = next(iter(uploaded))
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath(key_filename)


Script adapted to run in Colab against BigQuery instead of reading local files.

In [None]:
import pandas as pd
from google.cloud import bigquery
from datetime import datetime, timedelta
import logging

# Configure INFO-level logging with timestamps
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class GoogleClusterDataPreprocessor:
    """
    Sample and preprocess per-cluster trace tables stored in BigQuery.

    Tables are read from datasets named ``clusterdata_2019_<cluster>`` under
    ``project_id``. Samples can be post-processed (timestamps converted,
    struct columns flattened) and exported to CSV.
    """

    # Wall-clock instant that a raw timestamp equal to TIME_OFFSET_US maps to.
    # NOTE(review): naive datetime, presumably UTC — confirm against the
    # trace documentation.
    TRACE_START = datetime(2019, 5, 1, 7, 0, 0)

    # Raw timestamps are shifted by this many microseconds before being
    # added to TRACE_START.
    TIME_OFFSET_US = 600_000_000

    def __init__(self, project_id: str = "google.com:google-cluster-data", client=None):
        """
        Init with proj id.

        Args:
            project_id: Project that hosts the trace datasets; used to build
                fully qualified table names.
            client: Optional pre-built BigQuery client (any object exposing
                ``query(sql).to_dataframe()``). When omitted, a default
                ``bigquery.Client()`` is created.
        """
        self.project_id = project_id
        self.client = client if client is not None else bigquery.Client()
        self.clusters = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']

        # tables in the dataset
        self.tables = [
            'machine_events',
            'machine_attributes',
            'collection_events',
            'instance_events',
            'instance_usage'
        ]

    @staticmethod
    def _extract_struct_field(value, field: str):
        """
        Return ``value[field]`` for dict-like struct cells, else 0.
        """
        return value.get(field, 0) if isinstance(value, dict) else 0

    @staticmethod
    def _label_scheduling_class(value):
        """
        Map a numeric value to 'Production' / 'Non-Production'; missing -> None.
        """
        # Comparing a missing value with 120 would raise, so keep it missing.
        if pd.isna(value):
            return None
        return 'Production' if value >= 120 else 'Non-Production'

    def convert_time_to_datetime(self, df: pd.DataFrame, time_col: str = 'time') -> pd.DataFrame:
        """
        Convert a microsecond timestamp column to datetimes, in place.

        Each raw value ``t`` becomes ``TRACE_START + (t - TIME_OFFSET_US)``
        microseconds. Values that are missing, non-numeric, or so large that
        the result cannot be represented as a pandas timestamp become ``NaT``
        instead of raising an overflow error.
        NOTE(review): the trace reportedly uses 2**63 - 1 as an "after the
        end of the trace" sentinel, which is the main case this guards —
        confirm.

        Args:
            df: Frame to modify; returned unchanged if ``time_col`` is absent.
            time_col: Name of the column holding raw microsecond timestamps.

        Returns:
            The same ``df`` object with ``time_col`` replaced.
        """
        if time_col not in df.columns:
            return df

        trace_start = self.TRACE_START
        offset_us = pd.to_numeric(df[time_col], errors='coerce') - self.TIME_OFFSET_US

        # Largest offset (in microseconds) that still yields a representable
        # pandas Timestamp when added to trace_start.
        max_offset_us = (pd.Timestamp.max - pd.Timestamp(trace_start)) // pd.Timedelta(microseconds=1)
        in_range = offset_us.between(-max_offset_us, max_offset_us).fillna(False).astype(bool)

        # Convert a placeholder (0) for unrepresentable rows so the integer
        # path is used throughout, then blank those rows out afterwards.
        safe_offset_us = offset_us.where(in_range, 0).astype('int64')
        converted = pd.to_datetime(trace_start + pd.to_timedelta(safe_offset_us, unit='us'))
        df[time_col] = converted.where(in_range)
        return df

    def query_table(self, cluster: str, table: str, limit: int = None) -> pd.DataFrame:
        """
        Run ``SELECT *`` against one cluster table and return a DataFrame.

        Args:
            cluster: Cluster suffix used in the dataset name.
            table: Table name inside the cluster dataset.
            limit: Maximum number of rows. ``None`` means no LIMIT clause;
                ``0`` is honoured as ``LIMIT 0`` rather than being treated as
                "no limit".

        Returns:
            The query result, or an empty DataFrame if the query fails
            (the error is logged, not raised).

        Raises:
            ValueError, TypeError: If ``limit`` cannot be converted to int.
        """
        dataset_id = f"clusterdata_2019_{cluster}"
        table_id = f"{self.project_id}.{dataset_id}.{table}"
        query = f"SELECT * FROM `{table_id}`"

        if limit is not None:
            # int() keeps arbitrary text out of the SQL string.
            query += f" LIMIT {int(limit)}"

        logger.info(f"Querying {table_id} with limit {limit}")

        try:
            df = self.client.query(query).to_dataframe()
        except Exception as e:
            # Best effort by design: callers treat an empty frame as "no data".
            logger.error(f"Error querying {table_id}: {e}")
            return pd.DataFrame()

        logger.info(f"Retrieved {len(df)} rows from {table_id}")
        return df

    def preprocess_instance_events(self, cluster: str, limit: int = None) -> pd.DataFrame:
        """
        Query ``instance_events`` for a cluster and derive helper columns.

        Converts the ``time`` column, flattens ``resource_request`` into
        ``requested_cpus`` / ``requested_memory``, relabels
        ``scheduling_class`` and tags rows with the cluster name.

        Returns:
            The processed frame, or the empty frame from the query as is.
        """
        df = self.query_table(cluster, 'instance_events', limit)
        if df.empty:
            return df

        # microsecond timestamp -> datetime
        df = self.convert_time_to_datetime(df)

        # Flatten the resource_request struct
        if 'resource_request' in df.columns:
            for field in ('cpus', 'memory'):
                df[f'requested_{field}'] = df['resource_request'].apply(
                    self._extract_struct_field, args=(field,)
                )

        # NOTE(review): the published 2019 trace schema appears to define
        # scheduling_class as 0-3 and to use priority >= 120 for the
        # production tier — confirm whether this threshold was meant for
        # the `priority` column. Behaviour is kept as is here.
        if 'scheduling_class' in df.columns:
            df['scheduling_class'] = df['scheduling_class'].apply(self._label_scheduling_class)

        df['cluster'] = cluster
        return df

    def preprocess_instance_usage(self, cluster: str, limit: int = None) -> pd.DataFrame:
        """
        Query ``instance_usage`` for a cluster and derive helper columns.

        Converts ``start_time`` / ``end_time``, flattens ``average_usage``
        and ``maximum_usage`` into ``<col>_cpus`` / ``<col>_memory`` and tags
        rows with the cluster name.

        Returns:
            The processed frame, or the empty frame from the query as is.
        """
        df = self.query_table(cluster, 'instance_usage', limit)
        if df.empty:
            return df

        for time_col in ('start_time', 'end_time'):
            df = self.convert_time_to_datetime(df, time_col)

        for usage_col in ['average_usage', 'maximum_usage']:
            if usage_col not in df.columns:
                continue
            for field in ('cpus', 'memory'):
                df[f'{usage_col}_{field}'] = df[usage_col].apply(
                    self._extract_struct_field, args=(field,)
                )

        df['cluster'] = cluster
        return df

    def sample_data(self, table: str, clusters: list = None, limit: int = 10000) -> pd.DataFrame:
        """
        Sample up to ``limit`` rows of ``table`` from each cluster and combine.

        Args:
            table: Table to sample; ``instance_events`` and ``instance_usage``
                go through their preprocessors, others are returned raw.
            clusters: Cluster suffixes to sample; defaults to ``self.clusters``.
            limit: Per-cluster row limit.

        Returns:
            All non-empty per-cluster frames concatenated with a fresh index,
            or an empty DataFrame if nothing was retrieved.
        """
        if clusters is None:
            clusters = self.clusters

        preprocessors = {
            'instance_events': self.preprocess_instance_events,
            'instance_usage': self.preprocess_instance_usage,
        }
        preprocess = preprocessors.get(table)

        # Collect first and concatenate once: avoids repeated copying and
        # concatenating onto an empty placeholder frame.
        frames = []
        for cluster in clusters:
            logger.info(f"Sampling {table} from cluster {cluster}")

            if preprocess is not None:
                df = preprocess(cluster, limit)
            else:
                df = self.query_table(cluster, table, limit)

            if not df.empty:
                frames.append(df)

        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True)

    def export_to_csv(self, df: pd.DataFrame, filename: str) -> None:
        """
        Write ``df`` to ``filename`` as CSV without the index and log the row count.
        """
        df.to_csv(filename, index=False)
        logger.info(f"Exported {len(df)} rows to {filename}")

def main():
    """
    Sample instance events and instance usage from every cluster and
    write each non-empty sample to its own CSV file.
    """
    pipeline = GoogleClusterDataPreprocessor()

    # Instance events: keep only rows that carry both identifiers.
    logger.info("Sampling instance events data...")
    events = pipeline.sample_data('instance_events', limit=50000)
    if not events.empty:
        required_ids = ['collection_id', 'machine_id']
        events = events.dropna(subset=required_ids)
        pipeline.export_to_csv(events, 'google_cluster_instance_events_sample.csv')

    # Instance usage: exported as sampled.
    logger.info("Sampling instance usage data...")
    usage = pipeline.sample_data('instance_usage', limit=10000)
    if not usage.empty:
        pipeline.export_to_csv(usage, 'google_cluster_usage_sample.csv')


if __name__ == '__main__':
    main()


Authenticate your Google Cloud account.

Install the required libraries.

Run the Python code

Download the processed data as CSV files.

In [None]:
from google.colab import files

# Hand each exported CSV to the browser as a download, in export order.
for csv_name in (
    "google_cluster_instance_events_sample.csv",
    "google_cluster_usage_sample.csv",
):
    files.download(csv_name)
