In [1]:

import numpy as np
from sklearn import datasets

from NPIR import NPIR
import pandas as pd
from sklearn.datasets import make_blobs
import datetime
import warnings
from sklearn import metrics
from collections import Counter as Cs
from timeit import default_timer as timer
from datetime import timedelta
import matplotlib.pyplot as plt


import findspark
findspark.init()
from pyspark import SparkContext
from operator import *
from pyspark import StorageLevel

from pyspark.sql.types import IntegerType, FloatType, BooleanType, StringType, StructType,\
StructField,ArrayType, DataType
from pyspark.sql.functions import udf, log, rand, monotonically_increasing_id, col, broadcast,\
greatest, desc, asc, row_number, avg, mean, least, struct, lit, sequence, sum
from functools import reduce

import pyspark.sql.functions as F

from pyspark.sql import SparkSession, SQLContext, Window, Row, DataFrame
from pyspark import SparkConf

In [2]:
# Local-mode Spark session used by every cell below. The timeout and heartbeat
# settings are raised far above their defaults, and off-heap memory is enabled.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("NPIR_Parallel")
    .config("spark.sql.broadcastTimeout", "30000s")
    .config("spark.network.timeout", "30000s")
    .config("spark.executor.heartbeatInterval", "12000000ms")
    .config("spark.storage.blockManagerSlaveTimeoutMs", "12000001ms")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.default.parallelism", "100")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate()
)

In [3]:
# Handles reused by later cells: the SparkContext behind the session and a
# SQLContext wrapping it (used below for createDataFrame / setConf / sql).
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [4]:
# Load blobs.csv as a header-less CSV; the next cell selects the default
# column names '_c0' and '_c1'.
# NOTE(review): 'index' does not look like a Spark CSV reader option and is
# presumably ignored — confirm and remove. No schema is requested, so the
# values presumably load as strings (later code converts with float()).
data_spark_df = spark.read.format('csv').option('header','False').option('index','False').load('blobs.csv')
data_spark_df.count()

500

In [5]:
# Keep only the first two CSV columns; everything below works on two coordinates.
data_spark_df = data_spark_df.select('_c0', '_c1')

In [6]:
# Rename the two coordinate columns to 'first' and 'second'.
new_name = ['first', 'second']
# RDD view of the renamed rows with falsy rows filtered out; it is not
# referenced again in the cells visible here.
data_spark_rdd = data_spark_df.toDF(*new_name).rdd.filter(lambda x:x)
data_spark_df = data_spark_df.toDF(*new_name)

In [7]:
# Runtime configuration tweaks for the session.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
# NOTE(review): 'spark-cav_2.11' looks like a typo for the spark-csv package, and
# spark.jars.packages is normally consumed at session start-up — confirm this
# line has any effect before relying on it.
spark.conf.set('spark.jars.packages','com.databricks:spark-cav_2.11')
spark.conf.set("spark.sql.parquet.compression.codec","gzip")
spark.conf.set("spark.sql.execution.arrow.enabled", "False")
# Shuffles produced by joins / windows / groupBy below use 10 partitions.
sqlContext.setConf("spark.sql.shuffle.partitions", "10")

In [8]:
# Total number of rows; used as the upper bound when slicing the data into chunks.
points = data_spark_df.count()
points

500

In [9]:
# Parameters passed to NPIR(data, k, IR, i) in the chunk loop below.
# IR: the indexing ratio to be used for generating the maximum index.
IR = 0.2
# i: the number of iterations.
# NOTE(review): `i` is also used as a plain loop variable in later cells, which
# overwrites this value — consider a dedicated name.
i = 5
k = 3 #k: Number of clusters
# chunk: number of rows handed to NPIR per call (rows are sliced by index below).
chunk = 100

In [10]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Attach a 0-based row index ('index_column_name') so the data can be sliced
# into fixed-size chunks with between(z, j-1) in the chunk loop.
# NOTE(review): the numbering follows monotonically_increasing_id(), i.e. the
# current physical row order — presumably the file order here; confirm if the
# exact row-to-index mapping matters.
data_spark = data_spark_df.withColumn('index_column_name', row_number().\
                                          over(Window.orderBy(monotonically_increasing_id())) - 1)


