In [1]:
import re
from pprint import pprint
from base_gen_deletion_script import *

In [53]:
def run_command(command: List[str]) -> dict:
    """Run a shell command and return the output."""
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error running command: {' '.join(command)}")
        print(result.stderr)
        # sys.exit(1)
    ret = {
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode
    }
    return ret 

def clean_text(input_text: str) -> str:
    """
    Cleans the input text by removing ANSI escape sequences and formatting codes.

    Args:
        input_text (str): Text containing ANSI escape sequences.

    Returns:
        str: Cleaned text without escape sequences.
    """
    # Regex pattern to match ANSI escape sequences
    ansi_escape_pattern = re.compile(r'\x1b\[[0-9;]*m')
    cleaned_text = ansi_escape_pattern.sub('', input_text)
    return cleaned_text

def parse_help_texts_block(help_text: str, block_name: str, 
                          cmd_indent: int = 5, desc_indent: int = 8,
                          verbose: bool = False) -> dict:
    """
    Extracts a specific block of text from the help text based on the block name.

    Args:
        help_text (str): The entire help text from the gcloud command.
        block_name (str): The name of the block to extract.

    Returns:
        dict: {arg_name: arg_description}
    """
    args_desc = {}

    in_block = False
    for line in clean_text(help_text).split("\n"):
        # print(line)

        # if the line starts with the block name
        if line.startswith(block_name): 
            in_block = True
        elif in_block and line.isupper(): 
            # check stop condition, when the line starts with all capital letters
            in_block = False
            break
        
        # print if in block and not new line
        if in_block and line != "": 
            if verbose: print(line)
            if line.startswith(" "*cmd_indent) and line[cmd_indent] != " ":
                tmp_arg = line.strip()
                # print(tmp_arg)
                args_desc[tmp_arg] = None
            elif line.startswith(" "*desc_indent) and tmp_arg != None:
                # remove front and back spaces
                args_desc[tmp_arg] = line.strip()
                tmp_arg = None
    
    # print(args_desc)
    # print()
    return args_desc

In [54]:
out_dict = run_command(["gcloud", "compute", "networks", "--help"])

block = "GROUPS"
parse_help_texts_block(out_dict["stdout"], block)

{'peerings': 'List, create, and delete, and update VPC Network Peering.',
 'subnets': 'List, describe, and delete, and update Compute Engine subnetworks.',
 'vpc-access': 'Manage VPC Access Service resources.'}

In [102]:
"""
Ex 
    input: ["gcloud", "compute", "networks"]
    return: {'peerings': {}, 'subnets': {}}
"""
def get_sub_resources(gcloud_cmds: List[str], included_cmds: List[str]=["create", "delete", "list"],
                      uri_supported: bool=True, verbose: bool=False) -> list:
    """
    Get sub-resources for a given GCP resource type.

    Args:
        gcloud_component (str): The GCP component (e.g., "compute", "sql", "pubsub")
        resource_type (str): The specific resource type (e.g., "networks", "instances")

    Returns:
        dict: {sub_resource1: {}, sub_resource2: {}}
    """
    ret_list = []
    
    # use help to get the sub-resources from "GROUP"
    out_dict = run_command(gcloud_cmds + ["--help"])
    # print(out_dict["stdout"])
    help_text = out_dict["stdout"]
    group_dict = parse_help_texts_block(help_text, "GROUPS")
    # print(group_dict)
    
    for k, v in group_dict.items():
        group_help_text = run_command(gcloud_cmds + [k, "--help"])["stdout"]
        group_cmd = parse_help_texts_block(group_help_text, "COMMANDS").keys()
        # print(group_cmd)
        if all(cmd in group_cmd for cmd in included_cmds):
            if uri_supported:
                # check cmd + "list --help" 
                list_help_text = run_command(gcloud_cmds + [k, "list", "--help"])["stdout"]
                list_flags = parse_help_texts_block(list_help_text, "LIST COMMAND FLAGS").keys()
                if "--uri" in list_flags:
                    ret_list.append(f"{gcloud_cmds[-1]} {k}")
            else:
                ret_list.append(f"{gcloud_cmds[-1]} {k}")
    return ret_list

# Recursively func to call get_sub_resources
def get_sub_resources_recursively(gcloud_cmds: List[str], sub_resources: dict={}, 
                                  included_cmds: List[str]=["create", "delete", "list"],
                                  uri_supported: bool=True,
                                  verbose: bool=False) -> dict:
    # sub_resources = get_sub_resources(gcloud_cmds, sub_resources, included_cmds, uri_supported, verbose)
    sub_resources.update(get_sub_resources(gcloud_cmds, sub_resources, included_cmds, uri_supported, verbose))
    for k, v in sub_resources.items():
        # print(k, v)
        sub_resources_help_cmd = gcloud_cmds + [k, "--help"]
        # print(sub_resources_help_cmd)
        help_text = run_command(sub_resources_help_cmd)["stdout"]
        if parse_help_texts_block(help_text, "GROUPS"):
            # if there is a sub-group, recursively call get_sub_resources_recursively
            get_sub_resources_recursively(gcloud_cmds + [k], sub_resources[k], included_cmds, uri_supported, verbose)
    return sub_resources

In [103]:
# get_sub_resources(["gcloud", "compute", "networks"])
# get_sub_resources(["gcloud", "compute"])

# get_sub_resources_recursively(["gcloud", "compute", "networks"])  
# get_sub_resources_recursively(["gcloud", "compute", "instances"])  
pprint(get_sub_resources(["gcloud", "compute", "networks"]))     

