In [1]:
import datetime as dt
time_ini = dt.datetime.now()

In [2]:
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 15 19:20:05 2019

#Proyecto Integrador
"""
### Paquetes necesarios para el funcionamiento de las funciones en este script
import logging
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.spatial import distance
from sklearn.covariance import LedoitWolf
from sklearn.tree import ExtraTreeClassifier
#%%
#from bokeh.layouts import layout
#from bokeh.plotting import figure
#from bokeh.palettes import Spectral6
#from bokeh.io import output_file, save, curdoc
#from bokeh.models import (ColumnDataSource, HoverTool, BoxZoomTool, 
#                          WheelZoomTool, ResetTool, PanTool,SingleIntervalTicker,
#                          Slider, Button, Label, CategoricalColorMapper)
#%%
logger = logging.getLogger(__name__)
#%%

### Esta funcion calcula la matriz de covarianza de Ledoit and Wolf, retorna
### la matriz de covarianza despues de aplicar el metodo de Shrinkage, ademas
### de retornar la media estimada. Esta funcion toma como parametros de entrada
### el conjunto de datos
def LedoitWolf_covMatrix(X):
    logger.info('Se realiza el calculo de la matriz de covarianza con Shrinkage')
    cov = LedoitWolf().fit(X)
    cov_matrix = cov.covariance_
    mean_vector = cov.location_
    
    return cov_matrix, mean_vector

### La funcion toma como entrada el conjunto de datos en el cual se desea
### realizar la deteccion de outliers multivariante. El metodo utiliza la
### distancia de Mahalanobis y la matriz de covarianza habitual. Se define
### alpha como el percentil en el cual se realizara el corte de la matriz de 
### distancias para determinar que registros son considerados outliers. 
def outlier_detection_mahal(X,alpha = 0.95, shrinkage = False):
    logger.info('Comienza la deteccion de outliers')
    if len(X)==0:
        logger.info('No hay datos para realizar deteccion de outliers')
        return None
    try:
        X = np.array(X)
        X_arr = np.array(X)
        X_mean = X_arr.mean(axis = 0)
        
        if shrinkage:
            cov, _ = LedoitWolf_covMatrix(X)
        else:
            cov = np.array(pd.DataFrame(X).cov())
        
        
        inv_cov = np.linalg.inv(cov)
        
        dist = []
        i=0
        while i<len(X):
            distances = distance.mahalanobis(X_arr[i], X_mean, inv_cov)
            dist.append(distances)
            i +=1
        
        dist = np.array(dist)
        cutoff = np.quantile(dist,alpha)
        outliers = (dist>cutoff).astype(int)
    except Exception as e:
        logger.info('Error en la deteccion de outliers:{0}'.format(e))
    
    logger.info('Deteccion de outliers exitoso')
    return outliers


### Esta funcion toma como entrada un dataframe con el conjunto de datos, la
### etiqueta de 1s y 0s, y realiza un stepwise en la regresion logistica para
### conservar las variables mas relevantes para explicar la etiqueta. La
### funcion devuelve una lista con las variables relevantes, y el modelo final
### de la regresion logistica
def stepwise_logistic(X,Y,alpha =0.1):
    
    try:
        n = len(X.columns)
        for i in range(n):
            model = sm.Logit(Y, X).fit()
            pvalues = pd.DataFrame(model.pvalues,columns = ['pvalues']).reset_index()
            if pvalues['pvalues'].max()<alpha:
                break
            to_drop = pvalues[pvalues['pvalues']==pvalues['pvalues'].max()]['index'].values[0]
            X = X.drop(columns = to_drop)
    
        model = sm.Logit(Y, X).fit()
        variables_relevantes = list(X.columns) 
    except Exception as e:
        logger.info('Error en el stepwise logistic:{0}'.format(e))
        model = None
        variables_relevantes = []
        
    return variables_relevantes,model

### Se construye la funcion de variables relevantes utilizando metodos de
### arboles. Esta funcion entrena un arbol para el conjunto de datos y
### retorna las variables mas relevantes para explicar la variable Y. Hay
### que tener en cuenta que esta funcion toma como entrada DataFrames, donde
### para la variable X, las columnas tienen los nombres de las variables.
def variables_relevantes_arbol(X,Y,alpha = None):
    
    if len(X)==0:
        logger.info("No se ingreso informacion de variables")
        return []
    
    features = list(X.columns)
    
    if alpha == None:
        alpha = 1.0/len(features)
        logger.info('Se calcula el valor minimo de aceptacion de importancia: {0}'.format(alpha))
    
    try:    
        model = ExtraTreeClassifier()
        model.fit(X,Y)
        
        importance = model.feature_importances_ + 0.0001   ## Para evitar overflow
        
        relevant_features = []
        for i in range(len(features)):
            if importance[i]>alpha:
                relevant_features.append(features[i])
                
    except Exception as e:
        logger.info('Error con el metodo de arboles, no se determinaron variables relevantes: {0}'.format(e))
        relevant_features = []
        
    return importance, relevant_features

### Esta funcion toma como entrada un dataframe, el nombre de las variables no
### numericas, el nombre de la columna identificadora de los registros (ID del
### cliente, pais, etc) y realiza una deteccion de outliers utilizando la 
### distancia de Mahalanobis, adicional a esto, realiza la eliminacion de
### variables redundantes en el conjunto de datos mirando la relacion lineal
### de una variable contra todas las demas
def data_preprocessing(datos, alpha_outlier_detection =0.95, columns_not_numeric = {}, 
                       column_id = '', remove_vars = True, r_sqr_threshold = 0.9, shrinkage = False):
    
    ### Realiza la identificaciohn de outliers usando la distancia de 
    ### Mahalanobis
    logger.info('Realiza deteccion de outlier en el preprocesamiento de los datos')
    outliers = outlier_detection_mahal(datos.drop(columns = columns_not_numeric),
                                       alpha = alpha_outlier_detection,
                                       shrinkage = shrinkage)
    
    ### En caso que no exista una columna identificadora para eliminar IDs
    ### outliers, se eliminan todos los registros que se encontraron como
    ### outlier    
    if column_id == '':
        logger.info('Realiza la eliminacion de outliers por registros encontrados')
        datos = datos[outliers==0].reset_index(drop = True)    
        
    else:
        logger.info('Realiza la eliminacion de outliers por IDs detectados')              
        ids_outliers = datos[outliers==1][column_id].unique()
        datos = datos[~datos[column_id].isin(ids_outliers)].reset_index(drop = True) 
    
    ### Esta seccion realiza eliminacion iterativa de variables de forma tal
    ### que el R^2 de todas las variables sea menor a un umbral dado
    if remove_vars:
        logger.info('Se realiza el analisis del R^2 de todas las variables para eliminar variables redundantes')
        to_drop = []
        X_aux = datos.drop(columns = columns_not_numeric).copy()
        while True:
            var_names = list(X_aux.columns)
            
            inver = np.linalg.inv(X_aux.cov())
    
            ### Este es el R2 de cada variable con respecto a todas las demas, 
            ### por eso seria un R2 por cada variable
            r_squared = 1 - 1/(np.diagonal(X_aux.cov())*np.diagonal(inver))
    
            if any(r_squared > r_sqr_threshold):
                
                var_m = np.argmax(r_squared)
                X_aux = X_aux.drop(columns = var_names[var_m])
                
                to_drop.append(var_names[var_m])
                logger.info('Se elimina la variable:var_names[var_m] con el umbral: {0}'.format(r_sqr_threshold))
                
            else:
                break
    
    datos = datos.drop(columns = to_drop)
    logger.info('Fin del preprocesamiento de los datos eliminando Outliers y variables redundantes')
    return datos


### Funcion de distancia p, tenga en cuenta que si p es cero, entonces se
### asume que la distancia deseada es la de Mahalanobis
def distancia(a,b, p, cov = np.array([])):
    
    if p == 0:
        inv_cov = np.linalg.inv(cov)
        dist = distance.mahalanobis(a, b, inv_cov)
    else:
        dist = np.linalg.norm(a-b, p) 
    

    return dist


### Inicializa los centroides para el metodo de kmeans, inicializa de forma
### aleatoria tantos centros como clusters
def init_centroids(X_data,k):
    logger.info('Inicializacion de {0} centroides para el metodo de kmeans'.format(k))
    centroids = []
    numdata = len(X_data)
    for i in range(k):
        centroids.append(X_data[np.random.randint(0, numdata)])

    return centroids

### Metodo de kmeans
def kmeans_old(X_data,numiter,centroids,p_dista = 2,etiquetas = [], shrinkage = False):
    logger.info('Inicializa el metodo de kmeans')
    numdata = len(X_data)    
    if len(etiquetas)==0:
        logger.info('Se crean etiquetas ya que no fueron pasadas incialmente')
         ### Etiquetas actuales de cada elemento para cada cluster
        etiquetas = np.ones(numdata)*-1   ### Inicialmente, ningun elemento esta asignado
    
    ### Grados de pertenencia a cada cluster
    grados_pertenencia = []
    
    ### Se realiza el calculo de la matriz de varianzas y covarianzas en caso
    ### de utilizar la distancia de Mahalanobis, ademas si se desea, se utiliza
    ### una version bien condicionada de la matriz utilizando el metodo de
    ### Shrinkage
    if shrinkage:      
        covariance_matrix, _ = LedoitWolf_covMatrix(X_data)
    else:      
        covariance_matrix = np.cov(X_data,rowvar=False)
        
    ### Ahora empiezo las iteraciones
    for it in range(numiter):
        logger.info('Iteracion {0} de {1} para el metodo de kmeans'.format(it, numiter))
        ### En cada iteracion, itero para todos los elementos
        for element in range(numdata):
            
            np.seterr(all='raise')
            ### Evaluo las distancias a cada centroides
            ### le sumo 0.00001 a cada distancia para evitar division sobre cero
            distc = []
            for c in centroids:
                distc.append(distancia(X_data[element], c, p_dista,covariance_matrix)+0.00001)
            

            ### Encuentro el centroide al que tiene menor distancia
            nearest_centroid = np.argmin(distc)
            
            ### Asigno el elemento a este cluster
            etiquetas[element] = nearest_centroid
            
            
            ### Recalculo el centroide 
            centroids[nearest_centroid] = np.mean(X_data[np.where(etiquetas==nearest_centroid)], axis=0)   
        
        centroids = np.array(centroids)


    ### Guardar grados de pertenencia a cada cluster
    for element in range(numdata):
        ### Evaluo las distancias a cada centroides
        ### le sumo 0.00001 a cada distancia para evitar division sobre cero
        distc = []
        for c in centroids:
            distc.append(distancia(X_data[element], c, p_dista,covariance_matrix)+0.00001)        
        grados_pert = list(np.around(1/(distc/sum(distc))/sum(1/(distc/sum(distc))),4))
        grados_pertenencia.append(grados_pert)

    logger.info('Fin del algoritmo kmeans')
    return grados_pertenencia,etiquetas,centroids


def plot_clusters_bokeh(list_x, list_y, list_pais, k, etiquetas, grados_pertenencia, 
                        title = 'Title', to_save = True):

    ### Plotear con hover tool
    
    ### Colores a usar para cada cluster
    colores = ['blue', 'yellow', 'red', 'green', 'orange', 'purple']

    
    ### Creo la herramienta de hover tool
    hover = HoverTool(tooltips=[
        ("pais","@pais"),
        ("index", "$index"),
        ("(x,y)", "(@x, @y)"),
        ("cluster_id", "@cluster_id"),
        ("Pertenencia_clusters", "@grados_p"),
        ])

    ### Creo la figura
    p = figure(plot_width=700, plot_height=500, tools=[hover, PanTool(), 
                                                       ResetTool(), BoxZoomTool(), 
                                                       WheelZoomTool()], title=title)
    
    ### PLoteo cada cluster
    for i in range(k):
        source = ColumnDataSource(data={'x':list_x[np.where(etiquetas==i)], 'y':list_y[np.where(etiquetas==i)], 'pais':list_pais[np.where(etiquetas==i)],'grados_p':grados_pertenencia[np.where(etiquetas==i)],'cluster_id':etiquetas[np.where(etiquetas==i)]})
        p.circle('x','y', size=12, 
                 fill_color=colores[i], source=source)

    ### Ploteo centroides
    #p.square(centroids_pca[:,0], centroids_pca[:,1], size=15,    fill_color='black')
    
    ### Labels (componentes principales)
    p.xaxis.axis_label = 'Componente principal 1'
    p.yaxis.axis_label = 'Componente principal 2'
    #p.xaxis.axis_label = datos.columns[-2]
    #p.yaxis.axis_label = datos.columns[-1]
    
    if to_save:
    ### Guardo el resultado
        output_file('outputs/ClustersGenerados/cluster_inicial_'+title+'.html')
        save(p)
    
    return None

def plot_cluster_bokeh_cambios(X_data_pca, list_x, list_y, list_xv, list_yv, k, 
                               cambios_variables, list_pais, grados_pertenencia, 
                               etiquetas, etiquetas_prev, centroids, 
                               centroids_viej, centroids_pca, title = 'Title',
                               to_save = True):
    
    ### Hover tool para los datos
    hover = HoverTool(tooltips=[
                                ("pais","@pais"),
                                ("index", "$index"),
                                ("(x,y)", "(@x, @y)"),
                                ("(Cambio_x,Cambio_y)", "(@xv, @yv)"),    
                                ("cluster_id", "@cluster_id"),
                                ("Pertenencia_clusters", "@grados_p"),
                                ])
    
    ### Colores a usar para cada cluster
    colores = ['blue', 'yellow', 'red', 'green', 'orange', 'purple']
    
    ### Crear la figura
    p = figure(plot_width=700, plot_height=500, tools=[hover, PanTool(), 
                                                       ResetTool(), 
                                                       BoxZoomTool(), 
                                                       WheelZoomTool()],
                                                       title=title)
    
    ### PLoteo cada conjunto
    for i in range(k):
        source = ColumnDataSource(data={'x':list_x[np.where(etiquetas==i)], 
                                        'y':list_y[np.where(etiquetas==i)], 
                                        'xv':list_xv[np.where(etiquetas==i)], 
                                        'yv': list_yv[np.where(etiquetas==i)],
                                        'pais':list_pais[np.where(etiquetas==i)],
                                        'grados_p':grados_pertenencia[np.where(etiquetas==i)],
                                        'cluster_id':etiquetas[np.where(etiquetas==i)]})
        p.circle('x','y', size=12, 
                 fill_color=colores[i], source=source)
    
    
    ### Veo cuales cambiaron de cluster
    etiquetas_cambios = np.where(etiquetas_prev-etiquetas != 0)
    etiqs = etiquetas[etiquetas_cambios]
    
    ### PLoteo los elementos de cada conjunto que cambiaron de cluster
    
    ### Listas para los elementos que cambiaron de cluster
    X_data_cambios = X_data_pca[etiquetas_cambios]
    list_x = X_data_cambios[:,0]
    list_y = X_data_cambios[:,1]
    list_xv = cambios_variables[:,0]
    list_yv = cambios_variables[:,1]
    list_pais = list_pais[etiquetas_cambios]
    grados_pertenencia = grados_pertenencia[etiquetas_cambios]
    
    ## Plotear elementos que cambiaron de cluster
    for i in range(k):
        source = ColumnDataSource(data={'x':list_x[np.where(etiqs==i)], 
                                        'y':list_y[np.where(etiqs==i)], 
                                        'xv':list_xv[np.where(etiqs==i)], 
                                        'yv': list_yv[np.where(etiqs==i)],
                                        'pais':list_pais[np.where(etiqs==i)],
                                        'grados_p':grados_pertenencia[np.where(etiqs==i)],
                                        'cluster_id':etiqs[np.where(etiqs==i)]})
        p.square('x','y', size=6, 
                 fill_color='white', source=source)
    
    
    
    ### Ploteo centroides
    
    ### Listas para centroiodes
    cambios_centroids = centroids_viej - centroids
    list_x = centroids_pca[:,0]
    list_y = centroids_pca[:,1]
    list_xv = cambios_centroids[:,0]
    list_yv = cambios_centroids[:,1]
    
    # Plotar centroids    
    source = ColumnDataSource(data={'x':list_x, 'y':list_y, 'xv':list_xv, 'yv': list_yv})
    #    p.square('x','y', size=15,             fill_color='black',source=source)
    
    ### Labels de la grafica (componentes principales)
    p.xaxis.axis_label = 'Componente principal 1'
    p.yaxis.axis_label = 'Componente principal 2'
    #p.xaxis.axis_label = datos.columns[-2]
    #p.yaxis.axis_label = datos.columns[-1]
    
    
    if to_save:
        ### Guardar resultados
        output_file('outputs/ClustersGenerados/cluster'+title+'.html')
        save(p)
    
    return None










### Para ploteo de datos con estilo de gapminder
def gapminder_plot_bokeh(datos_e, datos_pca, year_i, X_data_df, grad_per,
                         etiquetas_glo, periodos_incluir, k, imp_periods_var,
                         centroids_ite, scaler_es,
                         title = 'Titulo',
                         xlabel='Eje x',
                         ylabel='Eje y'):
    
    

    
    
    
    ### Lista years
    years_plot = []
    for o in range(periodos_incluir+1):
        years_plot.append(year_i + o)
        
    ### Dataframes necesarias
    pca1 = pd.DataFrame(columns = years_plot)
    pca2 = pd.DataFrame(columns = years_plot)
    
    ### PCA de cada year
    for year in years_plot:
        filtro = datos_e['Date']==year
        
        ### Los que usare para el PCA seran
        X_data_pca_y = np.array(datos_pca[filtro])
        
        pca1[year] =  X_data_pca_y[:,0]
        pca2[year] =  X_data_pca_y[:,1]
    
    ### Nombres de los individuos
    pca1.index = X_data_df.country
    pca2.index = X_data_df.country
    
    ### Grados de pertenencia
    grados_pert = pd.DataFrame(columns = years_plot)
    
    
    ##### Grados de pertenencia de cada year
    coun = 0
    for year in years_plot:    
        grados_pert[year] =  np.max(grad_per[coun], axis=1)*40  ### Aumento escala para que se vean bien
        coun = coun+1
    grados_pert.index = X_data_df.country
    
     
    ##### Cluster al que pertenece cada dato en cada periodo de tiempo
    etiqs_plot = []
    couu = 0
    for year in years_plot:
        eti = pd.DataFrame()
        eti['region'] = [str(i)[0] for i in list(etiquetas_glo[couu])] ### Solo 1 caracter
        eti.index = X_data_df.country
        
        etiqs_plot.append(eti)
        couu = couu+1
    
    
    ##### Regions_list son los id de los cluster
    regions_list = []
    for cu in range(k):
        regions_list.append(str(cu))
    
    
    ### fertility df seria componente principal 1
    ### life expectancy df seria componente principal 2
    ### population_df_size es el maximo grado de pertenencia
    ### regions_df es el cluster id al que se asigno cada uno
    ### years es la lista de years a modelar
    ### regions list seria el "nombre " de cada cluster (top variables mas importantes)
    
    
    
    
    ### Consolidar data
    df = pd.concat({'Componente_1': pca1,
                    'Componente_2': pca2,
                    'Grado_Pertenencia': grados_pert},
                   axis=1)
        
        
    
    ### Construir data
    data = {}
    
    counta = 0
    for year in years_plot:
        df_year = df.iloc[:,df.columns.get_level_values(1)==year]
        df_year.columns = df_year.columns.droplevel(1)
        data[year] = df_year.join(etiqs_plot[counta].region).reset_index().to_dict('series')
        counta = counta+1
    
    
    source = ColumnDataSource(data=data[years_plot[0]])




    ############### Para las labels ########################
    
    #### Numero de variables a plotear
    num_v_plot = 4
    
    #### Nombres variables
    nomb_v = datos_e.columns[2:]
    
    #### Desestandarizar centroides
    centroids_ito = scaler_es.inverse_transform(centroids_ite)
    
    #### Consolidar strings de las legends de cada iteracion
    strings_legends = []
    c=0
    for y in years_plot:
        esta_iter = []
        estas_imp = imp_periods_var[c]
        cc = 0
        for clu in estas_imp:
            ### Variables mas importantes
            orden_v = np.argsort(clu)[::-1][:num_v_plot]
            
            ### Construir string
            stri = ''
            
            ### Numero observaciones cluster
            stri = stri + 'Num_obs: '+str(len(np.where(etiquetas_glo[c]==cc)[0])) +', '
            
            ### Variables importantes
            for i in orden_v:
                stri = stri+nomb_v[i][:12]+': ' + str(np.around(centroids_ito[c][cc][i],2))+', '
            stri=stri[:-2]
            esta_iter.append(stri)
            cc=cc+1
        strings_legends.append(esta_iter)
        c=c+1



    #### PLoteos    
    global plot
    plot = figure(title=title, y_range=(-5, 7), plot_height=520, plot_width = 900)
    plot.xaxis.ticker = SingleIntervalTicker(interval=1)
    plot.xaxis.axis_label = xlabel
    plot.yaxis.ticker = SingleIntervalTicker(interval=20)
    plot.yaxis.axis_label = ylabel
    

    label = Label(x=1.1, y=18, text=str(years_plot[0]), text_font_size='70pt', text_color='#eeeeee')
    plot.add_layout(label)
    
    color_mapper = CategoricalColorMapper(palette=Spectral6, factors=regions_list)
    global r
    
    r = plot.circle(
        x='Componente_1',
        y='Componente_2',
        size='Grado_Pertenencia',
        source=source,
        fill_color={'field': 'region', 'transform': color_mapper},
        fill_alpha=0.8,
        line_color='#7c7e71',
        line_width=0.5,
        line_alpha=0.5,
#        legend_group='region',
    )
    
    from bokeh.models import Legend, LegendItem
    
    global legend   
    
    items_son=[]
    co = 0
    for a in strings_legends[0]:
        color_ =  list(etiquetas_glo[0]).index(co)
        items_son.append(LegendItem(label=a, renderers=[r], index=color_))
        co=co+1
        
    legend = Legend(items=items_son)
    plot.add_layout(legend)    
    

    plot.add_tools(HoverTool(tooltips="@country", show_arrow=False, point_policy='follow_mouse'))    


    def animate_update():
        year = slider.value + 1
        if year > years_plot[-1]:
            year = years_plot[0]
        slider.value = year
    
    
    def slider_update(attrname, old, new):
        year = slider.value
        label.text = str(year)
        source.data = data[year]
        pos = years_plot.index(year)
        global legend
        global r
        global plot

    
        items_son=[]
        bo = 0
        for a in strings_legends[pos]:
            color_ =  list(etiquetas_glo[pos]).index(bo)
            items_son.append(LegendItem(label=a, renderers=[r], index=color_))
            bo=bo+1
        legend.items = items_son
        plot.add_layout(legend)   
        

        
    slider = Slider(start=years_plot[0], end=years_plot[-1], value=years_plot[0], step=1, title="Year")
    slider.on_change('value', slider_update)
    
    callback_id = None
    
    def animate():
        global callback_id
        if button.label == '► Play':
            button.label = '❚❚ Pause'
            callback_id = curdoc().add_periodic_callback(animate_update, 1000)
        else:
            button.label = '► Play'
            curdoc().remove_periodic_callback(callback_id)
    
    button = Button(label='► Play', width=60)
    button.on_click(animate)
    
    layout_plot = layout([
        [plot],
        [slider, button],
    ])
    
    
    curdoc().add_root(layout_plot)
    curdoc().title = "Gapminder"

    return None

In [4]:
import os
import numpy as np
import pandas as pd
from  sklearn.preprocessing import StandardScaler as StdScaler
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler as SparkStdScaler


In [5]:
## Loading spark session
#sc = pyspark.SparkContext()
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)

In [6]:
spark.conf.set("spark.sql.execution.arrow.enabled","false")

In [7]:
### Leemos los datos
datos = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("dbfs:/mnt/mountblob/data_gapminder_proc2.csv")


In [8]:
### Nombres variables
### Variable names
nomb_vars = datos.columns[2:]

### removing . from variable names
for n in nomb_vars:
    datos = datos.withColumnRenamed(n,n.replace(".",""))
    

In [9]:
datos.toPandas().head(3)

Unnamed: 0,country,Date,population_growth_annual_percent,population_total,children_per_woman_total_fertility,population_density_per_square_km,total_gdp_us_inflation_adjusted,gdp_per_capita_yearly_growth,gdppercapita_us_inflation_adjusted,children_and_elderly_per_100_adults,life_expectancy_years,child_mortality_0_5_year_olds_dying_per_1000_born,gdp_total_yearly_growth,income_per_person_gdppercapita_ppp_inflation_adjusted
0,Albania,1980,2.05,2680000,3.62,97.9,5640000000.0,0.706,2110.0,70.2,72.3,77.8,2.81,4390.0
1,Albania,1981,2.0,2740000,3.53,99.8,5960000000.0,0.536,2190.0,68.9,72.4,72.0,2.56,4400.0
2,Albania,1982,2.11,2790000,3.45,102.0,6140000000.0,0.55,2210.0,67.8,72.5,66.8,2.49,4410.0


In [10]:
#### Preprocesamiento de datos
#datos = funciones_pyspark.data_preprocessing(datos, alpha_outlier_detection =0.96, 
datosp = datos.toPandas()
datos = data_preprocessing(datosp, alpha_outlier_detection =0.96, 
                                     columns_not_numeric = {'country','Date'},
                                     column_id = 'country')

### Estandarizo todos los datos
datos_e = datos.copy()
scaler_es = StdScaler()
scaler_es.fit(datos_e[datos_e.columns[2:]])


In [11]:
datosp.head()

Unnamed: 0,country,Date,population_growth_annual_percent,population_total,children_per_woman_total_fertility,population_density_per_square_km,total_gdp_us_inflation_adjusted,gdp_per_capita_yearly_growth,gdppercapita_us_inflation_adjusted,children_and_elderly_per_100_adults,life_expectancy_years,child_mortality_0_5_year_olds_dying_per_1000_born,gdp_total_yearly_growth,income_per_person_gdppercapita_ppp_inflation_adjusted
0,Albania,1980,2.05,2680000,3.62,97.9,5640000000.0,0.706,2110.0,70.2,72.3,77.8,2.81,4390.0
1,Albania,1981,2.0,2740000,3.53,99.8,5960000000.0,0.536,2190.0,68.9,72.4,72.0,2.56,4400.0
2,Albania,1982,2.11,2790000,3.45,102.0,6140000000.0,0.55,2210.0,67.8,72.5,66.8,2.49,4410.0
3,Albania,1983,2.12,2840000,3.38,104.0,6210000000.0,0.584,2180.0,66.8,72.6,61.9,2.54,4430.0
4,Albania,1984,2.1,2900000,3.32,106.0,6130000000.0,0.569,2110.0,65.8,72.8,57.4,2.68,4440.0


In [12]:
datos_new = spark.createDataFrame(datos)
### Vector Assembler in pyspark
assembler = VectorAssembler(inputCols=datos_new.columns[2:],outputCol="features")
datos_new = assembler.transform(datos_new)

### Standard Scaler in pyspark
scaler = SparkStdScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler = scaler.fit(datos_new)
datos_new = scaler.transform(datos_new)

datos_e_cols = list(datos_e.columns)

scaled_pdf = datos_new.select("scaledFeatures").toPandas()
for i in range(2,len(datos_e_cols)):
    datos_e[datos_e_cols[i]] = scaled_pdf["scaledFeatures"].apply(lambda x:x[i-2])
    
### PCA
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca = pca.fit(datos_new)

datos_new = pca.transform(datos_new)
pca_pdf = datos_new.select('pcaFeatures').toPandas()

pca_pdf['pcaFeatures_1'] = pca_pdf['pcaFeatures'].apply(lambda x:x[0])
pca_pdf['pcaFeatures_2'] = pca_pdf['pcaFeatures'].apply(lambda x:x[1])

# print(pca_pdf.head())

datos_pca = np.array(pca_pdf[['pcaFeatures_1','pcaFeatures_2']])

In [13]:
datos_pca

In [14]:
### Fijar seed aleatoria
np.random.seed(1)

In [15]:
##### Inicio tomando los del primer year
year_i = min(datos_e['Date'].values.tolist())   ### Year inicial a considerar
filtro = datos_e['Date']==year_i
X_data_df = datos_e[filtro].reset_index(drop=True)
X_data = np.array(X_data_df[X_data_df.columns[2:]])


### Numero de periodos que incluire en el estudio, sin incluir el inicial
periodos_incluir = max(datos_e['Date']) - min(datos_e['Date']) - 1

### Los que usare para el PCA seran
X_data_pca = np.array(datos_pca[filtro])


In [16]:
#### Lista donde ire guardando las listas de grados de pertenencia 
grad_per = []

### Lista donde ire guardando las etiquetas asignadas del cluster
etiquetas_glo = []

### Lista donde ire guardando las variables mas importantes por cluster y por periodo
imp_periods_var = []

### Lista donde ire guardando los centroides de cada iteracion
centroids_ite = []


## Numero de observaciones en cada periodo
numdata = len(X_data)
  

### Define cantidad de clusters, numero maximo de iteraciones, y la distancia
### que se utilizara en el metodo de kmeans
k = 3
numiter = 5
p_dista = 2   ### 0 para mahalanobis


#### Inicializar los centroides
centroids = init_centroids(X_data,k)

centroids_arr = np.array(centroids)
centroids_map = map(lambda x: (int(x[0]), Vectors.dense(x[0:])), centroids_arr)
centroids_spark_df = spark.createDataFrame(centroids_map,schema=["ind", "scaledFeatures"])
centroids_spark_df = pca.transform(centroids_spark_df)

pca_cent_pdf = centroids_spark_df.select('pcaFeatures').toPandas()
pca_cent_pdf['pcaFeatures_1'] = pca_cent_pdf['pcaFeatures'].apply(lambda x:x[0])
pca_cent_pdf['pcaFeatures_2'] = pca_cent_pdf['pcaFeatures'].apply(lambda x:x[1])

centroids_pca = np.array(pca_cent_pdf[['pcaFeatures_1','pcaFeatures_2']])


print(centroids_pca)

In [17]:
### Para la fase 2 de las importancias
centroids_p = centroids.copy()

In [18]:
### Aplicar kmeans (version vieja)
grados_pertenencia20,etiquetas20,centroids20 = kmeans_old(X_data,
                                                          numiter,
                                                          centroids,
                                                          p_dista = p_dista)


In [19]:
### Aplicar k means de pyspark (version nueva)
from pyspark.ml.clustering import KMeans

### Transformar el numyp array a un spark dataframe
pand_x = pd.DataFrame(X_data)
X_data_sp = spark.createDataFrame(pand_x)

### Nombre de columnas
lisi=[]
for i in pand_x.columns:
    lisi.append(str(i))

### Transformar a assembler
vecAssembler = VectorAssembler(inputCols=lisi,outputCol="features")
df_kmeans = vecAssembler.transform(X_data_sp).select('features')


# Trains a k-means model.
kmeans = KMeans().setK(k).setSeed(40)
#kmeans = KMeans().setK(k)
model = kmeans.fit(df_kmeans)

### Predecir etiquetas
predictions = model.transform(df_kmeans)

In [20]:

### Recuperar las 3 salidas en el mismo formato del k means anterior

### Etiquetas
etiquetas = predictions.toPandas()['prediction'].values

### Centroids
centroids = np.array(model.clusterCenters())

### Grados pertenencia: no es salida de kmeans de pyspark, la pongo en una cte, solo se usa para plot, no importara
grados_pertenencia = []
for ji in range(len(X_data)):
    lio=[]
    for li in range(k):
        lio.append(0.5)
    grados_pertenencia.append(lio) 

In [21]:
### Guardo grados de pertenencia
grad_per.append(grados_pertenencia.copy())

### Guardo etiquetas
etiquetas_glo.append(etiquetas.copy())

### Guardo centroids
centroids_ite.append(centroids.copy())

### Variable global donde ire guardando la importancia de las variables en cada 
### iteracion
imp_iters = []


### Obtener la importancia de las variables de cada cluster (de mayor a menor)
importancias_cluster = []

### Para cada cluster
for clu in range(k):
    ### Dejo solo las observaciones de cada cluster
    datax_i = pd.DataFrame(X_data)
    datay_i = etiquetas.copy()
    #### Solo clasifico binario si si pertence o no a cada cluster
    distintos_cluster = np.where(datay_i!=clu)
    ### Lo que no pertenece al cluster, lo pongo en -1
    datay_i[distintos_cluster] = -1
    datay_i = pd.DataFrame(datay_i)
    ### Calcular relevancias
    relevancias, _ = variables_relevantes_arbol(datax_i, datay_i, 0)
#     print(relevancias)
    importancias_cluster.append(relevancias)


In [22]:
##### Calculo los promedios de importancia de cada variable
imp_clus_prom =  np.mean(importancias_cluster, axis=0)

### Guardo las importancias de esta iteracion
imp_iters.append(imp_clus_prom)


### Guardo importancias generales (para el plot)
imp_periods_var.append(importancias_cluster)



In [23]:
X_data.shape

In [24]:

###############################################################################
################## Ahora, empiezo a iterar para t >=2 #########################
###############################################################################

#X_data_df = datos_e[filtro].reset_index(drop=True)
#X_data = np.array(X_data_df[X_data_df.columns[2:]])
#periodos_incluir = 5
for periodos in range(periodos_incluir):
    
    print(periodos)
    ### Guardo la X_data anterior
    X_data_viej = X_data.copy()
    centroids_viej = centroids.copy()
    

    ### Los datos para este year ya serian
    X_data_df = datos_e[datos_e['Date']==year_i+1+periodos]
    X_data = np.array(X_data_df[X_data_df.columns[2:]])
    
    

    #### Obtener los 2 componentes principales de los datos para plotear estos
    X_data_pca = np.array(datos_pca[datos_e['Date']==year_i+1+periodos])
    
    ### Calulo los cambios en las variables
    cambios_variables = X_data_viej - X_data    
    
    ###########################################################################
    ######################### Ponderacion dinamica ############################
    ###########################################################################
    
    ### Pondero X_data
    X_data_ori = X_data.copy()  ### X data original (sin ponderacion)
    
    
    ### Obtengo la importancia promedio de cada variable (promedio de todas las
    ### iteraciones)
    importancia_prom = np.mean(imp_iters, axis=0)
    
    ### Rankeo las variables de menor a mayor importancia
    rank_variables = np.argsort(importancia_prom)
    rankpeso_variables = np.zeros(len(rank_variables))
    
    cont = 0
    for i in rank_variables:
        rankpeso_variables[i] = (cont+1)/len(rank_variables)
        cont= cont+1

    #### Usar rankings o usar los promedios para el peso
    peso_variables = importancia_prom.copy()*100  ### Escalarlos con 100 para reducir errores numericos

    
    ### Escalo entonces la X para cambiar los pesos (segun las importancias)
    X_data_pond = X_data.copy()
    for peso in range(len(peso_variables)):
        X_data_pond[:,peso] = X_data_pond[:,peso] * peso_variables[peso]

    
    ###########################################################################
    ######################## K means para los plots ###########################
    ###########################################################################
    
    ### Etiquetas actuales de cada elemento para cada cluster
    etiquetas_prev = etiquetas.copy()
    
    ###########################################################################
    #################### Clusters con k means ponderado #######################
    ###########################################################################
    
#    grados_pertenencia,etiquetas,centroids = kmeans_old(X_data_pond,
#                                                              numiter,
#                                                              centroids,
#                                                              p_dista = p_dista,
#                                                              etiquetas = etiquetas)


    
    ### Transformar el numyp array a un spark dataframe
    pand_x = pd.DataFrame(X_data_pond)
    X_data_sp = spark.createDataFrame(pand_x)

    ### Nombre de columnas
    lisi=[]
    for i in pand_x.columns:
        lisi.append(str(i))

    ### Transformar a assembler
    vecAssembler = VectorAssembler(inputCols=lisi,outputCol="features")
    df_kmeans = vecAssembler.transform(X_data_sp).select('features')


    # Trains a k-means model.
    kmeans = KMeans().setK(k).setSeed(periodos)
   #kmeans = KMeans().setK(k)
    
    model = kmeans.fit(df_kmeans)

    ### Predecir etiquetas
    predictions = model.transform(df_kmeans)    



    ### Recuperar las 3 salidas en el mismo formato del k means anterior

    ### Etiquetas
    etiquetas = predictions.toPandas()['prediction'].values

    ### Centroids
    centroids = np.array(model.clusterCenters())

    ### Grados pertenencia: no es salida de kmeans de pyspark, la pongo en una cte, solo se usa para plot, no importara
    grados_pertenencia = []
    for ji in range(len(X_data_pond)):
        lio=[]
        for li in range(k):
            lio.append(0.5)
        grados_pertenencia.append(lio)     




    ### Guardo grados de pertenencia
    grad_per.append(grados_pertenencia.copy())

    ### Guardo etiquetas
    etiquetas_glo.append(etiquetas.copy())

    ### Guardo centroids
    centroids_ite.append(centroids.copy()*(1/peso_variables))





    ###### Esta importancia la necesito para los labels
    ### Obtener la importancia de las variables de cada cluster (de mayor a menor)
    importancias_cluster = []
    ### Para cada cluster
    for clu in range(k):
        ### Dejo solo las observaciones de cada cluster
        datax_i = pd.DataFrame(X_data_pond)
        datay_i = etiquetas.copy()
        
        #### Solo clasifico binario si si pertence o no a cada cluster
        distintos_cluster = np.where(datay_i!=clu)
        
        ### Lo que no pertenece al cluster, lo pongo en -1
        datay_i[distintos_cluster] = -1
        datay_i = pd.DataFrame(datay_i)
        
        ### Calcular relevancias
        relevancias, _ = variables_relevantes_arbol(datax_i, datay_i, 0)
        
        importancias_cluster.append(relevancias)

    ### Guardo importancias generales (para el plot)
    imp_periods_var.append(importancias_cluster)


    
    ###########################################################################
    ################ K means para la seleccion de variables ###################
    ###########################################################################
    
    ###### Para la proxima iteracion, los pesos
#    grados_pertenencia_p,etiquetas_p,centroids_p = kmeans_old(X_data_ori,
#                                                                    numiter,
#                                                                    centroids_p,
#                                                                    p_dista = p_dista,
#                                                                    etiquetas = etiquetas)

    
   

    ### Transformar el numyp array a un spark dataframe
    pand_x = pd.DataFrame(X_data_ori)
    X_data_sp = spark.createDataFrame(pand_x)

    ### Nombre de columnas
    lisi=[]
    for i in pand_x.columns:
        lisi.append(str(i))

    ### Transformar a assembler
    vecAssembler = VectorAssembler(inputCols=lisi,outputCol="features")
    df_kmeans = vecAssembler.transform(X_data_sp).select('features')


    # Trains a k-means model
    kmeans = KMeans().setK(k).setSeed(periodos)
    #kmeans = KMeans().setK(k)
    model = kmeans.fit(df_kmeans)

    ### Predecir etiquetas
    predictions_p = model.transform(df_kmeans)    


    ### Recuperar las 3 salidas en el mismo formato del k means anterior

    ### Etiquetas
    etiquetas_p = predictions_p.toPandas()['prediction'].values

    ### Centroids
    centroids_p= np.array(model.clusterCenters())
  

    
    #####################################


    
    
    
    ### Obtener la importancia de las variables de cada cluster (de mayor a menor)
    importancias_cluster = []
    ### Para cada cluster
    for clu in range(k):
        ### Dejo solo las observaciones de cada cluster
        datax_i = pd.DataFrame(X_data_ori)
        datay_i = etiquetas_p.copy()
        
        #### Solo clasifico binario si si pertence o no a cada cluster
        distintos_cluster = np.where(datay_i!=clu)
        
        ### Lo que no pertenece al cluster, lo pongo en -1
        datay_i[distintos_cluster] = -1
        datay_i = pd.DataFrame(datay_i)
        
        
        ### Calcular relevancias
        relevancias, _ = variables_relevantes_arbol(datax_i, datay_i, 0)
        
        importancias_cluster.append(relevancias)
    
    ### Calculo los promedios de importancia de cada variable
    imp_clus_prom =  np.mean(importancias_cluster, axis=0)
    
    ### Guardo las importancias de esta iteracion
    imp_iters.append(imp_clus_prom)



In [25]:
#centroids_pca[5]

In [26]:
centroids_ite

In [27]:

###############################################################################
########################### NUEVA VISUALIZACION  ##############################
###############################################################################

#funciones_pyspark.gapminder_plot_bokeh(datos_e, datos_pca, year_i, X_data_df, grad_per,
#                         etiquetas_glo, periodos_incluir, k, imp_periods_var,
#                         centroids_ite, scaler_es,
#                         title = 'Gapminder data',
#                         xlabel='Componente principal 1',
#                         ylabel='Componente principal 2')

#    
#from bokeh.plotting import output_file, save
#output_file("test.html")
#save(layout)
#


In [28]:
df1= pd.DataFrame(datos_e)
df1.to_csv("/dbfs/mnt/mountblob/logs/datos_e.csv",header=True,index= False)

df2= pd.DataFrame(datos_pca)
df2.to_csv('/dbfs/mnt/mountblob/logs/datos_pca.csv',header=True,index= False)

df3= pd.DataFrame(X_data_df)
df3.to_csv('/dbfs/mnt/mountblob/logs/X_data_df.csv',header=True,index= False)

df4= pd.DataFrame(grad_per)
df4.to_csv('/dbfs/mnt/mountblob/logs/grad_per.csv',header=True,index= False)

df5= pd.DataFrame(etiquetas_glo)
df5.to_csv('/dbfs/mnt/mountblob/logs/etiquetas_glo.csv',header=True,index= False)

df6= pd.DataFrame(imp_periods_var)
df6.to_csv('/dbfs/mnt/mountblob/logs/imp_periods_var.csv',header=True,index= False)

#Los siquientes parámetros son números sin estructura de dataframe

dsimple = pd.DataFrame(columns=('year_i', 'k', 'periodos_incluir', 'scaler_es','dim_centroids_ite'))

dimension = np.array(centroids_ite).shape
largo = dimension[2]*dimension[1]

dsimple.loc[len(dsimple)] = [ year_i, k, periodos_incluir,scaler_es,dimension]
df7= pd.DataFrame(dsimple)
df7.to_csv('/dbfs/mnt/mountblob/logs/yeari_k_periods_scaler_es_dimcentroi.csv',header=True,index= False)

centros = np.array(centroids_ite).reshape(largo,-1)
df8= pd.DataFrame(centros)
df8.to_csv('/dbfs/mnt/mountblob/logs/centroids_ite.csv',header=True,index= False)

datos = pd.DataFrame(datos_pca, columns=['component_1', 'component_2'])
datos = pd.concat([datos_e, datos], axis=1, ignore_index=True)
from tabulate import tabulate
print(tabulate(datos, headers='keys', tablefmt="psql"))
datos.to_csv('/dbfs/mnt/mountblob/logs/datos.csv', header = True, index= False)

In [29]:
time_fin = dt.datetime.now()

In [30]:
time_ini

In [31]:
time_fin

In [32]:
time_ini-time_fin

In [33]:
83773/3600