In [1]:

import pandas as pd
import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

# Sanity-check the installation: if `import metnum` below does not fail, the setup is OK
!pwd
!python3 --version
import metnum


/home/dago/Desktop/Facultad/MN/MNTp3bis/MNTp3/tp3/notebooks
Python 3.7.6


# K-Fold


In [2]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_squared_log_error

def K_Fold(X, y, K=10):
    """
    Cross-validate ``metnum.LinearRegression`` with K sequential folds.

    For every fold the model is fitted on the training split and evaluated
    on the held-out split. Test samples whose prediction is negative are
    discarded before scoring, because the logarithmic error is undefined
    for negative values; the share of discarded samples is reported.

    Parameters
    ----------
    X : numpy.ndarray
        Feature matrix, indexed by row with the fold indices.
    y : numpy.ndarray
        Targets; ``y.T[0]`` is used, so a column vector is expected
        (plot_prediction passes it reshaped to ``(n, 1)``).
    K : int, default: 10
        Number of folds passed to ``KFold(n_splits=K)``.

    Returns
    -------
    tuple
        ``(mean RMSE, mean RMSLE, mean % discarded, max % discarded)``
        taken over the K folds.
    """
    kfold = KFold(n_splits=K)
    rmse = []
    rmsle = []
    deleted = []
    for train_index, test_index in kfold.split(X):
        # Split into the training set and the test set for this fold
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        linear_regressor = metnum.LinearRegression()
        linear_regressor.fit(X_train, y_train)
        y_pred = linear_regressor.predict(X_test)

        # Percentage of test predictions that are negative and get dropped
        deleted.append(100 * (y_pred < 0).sum() / len(y_pred))

        # Keep only the samples with a non-negative prediction
        pred_flat = y_pred.T[0]
        test_flat = y_test.T[0]
        keep = pred_flat >= 0
        pred_kept = pred_flat[keep]
        test_kept = test_flat[keep]

        # Both metrics are *root* errors: take the square root explicitly.
        # (The previous code stored the plain MSLE under the name "rmsle",
        # and relied on `squared=False`, which newer scikit-learn removed.)
        rmsle.append(np.sqrt(mean_squared_log_error(test_kept, pred_kept)))
        rmse.append(np.sqrt(mean_squared_error(test_kept, pred_kept)))

    return np.mean(rmse), np.mean(rmsle), np.mean(deleted), np.max(deleted)

# Data and plots

In [3]:
def fecha_parser(df):
    """
    Replace the 'fecha' column, in place, by the number of whole days
    elapsed since the earliest date in that column.

    The column is parsed to datetimes first and the minimum is taken on the
    parsed values, so the reference date is the chronological minimum even
    when the raw strings do not sort chronologically (e.g. '9/1/2019' vs
    '10/1/2020').

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a 'fecha' column parseable by ``pd.to_datetime``.
        It is modified in place; nothing is returned.
    """
    fechas = pd.to_datetime(df['fecha'])
    df['fecha'] = (fechas - fechas.min()).dt.days

def complete_metrostotales(df):
    """
    Fill missing 'metrostotales' values, in place, with the value of
    'metroscubiertos' from the same row.

    Rows where both columns are missing stay missing. Nothing is returned.
    """
    total_area = df['metrostotales']
    covered_area = df['metroscubiertos']
    df['metrostotales'] = total_area.fillna(value=covered_area)
    
def fill_banosterrenos(df):
    """
    Set missing 'banos' to 0, in place, for rows whose 'tipodepropiedad'
    is 'Terreno' or 'Terreno comercial'.

    Rows of any other property type, and rows that already have a 'banos'
    value, are left untouched. Nothing is returned.

    The rule is applied with a single vectorised mask rather than two
    row-by-row passes, which also makes it work on an empty frame.
    """
    is_lot = df['tipodepropiedad'].isin(['Terreno', 'Terreno comercial'])
    needs_fill = is_lot & df['banos'].isna()
    df['banos'] = df['banos'].mask(needs_fill, 0)

def poblacion(filename='../data/poblacion.csv'):
    """
    Load the population table.

    Parameters
    ----------
    filename : str, default: '../data/poblacion.csv'
        Path of a header-less CSV whose two columns are read as
        'poblacion' and 'ciudad', in that order.

    Returns
    -------
    pandas.DataFrame
        Frame with the columns 'poblacion' and 'ciudad'.
    """
    return pd.read_csv(filename, names=['poblacion', 'ciudad'])
    
def get_data(filename='../data/train.csv'):
    """
    Load the dataset and return it cleaned and enriched with population.

    Steps, in order: read the CSV; convert 'fecha' to day offsets; fill
    missing 'metrostotales'; fill missing 'banos' for land lots; keep only
    rows inside the latitude/longitude box (13, 34) x [-117, -87]
    (presumably the bounding box of the country covered by the data --
    confirm); left-join the population table on the columns both frames
    share.

    Parameters
    ----------
    filename : str, default: '../data/train.csv'
        Path of the CSV to load.

    Returns
    -------
    pandas.DataFrame
        The filtered rows with the population columns merged in.
    """
    raw = pd.read_csv(filename)

    # Each helper mutates the frame in place
    for prepare in (fecha_parser, complete_metrostotales, fill_banosterrenos):
        prepare(raw)

    lat_in_range = (raw['lat'] > 13) & (raw['lat'] < 34)
    lng_in_range = (raw['lng'] >= -117) & (raw['lng'] <= -87)
    located = raw[lat_in_range & lng_in_range]

    population = poblacion()
    return pd.merge(located, population, how='left')

