# Cross matching of 3D data sets

The idea is to take two different data sets, and perform a cross-match of objects between the two.

Perfect cross-match
--------

In the case of a perfect cross-match, we have $a \in A$ matches with $b \in B$ if $a = b$. 

Cross-match with uncertainty
--------

Say now A and B have some uncertainty in the position of their objects, and their cross-match is subject to a definition of neighbourhood, or minimum "distance" between them (keep in mind, we are in the 3 dimensional space!).
Depending on how we define the neighbourhood (that is which metric to compare two objects a priori close but not identical), the objects matching can be very different.

Let's illustrate this with an example.

![crossmatch_sphere](images/crossmatch_sphere.jpg)

Question: Does A match with B? Does C match with D?
--------

**Using the distance to centers: **
Let's suppose our metric is the (euclidean) distance between the centers of objects, and two points are said identical if their distance is smaller than $\epsilon$. We have $||AB|| < \epsilon$ and $||CD|| > \epsilon$, therefore we would say that A matches with B, but C does not match with D.

**Using the angular separation: **
Let's suppose our metric is the angular separation between the centers of objects (at same radius), and two points are said identical if their angular separation is smaller than $\beta_c$ . Both points have the same angular separation $\beta < \beta_c$, therefore we would say that A matches with B, and C matches with D.

We see in this simple example that depending on the metric chosen, the results are different.

Number of matched pairs
--------

Let A and B be two finite sets of points $\{ (r, \theta, \phi) \}$ with $r \in U(0, \infty), \, \theta \in U(0, \pi), \, \phi \in U(0, 2\pi)$, and $U(x, y)$ is the uniform distribution over [x, y].
Let denote $x$ the cross-match between points according to the distance to centers, and $o$ the cross-match between points according to the angular separation. We have 

$$\begin{align}
\text{card}(AxB)(r) &\propto 1/r \\
\text{card}(AoB)(r) &\propto cste
\end{align}$$

Of course this scaling depends on how we sample the parameters and fill the space. Instead for example, we could have sampled $\cos(\theta)$ from $U(-1, 1)$, and the results would have changed. But all of that is to illustrate the fact that the scaling of the cardinal for the cross-match depends upon the metric chosen.

What has been implemented in spark3D so far?
--------

So far, two sets of methods have been implemented:
* `PixelCrossMatch`, and in particular `CrossMatchHealpixIndex` which performs the cross-match between two sets A and B according to the healpix index of the objects at a given radial position (space is partitioned in redshift shells). (angular position)
* `CenterCrossMatch`, and in particular `CrossMatchPosition` which performs the cross-match between two sets A and B according to the distance between object centers. (distance to centers)

![crossmatch_neighbours](images/crossmatch_neighbours.jpg)

In this notebook, we show the use of both.

In [1]:
// Package to read data from FITS file
%AddDeps com.github.astrolabsoftware spark-fits_2.11 0.6.0

// Smile provides visualisation tools
%AddDeps com.github.haifengl smile-plot 1.5.1
%AddDeps com.github.haifengl smile-math 1.5.1
%AddDeps com.github.haifengl smile-core 1.5.1
%AddDeps com.github.haifengl smile-scala_2.11 1.5.1

// Contains extensions to the Swing GUI toolkit
%AddDeps org.swinglabs swingx 1.6.1

// Add the spark3d JAR. To generate it, run `sbt ++2.11.8 package` at the root of the package
%AddJar file:/Users/julien/Documents/workspace/myrepos/spark3D/target/scala-2.11/spark3d_2.11-0.2.2.jar

// Add healpix JAR
%AddJar file:/Users/julien/Documents/workspace/myrepos/spark3D/lib/jhealpix.jar

Marking com.github.astrolabsoftware:spark-fits_2.11:0.6.0 for download
Preparing to fetch from:
-> file:/var/folders/my/lfvl285927q2hzk545f39sy40000gn/T/toree_add_deps5098665649624919110/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/my/lfvl285927q2hzk545f39sy40000gn/T/toree_add_deps5098665649624919110/https/repo1.maven.org/maven2/com/github/astrolabsoftware/spark-fits_2.11/0.6.0/spark-fits_2.11-0.6.0.jar
Marking com.github.haifengl:smile-plot:1.5.1 for download
Preparing to fetch from:
-> file:/var/folders/my/lfvl285927q2hzk545f39sy40000gn/T/toree_add_deps5098665649624919110/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/my/lfvl285927q2hzk545f39sy40000gn/T/toree_add_deps5098665649624919110/https/repo1.maven.org/maven2/com/github/haifengl/smile-plot/1.5.1/smile-plot-1.5.1.jar
Marking com.github.haifengl:smile-math:1.5.1 for download
Preparing to fetch from:
-> file:/var/folders/my/lfvl285927q2hzk545f39sy40000gn/T/toree_add_deps5098665649624919110/
-> htt

