In [None]:
import random
import pandas as pd
import time
import cdd
import enum
from math import sqrt, atan, floor, ceil, pi
import numpy as np
import sys

# libraries for LP 
from pulp import LpMinimize, LpProblem, LpStatus, LpVariable, LpMaximize, PULP_CBC_CMD
import gurobipy as gp

%matplotlib inline
from matplotlib import pyplot as plt



import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession




# Spark configuration for the distributed skyline experiments: a local master
# with 20 worker threads and 2g of driver memory.
# To target a standalone cluster instead, use
#   .setMaster('spark://master:7077').setSparkHome('/usr/local/spark')
# and tune spark.executor.* / spark.driver.* (memory, host, port) as needed.
conf = (
    pyspark.SparkConf()
    .setMaster("local[20]")
    .setAppName("Skyline Algos")
    .set("spark.driver.memory", "2g")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()




In [None]:

# Weight polytopes used as F-dominance constraints.
# For a dimension D each case describes weight vectors w with
#   -w_0 + w_1 <= 0                 (first inequality row)
#   0 <= w_k <= 1 for every k       (one lower/upper row pair per coordinate)
# written as X w <= y, plus the equality sum_k w_k = 1 written as Z w = z.
def _preference_polytope(num_dims):
    """Return (X, y, Z, z) integer arrays for the D = num_dims polytope above.

    Inequality rows are ordered as: the -w_0 + w_1 <= 0 row first, then for
    each coordinate k the pair (-w_k <= 0, w_k <= 1).
    """
    rows = [[-1, 1] + [0] * (num_dims - 2)]
    rhs = [[0]]
    for axis in range(num_dims):
        for sign, bound in ((-1, 0), (1, 1)):
            row = [0] * num_dims
            row[axis] = sign
            rows.append(row)
            rhs.append([bound])
    return (np.array(rows), np.array(rhs),
            np.array([[1] * num_dims]), np.array([[1]]))

# case 2 dimensions:  A w <= b,  C w = d
A, b, C, d = _preference_polytope(2)
# case 4 dimensions:  E w <= f,  G w = h
E, f, G, h = _preference_polytope(4)
# case 6 dimensions:  I w <= l,  M w = n
I, l, M, n = _preference_polytope(6)
# case 7 dimensions:  O w <= p_, Q w = r
O, p_, Q, r = _preference_polytope(7)

In [None]:
# Pareto dominance test (smaller values are better in every dimension).
def dominates(a, b):
    """Return True when point `a` dominates point `b`.

    `a` dominates `b` when it is no larger in every dimension and strictly
    smaller in at least one. Positions 0..len(a)-1 are compared.
    """
    strictly_better = False
    for idx, a_val in enumerate(a):
        b_val = b[idx]
        if a_val > b_val:
            return False
        if a_val < b_val:
            strictly_better = True
    return strictly_better

# VE (vertex enumeration) helper: score a tuple under every vertex of the weight polytope.
def computeInequality(tupl, vertices, p):
    """Return one score per vertex: sum_j vertex[j] * tupl[j] ** p.

    The sum runs over j in range(len(tupl)); the result list follows the
    order of `vertices`.
    """
    scores = []
    for vertex in vertices:
        total = 0
        for j, coord in enumerate(tupl):
            total += vertex[j] * (coord ** p)
        scores.append(total)
    return scores

# LP objective coefficients for comparing tuple i against tuple j.
def objective_function(i, j, p):
    """Return the list [i[k]**p - j[k]**p for k in range(len(i))]."""
    return [i[k] ** p - j[k] ** p for k in range(len(i))]

# Centroid (coordinate-wise arithmetic mean) of the polytope vertices.
def compute_centroid(vertices):
    """Return the coordinate-wise mean of `vertices` as a list.

    The number of coordinates is taken from vertices[0]; raises IndexError
    when `vertices` is empty.
    """
    totals = [0] * len(vertices[0])
    for vertex in vertices:
        for axis, coord in enumerate(vertex):
            totals[axis] += coord
    count = len(vertices)
    return [total / count for total in totals]

# Weighted-sum sort key (e.g. weights = centroid coordinates).
def sort_function(data, weight):
    """Return sum_i data[i] * weight[i] over i in range(len(data))."""
    score = 0
    for idx, coord in enumerate(data):
        score += coord * weight[idx]
    return score

# Vertex enumeration of the polytope {w : A w <= b, C w = d} with pycddlib.
def computeVertices(A,b,C,d):
    """Return the generators of {w : A w <= b, C w = d} as lists of floats.

    The H-representation rows are [b | -A] (inequalities) and [d | -C]
    (equalities, marked linear). The leading column of each generator row is
    dropped. The vertex list is printed before it is returned.

    NOTE(review): in cdd output a leading 0 marks a ray rather than a vertex;
    rays are not filtered here, which is only safe for bounded polytopes.
    """
    ineq_rows = np.hstack((b, -A))
    eq_rows = np.hstack((d, -C))

    mat = cdd.Matrix(ineq_rows, number_type='float')
    mat.rep_type = cdd.RepType.INEQUALITY
    mat.extend(eq_rows, linear=True)

    generators = cdd.Polyhedron(mat).get_generators()
    vertices = [list(generators[row][1:]) for row in range(len(generators))]
    print('Vertices: ' + str(vertices))
    return vertices

# Left-hand side of the PO linear system for coordinate i.
def computeLeftSide(variables,po,i,p):
    """Return sum_j variables[j] * po[j][i] ** p over j in range(len(variables))."""
    lhs = 0
    for j, var in enumerate(variables):
        lhs += var * (po[j][i] ** p)
    return lhs

def getVariables(numDim):
    """Create `numDim` PuLP variables named x0, x1, ... each bounded to [0, 1]."""
    return [LpVariable(name="x" + str(k), lowBound=0, upBound=1) for k in range(numDim)]

def getVariables_primal(numDim):
    """Create the primal PO variables: [phi, x0, ..., x{numDim-1}].

    `phi` is created without bounds; every x variable is bounded to [0, 1].
    """
    phi = LpVariable(name="phi")
    weights = [LpVariable(name="x" + str(k), lowBound=0, upBound=1) for k in range(numDim)]
    return [phi] + weights

# Gurobi model for the dual computation of po.
def getModel_dual(num):
    """Create a silent Gurobi model with `num` continuous variables x0.. in [0, 1].

    Returns (model, variables), with variables in creation order.
    """
    with gp.Env(empty=True) as env:
        env.setParam('OutputFlag', 0)
        env.start()
        m = gp.Model(env=env)
    # NOTE(review): `env` is disposed when the `with` block exits, but `m` is
    # still used below -- confirm this is supported by the installed gurobipy.
    m.Params.LogToConsole = 0
    variables = [
        m.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS, name="x" + str(k))
        for k in range(num)
    ]
    return m, variables

# Gurobi model for the computation of nd.
def getModel_nd(num):
    """Create a silent Gurobi model with `num` continuous variables in [0, 1].

    Returns (model, variables, var_names): the variables are named "x0", "x1", ...
    and var_names lists those names in the same order.
    """
    with gp.Env(empty=True) as env:
        env.setParam('OutputFlag', 0)
        env.start()
        m = gp.Model(env=env)
    # NOTE(review): `env` is disposed when the `with` block exits, but `m` is
    # still used below -- confirm this is supported by the installed gurobipy.
    m.Params.LogToConsole = 0
    var_names = ["x" + str(k) for k in range(num)]
    variables = [
        m.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS, name=label)
        for label in var_names
    ]
    return m, variables, var_names

# Gurobi model for the primal computation of po.
def getModel_primal(numDim):
    """Create a silent Gurobi model with variables [phi, w0, ..., w{numDim-1}].

    The w variables are continuous in [0, 1]. `phi` is continuous with
    gurobipy's default bounds (lower bound 0.0 per Model.addVar's defaults).
    NOTE(review): that differs from the unbounded `phi` of the PuLP primal --
    confirm the lower bound is intended.
    Returns (model, variables, var_names) with names in creation order.
    """
    with gp.Env(empty=True) as env:
        env.setParam('OutputFlag', 0)
        env.start()
        m = gp.Model(env=env)
    # NOTE(review): `env` is disposed when the `with` block exits, but `m` is
    # still used below -- confirm this is supported by the installed gurobipy.
    m.Params.LogToConsole = 0
    var_names = ["phi"] + ["w" + str(k) for k in range(numDim)]
    variables = [m.addVar(vtype=gp.GRB.CONTINUOUS, name="phi")]
    for label in var_names[1:]:
        variables.append(m.addVar(lb=0.0, ub=1.0, vtype=gp.GRB.CONTINUOUS, name=label))
    return m, variables, var_names

# Volume of an axis-aligned box given its side lengths.
def volume(p):
    """Return the product of all entries of `p` (1 for an empty sequence)."""
    product = 1
    for side in p:
        product *= side
    return product




In [None]:
# Block-nested-loop (BNL) skyline.
def find_skyline_bnl(data):
    """Return the skyline (Pareto-minimal points) of `data` using BNL.

    A window of currently non-dominated points is kept. Each new point is
    dropped if a window point dominates it; otherwise it evicts the window
    points it dominates and joins the window. Prints the number of dominance
    comparisons before returning.
    """
    if not isinstance(data,list):
        data = list(data)
    window = []
    comparisons = 0
    for candidate in data:
        beaten = False
        for kept in window:
            comparisons += 1
            if dominates(kept, candidate):
                beaten = True
                break
        if beaten:
            continue
        # Every window point is compared once more during eviction.
        comparisons += len(window)
        evicted = [kept for kept in window if dominates(candidate, kept)]
        for kept in evicted:
            window.remove(kept)
        window.append(candidate)

    print('comparisons:'+str(comparisons))
    return window

def find_skyline_sfs(data, weights):
    """Return the skyline (Pareto-minimal points) of `data` using SFS.

    Points are visited in ascending order of sort_function(point, weights).
    For non-negative weights this order is monotone with dominance, so a
    point can only be dominated by points visited before it. No skyline
    point ever has to be evicted.

    Ties on the weighted score are broken lexicographically on the
    coordinates. If a dominates b, then a < b lexicographically, so a
    dominator is always visited first, even when a zero weight makes both
    scores equal.

    The caller's `data` is not reordered; a sorted copy is used.
    """
    ordered = sorted(data, key=lambda x: (sort_function(x, weights), tuple(x)))

    skyline = []
    for i in ordered:
        is_dominated = False
        for j in skyline:
            if dominates(j, i):
                is_dominated = True
                break
        if is_dominated:
            continue
        skyline.append(i)
    return skyline

def find_skyline_sfs2(data):
    """Return the skyline of `data` using SFS with a lexicographic presort.

    Lexicographic order is a valid SFS order. If a dominates b, the first
    coordinate where they differ has a < b, so a is visited first.
    The caller's `data` is not reordered; a sorted copy is used.
    Prints the number of dominance comparisons performed.
    """
    ordered = sorted(data)

    skyline = []
    c = 0
    for i in ordered:
        is_dominated = False
        for j in skyline:
            c += 1
            if dominates(j, i):
                is_dominated = True
                break
        if is_dominated:
            continue
        skyline.append(i)

    print('comparisons:'+str(c))
    return skyline



#function that finds the skyline using SaLSa
def find_skyline_SaLSa(data):
    """Return the skyline of `data` using SaLSa (sort and limit skyline algorithm).

    Points are visited in ascending (min coordinate, coordinate sum) order.
    `p_stop` is the visited skyline point with the smallest maximum
    coordinate. Once a candidate's minimum exceeds max(p_stop), or equals it
    for a different point, p_stop dominates the candidate and the scan stops.

    The caller's `data` is not reordered; a sorted copy is used.
    Prints the number of dominance comparisons (nothing is printed for
    empty input).
    """
    data = sorted(data, key=lambda x: (min(x), sum(x)))
    if len(data) == 0:
        return []

    c = 0
    skyline = []
    p_stop = data[0]

    for p in data:
        if (min(p) > max(p_stop)) or ((min(p) == max(p_stop)) and p != p_stop):
            break

        is_dominated = False
        for i in skyline:
            c += 1
            if dominates(i, p):
                is_dominated = True
                break

        if not is_dominated:
            if max(p) < max(p_stop):
                p_stop = p
            skyline.append(p)
    print('comparisons: '+str(c))
    return skyline




In [None]:
def sfs_multithread(datapoints, global_set, weights):
    """Return the points of one partition that belong to the global skyline.

    `datapoints` is copied and sorted by sort_function(point, weights). For
    each point, `global_set` is scanned in order:
      * the point is kept as soon as an element equal to it (==) is reached;
      * it is dropped at the first element that dominates it.
    A point that is neither found nor dominated is dropped too.

    NOTE(review): this assumes `global_set` is ordered so that any dominator
    of a point is met before the point itself.
    """
    nd = []
    datapoints = list(datapoints)
    datapoints.sort(key=lambda x: sort_function(x, weights))

    for ps in datapoints:  # ps: potentially non-dominated point
        for other in global_set:
            if ps == other:
                nd.append(ps)
                break
            if dominates(other, ps):
                break
    return nd


def salsa_multithread(data, global_set):
    """Return the points of one partition that belong to the global skyline (SaLSa order).

    The partition is copied, sorted by (min coordinate, coordinate sum) and
    scanned with SaLSa's early stop against `p_stop`. For each point p,
    `global_set` is scanned in order until the first element `other` with
    other[k] <= p[k] in every one of the len(global_set[0]) dimensions:
      * p is kept when that element equals p in every dimension;
      * otherwise p is dropped.
    When such an element is reached, p_stop becomes p if max(p) < max(p_stop).
    The caller's `data` is not reordered.
    """
    data = sorted(data, key=lambda x: (min(x), sum(x)))
    if len(data) == 0:
        return []

    nd = []
    p_stop = data[0]

    for p in data:
        # SaLSa stop condition: p_stop dominates p.
        if (min(p) > max(p_stop)) or ((min(p) == max(p_stop)) and p != p_stop):
            break
        for other in global_set:
            # p_wins: p is strictly smaller than `other` somewhere,
            # so `other` cannot dominate p.
            p_wins = False
            # number of dimensions in which p and `other` are equal
            dimEqual = 0
            for k in range(len(global_set[0])):
                if p[k] < other[k]:
                    p_wins = True
                    break
                elif other[k] == p[k]:
                    dimEqual = dimEqual + 1
            if p_wins:
                continue

            # `other` is <= p in every dimension.
            if max(p) < max(p_stop):
                p_stop = p
            # We suppose that the global_set is ordered: it is a superset of our
            # datapoints, so if our point is reached in the global_set before any
            # dominator, no other point can dominate it.
            if dimEqual == len(global_set[0]):
                nd.append(p)
            break

    return nd

def sve1f_multithread(data, global_set, vertices, p):
    """Return the points of one partition that survive F-dominance against `global_set`.

    The partition is copied and sorted by its weighted sum with the centroid
    of `vertices`. For each point d, `global_set` is scanned in order:
      * d is kept as soon as an element equal to d (==) is reached;
      * d is dropped at the first element that Pareto-dominates it;
      * d is also dropped at the first element that F-dominates it, i.e.
        d's computeInequality score is not lower than that element's at any
        vertex.
    The caller's `data` is not reordered.
    """
    data = list(data)
    if len(data) == 0:
        return []

    centroids = compute_centroid(vertices)
    data.sort(key=lambda x: sort_function(x, centroids))

    nd = []
    for d in data:
        left_hand_side = computeInequality(d, vertices, p)
        for other in global_set:
            if other == d:
                nd.append(d)
                break
            if dominates(other, d):
                break
            right_hand_side = computeInequality(other, vertices, p)
            # `other` F-dominates d when d scores no lower at every vertex.
            if not any(left_hand_side[k] < right_hand_side[k]
                       for k in range(len(right_hand_side))):
                break

    return nd

def pond_primal_pulp_multithread(data, global_set, constraints, k_):
    """Remove from `data` the points F-dominated by a convex combination of other points.

    For each candidate t (visited in reverse order), a PuLP/CBC LP maximizes
    phi over the weights w subject to:
        constraints[r] . w <= k_[r]    for every row r,
        sum(w) == 1,  0 <= w <= 1,
        w . (t - o) + phi <= 0         for the first `minimum` points o of global_set without t.
    If phi > 0 at the optimum, some admissible w gives w . t < w . o for all
    those o, and t is kept. Otherwise t is removed from both `data` and
    `global_set`.

    The number of comparison points doubles every round (2, 4, 8, ...) until
    all of global_set is used.
    `data` (when passed as a list) and `global_set` are modified in place.
    """
    if not isinstance(data,list):
        data = list(data)

    if len(data) == 0:
        return []

    delta = 2
    lastRound = False
    minimum = 0
    dimensions = len(data[0])

    while not lastRound:

        if delta >= (len(global_set) - 1):
            lastRound = True
        data_reversed = data.copy()
        data_reversed.reverse()
        for t in data_reversed:  # candidate F-dominated tuple
            model = LpProblem(sense=LpMaximize)
            # Prepare the LP model: variables = [phi, w0, ..., w{D-1}]
            variables = getVariables_primal(dimensions)
            weights = variables[1:]

            # One inequality per constraint row, each with its own expression
            # and its own bound.
            for row, cons in enumerate(constraints):
                expr = 0
                for index in range(len(cons)):
                    expr += cons[index] * weights[index]
                model += (expr <= k_[row])

            minimum = min(delta, len(global_set)-1)
            if minimum == 0:
                break
            temp = 0
            for w in weights:
                temp += w
            model += temp == 1

            po_temp = global_set.copy()
            po_temp.remove(t)

            for j in range(minimum):  # one comparison inequality per point
                left_side = 0
                for i in range(dimensions):
                    left_side += weights[i] * (t[i]-po_temp[j][i])
                model += (left_side + variables[0] <= 0)
            model.setObjective(variables[0])

            model.solve(PULP_CBC_CMD(msg=False))

            if model.objective.value() > 0:
                continue  # t is not F-dominated by these points: keep it

            data.remove(t)
            global_set.remove(t)

            del model

        delta = delta * 2

    return data


def po_primal_pulp_non_inc_multi(po, global_set, constraints, k_):
    """Keep only the points of `po` not F-dominated by a convex combination of `global_set`.

    For each candidate t (visited in reverse order), a PuLP/CBC LP maximizes
    phi over the weights w subject to:
        constraints[r] . w <= k_[r]    for every row r,
        sum(w) == 1,  0 <= w <= 1,
        w . (t - o) + phi <= 0         for every o in global_set other than t.
    If phi > 0 at the optimum, some admissible w gives w . t < w . o for every
    other o, and t is kept. Otherwise t is removed from `po`.

    Non-incremental: `global_set` is never modified, so every LP uses all of it.
    `po` is modified in place when passed as a list. Raises ValueError (from
    list.remove) if some t in `po` is not an element of `global_set`.
    """
    if not isinstance(po,list):
        po = list(po)

    if len(po) == 0:
        return []
    dimensions = len(po[0])
    po_reversed = po.copy()
    po_reversed.reverse()
    for t in po_reversed:
        model = LpProblem(sense=LpMaximize)
        # Prepare the LP model: variables = [phi, w0, ..., w{D-1}]
        variables = getVariables_primal(dimensions)
        weights = variables[1:]

        # One inequality per constraint row, each with its own expression and
        # its own bound.
        for row, cons in enumerate(constraints):
            expr = 0
            for index in range(len(cons)):
                expr += cons[index] * weights[index]
            model += (expr <= k_[row])

        temp = 0
        for w in weights:
            temp += w
        model += temp == 1

        po_temp = global_set.copy()
        po_temp.remove(t)
        for other in po_temp:  # one comparison inequality per other point
            left_side = 0
            for i in range(dimensions):
                left_side += weights[i] * (t[i]-other[i])
            model += (left_side + variables[0] <= 0)
        model.setObjective(variables[0])
        model.solve(PULP_CBC_CMD(msg=False))

        if model.objective.value() > 0:
            continue  # t is potentially optimal: keep it
        po.remove(t)
        del model

    return po


