**Company** : <br>
Tech Firm

**Notebook Function** : <br>
    This notebook walks through the steps to fine-tune Mittens embeddings, starting from the initial company embeddings that were trained on the Tech firm corpus.

**Output File(s)** : <br>
    embeddings_50d_mincount50 - A folder containing the user embeddings for each time period

**Author(s)** : <br>
Lara Yang, Sarayu Anshuman

Install libraries

In [1]:
pip install mittens

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install gensim

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install scikit-learn

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


Run helper functions

In [4]:
import os
import sys
from collections import defaultdict
from datetime import datetime
import pandas as pd
import numpy as np
from mittens import Mittens
import csv
from operator import itemgetter
import ujson as json
import re
from gensim.matutils import cossim
from gensim.test.utils import datapath, get_tmpfile
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec
from statistics import mean 
from sklearn.decomposition import PCA

#########################################################################
########### Helper Functions for Generating Mittens Embeddings ##########
#########################################################################
def _window_based_iterator(toks, window_size, weighting_function):
    for i, w in enumerate(toks):
        yield w, w, 1
        left = max([0, i-window_size])
        for x in range(left, i):
            yield w, toks[x],weighting_function(x)
        right = min([i+1+window_size, len(toks)])
        for x in range(i+1, right):
            yield w, toks[x], weighting_function(x)
    return

def glove2dict(glove_filename):
    """
    Reads word vectors into a dictionary
    Parameters
    ----------
    glove_filename : str
        Name of a space-delimited text file with one word per line followed
        by its vector components
    Returns
    -------
    data : dict
        A dictionary matching words to their vectors (numpy arrays of floats).
        If a word appears on several lines, the last occurrence wins.
    """
    # Read explicitly as UTF-8 so parsing does not depend on the platform's
    # default encoding; newline='' is what the csv module expects.
    with open(glove_filename, encoding='utf-8', newline='') as f:
        reader = csv.reader(f, delimiter=' ', quoting=csv.QUOTE_NONE)
        # Blank lines come back from csv.reader as empty lists; skip them
        # instead of failing on line[0].
        data = {line[0]: np.array([float(v) for v in line[1:]])
                for line in reader if line}
    return data

# Inspired by original build_weighted_matrix in utils.py in the Mittens paper source codebase
def build_weighted_matrix(emails,
        mincount=300, vocab_size=None, window_size=10,
        weighting_function=lambda x: 1 / (x + 1)):
    """
    Builds a count matrix based on a co-occurrence window of
    `window_size` elements before and `window_size` elements after the
    focal word, where each co-occurrence contributes the weight produced
    by `weighting_function`.
    Parameters
    ----------
    emails : list of dicts
        Emails converted from JSON formats
    mincount : int
        Only words with at least this many tokens will be included.
    vocab_size : int or None
        If this is an int above 0, then, the top `vocab_size` words
        by frequency are included in the matrix, and `mincount`
        is ignored.
    window_size : int
        Size of the window before and after. (So the total window size
        is 2 times this value, with the focal word at the center.)
    weighting_function : function from ints to floats
        How to weight an individual co-occurrence. It receives whatever
        value `_window_based_iterator` passes for the context token.
    Returns
    -------
    X : pd.DataFrame
        Square co-occurrence matrix whose rows (focal word) and columns
        (context word) are the sorted vocabulary. Empty if there are no
        emails or no word passes the vocabulary filter.
    """
    corpus = read_corpus(emails, sentence_delim=False)
    # read_corpus always returns a list, so test for emptiness rather than None.
    if not corpus:
        print("These emails are empty\t{}.\n".format(str(emails)))
        return pd.DataFrame()
    wc = defaultdict(int)
    for toks in corpus:
        for tok in toks:
            wc[tok] += 1
    if vocab_size:
        srt = sorted(wc.items(), key=itemgetter(1), reverse=True)
        vocab_set = {w for w, c in srt[: vocab_size]}
    else:
        vocab_set = {w for w, c in wc.items() if c >= mincount}
    vocab = sorted(vocab_set)
    n_words = len(vocab)
    # Weighted counts:
    counts = defaultdict(float)
    for toks in corpus:
        window_iter = _window_based_iterator(toks, window_size, weighting_function)
        for w, w_c, val in window_iter:
            if w in vocab_set and w_c in vocab_set:
                counts[(w, w_c)] += val
    # Scatter only the observed pairs into the dense matrix. Looking up every
    # (w1, w2) pair in the defaultdict would insert n_words**2 zero entries.
    position = {w: i for i, w in enumerate(vocab)}
    X = np.zeros((n_words, n_words))
    for (w, w_c), val in counts.items():
        X[position[w], position[w_c]] = val
    X = pd.DataFrame(X, columns=vocab, index=pd.Index(vocab))
    return X

