In [1]:
import pandas as pd
import numpy as np

In [2]:
import os

# Directory that contains train.csv. Set the TAXI_FARES_DATA_DIR environment
# variable to point the notebook at another machine's copy of the data; the
# original author's location is kept as the fallback so existing runs behave
# the same.
PATH_TO_DATA = os.path.join(
    os.environ.get(
        "TAXI_FARES_DATA_DIR",
        "/media/horace/OS/Users/Horace/Documents/Centrale/3A/plp/taxi_fares/ProjetPLP/",
    ),
    "",  # forces a trailing separator: callers build paths with PATH_TO_DATA + "train.csv"
)

In [3]:
import os

import findspark

# Point findspark at the Spark installation before pyspark is imported (same
# ordering as before). SPARK_HOME wins when it is set, so the notebook is not
# tied to one machine; the original hard-coded location is the fallback.
findspark.init(os.environ.get("SPARK_HOME", "/home/horace/spark-2.3.1-bin-hadoop2.7/"))

import pyspark
from pyspark.sql import SparkSession

# getOrCreate() reuses an already-running session if there is one.
spark = SparkSession.builder.getOrCreate()

In [4]:
from operator import add

def read_data(nrows=10**4):
    """Load the first ``nrows`` rows of train.csv as an RDD of float vectors.

    The file ``PATH_TO_DATA + "train.csv"`` is read as CSV with a header row.
    The ``key`` and ``pickup_datetime`` columns are dropped and every
    remaining row is converted to a ``numpy`` array with ``dtype=float``.

    Parameters
    ----------
    nrows : int
        Maximum number of rows kept (passed to ``DataFrame.limit``).

    Returns
    -------
    The RDD obtained by mapping each row to ``np.array(row, dtype=float)``.
    """
    csv_reader = spark.read.format("csv").option("header", "true")
    frame = csv_reader.load(PATH_TO_DATA + "train.csv").limit(nrows)
    features = frame.drop("key").drop("pickup_datetime")

    def to_vector(row):
        # CSV fields are converted to floats here
        return np.array(row, dtype=float)

    return features.rdd.map(to_vector)

def standardize(data):
    """Center and scale each feature of an RDD of numpy vectors.

    Every element ``a`` is mapped to ``(a - m) / s`` where ``m`` is the
    element-wise mean of ``data`` and ``s`` the element-wise population
    standard deviation (squared deviations summed, divided by the count).

    A feature that is constant over ``data`` has ``s == 0``; its scale is
    replaced by 1 so the feature becomes 0 everywhere instead of NaN.

    Parameters
    ----------
    data : RDD-like
        Must provide ``mean()``, ``map()``, ``reduce()`` and ``count()``;
        elements are expected to support numpy arithmetic.

    Returns
    -------
    The result of ``data.map`` applying the transformation above.
    """
    m = data.mean()
    n = data.count()
    s = np.sqrt(data.map(lambda a: (a - m)**2).reduce(add) / n)
    # Zero spread would turn (a - m) / s into 0/0 = NaN, and a single NaN
    # poisons every squared distance computed from the vector afterwards.
    s = np.where(s == 0, 1.0, s)
    return data.map(lambda a: (a - m) / s)

# Load the default sample (read_data keeps at most 10**4 rows) and peek at
# the first four feature vectors.
data = read_data()
data.take(4)

[array([  4.5     , -73.844311,  40.721319, -73.84161 ,  40.712278,
          1.      ]),
 array([ 16.9     , -74.016048,  40.711303, -73.979268,  40.782004,
          1.      ]),
 array([  5.7     , -73.982738,  40.76127 , -73.991242,  40.750562,
          2.      ]),
 array([  7.7     , -73.98713 ,  40.733143, -73.991567,  40.758092,
          1.      ])]

In [5]:
def assign_cluster(x, centers):
    """Return the index of the center nearest to ``x``.

    Nearness is the squared Euclidean distance ``np.sum((x - c)**2)``.
    On a tie the lowest index wins, and 0 is returned when ``centers`` is
    empty.
    """
    best_ix, best_dist = 0, np.inf
    for ix, center in enumerate(centers):
        offset = x - center
        dist = np.sum(offset ** 2)
        # only a strictly smaller distance replaces the current best
        if not dist < best_dist:
            continue
        best_ix, best_dist = ix, dist
    return best_ix

# Pick 3 rows, sampled without replacement with a fixed seed, as the initial
# centers.
centers = data.takeSample(False, 3, seed=42)
centers

