diff --git a/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob new file mode 100644 index 0000000..9aa0c21 --- /dev/null +++ b/spark/src/main/java/com/css534/parallel/GaskySparkReducerJob @@ -0,0 +1,389 @@ +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import java.util.Collections; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import com.google.common.collect.Lists; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class GaskySparkJob { + private static final int GRID_SIZE = 8; + + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("GaskySparkJob"); + + try (JavaSparkContext sparkContext = new JavaSparkContext(conf)) { + + JavaRDD inputData = sparkContext.textFile(args[0]); + + System.out.println("Debug: Input Data:"); + inputData.foreach(line -> System.out.println(line)); + + JavaPairRDD, Iterable>> result = inputData + .flatMapToPair(line -> parseInputData(line)) + .groupByKey(); + + List, Iterable>>> resultList = result.collect(); + + // Create a new list with the sorted elements + List, Iterable>>> sortedResultList = new ArrayList<>(resultList); + sortedResultList.sort(Comparator.comparing( + (Tuple2, Iterable>> tuple) -> tuple._1(), + Tuple2Comparator.INSTANCE + )); + + List> keysList = new ArrayList<>(); + List>> valuesList = new ArrayList<>(); + + for (Tuple2, Iterable>> tuple : sortedResultList) { + keysList.add(tuple._1()); + valuesList.add(tuple._2()); + } + + // Example: Print keys and values + System.out.println("Keys:"); + keysList.forEach(key -> System.out.println(key)); + + System.out.println("Values:"); + valuesList.forEach(values -> { + values.forEach(value -> System.out.print(" " + value)); + System.out.println(""); + }); + + List, Tuple2, List>>>> skylineObjectsList = + applyGaskyAlgorithm(sparkContext, keysList, valuesList); + + StringBuilder totalDistances = new StringBuilder(); + + for (Tuple2, Tuple2, List>>> skylineObject : skylineObjectsList) { + totalDistances.append(skylineObject._1()).append(" || ["); + + List distances = skylineObject._2()._1(); + for (int j = 0; j < distances.size(); j++) { + totalDistances.append(distances.get(j)); + if (j != distances.size() - 1) { + totalDistances.append(","); + } + } + + totalDistances.append("] || ["); + + List> points = skylineObject._2()._2(); + for (int j = 0; j < points.size(); j++) { + Tuple2 point = points.get(j); + totalDistances.append(point._1()); + if (j != points.size() - 1) { + totalDistances.append(","); + } + } + + totalDistances.append("]\n"); + } + + System.out.println("Results for the entire dataset:\n" + totalDistances.toString()); + } catch (Exception e) { + e.printStackTrace(); + } +} + + + + private static List, Tuple2, List>>>> applyGaskyAlgorithm(JavaSparkContext sparkContext, List> keysList, List>> valuesList) { + System.out.println("valuesList size" + valuesList.size()); + + // Create JavaRDDs for keys and values + JavaRDD> keysRDD = sparkContext.parallelize(keysList); + JavaRDD>> valuesRDD = sparkContext.parallelize(valuesList); + + // Zip the two RDDs to create a PairRDD + JavaPairRDD, Iterable>> zippedRDD = keysRDD.zip(valuesRDD); + + // Filter out points with Double.MAX_VALUE distance + JavaPairRDD, Iterable>> filteredRDD = zippedRDD.mapValues(values -> + filterUnfavorablePoints(values) + ); + + // Debug: Print the filtered points + System.out.println("Remaining Points after filtering:"); + filteredRDD.collect().forEach(tuple -> { + Tuple2 keysTuple = tuple._1(); + int colNumber = keysTuple._2(); + Iterable> remainingPoints = tuple._2(); + System.out.println("Column number: " + colNumber); + System.out.println("Remaining Points: " + remainingPoints); + }); + + // Apply gaskyAlgorithm to the remaining points + return filteredRDD.map(tuple -> { + Tuple2 keysTuple = tuple._1(); + int colNumber = keysTuple._2(); + Iterable> remainingPoints = tuple._2(); + + System.out.println("Applying gaskyAlgorithm for Column number: " + colNumber); + // Assuming gaskyAlgorithm takes an Iterable> and a Tuple2 + Tuple2, List>> skylineObjects = gaskyAlgorithm(remainingPoints, colNumber); + return new Tuple2<>(keysTuple, skylineObjects); + }).collect(); + +} + + private static Iterable> filterUnfavorablePoints(Iterable> values) { + List> filteredValues = new ArrayList<>(); + for (Tuple2 value : values) { + if (!value._2().equals(Double.MAX_VALUE)) { + filteredValues.add(value); + } + } + return filteredValues; + } + + + + private static Tuple2,List>> gaskyAlgorithm(Iterable> remainingPoints, int colNumber) { + int totalRemainingPoints = Iterables.size(remainingPoints); + List distances = new ArrayList<>(Collections.nCopies(GRID_SIZE, Double.MAX_VALUE)); + if (totalRemainingPoints > 2) { + List> points = new ArrayList<>(); + + // Convert remainingPoints to Tuple2 objects + for (Tuple2 point : remainingPoints) { + points.add(new Tuple2<>(point._1().doubleValue(), point._2())); + } + + // Debug: Print initial points + System.out.println("Initial Points for Column number: " + colNumber); + points.forEach(System.out::println); + + // Filtering based on dominance + int currentWindowStart = 1; + while (points.size() >= 3 && currentWindowStart <= points.size() - 2) { + Tuple2 ii = points.get(currentWindowStart - 1); + Tuple2 jj = points.get(currentWindowStart); + Tuple2 kk = points.get(currentWindowStart + 1); + + if (ii != null && jj != null && kk != null) { + double xij = calcBisectorProjections(ii._1(), ii._2(), jj._1(), jj._2())._1(); + double xjk = calcBisectorProjections(jj._1(), jj._2(), kk._1(), kk._2())._1(); + + // Debug: Print xij and xjk + System.out.println("xij: " + xij + ", xjk: " + xjk); + + if (xij > xjk) { + // Debug: Print the removed point + System.out.println("Removed point: " + jj); + + // Remove the middle point + points.remove(currentWindowStart); + } else { + // Move to the next window + currentWindowStart++; + } + } + } + + // Debug: Print final points after filtering + System.out.println("Filtered Points after Gasky Algorithm for Column number: " + colNumber); + points.forEach(System.out::println); + + // Call the method to find proximal points + List proximityProjectionsPoints = findProximityPoints(points, totalRemainingPoints, GRID_SIZE); + + // Debug: Print proximal points + System.out.println("Proximal Points for Column number: " + colNumber); + proximityProjectionsPoints.forEach(point -> System.out.println(Arrays.toString(point))); + + int unDominatedPointsSize = points.size(); + int proximityIntervals = proximityProjectionsPoints.size() - 1; + int dominatedCoordinatesDistances = 0; + + + //update distance implementation + for (int interval = 0; interval < proximityProjectionsPoints.size(); interval++) { + double[] currentInterval = proximityProjectionsPoints.get(interval); + Tuple2 dominantPoint = points.get(dominatedCoordinatesDistances); + int start = (int) currentInterval[0]; + int end = (int) currentInterval[1]; + for (int xCord = start; xCord <= end; xCord++) { + if (distances.get(xCord - 1) != Double.MAX_VALUE){ + distances.set( + xCord - 1, + Double.min( + findEuclideanDistance(xCord, 0, dominantPoint._1(), dominantPoint._2()), + distances.get(xCord - 1) + ) + ); + }else + distances.set( + xCord - 1, + findEuclideanDistance(xCord, 0, dominantPoint._1(), dominantPoint._2()) + ); + } + dominatedCoordinatesDistances++; + } + + + // Debug: Print updated distances + System.out.println("Updated Distances:"); + distances.forEach(System.out::println); + + for (Tuple2 point : points) { + double x = point._1(); + Tuple2 updatedPoint = new Tuple2<>(x, (double) colNumber); + // Replace the existing point with the updated one + points.set(points.indexOf(point), updatedPoint); + } + + // Debug: Print updated points + System.out.println("Updated Points for Column number: " + colNumber); + points.forEach(System.out::println); + + // check for the points based on the dominance + return new Tuple2, List>>(distances, points); + + + } + else{ + return new Tuple2<>( + Collections.emptyList(), // Empty list for distances + Collections.emptyList() // Empty list for points + ); + } +} + + private static List findProximityPoints(List> unDominatedPoints, final int totalPoints, final int gridSize) { + List> intervals = new ArrayList<>(); + + System.out.println("unDominatedPoints size: " + unDominatedPoints.size()); + // Calculate intervals based on the unDominatedPoints + for (int i = 1; i < unDominatedPoints.size(); i++) { + Tuple2 point1 = unDominatedPoints.get(i - 1); + Tuple2 point2 = unDominatedPoints.get(i); + intervals.add(new Tuple2<>((point1._1() + point2._1()) / 2, 0.0)); + } + + // Combine intervals using a frame + List mergedInterval = new ArrayList<>(intervals.size()); + mergedInterval.add(new double[]{1, intervals.get(0)._1()}); + + for (int i = 1; i < intervals.size(); i++) { + mergedInterval.add(new double[]{intervals.get(i - 1)._1(), intervals.get(i)._1()}); + } + + mergedInterval.add(new double[]{intervals.get(intervals.size() - 1)._1(), GRID_SIZE}); + + return mergedInterval; +} + + private static double findEuclideanDistance(int x, int y, int x1, int y1) { + return Math.sqrt((x1 - x) * (x1 - x) + (y1 - y) * (y1 - y)); + } + + private static double findEuclideanDistance(double x, double y, double x1, double y1){ + return Math.sqrt((x1 - x) * (x1 - x) + (y1 - y) * (y1 - y)); + } + + // FlatMap function to process input data and generate key-value pairs + private static Iterator, Tuple2>> parseInputData(String line) { + System.out.println("Debug: Processing Line: " + line); + + String[] distFavArray = line.split("\\s+"); + List, Tuple2>> result = new ArrayList<>(); + + if (distFavArray.length > 0) { + String facilityName = distFavArray[0]; + int matrixRowNumber = Integer.parseInt(distFavArray[1]); + + // Convert the strings to a list of strings + List binMatrixValues = Arrays.asList(Arrays.copyOfRange(distFavArray, 2, distFavArray.length)); + + double[] leftDistance = new double[binMatrixValues.get(0).length()]; + double[] rightDistance = new double[binMatrixValues.get(0).length()]; + + Arrays.fill(leftDistance, Double.MAX_VALUE); + Arrays.fill(rightDistance, Double.MAX_VALUE); + + leftDistance = getLeftDistance(leftDistance, binMatrixValues); + rightDistance = getRightDistance(rightDistance, binMatrixValues); + + for (int i = 0; i < binMatrixValues.get(0).length(); i++) { + result.add(new Tuple2<>(new Tuple2<>(facilityName, i + 1), + new Tuple2<>(matrixRowNumber, Double.min(leftDistance[i], rightDistance[i])))); + } + } + + // Debug: Print the generated key-value pairs + System.out.println("Debug: Generated Key-Value Pairs:"); + result.forEach(tuple -> System.out.println(tuple)); + + return result.iterator(); + } + + private static double[] getLeftDistance(double[] leftDistance, List gridRows) { + boolean isFavlFound = false; + for (int i = 0; i < gridRows.get(0).length(); i++) { + if (gridRows.get(0).charAt(i) == '1') { + leftDistance[i] = 0; + isFavlFound = true; + } else if (isFavlFound) { + leftDistance[i] = leftDistance[i - 1] + 1; + } + } + return leftDistance; + } + + private static double[] getRightDistance(double[] rightDistance, List gridRows) { + boolean isFavrFound = false; + for (int i = gridRows.get(0).length() - 1; i >= 0; --i) { + if (gridRows.get(0).charAt(i) == '1') { + rightDistance[i] = 0; + isFavrFound = true; + } else if (isFavrFound) { + rightDistance[i] = rightDistance[i + 1] + 1; + } + } + return rightDistance; + } + + // Comparator for Tuple2 +static class Tuple2Comparator implements Comparator>, Serializable { + static final Tuple2Comparator INSTANCE = new Tuple2Comparator(); + + @Override + public int compare(Tuple2 tuple1, Tuple2 tuple2) { + int compareResult = tuple1._1().compareTo(tuple2._1()); + if (compareResult == 0) { + // If the first elements are equal, compare the second elements + return Integer.compare(tuple1._2(), tuple2._2()); + } + return compareResult; + } +} + + private static Tuple2 calcBisectorProjections(double x, double y, double x1, double y1) { + double xx = ((y1 * y1) - (y * y) + (x1 * x1) - (x * x)) / (2 * (x1 - x)); + double yy = 0; + return new Tuple2<>(xx, yy); + } + +// Add this method to your class +private static Iterable iterableToJava(scala.collection.Iterable scalaIterable) { + List javaList = new ArrayList<>(); + scala.collection.Iterator scalaIterator = scalaIterable.iterator(); + while (scalaIterator.hasNext()) { + javaList.add(scalaIterator.next()); + } + return javaList; +} + +}