def read_corpus(emails, sentence_delim=False):
    """
    Tokenizes email bodies on whitespace.
    Parameters
    ----------
    emails : list of dict
        A list of emails converted from JSON formats; only the 'body' field is read
    sentence_delim : bool, optional
        If true, bodies are split on newlines and on the SENT_END marker so
        that co-occurrences across sentence boundaries are ignored.
    Returns
    -------
    list of list of str
        If sentence_delim is false, one token list per email (newlines become
        spaces and SENT_END markers are removed).
        If sentence_delim is true, one token list per non-empty sentence.
    """
    corpus = []
    if sentence_delim:
        for email in emails:
            for line in email['body'].split('\n'):
                for sent in line.split('SENT_END'):
                    # Only zero-length pieces are dropped; a whitespace-only
                    # piece still contributes an empty token list.
                    if sent:
                        corpus.append(sent.strip().split())
        return corpus
    for email in emails:
        flattened = email['body'].replace('\n', ' ').replace("SENT_END", "")
        corpus.append(flattened.strip().split())
    return corpus

def output_embeddings(mittens_df, filename, compress=False):
    """
    Writes embeddings to disk as space-separated text, one row per index entry.
    Parameters
    ----------
    mittens_df : pd.DataFrame
        Embeddings; the index is written as the first field of each line
    filename : str
        Destination path. When compressing, '.gz' is appended to it.
    compress : bool, optional
        If true, the file is gzip-compressed.
    """
    csv_options = dict(quoting=csv.QUOTE_NONE, header=False, sep=" ")
    if not compress:
        mittens_df.to_csv(filename, **csv_options)
        return
    mittens_df.to_csv(filename + '.gz', compression='gzip', **csv_options)

def isnull_wrapper(x):
    """
    Reports whether x is null, or contains any null element.
    pd.isnull gives a plain bool for scalars and an element-wise mask for
    array-likes; the mask is reduced with any().
    """
    null_mask = pd.isnull(x)
    return null_mask if isinstance(null_mask, bool) else null_mask.any()

def cossim_with_none(vec1, vec2, vec_format='sparse'):
    """
    Cosine similarity that tolerates missing vectors instead of erroring out.
    Parameters
    ----------
    vec1 : list of (int, float) in gensim sparse vector format, or numpy array
    vec2 : list of (int, float) in gensim sparse vector format, or numpy array
    vec_format : str, optional
        Either sparse or dense. If sparse, vec1 and vec2 are in gensim sparse vector format and
        gensim's cossim is used. If dense, they are numpy arrays and the similarity is hand calculated.
    Returns
    -------
    float or None
        Cosine similarity between vec1 and vec2; None if either vector is null
        (or contains a null), or if a dense vector is empty.
    Raises
    ------
    ValueError
        If both vectors are non-null and vec_format is neither 'sparse' nor 'dense'.
    """
    if isnull_wrapper(vec1) or isnull_wrapper(vec2):
        return None
    if vec_format == 'sparse':
        return cossim(vec1, vec2)
    if vec_format != 'dense':
        raise ValueError()
    if len(vec1) == 0 or len(vec2) == 0:
        return None
    norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    return np.dot(vec1, vec2) / norm_product

