Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add type hints for KMS snippets #9979

Merged
merged 13 commits into from
May 19, 2023
17 changes: 11 additions & 6 deletions kms/snippets/check_state_import_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@


# [START kms_check_state_import_job]
def check_state_import_job(project_id, location_id, key_ring_id, import_job_id):
from google.cloud import kms


def check_state_import_job(
project_id: str, location_id: str, key_ring_id: str, import_job_id: str
) -> None:
"""
Check the state of an import job in Cloud KMS.

Expand All @@ -24,18 +29,18 @@ def check_state_import_job(project_id, location_id, key_ring_id, import_job_id):
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified import_job string.
import_job_name = client.import_job_path(
project_id, location_id, key_ring_id, import_job_id)
project_id, location_id, key_ring_id, import_job_id
)

# Retrieve the state from an existing import job.
import_job = client.get_import_job(name=import_job_name)

print(f'Current state of import job {import_job.name}: {import_job.state}')
print(f"Current state of import job {import_job.name}: {import_job.state}")


# [END kms_check_state_import_job]
17 changes: 11 additions & 6 deletions kms/snippets/check_state_imported_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@


# [START kms_check_state_imported_key]
def check_state_imported_key(project_id, location_id, key_ring_id, import_job_id):
from google.cloud import kms


def check_state_imported_key(
project_id: str, location_id: str, key_ring_id: str, import_job_id: str
) -> None:
"""
Check the state of an import job in Cloud KMS.

Expand All @@ -24,18 +29,18 @@ def check_state_imported_key(project_id, location_id, key_ring_id, import_job_id
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Retrieve the fully-qualified import_job string.
import_job_name = client.import_job_path(
project_id, location_id, key_ring_id, import_job_id)
project_id, location_id, key_ring_id, import_job_id
)

# Retrieve the state from an existing import job.
import_job = client.get_import_job(name=import_job_name)

print(f'Current state of import job {import_job.name}: {import_job.state}')
print(f"Current state of import job {import_job.name}: {import_job.state}")


# [END kms_check_state_imported_key]
27 changes: 20 additions & 7 deletions kms/snippets/create_import_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@


# [START kms_create_import_job]
def create_import_job(project_id, location_id, key_ring_id, import_job_id):
from google.cloud import kms


def create_import_job(
project_id: str, location_id: str, key_ring_id: str, import_job_id: str
) -> None:
"""
Create a new import job in Cloud KMS.

Expand All @@ -24,9 +29,6 @@ def create_import_job(project_id, location_id, key_ring_id, import_job_id):
import_job_id (string): ID of the import job (e.g. 'my-import-job').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

Expand All @@ -38,10 +40,21 @@ def create_import_job(project_id, location_id, key_ring_id, import_job_id):

import_method = kms.ImportJob.ImportMethod.RSA_OAEP_3072_SHA1_AES_256
protection_level = kms.ProtectionLevel.HSM
import_job_params = {"import_method": import_method, "protection_level": protection_level}
import_job_params = {
"import_method": import_method,
"protection_level": protection_level,
}

# Call the client to create a new import job.
import_job = client.create_import_job({"parent": key_ring_name, "import_job_id": import_job_id, "import_job": import_job_params})
import_job = client.create_import_job(
{
"parent": key_ring_name,
"import_job_id": import_job_id,
"import_job": import_job_params,
}
)

print(f"Created import job: {import_job.name}")


print(f'Created import job: {import_job.name}')
# [END kms_create_import_job]
39 changes: 24 additions & 15 deletions kms/snippets/create_key_asymmetric_decrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_asymmetric_decrypt]
def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, key_id):
import datetime

# Import the client library.
from google.cloud import kms
glasnt marked this conversation as resolved.
Show resolved Hide resolved
from google.protobuf import duration_pb2 # type: ignore


def create_key_asymmetric_decrypt(
project_id: str, location_id: str, key_ring_id: str, key_id: str
) -> kms.CryptoKey:
"""
Creates a new asymmetric decryption key in Cloud KMS.

Expand All @@ -28,11 +36,6 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, key_id):