In [None]:
# Wrappers below unpack element [1] of each entry (presumably (key, point)
# pairs -- confirm with the Spark caller) and run one skyline algorithm.
def execute_sfs_indexed(input_list, weights):
    """Run find_skyline_sfs on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return find_skyline_sfs(points, weights)

def execute_sfs_indexed2(input_list, weights):
    """Parse text lines into points and run find_skyline_sfs on them.

    Each element of `input_list` is parsed as one point: its whitespace-
    separated fields converted with float().
    NOTE(review): this input format is inferred from the parsing code --
    confirm with the caller. The parameter is parsed here, not the
    module-level constraint array `f`.
    """
    points = []
    for line in input_list:
        points.append([float(x) for x in line.strip().split()])
    return find_skyline_sfs(points, weights)


def execute_sfs_indexed_with_memory(input_list, memory_list,weights):
    """Run sfs_with_memory on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sfs_with_memory(points, memory_list, weights)

def execute_saLSa_indexed(input_list):
    """Run find_skyline_SaLSa on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return find_skyline_SaLSa(points)

def execute_salsa_indexed_with_memory(input_list, memory_list):
    """Run salsa_with_memory on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return salsa_with_memory(points, memory_list)

def execute_sve1_filter_with_memory(input_list, representative, vertices, p, onlyFilter):
    """Run filter_nd_with_memory2 on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return filter_nd_with_memory2(points, representative, vertices, p, onlyFilter)

def execute_sve1_indexed(input_list, vertices, p):
    """Run sve1 on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sve1(points, vertices, p)

def execute_sve1_indexed_with_memory(input_list, memory_list, vertices, p):
    """Run sve1_with_memory on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sve1_with_memory(points, memory_list, vertices, p)

def execute_sve1f_indexed(input_list, vertices, p):
    """Run sve1f on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sve1f(points, vertices, p)

def execute_sve1f_multithread(input_list, global_set, vertices, p):
    """Run sve1f_multithread on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sve1f_multithread(points, global_set, vertices, p)

def execute_sve1f_indexed_with_memory(input_list, memory_list, vertices, p):
    """Run sve1f_with_memory on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return sve1f_with_memory(points, memory_list, vertices, p)

def execute_POND_indexed(input_list, vertices, p):
    """Run POND on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return POND(points, vertices, p)

def execute_pond_indexed_dual(input_list, vertices, p):
    """Run po_dual on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return po_dual(points, vertices, p)

