In [None]:
import os

import mysql.connector
import pandas as pd
from cassandra.cluster import Cluster

In [None]:
# Run parameters read by the cells below.
# Exclusive lower bound used in the `fecha > ...` / `date > ...` filters.
fecha = '2022-05-10'
# Portal code; translated to country codes by portal_to_pais / portal_to_idpais.
portal = 'ZPAR'
# Number of attempts get_hits makes per Cassandra chunk query before giving up.
max_retries = 10

In [None]:
# Connection settings for the MySQL "labs" database and the Cassandra node.
# Every value can be overridden through an environment variable so that
# credentials do not have to be edited into (or committed with) the notebook.
# NOTE(review): the literal fallbacks only keep the notebook runnable as it
# was; they should be removed (and the password rotated) once the environment
# variables are provisioned.
db_labs = {
    'host': os.environ.get('LABS_DB_HOST', '34.75.227.218'),
    'user': os.environ.get('LABS_DB_USER', 'bumeranlabs'),
    'pwd': os.environ.get('LABS_DB_PWD', 'labs123'),
    'db': os.environ.get('LABS_DB_NAME', 'bumeran_labs')
}
# Contact point handed to cassandra.cluster.Cluster in get_hits.
cass_db = os.environ.get('CASSANDRA_HOST', '172.18.144.51')

In [None]:
def __idstuple(ids):
    """Render ids as a parenthesised, comma-separated list for an IN clause.

    Each element is converted with ``str`` and no quoting is applied, e.g.
    ``[1, 2, 3]`` becomes ``"(1,2,3)"``.

    An empty iterable yields ``"()"``. The previous implementation sliced off
    the opening parenthesis in that case and returned ``")"``.

    Args:
        ids: iterable of ids (anything ``str`` can render).

    Returns:
        str: the rendered list, parentheses included.
    """
    return "(" + ",".join(str(k) for k in ids) + ")"