In [11]:
def unionAll(*dfs):
    """Fold any number of DataFrames into one by chaining DataFrame.unionAll left to right."""
    merged = reduce(lambda left, right: DataFrame.unionAll(left, right), dfs)
    return merged

In [12]:
# Column names for the per-chunk "leader" rows: three bookkeeping fields followed
# by one numbered field per column of data_spark except one ('1', '2', ...).
leaderheadr = ('chunkLabel', 'old label', 'count') + tuple(
    str(position) for position in range(1, len(data_spark.columns))
)
leaderheadr

('chunkLabel', 'old label', 'count', '1', '2')

In [13]:
# Column names for the per-point label rows: the chunk a point came from and the
# label it received inside that chunk.
labelsheader = ('chunkLabel', 'label')
labelsheader

('chunkLabel', 'label')

In [14]:
# Empty accumulators with the right column names: build a one-row all-NaN frame
# and immediately drop that row.
labels = sqlContext.createDataFrame([np.full(len(labelsheader), np.nan).tolist()],labelsheader)
labels = labels.na.drop()

leaders = sqlContext.createDataFrame([np.full(len(leaderheadr), np.nan).tolist()],leaderheadr)
leaders = leaders.na.drop()

# Cluster every chunk of `chunk` consecutive rows locally with NPIR and collect
#   * labels : one row per point  -> (chunk number, label inside the chunk)
#   * leaders: one row per local cluster -> (chunk number, label, size, mean coordinates)
#
# The loop variables are deliberately named so that they do not overwrite the
# NPIR parameter `i` (number of iterations): the previous inner `for i in ...`
# loop silently changed that parameter for every chunk after the first one.
# NPIR is imported once at the top of the notebook, so it is neither re-imported
# nor deleted here.
ii = 0
for z in range(0, points, chunk):
    j = z + chunk
    data = data_spark.where(col("index_column_name").between(z, j-1)).toPandas()
    data = data.drop("index_column_name", axis=1).astype(float)
    label = NPIR(data.values,k,IR,i)
    data['labels'] = label

    # Per-point labels of this chunk.
    labelDF = [(ii, y) for y in label]
    labelsDF = sqlContext.createDataFrame(labelDF,labelsheader)
    labels = unionAll(labels, labelsDF)

    # One leader row per label that actually occurs in this chunk. The real label
    # value (not its position in the sorted list) is stored in 'old label' and
    # used to look up the cluster size, so gaps in the label range are handled.
    label_counts = Cs(label)
    leader = []
    for cluster_id in sorted(label_counts):
        members = data[data['labels'] == cluster_id].values[:, :-1]  # drop the 'labels' column
        centroid = [float(round(np.mean(feature), 4)) for feature in members.T]
        leader.append([ii, int(cluster_id), label_counts[cluster_id]] + centroid)

    leaderDF = sqlContext.createDataFrame(leader,leaderheadr)
    leaders = unionAll(leaders, leaderDF)
    ii += 1

In [15]:
# From here on `data_spark` holds the leader coordinates (cast to float) and
# `count` holds the matching cluster sizes; both are projections of `leaders`.
count = leaders.select('count')
data_spark = leaders.select(
    leaders["1"].cast(FloatType()).alias("first_numeric"),
    leaders["2"].cast(FloatType()).alias("second_numeric"),
)
data_spark

DataFrame[first_numeric: float, second_numeric: float]

In [16]:
# Number the weight rows ('Cindex', 0-based) so they can be joined back to the
# leader rows by position later on.
# NOTE(review): this relies on row_number over monotonically_increasing_id()
# giving `count` and `data_spark` the same row order — confirm.
count = count.withColumn('Cindex', row_number().\
                                          over(Window.orderBy(monotonically_increasing_id())) - 1)
count

DataFrame[count: double, Cindex: int]

In [17]:
def MyCheckUpdate(a, b, c, d):
    """Return 1.0 when the point (a, b) equals the point (c, d), else 0.0.

    All four arguments are converted with float() first, so numeric strings are
    accepted. Each coordinate is compared on its own: summing the two
    differences, as was done before, reported "equal" whenever the differences
    cancelled out (e.g. (1, 2) versus (2, 1)).
    """
    unchanged = float(a) == float(c) and float(b) == float(d)
    return 1.0 if unchanged else 0.0