# From raw data RDD to Point3D RDD

Load data from the test files provided in the spark3d repo.
Our raw data contains points with 3D coordinates (spherical: r, theta, phi) sampled from uniform distributions. Let's transform it into a Point3D RDD

In [4]:
import com.astrolabsoftware.spark3d.spatial3DRDD._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Xmatch").getOrCreate()

val fnA = "../../src/test/resources/astro_obs.fits"
val fnB = "../../src/test/resources/astro_obs2.fits"
val options = Map("hdu" -> "1")
val columns = "Z_COSMO,RA,DEC"
val spherical = true

// Load the data
val pointRDDA = new Point3DRDD(spark, fnA, columns, spherical, "fits", options)
val pointRDDB = new Point3DRDD(spark, fnB, columns, spherical, "fits", options)

# Repartitioning of the space

By default, the pointRDD is partitioned randomly (i.e. Spark made partition regardless to the content of the file).
Let's repartition our data based on the distance to the center (Onion Space). In addition, to ease the cross-match between the two data sets, let's partition A and B the same way.

In [5]:
import com.astrolabsoftware.spark3d.utils.GridType
import com.astrolabsoftware.spark3d.spatialPartitioning.SpatialPartitioner

// As we are in local mode, and the file is very small, the RDD pointRDD has only 1 partition.
// For the sake of this example, let's increase the number of partition to 100.
val pointRDD_partA = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100).cache()
// Get the partitioner of A
val partitionerA = pointRDD_partA.partitioner.get.asInstanceOf[SpatialPartitioner]
// Repartition B as A
val pointRDD_partB = pointRDDB.spatialPartitioning(partitionerA).cache()

# Cross matching using Healpix index

Cross match 2 RDD based on the healpix index of geometry center.
You have to choice to return:
* (1) Elements of (A, B) matching (returnType="AB")
* (2) Elements of A matching B (returnType="A")
* (3) Elements of B matching A (returnType="B")
* (4) Healpix pixel indices matching (returnType="healpix")

Which one you should choose? That depends on what you need:
(1) gives you all elements but is slow.
(2) & (3) give you all elements only in one side but is faster.
(4) gives you only healpix center but is even faster.

In [6]:
import com.astrolabsoftware.spark3d.spatialOperator.PixelCrossMatch

// Shell resolution
val nside = 512
// Keeping only elements from A with counterpart in B
val xMatchA = PixelCrossMatch.CrossMatchHealpixIndex(pointRDD_partA, pointRDD_partB, nside, "A")
// Keeping only elements from B with counterpart in A
val xMatchB = PixelCrossMatch.CrossMatchHealpixIndex(pointRDD_partA, pointRDD_partB, nside, "B")
// Keeping all elements with counterparts in both A and B
val xMatchAB = PixelCrossMatch.CrossMatchHealpixIndex(pointRDD_partA, pointRDD_partB, nside, "AB")

println("Keeping only elements from A with counterpart in B: ", xMatchA.count(), " points")
println("Keeping only elements from B with counterpart in A: ", xMatchB.count(), " points")
println("Keeping all elements with counterparts in both A and B: ", xMatchAB.count(), " points")

(Keeping only elements from A with counterpart in B: ,1337, points)             
(Keeping only elements from B with counterpart in A: ,1278, points)
(Keeping all elements with counterparts in both A and B: ,1337, points)         


# Cross matching using distance between object centers

Cross match 2 RDD based on based on the object centers.
You have to choice to return:
* (1) Elements of (A, B) matching (returnType="AB")
* (2) Elements of A matching B (returnType="A")
* (3) Elements of B matching A (returnType="B")

Which one you should choose? That depends on what you need:
(1) gives you all elements but is slow.
(2) & (3) give you all elements only in one side but is faster.