def execute_pond_indexed_primal(input_list, constraints, k_):
    """Run po_primal on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return po_primal(points, constraints, k_)


def execute_pond_indexed_primal_pulp(input_list, constraints, k_):
    """Run po_primal_pulp on element [1] of every entry in `input_list`."""
    points = [entry[1] for entry in input_list]
    return po_primal_pulp(points, constraints, k_)

In [None]:

#function that normalizes the data in the dataset
def normalize_data(data):
    """Min-max scale all values of `data` into [0, 0.999999].

    Uses the global minimum and maximum over every value (not per column).
    The maximum maps to 0.999999, i.e. strictly below 1.
    Returns a float numpy array with the same shape as `data`. When all values
    are equal (zero range), returns zeros instead of dividing by zero.
    """
    if not isinstance(data, list):
        data = list(data)
    values = np.asarray(data, dtype=float)
    lo = np.min(values)
    span = np.max(values) - lo
    if span == 0:
        return np.zeros_like(values)
    return (0.999999 * (values - lo)) / span

### Data generation configuration

# Type of dataset generation.
# Several spellings share a value, so Enum makes the later ones aliases of the
# first: the canonical members are antiCorrelated (1), correlated (2) and
# Independent (3). Keep this order -- it determines each member's .name.
class DataGenEnum(enum.Enum):
    """Distribution selected by dataGenerator when building a synthetic dataset."""
    antiCorrelated = 1
    anticorrelated = 1
    Anticorrelated = 1
    AntiCorrelated = 1
    correlated = 2
    Correlated = 2
    Independent = 3
    independent = 3
    
    
class DataGenConfig():
    """Settings for dataGenerator.

    Attributes:
        typeOfCorrelation: DataGenEnum member selecting the distribution.
        dataRange: [min, max]; the first coordinate (every coordinate for
            independent data) is drawn uniformly from this range.
        avg: stored only; not used by dataGenerator.
        skylinePercentage: stored only; not used by dataGenerator.
        numberOfData: number of points to generate.
        numberOfDimensions: coordinates per point.
        spreadPercentage: spread, in percent, of the dependent coordinates of
            correlated / anti-correlated data.
    """
    def __init__(self, typeOfCorrelation = DataGenEnum.independent,
                 dataRange = None, avg = 0.5, skylinePercentage = 1,
                 numberOfData = 10**6, numberOfDimensions = 4,
                 spreadPercentage = 10):
        self.typeOfCorrelation = typeOfCorrelation
        # Fresh list per instance: a shared mutable default list would be
        # aliased by every config created without an explicit range.
        self.dataRange = [0, 1] if dataRange is None else dataRange
        # UNUSED Variable
        self.avg = avg
        self.skylinePercentage = skylinePercentage
        self.numberOfData = numberOfData
        self.numberOfDimensions = numberOfDimensions
        self.spreadPercentage = spreadPercentage

    def setCorrelated(self):
        """Switch to correlated data."""
        self.typeOfCorrelation = DataGenEnum.correlated

    def setAntiCorrelated(self):
        """Switch to anti-correlated data."""
        self.typeOfCorrelation = DataGenEnum.antiCorrelated

    def setIndependent(self):
        """Switch to independent (uniform) data."""
        self.typeOfCorrelation = DataGenEnum.independent

    def setNumberOfData(self, numData):
        """Set how many points dataGenerator will produce."""
        self.numberOfData = numData
    
# Method that creates the different types of datasets based on the distribution
def dataGenerator(dataConfiguration = None, rng = None):
    """Generate a synthetic dataset described by a DataGenConfig.

    Returns numberOfData points (lists) with numberOfDimensions coordinates:
      * independent: every coordinate is uniform in dataRange;
      * correlated: the first coordinate x is uniform in dataRange; the
        others are x + (U - 0.5) * spreadPercentage/100;
      * anything else (anti-correlated): the first coordinate x is uniform in
        dataRange; the others are (max - x) + (max - x)*(U - 0.5)*spreadPercentage/100.
    U is a fresh uniform [0, 1) draw. Dependent coordinates are not clipped
    to dataRange.

    Args:
        dataConfiguration: a DataGenConfig; a default DataGenConfig() is used
            when None.
        rng: optional object with a .random() method (e.g. random.Random(seed))
            for reproducible datasets. Defaults to the module-level `random`
            generator, so existing calls draw the same sequence as before.
    """
    if dataConfiguration is None:
        dataConfiguration = DataGenConfig()
    draw = random.random if rng is None else rng.random

    typeOfCorrelation = dataConfiguration.typeOfCorrelation
    dataRange = dataConfiguration.dataRange
    numberOfData = dataConfiguration.numberOfData
    numberOfDimensions = dataConfiguration.numberOfDimensions
    spreadPercentage = dataConfiguration.spreadPercentage

    minDataValue = dataRange[0]
    maxDataValue = dataRange[1]
    width = maxDataValue - minDataValue
    data = []
    if typeOfCorrelation == DataGenEnum.independent:
        for _ in range(numberOfData):
            data.append([draw()*width+minDataValue for _ in range(numberOfDimensions)])
    elif typeOfCorrelation == DataGenEnum.correlated:
        spread = spreadPercentage * 0.01
        for _ in range(numberOfData):
            first = draw()*width+minDataValue
            datum = [first]
            for _ in range(1, numberOfDimensions):
                datum.append(first + ((draw()-0.5)*spread))
            data.append(datum)
    else:  # typeOfCorrelation == antiCorrelated
        spread = spreadPercentage * 0.01
        for _ in range(numberOfData):
            first = draw()*width+minDataValue
            relatedValue = maxDataValue-first
            datum = [first]
            for _ in range(1, numberOfDimensions):
                datum.append(relatedValue + (relatedValue*(draw()-0.5)*spread))
            data.append(datum)
    return data

In [None]:
#CENTRALIZED VERSION
#First ND algorithm: BNL with LP + 2 phases: ULP2
def ulp2(data, p, model, variables):
    """Two-phase ND computation: BNL skyline, then LP-based F-dominance pruning.

    Phase 1 computes the skyline with find_skyline_bnl and prints its size and
    the elapsed time.

    Phase 2 starts from the skyline in reverse order. For each skyline point s
    and each remaining candidate t != s, a copy of `model` gets the objective
        sum_k (s_k**p - t_k**p) * variables[k].
    If the solved objective value is >= 0, s is removed. The first LP built is
    printed once for debugging.

    NOTE(review): `model` is presumably an LpMinimize problem over the weight
    polytope -- the sense is set by the caller; confirm.
    """
    start = time.time()
    skyline = find_skyline_bnl(data)
    elapsed = time.time() - start
    print('Number of points in the skyline ' + str(len(skyline)))
    print('Time taken ' + str(elapsed))

    print_pending = True
    nd = skyline[::-1]
    for s in skyline:
        for t in nd:
            if s == t:
                continue
            coeffs = objective_function(s, t, p)
            lp = model.copy()
            objective = 0
            for pos, var in enumerate(variables):
                objective += coeffs[pos] * var
            lp += objective
            lp.solve(PULP_CBC_CMD(msg=False))
            if print_pending:
                print(lp)
                print_pending = False
            if lp.objective.value() >= 0:
                nd.remove(s)  # safe: we leave the loop over nd right away
                break

    return nd

#Second ND algorithm: SFS with LP + 2 phases: SLP2
def slp2(data, p, model, centr, variables):
    """Two-phase ND computation: SFS skyline, then LP-based F-dominance filtering.

    Phase 1 computes the skyline with find_skyline_sfs(data, centr) and prints
    its size and the elapsed time.

    Phase 2 visits skyline points in that order and keeps s unless some
    already-kept t makes the objective
        sum_k (s_k**p - t_k**p) * variables[k]
    come out >= 0 when solved on a copy of `model`; that means t F-dominates s.
    The first LP built is printed once for debugging.

    NOTE(review): `model` is presumably an LpMinimize problem over the weight
    polytope -- the sense is set by the caller; confirm.
    """
    start = time.time()
    skyline = find_skyline_sfs(data, centr)
    elapsed = time.time() - start
    print('Number of points in the skyline ' + str(len(skyline)))
    print('Time taken ' + str(elapsed))

    print_pending = True
    nd = []
    for s in skyline:  # candidate F-dominated tuple
        beaten = False
        for t in nd:  # candidate F-dominant tuple
            coeffs = objective_function(s, t, p)
            lp = model.copy()
            objective = 0
            for pos, var in enumerate(variables):
                objective += coeffs[pos] * var
            lp += objective
            lp.solve(PULP_CBC_CMD(msg=False))
            if print_pending:
                print(lp)
                print_pending = False
            if lp.objective.value() >= 0:
                beaten = True
                break
        if not beaten:
            nd.append(s)
    return nd

# (A verbatim duplicate of slp2 that silently shadowed the definition above was removed here.)

#Third ND algorithm: BNL with VE + 2 phases: UVE2
def uve2(data, vertices, p):
    """Two-phase ND computation: BNL skyline, then vertex-enumeration (VE) pruning.

    Phase 1 computes the skyline with find_skyline_bnl and prints its size and
    the elapsed time.

    Phase 2 removes a skyline point s when some other remaining point t
    F-dominates it. That holds when, at every vertex, s's computeInequality
    score is not lower than t's (and there is at least one vertex).
    """
    start = time.time()
    skyline = find_skyline_bnl(data)
    elapsed = time.time() - start
    print('Number of points in the skyline ' + str(len(skyline)))
    print('Time taken ' + str(elapsed))

    nd = list(skyline)
    for s in skyline:
        s_scores = computeInequality(s, vertices, p)
        f_dominated = False
        for t in nd:
            if s == t:
                continue
            t_scores = computeInequality(t, vertices, p)
            if t_scores and not any(s_scores[k] < t_scores[k] for k in range(len(t_scores))):
                f_dominated = True
                break
        if f_dominated:
            nd.remove(s)
    return nd

#4th ND algorithm: SFS with VE + 2 phases: SVE2
def sve2(data, vertices, p):
    """Two-phase ND computation: SFS skyline, then vertex-enumeration (VE) filtering.

    Phase 1 computes and prints the centroid of `vertices`, runs
    find_skyline_sfs with it as sort weights, and prints the skyline size and
    the elapsed time.

    Phase 2 visits skyline points in order and keeps s unless an already-kept
    t F-dominates it. That holds when, at every vertex, s's computeInequality
    score is not lower than t's (and there is at least one vertex).
    """
    start = time.time()
    centr = compute_centroid(vertices)
    print(centr)
    skyline = find_skyline_sfs(data, centr)
    elapsed = time.time() - start
    print('Number of points in the skyline ' + str(len(skyline)))
    print('Time taken ' + str(elapsed))

    nd = []
    for s in skyline:  # candidate F-dominated tuple
        s_scores = computeInequality(s, vertices, p)
        dominated_by_kept = False
        for t in nd:  # candidate F-dominant tuple
            t_scores = computeInequality(t, vertices, p)
            if t_scores and not any(s_scores[k] < t_scores[k] for k in range(len(t_scores))):
                dominated_by_kept = True
                break
        if not dominated_by_kept:
            nd.append(s)
    return nd

#5th ND algorithm: SFS with VE + 1 phase: SVE1
def sve1(data, vertices, p):
    """Single-pass ND computation: SFS order with Pareto then VE F-dominance checks.

    Points are visited (from a sorted copy) in ascending weighted sum with the
    centroid of `vertices`. A point s is kept unless an already-kept t:
      * Pareto-dominates it (checked against all kept points first), or
      * F-dominates it, i.e. at every vertex s's computeInequality score is
        not lower than t's (and there is at least one vertex).
    """
    if not isinstance(data,list):
        data = list(data)

    nd = []
    centr = compute_centroid(vertices)
    data_sorted = sorted(data, key=lambda x: sort_function(x, centr))
    for s in data_sorted:  # candidate F-dominated tuple
        if any(dominates(t, s) for t in nd):
            continue
        s_scores = computeInequality(s, vertices, p)
        f_dominated = False
        for t in nd:  # candidate F-dominant tuple
            t_scores = computeInequality(t, vertices, p)
            if t_scores and not any(s_scores[k] < t_scores[k] for k in range(len(t_scores))):
                f_dominated = True
                break
        if not f_dominated:
            nd.append(s)

    return nd

#6th ND algorithm: SFS with VE + 1 phase (Variant): SVE1F
def sve1f(data, vertices, p):
    """Single-pass ND computation with an interleaved Pareto/F-dominance check per kept point.

    Points are visited (from a sorted copy) in ascending weighted sum with the
    centroid of `vertices`. For each point s, every already-kept t is checked
    in turn. s is dropped at the first t that either:
      * Pareto-dominates it, or
      * F-dominates it, i.e. at every vertex s's computeInequality score is
        not lower than t's (and there is at least one vertex).
    """
    if not isinstance(data,list):
        data = list(data)

    nd = []
    centr = compute_centroid(vertices)
    data_sorted = sorted(data, key=lambda x: sort_function(x, centr))

    for s in data_sorted:  # candidate F-dominated tuple
        s_scores = computeInequality(s, vertices, p)
        beaten = False
        for t in nd:  # candidate F-dominant tuple
            if dominates(t, s):
                beaten = True
                break
            t_scores = computeInequality(t, vertices, p)
            if t_scores and not any(s_scores[k] < t_scores[k] for k in range(len(t_scores))):
                beaten = True
                break
        if not beaten:
            nd.append(s)

    return nd

#PO Algorithm: version with PuLP optimizer and dual computation of po
def POND(po, vertices, p):
    """Potentially optimal (PO) tuples via the dual LP test, solved with PuLP/CBC.

    Candidates are visited from last to first. For candidate t a feasibility
    LP (no objective) is built over getVariables(size) variables, with
    size = min(delta, |PO| - 1), constrained to sum to 1, plus one constraint
    per vertex l:
        sum_i vertices[l][i] * computeLeftSide(vars, others, i, p)
            <= sum_i vertices[l][i] * t[i]**p
    where `others` is PO with one occurrence of t removed. Unless the LP is
    infeasible, t is F-dominated by a convex combination of the other tuples
    and is removed from `po`. delta starts at 2 and doubles after each sweep;
    the sweep in which delta >= |PO| - 1 is the last.

    Returns:
        the list `po` (mutated in place when a list is passed).
    """
    if not isinstance(po, list):
        po = list(po)

    delta = 2
    done = False
    while not done:
        # the sweep able to compare against every other tuple is the final one
        done = delta >= len(po) - 1
        for t in po[::-1]:  # candidate F-dominated tuple
            size = min(delta, len(po) - 1)
            if size == 0:
                break

            model = LpProblem(sense=LpMinimize)
            variables = getVariables(size)
            model += sum(variables[idx] for idx in range(len(variables))) == 1

            others = po.copy()
            others.remove(t)
            for row in range(len(vertices)):
                vertex = vertices[row]
                lhs = 0
                rhs = 0
                for i in range(len(vertex)):
                    lhs += vertex[i] * computeLeftSide(variables, others, i, p)
                    rhs += vertex[i] * (t[i] ** p)
                model += lhs <= rhs

            model.solve(PULP_CBC_CMD(msg=False))
            # PO keeps t iff the system is infeasible
            if LpStatus[model.status] != 'Infeasible':
                po.remove(t)

        delta *= 2

    return po

def po_primal_pulp(po, constraints, k_):
    """Potentially optimal (PO) tuples via the primal LP, solved with PuLP/CBC.

    For each candidate t (visited from last to first) the LP
        maximise   v0
        subject to sum_i w_i == 1,
                   constraints[r] . w <= k_[r]      for every row r,
                   w . (t - s) + v0 <= 0            for the first
                                                    min(delta, |PO| - 1)
                                                    tuples s of PO minus t
    is solved, with v0 = variables[0] and w = variables[1:] as returned by
    getVariables_primal (variable bounds are set there). A positive optimum
    (or an unbounded v0) means some admissible weight vector gives t a
    strictly lower weighted score than every compared tuple, so t is kept;
    otherwise (including an infeasible LP) t is removed. delta starts at 2
    and doubles after each sweep; the sweep where delta >= |PO| - 1 is the
    last one.

    Args:
        po: candidate tuples; converted to a list if needed (mutated in
            place when a list is passed).
        constraints: rows of the weight-space constraint matrix.
        k_: right-hand sides, one per row of `constraints`.

    Returns:
        the list of PO tuples ([] for empty input).
    """
    if not isinstance(po, list):
        po = list(po)

    if len(po) == 0:
        return []

    delta = 2
    lastRound = False
    dimensions = len(po[0])

    while not lastRound:

        if delta >= (len(po) - 1):
            lastRound = True
        po_reversed = po.copy()
        po_reversed.reverse()
        for t in po_reversed:  # candidate tuple
            minimum = min(delta, len(po) - 1)
            if minimum == 0:
                break

            model = LpProblem(sense=LpMaximize)
            # variables[0] is the objective variable v0, variables[1:] the weights w
            variables = getVariables_primal(dimensions)

            # One inequality per constraint row, each built from scratch and
            # paired with its own right-hand side (previously the rows were
            # accumulated and every row was compared with k_[0]).
            for row, cons in enumerate(constraints):
                row_expr = 0
                for index in range(len(cons)):
                    row_expr += cons[index] * variables[index + 1]
                model += (row_expr <= k_[row])

            # the weights sum to one
            weight_sum = 0
            for i in range(len(variables) - 1):
                weight_sum += variables[i + 1]
            model += weight_sum == 1

            po_temp = po.copy()
            po_temp.remove(t)

            for j in range(minimum):  # compare t with the first `minimum` other tuples
                left_side = 0
                for i in range(dimensions):
                    left_side += variables[i + 1] * (t[i] - po_temp[j][i])
                model += (left_side + variables[0] <= 0)
            model.setObjective(variables[0])
            model.solve(PULP_CBC_CMD(msg=False))

            # PO contains t iff v0 can be made positive; the status is checked
            # first because objective.value() is meaningless (possibly None)
            # when the LP was not solved to optimality.
            status = LpStatus[model.status]
            if status == 'Unbounded' or (status == 'Optimal' and model.objective.value() > 0):
                continue
            po.remove(t)

        delta = delta * 2

    return po

def po_primal_pulp_non_inc(po, constraints, k_):
    """Non-incremental primal PO computation with PuLP/CBC.

    For each candidate t (visited from last to first) the LP
        maximise   v0
        subject to sum_i w_i == 1,
                   constraints[r] . w <= k_[r]      for every row r,
                   w . (t - s) + v0 <= 0            for every s in PO minus t
    is solved in one go (no doubling schedule), with v0 = variables[0] and
    w = variables[1:] from getVariables_primal. t is kept when v0 can be made
    positive (optimal value > 0, or unbounded); otherwise, including an
    infeasible LP, it is removed from `po` before the next candidate.

    Args:
        po: candidate tuples; converted to a list if needed (mutated in
            place when a list is passed).
        constraints: rows of the weight-space constraint matrix.
        k_: right-hand sides, one per row of `constraints`.

    Returns:
        the list of PO tuples ([] for empty input).
    """
    if not isinstance(po, list):
        po = list(po)

    if len(po) == 0:
        return []
    dimensions = len(po[0])
    po_reversed = po.copy()
    po_reversed.reverse()
    for t in po_reversed:
        model = LpProblem(sense=LpMaximize)
        # variables[0] is the objective variable v0, variables[1:] the weights w
        variables = getVariables_primal(dimensions)

        # One inequality per constraint row, each with its own right-hand side
        # (previously the rows were accumulated and all compared with k_[0]).
        for row, cons in enumerate(constraints):
            row_expr = 0
            for index in range(len(cons)):
                row_expr += cons[index] * variables[index + 1]
            model += (row_expr <= k_[row])

        # the weights sum to one
        weight_sum = 0
        for i in range(len(variables) - 1):
            weight_sum += variables[i + 1]
        model += weight_sum == 1

        po_temp = po.copy()
        po_temp.remove(t)
        for other in po_temp:  # compare t with every other tuple
            left_side = 0
            for i in range(dimensions):
                left_side += variables[i + 1] * (t[i] - other[i])
            model += (left_side + variables[0] <= 0)
        model.setObjective(variables[0])
        model.solve(PULP_CBC_CMD(msg=False))

        # PO contains t iff v0 can be made positive; check the status before
        # reading the objective, which may be None for a non-optimal LP
        status = LpStatus[model.status]
        if status == 'Unbounded' or (status == 'Optimal' and model.objective.value() > 0):
            continue
        po.remove(t)

    return po

#PO Algorithm: version with gurobipy optimizer and dual computation of po 
def po_dual(po, vertices, p):
    """Potentially optimal (PO) tuples via the dual LP test, solved with Gurobi.

    Candidates are visited from last to first. For candidate t a model is
    obtained from getModel_dual(size), size = min(delta, |PO| - 1); its
    variables are constrained to sum to 1 and, for every vertex l,
        sum_i vertices[l][i] * computeLeftSide(vars, others, i, p)
            <= sum_i vertices[l][i] * t[i]**p
    where `others` are the first `size` tuples of PO with one occurrence of t
    removed. t stays in PO iff the model is infeasible; otherwise it is
    F-dominated by a convex combination of the other tuples and is removed.
    delta starts at 2 and doubles after every sweep; the sweep in which
    delta >= |PO| - 1 is the last.

    Returns:
        the list `po` (mutated in place when a list is passed).
    """
    if not isinstance(po, list):
        po = list(po)

    delta = 2
    lastRound = False

    while not lastRound:
        if delta >= (len(po) - 1):
            lastRound = True

        po_reversed = po.copy()
        po_reversed.reverse()

        for t in po_reversed:  # candidate F-dominated tuple
            minimum = min(delta, len(po) - 1)
            if minimum == 0:
                break

            # Prepare the LP model
            m, variables = getModel_dual(minimum)
            weight_sum = 0
            for i in range(len(variables)):
                weight_sum += variables[i]
            m.addConstr(weight_sum == 1)

            # First min(delta, |PO| - 1) tuples of PO minus t. Only ONE
            # occurrence of t is dropped (as in POND); filtering out every
            # tuple equal to t could leave fewer than `minimum` tuples when t
            # has duplicates, raising IndexError.
            po_temp = po.copy()
            po_temp.remove(t)
            po_temp = po_temp[:minimum]

            for l in range(len(vertices)):  # one inequality per vertex
                left_side = 0
                right_side = 0
                for i in range(len(vertices[l])):
                    left_side += vertices[l][i] * computeLeftSide(variables, po_temp, i, p)
                    right_side += vertices[l][i] * (t[i] ** p)
                m.addConstr(left_side <= right_side)

            m.optimize()

            # PO contains t iff the linear system is infeasible
            if m.status != gp.GRB.INFEASIBLE:
                po.remove(t)

        delta = delta * 2

    return po

#PO Algorithm: version with gurobipy optimizer and primal computation of po 
def po_primal(po, constraints, k_):
    """Potentially optimal (PO) tuples via the primal LP, solved with Gurobi.

    A base model from getModel_primal(d) (v0 = variables[0], weights
    w = variables[1:]) is built once:
        maximise   v0
        subject to sum_i w_i == 1,
                   constraints[r] . w <= k_[r]      for every row r.
    For each candidate t (visited from last to first) a copy receives
        w . (t - s) + v0 <= 0
    for the first min(delta, |PO| - 1) tuples s of PO minus t. t is kept iff
    the copy is solved to optimality with a positive objective; otherwise it
    is removed. delta starts at 2 and doubles after each sweep; the sweep
    where delta >= |PO| - 1 is the last.

    Args:
        po: candidate tuples; converted to a list if needed (mutated in
            place when a list is passed).
        constraints: rows of the weight-space constraint matrix.
        k_: right-hand sides, one per row of `constraints`.

    Returns:
        the list of PO tuples ([] for empty input).
    """
    if not isinstance(po, list):
        po = list(po)

    if len(po) == 0:
        return []
    delta = 2
    lastRound = False
    dimensions = len(po[0])

    # Prepare the shared LP model
    m, variables, var_names = getModel_primal(dimensions)
    m.setObjective(variables[0], gp.GRB.MAXIMIZE)

    weight_sum = 0
    for i in range(len(variables) - 1):
        weight_sum += variables[i + 1]
    m.addConstr(weight_sum == 1)

    # One inequality per constraint row, each with its own right-hand side
    # (previously the rows were accumulated and all compared with k_[0]).
    for row, cons in enumerate(constraints):
        row_expr = 0
        for index in range(len(cons)):
            row_expr += cons[index] * variables[index + 1]
        m.addConstr(row_expr <= k_[row])

    # Gurobi applies modifications lazily: flush them once so every copy()
    # below contains the base constraints.
    m.update()

    while not lastRound:
        if delta >= (len(po) - 1):
            lastRound = True

        po_reversed = po.copy()
        po_reversed.reverse()

        for t in po_reversed:  # candidate tuple
            minimum = min(delta, len(po) - 1)
            if minimum == 0:
                break

            m_temp = m.copy()
            v0 = m_temp.getVarByName(var_names[0])
            weights = [m_temp.getVarByName(var_names[i + 1]) for i in range(dimensions)]

            # First min(delta, |PO| - 1) tuples of PO minus t; only ONE
            # occurrence of t is dropped so duplicates of t cannot leave
            # fewer than `minimum` tuples (which raised IndexError).
            po_temp = po.copy()
            po_temp.remove(t)
            for other in po_temp[:minimum]:
                left_side = 0
                for i in range(dimensions):
                    left_side += weights[i] * (t[i] - other[i])
                m_temp.addConstr(left_side + v0 <= 0)

            m_temp.optimize()
            # PO contains t iff v0 can be made positive
            if m_temp.status == gp.GRB.OPTIMAL and m_temp.objVal > 0:
                continue

            po.remove(t)
        delta = delta * 2

    return po

In [None]:
#Function that computes the index of the tuples

#Angular partitioning
def get_angular_partitionIndex(datapoint, dimensions, numSlices = 2):
    """Angular (hyperspherical) partition index of `datapoint`.

    For i = 1..dimensions-1 the angle
        phi_i = atan( sqrt(x_i^2 + ... + x_{d-1}^2) / x_{i-1} )
    is computed (a zero x_{i-1} is replaced by 0.0001). Each angle, which lies
    in [0, pi/2] for non-negative coordinates, is cut into `numSlices` equal
    slices, and slice numbers are combined as base-`numSlices` digits, giving
    an index in [0, numSlices**(dimensions-1) - 1].

    Args:
        datapoint: sequence with at least `dimensions` coordinates.
        dimensions: number of coordinates to use.
        numSlices: slices per angle.

    Returns:
        int partition index.
    """
    # sum of squares of the coordinates not yet consumed by an angle
    remaining = 0
    for i in range(dimensions):
        remaining = remaining + datapoint[i] ** 2

    index = 0
    for i in range(1, dimensions):
        remaining = max(0, remaining - datapoint[i - 1] ** 2)
        if datapoint[i - 1] == 0:
            tangent = sqrt(remaining) / (0.0001)
        else:
            tangent = sqrt(remaining) / datapoint[i - 1]
        slice_index = floor(atan(tangent) * (2 / pi) * numSlices)
        # atan of a huge tangent rounds to exactly pi/2, which would give
        # slice `numSlices` and an index outside the partition range
        slice_index = min(numSlices - 1, slice_index)
        index = index + slice_index * (numSlices ** (i - 1))
    return index

#Grid partitioning
def get_grid_partition_index(datapoint, numSlices = 2):
    """Index of the grid cell holding `datapoint`, with `numSlices` cells per axis.

    Coordinate i contributes its slice number times numSlices**i; values >= 1
    go to the last slice, so points of [0, 1]^d map to
    [0, numSlices**d - 1].
    """
    return sum(
        (numSlices - 1 if datapoint[dim] >= 1 else floor(datapoint[dim] * numSlices))
        * (numSlices ** dim)
        for dim in range(len(datapoint))
    )


In [None]:
# Grid partitioning
print('Grid partitioning with a serial grid filtering phase')
class Container:
    """One cell of the partitioning grid.

    Attributes:
        worstPoint: worst tuple of the cell (its upper corner as built by
            query_containers).
        bestPoint: best tuple of the cell (its lower corner).
        dataContained: list of the points assigned to the cell.
    """
    def __init__(self, worstPoint = None, bestPoint = None, dataContained = None):
        # None defaults: a literal [] default would be one list shared by
        # every Container created without arguments
        #worst tuple
        self.worstPoint = [] if worstPoint is None else worstPoint
        #best tuple
        self.bestPoint = [] if bestPoint is None else bestPoint
        self.dataContained = [] if dataContained is None else dataContained

    def addPoint(self, dataPoint):
        """Append `dataPoint` to the cell.

        Raises:
            Exception: if len(dataPoint) differs from len(worstPoint).
        """
        if len(dataPoint) != len(self.worstPoint):
            raise Exception('Datapoint dimension not consistent with container point`s dimensions: ' \
                            + str(len(dataPoint)) + \
                            ' ' + str(len(self.worstPoint)))
        # dataContained is a list: lists have append, not add
        self.dataContained.append(dataPoint)

            
def filtering_containers(containerList):
    """Keep the non-empty grid cells not dominated by another kept cell.

    `containerList` is sorted in place by min(bestPoint). A cell is pruned
    when the worstPoint of an already kept cell dominates its bestPoint
    (dominates(other.worstPoint, cell.bestPoint)); empty cells are skipped.

    Returns:
        list of the surviving containers, in sorted order.
    """
    containerList.sort(key=lambda cell: min(cell.bestPoint))
    survivors = []
    for cell in containerList:
        # empty cells hold no points, so they are dropped outright
        if not cell.dataContained:
            continue
        lower_corner = cell.bestPoint
        if any(dominates(kept.worstPoint, lower_corner)
               for kept in survivors if kept != cell):
            continue
        survivors.append(cell)

    return survivors
    
# Finds the skyline of grid containers based on its representative point min and max
def query_containers(datapoints, numSlicesPerDimension = 8):
    """Grid pre-filter: drop the points lying in dominated grid cells.

    The unit hypercube is split into numSlicesPerDimension**d equal cells
    (d = len(datapoints[0])). Every point is put in its cell, cells are pruned
    with filtering_containers, and the points of the surviving cells are
    returned concatenated in the order of those cells.

    Args:
        datapoints: sequence of d-dimensional points, assumed to lie in
            [0, 1]^d.
        numSlicesPerDimension: number of cells along each axis.

    Returns:
        list of the points in non-pruned cells ([] for empty input).
    """
    if len(datapoints) == 0:
        return []

    limit = 1 / (numSlicesPerDimension)
    dimensions = len(datapoints[0])
    num_slices_in_space = numSlicesPerDimension**dimensions

    # Cell i has, along axis j, the base-numSlicesPerDimension digit j of i.
    containerList = []
    for i in range(num_slices_in_space):
        worst = []
        best = []
        for j in range(dimensions):
            digit = (i // numSlicesPerDimension**j) % numSlicesPerDimension
            worst.append(digit * limit + limit)  # worst tuple: upper corner
            best.append(digit * limit)           # best tuple: lower corner
        containerList.append(Container(worst, best, []))

    last_slice = numSlicesPerDimension - 1
    for dp in datapoints:
        index = 0
        for i in range(len(dp)):
            # Clamp to the last slice: covers values >= 1 and values just
            # below 1 whose quotient dp[i] / limit rounds up to
            # numSlicesPerDimension (which picked a wrong cell or raised
            # IndexError).
            cell = min(last_slice, floor(dp[i] / limit))
            index = index + cell * (numSlicesPerDimension**i)
        containerList[index].dataContained.append(dp)

    print("Number of containers before filtering: " + str(len(containerList)))
    resultingContainers = filtering_containers(containerList)
    input_list = []
    for container in resultingContainers:
        input_list.extend(container.dataContained)
    print('Number of points after filtering: '+ str(len(input_list)))
    return input_list

In [None]:
print('Helper methods for Representative filtering.')
def get_best_representatives(index, dataset, n = 30):
    """Pick up to `n` representative points, bucketed along dimension `index`.

    Coordinate `index` is cut into `n` equal buckets of width 1/n; in each
    bucket the point maximising prod_j (1 - x_j) is kept (for coordinates in
    [0, 1] this is the volume of the region the point dominates). Points with
    a zero product are never selected.

    Args:
        index: dimension used for bucketing (the Spark partition index in the
            callers).
        dataset: iterable of points.
        n: number of buckets.

    Returns:
        list of the selected points, ordered by bucket.
    """
    limit = 1 / n  # bucket width
    best_n_points = [(0, []) for _ in range(n)]
    for point in dataset:
        area_covered = 1
        for value in point:
            area_covered = area_covered * (1 - value)
        # Clamp into [0, n-1]: a coordinate equal to 1 gives n (IndexError)
        # and a negative coordinate would silently wrap to the last bucket.
        rep_index = min(n - 1, max(0, floor(point[index] / limit)))
        if best_n_points[rep_index][0] < area_covered:
            best_n_points[rep_index] = (area_covered, point)
    return [pt for _, pt in best_n_points if pt]


def filter_with_memory(datapoints, reps, weights, onlyFilter = True):
    """Drop the points Pareto-dominated by any representative.

    Returns the surviving points when `onlyFilter` is true, otherwise
    find_skyline_sfs(survivors, weights).
    """
    survivors = [ps for ps in datapoints
                 if not any(dominates(rep, ps) for rep in reps)]
    return survivors if onlyFilter else find_skyline_sfs(survivors, weights)

def filter_with_improvement(datapoints, reps, weights, onlyFilter = False):
    """Representative filter that stops scanning `reps` at the point itself.

    For every point, representatives are scanned in order until one equal to
    the point (scan stops, point kept) or one dominating it (point dropped).
    Every examined representative counts as one comparison.

    Returns:
        [number_of_comparisons] when `onlyFilter` is true, otherwise
        find_skyline_sfs(survivors, weights).
    """
    comparisons = 0
    survivors = []
    for ps in datapoints:
        dominated = False
        for rep in reps:
            comparisons += 1
            if ps == rep:
                break
            if dominates(rep, ps):
                dominated = True
                break
        if not dominated:
            survivors.append(ps)

    if onlyFilter:
        return [comparisons]
    return find_skyline_sfs(survivors, weights)
def filter_nd_with_memory2(datapoints, reps, vertices, p, onlyFilter = False):
    """Filter a partition with the representatives using Pareto and F-dominance.

    A point s is dropped when some representative Pareto-dominates it
    (dominates(rep, s)), or when a representative t listed before the first
    representative equal to s F-dominates it: every entry of
    computeInequality(s, vertices, p) is >= the matching entry of
    computeInequality(t, vertices, p).

    Returns the survivors when `onlyFilter` is true, otherwise
    sve1f(survivors, vertices, p).
    """
    if not isinstance(datapoints, list):
        datapoints = list(datapoints)

    survivors = []
    for s in datapoints:
        if any(dominates(rep, s) for rep in reps):
            continue

        lhs = computeInequality(s, vertices, p)
        f_dominated = False
        for t in reps:
            if t == s:
                break
            rhs = computeInequality(t, vertices, p)
            # an empty vector never marks s as dominated
            if len(rhs) > 0 and all(not (lhs[k] < rhs[k]) for k in range(len(rhs))):
                f_dominated = True
                break
        if not f_dominated:
            survivors.append(s)

    return survivors if onlyFilter else sve1f(survivors, vertices, p)


def filter_nd_with_memory(datapoints, reps, vertices, p, onlyFilter = False):
    """Pre-filter a partition with the representatives, then optionally run SVE1F.

    A point is dropped when some representative Pareto-dominates it
    (dominates(rep, ps)). As in filter_with_memory and
    filter_nd_with_memory2, `onlyFilter=True` returns the surviving points
    directly; with the default `onlyFilter=False` the result is
    sve1f(survivors, vertices, p).
    """
    nd = []
    for ps in datapoints:
        # ps: potentially non-dominated point
        dominated = False
        for rep in reps:
            if dominates(rep, ps):
                dominated = True
                break
        if not dominated:
            nd.append(ps)

    # honour onlyFilter (it used to be accepted and silently ignored)
    if onlyFilter:
        return nd
    return sve1f(nd, vertices, p)


#Function that finds the representatives in parallel 
def parallel_representative_filtering(dataAsList, weights, slicesForSorting = 12, onlyFilter = True, numberReps = 30):
    """Find representatives in parallel and use them to filter the data.

    One Spark partition per dimension runs get_best_representatives; the
    collected representatives are reduced with find_skyline_sfs and then
    broadcast to `slicesForSorting` partitions running filter_with_memory.
    Timings and sizes are printed.

    Returns:
        the collected output of filter_with_memory over all partitions.
    """
    sc = spark.sparkContext

    t_reps = time.time()
    reps = sc.parallelize(dataAsList, len(dataAsList[0])) \
             .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, n = numberReps)) \
             .collect()
    print('Time taken to find best reps ' + str(time.time() - t_reps))
    print('Length of representatives: ' + str(len(reps)))
    reps = find_skyline_sfs(reps, weights)
    print('Length of representatives after skyline query: ' + str(len(reps)))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList, slicesForSorting) \
                         .mapPartitions(lambda x : filter_with_memory(x, reps, weights, onlyFilter)) \
                         .collect()
    print('Time taken to filter: ' +str(time.time() - t_filter))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))

    return parallel_skyline

def parallel_representative_filtering_angular(dataAsList, weights, numSlices = 5, onlyFilter = True, numberReps = 30):
    """Representative filtering with angular partitioning of the data.

    Representatives come from get_best_representatives (one partition per
    dimension) reduced with find_skyline_sfs. The data are keyed by
    get_angular_partitionIndex, spread over numSlices**(d-1) partitions, and
    each partition is processed by execute_filter_with_memory. Timings and
    sizes are printed.

    Returns:
        the collected output of all partitions.
    """
    sc = spark.sparkContext
    dimensions = len(dataAsList[0])

    t_reps = time.time()
    reps = sc.parallelize(dataAsList, dimensions) \
             .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, n = numberReps)) \
             .collect()
    print('Time taken to find best reps ' + str(time.time() - t_reps))
    print('Length of representatives: ' + str(len(reps)))
    reps = find_skyline_sfs(reps, weights)
    print('Length of representatives after skyline query: ' + str(len(reps)))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList) \
        .map(lambda x : (get_angular_partitionIndex(x, dimensions, numSlices), x)) \
        .partitionBy(numSlices**(dimensions-1)) \
        .mapPartitions(lambda x : execute_filter_with_memory(x, reps, weights, onlyFilter), preservesPartitioning=True) \
        .collect()
    print('Time taken to filter: ' +str(time.time() - t_filter))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))

    return parallel_skyline

def parallel_representative_filtering_imp(dataAsList, weights, slicesForSorting = 12, onlyFilter = True, givenReps = [], numberReps = 30):
    """Representative filtering through filter_with_improvement.

    Representatives are `givenReps` when non-empty, otherwise the output of
    get_best_representatives (one partition per dimension); either way they
    are reduced with find_skyline_sfs. Each of `slicesForSorting` partitions
    then runs filter_with_improvement. With the default onlyFilter=True every
    partition yields a one-element list holding its comparison count, so the
    result (and the 'Number of comparisons' print) contains one count per
    partition; with onlyFilter=False it contains the local skylines.
    """
    sc = spark.sparkContext

    t_reps = time.time()
    if givenReps:
        reps = givenReps
    else:
        reps = sc.parallelize(dataAsList, len(dataAsList[0])) \
                 .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, n = numberReps)) \
                 .collect()
    print('Time taken to find best reps ' + str(time.time() - t_reps))
    print('Length of representatives: ' + str(len(reps)))
    reps = find_skyline_sfs(reps, weights)
    print('Length of representatives after skyline query: ' + str(len(reps)))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList, slicesForSorting) \
                         .mapPartitions(lambda x : filter_with_improvement(x, reps, weights, onlyFilter)) \
                         .collect()
    elapsed = time.time() - t_filter
    print('Number of comparisons: '+ str(parallel_skyline))
    print('Time taken to filter: ' +str(elapsed))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))

    return parallel_skyline

def parallel_representative_filtering_nd(dataAsList, vertices,p, slicesForSorting = 12, onlyFilter = True, givenReps = [], numberReps = 30):
    """Representative filtering for the ND (F-dominance) query.

    Representatives are `givenReps` when non-empty, otherwise the output of
    get_best_representatives (one partition per dimension); they are reduced
    with sve1f. Each of `slicesForSorting` partitions then runs
    filter_nd_with_memory2. Timings and sizes are printed.

    Returns:
        the collected output of filter_nd_with_memory2 over all partitions.
    """
    sc = spark.sparkContext

    t_reps = time.time()
    if givenReps:
        reps = givenReps
    else:
        reps = sc.parallelize(dataAsList, len(dataAsList[0])) \
                 .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, n = numberReps)) \
                 .collect()
    print('Time taken to find best reps ' + str(time.time() - t_reps))
    print('Length of representatives: ' + str(len(reps)))
    reps = sve1f(reps, vertices, p)
    print('Length of representatives after ND query: ' + str(len(reps)))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList, slicesForSorting) \
                         .mapPartitions(lambda x : filter_nd_with_memory2(x, reps, vertices, p, onlyFilter)) \
                         .collect()
    print('Time taken to filter: ' +str(time.time() - t_filter))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))

    return parallel_skyline
def parallel_representative_filtering_nd_angular(dataAsList, vertices, p, numSlices = 5, onlyFilter = True, numberReps = 30):
    """ND representative filtering with angular partitioning of the data.

    Representatives come from get_best_representatives (one partition per
    dimension) reduced with sve1f. The data are keyed by
    get_angular_partitionIndex, spread over numSlices**(d-1) partitions, and
    each partition is processed by execute_sve1_filter_with_memory. Timings
    and sizes are printed.

    Returns:
        the collected output of all partitions.
    """
    sc = spark.sparkContext
    dimensions = len(dataAsList[0])

    t_reps = time.time()
    reps = sc.parallelize(dataAsList, dimensions) \
             .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, n = numberReps)) \
             .collect()
    print('Time taken to find best reps ' + str(time.time() - t_reps))
    print('Length of representatives: ' + str(len(reps)))
    reps = sve1f(reps, vertices, p)
    print('Length of representatives after skyline query: ' + str(len(reps)))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList) \
        .map(lambda x : (get_angular_partitionIndex(x, dimensions, numSlices), x)) \
        .partitionBy(numSlices**(dimensions-1)) \
        .mapPartitions(lambda x : execute_sve1_filter_with_memory(x, reps, vertices, p, onlyFilter), preservesPartitioning=True) \
        .collect()
    print('Time taken to filter: ' +str(time.time() - t_filter))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))

    return parallel_skyline

In [None]:
def computeList(l):
    """Parse one whitespace-separated line of numbers into a list of floats.

    split() (rather than split(" ")) treats any run of spaces/tabs as one
    separator and ignores leading/trailing whitespace, so repeated spaces or
    tab-separated files no longer produce empty fields and a ValueError.
    """
    return [float(x) for x in l.split()]

def parallel_3P(dataAsList, weights, numReps=30, slicesForSorting = 12):
    """Three-phase skyline: sort, pick representatives, filter + local SFS.

    NOTE: `dataAsList` is sorted in place on its first coordinate. The
    representatives (get_best_representatives, one partition per dimension,
    `numReps` buckets) are used by filter_with_memory with onlyFilter=False,
    so each of the `slicesForSorting` partitions returns its local SFS
    skyline. Timings and sizes are printed.

    Returns:
        the concatenated local skylines.
    """
    print('Slices for representative skyline ' + str(slicesForSorting))
    t_sort = time.time()
    dataAsList.sort(key = lambda x : x[0])
    print('Time taken for sorting: '+ str(time.time() - t_sort))

    sc = spark.sparkContext
    t_reps = time.time()
    reps = sc.parallelize(dataAsList, len(dataAsList[0])) \
             .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, numReps)) \
             .collect()
    print('Length of representatives: ' + str(len(reps)) + ', time taken to find: '+ str(time.time() - t_reps))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList, slicesForSorting) \
                         .mapPartitions(lambda x : filter_with_memory(x, reps, weights, onlyFilter = False)) \
                         .collect()
    print('Length of the parallel section skyline: ' + str(len(parallel_skyline))+ ', time taken to find it: '+ str(time.time() - t_filter))

    return parallel_skyline

def parallel_3P2(dataset, weights, slicesForSorting = 12, numReps = 100):
    """Load a dataset file, normalise it, and run representative filtering.

    The file <cwd>/datasets/<dataset> is read with Spark (one point per
    line, parsed by computeList), normalised with normalize_data and sorted
    on the first coordinate. Representatives (get_best_representatives with
    `numReps` buckets, one partition per dimension) then feed
    filter_with_memory on `slicesForSorting` partitions (its default
    onlyFilter=True, i.e. filtering only). Sizes and timings are printed.

    Args:
        dataset: file name relative to <cwd>/datasets; a leading '/' (the
            historical form, e.g. '/data.txt') is accepted.
        weights: forwarded to filter_with_memory.
        slicesForSorting: number of partitions for the filtering phase.
        numReps: buckets per dimension for the representatives (was fixed
            to 100).

    Returns:
        the collected output of filter_with_memory.
    """
    import os  # os is not among the notebook's top-level imports

    print('Slices for representative skyline ' + str(slicesForSorting))
    # os.path.join supplies the separator that plain concatenation omitted
    # for names without a leading '/'; strip it so join does not treat the
    # name as an absolute path.
    path = os.path.join(os.getcwd(), 'datasets', dataset.lstrip('/'))
    rdd = spark.sparkContext.textFile(path, minPartitions=500)
    dataAsList = rdd.map(computeList).collect()
    dataAsList = normalize_data(dataAsList).tolist()
    print(len(dataAsList))
    print(dataAsList[0:5])

    start = time.time()
    dataAsList.sort(key = lambda x : x[0])
    end = time.time() - start
    print('Time taken for sorting: '+ str(end))

    start_indexing = time.time()
    representatives = spark.sparkContext.parallelize(dataAsList, len(dataAsList[0])) \
                                .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y, numReps)) \
                                .collect()
    end_indexing = time.time() - start_indexing
    print('Length of representatives: ' + str(len(representatives)) + ', time taken to find: '+ str(end_indexing))

    start_parallel = time.time()
    parallel_skyline = spark.sparkContext.parallelize(dataAsList, slicesForSorting)\
                                .mapPartitions(lambda x : filter_with_memory(x, representatives, weights)) \
                                .collect()
    end_parallel = time.time() - start_parallel
    print('Length of the parallel section skyline: ' + str(len(parallel_skyline))+ ', time taken to find it: '+ str(end_parallel))

    return parallel_skyline

def parallel_3P_sve1f(dataAsList, vertices, p, slicesForSorting = 12):
    """Three-phase ND query: sort, pick representatives, filter + local SVE1F.

    NOTE: `dataAsList` is sorted in place on its first coordinate.
    Representatives (get_best_representatives with its default bucket count,
    one partition per dimension) are reduced with sve1f and used by
    filter_nd_with_memory2 on `slicesForSorting` partitions. The reported
    representative time excludes the sve1f reduction.

    Returns:
        the concatenated per-partition results.
    """
    print('Slices for representative skyline ' + str(slicesForSorting))
    t_sort = time.time()
    dataAsList.sort(key = lambda x : x[0])
    print('Time taken for sorting: '+ str(time.time() - t_sort))

    sc = spark.sparkContext
    t_reps = time.time()
    reps = sc.parallelize(dataAsList, len(dataAsList[0])) \
             .mapPartitionsWithIndex(lambda index, y: get_best_representatives(index, y)) \
             .collect()
    reps_elapsed = time.time() - t_reps
    reps = sve1f(reps, vertices, p)
    print('Length of representatives: ' + str(len(reps)) + ', time taken to find: '+ str(reps_elapsed))

    t_filter = time.time()
    parallel_skyline = sc.parallelize(dataAsList, slicesForSorting) \
                         .mapPartitions(lambda x : filter_nd_with_memory2(x, reps, vertices, p)) \
                         .collect()
    print('Length of the parallel section skyline: ' + str(len(parallel_skyline))+ ', time taken to find it: '+ str(time.time() - t_filter))

    return parallel_skyline


In [None]:
print('BNL distributed version')

#Random partitioning
def parallel_bnl(data, numberOfSlices = 30):
    """Two-phase distributed skyline using ``find_skyline_bnl``.

    Phase 1 splits ``data`` into ``numberOfSlices`` Spark partitions, in list
    order as ``parallelize`` slices it, and runs ``find_skyline_bnl`` on each.
    Phase 2 runs ``find_skyline_bnl`` serially over the collected local results.

    Args:
        data: list of points.
        numberOfSlices: number of Spark partitions for the local phase.

    Returns:
        The skyline from the serial phase. It was previously computed and then
        discarded; it is now returned, as in parallelSVE1 / parallelSVE1F.
    """
    start1 = time.time()

    initialValues = (spark.sparkContext
                     .parallelize(data, numberOfSlices)
                     .mapPartitions(find_skyline_bnl)
                     .collect())
    end = time.time() - start1
    print("Length of the local skylines after parallel section is : " + str(len(initialValues)) + ", time taken: " + str(end))

    skyline = find_skyline_bnl(initialValues)
    # Total time is measured from the start of the parallel phase.
    end2 = time.time() - start1
    print("Length of the skyline is : " + str(len(skyline)) + ", total time taken: " + str(end2))
    return skyline

In [None]:
# Cell banner printed when this cell runs. The functions below are
# Spark-distributed variants built on find_skyline_sfs / execute_sfs_indexed.
print('SFS distributed computation')
import os
#Random partitioning
def parallel_sfs(data, weights, numberOfSlices = 100):
    """Two-phase distributed skyline using ``find_skyline_sfs``.

    Phase 1 splits ``data`` into ``numberOfSlices`` Spark partitions, in list
    order as ``parallelize`` slices it, and runs ``find_skyline_sfs`` on each.
    Phase 2 runs ``find_skyline_sfs`` serially over the collected local results.

    Args:
        data: list of points.
        weights: forwarded unchanged to ``find_skyline_sfs``.
        numberOfSlices: number of Spark partitions for the local phase.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    start1 = time.time()

    initialValues = (spark.sparkContext
                     .parallelize(data, numberOfSlices)
                     .mapPartitions(lambda part: find_skyline_sfs(part, weights))
                     .collect())
    end = time.time() - start1
    print("Length of the local skylines after parallel section is : " + str(len(initialValues)) + ", time taken: " + str(end))

    skyline = find_skyline_sfs(initialValues, weights)
    # Total time is measured from the start of the parallel phase.
    end2 = time.time() - start1
    print("Length of the skyline is : " + str(len(skyline)) + ", total time taken: " + str(end2))
    return skyline