# Spark UDF wrapper around MyCheckUpdate (four column arguments, float result).
check_centroid = udf(lambda x,y,z,r: MyCheckUpdate(x,y,z,r), FloatType())

def squaree1(c,u,f,g):
    """Euclidean distance between the 2-D points (c, u) and (f, g).

    Every argument is passed through float(), so numeric strings are accepted.
    The result is returned as a plain Python float.
    """
    first_point = np.array([float(c), float(u)])
    second_point = np.array([float(f), float(g)])
    return float(np.linalg.norm(first_point - second_point))

# Spark UDF wrapper around squaree1 (four column arguments, float distance).
squaree_spark1 = udf(lambda x,y,z,r: squaree1(x,y,z,r), FloatType())
# Set the automatic broadcast-join threshold to -1; the joins below request
# broadcasting explicitly with broadcast(...).
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

DataFrame[key: string, value: string]

In [18]:
# Seed centroid: sample the leaders (fraction 0.4, seed 0) and keep one row.
df_centroid = data_spark.sample(False, 0.4,seed = 0).limit(1).cache()

new_name = ['x','y']
df_centroid = df_centroid.toDF(*new_name)

# First round, written out by hand: distance of every leader to the seed centroid.
# Columns are addressed by position throughout this cell.
i = 0
# Join without a condition: every leader row is paired with the single seed row.
data_cent = data_spark.join(broadcast(df_centroid))
# Column '0' = distance from (column 0, column 1) to the seed's (x, y).
data_cent1 = data_cent.withColumn(str(i),squaree_spark1(data_cent.columns[0],data_cent.columns[1],
                                              data_cent.columns[2*i+2],data_cent.columns[2*i+3]))
# Drop the seed coordinates again (names are taken from data_cent1 before dropping).
data_cent2 = data_cent1.drop(data_cent1.columns[i+2]).drop(data_cent1.columns[i+3])
# 'mindist' = smallest distance to any centroid chosen so far; with a single
# centroid this is column '0' itself, so the least(...) below does not change it.
data_cent3 = data_cent2.withColumn('mindist',col(str(i)))
data_cent4 = data_cent3.withColumn('mindist1',least(data_cent3.columns[i+2], col('mindist')))
data_cent4 = data_cent4.drop('mindist')
data_cent5 = data_cent4.withColumnRenamed('mindist1','mindist')
# Next centroid = the leader with the largest 'mindist' (its first two columns).
next_selected = data_cent5.orderBy(desc('mindist')).limit(1).select(data_cent5.columns[0:2])#1:3

df_centroid = df_centroid.union(next_selected)
# Rename the selected centroid's columns to '0x', '0y' so they do not clash
# with the leader columns when joined in the next round.
u = [str(i)+'x',str(i)+'y']
next_selected = next_selected.toDF(*u)
def initial_centroids(next_selected,data_cent_5_persist, i):
    """Add the distance to the most recently chosen centroid and pick the next one.

    next_selected       -- frame holding the latest centroid's two coordinate columns
    data_cent_5_persist -- working frame: two point columns, earlier distance
                           columns and a running 'mindist' column
    i                   -- round number; names the new distance column str(i) and
                           fixes the positions (i+3, i+4) where the joined centroid
                           coordinates are expected

    Returns (next_cent, updated): the row with the largest refreshed 'mindist'
    (its first two columns only) and the working frame with the new distance
    column and the refreshed 'mindist'.
    """
    joined = data_cent_5_persist.join(broadcast(next_selected))
    point_x, point_y = joined.columns[0], joined.columns[1]
    cent_x, cent_y = joined.columns[i+3], joined.columns[i+4]

    with_dist = joined.withColumn(str(i), squaree_spark1(point_x, point_y, cent_x, cent_y))
    trimmed = with_dist.drop(cent_x).drop(cent_y)

    # After removing the centroid coordinates, position i+3 holds the distance
    # column that was just added.
    latest_dist = trimmed.columns[i+3]
    updated = (
        trimmed
        .withColumn('mindist1', least(latest_dist, col('mindist')))
        .drop('mindist')
        .withColumnRenamed('mindist1', 'mindist')
    )

    next_cent = updated.orderBy(desc('mindist')).limit(1).select(updated.columns[0:2])
    return next_cent, updated


