In [1]:
import psycopg2
import pandas as pd
from time import time
from datetime import datetime, timedelta
import math

In [2]:
def timer(f):
    def tmp(*args, **kwargs):
        t = time()
        res = f(*args, **kwargs)

        print('working time: {} sec.'.format(round(time() - t, 2)))
        return res

    return tmp


@timer
def getTable(table = 'web_order', schema = 'kfc', db = 'entropia'):
    try:
        conn = psycopg2.connect(
            database = db, user = 'postgres', password = 'qmzpqmzp',
            host = 'localhost', port = 5432
        )
        query = 'SELECT * FROM {}.{}'.format(schema, table)
        df = pd.read_sql(query, conn)
        return df
    except:
        print('getTable failed')


def getDate(table = 'user_info', schema = 'kfc', db = 'entropia'):
    try:
        conn = psycopg2.connect(
            database = db, user = 'postgres', password = 'qmzpqmzp',
            host = 'localhost', port = 5432
        )
        query = 'SELECT max(updated) as updated {}.{}'.format(schema, table)
        df = pd.read_sql(query, conn)
        return df['update'][0]
    except:
        return datetime(2000, 1, 1).date()


@timer
def getOrder(date, table = 'web_order', schema = 'kfc', db = 'entropia'):
    try:
        conn = psycopg2.connect(
            database = db, user = 'postgres', password = 'qmzpqmzp',
            host = 'localhost', port = 5432
        )
        feat = 'LOWER(email) as email, phone_str as phone, LOWER(first_name) as first_name, LOWER(last_name) as last_name'
        query = """
            SELECT user_id, {}, min(DATE(created)) as created  
            FROM {}.{}
            WHERE status IN (1,3) AND DATE(created) > '{}'
            GROUP BY user_id, LOWER(email), phone_str, LOWER(first_name), LOWER(last_name)
        """.format(feat, schema, table, str(date))
        df = pd.read_sql(query, conn)
        df = df.sort_values(by = ['created']).reset_index()
        del df['index']
        return df
    except:
        print('getTable failed')

@timer
def getUser(date, table = 'web_user', schema = 'kfc', db = 'entropia'):
    try:
        conn = psycopg2.connect(
            database = db, user = 'postgres', password = 'qmzpqmzp',
            host = 'localhost', port = 5432
        )
        feat = 'LOWER(email) as email, fb_id, LOWER(first_name) as first_name, LOWER(last_name) as last_name'
        query = """
            SELECT uid as user_id, {}, min(DATE(created)) as created 
            FROM {}.{}
            WHERE DATE(created) > '{}'
            GROUP BY uid, LOWER(email), fb_id, LOWER(first_name), LOWER(last_name)
        """.format(feat, schema, table, str(date))
        df = pd.read_sql(query, conn)
        df = df.sort_values(by=['created']).reset_index()
        del df['index']
        return df
    except:
        print('getUser failed')

@timer
def getStoredUser(table = 'user_info', schema = 'kfc', db = 'entropia'):
    df = getTable(table = table, schema = schema, db = db)
    df = df.sort_values(by=['updated']).reset_index()
    del df['index']
    return df


def not_null(df):
    l = df.shape[0]
    l_col = max(map(len, df.columns))
    print('{} rows'.format(l))
    print('-' * (5 + len(str(l))))
    for col in df.columns:
        try:
            nn = sum(df[col].notnull())
            un = len(df[col].unique())
            perc = round(100 * nn / l, 2)
            print('{}{}: {}%{} | {}{} unique'.format(col, ' ' * (l_col - len(col)), perc, ' ' * (6 - len(str(perc))), un, ' ' * (len(str(l)) - len(str(un)))))
        except:
            pass

In [3]:
%%time
date = getDate()
users_o = getOrder(date)
users_o['email'] = users_o['email'].apply(lambda x: x.lower() if x is not None else None)
users = getUser(date)
users['email'] = users['email'].apply(lambda x: x.lower() if x is not None else None)
print(users.shape, users_o.shape)

working time: 12.05 sec.
working time: 4.02 sec.
(853563, 6) (780854, 6)
CPU times: user 5.32 s, sys: 216 ms, total: 5.54 s
Wall time: 16.7 s


In [4]:
users.head()

