In [1]:
import dask.dataframe as dd
import dask.array as da
import pandas as pd
import html
import re
from sklearn.feature_extraction.text import strip_accents_ascii, strip_accents_unicode
import matplotlib.pyplot as plt
import math
from datetime import datetime

%matplotlib inline
# Presumably the number of CPU cores to use for parallel work -- TODO confirm intended use
numcores = 16
# Presumably flags whether a GPU is available ("tiene_gpu" = "has GPU") -- TODO confirm intended use
tiene_gpu = False
# Raise the pandas display limits so wide/long frames are not truncated when shown
pd.set_option('display.max_columns', 99)
pd.set_option('display.max_rows', 100)

import os 



In [2]:
# Wall-clock start; the last cell of the notebook reports the elapsed time from here
start_time = datetime.now()

# Root data directory on the Jupyter server.
# It can be overridden with the HOTELMAPPING_DATA_ROOT environment variable so the
# notebook also runs on machines where the data lives elsewhere; when the variable
# is not set the original server path is used unchanged.
data_root = os.environ.get('HOTELMAPPING_DATA_ROOT', 'C:/code/hotelmapping/data/')


## Deshabilito los future warnings, ya que se resolverán en el futuro

In [3]:
# import warnings filter
from warnings import simplefilter
# ignore all future warnings: this silences every FutureWarning raised after this
# cell runs, including deprecation notices coming from pandas/dask
simplefilter(action='ignore', category=FutureWarning)


### Cargo datos

In [4]:
# Paths of the two input parquet datasets, relative to data_root
properties_clean_file = os.path.join(data_root, 'travcoding/Properties_clean.parquet')
providers_clean_file = os.path.join(data_root, 'travcoding/Providers_clean.parquet')

# Load the files as Dask dataframes (read through the pyarrow engine)
inventory_ddf = dd.read_parquet(properties_clean_file, engine='pyarrow')
provider_ddf  = dd.read_parquet(providers_clean_file, engine='pyarrow')


In [5]:
# print("Providers =",provider_ddf.index.size.compute())
# print("Inventaory =",inventory_ddf.index.size.compute())


### Genero las estadísticas de las palabras de la columna del nombre de la propiedad y de cada registro

In [6]:

def get_word_stats(ddf, colum_name,key_name):
    """Compute per-word and per-row word statistics for a text column.

    The text in ``colum_name`` is split on whitespace and exploded to one word
    per row. Word frequencies are derived from those words and then aggregated
    back to one row per key.

    Parameters
    ----------
    ddf : dataframe (the notebook passes Dask dataframes)
        Source table. Its index is expected to be named ``key_name``: the
        function relies on ``reset_index()`` producing a ``key_name`` column.
    colum_name : str
        Name of the text column to tokenize.
    key_name : str
        Name of the key (index) that identifies each row of ``ddf``.

    Returns
    -------
    (word_stats, row_stats)
        ``word_stats`` has one row per distinct word with the columns ``word``,
        ``cantidad`` (number of occurrences), ``freq`` (``cantidad`` divided by
        the total number of occurrences) and ``idf`` (``log(1 / freq)``).
        ``row_stats`` is indexed by ``key_name`` and has the columns
        ``idf_count``, ``idf_sum``, ``idf_max`` and ``idf_mean`` computed over
        the words of each row.
    """
    words = ddf[colum_name].str.split().explode()

    # value_counts().reset_index() yields exactly two columns (word, count), but
    # their labels depend on the pandas version ('index' / <column> before 2.0,
    # <column> / 'count' from 2.0 on). Rename by position so both layouts work.
    word_stats = words.value_counts().reset_index()
    word_stats = word_stats.rename(columns=dict(zip(word_stats.columns, ['word', 'cantidad'])))
    word_stats['freq'] = (word_stats.cantidad/word_stats.cantidad.sum())
    word_stats['idf']  = (da.log(1/word_stats.freq))

    # Attach the idf of every word occurrence to the key of the row it came from
    word_idf = words.to_frame().reset_index().merge(
        word_stats, how='left', left_on=colum_name, right_on='word')[[key_name,'idf']]

    row_stats = word_idf.groupby(by=[key_name]).agg({'idf': ['count','sum','max','mean'] }).reset_index()
    # Flatten the two-level column index; iterating the index directly yields the
    # same tuples as the deprecated Index.ravel(). The key column becomes
    # '<key_name>_' (its second level is empty) and is renamed back below.
    row_stats.columns = ["_".join(x) for x in row_stats.columns]
    row_stats = row_stats.rename(columns={(key_name+'_'): key_name}).set_index(key_name)
    return word_stats, row_stats