# Keep the working frame (points, distance column '0', 'mindist') cached.
data_cent_5_persist = data_cent5.persist(StorageLevel.MEMORY_ONLY_2)


# Rounds 1 .. k-2: each call adds the distance to the latest centroid and
# selects the next one; its columns are renamed '<i>x', '<i>y' for the next join.
# start = timer()
for i in range(1,k-1):
    next_selected, data_cent_5_persist = initial_centroids(next_selected,data_cent_5_persist, i)
    u = [str(i)+'x',str(i)+'y']
    next_selected = next_selected.toDF(*u)

# end = timer()
# print ("Execution time HH:MM:SS:",timedelta(seconds=end-start))



# Last round: only the distance to the final centroid is still missing; no
# further centroid has to be selected, so 'mindist' is dropped afterwards.
i= k-1
data_cent11 = data_cent_5_persist.join(broadcast(next_selected))
# Positions k+2 and k+3 are read as the joined centroid's coordinates.
data_cent11 = data_cent11.withColumn(str(i),squaree_spark1(data_cent11.columns[0],data_cent11.columns[1],\
                                                           data_cent11.columns[k+2],data_cent11.columns[k+3]))
# The column names passed to drop() are looked up before any column is removed.
data_cent11 = data_cent11.drop('mindist').drop(data_cent11.columns[k+2]).drop(data_cent11.columns[k+3])


def FindMinCOl( *row_list):
    """Return the 0-based position of the smallest argument (first one on ties).

    The arguments are compared as one tuple, so a call with a single value
    returns 0 instead of failing the way min(*row_list) does when it is handed
    one non-iterable number.
    """
    return int(row_list.index(min(row_list)))

# Assign every leader to a cluster: 'defined_cluster' is the position of the
# smallest value among the columns from position 2 onwards (the distance columns).
# NOTE(review): the slice end 3+k looks like it exceeds the number of columns;
# slicing tolerates that — confirm 2+k was meant.
find_min_val_name = udf(FindMinCOl, IntegerType())
data_cent14 = data_cent11.withColumn('defined_cluster', find_min_val_name(*data_cent11.columns[2:3+k]))
data_cent14 = data_cent14.select('first_numeric','second_numeric','defined_cluster')

# The next two expressions are evaluated and their results discarded.
spark.sparkContext.getConf().getAll()
spark.conf.get("spark.sql.shuffle.partitions")
#### Compute weighted average
# Number the rows and join the weights in `count` by position (Cindex == index).
# NOTE(review): assumes data_cent14 is still in the same row order as `count`.
data_cent14 = data_cent14.withColumn('index', row_number().\
                                          over(Window.orderBy(monotonically_increasing_id())) - 1)

data_cent14 = (data_cent14.join(count, (col('Cindex') == col('index')),\
          "inner")).drop('index').drop('Cindex')

# Replace each coordinate by coordinate * count (computed under a temporary
# '<name>*' column, then renamed back).
for c in data_spark.columns:
    data_cent14 = data_cent14.withColumn(c+'*',col(c)*col('count') )
    data_cent14 = data_cent14.drop(c).withColumnRenamed(c+'*', c)

# Per cluster: sum of the weighted coordinates ...
new_centroid = data_cent14.groupBy('defined_cluster').sum('first_numeric', 'second_numeric').\
withColumnRenamed('defined_cluster', 'defined_cluster*')

# ... and sum of the weights, joined back together on the cluster id.
new_centroid1 = data_cent14.groupBy('defined_cluster').sum('count')
new_centroid = (new_centroid1.join(new_centroid, (col('defined_cluster') == col('defined_cluster*')),\
          "inner")).drop('defined_cluster*')

# for c in new_centroid.columns:
#     new_centroid = new_centroid.withColumnRenamed(c, c.replace('sum(', '').replace(')', ''))