Unnamed: 0,user_id,email,fb_id,first_name,last_name,created
0,2,jasonrey@mailinator.com,,test,test,2016-12-19
1,71,ccyuen_94@hotmail.com,1.0210053875456184e+16,chan,chun,2016-12-19
2,63,muzamirshah@hotmail.com,,muzamir shah,mohamed shariff,2016-12-19
3,54,ange.villanueva@gmail.com,1.0154194809081008e+16,angelina,villanueva,2016-12-19
4,27,enaosh56@gmail.com,,rosh,alina,2016-12-19


In [5]:
not_null(users)

853563 rows
-----------
user_id   : 100.0%  | 853563 unique
email     : 93.48%  | 797927 unique
fb_id     : 50.68%  | 432554 unique
first_name: 99.93%  | 265278 unique
last_name : 73.9%   | 140269 unique
created   : 100.0%  | 691    unique


In [6]:
users_o.head()

Unnamed: 0,user_id,email,phone,first_name,last_name,created
0,3.0,sooleng@compass-interactive.com,60122887526,serenity,puah,2016-12-19
1,65.0,muzamirshah@gmail.com,60192704038,muzamir shah,mohamed shariff,2016-12-19
2,63.0,muzamirshah@gmail.com,60192704038,muzamir shah,mohamed shariff,2016-12-19
3,54.0,ange.villanueva@gmail.com,60122109387,angelina,v,2016-12-19
4,45.0,hs.lee@qsrbrands.com.my,60196000570,hs,lee,2016-12-19


In [7]:
not_null(users_o)

780854 rows
-----------
user_id   : 99.97%  | 649813 unique
email     : 100.0%  | 564912 unique
phone     : 100.0%  | 554677 unique
first_name: 100.0%  | 208449 unique
last_name : 82.65%  | 118161 unique
created   : 100.0%  | 708    unique


# Sending data to ES

In [8]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, A
from elasticsearch import helpers
es = Elasticsearch()

# ignore 400 cause by IndexAlreadyExistsException when creating an index
es.indices.create(index = 'users_db', ignore = 400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'users_db'}

## 1. Users
We split out table to 3 parts:
* Single email & Single fb_id
* Single email &     No fb_id)
*     No email & Single fb_id
* Duplicated email or fb_id

### Functions

In [9]:
def get_es_id(key = 'email'):
    s = Search(using = es, index = 'users_db')
    out = []
    for hit in s.scan():
        out.extend(hit.to_dict()[key])
    return list(set(out))

def get_max():
    s = Search(using = es, index = 'users_db')
    s = s[:0]
    aggr_max = A('max', field = 'entropia_id')
    s.aggs.bucket('max_id', aggr_max)
    r = s.execute()
    return int(r['aggregations']['max_id']['value'])

### Singles

In [10]:
%%time
#email
tmp = users['email'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'email': 'cnt'}, inplace = True)
email = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] == 1].reset_index()['uid']})

tmp = users['fb_id'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'fb_id': 'cnt'}, inplace = True)
fb = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] == 1].reset_index()['uid']})

tmp = pd.merge(users, email, left_on = 'email', right_on = 'uid', how = 'inner')
del tmp['uid']
single = tmp.loc[tmp['fb_id'].isnull()]

tmp = pd.merge(tmp, fb, left_on = 'fb_id', right_on = 'uid', how = 'inner')
del tmp['uid']
single = single.append(tmp, ignore_index = True, sort = False)

tmp = pd.merge(users, fb, left_on = 'fb_id', right_on = 'uid', how = 'inner')
del tmp['uid']
single = single.append(tmp.loc[tmp['email'].isnull()], ignore_index = True, sort = False)

del tmp, email, fb

CPU times: user 2.38 s, sys: 1.57 ms, total: 2.38 s
Wall time: 2.39 s


In [14]:
exist_email = get_es_id(key = 'email')
exist_fb = get_es_id(key = 'fb_id')
len(exist_email), len(exist_fb)

(100000, 0)

In [15]:
%%time
email = pd.DataFrame({'uid': exist_email, 'flag_email': 1})
single = pd.merge(single, email, left_on = 'email', right_on = 'uid', how = 'left')
del email, single['uid']

fb = pd.DataFrame({'uid': exist_fb, 'flag_fb': 1})
single = pd.merge(single, fb, left_on = 'fb_id', right_on = 'uid', how = 'left')
del fb, single['uid']

repeat = single.loc[(single['flag_email'] == 1) | (single['flag_fb'] == 1)].reset_index()
new = single.loc[single['flag_email'].isnull() & single['flag_fb'].isnull()].reset_index()
del repeat['index'], repeat['flag_email'], repeat['flag_fb']
del new['index'], new['flag_email'], new['flag_fb']
print(new.shape, repeat.shape, single.shape)
del single