def divide_chunks(l:list, n:int):
    """Lazily split ``l`` into consecutive slices of at most ``n`` items.

    The last slice holds the remainder and may be shorter than ``n``.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))
        
def portal_to_pais(portal: str):
    """Translate a portal code into the country code used by the lead queries.

    Unknown portals yield ``None`` instead of raising.
    """
    country_by_portal = {
        'ZPAR': 'AR',
        'IWBR': 'BR',
        'WIBR': 'BR',
        '24MX': 'MX',
        'PLEC': 'EC',
        'UBPE': 'UB',
        'LLVE': 'VE',
        'DVPE': 'PE',
        'CAPA': 'PA',
    }
    # dict.get falls back to None, which is what callers receive for an
    # unrecognised portal.
    return country_by_portal.get(portal)

def get_count_users_with_leads(fecha, portal):
    """Count distinct users that have leads after ``fecha`` for a portal.

    Args:
        fecha: exclusive lower bound compared against the ``fecha`` column.
        portal: portal code, translated to a country code via portal_to_pais.

    Returns:
        The single value produced by ``COUNT(DISTINCT idusuario)``.
    """
    pais = portal_to_pais(portal)

    # Values are bound by the connector rather than formatted into the SQL.
    query = (
        "SELECT COUNT(DISTINCT idusuario) FROM bumeran_labs.contactosre "
        "WHERE pais = %s AND fecha > %s"
    )

    mydb = mysql.connector.connect(
        host=db_labs['host'],
        user=db_labs['user'],
        password=db_labs['pwd'],
        database=db_labs['db'])
    try:
        mycursor = mydb.cursor(buffered=True)
        try:
            mycursor.execute(query, (pais, fecha))
            return mycursor.fetchall()[0][0]
        finally:
            mycursor.close()
    finally:
        # The connection used to be left open on every call.
        mydb.close()
    
def get_users_with_leads(fecha, portal, limit=None, offset = None):
    """Fetch (idusuario, idaviso) lead pairs created after ``fecha``.

    Args:
        fecha: exclusive lower bound compared against the ``fecha`` column.
        portal: portal code, translated to a country code via portal_to_pais.
        limit: optional maximum number of rows.
        offset: optional number of rows to skip.

    Returns:
        pandas.DataFrame with columns ``idusuario`` and ``idaviso``.
    """
    pais = portal_to_pais(portal)

    # Values are bound by the connector rather than formatted into the SQL.
    query = (
        "SELECT idusuario, idaviso FROM bumeran_labs.contactosre "
        "WHERE pais = %s AND fecha > %s"
    )

    # LIMIT/OFFSET are coerced to int before being appended to the statement.
    if limit is not None:
        query += " LIMIT {:d}".format(int(limit))
    elif offset is not None:
        # MySQL rejects OFFSET without LIMIT; its manual suggests a very
        # large row count to mean "everything after the offset".
        query += " LIMIT 18446744073709551615"
    if offset is not None:
        query += " OFFSET {:d}".format(int(offset))

    mydb = mysql.connector.connect(
        host=db_labs['host'],
        user=db_labs['user'],
        password=db_labs['pwd'],
        database=db_labs['db'])
    try:
        mycursor = mydb.cursor(buffered=True)
        try:
            mycursor.execute(query, (pais, fecha))
            myresult = mycursor.fetchall()
        finally:
            mycursor.close()
    finally:
        # The connection used to be left open on every call.
        mydb.close()

    return pd.DataFrame(myresult, columns = ['idusuario', 'idaviso'])

def get_leads(fecha_desde, portal, users, limit=None, offset = None):
    """Fetch (idusuario, idaviso) lead pairs for the given users.

    Args:
        fecha_desde: exclusive lower bound compared against ``fecha``.
        portal: portal code, translated to a country code via portal_to_pais.
        users: iterable of user ids to restrict the query to.
        limit: optional maximum number of rows.
        offset: optional number of rows to skip.

    Returns:
        pandas.DataFrame with columns ``idusuario`` and ``idaviso``. It is
        empty, and no query is sent, when ``users`` is empty.
    """
    columns = ['idusuario', 'idaviso']

    # numpy scalars are unwrapped to plain Python values so the connector can
    # bind them as query parameters.
    user_ids = [u.item() if hasattr(u, 'item') else u for u in users]
    if not user_ids:
        # "IN ()" is not valid SQL, and no row could match anyway.
        return pd.DataFrame(columns=columns)

    pais = portal_to_pais(portal)

    # One placeholder per user id; values are bound by the connector instead
    # of being formatted into the statement.
    placeholders = ", ".join(["%s"] * len(user_ids))
    query = (
        "SELECT idusuario, idaviso FROM bumeran_labs.contactosre "
        "WHERE pais = %s AND fecha > %s AND idusuario IN ({})"
    ).format(placeholders)
    params = [pais, fecha_desde] + user_ids

    # LIMIT/OFFSET are coerced to int before being appended to the statement.
    if limit is not None:
        query += " LIMIT {:d}".format(int(limit))
    elif offset is not None:
        # MySQL rejects OFFSET without LIMIT; its manual suggests a very
        # large row count to mean "everything after the offset".
        query += " LIMIT 18446744073709551615"
    if offset is not None:
        query += " OFFSET {:d}".format(int(offset))

    mydb = mysql.connector.connect(
        host=db_labs['host'],
        user=db_labs['user'],
        password=db_labs['pwd'],
        database=db_labs['db'])
    try:
        mycursor = mydb.cursor(buffered=True)
        try:
            mycursor.execute(query, params)
            myresult = mycursor.fetchall()
        finally:
            mycursor.close()
    finally:
        # The connection used to be left open on every call.
        mydb.close()

    return pd.DataFrame(myresult, columns=columns)
    
class CassandraConnectionException(Exception):
    """Raised by get_hits when a chunk query still fails after all retries."""


def get_hits(ids_sample, portal, date_from):
    """Fetch ad hits from Cassandra for a set of users.

    User ids are queried in chunks of 2000 against ``user_hitaviso`` in the
    ``user_history`` keyspace. Each chunk query is attempted up to
    ``max_retries`` times.

    Args:
        ids_sample: iterable of user ids.
        portal: portal code, matched against ``site_id``.
        date_from: exclusive lower bound compared against ``date``.

    Returns:
        pandas.DataFrame with columns ``idusuario`` and ``idaviso``. It is
        empty when there are no ids to query or no hits were found.

    Raises:
        CassandraConnectionException: a chunk query failed on every attempt.
    """
    empty_hits = pd.DataFrame(columns=['idusuario', 'idaviso'])

    ids_sample = list(ids_sample)
    if not ids_sample:
        # Nothing to query; pd.concat would fail on an empty list of frames.
        return empty_hits

    cluster = Cluster([cass_db], connect_timeout=3000)
    hits_partial = []
    try:
        session = cluster.connect("user_history")
        # NOTE(review): per the driver docs, None disables automatic paging
        # so each query returns all of its rows at once - confirm.
        session.default_fetch_size = None
        for chunk in divide_chunks(ids_sample, 2000):
            s = __idstuple(chunk)
            query = "SELECT * FROM user_hitaviso WHERE site_id='{}' AND user_id IN {} AND date > '{}';".format(portal, s, date_from)
            for t in range(max_retries):
                try:
                    rows = session.execute(query)
                    hits_partial.append(pd.DataFrame(rows))
                    break
                except Exception as e:
                    print("[get_hits]: retrying connection {}".format(str(t)))
                    if t >= max_retries - 1:
                        print("Se agotaron los retries.")
                        # This name used to be undefined, so exhausting the
                        # retries surfaced as a NameError.
                        raise CassandraConnectionException(
                            "get_hits: query failed after {} attempts".format(max_retries)
                        ) from e
    finally:
        # Always release the cluster, whether the loop finished or raised.
        cluster.shutdown()

    if not hits_partial:
        return empty_hits

    cass_hits = pd.concat(hits_partial)
    if len(cass_hits) == 0:
        print("[get_hits]: No hay hits para el portal {}".format(portal))
        # A frame without rows has no columns to rename; return the empty
        # schema instead of failing on the assignment below.
        return empty_hits

    # Columns are renamed by position, so this relies on SELECT * returning
    # exactly four columns in this order.
    cass_hits.columns = ['portal', 'idusuario', 'date', 'idaviso']
    cass_hits = cass_hits.drop(['portal', 'date'], axis = 1)

    return cass_hits

def get_recavisos(portal, ids):
    """Fetch ad features from ``recavisos`` for the given ad ids.

    Only rows with ``precio > 100`` and the portal's ``idpais`` are returned.

    Args:
        portal: portal code, translated to a country id via portal_to_idpais.
        ids: iterable of ad ids (lists and numpy arrays both work).

    Returns:
        pandas.DataFrame with one column per selected field. It is empty,
        and no query is sent, when ``ids`` is empty.

    Raises:
        KeyError: ``portal`` is not known to portal_to_idpais.
    """
    columns = ['idaviso', 'ciudad', 'provincia', 'precio', 'tipodeoperacion',
               'tipodepropiedad', 'habitaciones', 'metrostotales', 'iscurrent']

    # numpy scalars are unwrapped to plain Python values so the connector can
    # bind them as query parameters.
    id_values = [i.item() if hasattr(i, 'item') else i for i in ids]
    if not id_values:
        # "IN ()" is not valid SQL, and no row could match anyway.
        return pd.DataFrame(columns=columns)

    # An earlier version picked a per-portal database; everything is read
    # from the labs database now.
    db = db_labs

    # The mapping stores ids as digit strings; bind the value as a number,
    # as the old unquoted formatting did.
    id_pais = int(portal_to_idpais(portal))

    # One placeholder per ad id; values are bound by the connector instead of
    # being formatted into the statement.
    placeholders = ", ".join(["%s"] * len(id_values))
    query = (
        "SELECT {} FROM recavisos "
        "WHERE idpais = %s AND precio > 100 AND idaviso IN ({});"
    ).format(", ".join(columns), placeholders)
    params = [id_pais] + id_values

    mydb = mysql.connector.connect(
        host=db['host'],
        user=db['user'],
        password=db['pwd'],
        database=db['db'])
    try:
        mycursor = mydb.cursor(buffered=True)
        try:
            mycursor.execute(query, params)
            myresult = mycursor.fetchall()
        finally:
            mycursor.close()
    finally:
        # The connection used to be left open on every call.
        mydb.close()

    return pd.DataFrame(myresult, columns=columns)

def portal_to_idpais(portal: str):
    """Translate a portal code into the ``idpais`` value used by recavisos.

    The id is returned as a string. An unknown portal raises ``KeyError``.
    """
    idpais_by_portal = {
        'ZPAR': '1',
        'IWBR': '2',
        'WIBR': '2',
        '24MX': '18',
        'PLEC': '9',
        'LLVE': '13',
        'UBPE': '111',
        'CAPA': '20',
        'DVPE': '11',
    }
    return idpais_by_portal[portal]


In [None]:
# Load the per-user segment table for the configured portal.
# NOTE(review): assumes segment files are named data_<portal>.pkl, as the
# original 'data_ZPAR.pkl' suggests - confirm for other portals.
# NOTE: unpickling executes arbitrary code; only load files you produced.
segmentos_df = pd.read_pickle('data_{}.pkl'.format(portal))

In [None]:
fecha

In [None]:
%time leads = get_leads(fecha_desde = fecha, portal = 'ZPAR', users = list(segmentos_df.idusuario))

In [None]:
leads['lead']=True

In [None]:
# Busco hits para los usuarios que tienen pocos leads asi complementamos
v = leads.idusuario.value_counts()
users_con_pocos_leads = leads[leads.idusuario.isin(v.index[v.lt(10)])]

In [None]:
%time hits = get_hits(list(users_con_pocos_leads.idusuario), portal = 'ZPAR', date_from = fecha)

In [None]:
hits['lead']=False

In [None]:
leads = pd.concat([leads,hits]).sort_values('lead', ascending=False).drop_duplicates(['idusuario', 'idaviso']).groupby('idusuario').head(100).drop('lead', axis=1)

In [None]:
leads.idusuario.value_counts()

In [None]:
v = leads.idaviso.value_counts()
leads = leads[leads.idaviso.isin(v.index[v.gt(19)])]

In [None]:
#v = leads.idusuario.value_counts()
#leads = leads[leads.idusuario.isin(v.index[v.gt(9)])]

In [None]:
# Busco hits para los usuarios que tienen pocos leads asi complementamos
#v = leads.idusuario.value_counts()
#leads = leads[leads.idusuario.isin(v.index[v.gt(9)])]

In [None]:
#leads = get_users_with_leads(fecha, portal)

In [None]:
#v = leads.idusuario.value_counts()
#leads = leads[leads.idusuario.isin(v.index[(v.gt(9))&(v.lt(100))])]

In [None]:
leads_falsos = leads.copy()

In [None]:
leads_falsos['idaviso'] = leads_falsos.idaviso.sample(frac=1).values

In [None]:
leads['Match']=1
leads_falsos['Match']=0

In [None]:
leads_falsos.shape, leads.shape

In [None]:
leads = pd.concat([leads, leads_falsos]).reset_index(drop=True)

In [None]:
leads = leads.sort_values('Match', ascending = False)

In [None]:
leads = leads.drop_duplicates(['idusuario', 'idaviso'])

In [None]:
# Para llevarlo a la forma de matriz de interacción
#leads_df = leads.pivot(index='idusuario',
#                                 columns='idaviso',
#                                 values='Match').fillna(-1)

In [None]:
len(segmentos_df.idusuario), len(set(leads.idusuario))

In [None]:
leads = leads.merge(segmentos_df[['idusuario', 'pred_label_km']], on='idusuario', how='left')

In [None]:
leads.to_pickle('leads_{}_{}.pkl'.format(portal, fecha))

In [None]:
leads.idaviso.unique()

In [None]:
%time avisos = get_recavisos(portal='ZPAR', ids = leads.idaviso.unique())

In [None]:
avisos.to_csv('avisos.csv')