In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.clustering import GaussianMixture
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.rdd import RDD
import numpy as np
import time

In [None]:
spark = SparkSession.builder.master("local[*]") \
                    .appName('Distributed Learning of Finite Gaussian Mixtures') \
                    .getOrCreate()
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc)
#     .config("spark.driver.memory", "15g") \

In [None]:
# Importation des packages
import time
from numpy import array
# Répertoire courant ou répertoire accessible de tous les "workers" du cluster
DATA_PATH=""

In [None]:
# Chargement des fichiers
import urllib.request
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_train.csv",DATA_PATH+"mnist_train.csv")
f = urllib.request.urlretrieve("https://www.math.univ-toulouse.fr/~besse/Wikistat/data/mnist_test.csv",DATA_PATH+"mnist_test.csv")

In [None]:
trainRDD = sc.textFile(DATA_PATH+"mnist_train.csv").map(lambda l: [float(x) for x in l.split(',')])
testRDD = sc.textFile(DATA_PATH+'mnist_test.csv').map(lambda l: [float(x) for x in l.split(',')])

In [None]:
def list_to_Row(l):    
    #Creation d'un vecteur sparse pour les features
    features = Vectors.sparse(784,dict([(i,v) for i,v in enumerate(l[:-1]) if v!=0]))
    row = Row(label = l[-1], features= features)
    return row
def Split_per(j):
            def Split_per_index(i, iter):
                return iter if i == j else []
            return Split_per_index


In [None]:
trainDF = trainRDD.map(list_to_Row).toDF()
testDF = testRDD.map(list_to_Row).toDF()

In [None]:
labels = trainDF.select(trainDF.columns[0])
labels.show(5)

In [None]:
X = trainDF.select(trainDF.columns[1])
X.show(5)