(753541, 6) (100000, 6) (853541, 8)
CPU times: user 777 ms, sys: 11.9 ms, total: 788 ms
Wall time: 788 ms


#### 1. new ones

In [12]:
def to_es_single_new(df, last_id = 0):
    for i in df.index:
        last_id += 1
        yield {
            '_op_type': 'index',
            '_index': 'users_db',
            '_type': 'document',
            '_id': last_id,
            '_source': {
                'entropia_id': last_id,
                'user_id': [int(df['user_id'][i])],
                'email':[] if df['email'][i] is None else [df['email'][i]],
                'phone':[],
                'fb_id':[] if df['fb_id'][i] is None else [df['fb_id'][i]],
                'first_name':[] if df['first_name'][i] is None else [df['first_name'][i]],
                'last_name':[] if df['last_name'][i] is None else [df['last_name'][i]],
                'updated': str(datetime.now()),#.date()),
                'first_activity': None,
                'registered': str(df['created'][i])
            }
        }

In [13]:
%%time
# temporary to get example
helpers.bulk(es, to_es_single_new(single.loc[single.index < 100000]))

CPU times: user 15.5 s, sys: 9.5 ms, total: 15.5 s
Wall time: 18 s


(100000, [])

In [16]:
%%time
if new.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, to_es_single_new(new, max_id)))
else:
    print('Nothing to write')

(753541, [])
CPU times: user 2min 5s, sys: 136 ms, total: 2min 5s
Wall time: 2min 24s


#### 2. Repeated

In [18]:
def to_es_single_repeat(df):
    def get_value(v_hit, v):
        if type(v_hit) == list:
            return v_hit if v is None or v in v_hit else v_hit + [v]
        else:
            return v_hit if v is None or v_hit is None or v_hit < v else v
    for i in df.index: 
        user_id = None if df['user_id'][i] is None or math.isnan(df['user_id'][i]) else int(df['user_id'][i])
        email = df['email'][i]
        fb_id = df['fb_id'][i]
        first_name = df['first_name'][i]
        last_name = df['last_name'][i]
        registered = None if df['created'][i] is None else str(df['created'][i])
        #----------------------------------------
        s = Search(using = es, index = 'users_db')
        q = []
        if df['email'][i] is not None:
            q.append({'match': {'email.keyword': df['email'][i]}})
        if df['fb_id'][i] is not None:
            q.append({'match': {'fb_id.keyword': df['fb_id'][i]}})
        s = s.query('bool', should = q)
        r = s.execute()
        #----------------------------------------
        if len(r.hits) > 1:
            print(q)
            continue
        #----------------------------------------
        hit = r.hits[0]
        doc = dict()
        if user_id is not None and user_id not in hit.user_id:
            doc['user_id'] = list(hit.user_id) + [user_id]
        if email is not None and email not in hit.email:
            doc['email'] = list(hit.email) + [email]
        if fb_id is not None and fb_id not in hit.fb_id:
            doc['fb_id'] = list(hit.fb_id) + [fb_id]
        if first_name is not None and first_name not in hit.first_name:
            doc['first_name'] = list(hit.first_name) + [first_name]
        if last_name is not None and last_name not in hit.last_name:
            doc['last_name'] = list(hit.last_name) + [last_name]
        if hit.registered is not None and registered is not None and registered < hit.registered:
            doc['registered'] = registered
        
        if doc == dict():
            continue
        doc['updated'] = str(datetime.now())                
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }

def OLD_to_es_single_repeat(df):
    def get_value(v_hit, v):
        if type(v_hit) == list:
            return v_hit if v is None or v in v_hit else v_hit + [v]
        else:
            return v_hit if v is None or v_hit is None or v_hit < v else v
    for i in df.index: 
        user_id = None if df['user_id'][i] is None or math.isnan(df['user_id'][i]) else int(df['user_id'][i])
        email = df['email'][i]
        fb_id = df['fb_id'][i]
        first_name = df['first_name'][i]
        last_name = df['last_name'][i]
        registered = str(df['created'][i])
        #----------------------------------------
        s = Search(using = es, index = 'users_db')
        q = []
        if df['email'][i] is not None:
            q.append({'match': {'email.keyword': df['email'][i]}})
        if df['fb_id'][i] is not None:
            q.append({'match': {'fb_id.keyword': df['fb_id'][i]}})
        s = s.query('bool', should = q)
        r = s.execute()
        #----------------------------------------
        if len(r.hits) > 1:
            print(q)
            continue
        #----------------------------------------
        hit = r.hits[0]
        doc = {
            'entropia_id': hit.entropia_id,
            'user_id': get_value(list(hit.user_id), user_id),
            'email': get_value(list(hit.email), email),
            'phone':[],
            'fb_id': get_value(list(hit.fb_id), fb_id),
            'first_name': get_value(list(hit.first_name), first_name),
            'last_name': get_value(list(hit.last_name), last_name),
            'updated': str(datetime.now()),#.date()),
            'first_activity': get_value(hit.first_activity, None),
            'registered': get_value(hit.registered, registered)
        }
                
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }

In [19]:
%%time
if repeat.shape[0] > 0:
    print(helpers.bulk(es, to_es_single_repeat(repeat)))
else:
    print('Nothing to write')

(0, [])
CPU times: user 1min 9s, sys: 2.53 s, total: 1min 12s
Wall time: 1min 45s


### Duplicates

In [20]:
%%time
#email
tmp = users['email'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'email': 'cnt'}, inplace = True)
email = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] > 1].reset_index()['uid']})

tmp = users['fb_id'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'fb_id': 'cnt'}, inplace = True)
fb = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] > 1].reset_index()['uid']})

dupl = pd.merge(users, email, left_on = 'email', right_on = 'uid', how = 'inner')
del dupl['uid']

tmp = pd.merge(users, fb, left_on = 'fb_id', right_on = 'uid', how = 'inner')
del tmp['uid']
dupl = dupl.append(tmp, ignore_index = True, sort = False)
print(dupl.shape)
del tmp, email, fb

(22, 6)
CPU times: user 1.3 s, sys: 303 µs, total: 1.3 s
Wall time: 1.3 s


In [21]:
%%time
exist_email = get_es_id(key = 'email')
exist_fb = get_es_id(key = 'fb_id')
print(len(exist_email), len(exist_fb))

797916 432551
CPU times: user 31 s, sys: 127 ms, total: 31.1 s
Wall time: 38.7 s


In [22]:
%%time
email = pd.DataFrame({'uid': exist_email, 'flag_email': 1})
dupl = pd.merge(dupl, email, left_on = 'email', right_on = 'uid', how = 'left')
del email, dupl['uid']

fb = pd.DataFrame({'uid': exist_fb, 'flag_fb': 1})
dupl = pd.merge(dupl, fb, left_on = 'fb_id', right_on = 'uid', how = 'left')
del fb, dupl['uid']

repeat = dupl.loc[(dupl['flag_email'] == 1) | (dupl['flag_fb'] == 1)].reset_index()
new = dupl.loc[dupl['flag_email'].isnull() & dupl['flag_fb'].isnull()].reset_index()
del repeat['index'], repeat['flag_email'], repeat['flag_fb']
del new['index'], new['flag_email'], new['flag_fb']
print(new.shape, repeat.shape, dupl.shape)
del dupl

(22, 6) (0, 6) (22, 8)
CPU times: user 748 ms, sys: 165 µs, total: 748 ms
Wall time: 746 ms


#### 1. new ones

In [23]:
def to_es_dupl_new(df, last_id = 0):
    cond_email = df['email'].notnull()
    for email in df.loc[cond_email, 'email'].unique():
        last_id += 1
        cond = df['email'] == email
        _source = {
            'entropia_id': last_id,
            'user_id': list(map(int, df.loc[cond & df['user_id'].notnull(),'user_id'].unique())),
            'email': [email],
            'phone': [],
            'fb_id': list(df.loc[cond & df['fb_id'].notnull(),'fb_id'].unique()),
            'first_name': list(df.loc[cond & df['first_name'].notnull(), 'first_name'].unique()),
            'last_name': list(df.loc[cond & df['last_name'].notnull(), 'last_name'].unique()),
            'updated': str(datetime.now()),#.date()),
            'first_activity': None,
            'registered': str(df.loc[cond & df['created'].notnull(), 'created'].min())          
        }
        yield {
            '_op_type': 'index',
            '_index': 'users_db',
            '_type': 'document',
            '_id': last_id,
            '_source': _source
        }
    #------------------------------
    cond_fb = df['email'].isnull() & df['fb_id'].notnull()
    for fb_id in df.loc[cond_fb, 'fb_id'].unique():
        last_id += 1
        cond = df['fb_id'] == fb_id
        _source = {
            'entropia_id': last_id,
            'user_id': list(map(int, df.loc[cond & df['user_id'].notnull(),'user_id'].unique())),
            'email': list(df.loc[cond & df['email'].notnull(),'email'].unique()),
            'phone': [],
            'fb_id': [fb_id],
            'first_name': list(df.loc[cond & df['first_name'].notnull(), 'first_name'].unique()),
            'last_name': list(df.loc[cond & df['last_name'].notnull(), 'last_name'].unique()),
            'updated': str(datetime.now()),#.date()),
            'first_activity': None,
            'registered': str(df.loc[cond & df['created'].notnull(), 'created'].min())          
        }
        yield {
            '_op_type': 'index',
            '_index': 'users_db',
            '_type': 'document',
            '_id': last_id,
            '_source': _source
        }

