-
Notifications
You must be signed in to change notification settings - Fork 0
/
hotp.py
120 lines (102 loc) · 4.21 KB
/
hotp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""HOTP/TOTP one time password (RFC 4226/RFC 6238) implementation"""
import base64
import hmac
import struct
import time
import urllib.parse
# Public names exported by a star-import of this module.
__all__ = [
    'HOTP',
    'TOTP',
    'from_qr',
    'default_alg',
    'default_digits',
    'default_period',
]

# Defaults applied when a constructor argument or otpauth URL parameter is
# omitted.
default_alg = 'sha1'    # digest name handed to hmac.new()
default_digits = 6      # number of decimal digits in a generated token
default_period = 30     # TOTP time step; TOTP.count() divides elapsed time by it
class HOTP:
    """Counter-based one-time password generator (RFC 4226).

    All attributes are plain, mutable instance attributes. ``issuer`` and
    ``user_account`` start out empty and are only consulted when building an
    ``otpauth://`` URL with :meth:`to_qr`.
    """

    secret: bytes        # raw HMAC key
    alg: str             # digest name handed to hmac.new()
    digits: int          # number of decimal digits in a token
    counter: int         # counter used by token() when none is passed
    issuer: str          # provider name placed in the otpauth label/query
    user_account: str    # account name placed in the otpauth label

    def __init__(self, secret: bytes, digits: int = default_digits, alg: str = default_alg,
                 counter: int = 0) -> None:
        """Store the key and token parameters; issuer and account start empty."""
        self.secret = secret
        self.digits = digits
        self.alg = alg
        self.counter = counter
        self.issuer = self.user_account = ''

    def __repr__(self) -> str:
        # The secret is deliberately not part of the representation.
        return f'HOTP(digits={self.digits}, alg={self.alg.lower()})'

    def token(self, counter: int | None = None) -> str:
        """Calculate the HOTP value for the given counter.

        When *counter* is ``None`` the instance's ``counter`` attribute is
        used. The result is a zero-padded decimal string of ``digits``
        characters.

        Raises:
            ValueError: if the counter is negative.
        """
        if counter is None:
            counter = self.counter
        if counter < 0:
            raise ValueError("HOTP counter must be non-negative")
        # The counter is hashed as an 8-byte big-endian integer.
        counter_bin = struct.pack('>Q', counter)
        digest = hmac.new(self.secret, counter_bin, self.alg).digest()
        # Dynamic truncation: the low nibble of the last digest byte selects
        # a 4-byte window, whose top bit is masked off.
        offset = digest[-1] & 0x0F
        p = digest[offset:offset + 4]
        result = struct.unpack('>L', p)[0] & 0x7FFFFFFF
        return f'{result % 10 ** self.digits:0{self.digits}}'

    def to_qr(self) -> str:
        """Return the ``otpauth://`` URL describing this generator.

        The URL type is ``totp`` for ``TOTP`` instances (including
        subclasses) and ``hotp`` otherwise. The secret is base32-encoded with
        the ``=`` padding removed. ``algorithm``, ``digits`` and ``period``
        are emitted only when they differ from the module defaults;
        ``counter`` is always emitted for HOTP.
        """
        # isinstance() rather than an exact type check, so that subclasses
        # still produce a recognised URL type and keep their counter/period.
        is_totp = isinstance(self, TOTP)
        netloc = 'totp' if is_totp else 'hotp'
        params: dict[str, str | int] = {
            'secret': base64.b32encode(self.secret).rstrip(b'=').decode('ascii')
        }
        if self.issuer:
            path = f'{self.issuer}:{self.user_account}'
            params['issuer'] = self.issuer
        else:
            path = self.user_account
        path = urllib.parse.quote(path)
        # Compare case-insensitively so that e.g. 'SHA1' counts as the default.
        if self.alg.lower() != default_alg:
            params['algorithm'] = self.alg.upper()
        if self.digits != default_digits:
            params['digits'] = self.digits
        if is_totp:
            if self.period != default_period:
                params['period'] = self.period
        else:
            params['counter'] = self.counter
        query = urllib.parse.urlencode(params)
        return urllib.parse.urlunparse(('otpauth', netloc, path, None, query, None))
class TOTP(HOTP):
    """Time-based one-time password generator (RFC 6238) built on HOTP.

    The HOTP counter is derived from a timestamp: the number of whole
    ``period``-long steps elapsed since ``base``.
    """

    period: int    # length of one time step, in the units of the timestamp
    base: int      # timestamp subtracted before dividing by the period

    def __init__(self, secret: bytes, digits: int = default_digits, alg: str = default_alg,
                 period: int = default_period, base: int = 0) -> None:
        """Store the key, token parameters, time step and base timestamp.

        Raises:
            ValueError: if *period* is not positive.
        """
        # A zero period would otherwise surface later as ZeroDivisionError
        # in count(); a negative one would yield meaningless counters.
        if period <= 0:
            raise ValueError("TOTP period must be positive")
        super().__init__(secret, digits, alg)
        self.period = period
        self.base = base

    def __repr__(self) -> str:
        return f'TOTP(digits={self.digits}, alg={self.alg.lower()}, period={self.period})'

    def count(self, ts: float) -> int:
        """Calculate the TOTP counter for a given timestamp."""
        return (int(ts) - self.base) // self.period

    def match(self, token: str, *, fuzz: int = 1, ts: float | None = None) -> bool:
        """Return true if the TOTP token matches around the given timestamp.

        `fuzz` is the number of periods on each side of `ts` that are checked
        for a match. `ts` defaults to the current time when it is ``None``;
        an explicit ``0`` is honoured. Counters that would fall below zero
        are skipped rather than raising.
        """
        if not isinstance(token, str):
            return False
        if ts is None:
            ts = time.time()
        ctr = self.count(ts)
        # Compare as bytes: compare_digest() rejects non-ASCII str input.
        supplied = token.encode('utf-8', 'replace')
        matched = False
        for step in range(-fuzz, fuzz + 1):
            candidate = ctr + step
            if candidate < 0:
                # token() raises for negative counters; nothing to match.
                continue
            expected = self.token(candidate).encode('ascii')
            # Constant-time comparison with no early exit, so timing does not
            # reveal how much of a token matched or which step did.
            matched |= hmac.compare_digest(supplied, expected)
        return matched
def from_qr(qr: str) -> HOTP:
    """Decode a HOTP or TOTP QR code URL into the appropriate object.

    The URL must look like ``otpauth://hotp/LABEL?secret=...`` or
    ``otpauth://totp/LABEL?secret=...``. ``LABEL`` is either ``account`` or
    ``issuer:account``. When the label carries no issuer, the ``issuer``
    query parameter is used instead.

    Raises:
        ValueError: if the scheme is not ``otpauth``, the type is neither
            ``hotp`` nor ``totp``, or a numeric parameter is not an integer.
        KeyError: if ``secret`` (or ``counter`` for HOTP) is missing.
        binascii.Error: if the secret is not valid base32.
    """
    url = urllib.parse.urlparse(qr)
    if url.scheme != 'otpauth':
        raise ValueError(f'invalid scheme "{url.scheme}"')
    # Only the first value of a repeated query parameter is used.
    query = {k: v[0] for k, v in urllib.parse.parse_qs(url.query).items()}

    # HOTP.to_qr() strips the '=' padding, but b32decode() insists on it, so
    # restore it here. casefold=True also accepts lower-case secrets.
    secret_b32 = query['secret']
    secret_b32 += '=' * (-len(secret_b32) % 8)
    secret = base64.b32decode(secret_b32, casefold=True)

    # to_qr() writes the algorithm in upper case; normalise it back.
    alg = query.get('algorithm', default_alg).lower()
    digits = int(query.get('digits', default_digits))

    kind = url.netloc.lower()
    if kind == 'hotp':
        hotp: HOTP = HOTP(secret, digits, alg, int(query['counter']))
    elif kind == 'totp':
        hotp = TOTP(secret, digits, alg, int(query.get('period', default_period)))
    else:
        raise ValueError(f'invalid protocol "{url.netloc}"')

    # Split on the first ':' only, so an account name that itself contains
    # a colon is preserved instead of being discarded.
    label = urllib.parse.unquote(url.path.lstrip('/'))
    issuer, sep, account = label.partition(':')
    if not sep:
        issuer, account = '', label
    hotp.user_account = account.strip()
    # Prefer the issuer from the label and fall back to the query parameter.
    hotp.issuer = issuer or query.get('issuer', '')
    return hotp