# Weighted mean = weighted sum / total weight, for every column after
# ('defined_cluster', 'sum(count)'). The columns keep their 'sum(...)' names.
for c in new_centroid.columns[2:]:
    new_centroid = new_centroid.withColumn(c, col(c) / col('sum(count)'))

new_centroid = new_centroid.drop('sum(count)')

In [None]:
# Both expressions are evaluated and their results discarded: they are neither
# assigned nor the last expression of the cell.
spark.sparkContext.getConf().getAll()
spark.conf.get("spark.sql.shuffle.partitions")
def UpdateCentroid(x):
    """Run one weighted centroid update over the leader points in `data_spark`.

    Parameters
    ----------
    x : DataFrame
        Current centroids. The first column must be named 'defined_cluster';
        the second and third columns are read as the centroid coordinates.

    Returns
    -------
    (update_new_centroid, next_centroid)
        update_new_centroid -- one row per non-empty cluster: 'defined_cluster'
            followed by the count-weighted mean of each coordinate (the columns
            keep the names 'sum(first_numeric)' / 'sum(second_numeric)').
        next_centroid -- one row per leader, in the original leader order: the
            leader coordinates, the columns of its nearest centroid from `x`,
            and 'mindist' (distance to that centroid).

    Reads the notebook globals `data_spark` (leader coordinates), `count`
    (weights with a 0-based 'Cindex') and `squaree_spark1` (distance UDF).

    Changes compared with the previous version:
      * each leader gets its row index *before* the cross join; the nearest
        centroid is chosen per index (not per value of the second coordinate,
        which merged distinct points sharing a y value), and the weights are
        joined on that index instead of on a row number recomputed after the
        window shuffle;
      * exactly one centroid is kept per leader, ties going to the smaller
        cluster id;
      * weighted sums and total weight come from one aggregation instead of two
        aggregations joined back together.
    NOTE(review): `count` and `data_spark` are both numbered with row_number over
    monotonically_increasing_id(); this assumes they share the same row order.
    """
    cluster_col, cent_x, cent_y = x.columns[0], x.columns[1], x.columns[2]
    point_cols = data_spark.columns

    # Stable per-leader key, attached while the rows are still in leader order.
    indexed_points = data_spark.withColumn('index', row_number().\
        over(Window.orderBy(monotonically_increasing_id())) - 1)

    # Every leader paired with every centroid, plus the distance between them.
    pairs = indexed_points.join(broadcast(x))
    pairs = pairs.withColumn('dist', squaree_spark1(point_cols[0], point_cols[1], cent_x, cent_y))

    # Keep the closest centroid of each leader.
    nearest_first = Window.partitionBy('index').orderBy('dist', cluster_col)
    assigned = pairs.withColumn('rank', row_number().over(nearest_first)).\
        filter(col('rank') == 1).drop('rank').withColumnRenamed('dist', 'mindist')

    #### Compute weighted average
    weighted = assigned.join(count, (col('Cindex') == col('index')), "inner").drop('Cindex')
    for c in point_cols:
        weighted = weighted.withColumn(c, col(c) * col('count'))

    # Columns: defined_cluster, sum(count), sum(<coordinate>) ...
    update_new_centroid = weighted.groupBy('defined_cluster').sum('count', *point_cols)
    for c in update_new_centroid.columns[2:]:
        update_new_centroid = update_new_centroid.withColumn(c, col(c) / col('sum(count)'))
    update_new_centroid = update_new_centroid.drop('sum(count)')

    # Hand the assignments back in leader order, without the helper key, so
    # callers that number the rows again get the same positions.
    next_centroid = assigned.orderBy('index').drop('index')

    return update_new_centroid, next_centroid

new_centroid_persist = new_centroid.persist(StorageLevel.MEMORY_AND_DISK)

# Fixed number of centroid-update rounds; each round feeds the centroids of the
# previous one back into UpdateCentroid. `final_data` is the leader-to-centroid
# assignment produced by the last round.
n_update_rounds = 20
for round_no in range(n_update_rounds):
    new_centroid_persist, final_data = UpdateCentroid(new_centroid_persist)