In [24]:
%%time
if new.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, to_es_dupl_new(new, max_id)))
else:
    print('Nothing to write')

(11, [])
CPU times: user 50.9 ms, sys: 55 µs, total: 51 ms
Wall time: 57.3 ms


#### 2. repeated

In [27]:
# !TODO: change this
def to_es_dupl_repeat(df):
    cond_email = df['email'].notnull()
    for email in df.loc[cond_email, 'email']: 
        cond = df['email'] == email
        user_id = set(list(map(int, df.loc[cond & df['user_id'].notnull(), 'user_id'].unique())))
        fb_id = set(df.loc[cond & df['fb_id'].notnull(), 'fb_id'].values)
        first_name = set(df.loc[cond & df['first_name'].notnull(), 'first_name'].values)
        last_name = set(df.loc[cond & df['last_name'].notnull(), 'last_name'].values)
        registered = str(df.loc[cond & df['created'].notnull(), 'created'].min())        
        
        s = Search(using = es, index = 'users_db')
        q = [{'match': {'email.keyword': email}}]
        for fb in fb_id:
            q.append({'match': {'fb_id.keyword': fb}})
        s = s.query('bool', should = q)
        r = s.execute()
        
        if len(r.hits) > 1:
            print(q)
            continue
        
        hit = r.hits[0]
        doc = {
            'entropia_id': hit.entropia_id,
            'user_id': list(set(hit.user_id) | user_id),
            'email': list(hit.email) if email not in list(hit.email) else list(hit.email) + [email],
            'phone': list(hit.phone),
            'fb_id': list(set(hit.fb_id) | fb_id),
            'first_name': list(set(hit.first_name) | first_name),
            'last_name': list(set(hit.last_name) | last_name),
            'updated': str(datetime.now()),#.date()),
            'first_activity': hit.first_activity,
            'registered': hit.registered if registered is None or hit.registered < registered else registered
        }       
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }
    #------------------------------
    cond_fb = df['email'].isnull() & df['fb_id'].notnull()
    for fb_id in df.loc[cond_fb, 'fb_id']: 
        cond = df['fb_id'] == fb_id
        user_id = set(list(map(int, df.loc[cond & df['user_id'].notnull(), 'user_id'].unique())))
#         email = []
        first_name = set(df.loc[cond & df['first_name'].notnull(), 'first_name'].values)
        last_name = set(df.loc[cond & df['last_name'].notnull(), 'last_name'].values)
        registered = str(df.loc[cond & df['created'].notnull(), 'created'].min())        
        
        s = Search(using = es, index = 'users_db')
        q = [{'match': {'fb_id.keyword': fb_id}}]
        s = s.query('bool', must = q)
        r = s.execute()
        
        if len(r.hits) > 1:
            print(q)
            continue
        
        hit = r.hits[0]
        doc = {
            'entropia_id': hit.entropia_id,
            'user_id': list(set(hit.user_id) | user_id),
            'email': list(hit.email),
            'phone':list(hit.phone),
            'fb_id': list(hit.fb_id) if fb_id not in list(hit.fb_id) else list(hit.fb_id) + [fb_id],
            'first_name': list(set(hit.first_name) | first_name),
            'last_name': list(set(hit.last_name) | last_name),
            'updated': str(datetime.now()),#.date()),
            'first_activity': hit.first_activity,
            'registered': hit.registered if registered is None or hit.registered < registered else registered
        }       
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }

In [None]:
%%time
if repeat.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, to_es_dupl_repeat(repeat)))
else:
    print('Nothing to write')

## 2. Users in Orders
We split out table to 2 parts:
* Single email
* Duplicated email