['networks peerings', 'networks subnets']


In [88]:
out_dict = run_command(["gcloud", "compute", "instances", "--help"])

block = "GROUPS"
parse_help_texts_block(out_dict["stdout"], block)

{'bulk': 'Manipulate multiple Compute Engine virtual machines with single command',
 'network-interfaces': 'Read and manipulate Compute Engine VM instance network interfaces.',
 'ops-agents': 'Manage Google Cloud Observability agents for Compute Engine VM',
 'os-inventory': 'Read Compute Engine OS Inventory Data and Related Resources.'}

In [68]:
clean_text(out_dict["stdout"])

"NAME\n    gcloud compute instances - read and manipulate Compute Engine virtual\n        machine instances\n\nSYNOPSIS\n    gcloud compute instances GROUP | COMMAND [GCLOUD_WIDE_FLAG ...]\n\nDESCRIPTION\n    Read and manipulate Compute Engine virtual machine instances.\n\n    For more information about virtual machine instances, see the virtual\n    machine instances documentation\n    (https://cloud.google.com/compute/docs/instances/).\n\n    See also: Instances API\n    (https://cloud.google.com/compute/docs/reference/rest/v1/instances).\n\nGCLOUD WIDE FLAGS\n    These flags are available to all commands: --help.\n\n    Run $ gcloud help for details.\n\nGROUPS\n    GROUP is one of the following:\n\n     bulk\n        Manipulate multiple Compute Engine virtual machines with single command\n        executions.\n\n     network-interfaces\n        Read and manipulate Compute Engine VM instance network interfaces.\n\n     ops-agents\n        Manage Google Cloud Observability agents for C

In [69]:
# a temp filter to filter out the resources: delete requried other args 
# def filter_resources_by_other_args(resources: List[str]) -> List[str]:

# Filter out the case that delete requires other args
# parse_help_texts_block(run_command(["gcloud", "compute", "networks", "peerings", "delete", "--help"])["stdout"], "REQUIRED")

def filter_resources_by_other_delete_args(cmd_path: List[str]) -> bool:
    cmd_path.append("delete")
    cmd_path.append("--help")
    out_dict = run_command(cmd_path)
    help_text = out_dict["stdout"]
    return parse_help_texts_block(help_text, "REQUIRED")

In [77]:
get_sub_resources(["gcloud", "compute", "networks"])

{'peerings': {}, 'subnets': {}}

In [78]:
filter_resources_by_other_delete_args(["gcloud", "compute", "networks", "peerings"])

{'--network=NETWORK': 'The name of the network in the current project containing the peering.'}

In [83]:
out_dict = run_command(["gcloud", "compute", ["networks", "subnets"], "list", "--uri"])
print(clean_text(out_dict["stdout"]))

TypeError: expected str, bytes or os.PathLike object, not list

In [81]:
# read json file
with open("keys/gcp-keys.json", "r") as f:
    data = json.load(f)
print(data)

{'type': 'service_account', 'project_id': 'terraform-437400', 'private_key_id': '7e67f820ccad26cd4f112be3e398717f0944a2b2', 'private_key': '-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQChab55f0fVHmdr\nI5RA2adA3Z7OlkRhS7PDRqdYtDUjA5RHEvfQ5rrx/veL4K+DKw881EITUmraNIph\nTrhQH2+WoJky2JNFPLmyV4j22IAqiyQlXsJxRDYPpkBZLwrnYkYBYE/LRbp+EvPM\n0p3mLAv0jiM3jJ2YpdYRocme836V1XszWsK6BDaIVi6r71H+wqjuc1OlREO7tcSD\nse7jEPTDcNyfGXnfELuB1vpSy2IvZDdfzrrywIUqmguGpx2yEkKez094EA4S45nY\nF9ie45b8vJjHI5whC1mibDRGYlUgBwVuNoz6wlghn75b0A1yJTltDU/tQ7xlHkgT\nFDEQNz83AgMBAAECggEARBnrybIYWgQOaSclPSk3hkga4RmINy6r5PdR0ryRcLjp\nXgVUNjiq5kAHLGZ7VpGbx7PcPu+2cmLZgY2nePKSb5BJoqi6les1g9eny1Hz3T9A\nXF37FrPLRVYE7zvIU3Bkjd8XO3GradbQMvonsX2i+6bel+jFcyqAw4apnN0p6sfv\nHKgalYUecI0FrPijHFXI9nvIQiHCRvK+nJcWhAtIIMyOZcqnDWGh84gZVsIi3Zdk\nWSNLWkcyZZ15koDHJwMj6uR74hNRpcrvNO7YA/V2OYJn7gDVPaLDBqmGhR+y7rZG\nks46jtfVXE9kXcRgurPLNQFnp1QbfgUFStGuWf4KEQKBgQC1MDPgjzjchGao/1Nm\nGlDIQ4BbCSCz13KcAHIJ0SSffcQY9WQ+XYOj2sR

In [112]:
from typing import Any


a = ["1", "1 2 3", "2", "3"]


def flatten_list(a: List[Any]) -> List[Any]: # type: ignore
    """
    Flatten a list of lists
    # input: [1,2,[1,4],2]
    # output: [1,2,1,4,2]
    """
    ret = []
    for item in a:
        if isinstance(item, list):
            ret.extend(flatten_list(item))
        else:
            ret.extend(item.split())
    return ret

In [113]:
flatten_list(a)

['1', '1', '2', '3', '2', '3']