#One-slice partitioning

def sliced_partitioning_sfs(datapoints, weights, numPartitions=100):
    """Distributed SFS skyline over data pre-sorted on the first coordinate.

    ``datapoints`` is sorted in place by ``x[0]``. ``parallelize`` therefore
    hands each of the ``numPartitions`` partitions a contiguous slice of the
    sorted list. Each partition runs ``find_skyline_sfs``, and the collected
    results are reduced serially with ``find_skyline_sfs``.

    Args:
        datapoints: list of points; mutated (sorted in place).
        weights: forwarded unchanged to ``find_skyline_sfs``.
        numPartitions: number of Spark partitions for the local phase.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    start = time.time()
    datapoints.sort(key=lambda x: x[0])
    sortedData = datapoints
    end = time.time() - start
    print('Time taken for sorting: ' + str(end))

    start1 = time.time()
    initialValues = (spark.sparkContext
                     .parallelize(sortedData, numPartitions)
                     .mapPartitions(lambda part: find_skyline_sfs(part, weights))
                     .collect())
    end = time.time() - start1
    print("Length of the local skylines after parallel section is : " + str(len(initialValues)) + ", time taken: " + str(end))

    skyline = find_skyline_sfs(initialValues, weights)
    # Total time excludes the sort; it is measured from the start of the parallel phase.
    end2 = time.time() - start1
    print("Length of the skyline is : " + str(len(skyline)) + ", total time taken: " + str(end2))
    return skyline

#Angular partitioning
def parallel_angled_partitioning_sfs(dataArray, weights, numSlices = 5):
    """Distributed SFS skyline with angular partitioning and a serial merge.

    Each point is keyed by ``get_angular_partitionIndex(point, dimensions,
    numSlices)`` and repartitioned by that key into
    ``numSlices ** (dimensions - 1)`` partitions. Each partition runs
    ``execute_sfs_indexed``, and the collected local results are reduced
    serially with ``find_skyline_sfs``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is taken as
            the dimensionality.
        weights: forwarded unchanged to the SFS helpers.
        numSlices: slicing parameter passed to ``get_angular_partitionIndex``.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    dimensions = len(dataArray[0])
    numPartitions = numSlices ** (dimensions - 1)

    start = time.time()
    # partitionBy needs (key, value) pairs: the key selects the angular partition.
    initialResult = (spark.sparkContext
                     .parallelize(dataArray)
                     .map(lambda x: (get_angular_partitionIndex(x, dimensions, numSlices), x))
                     .partitionBy(numPartitions)
                     .mapPartitions(lambda x: execute_sfs_indexed(x, weights), preservesPartitioning=True)
                     .collect())
    end = time.time() - start
    print('AP: Length of skyline after parallel phase is :' + str(len(initialResult)) + ", time taken: " + str(end))

    seq_time = time.time()
    finRes = find_skyline_sfs(initialResult, weights)
    end_seq = time.time() - seq_time

    print('AP: Length of the skyline is :' + str(len(finRes)) + ', total time: ' + str(end + end_seq))
    return finRes

def angular_sfs(dataset, weights, dimensions, numSlices = 5):
    """Angular-partitioned SFS skyline over points read from a text file.

    Reads ``<cwd>/datasets/<dataset>`` with ``SparkContext.textFile``. Each line
    is mapped by ``get_angular_partitionIndex2`` to the ``(key, point)`` pair that
    ``partitionBy`` requires (presumably it also parses the line — confirm). The
    pairs are repartitioned into ``numSlices ** (dimensions - 1)`` partitions,
    each of which runs ``execute_sfs_indexed``. The collected union is then
    reduced with ``find_skyline_sfs``.

    Args:
        dataset: file name relative to ``<cwd>/datasets``.
        weights: forwarded unchanged to the SFS helpers.
        dimensions: dimensionality of the points (not inferred from the file).
        numSlices: slicing parameter for ``get_angular_partitionIndex2``.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    rdd = spark.sparkContext.textFile(os.path.join(os.getcwd(), 'datasets', dataset))
    numPartitions = numSlices ** (dimensions - 1)

    # textFile is lazy, so reading the file is included in the timed parallel phase.
    start = time.time()
    initialResult = (rdd
                     .map(lambda x: get_angular_partitionIndex2(x, dimensions, numSlices))
                     .partitionBy(numPartitions)
                     .mapPartitions(lambda x: execute_sfs_indexed(x, weights), preservesPartitioning=True)
                     .collect())
    end = time.time() - start
    print('Length of skyline after parallel phase of Angular Partitioning is :' + str(len(initialResult)) + ", time taken: " + str(end))

    seq_time = time.time()
    finRes = find_skyline_sfs(initialResult, weights)
    end_seq = time.time() - seq_time

    print('Length of the skyline is :' + str(len(finRes)) + ', total time: ' + str(end + end_seq))
    return finRes


#grid partitioning
def parallel_grid_partitioning_sfs(dataArray, weights, numSlices = 4):
    """Distributed SFS skyline with grid partitioning and a serial merge.

    Each point is keyed by ``get_grid_partition_index(point, numSlices)`` and
    repartitioned into ``numSlices ** dimensions`` partitions. Each partition runs
    ``execute_sfs_indexed``, and the collected results are reduced serially with
    ``find_skyline_sfs``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        weights: forwarded unchanged to the SFS helpers.
        numSlices: slicing parameter for ``get_grid_partition_index``
            (presumably cells per dimension — confirm).

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    dimensions = len(dataArray[0])
    numPartitions = numSlices ** dimensions

    start = time.time()
    initialResults = (spark.sparkContext
                      .parallelize(dataArray)
                      .map(lambda x: (get_grid_partition_index(x, numSlices), x))
                      .partitionBy(numPartitions)
                      .mapPartitions(lambda x: execute_sfs_indexed(x, weights), preservesPartitioning=True)
                      .collect())
    end = time.time() - start
    print('Length of skyline after parallel phase of Grid Partitioning is :' + str(len(initialResults)) + ",time taken: " + str(end))

    seq_time = time.time()
    finRes = find_skyline_sfs(initialResults, weights)
    end_seq = time.time() - seq_time

    print('Length of the skyline is :' + str(len(finRes)) + ', total time: ' + str(end + end_seq))
    return finRes


    
