# Find collinear points with pyspark

* Collinearity of a set of points is the property of their lying on a single line. A set of points with this property is said to be collinear. 
* To determine whether a set of points all lie on the same line we need a standard way to define (or parametrize) a line.

#### We will find collinear points by:

* List all pairs of points. We can do that efficiently in spark by computing cartesian product of the list of points with itself. 
* Remove the pairs in which the same point appears twice such as  ((2,0),(2,0)).
* For each pair of points, find the parameterization  (𝑎,𝑏)  of the line connecting them as described above.
* Group the pairs according to their parameters. Clearly, if two pairs have the same  (𝑎,𝑏)  values, all points in the two pairs lie on the same line.
* Eliminate the groups that contain only one pair (any pair of points defines a line).
* In each of the remaining groups, unpack the point-pairs to identify the individual points.
* Output the sets of 3 or more colinear points.

### Initialize spark context using 4 local cores as workers

In [604]:
from pyspark import SparkContext, SparkConf

#We can create a SparkConf() object and use it to initialize the spark context
conf = SparkConf().setAppName("Collinear Points").setMaster("local[4]") 
sc = SparkContext(conf=conf)    

from pyspark.rdd import RDD

In [605]:
def format_result(x):
    x[1].append(x[0][0])
    return tuple(x[1])

In [606]:
def to_sorted_points(x):
    """
    Sorts and returns a tuple of points for further processing.
    """
    return tuple(sorted(x))

In [603]:
#sc.stop()

### Format to tuple

In [607]:
## use this function to change into tuple

def to_tuple(x): 
    
    return(tuple(map(int, x.split(' '))))


In [608]:
# extra cell to check to_tuple 
my_list = ('1 1')[0]
to_tuple(my_list)


(1,)

### Remove duplicate tuple 

In [612]:

def del_duplicates(x):
    """ 
    Will use this function to delete duplicate tuple from the tuple list
    """
    
         
    return [t for t in (set(tuple(i) for i in x))] 
          
        

In [613]:
lst = [(1, 2), (5, 7), (3, 6), (1, 2)]
print(non_duplicates(lst))

True


In [614]:
def non_duplicates(x):
    """ 
    Use this function inside the get_cartesian() function to 'filter' out pairs with duplicate points
    """
 
    mapping = {}
    for i in range(len(x)):
        if x[i] in mapping:
            return False
        mapping[x[i]] = True
    return True
      

In [615]:
# extra cell to check non_duplicates

my_input = ((0,0),(1,2))
non_duplicates(my_input)

False

### Get cartesian product

In [619]:
## use this function to get cartesian product.

def get_cartesian(rdd):
       
    rdd  = rdd.cartesian(rdd).filter(lambda x: x[0] != x[1])
    
    return rdd
    

### Get the slope

In [622]:
## Use this function to get the slope

def find_slope(x):
    
  
    #return (x[0][1]-x[1][1])/(x[1][1]-x[1][1])

    
    p1 = x[0]
    p2 = x[1]
    if p1[0] == p2[0] :
        slope = "inf"
    else:
       slope = (p2[1] - p1[1]) / (p2[0] - p1[0])
    t1 = tuple([x[0], slope])
    t2 = tuple([t1, x[1]])
    return t2

   

In [623]:
# extra cell to check find_slope function
x = ((1,2),(3,4))
print(find_slope(x))

(((1, 2), 1.0), (3, 4))


### Get the collinear points

In [679]:
def find_collinear(rdd):
   
    rdd1 = rdd.map(find_slope) 
    rdd2 = rdd1.groupByKey().mapValues(list) 
    rdd = rdd2.map(lambda x: format_result(x)).filter(lambda x: len(x) > 2) 

    return rdd
    
   

### Build  collinear sets 

In [662]:
def build_collinear_set(rdd):
    
   
    rdd = rdd.map(lambda x: to_tuple(x))
      
    rdd = get_cartesian(rdd)
    
    rdd = find_collinear(rdd)
            
    
    return rdd

    

### The main Process

In [665]:
def process(filename):
    """
    This is the process function used for finding collinear points using inputs from different files
      
    """
    # Load the data file into an RDD
    rdd = sc.textFile(filename)    #Input: Name of the test file
    
    rdd = build_collinear_set(rdd) # Output: Set of collinear points
      
    return rdd

In [None]:
# sc.stop()