-
Notifications
You must be signed in to change notification settings - Fork 832
/
views.py
90 lines (76 loc) · 3.18 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import json
import random
import socket
from OpenSSL.crypto import *
import aj
from jadi import component
from aj.api.http import url, HttpPlugin
from aj.api.endpoint import endpoint
@component(HttpPlugin)
class Handler(HttpPlugin):
def __init__(self, context):
self.context = context
@url(r'/api/settings/generate-client-certificate')
@endpoint(api=True)
def handle_api_generate_client_certificate(self, http_context):
data = json.loads(http_context.body)
key = PKey()
key.generate_key(TYPE_RSA, 4096)
ca_key = load_privatekey(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
ca_cert = load_certificate(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
cert = X509()
cert.get_subject().countryName = data['c']
cert.get_subject().stateOrProvinceName = data['st']
cert.get_subject().organizationName = data['o']
cert.get_subject().commonName = data['cn']
cert.set_pubkey(key)
cert.set_serial_number(random.getrandbits(8 * 20))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
cert.set_issuer(ca_cert.get_subject())
cert.sign(ca_key, 'sha1')
pkcs = PKCS12()
pkcs.set_certificate(cert)
pkcs.set_privatekey(key)
pkcs.set_friendlyname(str(data['cn']))
return {
'digest': cert.digest('sha1'),
'name': ','.join('%s=%s' % x for x in cert.get_subject().get_components()),
'serial': str(cert.get_serial_number()),
'b64certificate': pkcs.export().encode('base64'),
}
@url(r'/api/settings/generate-server-certificate')
@endpoint(api=True)
def handle_api_generate_server_certificate(self, http_context):
etc_dir = '/etc/ajenti'
certificate_path = '%s/ajenti.pem' % etc_dir
key = PKey()
key.generate_key(TYPE_RSA, 4096)
cert = X509()
cert.get_subject().countryName = 'NA'
cert.get_subject().organizationName = socket.gethostname()
cert.get_subject().commonName = 'ajenti'
cert.set_pubkey(key)
cert.set_serial_number(random.getrandbits(8 * 20))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.sign(key, 'sha1')
with open(certificate_path, 'w') as f:
f.write(dump_privatekey(FILETYPE_PEM, key))
f.write(dump_certificate(FILETYPE_PEM, cert))
return {
'path': certificate_path,
}
@url(r'/api/settings/test-certificate/')
@endpoint(api=True)
def handle_test_certificate(self, http_context):
if http_context.method == 'POST':
data = http_context.json_body()
certificate_path = data['certificate']
try:
certificate = load_certificate(FILETYPE_PEM, open(certificate_path).read())
private_key = load_privatekey(FILETYPE_PEM, open(certificate_path).read())
except Exception as e:
raise EndpointError(None, message=str(e))
return {'path': certificate_path}