[array([  5.7     , -73.988873,  40.727102, -74.002665,  40.739143,
          1.      ]),
 array([  7.7     , -73.950808,  40.831861, -73.987036,  40.776786,
          1.      ]),
 array([  8.5       , -74.00982666,  40.71247482, -73.99652863,
         40.7322731 ,   1.        ])]

In [6]:
# Key every point by the index of its nearest center; the trailing 1 is a
# per-point count that can be summed alongside the coordinates.
labeled_points = data.map(
    lambda point: (assign_cluster(point, centers), (point, 1))
)
labeled_points.take(5)

[(0, (array([  4.5     , -73.844311,  40.721319, -73.84161 ,  40.712278,
            1.      ]), 1)),
 (2, (array([ 16.9     , -74.016048,  40.711303, -73.979268,  40.782004,
            1.      ]), 1)),
 (0, (array([  5.7     , -73.982738,  40.76127 , -73.991242,  40.750562,
            2.      ]), 1)),
 (1, (array([  7.7     , -73.98713 ,  40.733143, -73.991567,  40.758092,
            1.      ]), 1)),
 (0, (array([  5.3     , -73.968095,  40.768008, -73.956655,  40.783762,
            1.      ]), 1))]

On voit qu'ici c'est le _fare amount_ (première composante) qui domine l'affectation des points aux clusters.

In [7]:
# Per cluster: element-wise sum of the member vectors together with the
# number of members.
sum_clusters = labeled_points.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
sum_clusters.collect()

[(0, (array([  17565.01      , -245378.87488157,  135074.24921016,
          -245452.80644183,  135076.7668574 ,    5522.        ]), 3400)),
 (2, (array([  84830.03      , -379638.79974669,  208872.70347547,
          -379777.87779615,  208959.86191631,    8720.        ]), 5253)),
 (1,
  (array([  9959.6       , -99648.9302082 ,  55257.53080745, -99510.25648403,
           54896.1800131 ,   2205.        ]), 1347))]

In [8]:
# Divide each cluster's summed vector by its member count to get the mean.
clusters = (
    sum_clusters
    .map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))
    .collect()
)
clusters

[(0, array([  5.16617941, -72.17025732,  39.72772036, -72.19200189,
          39.72846084,   1.62411765])),
 (2, array([ 16.14887302, -72.2708547 ,  39.76255539, -72.29733063,
          39.77914752,   1.66000381])),
 (1, array([  7.3939124 , -73.97841886,  41.02266578, -73.87546881,
          40.75440239,   1.63697105]))]

On rassemble les étapes précédentes (étiquetage des points, somme par cluster, calcul de la moyenne) dans une seule fonction.

In [15]:
def update_centers(data, centers):
    """Perform one k-means update step and return the recomputed centers.

    Each point of ``data`` is labelled with the index of its nearest center
    (via ``assign_cluster``), the points are summed and counted per label,
    and each sum is divided by its count.

    Returns
    -------
    list of ``(cluster_index, mean_vector)`` pairs collected to the driver.
    A cluster that received no point has no entry in the list.
    """
    def label(point):
        # the 1 is a per-point count, reduced together with the coordinates
        return assign_cluster(point, centers), (point, 1)

    def merge(left, right):
        return left[0] + right[0], left[1] + right[1]

    def average(entry):
        cluster_ix, (total, count) = entry
        return cluster_ix, total / count

    totals = data.map(label).reduceByKey(merge)
    return totals.map(average).collect()

# One update step starting from the sampled centers.
update_centers(data, centers)

[(0, array([  5.16617941, -72.17025732,  39.72772036, -72.19200189,
          39.72846084,   1.62411765])),
 (2, array([ 16.14887302, -72.2708547 ,  39.76255539, -72.29733063,
          39.77914752,   1.66000381])),
 (1, array([  7.3939124 , -73.97841886,  41.02266578, -73.87546881,
          40.75440239,   1.63697105]))]