# Angular partitioning with a serial grid filtering phase 
def sfs_angular_partitioning_with_serial_grid_filtering(datapoints, weights, numberOfSlices = 5):
    """Serial ``query_containers`` pre-filter followed by angular-partitioned SFS.

    Args:
        datapoints: list of points handed to ``query_containers``.
        weights: forwarded to ``parallel_angled_partitioning_sfs``.
        numberOfSlices: ``numSlices`` for the angular phase. ``-1`` derives it
            from ``sys.getsizeof(datapoints)``, clamped to [8, 24].

    Returns:
        Whatever ``parallel_angled_partitioning_sfs`` returns (previously dropped).
    """
    start = time.time()
    if numberOfSlices == -1:
        numberOfSlices = min(max(8, ceil((sys.getsizeof(datapoints) / 1024 / 1000) * 0.4)), 24)

    # NOTE(review): query_containers is called with its default slicing here, unlike
    # the other *_with_serial_grid_filtering drivers — confirm that is intended.
    input_list = query_containers(datapoints)
    end = time.time() - start
    print('Time for the container serial query: ' + str(end))

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_sfs(input_list, weights, numberOfSlices)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult

def sfs_angular_partitioning_with_serial_grid_filtering2(dataset, weights, numberOfSlices = 4):
    """Serial ``query_containers`` pre-filter followed by angular-partitioned SFS.

    Unlike the non-``2`` variant, ``numberOfSlices`` goes to ``query_containers``.
    The angular phase then runs with its own default ``numSlices``.

    Args:
        dataset: points handed to ``query_containers``.
        weights: forwarded to ``parallel_angled_partitioning_sfs``.
        numberOfSlices: slicing argument for ``query_containers``.

    Returns:
        Whatever ``parallel_angled_partitioning_sfs`` returns (previously dropped).
    """
    start = time.time()
    input_list = query_containers(dataset, numberOfSlices)
    end = time.time() - start
    print('Time for the container serial query: ' + str(end))

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_sfs(input_list, weights)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult
    
    
# Grid partitioning with a serial grid filtering phase 
def sfs_grid_partitioning_with_serial_grid_filtering(datapoints, weights, numberOfSlices = 4):
    """Serial ``query_containers`` pre-filter followed by grid-partitioned SFS.

    Args:
        datapoints: points handed to ``query_containers``.
        weights: forwarded to ``parallel_grid_partitioning_sfs``.
        numberOfSlices: slicing argument for ``query_containers``.

    Returns:
        Whatever ``parallel_grid_partitioning_sfs`` returns (previously dropped).
    """
    start = time.time()
    input_list = query_containers(datapoints, numberOfSlices)
    end = time.time() - start
    print('Time for the container serial query: ' + str(end))

    start_parallel = time.time()
    finalResult = parallel_grid_partitioning_sfs(input_list, weights)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult
    

#Parallel3P SFS
def AllParallel_sfs(datapoints, weights, numPartFirst=100, numPartSecond=100, numReps = 30):
    """Fully parallel SFS: representative filtering, then a parallel global check.

    Phase 1 calls ``parallel_3P(datapoints, weights, numReps, numPartFirst)``.
    Phase 2 spreads the phase-1 survivors over ``numPartSecond`` partitions. Each
    partition runs ``sfs_multithread`` against the complete survivor list, which
    is captured in the closure and shipped with every task.

    Returns:
        The collected ``sfs_multithread`` output (previously discarded).
    """
    start = time.time()
    parallel_skyline = parallel_3P(datapoints, weights, numReps, numPartFirst)
    end = time.time() - start

    start_serial = time.time()
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numPartSecond)
                               .mapPartitions(lambda x: sfs_multithread(x, parallel_skyline, weights))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of the skyline: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline

def AllParallel_sfs2(dataset, weights, numberOfSlices=30):
    """Fully parallel SFS using ``parallel_representative_filtering`` for phase 1.

    The phase-1 survivors are sorted in place by ``sort_function(x, weights)``
    and spread over ``numberOfSlices`` partitions. Each partition runs
    ``sfs_multithread`` against the full sorted survivor list.

    Returns:
        The collected ``sfs_multithread`` output (previously discarded).
    """
    start = time.time()
    parallel_skyline = parallel_representative_filtering(dataset, weights, onlyFilter=False)
    end = time.time() - start

    start_serial = time.time()
    parallel_skyline.sort(key=lambda x: sort_function(x, weights))
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numberOfSlices)
                               .mapPartitions(lambda x: sfs_multithread(x, parallel_skyline, weights))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of the skyline: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline
    
def AllParallel_sfs3(dataset, weights, numberOfSlices=30, numRep = 100):
    """Fully parallel SFS using ``representative_smallest`` for phase 1.

    The phase-1 survivors are spread over ``numberOfSlices`` partitions. Each
    partition runs ``sfs_multithread`` against the full survivor list.

    Returns:
        The collected ``sfs_multithread`` output (previously discarded).
    """
    start = time.time()
    parallel_skyline = representative_smallest(dataset, weights, numRep, onlyFilter=False)
    end = time.time() - start

    start_serial = time.time()
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numberOfSlices)
                               .mapPartitions(lambda x: sfs_multithread(x, parallel_skyline, weights))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of the skyline: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline
    



In [None]:
# Cell banner printed when this cell runs. The functions below are
# Spark-distributed variants built on find_skyline_SaLSa / execute_saLSa_indexed.
print('SaLSa distributed version')

#Random partitioning
def parallel_SaLSa(data, numOfSlices =30):
    """Two-phase distributed skyline using ``find_skyline_SaLSa``.

    Phase 1 runs ``find_skyline_SaLSa`` on each of ``numOfSlices`` partitions.
    Phase 2 runs it serially over the collected local results.

    Args:
        data: list of points.
        numOfSlices: number of Spark partitions for the local phase.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    start1 = time.time()

    initialValues = (spark.sparkContext
                     .parallelize(data, numOfSlices)
                     .mapPartitions(find_skyline_SaLSa)
                     .collect())
    end = time.time() - start1
    print("Length after parallel phase : " + str(len(initialValues)) + ", time taken: " + str(end))

    skyline = find_skyline_SaLSa(initialValues)
    end2 = time.time() - start1
    print("Length of the skyline is : " + str(len(skyline)) + ", total time taken: " + str(end2))
    return skyline

def parallel_angled_partitioning_saLSa(dataArray, numSlices = 2):
    """Angular-partitioned SaLSa skyline with a serial merge.

    Points are keyed by ``get_angular_partitionIndex`` and repartitioned into
    ``numSlices ** (dimensions - 1)`` partitions. Each partition runs
    ``execute_saLSa_indexed``, and the collected union is reduced with
    ``find_skyline_SaLSa``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        numSlices: slicing parameter for ``get_angular_partitionIndex``. It is
            also the initial ``parallelize`` partition count.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    dimensions = len(dataArray[0])
    numPartitions = numSlices ** (dimensions - 1)

    start = time.time()
    initialResult = (spark.sparkContext
                     .parallelize(dataArray, numSlices)
                     .map(lambda x: (get_angular_partitionIndex(x, dimensions, numSlices), x))
                     .partitionBy(numPartitions)
                     .mapPartitions(execute_saLSa_indexed, preservesPartitioning=True)
                     .collect())
    end = time.time() - start
    print('AP: Length of skyline after parallel phase is :' + str(len(initialResult)) + ", time taken: " + str(end))

    seq_time = time.time()
    finRes = find_skyline_SaLSa(initialResult)
    end_seq = time.time() - seq_time

    print('AP: Length of the skyline is :' + str(len(finRes)) + ', total time taken: ' + str(end + end_seq))
    return finRes
    
def naive_grid_partitioning_saLSa(dataArray, numSlices = 2):
    """Grid-partitioned SaLSa skyline with a serial merge.

    Points are keyed by ``get_grid_partition_index`` and repartitioned into
    ``numSlices ** dimensions`` partitions. Each partition runs
    ``execute_saLSa_indexed``, and the collected union is reduced with
    ``find_skyline_SaLSa``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        numSlices: slicing parameter for ``get_grid_partition_index``. It is also
            the initial ``parallelize`` partition count.

    Returns:
        The skyline from the serial phase (previously computed and discarded).
    """
    dimensions = len(dataArray[0])
    numPartitions = numSlices ** dimensions

    print('Number of slices := ' + str(numPartitions))

    start = time.time()
    m2 = (spark.sparkContext
          .parallelize(dataArray, numSlices)
          .map(lambda x: (get_grid_partition_index(x, numSlices), x))
          .partitionBy(numPartitions)
          .mapPartitions(execute_saLSa_indexed, preservesPartitioning=True)
          .collect())
    end = time.time() - start
    print('GP: Length of skyline after parallel phase is :' + str(len(m2)) + ",time taken: " + str(end))

    seq_time = time.time()
    finRes = find_skyline_SaLSa(m2)
    end_seq = time.time() - seq_time

    # This is the grid variant: the label was previously 'AP:' (angular) by mistake.
    print('GP: Length of the skyline is :' + str(len(finRes)) + ', total time taken: ' + str(end + end_seq))
    return finRes
    