In [7]:
import com.astrolabsoftware.spark3d.spatialOperator.CenterCrossMatch

// Distance threshold for the match
val epsilon = 0.004
// Keeping only elements from A with counterpart in B
val CxMatchA = CenterCrossMatch.CrossMatchCenter(pointRDD_partA, pointRDD_partB, epsilon, "A")
// Keeping only elements from B with counterpart in A
val CxMatchB = CenterCrossMatch.CrossMatchCenter(pointRDD_partA, pointRDD_partB, epsilon, "B")
// Keeping all elements with counterparts in both A and B
val CxMatchAB = CenterCrossMatch.CrossMatchCenter(pointRDD_partA, pointRDD_partB, epsilon, "AB")

println("Keeping only elements from A with counterpart in B: ", CxMatchA.count(), " points")
println("Keeping only elements from B with counterpart in A: ", CxMatchB.count(), " points")
println("Keeping all elements with counterparts in both A and B: ", CxMatchAB.count(), " points")

(Keeping only elements from A with counterpart in B: ,695, points)
(Keeping only elements from B with counterpart in A: ,497, points)
(Keeping all elements with counterparts in both A and B: ,6121, points)


# Display the results

In [8]:
import smile.plot._
import java.awt.Color
import java.awt.{GridLayout, Dimension}

import javax.swing.JFrame
import javax.swing.JPanel

import com.astrolabsoftware.spark3d.utils.Utils.sphericalToCartesian
import org.apache.spark.rdd.RDD
import com.astrolabsoftware.spark3d.geometryObjects._


/** Define palette of colors */
def colors : Array[java.awt.Color] = {
    Array(
        Color.BLACK, Color.RED, Color.GREEN, Color.BLUE,
        Color.ORANGE, Color.YELLOW, Color.DARK_GRAY, Color.PINK,
        Color.MAGENTA, Color.CYAN)
}

/** Define markers */
def markers : Array[Char] = {
    val strings = Array(
        "o", "s", "x", "+", "@", "q", 
        "-", "|", "O", "S", "#", "Q", "."
    )
    strings.map(x => x.toCharArray).flatten
    
}

/** 
  * format the data for smile.
  * The data for ScatterPlot must be Array[Array[Double]] (=Array[point3d])
  * We add one more dimension which is the partition.
  *
  * @param rdd : (RDD[Point3D])
  *   RDD whose elements are Point3D instances.
  * @return (Array[Array[Array[Double]]]) data as partitions -> points -> point -> coordinate 
  * 
  */
def format_data_for_smile(rdd: RDD[Point3D]) : Array[Array[Array[Double]]] = {
    rdd.map(
        x=> sphericalToCartesian(x).center.getCoordinate.toArray)
    .glom.collect().toArray
}

/** 
  * Plot 3D data sets.
  * 
  * @param display : (String)
  *   Either show or save. If save, extension will be given in the outname.
  * @param rddArr : (Array(RDD[Point3D]))
  *   Array containing RDD for data sets X whose elements are instances of Point3D.
  * part : (Int)
  *   Partition index to plot.
  * @param outname : (String)
  *   If save mode, name (incl. extenstion) for the out file.
  * @param title : (String)
  *   Title of the window.
  *
  */
def MyScatterPlotCross(
    display: String, rddArr: Array[RDD[Point3D]], 
    part: Int, outname: String, title: String) : Unit = {
    
    // Re-arange the data for plotting
    val dataInit = format_data_for_smile(rddArr(0))
    
    // Plot the results
    val window = ScatterPlot.plot(dataInit(part), markers(0), colors(0))
    
    for (pos <- 1 to rddArr.size - 1) {
      val dataOther = format_data_for_smile(rddArr(pos))
      window.points(dataOther(part), markers(pos), colors(pos))   
    }
    
    display match {
      case "show" => {
        val partFrame = new JFrame(title)
        partFrame.setLocationRelativeTo(null)
        partFrame.getContentPane().add(window)
        partFrame.setVisible(true)
        partFrame.setSize(new Dimension(500, 500))
      }
      case "save" => {
        val partHeadless = new Headless(window);
        partHeadless.pack();
        partHeadless.setVisible(true);
        partHeadless.setSize(new Dimension(500, 500))
        window.save(new java.io.File(outname))
      }
      case _ => throw new AssertionError("""
        I do not understand the kind of display you want.
        Choose between "show" and "save".
        """)
    }
}

