Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick Test Fixes #39

Merged
merged 10 commits into from
Aug 13, 2021
169 changes: 91 additions & 78 deletions src/iks/iks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def print_help():


def handle_args(args):

if args.help:
print_help()
sys.exit(1)

UserInfo = IntegrationInfo()
UserInfo.terraforming = False
if args.terraform:
Expand All @@ -69,15 +69,14 @@ def handle_args(args):

if args.delete:
UserInfo.delete = True

# determining API key
UserInfo.cis_api_key = getpass.getpass(
prompt="Enter CIS Services API Key: ")
os.environ["CIS_SERVICES_APIKEY"] = UserInfo.cis_api_key

# common arguments
UserInfo.request_token()


UserInfo.iks_cluster_id = args.iks_cluster_id
if UserInfo.iks_cluster_id is None:
Expand All @@ -89,14 +88,14 @@ def handle_args(args):
print("You did not specify a resource group.")
sys.exit(1)
UserInfo.get_resource_id()

iks_info = UserInfo.get_iks_info()

UserInfo.cis_domain = args.cis_domain
if UserInfo.cis_domain is None:
print("You did not specify a CIS Domain.")
sys.exit(1)

# terraforming vs. not terraforming
if UserInfo.terraforming and not UserInfo.delete:
UserInfo.cis_name = args.name
Expand All @@ -105,49 +104,52 @@ def handle_args(args):
sys.exit(1)