def saLSa_angular_partitioning_with_serial_grid_filtering(datapoints, numSlicesPerDimension = 12):
    """Serial ``query_containers`` pre-filter followed by ANGULAR-partitioned SaLSa.

    Previously this called the grid partitioner, and its ``_grid_`` sibling
    called the angular one. The two have been swapped back to match their names.

    Args:
        datapoints: points handed to ``query_containers``.
        numSlicesPerDimension: slicing argument for ``query_containers``.

    Returns:
        Whatever ``parallel_angled_partitioning_saLSa`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_saLSa(input_list)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult

def saLSa_grid_partitioning_with_serial_grid_filtering(datapoints, numSlicesPerDimension = 12):
    """Serial ``query_containers`` pre-filter followed by GRID-partitioned SaLSa.

    Previously this called the angular partitioner, and its ``_angular_``
    sibling called the grid one. The two have been swapped back to match their
    names.

    Args:
        datapoints: points handed to ``query_containers``.
        numSlicesPerDimension: slicing argument for ``query_containers``.

    Returns:
        Whatever ``naive_grid_partitioning_saLSa`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = naive_grid_partitioning_saLSa(input_list)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult
    

def parallel3P_salsa(datapoints, weights, numSlices=12):
    """Representative filtering via ``parallel_3P``, then a parallel SaLSa check.

    The phase-1 survivors are spread over ``numSlices`` partitions. Each
    partition runs ``salsa_multithread`` against the full survivor list.

    Returns:
        The collected ``salsa_multithread`` output (previously discarded).
    """
    start = time.time()
    # NOTE(review): AllParallel_sfs calls parallel_3P(data, weights, numReps, numPartitions);
    # here numSlices lands in the third positional slot — confirm that is intended.
    parallel_skyline = parallel_3P(datapoints, weights, numSlices)
    end = time.time() - start

    start_serial = time.time()
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numSlices)
                               .mapPartitions(lambda x: salsa_multithread(x, parallel_skyline))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of the skyline: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline

In [None]:
# Cell banner printed when this cell runs. The functions below distribute sve1 /
# execute_sve1_indexed over Spark partitions.
print('SVE1 distributed computation')
#Random partitioning
def parallelSVE1(data, vertices, p, numOfSlices = -1):
    """Two-phase sve1 over positionally partitioned data.

    Args:
        data: list of points.
        vertices, p: forwarded unchanged to ``sve1``.
        numOfSlices: Spark partition count. ``-1`` derives it from
            ``sys.getsizeof(data)`` (the size of the list object itself),
            clamped to the range [8, 24].

    Returns:
        The result of a final serial ``sve1`` pass over the union of the
        per-partition ``sve1`` results ("nd" in the log lines).
    """
    t0 = time.time()
    slices = numOfSlices
    if slices == -1:
        # Heuristic partition count: 0.4 per (1024*1000)-byte unit, clamped to [8, 24].
        size_units = sys.getsizeof(data) / 1024 / 1000
        slices = min(max(8, ceil(size_units * 0.4)), 24)

    local_nd = (spark.sparkContext
                .parallelize(data, slices)
                .mapPartitions(lambda part: sve1(part, vertices, p), preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t0
    print(f"Length of nd after parallel section : {len(local_nd)}, time taken: {parallel_elapsed}")

    t1 = time.time()
    nd = sve1(local_nd, vertices, p)
    serial_elapsed = time.time() - t1
    print(f"Length of nd is : {len(nd)}, time taken: {serial_elapsed}")
    print(f"Total time nd: {parallel_elapsed + serial_elapsed}")

    return nd

def parallel_angled_partitioning_sve1(dataArray, vertices, p, numSlices = 2):
    """Angular-partitioned sve1 with a serial merge pass.

    The initial ``parallelize`` uses a size-derived partition count, computed
    from ``sys.getsizeof(dataArray)`` and clamped to [8, 24]. Points are then
    keyed by ``get_angular_partitionIndex`` and repartitioned into
    ``numSlices ** (dimensions - 1)`` partitions, each processed by
    ``execute_sve1_indexed``. The collected union is reduced with ``sve1``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        vertices, p: forwarded unchanged to the sve1 helpers.
        numSlices: slicing parameter for ``get_angular_partitionIndex``.

    Returns:
        The result of the serial ``sve1`` pass ("nd" in the log lines).
    """
    dims = len(dataArray[0])
    initial_slices = min(max(8, ceil((sys.getsizeof(dataArray) / 1024 / 1000) * 0.4)), 24)

    t_parallel = time.time()
    keyed = (spark.sparkContext
             .parallelize(dataArray, initial_slices)
             .map(lambda point: (get_angular_partitionIndex(point, dims, numSlices), point)))
    local_nd = (keyed
                .partitionBy(numSlices ** (dims - 1))
                .mapPartitions(lambda part: execute_sve1_indexed(part, vertices, p), preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'AP: Length of nd after parallel phase is :{len(local_nd)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    nd = sve1(local_nd, vertices, p)
    serial_elapsed = time.time() - t_serial
    print(f'AP: Length of nd is :{len(nd)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return nd

def naive_grid_partitioning_sve1(dataArray, vertices, p, numSlices = 2):
    """Grid-partitioned sve1 with a serial merge pass.

    The initial ``parallelize`` uses a size-derived partition count, computed
    from ``sys.getsizeof(dataArray)`` and clamped to [8, 24]. Points are keyed by
    ``get_grid_partition_index`` and repartitioned into ``numSlices ** dimensions``
    partitions, each processed by ``execute_sve1_indexed``. The union is then
    reduced with ``sve1``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        vertices, p: forwarded unchanged to the sve1 helpers.
        numSlices: slicing parameter for ``get_grid_partition_index``.

    Returns:
        The result of the serial ``sve1`` pass ("nd" in the log lines).
    """
    dimensions = len(dataArray[0])
    numPartitions = numSlices ** dimensions

    print('Number of slices := ' + str(numPartitions))

    # Size the initial RDD from the input itself. This previously referenced an
    # undefined name ``data`` (NameError, or a stray notebook global).
    numOfSlices = min(max(8, ceil((sys.getsizeof(dataArray) / 1024 / 1000) * 0.4)), 24)
    start = time.time()
    initialResult = (spark.sparkContext
                     .parallelize(dataArray, numOfSlices)
                     .map(lambda x: (get_grid_partition_index(x, numSlices), x))
                     .partitionBy(numPartitions)
                     .mapPartitions(lambda x: execute_sve1_indexed(x, vertices, p), preservesPartitioning=True)
                     .collect())
    end = time.time() - start
    print('GP: Length of nd after parallel phase is :' + str(len(initialResult)) + ", time taken: " + str(end))

    seq_time = time.time()
    finRes = sve1(initialResult, vertices, p)
    end_seq = time.time() - seq_time

    print('GP: Length of nd is :' + str(len(finRes)) + ', total time taken: ' + str(end + end_seq))
    return finRes

def sve1_angular_partitioning_with_serial_grid_filtering(datapoints, vertices, p, numOfSlices = -1):
    """Serial ``query_containers`` pre-filter followed by angular-partitioned sve1.

    Args:
        datapoints: points handed to ``query_containers``.
        vertices, p: forwarded to ``parallel_angled_partitioning_sve1``.
        numOfSlices: slicing argument for ``query_containers``. ``-1`` derives it
            from ``sys.getsizeof(datapoints)``, clamped to [8, 24].

    Returns:
        Whatever ``parallel_angled_partitioning_sve1`` returns (previously dropped).
    """
    if numOfSlices == -1:
        # Previously read an undefined name ``data``; size from the actual input.
        numOfSlices = min(max(8, ceil((sys.getsizeof(datapoints) / 1024 / 1000) * 0.4)), 24)
    start = time.time()
    input_list = query_containers(datapoints, numOfSlices)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_sve1(input_list, vertices, p)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult
    
def sve1_grid_partitioning_with_serial_grid_filtering(datapoints, vertices, p, numOfSlices = -1):
    """Serial ``query_containers`` pre-filter followed by grid-partitioned sve1.

    Args:
        datapoints: points handed to ``query_containers``.
        vertices, p: forwarded to ``naive_grid_partitioning_sve1``.
        numOfSlices: slicing argument for ``query_containers``. ``-1`` derives it
            from ``sys.getsizeof(datapoints)``, clamped to [8, 24].

    Returns:
        Whatever ``naive_grid_partitioning_sve1`` returns (previously dropped).
    """
    if numOfSlices == -1:
        # Previously read an undefined name ``data``; size from the actual input.
        numOfSlices = min(max(8, ceil((sys.getsizeof(datapoints) / 1024 / 1000) * 0.4)), 24)
    start = time.time()
    input_list = query_containers(datapoints, numOfSlices)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = naive_grid_partitioning_sve1(input_list, vertices, p)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult
    


In [None]:
# Cell banner printed when this cell runs. The functions below distribute sve1f /
# execute_sve1f_indexed over Spark partitions.
print('SVE1F distributed computation')
import random
#Random partitioning
def parallelSVE1F(data, vertices, p, numOfSlices = 100):
    """Two-phase sve1f over positionally partitioned data.

    Args:
        data: list of points.
        vertices, p: forwarded unchanged to ``sve1f``.
        numOfSlices: Spark partition count for the local phase.

    Returns:
        The result of a final serial ``sve1f`` pass over the union of the
        per-partition ``sve1f`` results ("nd" in the log lines).
    """
    t0 = time.time()
    rdd = spark.sparkContext.parallelize(data, numOfSlices)
    local_nd = rdd.mapPartitions(lambda part: sve1f(part, vertices, p),
                                 preservesPartitioning=True).collect()
    parallel_elapsed = time.time() - t0
    print(f"Length of nd after parallel section: {len(local_nd)}, time taken: {parallel_elapsed}")

    t1 = time.time()
    nd = sve1f(local_nd, vertices, p)
    serial_elapsed = time.time() - t1
    print(f"Length of nd is : {len(nd)}, time taken: {serial_elapsed}")
    print(f"Total time nd: {parallel_elapsed + serial_elapsed}")

    return nd

#One-slice partitioning

def sliced_partitioning_sve1f(datapoints, vertices, p, numPartitions=100):
    """sve1f over data pre-sorted on the first coordinate, with a serial merge.

    ``datapoints`` is sorted in place by ``x[0]``. ``parallelize`` therefore gives
    each of the ``numPartitions`` partitions a contiguous slice of the sorted list.

    Args:
        datapoints: list of points; mutated (sorted in place).
        vertices, p: forwarded unchanged to ``sve1f``.
        numPartitions: number of Spark partitions for the local phase.

    Returns:
        The result of the serial ``sve1f`` pass (previously computed and discarded).
    """
    start = time.time()
    datapoints.sort(key=lambda x: x[0])
    sortedData = datapoints
    end = time.time() - start
    print('Time taken for sorting: ' + str(end))

    start1 = time.time()
    initialValues = (spark.sparkContext
                     .parallelize(sortedData, numPartitions)
                     .mapPartitions(lambda x: sve1f(x, vertices, p))
                     .collect())
    end = time.time() - start1
    print("Length of the local skylines after parallel section is : " + str(len(initialValues)) + ", time taken: " + str(end))

    nd = sve1f(initialValues, vertices, p)
    end2 = time.time() - start1
    print("Length of nd is : " + str(len(nd)) + ", total time taken: " + str(end2))
    return nd

def parallel_angled_partitioning_sve1f(dataArray, vertices, p, numSlices = 5):
    """Angular-partitioned sve1f with a serial merge pass.

    Points are keyed by ``get_angular_partitionIndex`` and repartitioned into
    ``numSlices ** (dimensions - 1)`` partitions. Each partition is processed by
    ``execute_sve1f_indexed``, and the collected union is reduced with ``sve1f``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        vertices, p: forwarded unchanged to the sve1f helpers.
        numSlices: slicing parameter for ``get_angular_partitionIndex``.

    Returns:
        The result of the serial ``sve1f`` pass.
    """
    dims = len(dataArray[0])

    t_parallel = time.time()

    def key_by_angle(point):
        # partitionBy expects (key, value) pairs; the key picks the angular partition.
        return (get_angular_partitionIndex(point, dims, numSlices), point)

    local_nd = (spark.sparkContext
                .parallelize(dataArray)
                .map(key_by_angle)
                .partitionBy(numSlices ** (dims - 1))
                .mapPartitions(lambda part: execute_sve1f_indexed(part, vertices, p))
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'AP: Length of nd after parallel phase is :{len(local_nd)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    nd = sve1f(local_nd, vertices, p)
    serial_elapsed = time.time() - t_serial
    print(f'AP: Length of the skyline is :{len(nd)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return nd
    
def naive_grid_partitioning_sve1f(dataArray, vertices, p, numSlices = 4):
    """Grid-partitioned sve1f with a serial merge pass.

    Points are keyed by ``get_grid_partition_index(point, numSlices)`` and
    repartitioned into ``numSlices ** dimensions`` partitions. Each partition runs
    ``execute_sve1f_indexed``, and the collected union is reduced with ``sve1f``.

    Args:
        dataArray: non-empty list of points; ``len(dataArray[0])`` is the dimensionality.
        vertices, p: forwarded unchanged to the sve1f helpers.
        numSlices: slicing parameter for ``get_grid_partition_index``
            (presumably cells per dimension — confirm).

    Returns:
        The result of the serial ``sve1f`` pass.
    """
    grid_partitions = numSlices ** len(dataArray[0])
    print(f'Number of slices := {grid_partitions}')

    t_parallel = time.time()
    local_nd = (spark.sparkContext
                .parallelize(dataArray)
                .map(lambda point: (get_grid_partition_index(point, numSlices), point))
                .partitionBy(grid_partitions)
                .mapPartitions(lambda part: execute_sve1f_indexed(part, vertices, p))
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'GP: Length of nd after parallel phase is :{len(local_nd)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    nd = sve1f(local_nd, vertices, p)
    serial_elapsed = time.time() - t_serial
    print(f'GP: Length of the skyline is :{len(nd)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return nd

def sve1f_grid_partitioning_with_serial_grid_filtering(datapoints, vertices, p, numOfSlices = -1):
    """Serial ``query_containers`` pre-filter followed by grid-partitioned sve1f.

    Args:
        datapoints: points handed to ``query_containers``.
        vertices, p: forwarded to ``naive_grid_partitioning_sve1f``.
        numOfSlices: slicing argument for ``query_containers``. ``-1`` derives it
            from ``sys.getsizeof(datapoints)``, clamped to [8, 24].

    Returns:
        Whatever ``naive_grid_partitioning_sve1f`` returns (previously dropped).
    """
    if numOfSlices == -1:
        numOfSlices = min(max(8, ceil((sys.getsizeof(datapoints) / 1024 / 1000) * 0.4)), 24)
    start = time.time()
    input_list = query_containers(datapoints, numOfSlices)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = naive_grid_partitioning_sve1f(input_list, vertices, p)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult

def sve1f_angular_partitioning_with_serial_grid_filtering(datapoints, vertices, p):
    """Serial ``query_containers`` pre-filter followed by angular-partitioned sve1f.

    Args:
        datapoints: points handed to ``query_containers`` (default slicing).
        vertices, p: forwarded to ``parallel_angled_partitioning_sve1f``.

    Returns:
        Whatever ``parallel_angled_partitioning_sve1f`` returns (previously dropped).
    """
    start = time.time()
    input_list = query_containers(datapoints)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_sve1f(input_list, vertices, p)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult


def allParallel_sve1f(datapoints, vertices, p, numOfSlices=30):
    """Fully parallel sve1f: 3-phase filtering, then a parallel global pass.

    Phase 1 is ``parallel_3P_sve1f(datapoints, vertices, p, numOfSlices)``, which
    sorts ``datapoints`` in place. Phase 2 spreads the survivors over
    ``2 * numOfSlices`` partitions. Each partition runs ``sve1f_multithread``
    against the full survivor list.

    Args:
        datapoints: list of points (mutated by phase 1).
        vertices, p: forwarded to the sve1f helpers.
        numOfSlices: partition count for phase 1; phase 2 uses twice as many.

    Returns:
        The collected ``sve1f_multithread`` output ("nd" in the log lines).
    """
    start = time.time()
    # A compute_centroid(vertices) call whose result was never used was removed
    # here; it only inflated the reported phase-1 time.
    parallel_skyline = parallel_3P_sve1f(datapoints, vertices, p, numOfSlices)
    end = time.time() - start

    start_serial = time.time()
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numOfSlices * 2)
                               .mapPartitions(lambda x: sve1f_multithread(x, parallel_skyline, vertices, p))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of nd: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline

def allParallel_sve1f2(datapoints, vertices, p, numOfSlices=20):
    """Fully parallel sve1f using ``parallel_representative_filtering_nd`` for phase 1.

    Phase 2 spreads the phase-1 survivors over ``numOfSlices`` partitions. Each
    partition runs ``sve1f_multithread`` against the full survivor list.

    Args:
        datapoints: list of points; sorted in place (see NOTE below).
        vertices, p: forwarded to the sve1f helpers.
        numOfSlices: partition count for both phases.

    Returns:
        The collected ``sve1f_multithread`` output ("nd" in the log lines).
    """
    start = time.time()
    parallel_skyline = parallel_representative_filtering_nd(datapoints, vertices, p, numOfSlices, onlyFilter=False)
    end = time.time() - start

    start_serial = time.time()
    # NOTE(review): this sorts the caller's list in place, but datapoints is not used
    # afterwards and the sort is counted in the reported time — confirm it is intended.
    datapoints.sort(key=lambda x: x[0])
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numOfSlices)
                               .mapPartitions(lambda x: sve1f_multithread(x, parallel_skyline, vertices, p))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of nd: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline

def allParallel_sve1f3(datapoints, vertices, p, numOfSlices=200):
    """Fully parallel sve1f using ``parallel_representative_filtering`` for phase 1.

    The centroid of ``vertices`` (from ``compute_centroid``) is passed in the slot
    where ``AllParallel_sfs2`` passes ``weights``. Phase 2 spreads the survivors
    over ``numOfSlices`` partitions. Each partition runs ``sve1f_multithread``
    against the full survivor list.

    Args:
        datapoints: list of points; sorted in place (see NOTE below).
        vertices, p: forwarded to the sve1f helpers.
        numOfSlices: partition count for both phases.

    Returns:
        The collected ``sve1f_multithread`` output ("nd" in the log lines).
    """
    start = time.time()
    centr = compute_centroid(vertices)
    parallel_skyline = parallel_representative_filtering(datapoints, centr, numOfSlices, onlyFilter=False)
    end = time.time() - start

    start_serial = time.time()
    # NOTE(review): this sorts the caller's list in place, but datapoints is not used
    # afterwards and the sort is counted in the reported time — confirm it is intended.
    datapoints.sort(key=lambda x: x[0])
    parallel_global_skyline = (spark.sparkContext
                               .parallelize(parallel_skyline, numOfSlices)
                               .mapPartitions(lambda x: sve1f_multithread(x, parallel_skyline, vertices, p))
                               .collect())
    end_serial = time.time() - start_serial

    print('Length of nd: ' + str(len(parallel_global_skyline)) + ', Time taken to find: ' + str(end_serial))
    print('Total time taken with representatives: ' + str(end_serial + end))
    return parallel_global_skyline

In [None]:
# Cell banner printed when this cell runs. The functions below distribute
# po_primal / execute_pond_indexed_primal over Spark partitions.
print('PO primal gurobipy distributed computation')
#Random partitioning
def parallelPOND_primal(nd, constraints, k_, numSlices = 60):
    """Two-phase ``po_primal`` over positionally partitioned input.

    Args:
        nd: list of points (e.g. the output of an sve1/sve1f driver).
        constraints, k_: forwarded unchanged to ``po_primal``.
        numSlices: Spark partition count for the local phase.

    Returns:
        The result of the final serial ``po_primal`` pass ("po" in the log lines).
    """
    start = time.time()
    initialValues = (spark.sparkContext
                     .parallelize(nd, numSlices)
                     .mapPartitions(lambda x: po_primal(x, constraints, k_), preservesPartitioning=True)
                     .collect())
    end = time.time() - start
    print("Length of po after parallel section: " + str(len(initialValues)) + ", time taken: " + str(end))

    start_seq = time.time()
    po = po_primal(initialValues, constraints, k_)
    end_seq = time.time() - start_seq
    print("Length of po is : " + str(len(po)) + ", time taken: " + str(end_seq))
    # ``end_nd`` was never defined in this function, so this print raised NameError.
    # Report only the time measured here, as parallelPOND_primal_pulp does.
    print("Total time po: " + str(end + end_seq))
    return po

def parallel_angled_partitioning_pond_primal(nd, constraints, k_, numSlices = 5):
    """Angular-partitioned ``po_primal`` with a serial merge pass.

    Both the initial ``parallelize`` and the ``partitionBy`` use
    ``numSlices ** (dimensions - 1)`` partitions. Each partition runs
    ``execute_pond_indexed_primal``, and the collected union is reduced with
    ``po_primal``.

    Args:
        nd: non-empty list of points; ``len(nd[0])`` is the dimensionality.
        constraints, k_: forwarded unchanged to the po helpers.
        numSlices: slicing parameter for ``get_angular_partitionIndex``.

    Returns:
        The result of the serial ``po_primal`` pass.
    """
    dims = len(nd[0])
    angular_partitions = numSlices ** (dims - 1)

    t_parallel = time.time()
    local_po = (spark.sparkContext
                .parallelize(nd, angular_partitions)
                .map(lambda point: (get_angular_partitionIndex(point, dims, numSlices), point))
                .partitionBy(angular_partitions)
                .mapPartitions(lambda part: execute_pond_indexed_primal(part, constraints, k_),
                               preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'AP: Length of po after parallel phase is :{len(local_po)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    po = po_primal(local_po, constraints, k_)
    serial_elapsed = time.time() - t_serial
    print(f'AP: Length of po is :{len(po)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return po

def naive_grid_partitioning_pond_primal(nd, constraints, k_, numSlices = 6):
    """Grid-partitioned ``po_primal`` with a serial merge pass.

    The initial ``parallelize`` uses ``numSlices`` partitions. Points are then
    keyed by ``get_grid_partition_index`` and repartitioned into
    ``numSlices ** dimensions`` partitions, each processed by
    ``execute_pond_indexed_primal``. The union is reduced with ``po_primal``.

    Args:
        nd: non-empty list of points; ``len(nd[0])`` is the dimensionality.
        constraints, k_: forwarded unchanged to the po helpers.
        numSlices: slicing parameter for ``get_grid_partition_index``.

    Returns:
        The result of the serial ``po_primal`` pass.
    """
    grid_partitions = numSlices ** len(nd[0])
    print(f'Number of slices := {grid_partitions}')

    t_parallel = time.time()
    keyed = (spark.sparkContext
             .parallelize(nd, numSlices)
             .map(lambda point: (get_grid_partition_index(point, numSlices), point)))
    local_po = (keyed
                .partitionBy(grid_partitions)
                .mapPartitions(lambda part: execute_pond_indexed_primal(part, constraints, k_),
                               preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'GP: Length of po after parallel phase is :{len(local_po)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    po = po_primal(local_po, constraints, k_)
    serial_elapsed = time.time() - t_serial
    print(f'GP: Length of the skyline is :{len(po)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return po
    
def pond_primal_angled_partitioning_with_serial_grid_filtering(datapoints, constraints, k_, numSlicesPerDimension = 12):
    """Serial ``query_containers`` pre-filter followed by ANGULAR-partitioned po_primal.

    Previously this called ``naive_grid_partitioning_pond_primal``, making it
    identical to the grid variant. It now uses the angular partitioner its name
    promises.

    Args:
        datapoints: points handed to ``query_containers``.
        constraints, k_: forwarded to ``parallel_angled_partitioning_pond_primal``.
        numSlicesPerDimension: slicing argument for ``query_containers``.

    Returns:
        Whatever ``parallel_angled_partitioning_pond_primal`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_pond_primal(input_list, constraints, k_)
    end_parallel = time.time() - start_parallel
    print('Total time: ' + str(end_parallel + end))
    return finalResult