In [7]:
# Word statistics for the inventory table: text column and the key that identifies each row
column_name = 'propertyname'
keycol_name = 'PropertyId'
inventory_words_stats_ddf, inventory_propertyname_stats_ddf = get_word_stats(inventory_ddf, column_name, keycol_name)

# Report the number of rows of each result; every .compute() forces evaluation of that size
print ("Inventory word stats = ",inventory_words_stats_ddf.index.size.compute())
print("Inventory property name stats =", inventory_propertyname_stats_ddf.index.size.compute())
#inventory_propertyname_stats_ddf.head()


Inventory word stats =  193628
Inventory property name stats = 523935


In [8]:

# Word statistics for the provider table: same text column, keyed by the provider-side id
column_name = 'propertyname'
keycol_name = 'PropertyByProviderId'
provider_words_stats_ddf, provider_propertyname_stats_ddf = get_word_stats(provider_ddf, column_name, keycol_name)


# Report the number of rows of each result; every .compute() forces evaluation of that size
print ("Provider word stats = ",provider_words_stats_ddf.index.size.compute())
print("Provider property name stats =", provider_propertyname_stats_ddf.index.size.compute())
#provider_propertyname_stats_ddf.head()



Provider word stats =  204948
Provider property name stats = 703021


In [9]:
#inventory_propertyname_stats_ddf.head()

### Genera Stopwords y Regex de stopwords 

In [10]:

# Returns the list of stop words derived from the word-statistics dataframe
def get_stopwords_list(word_stats_ddf,cantidad_minima):
    """Return the words that occur more than ``cantidad_minima`` times.

    ``word_stats_ddf`` must expose the columns ``word`` and ``cantidad``.
    Every selected word has trailing newlines removed and is lower-cased;
    the order of the rows is preserved.
    """
    frequent_rows = word_stats_ddf[word_stats_ddf.cantidad > cantidad_minima]
    return [token.rstrip('\n').lower() for token in frequent_rows.word]

In [11]:
# Words that occur more than this many times in the inventory names are treated as stop words
cantidad_minima = 200
property_name_stopwords = get_stopwords_list(inventory_words_stats_ddf,cantidad_minima)
# Build one alternation that matches any stop word as a whole word.
# Each word is escaped so that tokens containing regex metacharacters
# (e.g. '(', '+', '.') are matched literally instead of breaking or
# silently changing the pattern.
property_name_stopwords_regex = re.compile(
    r'\b(' + r'|'.join(re.escape(stopword) for stopword in property_name_stopwords) + r')\b'
)

print("# Stopwords =",len(property_name_stopwords))
print("#  palabras total =", len(inventory_words_stats_ddf.index))


# Stopwords = 927
#  palabras total = 193628


In [12]:
#Save Stopwords
#Inventory_words_stats[Inventory_words_stats.cantidad>cantidad_minima].compute().sort_values('cantidad', ascending=False).to_csv('data/travcoding/stopwords.csv') 


# Blocking

## BUGS
Hay que revisar esta función porque devuelve menos registros que la tabla


In [13]:
def get_blocking_df( data_ddf, 
                blocking_column, 
                stopwords_regex, 
                aditional_blocking_column='countrycorregido'):
    """Build the blocking index: one row per (key, remaining word, extra column).

    The text in ``blocking_column`` has every match of ``stopwords_regex``
    removed, is split on whitespace and exploded to one word per row. Each
    word is joined back to ``data_ddf`` on the index to pick up
    ``aditional_blocking_column``; the index is then turned into a regular
    column and duplicate rows are dropped.

    Rows whose text is missing, or is made up only of stop words, produce no
    word at all and are removed by ``dropna()``, so the result can cover fewer
    distinct keys than ``data_ddf`` has rows.

    Parameters
    ----------
    data_ddf : dataframe (the notebook passes Dask dataframes)
        Source table; its index identifies each record.
    blocking_column : str
        Text column to tokenize.
    stopwords_regex : compiled pattern or str
        Regular expression whose matches are deleted before tokenizing.
    aditional_blocking_column : str
        Extra column carried along with every word.

    Returns
    -------
    dataframe with the former index as a column plus ``value`` (the word)
    and ``aditional_blocking_column``. Nothing is computed here when the
    input is lazy; the caller decides when to evaluate.
    """
    words = (
        data_ddf[blocking_column]
        # regex=True is explicit because recent pandas defaults to regex=False,
        # which rejects a compiled pattern with a ValueError.
        .str.replace(stopwords_regex, '', regex=True)
        .str.split()
        .explode()
        .dropna()
        .to_frame()
        .rename(columns={blocking_column: 'value'})
    )
    return (
        words
        .merge(data_ddf, left_index=True, right_index=True)[['value', aditional_blocking_column]]
        .reset_index()
        .drop_duplicates()
    )
    

    