In [28]:
users_o.head()

Unnamed: 0,user_id,email,phone,first_name,last_name,created
0,3.0,sooleng@compass-interactive.com,60122887526,serenity,puah,2016-12-19
1,65.0,muzamirshah@gmail.com,60192704038,muzamir shah,mohamed shariff,2016-12-19
2,63.0,muzamirshah@gmail.com,60192704038,muzamir shah,mohamed shariff,2016-12-19
3,54.0,ange.villanueva@gmail.com,60122109387,angelina,v,2016-12-19
4,45.0,hs.lee@qsrbrands.com.my,60196000570,hs,lee,2016-12-19


### Singles

In [29]:
sum(users_o['email'].isnull())

1

In [30]:
%%time
#email
tmp = users_o['email'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'email': 'cnt'}, inplace = True)
email = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] == 1].reset_index()['uid']})

single = pd.merge(users_o, email, left_on = 'email', right_on = 'uid', how = 'inner')
print(single.shape, users_o.shape)
del single['uid'], tmp, email

(438142, 7) (780854, 6)
CPU times: user 881 ms, sys: 175 µs, total: 881 ms
Wall time: 879 ms


In [31]:
%%time
exist_email = get_es_id(key = 'email')
print(len(exist_email))

797926
CPU times: user 18.1 s, sys: 74.4 ms, total: 18.2 s
Wall time: 22 s


In [32]:
%%time
email = pd.DataFrame({'uid': exist_email, 'flag_email': 1})
single = pd.merge(single, email, left_on = 'email', right_on = 'uid', how = 'left')
del email, single['uid']

repeat = single.loc[single['flag_email'] == 1].reset_index()
new = single.loc[single['flag_email'].isnull()].reset_index()
del repeat['index'], repeat['flag_email']
del new['index'], new['flag_email']
print(new.shape, repeat.shape, single.shape)
del single

(140358, 6) (297784, 6) (438142, 7)
CPU times: user 797 ms, sys: 148 µs, total: 797 ms
Wall time: 795 ms


#### 1. new ones

In [33]:
def order_to_es_single_new(df, last_id = 0):
    for i in df.index:
        last_id += 1
        yield {
            '_op_type': 'index',
            '_index': 'users_db',
            '_type': 'document',
            '_id': last_id,
            '_source': {
                'entropia_id': last_id,
                'user_id': [] if df['user_id'][i] is None or math.isnan(df['user_id'][i]) else [int(df['user_id'][i])],
                'email': [] if df['email'][i] is None else [df['email'][i]],
                'phone': [] if df['phone'][i] is None else [df['phone'][i]],
                'fb_id':[],
                'first_name':[] if df['first_name'][i] is None else [df['first_name'][i]],
                'last_name':[] if df['last_name'][i] is None else [df['last_name'][i]],
                'updated': str(datetime.now()),#.date()),
                'first_activity': str(df['created'][i]),
                'registered': None
            }
        }

In [34]:
%%time
if new.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, order_to_es_single_new(new, max_id)))
else:
    print('Nothing to write')

(140358, [])
CPU times: user 29.6 s, sys: 24.5 ms, total: 29.6 s
Wall time: 33.2 s


#### 2. repeated

In [35]:
def order_to_es_single_repeat(df):
    def get_value(v_hit, v):
        if type(v_hit) == list:
            return v_hit if v is None or v in v_hit else v_hit + [v]
        else:
            return v_hit if v is None or v_hit is None or v_hit < v else v
    for i in df.index: 
        user_id = None if df['user_id'][i] is None or math.isnan(df['user_id'][i]) else int(df['user_id'][i])
        email = df['email'][i]
        phone = df['phone'][i]
        first_name = df['first_name'][i]
        last_name = df['last_name'][i]
        first_activity = None if df['created'][i] is None else str(df['created'][i])
        #----------------------------------------
        s = Search(using = es, index = 'users_db')
        q = [{'match': {'email.keyword': df['email'][i]}}]
        s = s.query('bool', must = q)
        r = s.execute()
        #----------------------------------------
        if len(r.hits) > 1:
            print(q)
            continue
        #----------------------------------------
        hit = r.hits[0]
        doc = dict()
        if user_id is not None and user_id not in hit.user_id:
            doc['user_id'] = list(hit.user_id) + [user_id]
        if email is not None and email not in hit.email:
            doc['email'] = list(hit.email) + [email]
        if phone is not None and phone not in hit.phone:
            doc['phone'] = list(hit.phone) + [phone]
        if first_name is not None and first_name not in hit.first_name:
            doc['first_name'] = list(hit.first_name) + [first_name]
        if last_name is not None and last_name not in hit.last_name:
            doc['last_name'] = list(hit.last_name) + [last_name]
        if hit.first_activity is not None and first_activity is not None and first_activity < hit.first_activity:
            doc['first_activity'] = first_activity
        
        if doc == dict():
            continue
        doc['updated'] = str(datetime.now())       
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }

