In [1]:
from base64 import b64encode, b64decode
import bcrypt
import csv
from tqdm.notebook import tqdm

import sys
sys.path.append('../app')
import cipher
from model import Person
from util import ConnectionContext
from constants import MYSQL_PORT, USER_SECRET_KEY_LENGTH

In [2]:
connctx = ConnectionContext({
    'user': os.environ.get('MYSQL_DB_USER'),
    'password': os.environ.get('MYSQL_DB_PASSWORD'),
    'db': os.environ.get('MYSQL_DB_DATABASE'),
    'host': os.environ.get('MYSQL_DB_HOST'),
    'port': MYSQL_PORT
})

In [3]:
# populate user data with random data for example

import random, string
def randname():
    names = ''.join(random.choices(string.ascii_lowercase, k=12))
    return names[:6], names[:6] + '-' + names[6:]

user_list_file = '../data/user_names.csv'

with open(user_list_file, 'w') as f:
    w = csv.writer(f)
    for name, fullname in [randname() for _ in range(10)]:
        w.writerow((name, fullname))

In [4]:
### example of how to populate a database with users
#
# in this example, the csv has the format
# firstname1, fullname1
# firstname2, fullname2
# ...
#
# and outputs a csv where each line is
# name, fullname, secret

user_list_file = '../data/user_names.csv'
output_file = '../data/user_secrets.csv'

nrows = 0
with open(user_list_file) as f:
    for _ in csv.reader(f):
        nrows += 1

with connctx as conn, open(user_list_file) as f, open(output_file, 'w') as g:
    writer = csv.writer(g)
    for row in tqdm(csv.reader(f), total=nrows):
        sec = Person.new(conn, row[0], row[1])
        writer.writerow(row + [sec])


  0%|          | 0/10 [00:00<?, ?it/s]

In [5]:
### example to verify that user secrets are capable of encrypting and decrypting messages
message = "hello world"

with connctx as conn, open(output_file) as f:
    for row in tqdm(csv.reader(f), total=nrows):
        pid, pw = cipher.user_info_from_secret(row[2])
        person = Person.get(conn, pid)
        assert message == cipher.hybrid_decrypt(
            cipher.hybrid_encrypt(
                message,
                cipher.deserialize_public_key(person.public_key)
            ), cipher.decrypt_private_key(
                person.encrypted_private_key,
                pw
            )
        )

  0%|          | 0/10 [00:00<?, ?it/s]

In [6]:
### create a test user if necessary

testuser_names = ('testy', 'testy-mctesterson')

private_key, public_key = cipher.generate_key_pair()
public_key_str = cipher.serialize_public_key(public_key)

pw = os.urandom(USER_SECRET_KEY_LENGTH)
encrypted_private_key = cipher.encrypt_private_key(private_key, pw)
secret_key_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode()

with connctx as conn:
    Person(*testuser_names, public_key_str, encrypted_private_key, secret_key_hash).insert(conn)
    pid = conn.lastrowid

In [7]:
### example of nuking and resetting keys for a test user, without changing the password

# insert your own data here of course
testuser_names = ('testy', 'testy-mctesterson')
pid = 83
pw = b64decode(b'IuFeurTvzUlj46igTj2rh8sL')

private_key, public_key = cipher.generate_key_pair()
public_key_str = cipher.serialize_public_key(public_key)
encrypted_private_key = cipher.encrypt_private_key(private_key, pw)
secret_key_hash = bcrypt.hashpw(pw, bcrypt.gensalt()).decode()

with connctx as conn:
    conn.execute(
        'UPDATE persons SET secret_key_hash = %s, public_key = %s, encrypted_private_key = %s WHERE id = %s',
        (secret_key_hash, public_key_str, encrypted_private_key, pid)
    )