In [4]:

class BubbleChart:
    """
    Packed-bubble chart layout.

    Bubbles start on a square grid and are iteratively moved towards their
    common (area-weighted) center of mass without overlapping.

    ``self.bubbles`` is an ``(n, 4)`` array whose columns are
    x, y, radius and area.
    """

    def __init__(self, area, bubble_spacing=0):
        """
        Setup for bubble collapse.

        Parameters
        ----------
        area : array-like
            Area of the bubbles.
        bubble_spacing : float, default: 0
            Minimal spacing between bubbles after collapsing.

        Notes
        -----
        If "area" is sorted, the results might look weird.
        """
        area = np.asarray(area)
        r = np.sqrt(area / np.pi)

        self.bubble_spacing = bubble_spacing
        self.bubbles = np.ones((len(area), 4))
        self.bubbles[:, 2] = r
        self.bubbles[:, 3] = area
        # largest possible distance between two touching bubble centers
        self.maxstep = 2 * self.bubbles[:, 2].max() + self.bubble_spacing
        self.step_dist = self.maxstep / 2

        # calculate initial grid layout for bubbles
        length = np.ceil(np.sqrt(len(self.bubbles)))
        grid = np.arange(length) * self.maxstep
        gx, gy = np.meshgrid(grid, grid)
        self.bubbles[:, 0] = gx.flatten()[:len(self.bubbles)]
        self.bubbles[:, 1] = gy.flatten()[:len(self.bubbles)]

        self.com = self.center_of_mass()

    def center_of_mass(self):
        """Return the area-weighted mean (x, y) of all bubble centers."""
        return np.average(
            self.bubbles[:, :2], axis=0, weights=self.bubbles[:, 3]
        )

    def center_distance(self, bubble, bubbles):
        """Return the distances from ``bubble``'s center to each center in ``bubbles``."""
        return np.hypot(bubble[0] - bubbles[:, 0],
                        bubble[1] - bubbles[:, 1])

    def outline_distance(self, bubble, bubbles):
        """
        Return the gaps between the outline of ``bubble`` and the outlines
        of ``bubbles``, reduced by the required spacing (negative = overlap).
        """
        center_distance = self.center_distance(bubble, bubbles)
        return center_distance - bubble[2] - \
            bubbles[:, 2] - self.bubble_spacing

    def check_collisions(self, bubble, bubbles):
        """Return how many of ``bubbles`` overlap ``bubble``."""
        distance = self.outline_distance(bubble, bubbles)
        return len(distance[distance < 0])

    def collides_with(self, bubble, bubbles):
        """Return, as a one-element sequence, the index of the closest bubble."""
        distance = self.outline_distance(bubble, bubbles)
        idx_min = np.argmin(distance)
        return idx_min if isinstance(idx_min, np.ndarray) else [idx_min]

    def collapse(self, n_iterations=50):
        """
        Move bubbles to the center of mass.

        Parameters
        ----------
        n_iterations : int, default: 50
            Number of moves to perform.
        """
        for _i in range(n_iterations):
            moves = 0
            for i in range(len(self.bubbles)):
                rest_bub = np.delete(self.bubbles, i, 0)
                # try to move directly towards the center of mass
                # direction vector from bubble to the center of mass
                dir_vec = self.com - self.bubbles[i, :2]
                norm = np.sqrt(dir_vec.dot(dir_vec))
                if norm == 0:
                    # The bubble already sits on the center of mass (always
                    # the case for a single bubble). Normalising a zero
                    # vector would yield NaN coordinates that then poison
                    # the center of mass, so leave the bubble where it is.
                    continue

                # shorten direction vector to have length of 1
                dir_vec = dir_vec / norm

                # calculate new bubble position
                new_point = self.bubbles[i, :2] + dir_vec * self.step_dist
                new_bubble = np.append(new_point, self.bubbles[i, 2:4])

                # check whether new bubble collides with other bubbles
                if not self.check_collisions(new_bubble, rest_bub):
                    self.bubbles[i, :] = new_bubble
                    self.com = self.center_of_mass()
                    moves += 1
                else:
                    # try to move around a bubble that you collide with
                    # find colliding bubble
                    for colliding in self.collides_with(new_bubble, rest_bub):
                        # calculate direction vector
                        dir_vec = rest_bub[colliding, :2] - self.bubbles[i, :2]
                        norm = np.sqrt(dir_vec.dot(dir_vec))
                        if norm == 0:
                            # coincident centers: no defined direction
                            continue
                        dir_vec = dir_vec / norm
                        # calculate orthogonal vector
                        orth = np.array([dir_vec[1], -dir_vec[0]])
                        # test which direction to go
                        new_point1 = (self.bubbles[i, :2] + orth *
                                      self.step_dist)
                        new_point2 = (self.bubbles[i, :2] - orth *
                                      self.step_dist)
                        dist1 = self.center_distance(
                            self.com, np.array([new_point1]))
                        dist2 = self.center_distance(
                            self.com, np.array([new_point2]))
                        new_point = new_point1 if dist1 < dist2 else new_point2
                        new_bubble = np.append(new_point, self.bubbles[i, 2:4])
                        if not self.check_collisions(new_bubble, rest_bub):
                            self.bubbles[i, :] = new_bubble
                            self.com = self.center_of_mass()

            # when fewer than 10% of the bubbles could move, refine the step
            if moves / len(self.bubbles) < 0.1:
                self.step_dist = self.step_dist / 2

    def plot(self, ax, labels, colors):
        """
        Draw the bubble plot.

        Parameters
        ----------
        ax : matplotlib.axes.Axes
        labels : list
            Labels of the bubbles.
        colors : list
            Colors of the bubbles.
        """
        for i in range(len(self.bubbles)):
            circ = plt.Circle(
                self.bubbles[i, :2], self.bubbles[i, 2], color=colors[i])
            ax.add_patch(circ)
            ax.text(*self.bubbles[i, :2], labels[i],
                    horizontalalignment='center', verticalalignment='center')