# The centroid coordinate columns are named 'sum(first_numeric)' and
# 'sum(second_numeric)' (they originate from groupBy(...).sum(...)), so those are
# the names to rename; renaming 'avg(...)' matched no column and did nothing.
final_data = final_data.withColumnRenamed('sum(first_numeric)','cent_x').\
withColumnRenamed('sum(second_numeric)','cent_y')

In [None]:
# Display the centroid frame returned by the last update round.
new_centroid_persist

In [None]:
# Distance from every original point to each final centroid: after the loop
# data_cent11 holds the two point columns followed by one distance column per
# cluster, named '0' .. str(k-1).
#
# The centroid coordinates are addressed by name ('<i>x', '<i>y'). The previous
# positional lookup (columns[i+2], columns[i+3]) pointed at 'mindist' and '<i>x'
# for every cluster after the first, so those distances were computed against
# the wrong values. The running 'mindist' column was dropped right after the
# loop without being used, so it is no longer built.
data_cent11 = data_spark_df
for i in range(k):
    u = [ str(i)+'x',str(i)+'y']
    # Coordinates of centroid i as a one-row frame with unique column names.
    next_selected = new_centroid_persist.filter(col('defined_cluster') == str(i)).\
    drop('defined_cluster').toDF(*u)
    data_cent11 = data_cent11.join(broadcast(next_selected))
    data_cent11 = data_cent11.withColumn(str(i),squaree_spark1(data_cent11.columns[0],\
                        data_cent11.columns[1], u[0], u[1]))
    data_cent11 = data_cent11.drop(u[0]).drop(u[1])

def FindMinCOl( *row_list):
    """Return the 0-based position of the smallest argument (first one on ties).

    The arguments are compared as one tuple, so a call with a single value
    returns 0 instead of failing the way min(*row_list) does when it is handed
    one non-iterable number.
    """
    return int(row_list.index(min(row_list)))
# UDF returning the position of the smallest of its column arguments.
find_min_val_name = udf(FindMinCOl, IntegerType())

# 'defined_cluster' = position of the smallest value among the columns from
# position 2 onwards (the per-cluster distance columns); then keep only the
# point coordinates and the assigned cluster.
data_cent11 = data_cent11.withColumn('defined_cluster', find_min_val_name(*data_cent11.columns[2:3+k]))
data_cent11 = data_cent11.select('first','second','defined_cluster')

In [None]:
# Collect the per-point assignment to the driver as a pandas DataFrame.
d = data_cent11.toPandas()

In [None]:

# Number the leader assignments ('index', 0-based) and drop the leader
# coordinates and distance, leaving the assigned centroid's columns.
# NOTE(review): the numbering follows the current row order of final_data; the
# join with `leaders` below assumes that order matches the leaders' — confirm.
final_data = final_data.withColumn('index', row_number().\
                                          over(Window.orderBy(monotonically_increasing_id())) - 1).\
drop('mindist').drop('second_numeric').drop('first_numeric')

In [None]:
# Give the leaders the same kind of 0-based positional key ('index_column_name').
leaders = leaders.withColumn('index_column_name', row_number().\
                                          over(Window.orderBy(monotonically_increasing_id())) - 1)

In [None]:
# The cluster sizes are not needed for the label mapping.
leaders1 = leaders.drop('count')

In [None]:
# Pair each leader with its assignment by position, then drop the leader
# coordinates ('1', '2') and both positional keys.
lead_final = (leaders1
    .join(final_data, (col('index_column_name') == col('index')), "inner")).drop('1').drop('2').\
    drop('index_column_name').drop('index')

In [None]:
# Rename so the join with `labels` (which also has 'chunkLabel') is unambiguous.
lead_final = lead_final.withColumnRenamed('chunkLabel','chunk')

In [None]:
# Map every point's (chunk, local label) pair to its leader's row via a left
# join, then drop the join keys so only the leader's assignment columns remain.
label_lead_final = (labels
    .join(lead_final,  ((col('label') == col('old label')) & (col('chunkLabel') == col('chunk'))),\
          "left")).drop('chunkLabel').drop('label').drop('old label').drop('chunk')


In [None]:
# Collect the per-point result to the driver; this rebinds `d`, replacing the
# pandas frame created from data_cent11 in an earlier cell.
d = label_lead_final.toPandas()