"""

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()

Expand All @@ -41,21 +44,27 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, key_id):

# Build the key.
purpose = kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_DECRYPT
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_DECRYPT_OAEP_2048_SHA256
algorithm = (
kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_DECRYPT_OAEP_2048_SHA256
)
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
"purpose": purpose,
"version_template": {
"algorithm": algorithm,
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
"destroy_scheduled_duration": duration_pb2.Duration().FromTimedelta(
datetime.timedelta(days=1)
),
}

# Call the API.
created_key = client.create_crypto_key(
request={'parent': key_ring_name, 'crypto_key_id': key_id, 'crypto_key': key})
print(f'Created asymmetric decrypt key: {created_key.name}')
request={"parent": key_ring_name, "crypto_key_id": key_id, "crypto_key": key}
)
print(f"Created asymmetric decrypt key: {created_key.name}")
return created_key


# [END kms_create_key_asymmetric_decrypt]
40 changes: 25 additions & 15 deletions kms/snippets/create_key_asymmetric_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_asymmetric_sign]
def create_key_asymmetric_sign(project_id, location_id, key_ring_id, key_id):

import datetime

# Import the client library.
from google.cloud import kms
glasnt marked this conversation as resolved.
Show resolved Hide resolved
from google.protobuf import duration_pb2 # type: ignore


def create_key_asymmetric_sign(
project_id: str, location_id: str, key_ring_id: str, key_id: str
) -> kms.CryptoKey:
"""
Creates a new asymmetric signing key in Cloud KMS.

Expand All @@ -28,11 +37,6 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, key_id):

"""

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()

Expand All @@ -41,21 +45,27 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, key_id):

# Build the key.
purpose = kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256
algorithm = (
kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256
)
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
"purpose": purpose,
"version_template": {
"algorithm": algorithm,
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
"destroy_scheduled_duration": duration_pb2.Duration().FromTimedelta(
datetime.timedelta(days=1)
),
}

# Call the API.
created_key = client.create_crypto_key(
request={'parent': key_ring_name, 'crypto_key_id': key_id, 'crypto_key': key})
print(f'Created asymmetric signing key: {created_key.name}')
request={"parent": key_ring_name, "crypto_key_id": key_id, "crypto_key": key}
)
print(f"Created asymmetric signing key: {created_key.name}")
return created_key


# [END kms_create_key_asymmetric_sign]
32 changes: 21 additions & 11 deletions kms/snippets/create_key_for_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@


# [START kms_create_key_for_import]
def create_key_for_import(project_id, location_id, key_ring_id, crypto_key_id):
from google.cloud import kms


def create_key_for_import(
project_id: str, location_id: str, key_ring_id: str, crypto_key_id: str
) -> None:
"""

Sets up an empty CryptoKey within a KeyRing for import.
Expand All @@ -26,9 +31,6 @@ def create_key_for_import(project_id, location_id, key_ring_id, crypto_key_id):
crypto_key_id (string): ID of the key to import (e.g. 'my-asymmetric-signing-key').
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

Expand All @@ -38,17 +40,25 @@ def create_key_for_import(project_id, location_id, key_ring_id, crypto_key_id):
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256
protection_level = kms.ProtectionLevel.HSM
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
'protection_level': protection_level
}
"purpose": purpose,
"version_template": {
"algorithm": algorithm,
"protection_level": protection_level,
},
}

# Build the parent key ring name.
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

# Call the API.
created_key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': crypto_key_id, 'crypto_key': key})
print(f'Created hsm key: {created_key.name}')
created_key = client.create_crypto_key(
request={
"parent": key_ring_name,
"crypto_key_id": crypto_key_id,
"crypto_key": key,
}
)
print(f"Created hsm key: {created_key.name}")


# [END kms_create_key_for_import]
40 changes: 24 additions & 16 deletions kms/snippets/create_key_hsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_hsm]
def create_key_hsm(project_id, location_id, key_ring_id, key_id):
import datetime

from google.cloud import kms
glasnt marked this conversation as resolved.
Show resolved Hide resolved
from google.protobuf import duration_pb2 # type: ignore


def create_key_hsm(
project_id: str, location_id: str, key_ring_id: str, key_id: str
) -> kms.CryptoKey:
"""
Creates a new key in Cloud KMS backed by Cloud HSM.

Expand All @@ -28,11 +35,6 @@ def create_key_hsm(project_id, location_id, key_ring_id, key_id):

"""

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()

Expand All @@ -41,23 +43,29 @@ def create_key_hsm(project_id, location_id, key_ring_id, key_id):

# Build the key.
purpose = kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.GOOGLE_SYMMETRIC_ENCRYPTION
algorithm = (
kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.GOOGLE_SYMMETRIC_ENCRYPTION
)
protection_level = kms.ProtectionLevel.HSM
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
'protection_level': protection_level
"purpose": purpose,
"version_template": {
"algorithm": algorithm,
"protection_level": protection_level,
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
"destroy_scheduled_duration": duration_pb2.Duration().FromTimedelta(
datetime.timedelta(days=1)
),
}

# Call the API.
created_key = client.create_crypto_key(
request={'parent': key_ring_name, 'crypto_key_id': key_id, 'crypto_key': key})
print(f'Created hsm key: {created_key.name}')
request={"parent": key_ring_name, "crypto_key_id": key_id, "crypto_key": key}
)
print(f"Created hsm key: {created_key.name}")
return created_key


# [END kms_create_key_hsm]
Loading