def plot_bubble(df, title=''):
    """
    Draw a packed-bubble chart of the per-feature weights and show it.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per bubble, with the columns 'value' (bubble area),
        'name' (bubble label) and 'color' (bubble color).
    title : str, default: ''
        Text appended to the fixed chart title.
    """
    bubble_chart = BubbleChart(area=df['value'],
                               bubble_spacing=0.1)

    bubble_chart.collapse()

    fig, ax = plt.subplots(subplot_kw=dict(aspect="equal"))
    # BubbleChart.plot looks labels and colors up as labels[i] / colors[i]
    # with i = 0..n-1. On a pandas Series that is a label lookup, which
    # fails or mislabels when the frame's index is not 0..n-1, so hand over
    # plain positional lists instead.
    bubble_chart.plot(
        ax, df['name'].tolist(), df['color'].tolist())
    ax.axis("off")
    ax.relim()
    ax.autoscale_view()
    ax.set_title(f'Peso de cada feature en la predicción{title}')

    plt.show()

In [5]:
def plot_prediction(df, columns, column_to_show='metrostotales', column_to_predict='precio', kfold_flag=False, title=''):
    """
    Fit a linear regression on the selected columns, report it and plot it.

    Rows with a missing value in any selected column or in the target are
    dropped first. The function then prints the sample count and the
    per-feature coefficients, draws a bubble chart where each feature's
    size is |coefficient * feature mean| (green when positive, red when
    negative), prints whether any prediction is negative, and draws a
    scatter plot of the target against ``column_to_show`` with the
    prediction overlaid as a red line.

    Parameters
    ----------
    df : pandas.DataFrame
        Source data.
    columns : list of str
        Feature columns used for the fit.
    column_to_show : str, default: 'metrostotales'
        Feature placed on the x axis of the scatter/line plot.
    column_to_predict : str, default: 'precio'
        Target column.
    kfold_flag : bool, default: False
        When true, additionally run ``K_Fold`` with 10 folds and print its
        four results.
    title : str, default: ''
        Text appended to the plot titles.
    """
    selected = df.reindex(columns=(columns + [column_to_predict])).dropna()
    target = selected[column_to_predict].values
    features = selected.drop(columns=[column_to_predict])
    X = features.to_numpy()
    print('Size: ', X.shape[0])

    # Means are taken before the prediction column is appended below
    feature_means = np.array(features.mean())

    model = metnum.LinearRegression()
    model.fit(X, target)
    features['prediccion'] = model.predict(X)

    coef_row = model.coef().T[0]
    contributions = coef_row * feature_means
    print(list(zip(features.columns, coef_row, contributions)))

    bubbles = pd.DataFrame({"name": columns, "value": contributions})
    print(bubbles)
    # Sign decides the color; the magnitude decides the bubble size
    bubbles['color'] = [
        '#9CE699' if value >= 0 else '#DD6E77' for value in bubbles['value']
    ]
    bubbles['value'] = np.abs(bubbles['value'])
    plot_bubble(bubbles, title)

    has_negative_prediction = (features['prediccion'] < 0).any()
    print(has_negative_prediction)

    scatter_ax = sns.scatterplot(x=features[column_to_show], y=target)
    scatter_ax.set_title(f"{column_to_predict} en función de {column_to_show}{title}")
    sns.lineplot(data=features, x=column_to_show, y='prediccion', color='red', label=f'predicción de {column_to_predict}')

    if kfold_flag:
        X_2d = X.reshape(len(X), len(columns))
        y_col = target.reshape(len(target), 1)

        # 10 folds: 10% of the samples are held out for testing in each fold
        rmse, rmsle, avg_deleted, max_deleted = K_Fold(X_2d, y_col, 10)
        print(rmse, '  ', rmsle, '  ', avg_deleted,"%", '  ', max_deleted )