In [None]:
import json
from pymongo import MongoClient
from tqdm import tqdm
from functools import partial
from copy import deepcopy

In [None]:
def read_config(config_filename):
    """Read a JSON configuration file and return the decoded value.

    Parameters
    ----------
    config_filename : str or path-like
        Path to the JSON config file.

    Returns
    -------
    The decoded JSON value (this notebook indexes it as a dict by string keys).

    Notes
    -----
    The file is decoded explicitly as UTF-8 (the encoding JSON text is required
    to use) rather than with the platform's locale default, which differs on
    e.g. Windows. ``strict=False`` allows control characters such as raw tabs
    or newlines inside JSON strings.
    """
    with open(config_filename, 'r', encoding='utf-8') as config_file:
        return json.load(config_file, strict=False)

def get_database_connection(config, section):
    """Return a pymongo database handle for one side of the copy.

    ``section`` is a prefix such as ``"source"`` or ``"destination"``. The
    server address is read from ``config[section + '_server']`` and passed to
    ``MongoClient``; the database name is read from
    ``config[section + '_database']``.
    """
    server_key = section + '_server'
    database_key = section + '_database'
    mongo_client = MongoClient(config[server_key])
    return mongo_client[config[database_key]]

In [None]:
# Load settings (relative to the notebook's working directory) and connect to
# the source database.
# NOTE(review): this reads the *sample* config — confirm this is the intended
# file before running against real data.
config_filename = '../config.json.sample'
config = read_config(config_filename)
source = get_database_connection(config, section="source")

In [None]:
destination = get_database_connection(section="destination", config=config)

In [None]:
# Peek at the structure of one `users` document. Only field names are shown
# (compare them with config['pseudonymize']) so raw user data (PII) is not
# written into the saved notebook output.
first_user = source['users'].find_one()
sorted(first_user) if first_user is not None else None

In [None]:
# Fields to pseudonymize, keyed by collection name; `pseudonymize` below reads
# config['pseudonymize'][collection]. NOTE(review): presumably each value is a
# list of top-level field names — confirm against the config file.
config['pseudonymize']

In [None]:
class CountPseudonymizer:
    """Replace values with sequential numeric-string pseudonyms.

    Each distinct ``str(value)`` receives the next counter value ("0", "1",
    ...) the first time it is seen; later occurrences reuse that pseudonym.
    Because inputs are normalized with ``str``, e.g. ``5`` and ``"5"`` share a
    pseudonym.
    """

    def __init__(self):
        # str(original value) -> pseudonym string
        self.mapping = {}

    def pseudonymize(self, inp):
        """Return the pseudonym for ``inp``, assigning a new one if unseen."""
        key = str(inp)
        candidate = str(len(self.mapping))
        return self.mapping.setdefault(key, candidate)

In [None]:
# NOTE(review): this global instance is not referenced by any later cell
# (`pseudonymize_p` constructs its own CountPseudonymizer), so it looks like a
# leftover; consider removing it.
pseudonymizer = CountPseudonymizer()

In [None]:
def iterate_mongo(db):
    """Yield every document of every collection in a MongoDB database.

    Parameters
    ----------
    db : pymongo database
        Database to scan; all of its collections are visited.

    Yields
    ------
    tuple
        ``(collection_name, document)`` pairs. tqdm progress bars are shown
        for the collection loop and for the documents of each collection.
    """
    for collection_name in tqdm(db.list_collection_names(), desc="DB"):
        # Read from the database passed in, not from a global connection, so
        # the function works for any db handle it is given.
        collection = db[collection_name]
        # The bar total comes from estimated_document_count(); as the name
        # suggests it may be an estimate, so the total might not be exact.
        for entry in tqdm(collection.find(), desc=collection_name,
                          total=collection.estimated_document_count()):
            yield collection_name, entry

In [None]:
# Smoke-test the iterator on its first (collection, document) pair. Only the
# collection name and field names are shown, so raw data (PII) is not written
# into the notebook output. An empty database shows (None, []).
first_collection, first_entry = next(iterate_mongo(source), (None, {}))
first_collection, sorted(first_entry)