In [36]:
%%time
if repeat.shape[0] > 0:
    print(helpers.bulk(es, order_to_es_single_repeat(repeat)))
else:
    print('Nothing to write')

(297783, [])
CPU times: user 3min 41s, sys: 7.77 s, total: 3min 49s
Wall time: 5min 43s


### Duplicates

In [37]:
%%time
#email
tmp = users_o['email'].value_counts().reset_index()
tmp.rename(columns = {'index': 'uid', 'email': 'cnt'}, inplace = True)
email = pd.DataFrame({'uid': tmp.loc[tmp['cnt'] > 1].reset_index()['uid']})

dupl = pd.merge(users_o, email, left_on = 'email', right_on = 'uid', how = 'inner')
print(dupl.shape)
del tmp, email, dupl['uid']

(342711, 7)
CPU times: user 828 ms, sys: 0 ns, total: 828 ms
Wall time: 829 ms


In [38]:
%%time
exist_email = get_es_id(key = 'email')
print(len(exist_email))

938284
CPU times: user 21.8 s, sys: 116 ms, total: 21.9 s
Wall time: 26.9 s


In [39]:
%%time
email = pd.DataFrame({'uid': exist_email, 'flag_email': 1})
dupl = pd.merge(dupl, email, left_on = 'email', right_on = 'uid', how = 'left')
del email, dupl['uid']

repeat = dupl.loc[dupl['flag_email'] == 1].reset_index()
new = dupl.loc[dupl['flag_email'].isnull()].reset_index()
del repeat['index'], repeat['flag_email']
del new['index'], new['flag_email']
print(new.shape, repeat.shape, dupl.shape)
del dupl

(104981, 6) (237730, 6) (342711, 7)
CPU times: user 828 ms, sys: 148 µs, total: 828 ms
Wall time: 827 ms


#### 1. new ones

In [40]:
def order_to_es_dupl_new(df, last_id = 0):
    # email is not null by default
    print('{} different emails'.format(len(df['email'].unique())))
    i = 0
    tmp_time = time()
    for email in df['email'].unique():
        last_id += 1
        cond = df['email'] == email
        _source = {
            'entropia_id': last_id,
            'user_id': list(map(int, df.loc[cond & df['user_id'].notnull(),'user_id'].unique())),
            'email': [email],
            'phone': list(df.loc[cond & df['phone'].notnull(),'phone'].unique()),
            'fb_id': [],
            'first_name': list(df.loc[cond & df['first_name'].notnull(), 'first_name'].unique()),
            'last_name': list(df.loc[cond & df['last_name'].notnull(), 'last_name'].unique()),
            'updated': str(datetime.now()),#.date()),
            'first_activity': str(df.loc[cond & df['created'].notnull(), 'created'].min()),
            'registered': None          
        }
        i += 1
        if i % 1000 == 0:
            print('{}{}. {} min.'.format(' ' * (2 - len(str(i))), i, round((time() - tmp_time) / 60, 2)))
            tmp_time = time()            
        yield {
            '_op_type': 'index',
            '_index': 'users_db',
            '_type': 'document',
            '_id': last_id,
            '_source': _source
        }

In [41]:
%%time
if new.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, order_to_es_dupl_new(new, max_id)))
else:
    print('Nothing to write')

36116 different emails
1000. 1.0 min.
2000. 1.0 min.
3000. 0.99 min.
4000. 0.99 min.
5000. 0.99 min.
6000. 0.99 min.
7000. 0.98 min.
8000. 0.98 min.
9000. 0.98 min.
10000. 0.97 min.
11000. 0.97 min.
12000. 0.98 min.
13000. 1.03 min.
14000. 1.03 min.
15000. 0.99 min.
16000. 0.99 min.
17000. 1.01 min.
18000. 1.0 min.
19000. 1.02 min.
20000. 1.01 min.
21000. 1.0 min.
22000. 0.99 min.
23000. 0.97 min.
24000. 0.98 min.
25000. 0.98 min.
26000. 0.98 min.
27000. 0.98 min.
28000. 0.99 min.
29000. 0.98 min.
30000. 0.97 min.
31000. 0.98 min.
32000. 0.98 min.
33000. 0.98 min.
34000. 0.98 min.
35000. 0.97 min.
36000. 0.97 min.
(36116, [])
CPU times: user 35min 40s, sys: 83.3 ms, total: 35min 40s
Wall time: 35min 41s