if not UserInfo.get_crn_and_zone():
print("Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
print(
"Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
sys.exit(1)

UserInfo.namespace = args.namespace
if UserInfo.namespace is None:
if UserInfo.namespace is None:
print("You did not specify a namespace for IKS cluster.")
sys.exit(1)

UserInfo.service_name = args.service_name
if UserInfo.service_name is None:
if UserInfo.service_name is None:
print("You did not specify a service name from the IKS cluster.")
sys.exit(1)

UserInfo.service_port = args.service_port
if UserInfo.service_port is None:
print("You did not specify the target port of the service from the IKS cluster.")
if UserInfo.service_port is None:
print(
"You did not specify the target port of the service from the IKS cluster.")
sys.exit(1)

UserInfo.vpc_name = args.vpc_name
if UserInfo.vpc_name is None:
print("You did not specify a VPC instance name.")
sys.exit(1)

elif not UserInfo.delete:
#vpc name
# vpc name
UserInfo.vpc_name = args.vpc_name
if UserInfo.vpc_name is None:
print("You did not specify a VPC instance name.")
sys.exit(1)

UserInfo.namespace = args.namespace
if UserInfo.namespace is None:
if UserInfo.namespace is None:
print("You did not specify a namespace for IKS cluster.")
sys.exit(1)

UserInfo.service_name = args.service_name
if UserInfo.service_name is None:
if UserInfo.service_name is None:
print("You did not specify a service name from the IKS cluster.")
sys.exit(1)

UserInfo.service_port = args.service_port
if UserInfo.service_port is None:
print("You did not specify the target port of the service from the IKS cluster.")
if UserInfo.service_port is None:
print(
"You did not specify the target port of the service from the IKS cluster.")
sys.exit(1)

UserInfo.get_resource_id()
Expand All @@ -158,13 +160,15 @@ def handle_args(args):
UserInfo.cis_name = args.name

if UserInfo.cis_name is None:
print("Please specify the name of your CIS instance or both the CIS CRN and CIS Zone ID")
print(
"Please specify the name of your CIS instance or both the CIS CRN and CIS Zone ID")
sys.exit(1)

if not UserInfo.get_crn_and_zone():
print("Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
print(
"Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
sys.exit(1)

elif UserInfo.delete:
UserInfo.resource_group = args.resource_group
if UserInfo.resource_group is None:
Expand All @@ -179,87 +183,97 @@ def handle_args(args):
UserInfo.cis_name = args.name

if UserInfo.cis_name is None:
print("Please specify the name of your CIS instance or both the CIS CRN and CIS Zone ID")
print(
"Please specify the name of your CIS instance or both the CIS CRN and CIS Zone ID")
sys.exit(1)

if not UserInfo.get_crn_and_zone():
print("Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
print(
"Failed to retrieve CRN and Zone ID. Check the name of your CIS instance and try again")
sys.exit(1)

return UserInfo


def iks(args):
delete_dns = None
delete_workspaces = None
work_creator = None
delete_workspaces = None
work_creator = None
user_ingress = None

UserInfo = handle_args(args)
if UserInfo.delete and not UserInfo.terraforming:

delete_dns = DeleteDNS(UserInfo.crn, UserInfo.zone_id, UserInfo.api_endpoint, UserInfo.cis_domain)

delete_dns = DeleteDNS(
UserInfo.crn, UserInfo.zone_id, UserInfo.api_endpoint, UserInfo.cis_domain)
delete_dns.delete_dns()


UserInfo.get_id_token()
delete_ingress = DeleteIngress(UserInfo.namespace,UserInfo.id_token,UserInfo.iks_master_url)
delete_ingress = DeleteIngress(
UserInfo.namespace, UserInfo.id_token, UserInfo.iks_master_url)
delete_ingress.delete_ingress()

delete_certs = DeleteCerts(
UserInfo.crn, UserInfo.zone_id, UserInfo.api_endpoint, UserInfo.cis_domain)
delete_certs.delete_certs()

print("If you created a certificate in the certificate manager and imported it as a secret to your IKS cluster, you may delete them now.")
secret = input("Delete certificate and secret? Input 'y' or 'yes' to execute:").lower()
secret = input(
"Delete certificate and secret? Input 'y' or 'yes' to execute:").lower()
if secret == 'y' or secret == 'yes':
UserInfo.cert_name="cis-cert"
UserInfo.cert_name = "cis-cert"

cms_id = UserInfo.get_cms()

delete_secret = DeleteSecretCMS(UserInfo.iks_cluster_id, UserInfo.cis_domain, cms_id, UserInfo.cert_name, UserInfo.token['access_token'])
delete_secret = DeleteSecretCMS(
UserInfo.iks_cluster_id, UserInfo.cis_domain, cms_id, UserInfo.cert_name, UserInfo.token['access_token'])
delete_secret.delete_cms_cert()
delete_secret.delete_secret()
elif UserInfo.delete and UserInfo.terraforming:
print("If you created a certificate in the certificate manager and imported it as a secret to your IKS cluster, you may delete them now.")
secret = input("Delete certificate and secret? Input 'y' or 'yes' to execute:").lower()
secret = input(
"Delete certificate and secret? Input 'y' or 'yes' to execute:").lower()
if secret == 'y' or secret == 'yes':
UserInfo.cert_name="cis-cert"
UserInfo.cert_name = "cis-cert"

cms_id = UserInfo.get_cms()

delete_secret = DeleteSecretCMS(UserInfo.iks_cluster_id, UserInfo.cis_domain, cms_id, UserInfo.cert_name, UserInfo.token['access_token'])

delete_secret = DeleteSecretCMS(
UserInfo.iks_cluster_id, UserInfo.cis_domain, cms_id, UserInfo.cert_name, UserInfo.token['access_token'])
delete_secret.delete_secret()

delete_workspaces = DeleteWorkspace(UserInfo.crn, UserInfo.zone_id,
UserInfo.cis_domain, UserInfo.api_endpoint,
UserInfo.schematics_url, UserInfo.cis_api_key, UserInfo.token, ce=False, iks=True)
UserInfo.cis_domain, UserInfo.api_endpoint,
UserInfo.schematics_url, UserInfo.cis_api_key, UserInfo.token, ce=False, iks=True)
delete_workspaces.delete_workspace()
elif UserInfo.terraforming: # handle the case of using terraform
elif UserInfo.terraforming: # handle the case of using terraform
print("Currently using the default secret in IKS, but a new TLS certificate can be ordered and imported as a secret if you wish.")
execute = input("Would you like to create a new secret? Input 'y' or 'yes' to execute:").lower()
execute = input(
"Would you like to create a new secret? Input 'y' or 'yes' to execute:").lower()
if execute == 'y' or execute == 'yes':
UserInfo.cert_name = 'cis-cert'
else:
secret = UserInfo.app_url.split('.')
UserInfo.cert_name = secret[0]

resource_group_id = UserInfo.get_resource_id()
user_ACL = AclRuleCreator(resource_group_id, UserInfo.vpc_name, UserInfo.cis_api_key)
user_ACL = AclRuleCreator(
resource_group_id, UserInfo.vpc_name, UserInfo.cis_api_key)
user_ACL.check_network_acl()
UserInfo.secret_name=UserInfo.cert_name

UserInfo.secret_name = UserInfo.cert_name
user_ingress = IngressCreator(
clusterNameOrID=UserInfo.iks_cluster_id,
resourceGroupID=UserInfo.resource_id,
namespace=UserInfo.namespace,
secretName=UserInfo.secret_name,
serviceName=UserInfo.service_name,
servicePort=UserInfo.service_port,
accessToken=UserInfo.token["access_token"],
resourceGroupID=UserInfo.resource_id,
namespace=UserInfo.namespace,
secretName=UserInfo.secret_name,
serviceName=UserInfo.service_name,
servicePort=UserInfo.service_port,
accessToken=UserInfo.token["access_token"],
refreshToken=UserInfo.token["refresh_token"],
ingressSubdomain=UserInfo.app_url,
iks_master_url=UserInfo.iks_master_url
iks_master_url=UserInfo.iks_master_url,
idToken=UserInfo.id_token
)
user_ingress.create_ingress()

Expand All @@ -271,30 +285,33 @@ def iks(args):
UserInfo.verbose, UserInfo.token)
work_creator.create_terraform_workspace()
else:

# handle the case of using python
# 1. Domain Name and DNS

user_DNS = DNSCreator(UserInfo.crn, UserInfo.zone_id,
UserInfo.api_endpoint, UserInfo.app_url, token=UserInfo.token["access_token"])

user_DNS.create_records()

# 2. Order Edge Certificate from CIS
user_edge_cert = CertificateCreator(UserInfo.crn, UserInfo.zone_id, UserInfo.api_endpoint, UserInfo.cis_domain)
user_edge_cert = CertificateCreator(
UserInfo.crn, UserInfo.zone_id, UserInfo.api_endpoint, UserInfo.cis_domain)
user_edge_cert.create_certificate()

# 3. Check ACL Rules
resource_group_id = UserInfo.get_resource_id()
user_ACL = AclRuleCreator(resource_group_id, UserInfo.vpc_name, UserInfo.cis_api_key)
user_ACL = AclRuleCreator(
resource_group_id, UserInfo.vpc_name, UserInfo.cis_api_key)
user_ACL.check_network_acl()

# 4. Generate certificate in manager if necessary
print("Currently using the default secret in IKS, but a new TLS certificate can be ordered and imported as a secret if you wish.")
execute = input("Would you like to create a new secret? Input 'y' or 'yes' to execute:").lower()
execute = input(
"Would you like to create a new secret? Input 'y' or 'yes' to execute:").lower()
if execute == 'y' or execute == 'yes':
UserInfo.cert_name="cis-cert"
UserInfo.cert_name = "cis-cert"

cms_id = UserInfo.get_cms()
# print("\n"+cms_id)
user_cert = SecretCertificateCreator(
Expand All @@ -305,36 +322,32 @@ def iks(args):

token=UserInfo.token["access_token"],
cert_name=UserInfo.cert_name
)
)
user_cert.create_secret()
else:
secret = UserInfo.app_url.split('.')
UserInfo.cert_name = secret[0]

# 5. generate ingress

UserInfo.get_id_token()
UserInfo.secret_name=UserInfo.cert_name
UserInfo.secret_name = UserInfo.cert_name
user_ingress = IngressCreator(
clusterNameOrID=UserInfo.iks_cluster_id,
resourceGroupID=UserInfo.resource_id,
namespace=UserInfo.namespace,
secretName=UserInfo.secret_name,
serviceName=UserInfo.service_name,
servicePort=UserInfo.service_port,
accessToken=UserInfo.token["access_token"],
resourceGroupID=UserInfo.resource_id,
namespace=UserInfo.namespace,
secretName=UserInfo.secret_name,
serviceName=UserInfo.service_name,
servicePort=UserInfo.service_port,
accessToken=UserInfo.token["access_token"],
refreshToken=UserInfo.token["refresh_token"],
ingressSubdomain=UserInfo.app_url,
iks_master_url=UserInfo.iks_master_url,
idToken=UserInfo.id_token
)

user_ingress.create_ingress()





if not UserInfo.delete:
hostUrl = "https://"+UserInfo.cis_domain

Expand Down
8 changes: 0 additions & 8 deletions test_var.env

This file was deleted.