|
25 | 25 | import socket |
26 | 26 | from datetime import datetime, timedelta |
27 | 27 | from truenas_api_client import Client |
| 28 | +from OpenSSL import crypto |
| 29 | +import re |
28 | 30 |
|
29 | 31 | parser = argparse.ArgumentParser(description='Import and activate a SSL/TLS certificate into TrueNAS.') |
30 | 32 | parser.add_argument('-c', '--config', default=(os.path.join(os.path.dirname(os.path.realpath(__file__)), |
|
40 | 42 | exit(1) |
41 | 43 |
|
42 | 44 | API_KEY = deploy.get('api_key') |
| 45 | +PROTOCOL = deploy.get('protocol', "ws") |
43 | 46 | CONNECT_HOST = deploy.get('connect_host',"localhost") |
44 | | -CONNECT_URI = "ws://" + CONNECT_HOST + "/websocket" |
| 47 | +CONNECT_URI = PROTOCOL + "://" + CONNECT_HOST + "/websocket" |
45 | 48 |
|
46 | 49 | PRIVATEKEY_PATH = deploy.get('privkey_path') |
47 | 50 | if os.path.isfile(PRIVATEKEY_PATH)==False: |
|
67 | 70 | with open(FULLCHAIN_PATH, 'r') as file: |
68 | 71 | full_chain = file.read() |
69 | 72 |
|
| 73 | +def extract_leaf_certificate(fullchain_pem): |
| 74 | + """Extract the first certificate (leaf) from a full chain PEM file.""" |
| 75 | + certs = re.findall(r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----", |
| 76 | + fullchain_pem, re.DOTALL) |
| 77 | + if not certs: |
| 78 | + raise ValueError("No valid certificate found in the provided PEM data.") |
| 79 | + return certs[0] # Return the first certificate (leaf) |
| 80 | + |
| 81 | +def validate_cert_key_pair(cert_pem, key_pem): |
| 82 | + """Validate that the certificate's public key matches the private key.""" |
| 83 | + leaf_cert_pem = extract_leaf_certificate(cert_pem) |
| 84 | + |
| 85 | + # Load the extracted leaf certificate and key |
| 86 | + cert_obj = crypto.load_certificate(crypto.FILETYPE_PEM, leaf_cert_pem) |
| 87 | + key_obj = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) |
| 88 | + |
| 89 | + # Verify that the certificate's public key matches the private key |
| 90 | + try: |
| 91 | + return cert_obj.get_pubkey().to_cryptography_key().public_numbers() == \ |
| 92 | + key_obj.to_cryptography_key().public_key().public_numbers() |
| 93 | + except Exception as e: |
| 94 | + print(f"Validation error: {e}") |
| 95 | + return False |
| 96 | + |
| 97 | +if validate_cert_key_pair(full_chain, priv_key): |
| 98 | + print("✅ Certificate and private key match.") |
| 99 | +else: |
| 100 | + print("❌ Certificate and private key do not match.") |
| 101 | + exit(1) |
| 102 | + |
70 | 103 | with Client(CONNECT_URI) as c: |
71 | | - c.call("auth.login_with_api_key", API_KEY) |
| 104 | + result=c.call("auth.login_with_api_key", API_KEY) |
| 105 | + if result==False: |
| 106 | + print("Failed to authenticate!") |
| 107 | + exit(1) |
72 | 108 | # Import the certificate |
73 | 109 | args = {"name": cert_name, "certificate": full_chain, "privatekey": priv_key, "create_type": "CERTIFICATE_CREATE_IMPORTED"} |
74 | 110 | cert = c.call("certificate.create", args, job=True) |
|
0 commit comments