# Change the cardinality

This notebook tests the algorithms while varying the cardinality (number of points) of the dataset.

In [1]:
import dataset
from skyline import Skyline
from matplotlib import pyplot as plt

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [2]:
# Project modules handed to Spark through the "spark.files" option when the
# session is created.
files_to_distribute = [
    "/home/pindozzi/skyline_code/skyline.py",
    "/home/pindozzi/skyline_code/skyline_debug.py",
    "/home/pindozzi/skyline_code/skylineV2.py",
    "/home/pindozzi/skyline_code/skylineV3.py",
    "/home/pindozzi/skyline_code/utils.py",
    "/home/pindozzi/skyline_code/accumulator.py",
    "/home/pindozzi/skyline_code/dataset.py",
    "/home/pindozzi/skyline_code/grid_filtering.py",
]

# Build the single string holding the file paths separated by commas.
files_string = str.join(",", files_to_distribute)

In [None]:
# Connect to the standalone Spark master and register the helper modules
# listed in files_string under the "spark.files" option.
spark = (
    SparkSession.builder
    .master("spark://10.75.4.191:7077")
    .config("spark.files", files_string)
    .getOrCreate()
)
print("spark session created")

In [4]:
# Object whose methods are invoked by every benchmark cell below.
skyline = Skyline()
# Four equal weights passed to every skyline.* call
# (presumably one weight per dataset dimension — TODO confirm).
weights = [1.0] * 4

# Parallel Algorithms

In [5]:
# Generator settings shared by all the datasets created in this notebook.
dataConfiguration = dataset.DataGenConfig(
    spreadPercentage=100,
    dataRange=[0, 1],
)

# Choose the data distribution: leave uncommented only the setter you want.
dataConfiguration.setAntiCorrelated()
# dataConfiguration.setIndependent()
# dataConfiguration.setCorrelated()
dataConfiguration.numberOfDimensions = 4

# 100 million points.
max_num_of_points = int(1e8)

In [None]:
# Values for the x axis of the graphs: 0 followed by the dataset sizes
# (1, 10, 25, 50, 75 and 100 million points). The non-zero entries are also
# the sizes of the datasets generated further down.
points_array = [0] + [int(millions * 1e6) for millions in (1, 10, 25, 50, 75, 100)]
print(points_array)

In [None]:
# Build one dataset for every non-zero size in points_array (increasing size).
# The generated data is bound to `generated_data`, not `dataset`: rebinding
# `dataset` would shadow the imported `dataset` module, so the next iteration's
# `dataset.dataGenerator` lookup would hit the generated data instead of the
# module, and re-running the configuration cell would fail as well.
datasets_array = []
for num in points_array[1:]:
    dataConfiguration.setNumberOfData(num)
    generated_data = dataset.dataGenerator(dataConfiguration)
    datasets_array.append(generated_data)

Iterate through the array of datasets in order to test the algorithms with different dataset sizes.

In [None]:
# One skyline.random_partitioning_sfs result per dataset, in dataset order.
# The leading 0 lines up with the 0 at the start of points_array when plotting.
# NOTE(review): 120 is presumably the number of partitions — confirm in skyline.py.
random_partitioning_sfs = [0] + [
    skyline.random_partitioning_sfs(spark, data, weights, 120)
    for data in datasets_array
]

In [None]:
# One skyline.parallel_grid_partitioning_sfs result per dataset; the initial 0
# matches the first entry of points_array so the series can be plotted together.
grid_partitioning_sfs = [0]
for generated in datasets_array:
    grid_partitioning_sfs.append(
        skyline.parallel_grid_partitioning_sfs(spark, generated, weights, 6)
    )

In [None]:
# One skyline.parallel_angled_partitioning_sfs result per dataset, preceded by
# a 0 that pairs with the 0 at the start of points_array.
angular_partitioning_sfs = [0]
angular_partitioning_sfs.extend(
    skyline.parallel_angled_partitioning_sfs(spark, data, weights, 6)
    for data in datasets_array
)