In [22]:
def iterate_centers(data, k, tol=100, max_iter=100, seed=40):
    """Run k-means update steps on ``data`` until the centers stop moving.

    Initial centers are ``k`` elements sampled from ``data`` without
    replacement. Each iteration calls ``update_centers`` and measures the
    cost as the summed squared displacement of the updated centers divided
    by ``k``. A center whose cluster received no point keeps its previous
    value.

    Parameters
    ----------
    data : Spark RDD of numpy vectors.
    k : int
        Number of centers; must be at least 1.
    tol : float
        Iteration stops once the cost is ``<= tol``.
    max_iter : int
        Upper bound on the number of iterations.
    seed : int
        Seed passed to ``takeSample`` for the initial centers.

    Returns
    -------
    (centers, nb_iter) : the list of final centers and the number of
    iterations performed.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1.
    """
    if k < 1:
        # Without this check, k == 0 fails later with an IndexError inside the loop.
        raise ValueError("k must be at least 1, got %r" % (k,))

    # Every iteration runs a Spark job over `data`. Unless the RDD is
    # persisted, its whole lineage is recomputed each time, so keep it
    # cached for the duration of the loop. An RDD the caller already cached
    # is left alone, because its storage level cannot be changed.
    persisted_here = not data.is_cached
    if persisted_here:
        data.persist()
    try:
        centers = data.takeSample(False, k, seed=seed)
        cost = tol + 1  # guarantees the first iteration runs
        nb_iter = 0
        while cost > tol and nb_iter < max_iter:
            new_centers = update_centers(data, centers)
            cost = sum(np.sum((centers[ix] - x)**2) for (ix, x) in new_centers) / k
            for ix, x in new_centers:
                centers[ix] = x
            nb_iter += 1
        return centers, nb_iter
    finally:
        if persisted_here:
            data.unpersist()

class KMeans:
    """Small scikit-learn-style wrapper around ``iterate_centers``.

    Parameters
    ----------
    k : int
        Number of clusters.
    tol : float
        Convergence threshold forwarded to ``iterate_centers``.
    max_iter : int
        Maximum number of update iterations.
    random_state : int
        Seed used to sample the initial centers.

    After ``fit``, ``centers`` holds the list of fitted centers and
    ``nb_iter`` the number of iterations that were run.
    """

    def __init__(self, k=3, tol=1, max_iter=100, random_state=40):
        self.k = k
        self.tol = tol
        self.centers = None  # set by fit()
        self.nb_iter = None  # set by fit()
        self.max_iter = max_iter
        self.random_state = random_state

    def fit(self, data):
        """Compute the centers from ``data`` and return ``self``."""
        self.centers, self.nb_iter = iterate_centers(data, self.k,
                                                     tol=self.tol,
                                                     max_iter=self.max_iter,
                                                     seed=self.random_state)
        return self

    def predict(self, data):
        """Map every element of ``data`` to the index of its nearest center.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.centers is None:
            # Fail here rather than inside a Spark task, where iterating
            # over None would surface as an opaque worker error.
            raise RuntimeError("KMeans.predict() called before fit()")
        # Copy the field into a local: a lambda that mentions self.centers
        # captures self, so the whole estimator would be serialized and
        # shipped with the closure.
        centers = self.centers
        return data.map(lambda x: assign_cluster(x, centers))

# Cluster the standardized features (every column centered and scaled by
# standardize) and count how many points land in each cluster.
data = standardize(read_data())
km = KMeans(k=3, tol=0.001)
km.fit(data)
y = km.predict(data)
pd.Series(y.collect()).value_counts()

0    7891
2    1407
1     702
dtype: int64

In [23]:
# Number of update iterations the fit ran.
km.nb_iter

9

In [24]:
# Predicted cluster index of the first five points.
y.take(5)

[0, 0, 0, 0, 0]

In [25]:
# Fitted centers (the model was fitted on the standardized data).
km.centers

[array([-0.23087409,  0.01458125, -0.01804591,  0.01627396, -0.01648188,
        -0.38099263]),
 array([ 2.89106344, -0.03776958,  0.03219866, -0.0481574 ,  0.05117503,
        -0.11719441]),
 array([-0.14749989, -0.06304311,  0.0852903 , -0.06736218,  0.06702248,
         2.19889345])]

In [26]:
# First five standardized feature vectors.
data.take(5)

[array([-0.70279833, -0.12985434,  0.10942999, -0.12926455,  0.12918747,
        -0.50717233]),
 array([ 0.59105452, -0.1460419 ,  0.10806142, -0.14227668,  0.14018595,
        -0.50717233]),
 array([-0.57758676, -0.14290217,  0.11488885, -0.14340852,  0.13522633,
         0.27950726]),
 array([-0.36890082, -0.14331615,  0.11104561, -0.14343924,  0.1364141 ,
        -0.50717233]),
 array([-0.61932395, -0.14152195,  0.11580952, -0.14013918,  0.14046325,
        -0.50717233])]