In [None]:
def sink(g):
    """Exhaust the iterable ``g``, discarding every item; returns None.

    Used to drive a lazy pipeline (e.g. a ``map`` object or generator) purely
    for the side effects of producing its items.
    """
    for _item in g:
        continue

In [None]:
def pseudonymize(collection, entry, pseudonymizer, config):
    """Return a pseudonymized copy of a database entry.

    Parameters
    ----------
    collection : str
        Name of the collection ``entry`` came from. Only collections that are
        keys of ``config['pseudonymize']`` are modified.
    entry : dict
        The document. It is never mutated.
    pseudonymizer : object
        Anything with a ``pseudonymize(value)`` method, e.g. CountPseudonymizer.
    config : dict
        Must contain ``config['pseudonymize']``, mapping a collection name to
        the field names whose values should be replaced.

    Returns
    -------
    tuple
        ``(collection, entry_out)``. ``entry_out`` is a deep copy of ``entry``
        with the configured fields (those present in the entry) replaced, or
        ``entry`` itself (the same object) when the collection is not
        configured.
    """
    fields_by_collection = config['pseudonymize']
    if collection not in fields_by_collection:
        return collection, entry

    entry_out = deepcopy(entry)
    # Visit fields in config order (deduplicated) instead of via a set: with a
    # stateful pseudonymizer such as CountPseudonymizer the visiting order
    # decides which pseudonym each value gets, and the iteration order of a
    # set of strings changes between interpreter runs (hash randomization).
    for field in dict.fromkeys(fields_by_collection[collection]):
        if field in entry_out:
            entry_out[field] = pseudonymizer.pseudonymize(entry[field])

    return collection, entry_out

In [None]:
def write_entry(collection, entry, db):
    """Insert ``entry`` into ``db[collection]`` and pass both values through.

    Returning ``(collection, entry)`` lets this step be chained after other
    ``(collection, entry)``-returning steps via ``compose_with_args``.
    NOTE(review): with pymongo, ``insert_one`` presumably adds an ``_id`` to
    ``entry`` when missing and rejects a duplicate ``_id`` (e.g. re-running
    the copy into a non-empty destination) — confirm, consider an upsert.
    """
    target = db[collection]
    target.insert_one(entry)
    return collection, entry

In [None]:
def compose_with_args(fs):
    """Chain functions that each take and return an argument tuple.

    Returns a callable ``compose_fcn(args)`` that unpacks ``args`` into the
    first function in ``fs``, unpacks that result into the next function, and
    so on, returning the last function's result. ``fs`` is iterated afresh on
    every call; with an empty ``fs`` the input ``args`` is returned unchanged.
    """
    def compose_fcn(args):
        result = args
        for step in fs:
            result = step(*result)
        return result

    return compose_fcn

In [None]:
# NOTE(review): `p` is not used by any later cell; it looks like a leftover
# from experimentation and can likely be deleted.
p = CountPseudonymizer()

In [None]:
# Bind each stage's fixed arguments so it can be called as f(collection, entry).
# One CountPseudonymizer is created here and shared by every call made through
# `pseudonymize_p`; its mapping is reachable as
# pseudonymize_p.keywords['pseudonymizer'].mapping.
pseudonymize_p = partial(
    pseudonymize,
    pseudonymizer=CountPseudonymizer(),
    config=config,
)

write_p = partial(write_entry, db=destination)

In [None]:
# Smoke-test pseudonymization on a dummy record. A throwaway pseudonymizer is
# used so this fake value is not recorded in (and does not consume a counter
# value of) the mapping that the real copy via `pseudonymize_p` relies on.
pseudonymize("users", {'email': "sdfsf"}, CountPseudonymizer(), config)

In [None]:
# Stream every (collection, document) pair out of the source DB, pseudonymize
# it and insert it into the destination. The pipeline is lazy; `sink` is what
# actually drives it to completion.
items = iterate_mongo(source)
pseudonymize_and_write = compose_with_args([pseudonymize_p, write_p])
out = (pseudonymize_and_write(pair) for pair in items)
sink(out)