In [None]:
# One skyline.sliced_partitioning_sfs result per dataset; the first element (0)
# corresponds to the 0 entry of points_array.
one_slice_partitioning_sfs = [
    0,
    *(
        skyline.sliced_partitioning_sfs(spark, data, weights, 120)
        for data in datasets_array
    ),
]

In [None]:
# Plot the results of the four parallel algorithms against the dataset size.
plt.plot(points_array, one_slice_partitioning_sfs, "r-", label='sliced_partitioning_sfs')
plt.plot(points_array, random_partitioning_sfs, "b-", label='random_partitioning_sfs')
plt.plot(points_array, grid_partitioning_sfs, "g-", label='grid_partitioning_sfs')
plt.plot(points_array, angular_partitioning_sfs, "k-", label='angular_partitioning_sfs')
# Title and axis labels follow the wording of the "Improved Algorithms" figure
# so that both plots can be read on their own.
# NOTE(review): the title hard-codes "Anticorrelated" and the y label assumes the
# skyline.* calls return elapsed seconds — update/confirm if either changes.
plt.title("Total Execution Time - Parallel Algorithms - Anticorrelated Dataset")
plt.xlabel("Number of Data")
plt.ylabel("Time(s)")
plt.legend()
plt.show()

# Improved Parallel Algorithms

In [None]:
# One result of the angular-partition / dominance-region representative
# filtering method per dataset. The leading 0 pairs with points_array[0].
angular_with_dm = [0] + [
    skyline.angular_partition_with_sfs_representative_filtering_dominance_region(
        spark, data, weights, 6, 100
    )
    for data in datasets_array
]

In [None]:
# One result of the angular-partition / angular representative filtering
# method per dataset, after an initial 0 that matches points_array[0].
angular_with_angular = [0]
for generated in datasets_array:
    angular_with_angular.append(
        skyline.angular_partition_with_sfs_representative_filtering_angular(
            spark, generated, weights, 6, 100
        )
    )

In [None]:
# One result of the one-slice / dominance-region representative method per
# dataset; the first element (0) corresponds to the 0 entry of points_array.
sliced_with_dm = [0]
sliced_with_dm.extend(
    skyline.one_slice_with_sfs_representative_dominance_region(
        spark, data, weights, 120, 6, 100
    )
    for data in datasets_array
)

In [None]:
# One result of the one-slice / angular representative filtering method per
# dataset, preceded by a 0 that lines up with points_array[0].
sliced_with_angular = [
    0,
    *(
        skyline.one_slice_with_sfs_representative_filtering_angular(
            spark, data, weights, 120, 6, 100
        )
        for data in datasets_array
    ),
]

In [None]:
# One skyline.AllParallel_sfs result per dataset, after an initial 0 that
# matches the first entry of points_array.
all_parallel = [0]

for data in datasets_array:
    elapsed = skyline.AllParallel_sfs(spark, data, weights, numReps = 100)
    # Store the result in all_parallel (not sliced_with_angular): otherwise
    # all_parallel keeps a single element and cannot be plotted against
    # points_array, and sliced_with_angular receives extra, unrelated values.
    all_parallel.append(elapsed)

In [None]:
# Series shown in the "Improved Algorithms" figure: (values, line format, legend label).
improved_series = [
    (angular_with_dm, "r-", 'angular_partitioning_with_sfs_dominance_region'),
    (angular_with_angular, "b-", 'angular_partitioning_with_sfs_angular'),
    (sliced_with_dm, "g-", 'one_slice_with_sfs_dominance_region'),
    # (sliced_with_angular, "k-", 'one_slice_with_sfs_repr_angular'),
    (all_parallel, "y-", 'all_parallel_sfs'),
]

# Draw every series against the dataset sizes, in the order listed above.
for measurements, line_format, line_label in improved_series:
    plt.plot(points_array, measurements, line_format, label=line_label)

plt.title("Total Execution Time - Improved Algorithms - Anticorrelated Dataset")
plt.xlabel("Number of Data")
plt.ylabel("Time(s)")
plt.legend()
plt.show()