|
11 | 11 |
|
12 | 12 | class MkcertManager: |
13 | 13 | def __init__(self) -> None: |
14 | | - self.mkcert_bin: str | Path | None = None |
| 14 | + self.mkcert_bin: str | None = None |
15 | 15 |
|
16 | | - def setup_mkcert(self, install_path: Path) -> None: |
| 16 | + def setup_mkcert( |
| 17 | + self, install_path: Path, *, force_reinstall: bool = False |
| 18 | + ) -> None: |
17 | 19 | """Set up mkcert by checking if it's installed or downloading the binary and installing the local CA.""" |
18 | 20 | if mkcert_path := shutil.which("mkcert"): |
19 | | - # mkcert is already installed somewhere |
20 | 21 | self.mkcert_bin = mkcert_path |
21 | | - else: |
22 | | - self.mkcert_bin = install_path / "mkcert" |
23 | | - install_path.mkdir(parents=True, exist_ok=True) |
24 | | - if not self.mkcert_bin.exists(): |
25 | | - system = platform.system() |
26 | | - arch = platform.machine() |
27 | | - |
28 | | - # Map platform.machine() to mkcert's expected architecture strings |
29 | | - arch_map = { |
30 | | - "x86_64": "amd64", |
31 | | - "amd64": "amd64", |
32 | | - "AMD64": "amd64", |
33 | | - "arm64": "arm64", |
34 | | - "aarch64": "arm64", |
35 | | - } |
36 | | - arch = arch_map.get( |
37 | | - arch.lower(), "amd64" |
38 | | - ) # Default to amd64 if unknown |
39 | | - |
40 | | - if system == "Darwin": |
41 | | - os_name = "darwin" |
42 | | - elif system == "Linux": |
43 | | - os_name = "linux" |
44 | | - elif system == "Windows": |
45 | | - os_name = "windows" |
46 | | - else: |
47 | | - click.secho("Unsupported OS", fg="red") |
48 | | - sys.exit(1) |
49 | | - |
50 | | - mkcert_url = f"https://dl.filippo.io/mkcert/latest?for={os_name}/{arch}" |
51 | | - click.secho(f"Downloading mkcert from {mkcert_url}...", bold=True) |
52 | | - urllib.request.urlretrieve(mkcert_url, self.mkcert_bin) |
53 | | - self.mkcert_bin.chmod(0o755) |
54 | | - self.mkcert_bin = str(self.mkcert_bin) # Convert Path object to string |
55 | | - |
56 | | - if not self.is_mkcert_ca_installed(): |
57 | | - click.secho( |
58 | | - "Installing mkcert local CA. You may be prompted for your password.", |
59 | | - bold=True, |
60 | | - ) |
61 | | - subprocess.run([self.mkcert_bin, "-install"], check=True) |
62 | | - |
63 | | - def is_mkcert_ca_installed(self) -> bool: |
64 | | - """Check if mkcert local CA is already installed using mkcert -check.""" |
| 22 | + # Run install if CA files don't exist, or if force reinstall |
| 23 | + if force_reinstall or not self._ca_files_exist(): |
| 24 | + self.install_ca() |
| 25 | + return |
| 26 | + |
| 27 | + # mkcert not found system-wide, download to install_path |
| 28 | + install_path.mkdir(parents=True, exist_ok=True) |
| 29 | + binary_path = install_path / "mkcert" |
| 30 | + |
| 31 | + if force_reinstall and binary_path.exists(): |
| 32 | + click.secho("Removing existing mkcert binary...", bold=True) |
| 33 | + binary_path.unlink() |
| 34 | + |
| 35 | + if not binary_path.exists(): |
| 36 | + self._download_mkcert(binary_path) |
| 37 | + |
| 38 | + self.mkcert_bin = str(binary_path) |
| 39 | + |
| 40 | + # Run install if CA files don't exist, or if force reinstall |
| 41 | + if force_reinstall or not self._ca_files_exist(): |
| 42 | + self.install_ca() |
| 43 | + |
| 44 | + def _download_mkcert(self, dest: Path) -> None: |
| 45 | + """Download the mkcert binary.""" |
| 46 | + system = platform.system() |
| 47 | + machine = platform.machine().lower() |
| 48 | + |
| 49 | + # Map platform.machine() to mkcert's expected architecture strings |
| 50 | + arch_map = { |
| 51 | + "x86_64": "amd64", |
| 52 | + "amd64": "amd64", |
| 53 | + "arm64": "arm64", |
| 54 | + "aarch64": "arm64", |
| 55 | + } |
| 56 | + arch = arch_map.get(machine, "amd64") |
| 57 | + |
| 58 | + os_map = { |
| 59 | + "Darwin": "darwin", |
| 60 | + "Linux": "linux", |
| 61 | + "Windows": "windows", |
| 62 | + } |
| 63 | + os_name = os_map.get(system) |
| 64 | + if not os_name: |
| 65 | + click.secho(f"Unsupported OS: {system}", fg="red") |
| 66 | + sys.exit(1) |
| 67 | + |
| 68 | + mkcert_url = f"https://dl.filippo.io/mkcert/latest?for={os_name}/{arch}" |
| 69 | + click.secho(f"Downloading mkcert from {mkcert_url}...", bold=True) |
| 70 | + urllib.request.urlretrieve(mkcert_url, dest) |
| 71 | + dest.chmod(0o755) |
| 72 | + |
| 73 | + def _get_ca_root(self) -> Path | None: |
| 74 | + """Get the mkcert CAROOT directory.""" |
65 | 75 | if not self.mkcert_bin: |
| 76 | + return None |
| 77 | + result = subprocess.run( |
| 78 | + [self.mkcert_bin, "-CAROOT"], |
| 79 | + capture_output=True, |
| 80 | + text=True, |
| 81 | + ) |
| 82 | + if result.returncode == 0: |
| 83 | + return Path(result.stdout.strip()) |
| 84 | + return None |
| 85 | + |
| 86 | + def _ca_files_exist(self) -> bool: |
| 87 | + """Check if the CA root files exist.""" |
| 88 | + ca_root = self._get_ca_root() |
| 89 | + if not ca_root: |
66 | 90 | return False |
67 | | - try: |
68 | | - result = subprocess.run([self.mkcert_bin, "-check"], capture_output=True) |
69 | | - output = result.stdout.decode() + result.stderr.decode() |
70 | | - if "The local CA is not installed" in output: |
71 | | - return False |
72 | | - return True |
73 | | - except Exception as e: |
74 | | - click.secho(f"Error checking mkcert CA installation: {e}", fg="red") |
75 | | - return False |
| 91 | + return (ca_root / "rootCA.pem").exists() and ( |
| 92 | + ca_root / "rootCA-key.pem" |
| 93 | + ).exists() |
| 94 | + |
| 95 | + def install_ca(self) -> None: |
| 96 | + """Install the mkcert CA into the system trust store. |
| 97 | +
|
| 98 | + Running `mkcert -install` is idempotent - if already installed, |
| 99 | + it just prints a message without prompting for a password. |
| 100 | + """ |
| 101 | + if not self.mkcert_bin: |
| 102 | + return |
| 103 | + |
| 104 | + # Don't capture output so user can see messages and respond to password prompts |
| 105 | + result = subprocess.run([self.mkcert_bin, "-install"]) |
| 106 | + |
| 107 | + if result.returncode != 0: |
| 108 | + click.secho("Failed to install mkcert CA", fg="red") |
| 109 | + raise SystemExit(1) |
76 | 110 |
|
77 | | - def generate_certs(self, domain: str, storage_path: Path) -> tuple[Path, Path]: |
| 111 | + def generate_certs( |
| 112 | + self, domain: str, storage_path: Path, *, force_regenerate: bool = False |
| 113 | + ) -> tuple[Path, Path]: |
78 | 114 | cert_path = storage_path / f"{domain}-cert.pem" |
79 | 115 | key_path = storage_path / f"{domain}-key.pem" |
80 | 116 | timestamp_path = storage_path / f"{domain}.timestamp" |
81 | 117 | update_interval = 60 * 24 * 3600 # 60 days in seconds |
82 | 118 |
|
83 | 119 | # Check if the certs exist and if the timestamp is recent enough |
84 | | - if cert_path.exists() and key_path.exists() and timestamp_path.exists(): |
85 | | - last_updated = timestamp_path.stat().st_mtime |
86 | | - if time.time() - last_updated < update_interval: |
87 | | - return cert_path, key_path |
| 120 | + if not force_regenerate: |
| 121 | + if cert_path.exists() and key_path.exists() and timestamp_path.exists(): |
| 122 | + last_updated = timestamp_path.stat().st_mtime |
| 123 | + if time.time() - last_updated < update_interval: |
| 124 | + return cert_path, key_path |
88 | 125 |
|
89 | 126 | storage_path.mkdir(parents=True, exist_ok=True) |
90 | 127 |
|
|
0 commit comments