def pond_primal_grid_partitioning_with_serial_grid_filtering(datapoints, constraints, k_, numSlicesPerDimension = 12):
    """Serial ``query_containers`` pre-filter followed by grid-partitioned po_primal.

    Args:
        datapoints: points handed to ``query_containers``.
        constraints, k_: forwarded to ``naive_grid_partitioning_pond_primal``.
        numSlicesPerDimension: slicing argument for ``query_containers``.

    Returns:
        Whatever ``naive_grid_partitioning_pond_primal`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')

    start_parallel = time.time()
    finalResult = naive_grid_partitioning_pond_primal(input_list, constraints, k_)
    end_parallel = time.time() - start_parallel
    # The closing parenthesis was missing here, a SyntaxError that broke the whole cell.
    print('Total time: ' + str(end_parallel + end))
    return finalResult

In [None]:
# Cell banner printed when this cell runs. The functions below distribute
# po_primal_pulp / execute_pond_indexed_primal_pulp over Spark partitions.
print('PO primal pulp distributed computation')
#Random partitioning
import random
def parallelPOND_primal_pulp(nd, constraints, k_, numSlices = 60):
    """Shuffled two-phase PO computation: local ``po_primal_pulp``, serial ``po_primal``.

    ``nd`` is shuffled in place with ``random.shuffle`` before partitioning. The
    local phase uses ``po_primal_pulp``; the final serial pass uses ``po_primal``.

    Args:
        nd: list of points; mutated (shuffled in place).
        constraints, k_: forwarded unchanged to the po helpers.
        numSlices: Spark partition count for the local phase.

    Returns:
        The result of the serial ``po_primal`` pass (previously discarded).
    """
    start = time.time()
    random.shuffle(nd)
    initialValues = (spark.sparkContext
                     .parallelize(nd, numSlices)
                     .mapPartitions(lambda x: po_primal_pulp(x, constraints, k_))
                     .collect())
    end = time.time() - start
    print("Length of po after parallel section: " + str(len(initialValues)) + ", time taken: " + str(end))

    start_seq = time.time()
    po = po_primal(initialValues, constraints, k_)
    end_seq = time.time() - start_seq
    print("Length of po is : " + str(len(po)) + ", time taken: " + str(end_seq))
    print("Total time po: " + str(end + end_seq))
    return po

def parallel_angled_partitioning_pond_primal_pulp(nd, constraints, k_, numSlices = 5):
    """Angular-partitioned PO computation: local pulp solve, serial ``po_primal`` merge.

    Both the initial ``parallelize`` and the ``partitionBy`` use
    ``numSlices ** (dimensions - 1)`` partitions. Each partition runs
    ``execute_pond_indexed_primal_pulp``, and the collected union is reduced with
    ``po_primal``.

    Args:
        nd: non-empty list of points; ``len(nd[0])`` is the dimensionality.
        constraints, k_: forwarded unchanged to the po helpers.
        numSlices: slicing parameter for ``get_angular_partitionIndex``.

    Returns:
        The result of the serial ``po_primal`` pass.
    """
    dims = len(nd[0])
    angular_partitions = numSlices ** (dims - 1)

    def key_by_angle(point):
        # partitionBy expects (key, value) pairs; the key picks the angular partition.
        return (get_angular_partitionIndex(point, dims, numSlices), point)

    t_parallel = time.time()
    local_po = (spark.sparkContext
                .parallelize(nd, angular_partitions)
                .map(key_by_angle)
                .partitionBy(angular_partitions)
                .mapPartitions(lambda part: execute_pond_indexed_primal_pulp(part, constraints, k_),
                               preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'AP: Length of po after parallel phase is :{len(local_po)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    po = po_primal(local_po, constraints, k_)
    serial_elapsed = time.time() - t_serial
    print(f'AP: Length of po is :{len(po)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return po

def naive_grid_partitioning_pond_primal_pulp(nd, constraints, k_, numSlices = 6):
    """Grid-partitioned PO computation: local pulp solve, serial ``po_primal`` merge.

    The initial ``parallelize`` uses ``numSlices`` partitions. Points are keyed by
    ``get_grid_partition_index`` and repartitioned into ``numSlices ** dimensions``
    partitions, each processed by ``execute_pond_indexed_primal_pulp``. The union
    is then reduced with ``po_primal``.

    Args:
        nd: non-empty list of points; ``len(nd[0])`` is the dimensionality.
        constraints, k_: forwarded unchanged to the po helpers.
        numSlices: slicing parameter for ``get_grid_partition_index``.

    Returns:
        The result of the serial ``po_primal`` pass.
    """
    grid_partitions = numSlices ** len(nd[0])
    print(f'Number of slices := {grid_partitions}')

    t_parallel = time.time()
    local_po = (spark.sparkContext
                .parallelize(nd, numSlices)
                .map(lambda point: (get_grid_partition_index(point, numSlices), point))
                .partitionBy(grid_partitions)
                .mapPartitions(lambda part: execute_pond_indexed_primal_pulp(part, constraints, k_),
                               preservesPartitioning=True)
                .collect())
    parallel_elapsed = time.time() - t_parallel
    print(f'GP: Length of po after parallel phase is :{len(local_po)}, time taken: {parallel_elapsed}')

    t_serial = time.time()
    po = po_primal(local_po, constraints, k_)
    serial_elapsed = time.time() - t_serial
    print(f'GP: Length of the skyline is :{len(po)}, total time taken: {parallel_elapsed + serial_elapsed}')
    return po

#One-slice partitioning

def sliced_partitioning_po_primal_pulp(datapoints, contraints, k_, numPartitions=100):
    """PO computation over data pre-sorted on the first coordinate.

    ``datapoints`` is sorted in place by ``x[0]``. Each of the ``numPartitions``
    contiguous slices runs ``po_primal_pulp``, and the collected union is reduced
    serially with ``po_primal``.

    Args:
        datapoints: list of points; mutated (sorted in place).
        contraints: constraint set forwarded to ``po_primal_pulp`` / ``po_primal``.
            The misspelled parameter name is kept so that existing keyword callers
            keep working.
        k_: forwarded unchanged to the po helpers.
        numPartitions: number of Spark partitions for the local phase.

    Returns:
        The result of the serial ``po_primal`` pass (previously discarded).
    """
    # The body used to read ``constraints``, which is not a parameter here: a
    # NameError, or silently a notebook global. Bind it from the real argument.
    constraints = contraints

    start = time.time()
    datapoints.sort(key=lambda x: x[0])
    sortedData = datapoints
    end = time.time() - start
    print('Time taken for sorting: ' + str(end))

    start1 = time.time()
    initialValues = (spark.sparkContext
                     .parallelize(sortedData, numPartitions)
                     .mapPartitions(lambda x: po_primal_pulp(x, constraints, k_))
                     .collect())
    end = time.time() - start1
    print("Length of the local po set after parallel section is : " + str(len(initialValues)) + ", time taken: " + str(end))

    po = po_primal(initialValues, constraints, k_)
    end2 = time.time() - start1
    print("Length of PO is : " + str(len(po)) + ", total time taken: " + str(end2))
    return po

def allParallel_pond_primal(nd, constraints, k_, numOfSlices=-1):
    """Two-phase, fully parallel PO computation (primal formulation, PuLP).

    Phase 1: a shuffled copy of ``nd`` is split into ``numOfSlices`` random
    partitions, and each partition is reduced with ``po_primal_pulp``.
    Phase 2: the collected phase-1 result is split into ``2 * numOfSlices``
    partitions. Each partition is checked against the whole phase-1 result with
    ``pond_primal_pulp_multithread``.

    Args:
        nd: non-empty sequence of points. It is not reordered; a shuffled copy
            is used.
        constraints: forwarded to the PO solvers.
        k_: forwarded unchanged to the PO solvers.
        numOfSlices: number of phase-1 partitions. ``-1`` derives a value in
            [8, 24] from ``sys.getsizeof(nd)``.

    Returns:
        the list collected from phase 2.
    """
    if numOfSlices == -1 :
        # NOTE: sys.getsizeof measures only the list object itself (its pointer
        # array), not the points it references, so this is a rough heuristic.
        numOfSlices = min(max(8,  ceil((sys.getsizeof(nd)/1024/1000) * 0.4 ) ), 24)
    start_parallel = time.time()
    # Shuffle a copy so the caller's data is not reordered as a side effect.
    shuffled = list(nd)
    random.shuffle(shuffled)
    initialResult = spark.sparkContext.parallelize(shuffled, numOfSlices)\
                                .mapPartitions(lambda x : po_primal_pulp(x, constraints, k_)) \
                                .collect()
    end_parallel = time.time()-start_parallel
    print('Length of po after parallel phase: ' + str(len(initialResult)) + ', Time taken to find: ' + str(end_parallel))
    po = spark.sparkContext.parallelize(initialResult, numOfSlices*2)\
                                .mapPartitions(lambda x : pond_primal_pulp_multithread(x, initialResult, constraints, k_)) \
                                .collect()
    # Cumulative: measured from the start of phase 1.
    end_parallel = time.time()-start_parallel
    print('Length of po: ' + str(len(po)) + ', Time taken to find: ' + str(end_parallel))

    return po

def allParallel_pond_primal_angular(nd, constraints, k_, numOfSlices=100, numSlices = 5):
    """Two-phase, fully parallel PO computation with angular partitioning (primal, PuLP).

    Phase 1: points are keyed with ``get_angular_partitionIndex`` and repartitioned
    into ``numSlices**(dimensions-1)`` partitions. Each partition is reduced with
    ``execute_pond_indexed_primal_pulp``.
    Phase 2: the collected phase-1 result is split into ``numOfSlices`` partitions.
    Each partition is checked against the whole phase-1 result with
    ``pond_primal_pulp_multithread``.

    Args:
        nd: non-empty sequence of points.
        constraints: forwarded to the PO solvers.
        k_: forwarded unchanged to the PO solvers.
        numOfSlices: phase-2 partition count. ``-1`` derives a value in [8, 24]
            from ``sys.getsizeof(nd)``.
        numSlices: angular slices per (dimension - 1).

    Returns:
        the list collected from phase 2.
    """
    if numOfSlices == -1 :
        # NOTE: sys.getsizeof measures only the list object, not the points it
        # references, so this is a rough heuristic.
        numOfSlices = min(max(8,  ceil((sys.getsizeof(nd)/1024/1000) * 0.4 ) ), 24)
    start_parallel = time.time()
    dimensions = len(nd[0])
    num_cells = numSlices**(dimensions-1)
    initialResult = spark.sparkContext.parallelize(nd, num_cells) \
                    .map(lambda x : ( get_angular_partitionIndex(x, dimensions, numSlices)  , x) )  \
                    .partitionBy(num_cells) \
                    .mapPartitions(lambda x : execute_pond_indexed_primal_pulp(x, constraints, k_), preservesPartitioning=True)  \
                    .collect()
    end_parallel = time.time()-start_parallel
    print('Length of po after parallel phase: ' + str(len(initialResult)) + ', Time taken to find: ' + str(end_parallel))
    po = spark.sparkContext.parallelize(initialResult, numOfSlices)\
                                .mapPartitions(lambda x : pond_primal_pulp_multithread(x, initialResult, constraints, k_)) \
                                .collect()
    # Cumulative: measured from the start of phase 1.
    end_parallel = time.time()-start_parallel
    print('Length of po: ' + str(len(po)) + ', Time taken to find: ' + str(end_parallel))

    return po

In [None]:
# Cell banner: Spark-distributed PO routines that use ``po_dual`` as the local solver.
print('PO dual gurobipy distributed computation')
#Random partitioning
def parallelPOND_dual(nd, vertices, p, numSlices = 16):
    """Random-partitioning PO computation (dual formulation).

    ``nd`` is split into ``numSlices`` Spark partitions, and each partition is
    reduced with ``po_dual``. The union of the local results is reduced once
    more with ``po_dual`` on the driver.

    Args:
        nd: sequence of points.
        vertices: forwarded unchanged to ``po_dual``.
        p: forwarded unchanged to ``po_dual``.
        numSlices: number of Spark partitions for the parallel phase.

    Returns:
        the list returned by the final ``po_dual`` call.
    """
    start = time.time()
    initialValues = spark.sparkContext.parallelize(nd, numSlices).mapPartitions(lambda x: po_dual(x, vertices, p), preservesPartitioning = True).collect()
    end = time.time()-start
    print("Length of po after parallel section: " + str(len(initialValues)) + ", time taken: " + str(end))
    start_seq = time.time()
    po = po_dual(initialValues, vertices, p)
    end_seq= time.time()-start_seq
    print("Length of po is : " + str(len(po)) + ", time taken: " + str(end_seq))
    # Total = parallel phase + sequential merge (``end_nd`` was never defined).
    print("Total time po: " +str(end+end_seq))
    return po

def parallel_angled_partitioning_pond_dual(nd, vertices, p, numSlices = 2):
    """Angular-partitioned PO computation (dual formulation).

    Points are keyed with ``get_angular_partitionIndex`` and repartitioned into
    ``numSlices**(dimensions-1)`` partitions. Each partition is reduced with
    ``execute_pond_indexed_dual``, and the collected local results are reduced
    again on the driver with ``po_dual``.

    Args:
        nd: non-empty sequence of points.
        vertices: forwarded unchanged to the dual solvers.
        p: forwarded unchanged to the dual solvers.
        numSlices: angular slices per (dimension - 1).

    Returns:
        the list returned by ``po_dual``. Callers such as
        ``parallel_angular_partitioning_pond_dual`` use this value.
    """

    dimensions = len(nd[0])

    start = time.time()
    # Partition By divides the dataset by the primary key of each tuple
    initialResult = spark.sparkContext.parallelize(nd, numSlices) \
                    .map(lambda x : ( get_angular_partitionIndex(x, dimensions, numSlices)  , x) )  \
                    .partitionBy(numSlices**(dimensions-1)) \
                    .mapPartitions(lambda x : execute_pond_indexed_dual(x, vertices, p), preservesPartitioning=True)  \
                    .collect()
    end = time.time()- start
    print('AP: Length of po after parallel phase is :' + str(len(initialResult))+ ", time taken: "+ str(end))
    seq_time = time.time()
    finRes = po_dual(initialResult,vertices, p)
    end_seq = time.time() - seq_time

    print('AP: Length of the skyline is :' + str(len(finRes)) + ', total time taken: '+str(end+end_seq))
    return finRes

def naive_grid_partitioning_pond_dual(nd, vertices, p, numSlices = 2):
    """Grid-partitioned PO computation (dual formulation).

    Every point is keyed by its grid cell via ``get_grid_partition_index``; the
    RDD is repartitioned into ``numSlices**dimensions`` partitions and each one
    is reduced with ``execute_pond_indexed_dual``. The collected local results
    are reduced again on the driver with ``po_dual``.

    Args:
        nd: non-empty sequence of points.
        vertices: forwarded unchanged to the dual solvers.
        p: forwarded unchanged to the dual solvers.
        numSlices: slices per dimension.

    Returns:
        the list returned by ``po_dual`` (matching the primal grid variant).
    """
    dimensions = len(nd[0])

    print('Number of slices := ' + str(numSlices**dimensions))

    start = time.time()
    initialResult = spark.sparkContext.parallelize(nd, numSlices) \
                    .map(lambda x : ( get_grid_partition_index(x, numSlices), x ) )  \
                    .partitionBy(numSlices**dimensions) \
                    .mapPartitions(lambda x : execute_pond_indexed_dual(x, vertices, p), preservesPartitioning=True) \
                    .collect()
    end = time.time()- start
    print('GP: Length of po after parallel phase is :' + str(len(initialResult))+ ", time taken: "+ str(end))

    seq_time = time.time()
    finRes = po_dual(initialResult,vertices, p)
    end_seq = time.time() - seq_time

    print('GP: Length of the skyline is :' + str(len(finRes)) + ', total time taken: '+str(end+end_seq))
    return finRes
    
def parallel_angular_partitioning_pond_dual(datapoints, vertices, p, numSlicesPerDimension = 12):
    """Serial container query followed by angular-partitioned dual PO computation.

    ``query_containers`` is run on the driver with ``numSlicesPerDimension``.
    Its output is passed to ``parallel_angled_partitioning_pond_dual``, which
    uses its own default ``numSlices``.

    Returns:
        whatever ``parallel_angled_partitioning_pond_dual`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')
    start_parallel = time.time()
    finalResult = parallel_angled_partitioning_pond_dual(input_list, vertices, p)
    end_parallel = time.time() - start_parallel
    print('Total time: '+ str(end_parallel+end))
    return finalResult

def pond_primal_grid_partitioning_with_serial_grid_filtering(datapoints, constraints, k_, numSlicesPerDimension = 12):
    """Serial container query followed by grid-partitioned primal PO computation.

    ``query_containers`` is run on the driver with ``numSlicesPerDimension``.
    Its output is passed to ``naive_grid_partitioning_pond_primal`` together
    with this function's ``constraints`` and ``k_``.

    Returns:
        whatever ``naive_grid_partitioning_pond_primal`` returns.
    """
    start = time.time()
    input_list = query_containers(datapoints, numSlicesPerDimension)
    end = time.time() - start
    print(str(end) + ' for the container serial query')
    start_parallel = time.time()
    # Forward the primal inputs; ``vertices``/``p`` belong to the dual variant
    # and are not defined in this scope.
    # NOTE(review): assumes naive_grid_partitioning_pond_primal (defined
    # elsewhere) takes (nd, constraints, k_) like its _pulp sibling — confirm.
    finalResult = naive_grid_partitioning_pond_primal(input_list, constraints, k_)
    end_parallel = time.time() - start_parallel
    print('Total time: '+ str(end_parallel+end))
    return finalResult

In [None]:
# Cell banner: representative-based pre-filtering routines (sorted, angular and grid variants).
print('Sorted method and others')
def euclidean_dist(point):
    """Return the sum of squared coordinates of ``point``.

    This is the *squared* Euclidean norm (no square root). Callers here use it
    only for ordering and comparison, where the square root would not change
    the result. An empty point yields ``0``.
    """
    return sum(coord ** 2 for coord in point)
def get_best_representatives_sort(index, dataset, n = 30):
    """Pick the point with the smallest ``euclidean_dist`` in each of ``n`` buckets.

    Buckets are equal-width slices of coordinate ``index``, with bucket
    ``floor(point[index] * n)``. Out-of-range coordinates are clamped into the
    first or last bucket. In ``representative_sorted2`` the Spark partition
    index is passed as ``index``.

    Args:
        index: which coordinate to bucket on.
        dataset: iterable of points (indexable sequences of numbers).
            Coordinates are presumably in [0, 1] — TODO confirm.
        n: number of buckets.

    Returns:
        list holding the best point of every non-empty bucket, in bucket order.
        Ties keep the first point seen.
    """
    if n <= 0:
        return []
    # One slot per bucket: None marks "empty", otherwise (squared_norm, point).
    # A value of 0 is a legitimate norm (the origin), so it cannot be the sentinel.
    best = [None] * n
    for point in dataset:
        value = euclidean_dist(point)
        # Clamp: a coordinate of exactly 1.0 would otherwise index bucket n.
        bucket = min(max(floor(point[index] * n), 0), n - 1)
        if best[bucket] is None or best[bucket][0] > value:
            best[bucket] = (value, point)
    return [entry[1] for entry in best if entry is not None]

def representative_sorted_euclidean(datapoints, weights, numRep, numSlices=12, onlyFilter = True):
    """Filter ``datapoints`` against representatives with the smallest ``euclidean_dist``.

    A copy of ``datapoints`` is sorted by ``euclidean_dist``. Its first
    ``numRep * dimensions`` points are reduced with ``find_skyline_sfs``, and
    that representative set is handed to ``filter_with_memory`` on each of
    ``numSlices`` Spark partitions of the original ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    ordered = datapoints.copy()
    ordered.sort(key=euclidean_dist)

    t0 = time.time()
    head_size = numRep * len(datapoints[0])
    representative = find_skyline_sfs(ordered[0:head_size], weights)
    rep_elapsed = time.time() - t0
    print('Length of representatives: ' + str(len(representative)) + ', time taken to find: '+ str(rep_elapsed))

    survivors = (spark.sparkContext
                 .parallelize(datapoints, numSlices)
                 .mapPartitions(lambda part: filter_with_memory(part, representative, weights, onlyFilter))
                 .collect())
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def representative_sorted_centroids(datapoints, weights, numRep, numSlices=12, onlyFilter = True):
    """Filter ``datapoints`` against representatives ranked by ``sort_function``.

    A copy of ``datapoints`` is sorted by ``sort_function(x, weights)``. Its
    first ``numRep * dimensions`` points are reduced with ``find_skyline_sfs``,
    and that set is handed to ``filter_with_memory`` on each of ``numSlices``
    Spark partitions of the original ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    ordered = datapoints.copy()
    ordered.sort(key=lambda pt: sort_function(pt, weights))

    t0 = time.time()
    head_size = numRep * len(datapoints[0])
    representative = find_skyline_sfs(ordered[0:head_size], weights)
    rep_elapsed = time.time() - t0
    print('Length of representatives: ' + str(len(representative)) + ', time taken to find: '+ str(rep_elapsed))

    survivors = (spark.sparkContext
                 .parallelize(datapoints, numSlices)
                 .mapPartitions(lambda part: filter_with_memory(part, representative, weights, onlyFilter))
                 .collect())
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def representative_sorted2(datapoints, weights, numRep, numSlices=12):
    """Filter ``datapoints`` against bucket-wise representatives.

    Steps:
    - A copy sorted by the first coordinate is split into as many Spark
      partitions as there are dimensions.
    - ``mapPartitionsWithIndex`` passes each partition's index to
      ``get_best_representatives_sort`` as the coordinate to bucket on, with
      ``numRep`` buckets.
    - The union of those picks is reduced with ``find_skyline_sfs``.
    - ``filter_with_memory`` (``onlyFilter=True``) then runs over ``numSlices``
      partitions of the original ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    ordered = datapoints.copy()
    ordered.sort(key=lambda pt: pt[0])

    t0 = time.time()
    dims = len(ordered[0])
    bucket_best = (spark.sparkContext
                   .parallelize(ordered, dims)
                   .mapPartitionsWithIndex(lambda part_idx, part: get_best_representatives_sort(part_idx, part, numRep))
                   .collect())
    representative = find_skyline_sfs(bucket_best, weights)
    rep_elapsed = time.time() - t0
    print('Length of representatives: ' + str(len(representative)) + ', time taken to find: '+ str(rep_elapsed))

    survivors = (spark.sparkContext
                 .parallelize(datapoints, numSlices)
                 .mapPartitions(lambda part: filter_with_memory(part, representative, weights, onlyFilter=True))
                 .collect())
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def representative_sorted_nd(datapoints, vertices, p, numRep, numSlices=12, onlyFilter = True):
    """Dual-formulation filter using representatives ranked towards the centroid.

    A copy of ``datapoints`` is sorted by ``sort_function`` against
    ``compute_centroid(vertices)``. Its first ``numRep * dimensions`` points are
    reduced with ``sve1f``, and that set is handed to ``filter_nd_with_memory2``
    on each of ``numSlices`` Spark partitions of the original ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    centroid = compute_centroid(vertices)
    ordered = datapoints.copy()
    ordered.sort(key=lambda pt: sort_function(pt, centroid))

    t_total = time.time()
    t_rep = time.time()
    head_size = numRep * len(datapoints[0])
    representative = sve1f(ordered[0:head_size], vertices, p)
    rep_elapsed = time.time() - t_rep
    print('Length of representatives: ' + str(len(representative)) + ', time taken to find: '+ str(rep_elapsed))

    survivors = (spark.sparkContext
                 .parallelize(datapoints, numSlices)
                 .mapPartitions(lambda part: filter_nd_with_memory2(part, representative, vertices, p, onlyFilter))
                 .collect())
    total_elapsed = time.time() - t_total

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def representative_sorted_nd_angular(datapoints, vertices, p, numRep, numSlices):
    """Dual-formulation filter with centroid-ranked representatives and angular partitions.

    Representatives are the ``sve1f`` reduction of the first
    ``numRep * dimensions`` points ordered by ``sort_function`` against
    ``compute_centroid(vertices)``. The data is then keyed with
    ``get_angular_partitionIndex`` and repartitioned into
    ``numSlices**(dimensions-1)`` partitions. Each partition is filtered with
    ``execute_sve1_filter_with_memory``.

    Returns:
        the collected list of points surviving the filter.
    """
    centroid = compute_centroid(vertices)
    ordered = datapoints.copy()
    ordered.sort(key=lambda pt: sort_function(pt, centroid))

    t_total = time.time()
    dims = len(datapoints[0])
    t_rep = time.time()
    representative = sve1f(ordered[0:numRep * dims], vertices, p)
    rep_elapsed = time.time() - t_rep
    print('Length of representatives: ' + str(len(representative)) + ', time taken to find: '+ str(rep_elapsed))

    num_cells = numSlices ** (dims - 1)
    survivors = (spark.sparkContext
                 .parallelize(datapoints, num_cells)
                 .map(lambda pt: (get_angular_partitionIndex(pt, dims, numSlices), pt))
                 .partitionBy(num_cells)
                 .mapPartitions(lambda part: execute_sve1_filter_with_memory(part, representative, vertices, p))
                 .collect())
    total_elapsed = time.time() - t_total

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def get_angular_representatives(dataset, n, weights):
    """Return the ``n`` points of ``dataset`` with the smallest ``sort_function(x, weights)``.

    Args:
        dataset: iterable of points. It is not modified.
        n: number of points to keep. Values <= 0 yield an empty list.
        weights: forwarded to ``sort_function``.

    Returns:
        a new list with at most ``n`` points, in ascending ``sort_function``
        order. Fewer are returned when ``dataset`` is smaller.
    """
    ranked = sorted(dataset, key=lambda x: sort_function(x, weights))
    # Slice to exactly n. A ``[0:n-1]`` slice returns one point too few, and
    # for n == 0 it would return every point except the last.
    return ranked[:max(n, 0)]

def area_function(point):
    """Return the product of ``(1 - value)`` over the coordinates of ``point``.

    For coordinates in [0, 1] this is the volume of the box between ``point``
    and the all-ones corner. An empty point yields ``1``.
    """
    covered = 1
    for coord in point:
        covered *= 1 - coord
    return covered

def get_angular_representatives2(dataset, n, weights):
    """Return the ``n`` points of ``dataset`` with the largest ``area_function`` value.

    Args:
        dataset: iterable of points. It is not modified.
        n: number of points to keep. Values <= 0 yield an empty list.
        weights: unused; kept so the signature matches ``get_angular_representatives``.

    Returns:
        a new list with at most ``n`` points, in descending ``area_function``
        order. Fewer are returned when ``dataset`` is smaller.
    """
    ranked = sorted(dataset, key=lambda x: area_function(x), reverse = True)
    # Slice to exactly n. A ``[0:n-1]`` slice returns one point too few, and
    # for n == 0 it would return every point except the last.
    return ranked[:max(n, 0)]



def execute_get_angular_representatives(input_list, numRepr, weights):
    """Spark ``mapPartitions`` adapter for ``get_angular_representatives``.

    ``input_list`` yields ``(partition_key, point)`` pairs. The keys are
    dropped, and the points are passed on with ``numRepr`` and ``weights``.
    """
    points = [pair[1] for pair in input_list]
    return get_angular_representatives(points, numRepr, weights)

def execute_get_angular_representatives2(input_list, numRepr, weights):
    """Spark ``mapPartitions`` adapter for ``get_angular_representatives2``.

    ``input_list`` yields ``(partition_key, point)`` pairs. The keys are
    dropped, and the points are passed on with ``numRepr`` and ``weights``.
    """
    points = [pair[1] for pair in input_list]
    return get_angular_representatives2(points, numRepr, weights)
def execute_filter_with_memory(input_list, repr, weights, onlyFilter):
    """Spark ``mapPartitions`` adapter for ``filter_with_memory``.

    ``input_list`` yields ``(partition_key, point)`` pairs. The keys are
    dropped, and the points are filtered against ``repr``.
    """
    points = [pair[1] for pair in input_list]
    return filter_with_memory(points, repr, weights, onlyFilter)

def representative_smallest(datapoints, weights, numRepr, numSlices = 4, onlyFilter = True):
    """Angular-partition representatives followed by a parallel filter (weights version).

    Steps:
    - Points are keyed with ``get_angular_partitionIndex`` into
      ``numSlices**(dimensions-1)`` partitions.
    - Each partition contributes ``numRepr // partitions`` points (at least
      one), picked by ``execute_get_angular_representatives``.
    - The candidates are reduced with ``find_skyline_sfs``.
    - That set is used by ``filter_with_memory`` over 12 partitions of
      ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    start = time.time()
    dimensions = len(datapoints[0])
    num_cells = numSlices**(dimensions-1)
    # At least one representative per partition; a small numRepr would
    # otherwise floor to 0.
    numRepr = max(1, floor(numRepr/num_cells))

    rep = spark.sparkContext.parallelize(datapoints) \
                        .map(lambda x : ( get_angular_partitionIndex(x, dimensions, numSlices)  , x) )  \
                        .partitionBy(num_cells) \
                        .mapPartitions(lambda x : execute_get_angular_representatives(x, numRepr, weights)) \
                        .collect()
    start_parallel = time.time()
    print(len(rep))
    rep = find_skyline_sfs(rep, weights)
    # ``start`` is set at function entry, so this covers the whole representative phase.
    end = time.time()-start
    print('Length of representatives: ' + str(len(rep)) + ', time taken to find: '+ str(end))
    parallel_skyline = spark.sparkContext.parallelize(datapoints, 12)\
                                .mapPartitions(lambda x : filter_with_memory(x, rep, weights, onlyFilter)) \
                                .collect()
    end_parallel = time.time() - start_parallel

    print('Time taken to filter: ' +str(end_parallel))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))
    return parallel_skyline

def representative_smallest_nd(datapoints, vertices, p, numRepr, numSlices = 2):
    """Angular-partition representatives followed by a parallel filter (dual version).

    Steps:
    - Points are keyed with ``get_angular_partitionIndex`` into
      ``numSlices**(dimensions-1)`` partitions.
    - Each partition contributes ``numRepr // partitions`` points (at least
      one), ranked by ``sort_function`` against ``compute_centroid(vertices)``.
    - The candidates are reduced with ``sve1f``.
    - That set is used by ``filter_nd_with_memory2`` over 12 partitions of
      ``datapoints``.

    Returns:
        the collected list of points surviving the filter.
    """
    start = time.time()
    dimensions = len(datapoints[0])
    num_cells = numSlices**(dimensions-1)
    # At least one representative per partition; a small numRepr would
    # otherwise floor to 0.
    numRepr = max(1, floor(numRepr/num_cells))
    weights = compute_centroid(vertices)
    representatives = spark.sparkContext.parallelize(datapoints) \
                        .map(lambda x : ( get_angular_partitionIndex(x, dimensions, numSlices)  , x) )  \
                        .partitionBy(num_cells) \
                        .mapPartitions(lambda x : execute_get_angular_representatives(x, numRepr, weights)) \
                        .collect()
    start_parallel = time.time()
    print(len(representatives))
    representatives = sve1f(representatives, vertices, p)
    # ``start`` is set at function entry, so this covers the whole representative phase.
    end = time.time()-start
    print('Length of representatives: ' + str(len(representatives)) + ', time taken to find: '+ str(end))
    parallel_skyline = spark.sparkContext.parallelize(datapoints, 12)\
                                .mapPartitions(lambda x : filter_nd_with_memory2(x, representatives, vertices, p, onlyFilter=True)) \
                                .collect()
    end_parallel = time.time() - start_parallel

    print('Time taken to filter: ' +str(end_parallel))
    print('Length of the data after filter: ' + str(len(parallel_skyline)))
    return parallel_skyline


def grid_repr(datapoints, numSlicesPerDimension, weights, numRep):
    """Collect representatives from a uniform grid of cells.

    Cell construction:
    - Each axis is cut into ``numSlicesPerDimension`` slices of width
      ``1 / numSlicesPerDimension``, giving ``numSlicesPerDimension**dimensions``
      cells.
    - Each cell is a ``Container(worst, best, [])``, where ``worst`` holds each
      slice's upper bound and ``best`` its lower bound.
    - A cell index is a mixed-radix number: digit ``j`` is the slice along
      dimension ``j``, and dimension 0 is the least significant digit.

    Scan:
    - Cells are visited in index order.
    - In each run of ``numSlicesPerDimension`` consecutive indices (a line
      along dimension 0), only the first non-empty cell is used.
    - That cell's points are sorted in place by ``sort_function(x, weights)``,
      and the first ``numRep`` are reduced with ``find_skyline_sfs``.
    - The scan then jumps to the start of the next run.

    Args:
        datapoints: non-empty list of points. Coordinates presumably lie in
            [0, 1]; values >= 1 are forced into the last slice. TODO confirm.
        numSlicesPerDimension: slices per axis.
        weights: forwarded to ``sort_function`` and ``find_skyline_sfs``.
        numRep: number of best-ranked points per used cell fed to ``find_skyline_sfs``.

    Returns:
        concatenation of the ``find_skyline_sfs`` outputs of the used cells.
    """

    limit = 1 / (numSlicesPerDimension)
    #print('iterating - limit: ' +str(limit))
    dimensions = len(datapoints[0])
    num_slices_in_space = numSlicesPerDimension**dimensions
    containerList = []
    # create N square containers with each container having the datapoints contained and an index
    for i in range(num_slices_in_space):
        worst = []
        best =[]
        for j in range(dimensions):
            # initialize the worst tuple (upper bound of the slice along dimension j)
            index_w = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            worst.insert(j, index_w * limit + limit)
            # initialize the best tuple (lower bound of the slice along dimension j)
            index_b = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            best.insert(j, index_b * limit)

        containerList.insert(i, Container(worst, best, []))

    # Assign every point to its cell.
    # NOTE(review): a negative coordinate would give a negative index (Python
    # wraps around silently), and a value just below 1.0 could in principle
    # round up to slice numSlicesPerDimension. Confirm inputs are in [0, 1).
    for dp in datapoints:
        index = 0
        for i in range(len(dp)):
            if dp[i] >= 1:
                index = index + floor(0.99999 / limit) * (numSlicesPerDimension**i)
            else:
                index = index + floor(dp[i] / limit) * (numSlicesPerDimension**i)
        (containerList[index].dataContained).append(dp)

    print("Number of containers before filtering: " + str(len(containerList)))

    print(len(containerList))
    # NOTE: ``repr`` shadows the builtin inside this function.
    repr = []
    k = 0
    while k < len(containerList):
        #print(k)
        #print(len(containerList[k].dataContained))
        if len(containerList[k].dataContained)==0:
            k += 1
            continue
        containerList[k].dataContained.sort(key=lambda x: sort_function(x,weights))
        #print(containerList[k].dataContained[0:10])
        r = find_skyline_sfs(containerList[k].dataContained[0:numRep], weights)
        repr= repr + r
        # Jump to the first index of the next dimension-0 line.
        k =(floor(k/numSlicesPerDimension)+1)*numSlicesPerDimension
    return repr

def grid_repr2(datapoints, numSlicesPerDimension, weights, numRep):
    """Collect representatives from the main diagonal of each 2-D layer of a uniform grid.

    The grid of cells is built as in ``grid_repr``: cells are indexed in mixed
    radix, with dimension 0 as the least significant digit.

    Scan:
    - The cell list is walked in layers of ``numSlicesPerDimension**2``
      indices, i.e. planes spanned by dimensions 0 and 1.
    - Inside a layer starting at ``k``, cells ``k, k+n+1, k+2(n+1), ...`` are
      checked, where ``n = numSlicesPerDimension``. These are the cells whose
      dimension-0 and dimension-1 slices are equal.
    - The first non-empty one is sorted in place by ``sort_function(x, weights)``,
      and its first ``numRep`` points are reduced with ``find_skyline_sfs``.
    - The scan then moves to the next layer. A layer whose diagonal is
      entirely empty contributes nothing.

    Args:
        datapoints: non-empty list of points. Coordinates presumably in [0, 1].
            TODO confirm.
        numSlicesPerDimension: slices per axis.
        weights: forwarded to ``sort_function`` and ``find_skyline_sfs``.
        numRep: number of best-ranked points per used cell fed to ``find_skyline_sfs``.

    Returns:
        concatenation of the ``find_skyline_sfs`` outputs of the used cells.
    """

    limit = 1 / (numSlicesPerDimension)
    #print('iterating - limit: ' +str(limit))
    dimensions = len(datapoints[0])
    num_slices_in_space = numSlicesPerDimension**dimensions
    containerList = []
    # create N square containers with each container having the datapoints contained and an index
    for i in range(num_slices_in_space):
        worst = []
        best =[]
        for j in range(dimensions):
            # initialize the worst tuple (upper bound of the slice along dimension j)
            index_w = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            worst.insert(j, index_w * limit + limit)
            # initialize the best tuple (lower bound of the slice along dimension j)
            index_b = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            best.insert(j, index_b * limit)

        containerList.insert(i, Container(worst, best, []))

    # Assign every point to its cell (same mixed-radix index as grid_repr).
    for dp in datapoints:
        index = 0
        for i in range(len(dp)):
            if dp[i] >= 1:
                index = index + floor(0.99999 / limit) * (numSlicesPerDimension**i)
            else:
                index = index + floor(dp[i] / limit) * (numSlicesPerDimension**i)
        (containerList[index].dataContained).append(dp)

    print(len(containerList))
    repr = []
    # k: first index of the current 2-D layer; i: current diagonal cell in it.
    k = 0
    i = 0
    while k < len(containerList):
        #print(k)
        #print(i)
        #print(len(containerList[k].dataContained))
        if len(containerList[i].dataContained)==0:
            if(i < (k+numSlicesPerDimension**2)-1):
                # Step to the next diagonal cell (+1 along dim 0 and +1 along dim 1).
                i = i + numSlicesPerDimension + 1
                continue
            else:
                # Diagonal exhausted: move to the next layer.
                k = k + numSlicesPerDimension**2
                i = k
                continue
        containerList[i].dataContained.sort(key=lambda x: sort_function(x,weights))
        #print(containerList[k].dataContained[0:10])
        r = find_skyline_sfs(containerList[i].dataContained[0:numRep], weights)
        repr= repr + r
        k = k + numSlicesPerDimension**2
        i = k
    return repr

def grid_repr3(datapoints, numSlicesPerDimension, weights, numRep):
    """Collect representatives along several diagonals of each 2-D layer of a uniform grid.

    The grid of cells is built as in ``grid_repr``, with mixed-radix indices
    and dimension 0 as the least significant digit. Write
    ``n = numSlicesPerDimension``.

    Scan, for every layer of ``n**2`` indices starting at ``k``:
    - Diagonal walks (step ``n + 1``) start from cells ``k .. k+n-2`` (row 0,
      all columns except the last).
    - Further walks start from ``k+n`` and ``k+2n`` (column 0 of rows 1 and 2).
    - On each walk, the first non-empty cell is sorted in place by
      ``sort_function(x, weights)``, and its first ``numRep`` points are
      reduced with ``find_skyline_sfs`` and appended.

    NOTE(review): the column-0 starts are hard-coded to two rows
    (``t < k-1+3*n``). For n >= 4 the remaining rows are skipped, and for
    n == 2 the second start ``k+4`` lies in the next layer. Looks tuned for
    n == 3; confirm.

    NOTE(review): a walk that starts off the main diagonal can step past the
    end of its layer before the ``i < k+n**2-1`` guard stops it. For example,
    with n == 3 and a start at ``k+1``, the next cells are ``k+5`` and then
    ``k+9``. In the last layer this raises IndexError when those cells are
    empty; elsewhere it reads the next layer.

    Args:
        datapoints: non-empty list of points. Coordinates presumably in [0, 1].
            TODO confirm.
        numSlicesPerDimension: slices per axis.
        weights: forwarded to ``sort_function`` and ``find_skyline_sfs``.
        numRep: number of best-ranked points per used cell fed to ``find_skyline_sfs``.

    Returns:
        concatenation of the ``find_skyline_sfs`` outputs of the used cells.
    """

    limit = 1 / (numSlicesPerDimension)
    #print('iterating - limit: ' +str(limit))
    dimensions = len(datapoints[0])
    num_slices_in_space = numSlicesPerDimension**dimensions
    containerList = []
    # create N square containers with each container having the datapoints contained and an index
    for i in range(num_slices_in_space):
        worst = []
        best =[]
        for j in range(dimensions):
            # initialize the worst tuple (upper bound of the slice along dimension j)
            index_w = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            worst.insert(j, index_w * limit + limit)
            # initialize the best tuple (lower bound of the slice along dimension j)
            index_b = floor( i / (numSlicesPerDimension**j) ) % numSlicesPerDimension
            best.insert(j, index_b * limit)

        containerList.insert(i, Container(worst, best, []))

    # Assign every point to its cell (same mixed-radix index as grid_repr).
    for dp in datapoints:
        index = 0
        for i in range(len(dp)):
            if dp[i] >= 1:
                index = index + floor(0.99999 / limit) * (numSlicesPerDimension**i)
            else:
                index = index + floor(dp[i] / limit) * (numSlicesPerDimension**i)
        (containerList[index].dataContained).append(dp)

    print(len(containerList))
    repr = []
    # k: first index of the current 2-D layer.
    # j: start cell of a row-0 diagonal walk.
    # t: start cell of a column-0 diagonal walk.
    k = 0
    j = 0
    t = 0
    while k < len(containerList)-1:
        # Leftover debug output: prints each layer's start index.
        print(k)
        j = k
        t = k + numSlicesPerDimension
        #print(i)
        #print(len(containerList[k].dataContained))
        # Walks starting in row 0 (all columns except the last).
        while j < k+numSlicesPerDimension-1:
            i = j
            while(True):
                if len(containerList[i].dataContained)==0:
                    if(i < (k+numSlicesPerDimension**2)-1):
                        i = i + numSlicesPerDimension + 1
                        continue
                    else:
                        break
                containerList[i].dataContained.sort(key=lambda x: sort_function(x,weights))
                r = find_skyline_sfs(containerList[i].dataContained[0:numRep], weights)
                repr= repr + r
                break
            j = j + 1
        # Walks starting in column 0 of rows 1 and 2 only (see NOTE in docstring).
        while t < k-1+3*numSlicesPerDimension:
            i = t
            while(True):
                if len(containerList[i].dataContained)==0:
                    if(i < (k+numSlicesPerDimension**2)-1):
                        i = i + numSlicesPerDimension + 1
                        continue
                    else:
                        break
                containerList[i].dataContained.sort(key=lambda x: sort_function(x,weights))
                r = find_skyline_sfs(containerList[i].dataContained[0:numRep], weights)
                repr= repr + r
                break
            t = t+ numSlicesPerDimension
        k = k + numSlicesPerDimension**2
        #print(containerList[k].dataContained[0:10])




    return repr

def grid_representatives(datapoints, weights, n=4, numRep=10):
    """Filter ``datapoints`` against representatives chosen by ``grid_repr``.

    Representatives are computed on the driver with ``n`` slices per
    dimension and ``numRep`` candidates per cell. Each of 12 Spark partitions
    of ``datapoints`` is then passed through ``filter_with_memory`` with
    ``onlyFilter=True``.

    Returns:
        the collected list of points surviving the filter.
    """
    t0 = time.time()
    reps = grid_repr(datapoints, n, weights, numRep)
    rep_elapsed = time.time() - t0
    print('Length rep: '+ str(len(reps))+ 'time taken: '+str(rep_elapsed))

    rdd = spark.sparkContext.parallelize(datapoints, 12)
    survivors = rdd.mapPartitions(
        lambda part: filter_with_memory(part, reps, weights, onlyFilter=True)).collect()
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def grid_representatives2(datapoints, weights, n=4, numRep=10):
    """Filter ``datapoints`` against representatives chosen by ``grid_repr2``.

    Representatives are computed on the driver with ``n`` slices per
    dimension and ``numRep`` candidates per cell. Each of 12 Spark partitions
    of ``datapoints`` is then passed through ``filter_with_memory`` with
    ``onlyFilter=True``.

    Returns:
        the collected list of points surviving the filter.
    """
    t0 = time.time()
    reps = grid_repr2(datapoints, n, weights, numRep)
    rep_elapsed = time.time() - t0
    print('Length rep: '+ str(len(reps))+ 'time taken: '+str(rep_elapsed))

    rdd = spark.sparkContext.parallelize(datapoints, 12)
    survivors = rdd.mapPartitions(
        lambda part: filter_with_memory(part, reps, weights, onlyFilter=True)).collect()
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors

def grid_representatives3(datapoints, weights, n=4, numRep=10):
    """Filter ``datapoints`` against representatives chosen by ``grid_repr3``.

    Representatives are computed on the driver with ``n`` slices per
    dimension and ``numRep`` candidates per cell. Each of 12 Spark partitions
    of ``datapoints`` is then passed through ``filter_with_memory`` with
    ``onlyFilter=True``.

    Returns:
        the collected list of points surviving the filter.
    """
    t0 = time.time()
    reps = grid_repr3(datapoints, n, weights, numRep)
    rep_elapsed = time.time() - t0
    print('Length rep: '+ str(len(reps))+ 'time taken: '+str(rep_elapsed))

    rdd = spark.sparkContext.parallelize(datapoints, 12)
    survivors = rdd.mapPartitions(
        lambda part: filter_with_memory(part, reps, weights, onlyFilter=True)).collect()
    # Measured from t0, so this includes the representative phase.
    total_elapsed = time.time() - t0

    print('Time taken to filter: ' +str(total_elapsed))
    print('Length of the data after filter: ' + str(len(survivors)))
    return survivors
