## IMPORT DATA + LIBRARY

<p> Cette partie est dédiée à l'import de spark, nécéssaire à la suite du projet. Le projet à pour objectif de coder un Kmeadian, celui-ci s'utilisera sur une base de donnée binaire. La base de donnée est composée d'individus,  qui après reshape, forment des chiffes.<p>
   

In [None]:
%classpath add mvn org.apache.spark spark-sql_2.11 2.4.4
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.ERROR);

In [None]:
%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4

In [None]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .appName("Simple Application")
                        .master("local[4]")
                        .config("spark.ui.enabled", "false")
                        .getOrCreate()
val sc = spark.sparkContext

## KMEDIAN FINAL VERSION

<p> Le code est écrit à l'aide de classes, les classes sont héréditaires. La distance utilisée est la distance de Hamming, celle-ci est codée en XOR. L'algorithme à pour paramètre une liste de Point (classe), le nombre de clusters et le nombre d'itérations. Nous avons essayé de mapReduce l'algorithme afin d'obtenir de meilleures performances.<p>

In [None]:
import scala.util.Random

case class Point(x: List[Int]){
    def hd( b: List[Int]) = { 
    var c = Array[Int]()
    var i = 0
    while(i < this.x.length) {  
        c = c :+ (this.x(i)^b(i))
        i+=1 }
    c.sum
    }
}

case class Cluster(centroid: Point, points: List[Point] = Nil)

case class ClusterBatch(clusters: List[Cluster])


def Kmedian(allPoints: List[Point], n: Int, k: Int): List[ClusterBatch]= {
    var innitCluster = initialClusters(allPoints, n)
    var initialBatch = ClusterBatch(innitCluster)
    var assignedClusters = List[Cluster]()
    var reCentered : ClusterBatch = null
    var i = 0
    while(i < k){
        assignedClusters = assignement(allPoints, initialBatch.clusters)
        reCentered = ClusterBatch(assignedClusters)
        i = i+1    
    }
    List(reCentered)
}

def assignement(allPoints: List[Point], clusters: List[Cluster]): List[Cluster] = {
    val ptsByCluster = allPoints.groupBy(p => clusters.minBy(_.centroid.hd(p.x)))
    clusters.map(c =>
        c.copy(points = ptsByCluster.getOrElse(c, Nil))
    )
}

def initialClusters(allPts: List[Point], n: Int): List[Cluster] = 
    Random.shuffle(allPts).take(n).map(pt => Cluster(pt))

def reCenter(clusters: List[Cluster]): List[Cluster] = clusters.map { c =>
    var sumVect = List[Int]()
    var newCentroid = List[Int]()
    var barycenter = List[Int]()
    sumVect = c.points.map(_.x).transpose.map(_.sum)
    barycenter = sumVect.map( j => if(j >= (c.points.size+1)/2 ){1}else{0})
    c.copy(centroid = Point(barycenter))
}

## DATA

<p> C'est ici que la base de donnée est importée, il y a quelques modifications à effectuer pour qu'elle soit intégrable à l'algorithme. <p>

In [None]:
val Digits = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("data/digits.csv") 

In [None]:
val zero = Digits.take(Digits.count.toInt).toList

In [None]:
val data = zero.map{ c=>
    var correction = List[Int]()
    for(i <- 0 until c.size){
        correction = correction :+ c(i).asInstanceOf[Int]   
    }
    correction
}

In [None]:
val dataFinal = data.map{n=>
    Point(n)
}

# KMEDIAN RESULT

<p> Voici les résultats, le Kmedian est plutot long à lançer, mais il fonctionne <p>

In [None]:
%%time

val result = Kmedian(dataFinal, 10, 50)

In [None]:
val meanDigits = result(0).clusters.map(_.centroid.x)

## DATA VIZUALISATION

<p> Nous avons décidé de reshape les centroids de chaque clusters afin de déterminer si celui-ci fonctionne bien. L'objectif est d'avoir tous les chiffes affichés (de 0 à 9). <p>

In [None]:
%%python

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from beakerx.object import beakerx

In [None]:
beakerx.meanDigitPython = meanDigits.map(_.toSeq.toArray)

In [None]:
%%python

data = pd.DataFrame(beakerx.meanDigitPython)
for i in range(len(data)):
    plt.figure(figsize=(4, 4))
    uniform_data = np.reshape(list(data.iloc[i,:]), (16, 15))
    sns.heatmap(uniform_data)

Projet réalisé par : AIT MOHEMO Hamza, MAIZA Fares, GROSJEAN Adrien, RACHATI Imad