In [None]:
class Gaussian_mixture_reduction :
    
    def __init__(self ) :
        self.map_means , self.map_covs , self.map_weights , self.Means , self.Covaes ,  self.Weights = [None]*6
        
    def fit(self , X  , n_components , n_partition):
        self.X            = X
        self.X.cache()
        N = self.X.count()
        self.n_components = n_components
        self.n_partition  = n_partition
        Means_global, Covars_global , Weights_global= [[]]*3
        def Split_per(j):
            def Split_per_index(i, iter):
                return iter if i == j else []
            return Split_per_index
        global_time_start = time.time()
        for n_per in range(self.X.rdd.getNumPartitions()):
            start = time.time()
            rdd_per = (self.X).rdd.mapPartitionsWithIndex(Split_per(n_per)).coalesce(1).repartition(n_partition).toDF() ; M= rdd_per.rdd.count()
            model =  GaussianMixture().setK(10).setSeed(538009335).fit(rdd_per) ; rdd_per = None 
            Means_global.append( [model.gaussians[k].mu.array for k in range(self.n_components)  ])
            Covars_global.append( [model.gaussians[i].sigma.toArray() for i in range(self.n_components) ])
            Weights_global.append( np.array(model.weights)* (M/N )) ; model = None 
            end = time.time() 
            print ('fitting partitioned model{0} take '.format(i) , end - start , "seconde")
        global_time_end = time.time()
        print ('training take  ', global_time_end- global_time_start  , "seconde")
        self.map_means = np.array(Means_global)
        self.map_covs = np.array( Covars_global)
        self.map_weights = np.array( Weights_global)
        start = time.time()
        self.majorization_minimization() 
        end = time.time()
        print ('GMR algorithm take  ', global_time_end- global_time_start  , "seconde")
        self.Means_broadcast = sc.broadcast(tuple(map(DenseVector, self.Means))) 
        self.Covars_broadcast = sc.broadcast(tuple(map(lambda x : DenseMatrix(2,2, x) , list(map(np.ravel ,self.Covars )) )))
        self.Weights_broadcast = sc.broadcast(self.Weights.tolist()) 
        return self
        
    def Gaussian_distance(self , Gaussian_1 , Gaussian_2 , which="W2"):
        mu1, Sigma1 = Gaussian_1 ; mu2, Sigma2 = Gaussian_2
        if which == "KL":
            m0, S0 = Gaussian_1 ;  m1, S1 = Gaussian_2
            N = m0.shape[0] ; iS1 = np.linalg.pinv(S1) ; diff = m1 - m0
            tr_term   = np.trace(np.dot(iS1 , S0)) ; det_term  = np.log(np.linalg.det(S1)/np.linalg.det(S0))
            quad_term = np.dot(np.dot(diff.T , np.linalg.pinv(S1) ), diff) 
            return .5 * (tr_term + det_term + quad_term - N)
        elif which == "W2":
            if mu1.shape[0] == 1 or mu2.shape[0] == 1: # 1 dimensional
                W2_squared = (mu1 - mu2)**2 + (np.sqrt(Sigma1) - np.sqrt(Sigma2))**2
                W2_squared = np.asscalar(W2_squared)
            else: # multi-dimensional
                sqrt_Sigma1 = linalg.sqrtm(Sigma1)
                Sigma = Sigma1 + Sigma2 - 2 * linalg.sqrtm(sqrt_Sigma1 @ Sigma2 @ sqrt_Sigma1)
                W2_squared = np.linalg.norm(mu1 - mu2)**2 + np.trace(Sigma) + 1e-13
                return np.sqrt(W2_squared)
        else:
            raise ValueError("This ground distance is not implemented!")
        
    def Joint_Distribution(self):
        K   = self.n_components ;   MK  = self.n_partition*self.n_components ; dis = np.zeros((MK,K)) ; PI  = np.zeros_like(dis) 
        for i in range(MK):
            dis[i , :]= np.array(list(map(lambda aggr_params : np.absolute(self.Gaussian_distance([self.map_means[i], 
                       self.map_covs[i]], aggr_params , which="KL")) , zip(self.Means, self.Covars))))
            PI[i,np.argmin(dis[i, :], axis=0)] = self.map_weights[i]
        distance = (PI *dis).sum()
        return  PI , distance
    
    def majorization_minimization(self) :
        K   = self.n_components ; MK  = self.n_partition*self.n_components ; d = self.d
        self.Means  = KMeans(n_clusters = self.n_components, random_state=0).fit(self.map_means).cluster_centers_
        self.Covars = np.full((self.n_components,d,d) , np.identity(d)) 
        for it in range(self.n_iters):
            PI , dis = self.Joint_Distribution() 
            PI_mu= (PI/PI.sum(0))
            for k in range(self.n_components):
                self.Means[k]  = np.dot(PI_mu[:,k] , self.map_means)
                self.Covars[k] = np.sum(PI_mu[:,k][:, np.newaxis , np.newaxis]*self.map_covs, axis =0)
                + np.sum(PI_mu[:,k][:, np.newaxis ,np.newaxis ] * np.array(list(map(lambda row : row[:,np.newaxis]*row, 
                                                self.map_means-self.Means[k]))) ,axis = 0)
        self.Weights = PI.sum(0)
        return self.Means , self.Covars ,  self.Weights
    
    def score(self , data ):
        Means = self.Means ; Covars = self.Covars ; Weights = self.Weights ; n_components = self.n_components
        def local_score( data , Means = Means , Covars = Covars , Weights = Weights , n_components = n_components):
            data = np.vstack(data)
            if data.ndim == 1: data = data[:, np.newaxis]
            if data.size == 0: return np.array([]), np.empty((0, n_components))
            if data.shape[1] != Means.shape[1]: raise ValueError('The shape of x is not compatible with self')
            min_covar=1.e-7 ; n_samples, n_dim = data.shape ; nmix = n_components ; log_prob = np.empty((n_samples, nmix))
            for c, (mu, cv) in enumerate(zip(Means, Covars)):
                try: cv_chol = linalg.cholesky(cv, lower=True)
                except linalg.LinAlgError:
                    try: cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim),lower=True)
                    except linalg.LinAlgError: raise ValueError("'covars' must be symmetric, ""positive-definite")
                cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol)))
                cv_sol = linalg.solve_triangular(cv_chol, (data - mu).T, lower=True).T
                log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) + n_dim * np.log(2 * np.pi) + cv_log_det)
            log_prob +=np.log(Weights) ; log_prob = logsumexp(log_prob, axis=1)
            return log_prob
        return data.mapPartitions(local_score).collect()
        
    def predictSoft(self, x):
        weights =self.Weights_broadcast.value  ; means = self.Means_broadcast.value ; sigmas =self.Covars_broadcast.value
        if isinstance(x, RDD):
            membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector), _convert_to_vector(weights), means, sigmas)
            return membership_matrix.map(lambda x: pyarray.array('d', x))
        else:
            raise TypeError("data should be a RDD, received %s" % type(x)) 
            
    def predict(self, x):
        if isinstance(x, RDD):
            cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
            return cluster_labels.collect()
        else:
            raise TypeError("data should be a RDD, received %s" % type(x)) 

In [None]:
model = Gaussian_mixture_reduction().fit(X , 10 , 12)



In [None]:
pip uninstall pyspark


In [None]:
y