# Problem: 
Given a set of N coordinates as (X,Y) pairs, we want to compute how many
coordinates are within R meters of an (X,Y) centroid where the distance metric is Euclidean.
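The single-centroid version of the question takes only a few lines. The sketch below assumes "within R meters" is inclusive (distance <= R), because question 4 below is the only one that explicitly asks for a strict inequality. `count_within_radius` is a hypothetical helper name. It compares squared distances with R² so it never needs a square root.

```python
from typing import Iterable, Tuple


def count_within_radius(
    coordinates: Iterable[Tuple[float, float]],
    centroid: Tuple[float, float],
    radius: float,
) -> int:
    """Count coordinates whose Euclidean distance to `centroid` is <= `radius`.

    For radius >= 0, comparing squared distances with radius**2 gives the same
    answer as comparing sqrt(...) with radius, without computing the sqrt.
    """
    if radius < 0:
        return 0  # no distance can be negative
    cx, cy = centroid
    r2 = radius * radius
    return sum(1 for x, y in coordinates if (x - cx) ** 2 + (y - cy) ** 2 <= r2)
```

For example, with points (0, 0), (3, 4) and (6, 8) and a centroid at the origin, a 5 m radius counts the first two. The point (3, 4) is exactly 5 m away, so it lands on the inclusive boundary.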
Goal: Design and write a class that is able to solve instances of this problem. The interface
should be simple, documented, and allow a typical developer to use your API to efficiently query
coordinates and centroids to find coordinate counts in proximity to centroids.
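The class shape could look like the sketch below. The name `CentroidProximityIndex` and its methods are hypothetical and are not the notebook's implementation, which uses Spark further down. The sketch relies on one observation: a coordinate is within R of at least one centroid exactly when its distance to the *nearest* centroid is <= R. The index therefore computes that nearest distance once per coordinate by brute force and keeps the results sorted, so every radius query becomes a binary search.

```python
import bisect
import math
from typing import Iterable, List, Sequence, Tuple

Point = Tuple[float, float]


class CentroidProximityIndex:
    """Answers "how many coordinates are within R of at least one centroid?".

    Hypothetical sketch. Complexity for N coordinates and K centroids:
      __init__:              O(N*K + N log N) time, O(N) extra memory
      count_within:          O(log N) time, O(1) extra memory
      count_strictly_within: O(log N) time, O(1) extra memory
      sorted_nearest:        O(N) time and memory (returns a copy)
    """

    def __init__(self, coordinates: Iterable[Point], centroids: Sequence[Point]):
        if not centroids:
            raise ValueError("at least one centroid is required")
        # Brute force: distance from each coordinate to its closest centroid.
        self._nearest: List[float] = sorted(
            min(math.hypot(x - cx, y - cy) for cx, cy in centroids)
            for x, y in coordinates
        )

    def __len__(self) -> int:
        return len(self._nearest)

    def count_within(self, radius: float) -> int:
        """Coordinates whose nearest-centroid distance is <= radius."""
        return bisect.bisect_right(self._nearest, radius)

    def count_strictly_within(self, radius: float) -> int:
        """Coordinates whose nearest-centroid distance is < radius."""
        return bisect.bisect_left(self._nearest, radius)

    @property
    def sorted_nearest(self) -> Tuple[float, ...]:
        """Nearest-centroid distances in ascending order."""
        return tuple(self._nearest)
```

Usage: build `CentroidProximityIndex(coords, cents)` once, then call `count_within(5.0)` and `count_within(10.0)`; each query is a binary search. In pure Python the O(N·K) construction is the bottleneck (10^9 distance evaluations for the sample data), so a spatial index over the centroids would be the natural next step.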
Use this class and the provided sample data to answer the following questions.
1. How many coordinates are within 5 meters of at least one of the centroids?
2. How many coordinates are within 10 meters of at least one of the centroids?
3. What is the minimum radius R such that 80 percent of the coordinates are within R meters of at least one of K centroids?
4. Bonus: What is the maximum radius R such that the number of coordinates within a distance strictly less than R of any centroid is at most 1000?
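Questions 3 and 4 reduce to order statistics of the sorted nearest-centroid distances. This assumes "any centroid" in question 4 means "at least one centroid", the same reading as questions 1 and 2. Let d be the distances sorted ascending (0-indexed). Question 3 needs at least ⌈0.8·N⌉ distances <= R, so the smallest such R is d[⌈0.8·N⌉ − 1]. Question 4 is answered by R = d[1000]: at that radius only the values strictly below d[1000] are counted, which is at most 1000, while any larger R also counts d[0..1000], which is 1001 values. Both function names below are hypothetical.

```python
import math
from typing import Sequence


def min_radius_for_percent(sorted_nearest: Sequence[float], percent: int = 80) -> float:
    """Smallest R such that at least `percent`% of distances are <= R.

    `sorted_nearest` is each coordinate's nearest-centroid distance, ascending.
    An integer percent keeps the ceiling exact (no floating-point rounding).
    """
    n = len(sorted_nearest)
    if n == 0:
        raise ValueError("no coordinates")
    if not 0 <= percent <= 100:
        raise ValueError("percent must be in [0, 100]")
    needed = (percent * n + 99) // 100  # ceil(percent * n / 100)
    return 0.0 if needed == 0 else sorted_nearest[needed - 1]


def max_radius_strict(sorted_nearest: Sequence[float], max_count: int = 1000) -> float:
    """Largest R such that at most `max_count` distances are strictly < R."""
    if len(sorted_nearest) <= max_count:
        return math.inf  # the bound holds for every radius
    return sorted_nearest[max_count]
```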

Files:
- coordinates.csv: contains 1 million X,Y pairs with a header, units in meters
- centroids.csv: contains 1000 cluster centroids as X,Y pairs with a header, units in meters
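The notebook below reads these files as `Data/coordinates.csv.bz2` and `Data/centroids.csv.bz2`, and its inferred schema shows an `X, Y` header. A minimal stdlib loader under those assumptions might look like this. `load_xy` is a hypothetical name, and it decompresses transparently when the path ends in `.bz2`.

```python
import bz2
import csv
from typing import List, Tuple


def load_xy(path: str) -> List[Tuple[float, float]]:
    """Read an X,Y CSV with a header row; handles plain and .bz2 files."""
    opener = bz2.open if path.endswith(".bz2") else open
    with opener(path, "rt", newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        if [h.strip() for h in header] != ["X", "Y"]:
            raise ValueError(f"unexpected header: {header}")
        return [(float(x), float(y)) for x, y in reader]
```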

Deliverable:
- State your assumptions. Provide directions to run your code and to recreate the solutions to the questions, including installing all dependencies, specifying paths, or running the executable. Assume the developer executing and validating your code is using a Linux distribution.
- Please provide simple unit tests for your software.
- Provide solutions along with runtimes and peak memory usage for each question.
- Document the computation and memory complexity of each API call in your class as a function of the K centroids and N coordinates.
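Runtime and peak memory can be reported per question with the standard library. In the sketch below, `measure` is a hypothetical helper. It uses `time.perf_counter` for wall-clock time and `tracemalloc` for the peak of Python-heap allocations. Note that `tracemalloc` only sees Python objects. Memory used by the Spark JVM does not appear in this figure and would have to be read from the Spark UI instead.

```python
import time
import tracemalloc
from typing import Any, Callable, Tuple


def measure(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float, int]:
    """Run fn(*args, **kwargs); return (result, seconds, peak traced bytes)."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = fn(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()  # (current, peak)
        tracemalloc.stop()
    return result, elapsed, peak
```

Usage: `answer, seconds, peak_bytes = measure(index.count_within, 5.0)` reports one question's answer together with its cost.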

In [1]:
import os

# The test suite cannot run on Dataproc clusters, so the testing environment
# is set up to replicate Dataproc with the following steps.
JAVA_VER = "8u332-b09"
JAVA_FOLDER = "/tmp/java"
FILE_NAME = f"openlogic-openjdk-{JAVA_VER}-linux-x64"
TAR_FILE = f"{JAVA_FOLDER}/{FILE_NAME}.tar.gz"
DOWNLOAD_LINK = f"https://builds.openlogic.com/downloadJDK/openlogic-openjdk/{JAVA_VER}/{FILE_NAME}.tar.gz"
# Start from a clean folder.
! rm -rf $JAVA_FOLDER
! mkdir $JAVA_FOLDER
# Download and unpack OpenJDK 8; Spark requires Java to execute.
! wget -P $JAVA_FOLDER $DOWNLOAD_LINK
! tar -zxf $TAR_FILE -C $JAVA_FOLDER
# Point PySpark at the unpacked JDK.
os.environ["JAVA_HOME"] = f"{JAVA_FOLDER}/{FILE_NAME}"
! echo $JAVA_HOME


--2024-01-05 09:49:39--  https://builds.openlogic.com/downloadJDK/openlogic-openjdk/8u332-b09/openlogic-openjdk-8u332-b09-linux-x64.tar.gz
Resolving builds.openlogic.com (builds.openlogic.com)... 54.192.150.64, 54.192.150.116, 54.192.150.37, ...
Connecting to builds.openlogic.com (builds.openlogic.com)|54.192.150.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 105558622 (101M) [application/x-gzip]
Saving to: ‘/tmp/java/openlogic-openjdk-8u332-b09-linux-x64.tar.gz’


2024-01-05 09:49:41 (59.8 MB/s) - ‘/tmp/java/openlogic-openjdk-8u332-b09-linux-x64.tar.gz’ saved [105558622/105558622]

/tmp/java/openlogic-openjdk-8u332-b09-linux-x64
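The `!` shell escapes in the cell above only work inside Jupyter. For running as a plain Python script on Linux, a rough equivalent might look like the sketch below. `install_jdk` is a hypothetical helper. Unlike the cell, it reuses an already-downloaded tarball instead of deleting the folder first. Call it before creating the SparkSession so that `JAVA_HOME` is set.

```python
import os
import tarfile
import urllib.request

JAVA_VER = "8u332-b09"
FILE_NAME = f"openlogic-openjdk-{JAVA_VER}-linux-x64"
DOWNLOAD_LINK = (
    "https://builds.openlogic.com/downloadJDK/openlogic-openjdk/"
    f"{JAVA_VER}/{FILE_NAME}.tar.gz"
)


def install_jdk(java_folder="/tmp/java", url=DOWNLOAD_LINK, file_name=FILE_NAME):
    """Download (if not cached) and unpack the JDK tarball, then set JAVA_HOME."""
    os.makedirs(java_folder, exist_ok=True)
    tar_path = os.path.join(java_folder, f"{file_name}.tar.gz")
    if not os.path.exists(tar_path):  # skip the ~100 MB download when cached
        urllib.request.urlretrieve(url, tar_path)
    with tarfile.open(tar_path, "r:gz") as tar:
        tar.extractall(java_folder)
    java_home = os.path.join(java_folder, file_name)
    os.environ["JAVA_HOME"] = java_home
    return java_home
```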


In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a Spark session
spark = SparkSession.builder.appName("PairwiseDistanceExample").getOrCreate()




In [20]:
# spark.stop()

In [21]:

# Read the centroids CSV (bz2-compressed, with a header row)
centroids_df = spark.read.csv("Data/centroids.csv.bz2", header=True, inferSchema=True)

centroids_df.printSchema()
# Show the DataFrame
centroids_df.show()

root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)

+-------------------+-------------------+
|                  X|                  Y|
+-------------------+-------------------+
| -115.1140124595813| -498.9879464124197|
|-0.5227499155640025| -186.5801101930945|
|  -39.4848803518455| -459.0236264356627|
| -4.918948978550608| 145.66092744140258|
|  -60.7369212403257|  264.4701308991166|
|-331.46583985621436| 320.76423216748515|
|-135.58374696646203| 26.575119653588942|
|-158.66413434688965| -209.2160760552777|
| 296.83864590885764| 391.02348464829294|
|-119.31911868661338| 239.12040824321247|
|-30.051884126232498| -56.15738478405685|
| 350.49023203314675|-461.99648415979146|
|-17.254590642131085| -9.502172560105105|
|  34.60422416929665|  191.3128473521155|
|-194.41418640528352| -249.8722183939046|
|   356.608984177928| 187.69701273208983|
|  321.3381541837739|  9.179356344668044|
|-127.88076183785635| -271.3613821858672|
| 204.34446941315863|  -476.189158735473|
|  3

In [22]:

# Read the coordinates CSV (bz2-compressed, with a header row)
coordinates_df = spark.read.csv("Data/coordinates.csv.bz2", header=True, inferSchema=True)

coordinates_df.printSchema()
# Show the DataFrame
coordinates_df.show()




root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)

+-------------------+-------------------+
|                  X|                  Y|
+-------------------+-------------------+
| 133.37999002739875|   70.2259406546376|
| 235.79901946653808|-299.95706910083453|
| 192.55720004008475|  18.26500093427191|
|-17.823904060029953|-1.9179740034930717|
|-441.00303111998005|-425.34627850821437|
| -416.4018644934994|  86.63118699773204|
|  280.6479457858666| -424.0130678291138|
|  -279.730515078068|-205.86880095176195|
| 128.35820257781907| -93.83617444138126|
| 163.62371178554403|  62.64532543769148|
| -46.49656000485959|  339.6043779426013|
|  343.1579293057832|-287.92617948544654|
|-115.70715144878952|  98.46029667186691|
|  506.5424974792967| 199.19433496509072|
|  47.29679680565559| 408.15772702869225|
|  444.9502623068463|   272.445880020846|
|-185.18905424350012|-240.33513312645408|
|-362.65661099184814|-381.71219076824815|
| 436.59890193014843| -400.7481795883354|
|  3


In [23]:
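# Euclidean distance as a native Spark column expression. Wrapping this in
# F.udf does not work: inside a Python UDF the arguments are plain floats, not
# Columns, and F.udf's default return type is a string. Built-in column
# functions also run in the JVM, avoiding Python serialization overhead.
def pairwise_distance(coord_x, coord_y, centroid_x, centroid_y):
    return F.sqrt(F.pow(coord_x - centroid_x, 2) + F.pow(coord_y - centroid_y, 2))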


In [29]:
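# Prefix the columns so the cross-joined DataFrame has unambiguous names,
# then compute the distance from every coordinate to every centroid
# (N x K = 10^9 rows for the sample data).
coords = coordinates_df.select(
    F.col("X").alias("coordinates_X"), F.col("Y").alias("coordinates_Y")
)
cents = centroids_df.select(
    F.col("X").alias("centroids_X"), F.col("Y").alias("centroids_Y")
)
distances_df = coords.crossJoin(cents).withColumn(
    "distance",
    pairwise_distance(
        F.col("coordinates_X"), F.col("coordinates_Y"),
        F.col("centroids_X"), F.col("centroids_Y"),
    ),
)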
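For a fixed radius (questions 1 and 2) the full N × K cross join is not strictly necessary. One alternative, swapped in here and not part of the original notebook, is a uniform grid. Centroids are bucketed into square cells of side R. If a point is within R of a centroid, both lie in the same cell or in adjacent cells, so each point only needs to check its own cell and its 8 neighbours. `count_within_grid` is a hypothetical name. One caveat: a point at exactly distance R along a cell boundary could, in principle, be missed because of floating-point rounding in `x / radius`.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Sequence, Tuple

Point = Tuple[float, float]


def count_within_grid(
    coordinates: Iterable[Point], centroids: Sequence[Point], radius: float
) -> int:
    """Count coordinates within `radius` (inclusive) of at least one centroid."""
    if radius <= 0:
        raise ValueError("radius must be positive")
    # Bucket centroids into square cells of side `radius`: O(K).
    cells: Dict[Tuple[int, int], List[Point]] = defaultdict(list)
    for cx, cy in centroids:
        cells[(math.floor(cx / radius), math.floor(cy / radius))].append((cx, cy))
    r2 = radius * radius
    count = 0
    for x, y in coordinates:
        gx, gy = math.floor(x / radius), math.floor(y / radius)
        # Only the 3 x 3 block of cells around the point can hold a match.
        if any(
            (x - cx) ** 2 + (y - cy) ** 2 <= r2
            for dx in (-1, 0, 1)
            for dy in (-1, 0, 1)
            for cx, cy in cells.get((gx + dx, gy + dy), ())
        ):
            count += 1
    return count
```

The expected cost is O(K + N · c), where c is the average number of centroids in a 3 × 3 block of cells. For small radii such as 5 m and 10 m, c is typically far below K.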


In [30]:
# Define the radius values
radius_5 = 5.0
radius_10 = 10.0

# A coordinate is within R of at least one centroid exactly when its distance
# to the nearest centroid is <= R (assumes no duplicate (X, Y) pairs).
nearest_df = distances_df.groupBy("coordinates_X", "coordinates_Y").agg(
    F.min("distance").alias("min_distance")
)

# Q1 and Q2: count coordinates within 5 m and 10 m of at least one centroid
count_within_5 = nearest_df.filter(F.col("min_distance") <= radius_5).count()
count_within_10 = nearest_df.filter(F.col("min_distance") <= radius_10).count()
print(count_within_5, count_within_10)
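The deliverables ask for simple unit tests. Below is a minimal sketch using the standard-library `unittest` module. It tests a pure-Python reference of the nearest-centroid rule on a tiny, hand-checkable fixture; the fixture values are hypothetical and not from the sample data. The same expected counts could be asserted against the Spark pipeline by running it on a local SparkSession.

```python
import math
import unittest


def nearest_distances(coordinates, centroids):
    """Distance from each (x, y) coordinate to its closest centroid (brute force)."""
    return [
        min(math.hypot(x - cx, y - cy) for cx, cy in centroids)
        for x, y in coordinates
    ]


def count_within(coordinates, centroids, radius):
    """Coordinates within `radius` (inclusive) of at least one centroid."""
    return sum(d <= radius for d in nearest_distances(coordinates, centroids))


class CountWithinTest(unittest.TestCase):
    # Hypothetical fixture: nearest distances are 0, 5, 1 and 20 metres.
    COORDS = [(0.0, 0.0), (3.0, 4.0), (10.0, 0.0), (0.0, 20.0)]
    CENTROIDS = [(0.0, 0.0), (10.0, 1.0)]

    def test_boundary_is_inclusive(self):
        # (3, 4) is exactly 5 m from (0, 0) and must be counted.
        self.assertEqual(count_within(self.COORDS, self.CENTROIDS, 5.0), 3)

    def test_each_coordinate_counted_once(self):
        # At 10 m, (3, 4) and (10, 0) are near both centroids but count once each.
        self.assertEqual(count_within(self.COORDS, self.CENTROIDS, 10.0), 3)

    def test_no_coordinates(self):
        self.assertEqual(count_within([], self.CENTROIDS, 5.0), 0)


if __name__ == "__main__":
    # exit=False keeps a Jupyter kernel alive after the run.
    unittest.main(argv=["ignored"], exit=False)
```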


In [31]:
# Q3: R is the 80th percentile of each coordinate's nearest-centroid distance.
# percentile_approx takes a fraction in [0, 1] (not 80); with the default
# accuracy of 10000 its relative error is 1/10000.
quantile_value = 0.8
min_radius = (
    distances_df.groupBy("coordinates_X", "coordinates_Y")
    .agg(F.min("distance").alias("min_distance"))
    .agg(F.percentile_approx("min_distance", quantile_value).alias("min_radius"))
    .first()["min_radius"]
)


In [1]:
spark.stop()
