### Collinear points

Definition of collinearity[1]: In geometry, collinearity of a set of points is the property of their lying on a single line. A set of points with this property is said to be collinear.

![](non-collinear-points.jpg)

Here, the points P, Q, R are collinear, as are A, R, B. The points A, B, C, however, are non-collinear. For more, see [2].

1. https://en.wikipedia.org/wiki/Collinearity
2. http://www.mathcaptain.com/geometry/collinear-points.html
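
As a quick illustration of the definition, three points can be tested for collinearity without computing a slope at all: the cross product of the vectors PQ and PR is zero exactly when P, Q and R lie on one line. The sketch below assumes 2-D points with integer coordinates; the helper name `are_collinear` is hypothetical and is not used elsewhere in this notebook.

```python
def are_collinear(p, q, r):
    """Return True if the 2-D points p, q, r lie on a single line.

    Hypothetical helper for illustration only.
    """
    (px, py), (qx, qy), (rx, ry) = p, q, r
    # Cross product of vectors PQ and PR; it is zero when the
    # triangle P-Q-R has no area, i.e. the points are on one line.
    return (qx - px) * (ry - py) - (qy - py) * (rx - px) == 0


print(are_collinear((-2, -2), (1, 1), (3, 3)))
print(are_collinear((0, 1), (3, 4), (0, 5)))
```

Because the test uses only integer multiplication and subtraction, it also handles vertical lines without a special case.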

In [3]:
import os

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

from pyspark import SparkContext, SparkConf

# Create a SparkConf object and use it to initialize the Spark context.
# "local[4]" runs Spark locally with 4 worker threads.
conf = SparkConf().setAppName("Collinear Points").setMaster("local[4]")
sc = SparkContext(conf=conf)

from pyspark.rdd import RDD

In [8]:
#Helper Functions
def format_result(x):
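    """
    Appends the anchor point x[0][0] to the list of points x[1]
    and returns the result as a tuple. Note that x[1] is modified in place.
    """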
    x[1].append(x[0][0])
    return tuple(x[1])


def to_sorted_points(x):
    """
    Sorts and returns a tuple of points for further processing.
    """
    return tuple(sorted(x))

def to_tuple(x):
    """
    Parses a whitespace-separated string of integers, e.g. "3 4", into a tuple (3, 4).
    """
    return tuple(int(i) for i in x.split())

def non_duplicates(x):
    """
    Returns True when the two points in the pair x differ.
    Used by get_cartesian() to filter out pairs of identical points.
    """
    return x[0] != x[1]

def get_cartesian(rdd):
    """
    Returns the Cartesian product of the RDD with itself,
    excluding pairs in which both points are the same.
    """
    return rdd.cartesian(rdd).filter(non_duplicates)

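def find_slope(x):
    """
    Given a pair of points (a, b), returns ((a, slope), b), where slope is
    the slope of the line through a and b, or "inf" for a vertical line.
    Keying on (a, slope) lets groupByKey() gather all points on one line through a.
    """
    (ax, ay), (bx, by) = x
    slope = "inf" if ax == bx else (by - ay) / (bx - ax)
    return ((ax, ay), slope), (bx, by)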
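
`find_slope` uses the slope as part of a grouping key, so two slopes must compare equal exactly when the lines are parallel. Float division is fine for small integer coordinates like those in the sample data, but with very large coordinates two different slopes can round to the same float. A minimal sketch of an exact alternative, assuming integer coordinates, uses `fractions.Fraction` from the standard library; the name `exact_slope` is hypothetical and not part of the notebook.

```python
from fractions import Fraction


def exact_slope(a, b):
    """Return the slope through integer points a and b as an exact value.

    Hypothetical variant of the slope computation, for illustration only.
    Vertical lines are reported as the string "inf", as in find_slope.
    """
    (ax, ay), (bx, by) = a, b
    if ax == bx:
        return "inf"
    # Fraction reduces to lowest terms and normalizes the sign,
    # so equal slopes always hash and compare as equal keys.
    return Fraction(by - ay, bx - ax)


# Two segments on parallel lines produce the same key.
print(exact_slope((0, 0), (3, 1)) == exact_slope((1, 1), (7, 3)))

# With very large coordinates, float slopes can collide while
# the exact slopes stay distinct.
big = 2**53
print((big + 1) / big == 1 / 1)
print(exact_slope((0, 0), (big, big + 1)) == exact_slope((0, 0), (1, 1)))
```

`Fraction` values are hashable, so they could serve as the slope part of a grouping key in the same way the float does.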



In [5]:
def find_collinear(rdd):
    """Groups point pairs by (anchor point, slope) and keeps groups of 3 or more points."""
    return (rdd.map(find_slope)
               .groupByKey()
               .map(lambda x: (x[0][0],) + tuple(x[1]))
               .filter(lambda x: len(x) > 2))

def verify_collinear_sets(collinearpointsRDD, testlist):
    """Checks that the RDD and testlist hold the same sets of points, ignoring order."""
    collinearpoints = {tuple(sorted(x)) for x in collinearpointsRDD.collect()}
    expected = {tuple(sorted(x)) for x in testlist}
    return collinearpoints == expected
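
To make the grouping step easier to follow, here is a plain-Python sketch of the same idea with no Spark involved: pair every point with every other point, key each pair by (anchor point, slope), and keep the keys that collect at least two partners. The function name `collinear_sets` and the sample points are hypothetical, and the sketch assumes the input points are distinct.

```python
from collections import defaultdict
from itertools import permutations


def collinear_sets(points):
    """Plain-Python sketch of the anchor/slope grouping (illustration only)."""
    groups = defaultdict(list)
    # All ordered pairs of distinct points, mirroring cartesian() + filter.
    for (ax, ay), (bx, by) in permutations(points, 2):
        slope = "inf" if ax == bx else (by - ay) / (bx - ax)
        # Same key shape as find_slope: (anchor point, slope).
        groups[((ax, ay), slope)].append((bx, by))
    # An anchor with two or more partners on one slope gives 3+ collinear points.
    # Sorting each group makes the duplicates found from different anchors identical.
    return {
        tuple(sorted([anchor] + partners))
        for (anchor, _slope), partners in groups.items()
        if len(partners) > 1
    }


sample = [(0, 1), (3, 4), (5, 6), (0, 5), (0, -3), (2, 2)]
print(collinear_sets(sample))
```

Each line is discovered once per point lying on it, which is why the results are sorted and collected into a set.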

In [6]:
def build_collinear_set(rdd):
    
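    # Parse each input line into an (x, y) tuple, pair every point with every
    # other point, then group the pairs into sets of collinear points.
    crdd = rdd.map(to_tuple)
    crdd = get_cartesian(crdd)
    rdd = find_collinear(crdd)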
    
    # Sorting each of your returned sets of collinear points. This is for grading purposes. 
    # YOU MUST NOT CHANGE THIS.
    rdd = rdd.map(to_sorted_points)
    
    return rdd

In [7]:
def process(filename):
    """
    Finds the sets of collinear points among the points listed in a data file.
    Input: Name of the test file
    Output: Set of tuples, each a sorted group of collinear points
    """
    # Load the data file into an RDD
    rdd = sc.textFile(filename)
    
    rdd = build_collinear_set(rdd)
    
    # Collecting the collinear points RDD in a set to remove duplicate sets of collinear points. This is for grading purposes. You may ignore this.
    res = set(rdd.collect())
    
    return res

In [10]:
process("../resource/asnlib/public/data.txt")

{((-2, -2), (1, 1), (2, 2), (3, 3)),
 ((0, -3), (0, 1), (0, 5)),
 ((0, 1), (3, 4), (5, 6))}