airbyte.secrets
Secrets management for PyAirbyte.
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""Secrets management for PyAirbyte.""" 3 4from __future__ import annotations 5 6from airbyte.secrets import ( 7 base, 8 config, 9 custom, 10 env_vars, 11 google_colab, 12 google_gsm, 13 prompt, 14 util, 15) 16from airbyte.secrets.base import SecretHandle, SecretSourceEnum, SecretString 17from airbyte.secrets.config import disable_secret_source, register_secret_manager 18from airbyte.secrets.custom import CustomSecretManager 19from airbyte.secrets.env_vars import DotenvSecretManager, EnvVarSecretManager 20from airbyte.secrets.google_colab import ColabSecretManager 21from airbyte.secrets.google_gsm import GoogleGSMSecretManager 22from airbyte.secrets.prompt import SecretsPrompt 23from airbyte.secrets.util import get_secret 24 25 26__all__ = [ 27 # Submodules 28 "base", 29 "config", 30 "custom", 31 "env_vars", 32 "google_colab", 33 "google_gsm", 34 "prompt", 35 "util", 36 # Secret Access 37 "get_secret", 38 # Secret Classes 39 "SecretSourceEnum", 40 "SecretString", 41 "SecretHandle", 42 # Secret Managers 43 "SecretManager", 44 "EnvVarSecretManager", 45 "DotenvSecretManager", 46 "ColabSecretManager", 47 "SecretsPrompt", 48 "CustomSecretManager", 49 "GoogleGSMSecretManager", 50 # Registration Functions` 51 "register_secret_manager", 52 "disable_secret_source", 53]
15def get_secret( 16 secret_name: str, 17 /, 18 *, 19 sources: list[SecretManager | SecretSourceEnum] | None = None, 20 allow_prompt: bool = True, 21 **kwargs: dict[str, Any], 22) -> SecretString: 23 """Get a secret from the environment. 24 25 The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum` 26 options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum` 27 entries is passed, then the sources will be checked using the provided ordering. 28 29 If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then 30 the user will be prompted to enter the secret if it is not found in any of the other sources. 31 """ 32 if "source" in kwargs: 33 warnings.warn( 34 message="The `source` argument is deprecated. Use the `sources` argument instead.", 35 category=DeprecationWarning, 36 stacklevel=2, 37 ) 38 sources = kwargs.pop("source") # type: ignore [assignment] 39 40 available_sources: dict[str, SecretManager] = {} 41 for available_source in _get_secret_sources(): 42 # Add available sources to the dict. Order matters. 43 available_sources[available_source.name] = available_source 44 45 if sources is None: 46 # If ANY is in the list, then we don't need to check any other sources. 47 # This is the default behavior. 48 sources = list(available_sources.values()) 49 50 elif not isinstance(sources, list): 51 sources = [sources] # type: ignore [unreachable] # This is a 'just in case' catch. 52 53 # Replace any SecretSourceEnum strings with the matching SecretManager object 54 for source in sources: 55 if isinstance(source, SecretSourceEnum): 56 if source not in available_sources: 57 raise exc.PyAirbyteInputError( 58 guidance="Invalid secret source name.", 59 input_value=source, 60 context={ 61 "Available Sources": list(available_sources.keys()), 62 }, 63 ) 64 65 sources[sources.index(source)] = available_sources[source] 66 67 secret_managers = cast(list[SecretManager], sources) 68 69 if SecretSourceEnum.PROMPT in secret_managers: 70 prompt_source = secret_managers.pop( 71 # Mis-typed, but okay here since we have equality logic for the enum comparison: 72 secret_managers.index(SecretSourceEnum.PROMPT), # type: ignore [arg-type] 73 ) 74 75 if allow_prompt: 76 # Always check prompt last. Add it to the end of the list. 77 secret_managers.append(prompt_source) 78 79 for secret_mgr in secret_managers: 80 val = secret_mgr.get_secret(secret_name) 81 if val: 82 return SecretString(val) 83 84 raise exc.PyAirbyteSecretNotFoundError( 85 secret_name=secret_name, 86 sources=[str(s) for s in available_sources], 87 )
Get a secret from the environment.
The optional sources
argument of enum type SecretSourceEnum
or list of SecretSourceEnum
options. If left blank, all available sources will be checked. If a list of SecretSourceEnum
entries is passed, then the sources will be checked using the provided ordering.
If allow_prompt
is True
or if SecretSourceEnum.PROMPT is declared in the source
arg, then
the user will be prompted to enter the secret if it is not found in any of the other sources.
15class SecretSourceEnum(str, Enum): 16 ENV = "env" 17 DOTENV = "dotenv" 18 GOOGLE_COLAB = "google_colab" 19 GOOGLE_GSM = "google_gsm" # Not enabled by default 20 21 PROMPT = "prompt"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
24class SecretString(str): 25 """A string that represents a secret. 26 27 This class is used to mark a string as a secret. When a secret is printed, it 28 will be masked to prevent accidental exposure of sensitive information. 29 """ 30 31 __slots__ = () 32 33 def __repr__(self) -> str: 34 return "<SecretString: ****>" 35 36 def is_empty(self) -> bool: 37 """Check if the secret is an empty string.""" 38 return len(self) == 0 39 40 def is_json(self) -> bool: 41 """Check if the secret string is a valid JSON string.""" 42 try: 43 json.loads(self) 44 except (json.JSONDecodeError, Exception): 45 return False 46 47 return True 48 49 def __bool__(self) -> bool: 50 """Override the boolean value of the secret string. 51 52 Always returns `True` without inspecting contents.""" 53 return True 54 55 def parse_json(self) -> dict: 56 """Parse the secret string as JSON.""" 57 try: 58 return json.loads(self) 59 except json.JSONDecodeError as ex: 60 raise exc.PyAirbyteInputError( 61 message="Failed to parse secret as JSON.", 62 context={ 63 "Message": ex.msg, 64 "Position": ex.pos, 65 "SecretString_Length": len(self), # Debug secret blank or an unexpected format. 66 }, 67 ) from None
A string that represents a secret.
This class is used to mark a string as a secret. When a secret is printed, it will be masked to prevent accidental exposure of sensitive information.
36 def is_empty(self) -> bool: 37 """Check if the secret is an empty string.""" 38 return len(self) == 0
Check if the secret is an empty string.
40 def is_json(self) -> bool: 41 """Check if the secret string is a valid JSON string.""" 42 try: 43 json.loads(self) 44 except (json.JSONDecodeError, Exception): 45 return False 46 47 return True
Check if the secret string is a valid JSON string.
55 def parse_json(self) -> dict: 56 """Parse the secret string as JSON.""" 57 try: 58 return json.loads(self) 59 except json.JSONDecodeError as ex: 60 raise exc.PyAirbyteInputError( 61 message="Failed to parse secret as JSON.", 62 context={ 63 "Message": ex.msg, 64 "Position": ex.pos, 65 "SecretString_Length": len(self), # Debug secret blank or an unexpected format. 66 }, 67 ) from None
Parse the secret string as JSON.
Inherited Members
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
126class SecretHandle: 127 """A handle for a secret in a secret manager. 128 129 This class is used to store a reference to a secret in a secret manager. 130 The secret is not retrieved until the `get_value()` method is called on the handle. 131 """ 132 133 def __init__( 134 self, 135 parent: SecretManager, 136 secret_name: str, 137 ) -> None: 138 """Instantiate a new secret handle.""" 139 self.parent = parent 140 self.secret_name = secret_name 141 142 def get_value(self) -> SecretString: 143 """Get the secret from the secret manager. 144 145 Subclasses can optionally override this method to provide a more optimized code path. 146 """ 147 return cast(SecretString, self.parent.get_secret(self.secret_name))
A handle for a secret in a secret manager.
This class is used to store a reference to a secret in a secret manager.
The secret is not retrieved until the get_value()
method is called on the handle.
133 def __init__( 134 self, 135 parent: SecretManager, 136 secret_name: str, 137 ) -> None: 138 """Instantiate a new secret handle.""" 139 self.parent = parent 140 self.secret_name = secret_name
Instantiate a new secret handle.
142 def get_value(self) -> SecretString: 143 """Get the secret from the secret manager. 144 145 Subclasses can optionally override this method to provide a more optimized code path. 146 """ 147 return cast(SecretString, self.parent.get_secret(self.secret_name))
Get the secret from the secret manager.
Subclasses can optionally override this method to provide a more optimized code path.
14class EnvVarSecretManager(SecretManager): 15 """Secret manager that retrieves secrets from environment variables.""" 16 17 name = SecretSourceEnum.ENV.value 18 19 def get_secret(self, secret_name: str) -> SecretString | None: 20 """Get a named secret from the environment.""" 21 if secret_name not in os.environ: 22 return None 23 24 return SecretString(os.environ[secret_name])
Secret manager that retrieves secrets from environment variables.
19 def get_secret(self, secret_name: str) -> SecretString | None: 20 """Get a named secret from the environment.""" 21 if secret_name not in os.environ: 22 return None 23 24 return SecretString(os.environ[secret_name])
Get a named secret from the environment.
Inherited Members
27class DotenvSecretManager(SecretManager): 28 """Secret manager that retrieves secrets from a `.env` file.""" 29 30 name = SecretSourceEnum.DOTENV.value 31 32 def get_secret(self, secret_name: str) -> SecretString | None: 33 """Get a named secret from the `.env` file.""" 34 try: 35 dotenv_vars: dict[str, str | None] = dotenv_values() 36 except Exception: 37 # Can't locate or parse a .env file 38 return None 39 40 if secret_name not in dotenv_vars: 41 # Secret not found 42 return None 43 44 return SecretString(dotenv_vars[secret_name])
Secret manager that retrieves secrets from a .env
file.
32 def get_secret(self, secret_name: str) -> SecretString | None: 33 """Get a named secret from the `.env` file.""" 34 try: 35 dotenv_vars: dict[str, str | None] = dotenv_values() 36 except Exception: 37 # Can't locate or parse a .env file 38 return None 39 40 if secret_name not in dotenv_vars: 41 # Secret not found 42 return None 43 44 return SecretString(dotenv_vars[secret_name])
Get a named secret from the .env
file.
Inherited Members
10class ColabSecretManager(SecretManager): 11 """Secret manager that retrieves secrets from Google Colab user secrets.""" 12 13 name = SecretSourceEnum.GOOGLE_COLAB.value 14 15 def __init__(self) -> None: 16 try: 17 from google.colab import ( # pyright: ignore[reportMissingImports] 18 userdata as colab_userdata, 19 ) 20 21 self.colab_userdata = colab_userdata 22 except ImportError: 23 self.colab_userdata = None 24 25 super().__init__() 26 27 def get_secret(self, secret_name: str) -> SecretString | None: 28 """Get a named secret from Google Colab user secrets.""" 29 if self.colab_userdata is None: 30 # The module doesn't exist. We probably aren't in Colab. 31 return None 32 33 try: 34 return SecretString(self.colab_userdata.get(secret_name)) 35 except Exception: 36 # Secret name not found. Continue. 37 return None
Secret manager that retrieves secrets from Google Colab user secrets.
15 def __init__(self) -> None: 16 try: 17 from google.colab import ( # pyright: ignore[reportMissingImports] 18 userdata as colab_userdata, 19 ) 20 21 self.colab_userdata = colab_userdata 22 except ImportError: 23 self.colab_userdata = None 24 25 super().__init__()
Instantiate the new secret manager.
27 def get_secret(self, secret_name: str) -> SecretString | None: 28 """Get a named secret from Google Colab user secrets.""" 29 if self.colab_userdata is None: 30 # The module doesn't exist. We probably aren't in Colab. 31 return None 32 33 try: 34 return SecretString(self.colab_userdata.get(secret_name)) 35 except Exception: 36 # Secret name not found. Continue. 37 return None
Get a named secret from Google Colab user secrets.
Inherited Members
13class SecretsPrompt(SecretManager): 14 """Secret manager that prompts the user to enter a secret.""" 15 16 name = SecretSourceEnum.PROMPT.value 17 18 def get_secret( 19 self, 20 secret_name: str, 21 ) -> SecretString | None: 22 with contextlib.suppress(Exception): 23 return SecretString(getpass(f"Enter the value for secret '{secret_name}': ")) 24 25 return None
Secret manager that prompts the user to enter a secret.
18 def get_secret( 19 self, 20 secret_name: str, 21 ) -> SecretString | None: 22 with contextlib.suppress(Exception): 23 return SecretString(getpass(f"Enter the value for secret '{secret_name}': ")) 24 25 return None
Get a named secret from the secret manager.
This method should be implemented by subclasses to retrieve secrets from
the secret store. If the secret is not found, the method should return None
.
Inherited Members
13class CustomSecretManager(SecretManager, ABC): 14 """Custom secret manager that retrieves secrets from a custom source. 15 16 This class is a convenience class that can be used to create custom secret 17 managers. By default, custom secrets managers are auto-registered during 18 creation. 19 """ 20 21 auto_register = True 22 replace_existing = False 23 as_backup = False 24 25 def __init__(self) -> None: 26 super().__init__() 27 if self.auto_register: 28 self.register() 29 30 def register( 31 self, 32 *, 33 replace_existing: bool | None = None, 34 as_backup: bool | None = None, 35 ) -> None: 36 """Register the secret manager as global secret source. 37 38 This makes the secret manager available to the `get_secret` function and 39 allows it to be used automatically as a source for secrets. 40 41 If `replace_existing` is `True`, the secret manager will replace all existing 42 secrets sources, including the default secret managers such as environment 43 variables, dotenv files, and Google Colab secrets. If `replace_existing` is 44 None or not provided, the default behavior will be used from the `replace_existing` 45 of the class (`False` unless overridden by the subclass). 46 """ 47 if replace_existing is None: 48 replace_existing = self.replace_existing 49 50 if as_backup is None: 51 as_backup = self.as_backup 52 53 if replace_existing: 54 clear_secret_sources() 55 56 register_secret_manager( 57 self, 58 as_backup=as_backup, 59 replace_existing=replace_existing, 60 )
Custom secret manager that retrieves secrets from a custom source.
This class is a convenience class that can be used to create custom secret managers. By default, custom secrets managers are auto-registered during creation.
30 def register( 31 self, 32 *, 33 replace_existing: bool | None = None, 34 as_backup: bool | None = None, 35 ) -> None: 36 """Register the secret manager as global secret source. 37 38 This makes the secret manager available to the `get_secret` function and 39 allows it to be used automatically as a source for secrets. 40 41 If `replace_existing` is `True`, the secret manager will replace all existing 42 secrets sources, including the default secret managers such as environment 43 variables, dotenv files, and Google Colab secrets. If `replace_existing` is 44 None or not provided, the default behavior will be used from the `replace_existing` 45 of the class (`False` unless overridden by the subclass). 46 """ 47 if replace_existing is None: 48 replace_existing = self.replace_existing 49 50 if as_backup is None: 51 as_backup = self.as_backup 52 53 if replace_existing: 54 clear_secret_sources() 55 56 register_secret_manager( 57 self, 58 as_backup=as_backup, 59 replace_existing=replace_existing, 60 )
Register the secret manager as global secret source.
This makes the secret manager available to the get_secret
function and
allows it to be used automatically as a source for secrets.
If replace_existing
is True
, the secret manager will replace all existing
secrets sources, including the default secret managers such as environment
variables, dotenv files, and Google Colab secrets. If replace_existing
is
None or not provided, the default behavior will be used from the replace_existing
of the class (False
unless overridden by the subclass).
Inherited Members
65class GoogleGSMSecretManager(CustomSecretManager): 66 """Secret manager that retrieves secrets from Google Secrets Manager (GSM). 67 68 This class inherits from `CustomSecretManager` and also adds methods 69 that are specific to this implementation: `fetch_secrets()`, 70 `fetch_secrets_by_label()` and `fetch_connector_secrets()`. 71 72 This secret manager is not enabled by default. To use it, you must provide the project ID and 73 the credentials for a service account with the necessary permissions to access the secrets. 74 75 The `fetch_connector_secret()` method assumes a label name of `connector` 76 matches the name of the connector (`source-github`, `destination-snowflake`, etc.) 77 """ 78 79 name = SecretSourceEnum.GOOGLE_GSM.value 80 auto_register = False 81 as_backup = False 82 replace_existing = False 83 84 CONNECTOR_LABEL = "connector" 85 """The label key used to filter secrets by connector name.""" 86 87 def __init__( 88 self, 89 project: str, 90 *, 91 credentials_path: str | None = None, 92 credentials_json: str | SecretString | None = None, 93 auto_register: bool = False, 94 as_backup: bool = False, 95 ) -> None: 96 """Instantiate a new Google GSM secret manager instance. 97 98 You can provide either the path to the credentials file or the JSON contents of the 99 credentials file. If both are provided, a `PyAirbyteInputError` will be raised. 100 """ 101 if credentials_path and credentials_json: 102 raise exc.PyAirbyteInputError( 103 guidance=("You can provide `credentials_path` or `credentials_json` but not both."), 104 ) 105 106 self.project = project 107 108 if credentials_json is not None and not isinstance(credentials_json, SecretString): 109 credentials_json = SecretString(credentials_json) 110 111 if not credentials_json and not credentials_path: 112 if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ: 113 credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] 114 115 elif "GCP_GSM_CREDENTIALS" in os.environ: 116 credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"]) 117 118 if credentials_path: 119 credentials_json = SecretString(Path(credentials_path).read_text()) 120 121 if not credentials_json: 122 raise exc.PyAirbyteInputError( 123 guidance=( 124 "No Google Cloud credentials found. You can provide the path to the " 125 "credentials file using the `credentials_path` argument, or provide the JSON " 126 "contents of the credentials file using the `credentials_json` argument." 127 ), 128 ) 129 130 self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info( 131 json.loads(credentials_json) 132 ) 133 134 if auto_register: 135 self.auto_register = auto_register 136 137 if as_backup: 138 self.as_backup = as_backup 139 140 super().__init__() # Handles the registration if needed 141 142 def get_secret(self, secret_name: str) -> SecretString | None: 143 """Get a named secret from Google Colab user secrets.""" 144 return SecretString( 145 self.secret_client.access_secret_version( 146 name=f"projects/{self.project}/secrets/{secret_name}/versions/latest" 147 ).payload.data.decode("UTF-8") 148 ) 149 150 def fetch_secrets( 151 self, 152 *, 153 filter_string: str, 154 ) -> Iterable[SecretHandle]: 155 """List all available secrets in the secret manager. 156 157 Example filter strings: 158 - `labels.connector=source-bigquery`: Filter for secrets with the labe 'source-bigquery'. 159 160 Args: 161 filter_string (str): A filter string to apply to the list of secrets, following the 162 format described in the Google Secret Manager documentation: 163 https://cloud.google.com/secret-manager/docs/filtering 164 165 Returns: 166 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 167 """ 168 gsm_secrets: ListSecretsPager = self.secret_client.list_secrets( 169 secretmanager.ListSecretsRequest( 170 request={ 171 "filter": filter_string, 172 } 173 ) 174 ) 175 176 return [ 177 SecretHandle( 178 parent=self, 179 secret_name=secret.name, 180 ) 181 for secret in gsm_secrets 182 ] 183 184 def fetch_secrets_by_label( 185 self, 186 label_key: str, 187 label_value: str, 188 ) -> Iterable[SecretHandle]: 189 """List all available secrets in the secret manager. 190 191 Args: 192 label_key (str): The key of the label to filter by. 193 label_value (str): The value of the label to filter by. 194 195 Returns: 196 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 197 """ 198 return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}") 199 200 def fetch_connector_secrets( 201 self, 202 connector_name: str, 203 ) -> Iterable[SecretHandle]: 204 """Fetch secrets in the secret manager, using the connector name as a filter for the label. 205 206 The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute, 207 which defaults to 'connector'. 208 209 Args: 210 connector_name (str): The name of the connector to filter by. 211 212 Returns: 213 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 214 """ 215 return self.fetch_secrets_by_label( 216 label_key=self.CONNECTOR_LABEL, 217 label_value=connector_name, 218 )
Secret manager that retrieves secrets from Google Secrets Manager (GSM).
This class inherits from CustomSecretManager
and also adds methods
that are specific to this implementation: fetch_secrets()
,
fetch_secrets_by_label()
and fetch_connector_secrets()
.
This secret manager is not enabled by default. To use it, you must provide the project ID and the credentials for a service account with the necessary permissions to access the secrets.
The fetch_connector_secret()
method assumes a label name of connector
matches the name of the connector (source-github
, destination-snowflake
, etc.)
87 def __init__( 88 self, 89 project: str, 90 *, 91 credentials_path: str | None = None, 92 credentials_json: str | SecretString | None = None, 93 auto_register: bool = False, 94 as_backup: bool = False, 95 ) -> None: 96 """Instantiate a new Google GSM secret manager instance. 97 98 You can provide either the path to the credentials file or the JSON contents of the 99 credentials file. If both are provided, a `PyAirbyteInputError` will be raised. 100 """ 101 if credentials_path and credentials_json: 102 raise exc.PyAirbyteInputError( 103 guidance=("You can provide `credentials_path` or `credentials_json` but not both."), 104 ) 105 106 self.project = project 107 108 if credentials_json is not None and not isinstance(credentials_json, SecretString): 109 credentials_json = SecretString(credentials_json) 110 111 if not credentials_json and not credentials_path: 112 if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ: 113 credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] 114 115 elif "GCP_GSM_CREDENTIALS" in os.environ: 116 credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"]) 117 118 if credentials_path: 119 credentials_json = SecretString(Path(credentials_path).read_text()) 120 121 if not credentials_json: 122 raise exc.PyAirbyteInputError( 123 guidance=( 124 "No Google Cloud credentials found. You can provide the path to the " 125 "credentials file using the `credentials_path` argument, or provide the JSON " 126 "contents of the credentials file using the `credentials_json` argument." 127 ), 128 ) 129 130 self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info( 131 json.loads(credentials_json) 132 ) 133 134 if auto_register: 135 self.auto_register = auto_register 136 137 if as_backup: 138 self.as_backup = as_backup 139 140 super().__init__() # Handles the registration if needed
Instantiate a new Google GSM secret manager instance.
You can provide either the path to the credentials file or the JSON contents of the
credentials file. If both are provided, a PyAirbyteInputError
will be raised.
142 def get_secret(self, secret_name: str) -> SecretString | None: 143 """Get a named secret from Google Colab user secrets.""" 144 return SecretString( 145 self.secret_client.access_secret_version( 146 name=f"projects/{self.project}/secrets/{secret_name}/versions/latest" 147 ).payload.data.decode("UTF-8") 148 )
Get a named secret from Google Colab user secrets.
150 def fetch_secrets( 151 self, 152 *, 153 filter_string: str, 154 ) -> Iterable[SecretHandle]: 155 """List all available secrets in the secret manager. 156 157 Example filter strings: 158 - `labels.connector=source-bigquery`: Filter for secrets with the labe 'source-bigquery'. 159 160 Args: 161 filter_string (str): A filter string to apply to the list of secrets, following the 162 format described in the Google Secret Manager documentation: 163 https://cloud.google.com/secret-manager/docs/filtering 164 165 Returns: 166 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 167 """ 168 gsm_secrets: ListSecretsPager = self.secret_client.list_secrets( 169 secretmanager.ListSecretsRequest( 170 request={ 171 "filter": filter_string, 172 } 173 ) 174 ) 175 176 return [ 177 SecretHandle( 178 parent=self, 179 secret_name=secret.name, 180 ) 181 for secret in gsm_secrets 182 ]
List all available secrets in the secret manager.
Example filter strings:
labels.connector=source-bigquery
: Filter for secrets with the labe 'source-bigquery'.
Arguments:
- filter_string (str): A filter string to apply to the list of secrets, following the format described in the Google Secret Manager documentation: https://cloud.google.com/secret-manager/docs/filtering
Returns:
Iterable[SecretHandle]: An iterable of
SecretHandle
objects for the matching secrets.
184 def fetch_secrets_by_label( 185 self, 186 label_key: str, 187 label_value: str, 188 ) -> Iterable[SecretHandle]: 189 """List all available secrets in the secret manager. 190 191 Args: 192 label_key (str): The key of the label to filter by. 193 label_value (str): The value of the label to filter by. 194 195 Returns: 196 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 197 """ 198 return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")
List all available secrets in the secret manager.
Arguments:
- label_key (str): The key of the label to filter by.
- label_value (str): The value of the label to filter by.
Returns:
Iterable[SecretHandle]: An iterable of
SecretHandle
objects for the matching secrets.
200 def fetch_connector_secrets( 201 self, 202 connector_name: str, 203 ) -> Iterable[SecretHandle]: 204 """Fetch secrets in the secret manager, using the connector name as a filter for the label. 205 206 The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute, 207 which defaults to 'connector'. 208 209 Args: 210 connector_name (str): The name of the connector to filter by. 211 212 Returns: 213 Iterable[SecretHandle]: An iterable of `SecretHandle` objects for the matching secrets. 214 """ 215 return self.fetch_secrets_by_label( 216 label_key=self.CONNECTOR_LABEL, 217 label_value=connector_name, 218 )
Fetch secrets in the secret manager, using the connector name as a filter for the label.
The label key used to filter the secrets is defined by the CONNECTOR_LABEL
attribute,
which defaults to 'connector'.
Arguments:
- connector_name (str): The name of the connector to filter by.
Returns:
Iterable[SecretHandle]: An iterable of
SecretHandle
objects for the matching secrets.
Inherited Members
47def register_secret_manager( 48 secret_manager: CustomSecretManager, 49 *, 50 as_backup: bool = False, 51 replace_existing: bool = False, 52) -> None: 53 """Register a custom secret manager.""" 54 if replace_existing: 55 clear_secret_sources() 56 57 if as_backup: 58 # Add to end of list 59 _SECRETS_SOURCES.append(secret_manager) 60 else: 61 # Add to beginning of list 62 _SECRETS_SOURCES.insert(0, secret_manager)
Register a custom secret manager.
70def disable_secret_source(source: SecretManager | SecretSourceEnum) -> None: 71 """Disable one of the default secrets sources. 72 73 This function can accept either a `SecretManager` instance, a `SecretSourceEnum` enum value, or 74 a string representing the name of the source to disable. 75 """ 76 if isinstance(source, SecretManager) and source in _SECRETS_SOURCES: 77 _SECRETS_SOURCES.remove(source) 78 return 79 80 # Else, remove by name 81 for s in _SECRETS_SOURCES: 82 if s.name == str(source): 83 _SECRETS_SOURCES.remove(s)
Disable one of the default secrets sources.
This function can accept either a SecretManager
instance, a SecretSourceEnum
enum value, or
a string representing the name of the source to disable.