In [14]:
# Text column whose words are used as blocking keys for both tables
blocking_column = 'propertyname'    


### Genera indice para el inventario

In [15]:
#property_name_stopwords_regex
# Blocking index for the inventory, built with the stop words derived from the inventory names
inv_indx_ddf = get_blocking_df(inventory_ddf,blocking_column,property_name_stopwords_regex)
#inv_indx = inv_indx.reset_index()
# print("Inventory  = ", inventory_ddf.index.size.compute())
# print("Index rows = ", inv_indx_ddf.index.size.compute())


### Genera indice para proveedores

In [16]:
# Blocking index for the providers; note it reuses the stop-word regex built from the inventory names
prv_indx_ddf = get_blocking_df(provider_ddf,blocking_column,property_name_stopwords_regex)
#print("Index rows = ", prv_indx_ddf.index.size.compute())
#prv_indx_ddf.head(3)



### Genera candidatos

In [17]:
# Build the index combinations: an inventory record and a provider record become a
# candidate pair when they share a word ('value') within the same country.
# The partition count comes from the numcores constant of the configuration cell
# instead of a repeated literal.
candidates_ddf = dd.merge(inv_indx_ddf, prv_indx_ddf, how='inner', on=['value','countrycorregido']).repartition(npartitions=numcores)

# Add the word statistics (taken from the inventory-side word stats) to every shared word
candidates_ddf = candidates_ddf.merge(inventory_words_stats_ddf, how='left', left_on='value', right_on='word')

# Per candidate pair: number of shared words and sum / max / mean of their IDF
candidates_ddf = candidates_ddf.groupby(['PropertyId','PropertyByProviderId']).agg({'idf': ['count','sum','max','mean'] }).reset_index().repartition(npartitions=numcores)

# Flatten the multi-index column levels, skipping empty levels so the key columns keep
# their plain names. The index is iterated directly: it yields the same tuples as the
# deprecated Index.ravel() without raising a FutureWarning.
candidates_ddf.columns = ["_".join([z.strip() for z in x if z.strip()]) for x in candidates_ddf.columns]


In [18]:
#candidates_ddf.index.size.compute()


In [19]:
# candidates_ddf = candidates_ddf.sort_values('idf_sum',ascending=False)
# candidates_ddf.head()

In [20]:

#inventory_ddf.loc['739246'].compute()
	


In [21]:
#provider_ddf.loc['468971'].compute()
	

## Grabo los candidatos

In [22]:
# Output path for the candidate pairs; writing the parquet is what evaluates the lazy pipeline
pair_candidates_file = os.path.join(data_root, 'travcoding/pair_candidates.parquet')    
candidates_ddf.to_parquet(pair_candidates_file, engine='pyarrow')


In [23]:
#pair_candidates2.head()

In [24]:

# NOTE(review): in the visible notebook `csv` and `filename` are only used by the
# commented-out sample exports below -- confirm whether this cell is still needed
import csv
filename = os.path.join(data_root, 'travcoding/addresslong.csv')    
# candidates_pd.head(10000)[['address_x','address_y']].to_csv(filename,header=False, sep="\t", encoding='utf-8',quotechar='"', quoting=csv.QUOTE_ALL, index=None)  


# filename = os.path.join(data_root, 'travcoding/propertylong.csv')    
# candidates_pd.head(10000)[['propertyname_x','propertyname_y']].to_csv(filename,header=False, sep="\t", encoding='utf-8',quotechar='"', quoting=csv.QUOTE_ALL, index=None)  






In [25]:
# Total wall-clock time since start_time was recorded near the top of the notebook
time_elapsed = datetime.now() - start_time
print('Time elapsed (hh:mm:ss.ms) {}'.format(time_elapsed))

Time elapsed (hh:mm:ss.ms) 0:01:13.575175