#### 2. repeated

In [43]:
def order_to_es_dupl_repeat(df):
    # email is not null by default
    print('{} different emails'.format(len(df['email'].unique())))
    i = 0
    tmp_time = time()
    for email in df['email'].unique(): 
        cond = df['email'] == email
        user_id = set(list(map(int, df.loc[cond & df['user_id'].notnull(), 'user_id'].unique())))
        phone = set(df.loc[cond & df['phone'].notnull(), 'phone'].values)
        first_name = set(df.loc[cond & df['first_name'].notnull(), 'first_name'].values)
        last_name = set(df.loc[cond & df['last_name'].notnull(), 'last_name'].values)
        fa = str(df.loc[cond & df['created'].notnull(), 'created'].min())        
        
        s = Search(using = es, index = 'users_db')
        q = [{'match': {'email.keyword': email}}]
        s = s.query('bool', must = q)
        r = s.execute()
        
        if len(r.hits) > 1:
            print(q)
            continue
        
        hit = r.hits[0]
        doc = dict()
        if user_id is not set() and set(hit.user_id) | user_id != set(hit.user_id):
            doc['user_id'] = list(set(hit.user_id) | user_id)
        if phone is not set() and set(hit.phone) | phone != set(hit.phone):
            doc['phone'] = list(set(hit.phone) | phone)
        if first_name is not set() and set(hit.first_name) | first_name != set(hit.first_name):
            doc['first_name'] = list(set(hit.first_name) | first_name)
        if last_name is not set() and set(hit.last_name) | last_name != set(hit.last_name):
            doc['last_name'] = list(set(hit.last_name) | last_name)
        if hit.first_activity is not None and fa is not None and fa < hit.first_activity:
            doc['first_activity'] = fa
        if doc == dict():
            continue
        doc['updated'] = str(datetime.now()) 
        i += 1
        if i % 1000 == 0:
            print('{}{}. {} min.'.format(' ' * (2 - len(str(i))), i, round((time() - tmp_time) / 60, 2)))
            tmp_time = time()   
        yield {
            '_op_type': 'update',
            '_index': 'users_db',
            '_type': 'document',
            '_id': hit.entropia_id,
            'doc': doc
        }

In [44]:
%%time
if repeat.shape[0] > 0:
    max_id = get_max()
    print(helpers.bulk(es, order_to_es_dupl_repeat(repeat)))
else:
    print('Nothing to write')

90653 different emails
1000. 1.99 min.
2000. 1.99 min.
3000. 2.0 min.
4000. 2.0 min.
5000. 2.0 min.
6000. 2.0 min.
7000. 1.99 min.
8000. 2.0 min.
9000. 2.0 min.
10000. 1.99 min.
11000. 2.02 min.
12000. 1.98 min.
13000. 1.98 min.
14000. 2.0 min.
15000. 2.01 min.
16000. 1.99 min.
17000. 1.99 min.
18000. 2.0 min.
19000. 1.99 min.
20000. 1.99 min.
21000. 1.98 min.
22000. 2.0 min.
23000. 2.0 min.
24000. 1.99 min.
25000. 1.99 min.
26000. 2.02 min.
27000. 2.09 min.
28000. 2.03 min.
29000. 2.02 min.
30000. 2.02 min.
31000. 2.01 min.
32000. 2.02 min.
33000. 2.1 min.
34000. 2.36 min.
35000. 2.32 min.
36000. 2.3 min.
37000. 2.12 min.
38000. 2.06 min.
39000. 2.05 min.
40000. 2.05 min.
41000. 2.03 min.
42000. 2.04 min.
43000. 2.04 min.
44000. 2.06 min.
45000. 2.03 min.
46000. 2.06 min.
47000. 2.04 min.
48000. 2.04 min.
49000. 2.08 min.
50000. 2.08 min.
51000. 2.04 min.
52000. 2.06 min.
53000. 2.04 min.
54000. 2.03 min.
55000. 2.09 min.
56000. 2.06 min.
57000. 2.07 min.
58000. 2.07 min.
59000. 2.05 