// Set to "show" or "save"
val display = "show"
val partition = 5

// Display the result for healpix
MyScatterPlotCross(display, Array(pointRDD_partA, pointRDD_partB, 
                    xMatchA.asInstanceOf[RDD[Point3D]]), partition, 
                    "crossmatchAxB.png", "Healpix Cross match (A, B, AxB)")
MyScatterPlotCross(display, Array(xMatchA.asInstanceOf[RDD[Point3D]], 
                     xMatchB.asInstanceOf[RDD[Point3D]]), partition,
                     "crossmatchAxBOnly.png", "Healpix Cross match (AxB, BxA)")

// Display the result for object centers
MyScatterPlotCross(display, Array(pointRDD_partA, pointRDD_partB, 
                    CxMatchA.asInstanceOf[RDD[Point3D]]), partition, 
                    "CcrossmatchAxB.png", "Center Cross match (A, B, AxB)")
MyScatterPlotCross(display, Array(CxMatchA.asInstanceOf[RDD[Point3D]], 
                     CxMatchB.asInstanceOf[RDD[Point3D]]), partition,
                     "CcrossmatchAxBOnly.png", "Center Cross match (AxB, BxA)")

Here is a plot for Partition #5

Cross match based on angular separation (A, B, AxB)    |Cross match based on angular separation (AxB, BxA)   
:-------------------------:|:-------------------------:
![title](images/crossmatchAxB.png)|![title](images/crossmatchAxBOnly.png)

Cross match based on center distance (A, B, BxA)    |Cross match based on center distance (AxB, BxA)   
:-------------------------:|:-------------------------:
![title](images/CcrossmatchAxB.png)|![title](images/CcrossmatchAxBOnly.png)

In [9]:
val display = "show"

/** Define line styles */
def lines : Array[Line.Style] = {
    Array(Line.Style.SOLID, Line.Style.SOLID, 
          Line.Style.SOLID, Line.Style.SOLID, 
          Line.Style.SOLID, Line.Style.SOLID)
}

/** 
  * Plot the number of match per partition.
  * 
  * @param display : (String)
  *   Either show or save. If save, extension will be given in the outname.
  * @param rddArr : (Array(RDD[Point3D]))
  *   Array containing RDD from the Xmatch.
  * part : (Int)
  *   Partition index to plot.
  * @param outname : (String)
  *   If save mode, name (incl. extenstion) for the out file.
  * @param title : (String)
  *   Title of the window.
  *
  */
def plotNumberOfMatch(
    display: String, rddArr: Array[RDD[Point3D]], 
    part: Int, outname: String, title: String) : Unit = {
    
    // Re-arange the data for plotting
    val numberOfMatchInit = rddArr(0).mapPartitions(
      iter => Array(iter.size.toDouble).iterator, true).collect()
    
    // Plot the results
    val window = LinePlot.plot(numberOfMatchInit, lines(0), colors(0))
    
    for (pos <- 1 to rddArr.size - 1) {
      val numberOfMatchOther = rddArr(pos).mapPartitions(
        iter => Array(iter.size.toDouble).iterator, true).collect()
      window.line(numberOfMatchOther, lines(pos), colors(pos))
    }
    
    window.setAxisLabels(
        "partition index (sorted by increasing r)", 
        "Number of pairs")
    
    display match {
      case "show" => {
        val partFrame = new JFrame(title)
        partFrame.setLocationRelativeTo(null)
        partFrame.getContentPane().add(window)
        partFrame.setVisible(true)
        partFrame.setSize(new Dimension(500, 500))
      }
      case "save" => {
        val partHeadless = new Headless(window);
        partHeadless.pack();
        partHeadless.setVisible(true);
        partHeadless.setSize(new Dimension(500, 500))
        window.save(new java.io.File(outname))
      }
      case _ => throw new AssertionError("""
        I do not understand the kind of display you want.
        Choose between "show" and "save".
        """)
    }
}

plotNumberOfMatch(display, 
                  Array(xMatchA.asInstanceOf[RDD[Point3D]], CxMatchA.asInstanceOf[RDD[Point3D]]), 
                  partition, "line.png", "Number of match per partition (black=angular, red=center distance)")

Number of match per partition (black=angular, red=center distance)
![title](images/line.png)