#########################################################################
############# Helper Functions for Working with JSON Emails #############
#########################################################################

def slice_user_corpus(emails, train_mode):
    """
    Similar to slice_user_corpus in the Coco, modified to work with Tech firm data structure
    Parameters
    ----------
    emails : list of dict
        A list of emails converted from JSON formats to dictionaries
    train_mode : str
        One of 'annual', 'quarterly', 'halfyear', 'all'
        Selects which email field is used as the time key: 'year', 'quarter',
        'halfyear', or both 'year' and 'quarter' for 'all'.
        Any other value yields an empty result.
    Returns
    -------
    timekey2emails : dict
        Matches each time key to the emails carrying that key
    """
    fields_by_mode = {
        'annual': ('year',),
        'quarterly': ('quarter',),
        'halfyear': ('halfyear',),
        'all': ('year', 'quarter'),
    }
    selected_fields = fields_by_mode.get(train_mode, ())
    timekey2emails = defaultdict(list)
    for email in emails:
        for field in selected_fields:
            timekey2emails[email[field]].append(email)
    return timekey2emails

#########################################################################
############# Helper Functions for Working with Date Objects ############
#########################################################################
def str_to_datetime(date):
    """
    Parses an email date string into a datetime.
    Parameters
    ----------
    date : str or None
        Date string; a parenthesised upper-case zone suffix such as "(UTC)"
        is removed before parsing.
    Returns
    -------
    datetime or None
        Parsed datetime, or None if `date` is None or matches none of the
        supported formats.
    """
    if date is None:
        return None
    date = re.sub(r"(\([A-Z]{3,}\))", "", date).strip()
    formats = (
        '%a, %d %b %Y %H:%M:%S %z',
        '%d %b %Y %H:%M:%S %z',
        '%a %d, %b %Y %H:%M:%S %z',
        '%a, %d %b %Y %H:%M:%S %Z',
        '%a, %d %b %Y %H:%M:%S (%Z)',
        '%Y-%m-%d %H:%M:%S%z',
    )
    for fmt in formats:
        try:
            # Stop at the first format that parses; trying the remaining ones
            # only raises and swallows more exceptions for every email.
            return datetime.strptime(date, fmt)
        except ValueError:
            continue
    return None

