From 500a1e7a198ee05645389cfe98a04f318fe7b1a9 Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Sun, 26 Nov 2023 15:30:42 -0800
Subject: [PATCH 1/6] Create GaskySparkReducerJob

working on creating intervals for the gasky algorithm
---
 .../com/css534/parallel/GaskySparkReducerJob | 236 ++++++++++++++++++
 1 file changed, 236 insertions(+)
 create mode 100644 spark/src/main/java/com/css534/parallel/GaskySparkReducerJob

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
new file mode 100644
index 0000000..cef3f91
--- /dev/null
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -0,0 +1,236 @@
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import java.util.Collections;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class GaskySparkJob {
+    public static void main(String[] args) {
+        // Create a Spark context
+        SparkConf conf = new SparkConf().setAppName("GaskySparkReducerJob");
+        JavaSparkContext sparkContext = new JavaSparkContext(conf);
+
+        // Load input data
+        JavaRDD<String> inputData = sparkContext.textFile(args[0]);
+
+        // Debug: Print the input data
+        System.out.println("Debug: Input Data:");
+        inputData.foreach(line -> System.out.println(line));
+
+        // Process the input data and create key-value pairs
+        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> result = inputData
+                .flatMapToPair(line -> parseInputData(line))
+                .groupByKey();
+
+        // Collect the result and sort it
+        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> resultList = result.collect();
+
+        // Create a new list with the sorted elements
+        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> sortedResultList = new ArrayList<>(resultList);
+        sortedResultList.sort(Comparator.comparing(
+                (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple) -> tuple._1(),
+                Tuple2Comparator.INSTANCE
+        ));
+
+        // Separate keys and values from the sorted result
+        List<Tuple2<String, Integer>> keysList = new ArrayList<>();
+        List<Iterable<Tuple2<Integer, Double>>> valuesList = new ArrayList<>();
+
+        for (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple : sortedResultList) {
+            keysList.add(tuple._1());
+            valuesList.add(tuple._2());
+        }
+
+        // Example: Print keys and values
+        System.out.println("Keys:");
+        keysList.forEach(key -> System.out.println(key));
+
+        System.out.println("Values:");
+        valuesList.forEach(values -> {
+            values.forEach(value -> System.out.print(" " + value));
+            System.out.println("");
+        });
+
+        // Call the new method to apply gaskyAlgorithm
+        applyGaskyAlgorithm(sparkContext, keysList, valuesList);
+
+        // Stop the Spark context
+        sparkContext.stop();
+    }
+
+    // New method to apply gaskyAlgorithm to each tuple along with its corresponding key
+    private static void applyGaskyAlgorithm(JavaSparkContext sparkContext, List<Tuple2<String, Integer>> keysList, List<Iterable<Tuple2<Integer, Double>>> valuesList) {
+        System.out.println("valuesList size" + valuesList.size());
+        for (int i = 0; i < valuesList.size(); i++) {
+            Tuple2<String, Integer> keysTuple = keysList.get(i);
+            int colNumber = keysTuple._2();
+            Iterable<Tuple2<Integer, Double>> valuesTuple = valuesList.get(i);
+
+            System.out.println("Column number" + colNumber);
+            // Assuming gaskyAlgorithm takes an Iterable<Tuple2<Integer, Double>> and a Tuple2
+            gaskyAlgorithm(sparkContext, valuesTuple, colNumber);
+        }
+    }
+
+    private static void gaskyAlgorithm(JavaSparkContext sparkContext, Iterable<Tuple2<Integer, Double>> values, int colNumber) {
+        int totalPoints = Iterables.size(values);
+        List<Double> distances = new ArrayList<>(Collections.nCopies(totalPoints, Double.MAX_VALUE));
+        System.out.println("Total points in Cartesian: " + totalPoints);
+
+        if (totalPoints > 2) {
+            // Convert Iterable to a List
+            List<Tuple2<Integer, Double>> points = Lists.newArrayList(values);
+
+            // Use Spark transformations to filter points based on dominance
+            JavaPairRDD filteredPointsRDD = sparkContext.parallelize(points)
+                    .mapToPair(point -> new Tuple2<>(point._1().doubleValue(), calcBisectorProjections(point._1().doubleValue(), point._2())))
+                    .reduceByKey((point1, point2) -> ((Double) point1) > ((Double) point2) ? point1 : point2)
+                    .values();
+
+            // Collect the results back to the driver
+            List<Tuple2<Double, Double>> filteredPoints = filteredPointsRDD.collect();
+
+            System.out.println("The current remained dominated points are " + filteredPoints.size());
+            List<double[]> proximityProjectionsPoints = findProximityPoints(sparkContext, Lists.newArrayList(filteredPoints), totalPoints);
+        }
+    }
+
+    // FlatMap function to process input data and generate key-value pairs
+    private static Iterator<Tuple2<Tuple2<String, Integer>, Tuple2<Integer, Double>>> parseInputData(String line) {
+        System.out.println("Debug: Processing Line: " + line);
+
+        String[] distFavArray = line.split("\\s+");
+        List<Tuple2<Tuple2<String, Integer>, Tuple2<Integer, Double>>> result = new ArrayList<>();
+
+        if (distFavArray.length > 0) {
+            String facilityName = distFavArray[0];
+            int matrixRowNumber = Integer.parseInt(distFavArray[1]);
+
+            // Convert the strings to a list of strings
+            List<String> binMatrixValues = Arrays.asList(Arrays.copyOfRange(distFavArray, 2, distFavArray.length));
+
+            double[] leftDistance = new double[binMatrixValues.get(0).length()];
+            double[] rightDistance = new double[binMatrixValues.get(0).length()];
+
+            Arrays.fill(leftDistance, Double.MAX_VALUE);
+            Arrays.fill(rightDistance, Double.MAX_VALUE);
+
+            leftDistance = getLeftDistance(leftDistance, binMatrixValues);
+            rightDistance = getRightDistance(rightDistance, binMatrixValues);
+
+            for (int i = 0; i < binMatrixValues.get(0).length(); i++) {
+                result.add(new Tuple2<>(new Tuple2<>(facilityName, i + 1),
+                        new Tuple2<>(matrixRowNumber, Double.min(leftDistance[i], rightDistance[i]))));
+            }
+        }
+
+        // Debug: Print the generated key-value pairs
+        System.out.println("Debug: Generated Key-Value Pairs:");
+        result.forEach(tuple -> System.out.println(tuple));
+
+        return result.iterator();
+    }
+
+    private static double[] getLeftDistance(double[] leftDistance, List<String> gridRows) {
+        boolean isFavlFound = false;
+        for (int i = 0; i < gridRows.get(0).length(); i++) {
+            if (gridRows.get(0).charAt(i) == '1') {
+                leftDistance[i] = 0;
+                isFavlFound = true;
+            } else if (isFavlFound) {
+                leftDistance[i] = leftDistance[i - 1] + 1;
+            }
+        }
+        return leftDistance;
+    }
+
+    private static double[] getRightDistance(double[] rightDistance, List<String> gridRows) {
+        boolean isFavrFound = false;
+        for (int i = gridRows.get(0).length() - 1; i >= 0; --i) {
+            if (gridRows.get(0).charAt(i) == '1') {
+                rightDistance[i] = 0;
+                isFavrFound = true;
+            } else if (isFavrFound) {
+                rightDistance[i] = rightDistance[i + 1] + 1;
+            }
+        }
+        return rightDistance;
+    }
+
+    // Comparator for Tuple2
+    static class Tuple2Comparator implements Comparator<Tuple2<String, Integer>>, Serializable {
+        static final Tuple2Comparator INSTANCE = new Tuple2Comparator();
+
+        @Override
+        public int compare(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) {
+            int compareResult = tuple1._1().compareTo(tuple2._1());
+            if (compareResult == 0) {
+                // If the first elements are equal, compare the second elements
+                compareResult = Integer.compare(tuple1._2(), tuple2._2());
+            }
+            return compareResult;
+        }
+    }
+
+    private static Tuple2<Double, Double> calcBisectorProjections(Double x1, Double y1) {
+        double x = x1;
+        double y = y1;
+        double xx = ((y * y) - (y * y) + (x * x) - (x * x)) / (2 * (y - x));
+        double yy = 0;
+        return new Tuple2<>(xx, yy);
+    }
+
+    private static List<double[]> findProximityPoints(JavaSparkContext sparkContext,
+                                                      List<Tuple2<Double, Double>> unDominatedPoints,
+                                                      final int totalPoints) {
+        // Create a JavaRDD from the list of unDominatedPoints
+        JavaRDD<Tuple2<Double, Double>> unDominatedPointsRDD = sparkContext.parallelize(unDominatedPoints);
+
+        // Calculate intervals between consecutive points
+        JavaRDD<Tuple2<Double, Double>> intervalsRDD = unDominatedPointsRDD
+                .zipWithIndex()
+                .filter(tuple -> tuple._2() > 0) // Exclude the first point
+                .map(tuple -> {
+                    Tuple2<Double, Double> point1 = unDominatedPoints.get(tuple._2().intValue() - 1);
+                    Tuple2<Double, Double> point2 = tuple._1();
+
+                    return new Tuple2<>(
+                            (point1._1() + point2._1()) / 2,
+                            0.0 // point lying on with intersection on X axis
+                    );
+                });
+
+        // Combine intervals
+        JavaPairRDD mergedIntervalRDD = intervalsRDD
+                .mapToPair(interval -> new Tuple2<>(interval._1(), interval._1()))
+                .reduceByKey((interval1, interval2) -> new Tuple2<>(interval1, interval2));
+
+        // Collect the results back to the driver
+        List<Tuple2<Double, Double>> mergedIntervals = mergedIntervalRDD.collect();
+
+        // Convert List<Tuple2<Double, Double>> to List<double[]>
+        List<double[]> mergedIntervalList = new ArrayList<>();
+        for (Tuple2<Double, Double> interval : mergedIntervals) {
+            mergedIntervalList.add(new double[]{interval._1(), totalPoints});
+        }
+
+        return mergedIntervalList;
+    }
+}
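[Editorial aside between patches: the single-point calcBisectorProjections in PATCH 1/6 cancels algebraically to zero in the numerator (y*y - y*y + x*x - x*x == 0), so it cannot produce a usable projection; PATCH 2/6 below rewrites it in two-point form. For reference, a minimal, self-contained sketch of that two-point perpendicular-bisector projection, with a hypothetical point pair as the example:

import static java.lang.System.out;

// Standalone sketch (not part of the patch series). For points (x, y) and
// (x1, y1), the x-coordinate xx where a grid cell (xx, 0) is equidistant
// from both satisfies:
//   (xx - x)^2 + y^2 = (xx - x1)^2 + y1^2
//   => xx = (y1^2 - y^2 + x1^2 - x^2) / (2 * (x1 - x))
public final class BisectorSketch {
    static double bisectorX(double x, double y, double x1, double y1) {
        return ((y1 * y1) - (y * y) + (x1 * x1) - (x * x)) / (2 * (x1 - x));
    }

    public static void main(String[] args) {
        // Hypothetical pair: (1, 2) and (5, 2) are equidistant from x = 3.
        out.println(bisectorX(1, 2, 5, 2)); // prints 3.0
    }
}
]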
From a9b423adbbcef3341026d69d5b8a9bcd1f391c50 Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Mon, 27 Nov 2023 12:53:48 -0800
Subject: [PATCH 2/6] Update GaskySparkReducerJob

---
 .../com/css534/parallel/GaskySparkReducerJob | 230 +++++++++++++-----
 1 file changed, 163 insertions(+), 67 deletions(-)

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
index cef3f91..3979021 100644
--- a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -17,9 +17,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class GaskySparkJob {
+    private static final int GRID_SIZE = 8;
     public static void main(String[] args) {
         // Create a Spark context
-        SparkConf conf = new SparkConf().setAppName("GaskySparkReducerJob");
+        SparkConf conf = new SparkConf().setAppName("GaskySparkJob");
         JavaSparkContext sparkContext = new JavaSparkContext(conf);
 
         // Load input data
@@ -70,42 +71,128 @@ public class GaskySparkJob {
         sparkContext.stop();
     }
 
-    // New method to apply gaskyAlgorithm to each tuple along with its corresponding key
-    private static void applyGaskyAlgorithm(JavaSparkContext sparkContext,List<Tuple2<String, Integer>> keysList, List<Iterable<Tuple2<Integer, Double>>> valuesList) {
-        System.out.println("valuesList size"+valuesList.size());
-        for (int i = 0; i < valuesList.size(); i++) {
-            Tuple2<String, Integer> keysTuple = keysList.get(i);
-            int colNumber = keysTuple._2();
-            Iterable<Tuple2<Integer, Double>> valuesTuple = valuesList.get(i);
-
-
-            System.out.println("Column number"+colNumber);
+    private static void applyGaskyAlgorithm(JavaSparkContext sparkContext, List<Tuple2<String, Integer>> keysList, List<Iterable<Tuple2<Integer, Double>>> valuesList) {
+        System.out.println("valuesList size" + valuesList.size());
+
+        // Create JavaRDDs for keys and values
+        JavaRDD<Tuple2<String, Integer>> keysRDD = sparkContext.parallelize(keysList);
+        JavaRDD<Iterable<Tuple2<Integer, Double>>> valuesRDD = sparkContext.parallelize(valuesList);
+
+        // Zip the two RDDs to create a PairRDD
+        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> zippedRDD = keysRDD.zip(valuesRDD);
+
+        // Filter out points with Double.MAX_VALUE distance
+        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> filteredRDD = zippedRDD.mapValues(values ->
+                filterUnfavorablePoints(values)
+        );
+
+        // Debug: Print the filtered points
+        System.out.println("Remaining Points after filtering:");
+        filteredRDD.collect().forEach(tuple -> {
+            Tuple2<String, Integer> keysTuple = tuple._1();
+            int colNumber = keysTuple._2();
+            Iterable<Tuple2<Integer, Double>> remainingPoints = tuple._2();
+            System.out.println("Column number: " + colNumber);
+            System.out.println("Remaining Points: " + remainingPoints);
+        });
+
+        // Apply gaskyAlgorithm to the remaining points
+        filteredRDD.foreach(tuple -> {
+            Tuple2<String, Integer> keysTuple = tuple._1();
+            int colNumber = keysTuple._2();
+            Iterable<Tuple2<Integer, Double>> remainingPoints = tuple._2();
+
+            System.out.println("Applying gaskyAlgorithm for Column number: " + colNumber);
             // Assuming gaskyAlgorithm takes an Iterable<Tuple2<Integer, Double>> and a Tuple2
-            gaskyAlgorithm(sparkContext, valuesTuple, colNumber);
+            gaskyAlgorithm(remainingPoints, colNumber);
+        });
+    }
+
+    private static Iterable<Tuple2<Integer, Double>> filterUnfavorablePoints(Iterable<Tuple2<Integer, Double>> values) {
+        List<Tuple2<Integer, Double>> filteredValues = new ArrayList<>();
+        for (Tuple2<Integer, Double> value : values) {
+            if (!value._2().equals(Double.MAX_VALUE)) {
+                filteredValues.add(value);
+            }
+        }
+        return filteredValues;
+    }
+
+
+
+    private static void gaskyAlgorithm(Iterable<Tuple2<Integer, Double>> remainingPoints, int colNumber) {
+        int totalRemainingPoints = Iterables.size(remainingPoints);
+        List<Tuple2<Double, Double>> points = new ArrayList<>();
+
+        // Convert remainingPoints to Tuple2<Double, Double> objects
+        for (Tuple2<Integer, Double> point : remainingPoints) {
+            points.add(new Tuple2<>(point._1().doubleValue(), point._2()));
+        }
+
+        // Debug: Print initial points
+        System.out.println("Initial Points for Column number: " + colNumber);
+        points.forEach(System.out::println);
+
+        // Filtering based on dominance
+        int currentWindowStart = 1;
+        while (points.size() >= 3 && currentWindowStart <= points.size() - 2) {
+            Tuple2<Double, Double> ii = points.get(currentWindowStart - 1);
+            Tuple2<Double, Double> jj = points.get(currentWindowStart);
+            Tuple2<Double, Double> kk = points.get(currentWindowStart + 1);
+
+            if (ii != null && jj != null && kk != null) {
+                double xij = calcBisectorProjections(ii._1(), ii._2(), jj._1(), jj._2())._1();
+                double xjk = calcBisectorProjections(jj._1(), jj._2(), kk._1(), kk._2())._1();
+
+                // Debug: Print xij and xjk
+                System.out.println("xij: " + xij + ", xjk: " + xjk);
+
+                if (xij > xjk) {
+                    // Debug: Print the removed point
+                    System.out.println("Removed point: " + jj);
+
+                    // Remove the middle point
+                    points.remove(currentWindowStart);
+                } else {
+                    // Move to the next window
+                    currentWindowStart++;
+                }
+            }
         }
-    }
 
-    private static void gaskyAlgorithm(JavaSparkContext sparkContext, Iterable<Tuple2<Integer, Double>> values, int colNumber) {
-        int totalPoints = Iterables.size(values);
-        List<Double> distances = new ArrayList<>(Collections.nCopies(totalPoints, Double.MAX_VALUE));
-        System.out.println("Total points in Cartesian: " + totalPoints);
+        // Debug: Print final points after filtering
+        System.out.println("Filtered Points after Gasky Algorithm for Column number: " + colNumber);
+        points.forEach(System.out::println);
+
+        // Call the method to find proximal points
+        List<double[]> proximalPoints = findProximityPoints(points, totalRemainingPoints, GRID_SIZE);
 
-        if (totalPoints > 2) {
-            // Convert Iterable to a List
-            List<Tuple2<Integer, Double>> points = Lists.newArrayList(values);
+        // Debug: Print proximal points
+        System.out.println("Proximal Points for Column number: " + colNumber);
+        proximalPoints.forEach(point -> System.out.println(Arrays.toString(point)));
+}
+
+    private static List<double[]> findProximityPoints(List<Tuple2<Double, Double>> unDominatedPoints, final int totalPoints, final int gridSize) {
+        List<Tuple2<Double, Double>> intervals = new ArrayList<>();
 
-            // Use Spark transformations to filter points based on dominance
-            JavaPairRDD filteredPointsRDD = sparkContext.parallelize(points)
-                    .mapToPair(point -> new Tuple2<>(point._1().doubleValue(), calcBisectorProjections(point._1().doubleValue(), point._2())))
-                    .reduceByKey((point1, point2) -> ((Double)point1) > ((Double)point2) ? point1 : point2)
-                    .values();
+        // Calculate intervals based on the unDominatedPoints
+        for (int i = 1; i < unDominatedPoints.size(); i++) {
+            Tuple2<Double, Double> point1 = unDominatedPoints.get(i - 1);
+            Tuple2<Double, Double> point2 = unDominatedPoints.get(i);
+            intervals.add(new Tuple2<>((point1._1() + point2._1()) / 2, 0.0));
+        }
 
-            // Collect the results back to the driver
-            List<Tuple2<Double, Double>> filteredPoints = filteredPointsRDD.collect();
+        // Combine intervals using a frame
+        List<double[]> mergedInterval = new ArrayList<>(intervals.size());
+        mergedInterval.add(new double[]{1, intervals.get(0)._1()});
 
-            System.out.println("The current remained dominated points are " + filteredPoints.size());
-            List<double[]> proximityProjectionsPoints = findProximityPoints(sparkContext, Lists.newArrayList(filteredPoints), totalPoints);
+        for (int i = 1; i < intervals.size(); i++) {
+            mergedInterval.add(new double[]{intervals.get(i - 1)._1(), intervals.get(i)._1()});
         }
+
+        mergedInterval.add(new double[]{intervals.get(intervals.size() - 1)._1(), GRID_SIZE});
+
+        return mergedInterval;
     }
@@ -187,50 +274,59 @@ public class GaskySparkJob {
     }
 
-
-    private static Tuple2<Double, Double> calcBisectorProjections(Double x1, Double y1){
-        double x = x1;
-        double y = y1;
-        double xx = ((y * y) - (y * y) + (x * x) - (x * x)) / (2 * (y - x));
+
+    private static Tuple2<Double, Double> calcBisectorProjections(double x, double y, double x1, double y1) {
+        double xx = ((y1 * y1) - (y * y) + (x1 * x1) - (x * x)) / (2 * (x1 - x));
         double yy = 0;
         return new Tuple2<>(xx, yy);
     }
 
-    private static List<double[]> findProximityPoints(JavaSparkContext sparkContext,
-                                                      List<Tuple2<Double, Double>> unDominatedPoints,
-                                                      final int totalPoints) {
-        // Create a JavaRDD from the list of unDominatedPoints
-        JavaRDD<Tuple2<Double, Double>> unDominatedPointsRDD = sparkContext.parallelize(unDominatedPoints);
-
-        // Calculate intervals between consecutive points
-        JavaRDD<Tuple2<Double, Double>> intervalsRDD = unDominatedPointsRDD
-                .zipWithIndex()
-                .filter(tuple -> tuple._2() > 0) // Exclude the first point
-                .map(tuple -> {
-                    Tuple2<Double, Double> point1 = unDominatedPoints.get(tuple._2().intValue() - 1);
-                    Tuple2<Double, Double> point2 = tuple._1();
-
-                    return new Tuple2<>(
-                            (point1._1() + point2._1()) / 2,
-                            0.0 // point lying on with intersection on X axis
-                    );
-                });
-
-        // Combine intervals
-        JavaPairRDD mergedIntervalRDD = intervalsRDD
-                .mapToPair(interval -> new Tuple2<>(interval._1(), interval._1()))
-                .reduceByKey((interval1, interval2) -> new Tuple2<>(interval1, interval2));
-
-        // Collect the results back to the driver
-        List<Tuple2<Double, Double>> mergedIntervals = mergedIntervalRDD.collect();
-
-        // Convert List<Tuple2<Double, Double>> to List<double[]>
-        List<double[]> mergedIntervalList = new ArrayList<>();
-        for (Tuple2<Double, Double> interval : mergedIntervals) {
-            mergedIntervalList.add(new double[]{interval._1(), totalPoints});
-        }
-
-        return mergedIntervalList;
-    }
-
-}
+// private static List<double[]> findProximityPoints(JavaSparkContext sparkContext,
+//                                                   List<Tuple2<Double, Double>> unDominatedPoints,
+//                                                   final int totalPoints) {
+//     // Create a JavaRDD from the list of unDominatedPoints
+//     JavaRDD<Tuple2<Double, Double>> unDominatedPointsRDD = sparkContext.parallelize(unDominatedPoints);
+
+//     // Calculate intervals between consecutive points
+//     JavaRDD<Tuple2<Double, Double>> intervalsRDD = unDominatedPointsRDD
+//             .zipWithIndex()
+//             .filter(tuple -> tuple._2() > 0) // Exclude the first point
+//             .map(tuple -> {
+//                 Tuple2<Double, Double> point1 = unDominatedPoints.get(tuple._2().intValue() - 1);
+//                 Tuple2<Double, Double> point2 = tuple._1();

+//                 return new Tuple2<>(
+//                         (point1._1() + point2._1()) / 2,
+//                         0.0 // point lying on with intersection on X axis
+//                 );
+//             });

+//     // Combine intervals
+//     JavaPairRDD mergedIntervalRDD = intervalsRDD
+//             .mapToPair(interval -> new Tuple2<>(interval._1(), interval._1()))
+//             .reduceByKey((interval1, interval2) -> new Tuple2<>(interval1, interval2));

+//     // Collect the results back to the driver
+//     List<Tuple2<Double, Double>> mergedIntervals = mergedIntervalRDD.collect();

+//     // Convert List<Tuple2<Double, Double>> to List<double[]>
+//     List<double[]> mergedIntervalList = new ArrayList<>();
+//     for (Tuple2<Double, Double> interval : mergedIntervals) {
+//         mergedIntervalList.add(new double[]{interval._1(), totalPoints});
+//     }

+//     return mergedIntervalList;
+// }
+
+// Add this method to your class
+private static <T> Iterable<T> iterableToJava(scala.collection.Iterable<T> scalaIterable) {
+    List<T> javaList = new ArrayList<>();
+    scala.collection.Iterator<T> scalaIterator = scalaIterable.iterator();
+    while (scalaIterator.hasNext()) {
+        javaList.add(scalaIterator.next());
+    }
+    return javaList;
+}
 }
+
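[Editorial aside between patches: the while loop added in PATCH 2/6 slides a three-point window (ii, jj, kk) and drops the middle point whenever its bisector interval is empty, i.e. xij > xjk. A minimal sketch of that pruning step outside Spark, with made-up (row, distance) points; bisectorX is the same formula as the rewritten calcBisectorProjections:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch (not part of the patches) of the three-point dominance pruning in
// gaskyAlgorithm: drop the middle point jj whenever the bisector of (ii, jj)
// crosses the axis to the right of the bisector of (jj, kk).
public final class DominancePruneSketch {
    static double bisectorX(double x, double y, double x1, double y1) {
        return ((y1 * y1) - (y * y) + (x1 * x1) - (x * x)) / (2 * (x1 - x));
    }

    static List<double[]> prune(List<double[]> input) {
        List<double[]> points = new ArrayList<>(input);
        int i = 1; // mirrors currentWindowStart in the patch
        while (points.size() >= 3 && i <= points.size() - 2) {
            double[] ii = points.get(i - 1), jj = points.get(i), kk = points.get(i + 1);
            double xij = bisectorX(ii[0], ii[1], jj[0], jj[1]);
            double xjk = bisectorX(jj[0], jj[1], kk[0], kk[1]);
            if (xij > xjk) {
                points.remove(i); // jj is dominated everywhere on the axis
            } else {
                i++;
            }
        }
        return points;
    }

    public static void main(String[] args) {
        // Hypothetical (row, distance) points for one column; (4, 5) is pruned.
        List<double[]> pts = Arrays.asList(
                new double[]{1, 3}, new double[]{4, 5}, new double[]{5, 1});
        prune(pts).forEach(p -> System.out.println(Arrays.toString(p)));
    }
}
]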
From b8be69a43c4315ba19257eaa7222e8b765d97ad9 Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Tue, 28 Nov 2023 17:03:38 -0800
Subject: [PATCH 3/6] Update GaskySparkReducerJob 3

---
 .../com/css534/parallel/GaskySparkReducerJob | 59 ++++++++++++++++++-
 1 file changed, 57 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
index 3979021..79e15ad 100644
--- a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -165,16 +165,64 @@ public class GaskySparkJob {
         points.forEach(System.out::println);
 
         // Call the method to find proximal points
-        List<double[]> proximalPoints = findProximityPoints(points, totalRemainingPoints, GRID_SIZE);
+        List<double[]> proximityProjectionsPoints = findProximityPoints(points, totalRemainingPoints, GRID_SIZE);
 
         // Debug: Print proximal points
         System.out.println("Proximal Points for Column number: " + colNumber);
-        proximalPoints.forEach(point -> System.out.println(Arrays.toString(point)));
+        proximityProjectionsPoints.forEach(point -> System.out.println(Arrays.toString(point)));
+
+        int unDominatedPointsSize = points.size();
+        int proximityIntervals = proximityProjectionsPoints.size() - 1;
+        int dominatedCoordinatesDistances = 0;
+        List<Double> distances = new ArrayList<>(Collections.nCopies(GRID_SIZE, Double.MAX_VALUE));
+        //update distance implementation
+        for (int interval = 0; interval < proximityIntervals; interval++) {
+            double[] currentInterval = proximityProjectionsPoints.get(interval);
+            Tuple2<Double, Double> dominantPoint = points.get(dominatedCoordinatesDistances);
+            int start = (int) currentInterval[0];
+            int end = (int) currentInterval[1];
+            for (int xCord = start; xCord <= end; xCord++) {
+                // if (xCord >= 1 && xCord <= distances.size()) {
+                //     double currentDistance = distances.get(xCord - 1);
+                //     double newDistance = findEuclideanDistance(xCord, 0, currentInterval[2], currentInterval[3]);
+
+                //     if (currentDistance != Double.MAX_VALUE) {
+                //         newDistance = Math.min(newDistance, currentDistance);
+                //     }
+
+                //     distances.set(xCord - 1, newDistance);
+                //     }
+                // }
+
+                // dominatedCoordinatesDistances++;
+                if (distances.get(xCord - 1) != Double.MAX_VALUE){
+                    distances.set(
+                            xCord - 1,
+                            Double.min(
+                                    findEuclideanDistance(xCord, 0, dominantPoint._1(), dominantPoint._2()),
+                                    distances.get(xCord - 1)
+                            )
+                    );
+                }else
+                    distances.set(
+                            xCord - 1,
+                            findEuclideanDistance(xCord, 0, dominantPoint._1(), dominantPoint._2())
+                    );
+            }
+            dominatedCoordinatesDistances++;
+        }
+
+        // Debug: Print updated distances
+        System.out.println("Updated Distances:");
+        distances.forEach(System.out::println);
+
+
     }
 
     private static List<double[]> findProximityPoints(List<Tuple2<Double, Double>> unDominatedPoints, final int totalPoints, final int gridSize) {
         List<Tuple2<Double, Double>> intervals = new ArrayList<>();
+        System.out.println("unDominatedPoints size: " + unDominatedPoints.size());
         // Calculate intervals based on the unDominatedPoints
         for (int i = 1; i < unDominatedPoints.size(); i++) {
             Tuple2<Double, Double> point1 = unDominatedPoints.get(i - 1);
@@ -195,6 +243,13 @@ public class GaskySparkJob {
         return mergedInterval;
     }
 
+    private static double findEuclideanDistance(int x, int y, int x1, int y1) {
+        return Math.sqrt((x1 - x) * (x1 - x) + (y1 - y) * (y1 - y));
+    }
+
+    private static double findEuclideanDistance(double x, double y, double x1, double y1){
+        return Math.sqrt((x1 - x) * (x1 - x) + (y1 - y) * (y1 - y));
+    }
 
     // FlatMap function to process input data and generate key-value pairs
     private static Iterator<Tuple2<Tuple2<String, Integer>, Tuple2<Integer, Double>>> parseInputData(String line) {
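[Editorial aside between patches: the distance update added in PATCH 3/6 walks one proximity interval per surviving point and fills each grid x-coordinate in that interval with the Euclidean distance to its owning point, keeping the minimum. A toy, driver-side sketch with hypothetical survivors and intervals; GRID_SIZE = 8 matches the constant in the job:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch (not from the patches) of the interval-to-distance update in
// gaskyAlgorithm: interval k is owned by surviving point k, and every grid
// cell (x, 0) inside it takes the Euclidean distance to that point.
public final class IntervalDistanceSketch {
    static double euclid(double x, double y, double x1, double y1) {
        return Math.sqrt((x1 - x) * (x1 - x) + (y1 - y) * (y1 - y));
    }

    public static void main(String[] args) {
        final int gridSize = 8; // matches GRID_SIZE in the job
        // Hypothetical survivors for one column as (row, in-row distance).
        double[][] points = {{2, 1}, {6, 0}};
        // One interval per survivor, framed by 1, the bisector x, and gridSize.
        double[][] intervals = {{1, 4}, {4, 8}};
        List<Double> distances = new ArrayList<>(Collections.nCopies(gridSize, Double.MAX_VALUE));
        for (int k = 0; k < intervals.length; k++) {
            for (int x = (int) intervals[k][0]; x <= (int) intervals[k][1]; x++) {
                double d = euclid(x, 0, points[k][0], points[k][1]);
                distances.set(x - 1, Math.min(distances.get(x - 1), d));
            }
        }
        System.out.println(distances); // cell 6 is 0.0, neighbors grow outward
    }
}
]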
From 40b4bef0fca5007f1eb0458c17d003619778133c Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Tue, 28 Nov 2023 19:41:50 -0800
Subject: [PATCH 4/6] Update GaskySparkReducerJob map reduce

Results for the entire dataset:
(F2,7) || [3.1622776601683795,3.0,3.1622776601683795,3.605551275463989,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [3.0,7.0,0.0,7.0]
(F2,1) || [3.1622776601683795,3.0,1.4142135623730951,1.0,1.4142135623730951,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [3.0,1.0,1.0,1.0,6.0,1.0]
(F2,8) || [4.123105625617661,4.0,4.123105625617661,4.47213595499958,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [4.0,8.0,1.0,8.0]
(F1,8) || [1.4142135623730951,1.0,1.4142135623730951,1.0,2.0,2.23606797749979,1.7976931348623157E308,1.7976931348623157E308] || [1.0,8.0,1.0,8.0,2.0,8.0,2.0,8.0]
(F2,4) || [1.0,0.0,1.0,2.0,2.23606797749979,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [0.0,4.0,2.0,4.0,3.0,4.0]
(F2,5) || [1.4142135623730951,1.0,1.4142135623730951,2.23606797749979,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [1.0,5.0,2.0,5.0]
(F2,6) || [2.23606797749979,2.0,2.23606797749979,2.8284271247461903,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [2.0,6.0,1.0,6.0]
(F1,1) || [3.1622776601683795,3.0,3.0,1.4142135623730951,1.0,3.0,3.1622776601683795,1.7976931348623157E308] || [3.0,1.0,3.0,1.0,1.0,1.0,3.0,1.0,5.0,1.0]
(F1,2) || [2.23606797749979,2.0,2.0,1.0,0.0,2.0,2.23606797749979,1.7976931348623157E308] || [2.0,2.0,2.0,2.0,0.0,2.0,2.0,2.0,4.0,2.0]
(F1,3) || [1.4142135623730951,1.0,1.0,1.4142135623730951,1.0,1.0,1.4142135623730951,1.7976931348623157E308] || [1.0,3.0,1.0,3.0,1.0,3.0,1.0,3.0,3.0,3.0]
(F1,5) || [1.4142135623730951,1.0,1.0,1.4142135623730951,1.0,1.0,1.4142135623730951,1.7976931348623157E308] || [1.0,5.0,1.0,5.0,1.0,5.0,1.0,5.0,1.0,5.0]
(F2,2) || [2.23606797749979,2.0,1.0,0.0,1.0,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [2.0,2.0,0.0,2.0,5.0,2.0]
(F2,3) || [1.4142135623730951,1.0,1.4142135623730951,1.0,1.4142135623730951,1.7976931348623157E308,1.7976931348623157E308,1.7976931348623157E308] || [1.0,3.0,1.0,3.0,4.0,3.0]
(F1,4) || [1.0,0.0,0.0,1.0,1.0,0.0,1.0,1.7976931348623157E308] || [0.0,4.0,0.0,4.0,0.0,4.0,2.0,4.0]
(F1,7) || [1.0,0.0,1.0,0.0,1.0,1.4142135623730951,1.7976931348623157E308,1.7976931348623157E308] || [0.0,7.0,0.0,7.0,1.0,7.0,1.0,7.0]
(F1,6) || [1.4142135623730951,1.0,1.4142135623730951,1.0,0.0,1.0,1.7976931348623157E308,1.7976931348623157E308] || [1.0,6.0,1.0,6.0,0.0,6.0,0.0,6.0]
---
 .../com/css534/parallel/GaskySparkReducerJob | 235 +++++++++---------
 1 file changed, 116 insertions(+), 119 deletions(-)

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
index 79e15ad..a601886 100644
--- a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -13,100 +13,128 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import com.google.common.collect.Lists;
+
+// import GaskySparkReduce.SkylineObjects;
+
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class GaskySparkJob {
     private static final int GRID_SIZE = 8;
+
     public static void main(String[] args) {
-        // Create a Spark context
-        SparkConf conf = new SparkConf().setAppName("GaskySparkJob");
-        JavaSparkContext sparkContext = new JavaSparkContext(conf);
+        SparkConf conf = new SparkConf().setAppName("GaskySparkJob");
+        try (JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
 
-        // Load input data
         JavaRDD<String> inputData = sparkContext.textFile(args[0]);
 
-        // Debug: Print the input data
         System.out.println("Debug: Input Data:");
         inputData.foreach(line -> System.out.println(line));
 
-        // Process the input data and create key-value pairs
         JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> result = inputData
                 .flatMapToPair(line -> parseInputData(line))
                 .groupByKey();
 
-        // Collect the result and sort it
         List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> resultList = result.collect();
 
-        // Create a new list with the sorted elements
-        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> sortedResultList = new ArrayList<>(resultList);
-        sortedResultList.sort(Comparator.comparing(
-                (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple) -> tuple._1(),
-                Tuple2Comparator.INSTANCE
-        ));
 
-        // Separate keys and values from the sorted result
+        // Create a modifiable list before sorting
+        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> modifiableResultList = new ArrayList<>(resultList);
+
+        // Sort the modifiable list
+        modifiableResultList.sort(Comparator.comparing(Tuple2::_1, Tuple2Comparator.INSTANCE));
+
+
         List<Tuple2<String, Integer>> keysList = new ArrayList<>();
         List<Iterable<Tuple2<Integer, Double>>> valuesList = new ArrayList<>();
 
-        for (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple : sortedResultList) {
+        for (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple : resultList) {
             keysList.add(tuple._1());
             valuesList.add(tuple._2());
         }
 
-        // Example: Print keys and values
         System.out.println("Keys:");
-        keysList.forEach(key -> System.out.println(key));
+        keysList.forEach(System.out::println);
 
         System.out.println("Values:");
-        valuesList.forEach(values -> {
-            values.forEach(value -> System.out.print(" " + value));
-            System.out.println("");
-        });
+        valuesList.forEach(values -> values.forEach(value -> System.out.println(" " + value)));
+
+        List<Tuple2<Tuple2<String, Integer>, Tuple2<List<Double>, List<Tuple2<Double, Double>>>>> skylineObjectsList =
+                applyGaskyAlgorithm(sparkContext, keysList, valuesList);
+
+        StringBuilder totalDistances = new StringBuilder();
+
+        for (Tuple2<Tuple2<String, Integer>, Tuple2<List<Double>, List<Tuple2<Double, Double>>>> skylineObject : skylineObjectsList) {
+            totalDistances.append(skylineObject._1()).append(" || [");
+
+            List<Double> distances = skylineObject._2()._1();
+            for (int j = 0; j < distances.size(); j++) {
+                totalDistances.append(distances.get(j));
+                if (j != distances.size() - 1) {
+                    totalDistances.append(",");
+                }
+            }
 
-        // Call the new method to apply gaskyAlgorithm
-        applyGaskyAlgorithm(sparkContext,keysList, valuesList);
+            totalDistances.append("] || [");
 
-        // Stop the Spark context
-        sparkContext.stop();
+            List<Tuple2<Double, Double>> points = skylineObject._2()._2();
+            for (int j = 0; j < points.size(); j++) {
+                Tuple2<Double, Double> point = points.get(j);
+                totalDistances.append(point._1()).append(",").append(point._2());
+                if (j != points.size() - 1) {
+                    totalDistances.append(",");
+                }
+            }
+
+            totalDistances.append("]\n");
+        }
+
+        System.out.println("Results for the entire dataset:\n" + totalDistances.toString());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+}
 
-    private static void applyGaskyAlgorithm(JavaSparkContext sparkContext, List<Tuple2<String, Integer>> keysList, List<Iterable<Tuple2<Integer, Double>>> valuesList) {
-        System.out.println("valuesList size" + valuesList.size());
 
-        // Create JavaRDDs for keys and values
-        JavaRDD<Tuple2<String, Integer>> keysRDD = sparkContext.parallelize(keysList);
-        JavaRDD<Iterable<Tuple2<Integer, Double>>> valuesRDD = sparkContext.parallelize(valuesList);
 
-        // Zip the two RDDs to create a PairRDD
-        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> zippedRDD = keysRDD.zip(valuesRDD);
+    private static List<Tuple2<Tuple2<String, Integer>, Tuple2<List<Double>, List<Tuple2<Double, Double>>>>> applyGaskyAlgorithm(JavaSparkContext sparkContext, List<Tuple2<String, Integer>> keysList, List<Iterable<Tuple2<Integer, Double>>> valuesList) {
+        System.out.println("valuesList size" + valuesList.size());
 
-        // Filter out points with Double.MAX_VALUE distance
-        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> filteredRDD = zippedRDD.mapValues(values ->
-            filterUnfavorablePoints(values)
-        );
+        // Create JavaRDDs for keys and values
+        JavaRDD<Tuple2<String, Integer>> keysRDD = sparkContext.parallelize(keysList);
+        JavaRDD<Iterable<Tuple2<Integer, Double>>> valuesRDD = sparkContext.parallelize(valuesList);
 
-        // Debug: Print the filtered points
-        System.out.println("Remaining Points after filtering:");
-        filteredRDD.collect().forEach(tuple -> {
+        // Zip the two RDDs to create a PairRDD
+        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> zippedRDD = keysRDD.zip(valuesRDD);
+
+        // Filter out points with Double.MAX_VALUE distance
+        JavaPairRDD<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> filteredRDD = zippedRDD.mapValues(values ->
+                filterUnfavorablePoints(values)
+        );
+
+        // Debug: Print the filtered points
+        System.out.println("Remaining Points after filtering:");
+        filteredRDD.collect().forEach(tuple -> {
            Tuple2<String, Integer> keysTuple = tuple._1();
            int colNumber = keysTuple._2();
            Iterable<Tuple2<Integer, Double>> remainingPoints = tuple._2();
            System.out.println("Column number: " + colNumber);
            System.out.println("Remaining Points: " + remainingPoints);
-        });
-
-        // Apply gaskyAlgorithm to the remaining points
-        filteredRDD.foreach(tuple -> {
-            Tuple2<String, Integer> keysTuple = tuple._1();
-            int colNumber = keysTuple._2();
-            Iterable<Tuple2<Integer, Double>> remainingPoints = tuple._2();
-
-            System.out.println("Applying gaskyAlgorithm for Column number: " + colNumber);
-            // Assuming gaskyAlgorithm takes an Iterable<Tuple2<Integer, Double>> and a Tuple2
-            gaskyAlgorithm(remainingPoints, colNumber);
-        });
-    }
+        });
+
+        // Apply gaskyAlgorithm to the remaining points
+        return filteredRDD.map(tuple -> {
+            Tuple2<String, Integer> keysTuple = tuple._1();
+            int colNumber = keysTuple._2();
+            Iterable<Tuple2<Integer, Double>> remainingPoints = tuple._2();
+
+            System.out.println("Applying gaskyAlgorithm for Column number: " + colNumber);
+            // Assuming gaskyAlgorithm takes an Iterable<Tuple2<Integer, Double>> and a Tuple2
+            Tuple2<List<Double>, List<Tuple2<Double, Double>>> skylineObjects = gaskyAlgorithm(remainingPoints, colNumber);
+            return new Tuple2<>(keysTuple, skylineObjects);
+        }).collect();
+
+}
 
     private static Iterable<Tuple2<Integer, Double>> filterUnfavorablePoints(Iterable<Tuple2<Integer, Double>> values) {
         List<Tuple2<Integer, Double>> filteredValues = new ArrayList<>();
@@ -120,8 +148,10 @@ public class GaskySparkJob {
 
 
-    private static void gaskyAlgorithm(Iterable<Tuple2<Integer, Double>> remainingPoints, int colNumber) {
+    private static Tuple2<List<Double>,List<Tuple2<Double, Double>>> gaskyAlgorithm(Iterable<Tuple2<Integer, Double>> remainingPoints, int colNumber) {
         int totalRemainingPoints = Iterables.size(remainingPoints);
+        List<Double> distances = new ArrayList<>(Collections.nCopies(GRID_SIZE, Double.MAX_VALUE));
+        if (totalRemainingPoints > 2) {
         List<Tuple2<Double, Double>> points = new ArrayList<>();
 
         // Convert remainingPoints to Tuple2<Double, Double> objects
@@ -135,7 +165,7 @@ public class GaskySparkJob {
 
         // Filtering based on dominance
         int currentWindowStart = 1;
-        while (points.size() >= 3 && currentWindowStart <= points.size() - 2) { 
+        while (points.size() >= 3 && currentWindowStart <= points.size() - 2) {
             Tuple2<Double, Double> ii = points.get(currentWindowStart - 1);
             Tuple2<Double, Double> jj = points.get(currentWindowStart);
             Tuple2<Double, Double> kk = points.get(currentWindowStart + 1);
@@ -174,7 +204,6 @@ public class GaskySparkJob {
         int unDominatedPointsSize = points.size();
         int proximityIntervals = proximityProjectionsPoints.size() - 1;
         int dominatedCoordinatesDistances = 0;
-        List<Double> distances = new ArrayList<>(Collections.nCopies(GRID_SIZE, Double.MAX_VALUE));
         //update distance implementation
         for (int interval = 0; interval < proximityIntervals; interval++) {
             double[] currentInterval = proximityProjectionsPoints.get(interval);
@@ -182,19 +211,6 @@ public class GaskySparkJob {
             int start = (int) currentInterval[0];
             int end = (int) currentInterval[1];
             for (int xCord = start; xCord <= end; xCord++) {
-                // if (xCord >= 1 && xCord <= distances.size()) {
-                //     double currentDistance = distances.get(xCord - 1);
-                //     double newDistance = findEuclideanDistance(xCord, 0, currentInterval[2], currentInterval[3]);
-
-                //     if (currentDistance != Double.MAX_VALUE) {
-                //         newDistance = Math.min(newDistance, currentDistance);
-                //     }
-
-                //     distances.set(xCord - 1, newDistance);
-                //     }
-                // }
-
-                // dominatedCoordinatesDistances++;
                 if (distances.get(xCord - 1) != Double.MAX_VALUE){
                     distances.set(
                             xCord - 1,
@@ -211,12 +227,32 @@ public class GaskySparkJob {
             }
             dominatedCoordinatesDistances++;
         }
-
         // Debug: Print updated distances
         System.out.println("Updated Distances:");
         distances.forEach(System.out::println);
 
+        for (Tuple2<Double, Double> point : points) {
+            double y = point._2();
+            Tuple2<Double, Double> updatedPoint = new Tuple2<>(y, (double) colNumber);
+            // Replace the existing point with the updated one
+            points.set(points.indexOf(point), updatedPoint);
+        }
+
+        // Debug: Print updated points
+        System.out.println("Updated Points for Column number: " + colNumber);
+        points.forEach(System.out::println);
+
+        // check for the points based on the dominance
+        return new Tuple2<List<Double>, List<Tuple2<Double, Double>>>(distances, points);
+
+        }
+        else{
+            return new Tuple2<>(
+                    Collections.emptyList(), // Empty list for distances
+                    Collections.emptyList()  // Empty list for points
+            );
+        }
     }
 
     private static List<double[]> findProximityPoints(List<Tuple2<Double, Double>> unDominatedPoints, final int totalPoints, final int gridSize) {
@@ -314,20 +350,19 @@ public class GaskySparkJob {
     }
 
     // Comparator for Tuple2
-    static class Tuple2Comparator implements Comparator<Tuple2<String, Integer>>, Serializable {
-        static final Tuple2Comparator INSTANCE = new Tuple2Comparator();
-
-        @Override
-        public int compare(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) {
-            int compareResult = tuple1._1().compareTo(tuple2._1());
-            if (compareResult == 0) {
-                // If the first elements are equal, compare the second elements
-                compareResult = Integer.compare(tuple1._2(), tuple2._2());
-            }
-            return compareResult;
+static class Tuple2Comparator implements Comparator<Tuple2<String, Integer>>, Serializable {
+    static final Tuple2Comparator INSTANCE = new Tuple2Comparator();
+
+    @Override
+    public int compare(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) {
+        int compareResult = tuple1._1().compareTo(tuple2._1());
+        if (compareResult == 0) {
+            // If the first elements are equal, compare the second elements
+            return Integer.compare(tuple1._2(), tuple2._2());
         }
+        return compareResult;
     }
-
+}
 
     private static Tuple2<Double, Double> calcBisectorProjections(double x, double y, double x1, double y1) {
@@ -336,43 +371,6 @@ public class GaskySparkJob {
         return new Tuple2<>(xx, yy);
     }
 
-// private static List<double[]> findProximityPoints(JavaSparkContext sparkContext,
-//                                                   List<Tuple2<Double, Double>> unDominatedPoints,
-//                                                   final int totalPoints) {
-//     // Create a JavaRDD from the list of unDominatedPoints
-//     JavaRDD<Tuple2<Double, Double>> unDominatedPointsRDD = sparkContext.parallelize(unDominatedPoints);
-
-//     // Calculate intervals between consecutive points
-//     JavaRDD<Tuple2<Double, Double>> intervalsRDD = unDominatedPointsRDD
-//             .zipWithIndex()
-//             .filter(tuple -> tuple._2() > 0) // Exclude the first point
-//             .map(tuple -> {
-//                 Tuple2<Double, Double> point1 = unDominatedPoints.get(tuple._2().intValue() - 1);
-//                 Tuple2<Double, Double> point2 = tuple._1();

-//                 return new Tuple2<>(
-//                         (point1._1() + point2._1()) / 2,
-//                         0.0 // point lying on with intersection on X axis
-//                 );
-//             });

-//     // Combine intervals
-//     JavaPairRDD mergedIntervalRDD = intervalsRDD
-//             .mapToPair(interval -> new Tuple2<>(interval._1(), interval._1()))
-//             .reduceByKey((interval1, interval2) -> new Tuple2<>(interval1, interval2));

-//     // Collect the results back to the driver
-//     List<Tuple2<Double, Double>> mergedIntervals = mergedIntervalRDD.collect();

-//     // Convert List<Tuple2<Double, Double>> to List<double[]>
-//     List<double[]> mergedIntervalList = new ArrayList<>();
-//     for (Tuple2<Double, Double> interval : mergedIntervals) {
-//         mergedIntervalList.add(new double[]{interval._1(), totalPoints});
-//     }

-//     return mergedIntervalList;
-// }
-
 // Add this method to your class
 private static <T> Iterable<T> iterableToJava(scala.collection.Iterable<T> scalaIterable) {
     List<T> javaList = new ArrayList<>();
@@ -384,4 +382,3 @@ private static <T> Iterable<T> iterableToJava(scala.collection.Iterable<T> scalaIterable) {
     }
 
 }
-
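[Editorial aside between patches: each per-facility distance list in the results above starts from parseInputData's left/right sweep over one binary grid row, where '1' marks a favorable cell and each cell takes the smaller of its distances to the nearest '1' on either side. A standalone sketch of that sweep on one hypothetical row:

import java.util.Arrays;

// Sketch (not from the patches) of the logic in getLeftDistance /
// getRightDistance. The input row below is made up; an input line for the
// job would look like "F1 3 01000100" (facility, row number, binary row).
public final class RowDistanceSketch {
    public static void main(String[] args) {
        String row = "01000100"; // hypothetical 8-cell row
        int n = row.length();
        double[] left = new double[n], right = new double[n];
        Arrays.fill(left, Double.MAX_VALUE);
        Arrays.fill(right, Double.MAX_VALUE);
        boolean seen = false;
        for (int i = 0; i < n; i++) {      // left-to-right sweep
            if (row.charAt(i) == '1') { left[i] = 0; seen = true; }
            else if (seen) left[i] = left[i - 1] + 1;
        }
        seen = false;
        for (int i = n - 1; i >= 0; i--) { // right-to-left sweep
            if (row.charAt(i) == '1') { right[i] = 0; seen = true; }
            else if (seen) right[i] = right[i + 1] + 1;
        }
        for (int i = 0; i < n; i++) {      // per-cell minimum, as in the job
            System.out.println("col " + (i + 1) + " -> " + Math.min(left[i], right[i]));
        }
    }
}
]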
From f66be961f7968a339578681596c517425ad5bfdd Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Wed, 29 Nov 2023 18:59:36 -0800
Subject: [PATCH 5/6] Update GaskySparkReducerJob

---
 .../com/css534/parallel/GaskySparkReducerJob | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
index a601886..cc7716b 100644
--- a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -14,8 +14,6 @@ import java.util.Iterator;
 import java.util.List;
 import com.google.common.collect.Lists;
 
-// import GaskySparkReduce.SkylineObjects;
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -24,6 +22,7 @@ public class GaskySparkJob {
 
     public static void main(String[] args) {
         SparkConf conf = new SparkConf().setAppName("GaskySparkJob");
+
         try (JavaSparkContext sparkContext = new JavaSparkContext(conf)) {
 
         JavaRDD<String> inputData = sparkContext.textFile(args[0]);
@@ -37,27 +36,30 @@ public class GaskySparkJob {
 
         List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> resultList = result.collect();
 
-
-        // Create a modifiable list before sorting
-        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> modifiableResultList = new ArrayList<>(resultList);
-
-        // Sort the modifiable list
-        modifiableResultList.sort(Comparator.comparing(Tuple2::_1, Tuple2Comparator.INSTANCE));
-
+        // Create a new list with the sorted elements
+        List<Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>>> sortedResultList = new ArrayList<>(resultList);
+        sortedResultList.sort(Comparator.comparing(
+                (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple) -> tuple._1(),
+                Tuple2Comparator.INSTANCE
+        ));
 
         List<Tuple2<String, Integer>> keysList = new ArrayList<>();
         List<Iterable<Tuple2<Integer, Double>>> valuesList = new ArrayList<>();
 
-        for (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple : resultList) {
+        for (Tuple2<Tuple2<String, Integer>, Iterable<Tuple2<Integer, Double>>> tuple : sortedResultList) {
             keysList.add(tuple._1());
             valuesList.add(tuple._2());
         }
 
+        // Example: Print keys and values
         System.out.println("Keys:");
-        keysList.forEach(System.out::println);
+        keysList.forEach(key -> System.out.println(key));
 
         System.out.println("Values:");
-        valuesList.forEach(values -> values.forEach(value -> System.out.println(" " + value)));
+        valuesList.forEach(values -> {
+            values.forEach(value -> System.out.print(" " + value));
+            System.out.println("");
+        });
 
         List<Tuple2<Tuple2<String, Integer>, Tuple2<List<Double>, List<Tuple2<Double, Double>>>>> skylineObjectsList =
                 applyGaskyAlgorithm(sparkContext, keysList, valuesList);
@@ -204,6 +206,7 @@ public class GaskySparkJob {
         int unDominatedPointsSize = points.size();
         int proximityIntervals = proximityProjectionsPoints.size() - 1;
         int dominatedCoordinatesDistances = 0;
+
         //update distance implementation
         for (int interval = 0; interval < proximityIntervals; interval++) {
             double[] currentInterval = proximityProjectionsPoints.get(interval);
@@ -363,7 +366,6 @@ public class GaskySparkJob {
         return compareResult;
     }
 }
-
 private static Tuple2<Double, Double> calcBisectorProjections(double x, double y, double x1, double y1) {
     double xx = ((y1 * y1) - (y * y) + (x1 * x1) - (x * x)) / (2 * (x1 - x));
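[Editorial aside between patches: PATCH 5/6 restores the explicit sort with Tuple2Comparator.INSTANCE so output groups by facility name first and column number second. A small sketch of the equivalent ordering using java.util.Comparator directly; the keys below are made up:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import scala.Tuple2;

// Sketch (not from the patches) of the key ordering Tuple2Comparator defines.
public final class KeyOrderSketch {
    public static void main(String[] args) {
        List<Tuple2<String, Integer>> keys = new ArrayList<>(Arrays.asList(
                new Tuple2<>("F2", 1), new Tuple2<>("F1", 8), new Tuple2<>("F1", 1)));
        // Facility name first, then column number, mirroring Tuple2Comparator.
        keys.sort(Comparator.<Tuple2<String, Integer>, String>comparing(Tuple2::_1)
                .thenComparing(Tuple2::_2));
        System.out.println(keys); // [(F1,1), (F1,8), (F2,1)]
    }
}
]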
From 207dd2ace101adb228e2369715a9aeb9f4394850 Mon Sep 17 00:00:00 2001
From: mkaur71199 <150730813+mkaur71199@users.noreply.github.com>
Date: Thu, 30 Nov 2023 12:35:17 -0800
Subject: [PATCH 6/6] Update GaskySparkReducerJob

update
---
 .../java/com/css534/parallel/GaskySparkReducerJob | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
index cc7716b..9aa0c21 100644
--- a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
+++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob
@@ -82,7 +82,7 @@ public class GaskySparkJob {
             List<Tuple2<Double, Double>> points = skylineObject._2()._2();
             for (int j = 0; j < points.size(); j++) {
                 Tuple2<Double, Double> point = points.get(j);
-                totalDistances.append(point._1()).append(",").append(point._2());
+                totalDistances.append(point._1());
                 if (j != points.size() - 1) {
                     totalDistances.append(",");
                 }
@@ -206,9 +206,10 @@ public class GaskySparkJob {
         int unDominatedPointsSize = points.size();
         int proximityIntervals = proximityProjectionsPoints.size() - 1;
         int dominatedCoordinatesDistances = 0;
-
+
+
         //update distance implementation
-        for (int interval = 0; interval < proximityIntervals; interval++) {
+        for (int interval = 0; interval < proximityProjectionsPoints.size(); interval++) {
             double[] currentInterval = proximityProjectionsPoints.get(interval);
             Tuple2<Double, Double> dominantPoint = points.get(dominatedCoordinatesDistances);
             int start = (int) currentInterval[0];
@@ -230,13 +231,15 @@ public class GaskySparkJob {
             }
             dominatedCoordinatesDistances++;
         }
+
+
         // Debug: Print updated distances
         System.out.println("Updated Distances:");
         distances.forEach(System.out::println);
 
         for (Tuple2<Double, Double> point : points) {
-            double y = point._2();
-            Tuple2<Double, Double> updatedPoint = new Tuple2<>(y, (double) colNumber);
+            double x = point._1();
+            Tuple2<Double, Double> updatedPoint = new Tuple2<>(x, (double) colNumber);
             // Replace the existing point with the updated one
             points.set(points.indexOf(point), updatedPoint);
         }
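[Editorial aside: the final job prints one line per (facility, column) key in the "(name,col) || [distances] || [points]" shape shown in the PATCH 4/6 message. A small, hypothetical reader for such a line; the field layout is taken from that sample output, and Double.MAX_VALUE entries mark columns the facility never reaches:

// Sketch (not from the patches) that extracts the best distance from one
// result line of the "Results for the entire dataset" output.
public final class ResultLineSketch {
    public static void main(String[] args) {
        String line = "(F2,5) || [1.4142135623730951,1.0,1.4142135623730951,"
                + "2.23606797749979,1.7976931348623157E308,1.7976931348623157E308,"
                + "1.7976931348623157E308,1.7976931348623157E308] || [1.0,5.0,2.0,5.0]";
        String[] parts = line.split("\\|\\|");
        String key = parts[0].trim(); // e.g. (F2,5)
        String[] dist = parts[1].trim().replaceAll("[\\[\\]]", "").split(",");
        double best = Double.MAX_VALUE;
        for (String d : dist) {
            best = Math.min(best, Double.parseDouble(d));
        }
        System.out.println(key + " min distance = " + best); // (F2,5) min distance = 1.0
    }
}
]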