def to_quarter(date, format):
    """
    Returns the quarter of date as a str such as '2020Q2'.
    `format` is 'str' (date is parsed with str_to_datetime) or 'datetime'
    (date already has year and month attributes).
    Returns None when date is None or a string date cannot be parsed.
    If both to_quarter and to_year are needed, it is cheaper to convert the
    string to a datetime once and call both with format 'datetime'.
    """
    if date is None:
        return None
    if format == 'str':
        parsed = str_to_datetime(date)
        if parsed is None:
            return None
        year, month = parsed.year, parsed.month
    elif format == 'datetime':
        year, month = date.year, date.month
    else:
        # Unrecognised format: fall back to the zero placeholders.
        year, month = 0, 0
    return '{}Q{}'.format(year, (int(month) - 1) // 3 + 1)

def to_year(date, date_type):
    """
    Returns year of date as str using date formats used in Tech firm.
    Parameters
    ----------
    date : str, datetime or None
        The date to convert
    date_type : str
        'str' if date is a date string, 'datetime' if it is a datetime object
    Returns
    -------
    str or None
        The year, or None if date is None, a string date cannot be parsed,
        or date_type is not recognised.
    Support for string format is provided, but if both to_quarter and to_year are needed,
    it is computationally more efficient to first convert string to datetime and then call
    both to_quarter and to_year on the datetime object to avoid duplicated datetime conversion.
    """
    if date is None:
        return None
    if date_type == 'str':
        # Use the same parser as to_quarter / to_halfyear so that all three
        # helpers accept exactly the same set of date formats.
        dt = str_to_datetime(date)
        return None if dt is None else str(dt.year)
    if date_type == 'datetime':
        return str(date.year)
    return None

def to_halfyear(date, format):
    """
    Returns the half-year of date as a str such as '2020HY1' (January-June)
    or '2020HY2' (July-December).
    `format` is 'str' (date is parsed with str_to_datetime) or 'datetime'
    (date already has year and month attributes).
    Returns None when date is None or a string date cannot be parsed.
    """
    if date is None:
        return None
    # Placeholders that remain in effect when `format` is not recognised.
    year = 0
    month = 0
    if format == 'datetime':
        year = date.year
        month = date.month
    elif format == 'str':
        parsed = str_to_datetime(date)
        if parsed is None:
            return None
        year = parsed.year
        month = parsed.month
    half_index = (int(month) - 1) // 6 + 1
    return str(year) + 'HY' + str(half_index)

def word_similarity(model, w1, w2):
    """
    Compares one word (or list of words) to another word (or list of words).
    Each argument is wrapped into a list if it is not one already, words
    missing from the model's vocabulary are dropped, and the remaining two
    word lists are handed to model.n_similarity.
    Parameters
    ----------
    model : KeyedVectors
        The model that contains all the words and vectors
    w1 : str or list
        The first word or word list to be compared
    w2 : str or list
        The second word or word list to be compared
    Returns
    -------
    float or None
        Result of n_similarity, or None if either side has no word in the
        model's vocabulary
    """
    known_groups = []
    for words in (w1, w2):
        candidates = words if isinstance(words, list) else [words]
        known_groups.append([w for w in candidates if w in model.key_to_index])
    first, second = known_groups
    if not first or not second:
        return None
    return model.n_similarity(first, second)

#########################################################################
############## Helper Functions for Working with Dataframes #############
#########################################################################
def dict_to_df(index2rows, cols, index_name):
    """
    Builds a dataframe, sorted by index, from a dictionary of rows.
    Parameters
    ----------
    index2rows : dict
        Dictionary mapping index values (tuples for a multi-level index) to rows
    cols : list
        List of column names of type str
    index_name : list
        List of index level names; a single name gives a plain index,
        several names give a MultiIndex
    Returns
    -------
    df : pd.DataFrame or None
        Constructed dataframe, or None if index2rows is None or empty
    """
    if index2rows is None or len(index2rows) == 0:
        return None
    frame = pd.DataFrame.from_dict(index2rows, orient='index', columns=cols)
    if len(index_name) == 1:
        frame.index.name = index_name[0]
    else:
        named_levels = pd.MultiIndex.from_tuples(frame.index, names=index_name)
        frame = pd.DataFrame(frame, named_levels)
    frame.sort_index(axis=0, inplace=True)
    return frame

def get_recipients(msg):
    """
    Return the set of recipients ('to', 'cc' and 'bcc' combined) of the
    current message, with the sender removed if present.
    Tech firm's from fields are all lists based on visual inspection,
    but a non-list 'from' value is also accepted and used as-is.
    """
    sender_field = msg['from']
    sender = sender_field[0] if type(sender_field) == list else sender_field
    addressed = msg.get('to', []) + msg.get('cc', []) + msg.get('bcc', [])
    return set(addressed) - {sender}

def is_internal_msg(msg):
    """
    Returns True when every recipient of msg starts with at least one digit
    (presumably anonymized employee ids - verify against the data), and
    therefore also when msg has no recipients other than the sender.
    """
    return all(re.match(r'\d+', r) for r in get_recipients(msg))
    
def extract_variables_from_file(file):
    """
    Extract relevant information from name of embedding file, with format: {}(_{})?.txt
    The last four characters (the extension) are dropped and the rest is
    split on '_'. Returns (user, time_key); time_key is None unless the name
    has exactly two '_'-separated parts.
    """
    stem = file[:-4]
    parts = stem.split('_')
    if len(parts) == 2:
        usr, time_key = parts
    else:
        usr, time_key = parts[0], None
    return (usr, time_key)

def month2timekey(month, time_key):
    """
    Converts a month number to the year ('year') or year plus quarter
    ('quarter') it falls in. Month numbers are counted in blocks of twelve
    starting at 2006; positions 0-2 of a block map to Q4 of the block's base
    year and positions 3-11 map to Q1-Q3 of the following year.
    Returns an empty string for any other time_key.
    """
    if time_key not in ('year', 'quarter'):
        return ''
    whole_years, remainder = divmod(month, 12)
    year = 2006 + whole_years
    if remainder > 2:
        year += 1
    if time_key == 'year':
        return str(year)
    for upper_bound, label in ((2, 'Q4'), (5, 'Q1'), (8, 'Q2')):
        if remainder <= upper_bound:
            quarter = label
            break
    else:
        quarter = 'Q3'
    return str(year) + quarter

def extract_hr_df(hr_file, time_key=None):
    """
    Extract and preprocess Tech firm HR data
    Parameters
    ----------
    hr_file : str
        Path to the HR csv file; its first column is used as the row index
    time_key : str or None, optional
        If falsy, one row per employee (the last observation) is returned.
        Otherwise a value understood by month2timekey ('year' or 'quarter');
        one row per employee and period is returned, with bonus summed and
        the exit / manager / rating / bonus-eligibility flags maximised
        within each period.
    Returns
    -------
    hr : pd.DataFrame
        Indexed by 'anon_id', or by ('anon_id', time_key) when time_key is given
    """
    attribute_cols = ['hire_month', 'sales', 'marketing', 'tech', 'vol_exit_event',
                      'invol_exit_event', 'manager', 'female', 'fav_rating', 'bonus',
                      'bonus_eligible', 'age', 'age2', 'cumbonus', 'tenure_months',
                      'tenure_days']
    hr = pd.read_csv(hr_file, index_col=0)
    # spell is a counter of the number of observations, not tenure.
    # The hire month is the month of the spell == 1 row, carried forward to the
    # following rows (assumes rows are ordered by employee, then by month -
    # TODO confirm against the HR file).
    hr['hire_month'] = hr['month'].where(hr['spell'] == 1).ffill()
    hr['tenure_months'] = hr['month'] - hr['hire_month'] + 1
    hr['tenure_days'] = hr['tenure_months'] * 365 / 12
    hr['employeeid'] = hr['employeeid'].astype(str)
    hr['bonus_eligible'] = hr['bonus_eligible'].astype(int)
    if not time_key:
        hr = hr.drop_duplicates(subset='employeeid', keep='last')
        hr = hr[['employeeid'] + attribute_cols]
        hr = hr.rename(columns={"employeeid": "anon_id"})
        return hr.set_index('anon_id')
    hr[time_key] = hr['month'].apply(lambda month: month2timekey(month, time_key))
    period_keys = ['employeeid', time_key]
    period_aggregations = {
        'bonus': 'sum',
        'vol_exit_event': 'max',
        'invol_exit_event': 'max',
        'manager': 'max',
        'fav_rating': 'max',
        'bonus_eligible': 'max',
    }
    for column, how in period_aggregations.items():
        hr[column] = hr.groupby(period_keys)[column].transform(how)
    hr = hr.drop_duplicates(subset=period_keys, keep='last')
    hr = hr[period_keys + attribute_cols]
    hr = hr.rename(columns={"employeeid": "anon_id"})
    return hr.set_index(['anon_id', time_key])

Import some more libraries

In [None]:
import pickle
import glob
import logging
import os
import random
import re
import sys
import ujson as json
import pandas as pd
import numpy as np
from numpy import random
from collections import defaultdict
from datetime import datetime
from utils import *
import multiprocessing
import fasttext
# Pre-trained fastText language-identification model. load_user_emails uses its
# predictions to keep only emails labelled '__label__en'.
# The relative path means 'lid.176.ftz' must be in the current working directory.
model = fasttext.load_model('lid.176.ftz')

In [None]:
import os
# Remember the notebook's working directory; it is reused below as `home_dir`.
current_dir = os.getcwd()
# Bare expression so that the notebook cell displays the path.
current_dir

Set hyperparameters and directories

In [None]:
# Passed as the `mittens` argument of every Mittens(...) model built below.
mittens_params = 0.1
# Co-occurrence window (tokens on each side of the focal word) given to build_weighted_matrix.
window_size = 10
# Dimensionality `n` of the learned embeddings; also part of the output folder name.
embedding_dim = 50
# Minimum token count for a word to enter a per-user vocabulary; also part of the output folder name.
mincount = 50 
# Training iterations for the per-user and per-time-period fits.
max_iter = 100 
# Number of worker processes in the multiprocessing pool.
num_cores = 10
# Number of users sampled by process_users when test_mode is on.
num_users_to_test = 60
# Vocabulary cap used by train_all_corpus (whole-corpus fit).
vocab_size = 2500
# Training iterations for the whole-corpus fit in train_all_corpus.
max_iter_all = 3000

home_dir = current_dir
# Directory of per-user email files read by load_user_emails.
corpus_dir = "/zfs/projects/faculty/amirgo-identification/identification-Sarayu/tech/training/cleaned_email_data_v2"
mittens_dir = os.path.join(home_dir, "mittens")
# NOTE(review): utils_dir is not referenced anywhere else in the visible notebook.
utils_dir = os.path.join(mittens_dir, "utils")
# Folder that receives the per-user and per-user-per-period embedding files.
output_dir = os.path.join(home_dir, "embeddings_{}d_mincount{}".format(embedding_dim, mincount))
# Replaces output_dir when test_mode is on.
test_dir = os.path.join(mittens_dir, "embeddings_test_{}d_mincount{}".format(embedding_dim, mincount)) 
# Company-wide embeddings used as the starting point for per-user fine-tuning.
company_embeddings_filename = "/zfs/projects/faculty/amirgo-identification/identification-Sarayu/tech/training/vectors_high_prob_eng_08_50d_techfirm.txt"

# Echo the resolved paths so they can be checked before the long-running cells.
print(corpus_dir)
print(company_embeddings_filename)
print(output_dir)

Now finetune Mittens on the Tech corpus

In [None]:
def load_user_emails(corpus_dir, min_english_prob=0.8):
    """
    Loads all emails from JSON files residing in corpus_dir and keeps the
    ones the language-identification model labels as English
    Parameter
    ---------
    corpus_dir : str
        Directory containing one JSON file of emails per user; the user id is
        the file name with '.txt' removed
    min_english_prob : float, optional
        An email is kept only if its top predicted label is '__label__en'
        with a probability strictly above this value
    Returns
    -------
    uid2emails : dict
        A dictionary mapping user ids to their English emails
    """
    uid2emails = defaultdict(list)
    for filename in os.listdir(corpus_dir):
        path = os.path.join(corpus_dir, filename)
        # Sub-directories (e.g. notebook checkpoint folders) cannot be opened
        # as email files; skip them instead of crashing.
        if not os.path.isfile(path):
            continue
        usr = filename.replace('.txt', '')
        with open(path, encoding='utf-8') as f:
            emails = json.load(f)
        eng_emails = []
        for e in emails:
            # The language model is given a single line, so newlines are
            # replaced by spaces first.
            clean_e = ' '.join(e['body'].split('\n'))
            if not clean_e.strip():
                continue
            labels, probs = model.predict(clean_e)
            # No prediction (nothing usable in the text): treat as non-English.
            if len(labels) == 0:
                continue
            if labels[0] == '__label__en' and probs[0] > min_english_prob:
                eng_emails.append(e)
        uid2emails[usr] = eng_emails
    return uid2emails


def _fit_and_save_mittens(X, initial_embeddings, filename):
    """
    Fits a Mittens model on co-occurrence matrix X, starting from
    initial_embeddings, and writes the resulting vectors to filename.
    """
    fitted = Mittens(n=embedding_dim, max_iter=max_iter, mittens=mittens_params).fit(
        X.values,
        vocab=list(X.index),
        initial_embedding_dict=initial_embeddings)
    mittens_df = pd.DataFrame(fitted, index=X.index)
    if not mittens_df.empty:
        output_embeddings(mittens_df, filename=filename)


def process_user(i, num_users, uid, emails, timekey_type='quarterly'):
    """
    Workhorse function for training embedding spaces for each individual.
    First fits user-level embeddings on the user's internal emails, starting
    from the company embeddings; then fits one embedding space per time
    period, starting from the user-level embeddings. Files that already exist
    in output_dir are not recomputed.
    Parameters
    ----------
    i : int
        Index of current user used to keep track of progress
    num_users : int
        Total number of users used to keep track of progress
    uid : str
        User ID of current user
    emails : list of dict
        A list of emails converted from JSON formats that include both content and meta-data
    timekey_type : str
        Slicing mode passed to slice_user_corpus, e.g. 'quarterly' or 'annual'
    """
    if i % 50 == 0:
        print('reached processing single user function')
        sys.stderr.write("\nProcessing \t{}/{} - user '{}', at {}.\n".format(i, num_users, uid, datetime.now()))
    user_embedding_filename = os.path.join(output_dir, "{}.txt".format(uid))

    if not os.path.exists(user_embedding_filename):
        # using only internal emails
        internal_emails = [e for e in emails if is_internal_msg(e)]
        X = build_weighted_matrix(internal_emails, mincount=mincount, window_size=window_size)
        if X.empty:
            return
        _fit_and_save_mittens(X, company_embeddings, user_embedding_filename)
    if not os.path.exists(user_embedding_filename):
        # Nothing was written for this user, so there is nothing to refine.
        return

    # further finetuning on a year or quarter basis
    user_embeddings = glove2dict(user_embedding_filename)
    # NOTE(review): the per-period corpus is built from ALL of the user's
    # emails, while the user-level fit above uses internal emails only.
    # Confirm whether the external emails are meant to be included here.
    sliced_usr_corpus = slice_user_corpus(emails, timekey_type)
    for timekey, period_emails in sliced_usr_corpus.items():
        user_embedding_time_filename = os.path.join(output_dir, "{}_{}.txt".format(uid, timekey))
        if os.path.exists(user_embedding_time_filename):
            continue
        X = build_weighted_matrix(period_emails, mincount=mincount, window_size=window_size)
        if X.empty:
            continue
        if not user_embeddings:
            sys.stderr.write("\n%s does not have corresponding user embeddings with timekey %s.\n" % (uid, timekey))
        # supplying the user embeddings as the initial embeddings
        _fit_and_save_mittens(X, user_embeddings, user_embedding_time_filename)
    return

def process_users(uid2emails, test_mode):
    """
    Processes emails of each user in parallel

    Parameter
    ---------
    uid2emails : dict
        A dictionary matching user IDs to a list of emails converted from JSON to dictionaries
    test_mode : bool
        If in test_mode, only process a random sample of (at most)
        num_users_to_test distinct users
    """
    num_users = len(uid2emails)
    if test_mode:
        all_uids = list(uid2emails)
        sample_size = min(num_users_to_test, len(all_uids))
        # Sample WITHOUT replacement: drawing with replacement can pick the
        # same user several times, which silently shrinks the test set.
        picked = np.random.permutation(len(all_uids))[:sample_size]
        uid2emails = {all_uids[j]: uid2emails[all_uids[j]] for j in picked}
        num_users = len(uid2emails)
    sys.stderr.write('Processing %d users in parallel at %s.\n' % (num_users, str(datetime.now())))
    # The context manager tears the pool down even if a worker raises.
    with multiprocessing.Pool(processes=num_cores) as pool:
        # run this for quarterly embeddings
        results = [pool.apply_async(process_user, args=(i, num_users, uid, uid2emails[uid], 'quarterly',))
                   for i, uid in enumerate(uid2emails)]
        # necessary for fetching errors
        for r in results:
            r.get()
        pool.close()
        pool.join()
    return

#to use the pre-trained embeddings glove.6B.{}d.txt instead of company embeddings if training is not done from scratch
def train_all_corpus(emails, glove_embeddings):
    """
    Fits Mittens on the co-occurrence matrix of all emails, starting from
    GloVe embeddings, and writes the result to company_embeddings_filename
    Parameter
    ---------
    emails : list of dict
        List of dictionaries where each dictionary represents one single email converted from JSON
    glove_embeddings : dict
        A dictionary that maps strings to global vectors
    """
    log = sys.stderr.write
    log('Building co-occurrence matrix with all corpora at %s.\n' % str(datetime.now()))
    cooccurrence = build_weighted_matrix(emails, vocab_size=vocab_size, window_size=window_size)
    log('Fitting mittens for all corpora at %s.\n' % datetime.now())
    trainer = Mittens(n=embedding_dim, max_iter=max_iter_all, mittens=mittens_params)
    vectors = trainer.fit(
        cooccurrence.values,
        vocab=list(cooccurrence.index),
        initial_embedding_dict=glove_embeddings)
    vectors_df = pd.DataFrame(vectors, index=cooccurrence.index)
    output_embeddings(vectors_df, filename=company_embeddings_filename)
    log('Successfully produced mittens embeddings using all corpora at %s.\n' % datetime.now())
    return


if __name__ == '__main__':
    starttime = datetime.now()
    test_mode = False
    # Choose the destination first and only then create it. Creating
    # output_dir before switching to test_dir left the test directory missing.
    if test_mode:
        output_dir = test_dir
    os.makedirs(output_dir, exist_ok=True)

    sys.stderr.write("Loading files at {}.\n".format(datetime.now()))
    uid2emails = load_user_emails(corpus_dir)
    print('loaded all users')

    if not os.path.exists(company_embeddings_filename):
        print('reached here - the training was not done from scratch so finetuning on already pre-trained Glove vectors will be done now')
        sys.stderr.write("Finetuning on all emails at {}.\n".format(datetime.now()))
        # NOTE(review): `glove_filename` is not defined anywhere in the visible
        # notebook; unless `from utils import *` provides it, this branch
        # raises NameError. Point it at the glove.6B.{dim}d.txt file.
        glove_embeddings = glove2dict(glove_filename)
        # Flatten the per-user lists in one pass (sum(lists, []) re-copies the
        # accumulated list for every user).
        all_emails = [email for user_emails in uid2emails.values() for email in user_emails]
        # this function is called only when the finetuning is done on publically available Glove embeddings.
        train_all_corpus(all_emails, glove_embeddings)
    else:
        print('Finetuning on the company embeddings that were previously trained from scratch')
    company_embeddings = glove2dict(company_embeddings_filename)
    if test_mode:
        sys.stderr.write("Processing test users at {}.\n".format(datetime.now()))
    else:
        sys.stderr.write("Processing all users at {}.\n".format(datetime.now()))

    # The users are processed in two halves, one pool per half.
    keys = list(uid2emails.keys())
    first_keys = keys[0:len(keys)//2]
    second_keys = keys[len(keys)//2:]
    process_users({k : uid2emails[k] for k in first_keys}, test_mode) #there are 1266 users in each part
    process_users({k : uid2emails[k] for k in second_keys}, test_mode)

    sys.stderr.write("\n\nFinished processing at {}, with a duration of {}\n".format(datetime.now(), datetime.now()-starttime))