# Main code

At some points we could not make `rdd.map` work, so we switched to processing those steps locally in plain Python on the driver. Unfortunately, this means we do not take full advantage of PySpark's parallelism, but it still lets us answer the research questions.


#### HDFS
The data could also be read from HDFS using:

data = sc.textFile('hdfs://scomp1334:9000/user/steen176/csv/butterflies.csv')


#### addPyFile
Ships `classes.py` (the classes that store species and observations) to the worker nodes:

sc.addPyFile("classes.py")


In [7]:
from classes import Species, Observation
import subprocess as sp

if __name__ == "__main__":
    # `species` is assigned at module scope below, so it is the module-level
    # global that getObservation() and cluster() read.

    # This adds classes.py to the pyspark context so it will be available for the worker nodes
    sc.addPyFile("classes.py")

    data = sc.textFile("/home/WUR/steen176/Downloads/butterflies.csv")
    corrected = data.map(lambda line: correctLine(line))
    species = extractSpecies(corrected)

    # Parse every corrected line into Observation objects attached to its species.
    # rdd.map did not work here (the species dict lives on the driver), so iterate locally.
    # A plain loop is used because the call is made only for its side effects.
    for line in corrected.toLocalIterator():
        getObservation(line)

    # Train one k-means model per species -> list of (species_name, KMeansModel).
    # rdd.map did not work here either (cluster() itself uses the SparkContext).
    clusters = [cluster(name) for name in species.keys()]
    cluster_by_name = dict(clusters)

    # List of (species_name, number_of_observations), rarest first.
    # Change `top` to another value to increase the number of rare species considered.
    top = 1
    rare_specs = sorted([(name, len(species[name].observations)) for name in species.keys()],
                        key=lambda x: x[1])[0:top]

    # Currently only the rarest species is compared. To handle more, loop over rare_specs
    # and repeat everything below (except writing the output file) for each entry:
    #   rare_specs[i][0] is the name of the rare species
    #   rare_specs[i][1] is its number of observations
    rarest_name = rare_specs[0][0]
    # Reuse the model trained above: training again (random initialisation) would
    # produce different centers than the ones stored in `clusters`.
    rarest = (rarest_name, cluster_by_name[rarest_name].clusterCenters)

    # Every other species, in the format (speciesName, [clusterCenters]) expected by
    # findCombinations. Filtering by name is reliable; comparing center coordinates is
    # not, because separate training runs give different centers.
    filtered_clusters = [(name, [model.clusterCenters]) for name, model in clusters
                         if name != rarest_name]

    # findCombinations can yield the same row several times (one per matching pair of
    # centers), so keep unique rows and write them sorted after the header.
    rows = set(findCombinations(rarest, filtered_clusters))
    with open("output.tsv", "w") as out_file:
        out_file.write("rare_species\tclusters_with\tlongtitude\tlattitude\n")
        out_file.writelines(sorted(rows))
   
    

# Utility

In [4]:
def getSpecies(line):
    """Return the species name, i.e. everything before the first comma of ``line``."""
    name, _separator, _rest = line.partition(',')
    return name

def correctLine(line, expected_columns=10, split_column=4):
    """Return the line with fixed columns.

    Sometimes people who use the database type a comma inside a field. When the
    line is then split on commas there are too many columns. The surplus pieces
    are assumed to belong to the field at index ``split_column``. They are joined
    back into that single field with spaces, so any number of stray commas is repaired.

        Keyword Arguments:
            line -- One comma separated line of the CSV file.
            expected_columns -- Number of columns in a well-formed line (default 10).
            split_column -- Index of the field that may contain commas (default 4).
        Returns:
            The repaired line when it had more than ``expected_columns`` columns,
            otherwise the line unchanged.
    """
    fields = line.split(',')
    surplus = len(fields) - expected_columns
    if surplus <= 0:
        # Well-formed, or too short to repair safely: merging fields here would
        # only remove more columns, so leave the line untouched.
        return line
    end = split_column + surplus + 1
    merged = " ".join(fields[split_column:end])
    return ','.join(fields[:split_column] + [merged] + fields[end:])

def getObservation(line):
    """Turn one corrected CSV line into Observation objects for its species.

        Keyword Arguments:
            line -- Comma separated observation line (as produced by correctLine).

    Column 0 holds the species name and column 5 the number of individuals seen.
    That many Observation objects are built from columns 8 and 9 (passed to
    Observation as longitude and latitude). Each is added to the matching Species
    object in the module-level ``species`` dictionary. Nothing is returned.
    """
    fields = line.split(',')
    name = fields[0]
    count = int(fields[5])  # number of individuals recorded on this line
    for _ in range(count):
        # One separate Observation object per individual.
        obs = Observation(fields[8], fields[9])
        species[name].addObservation(obs)
    
    
def createSpecieObjects(species_list):
    """Map every species name to a freshly created Species object.

        Keyword Arguments:
            species_list -- An iterable of species names.
        Returns:
            A dictionary in the format {species_name: Species(species_name)}.
    """
    return {name: Species(name) for name in species_list}


def extractSpecies(rdd):
    """Build the species dictionary from an RDD of corrected CSV lines.

        Keyword Arguments:
            rdd -- RDD of comma separated lines (already passed through correctLine).
        Returns:
            A dictionary {species_name: Species object} with one entry per distinct name.
    """
    # Deduplicate on the cluster and collect only the distinct names, instead of
    # streaming one name per input line back to the driver and deduplicating there.
    names = rdd.map(getSpecies).distinct().collect()
    return createSpecieObjects(names)


# Clustering

In [3]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import SparseVector
from math import sqrt
NUM_CLUSTER = 4

def kmeans(sparse_vectors, k=None, maxIterations=20, runs=100, initializationMode="random"):
    """Cluster the samples using k-means (random initialisation mode by default).

        Keyword Arguments:
            sparse_vectors -- An RDD containing one vector per observation.
            k -- Number of clusters. Defaults to the module constant NUM_CLUSTER,
                 which is read at call time, so changing it later still takes effect.
            maxIterations -- Maximum number of k-means iterations (default 20).
            runs -- Value passed to KMeans.train's ``runs`` argument (default 100).
                    NOTE(review): newer Spark versions ignore this argument;
                    confirm against the Spark version in use.
            initializationMode -- Initialisation mode passed to KMeans.train
                                  (default "random").
        Returns:
            The trained KMeans model.
    """
    if k is None:
        k = NUM_CLUSTER
    return KMeans.train(sparse_vectors, k, maxIterations=maxIterations, runs=runs,
                        initializationMode=initializationMode)

def cluster(specie):
    """Train a k-means model on all observations of one species.

        Keyword Arguments:
            specie -- The name of the species to be clustered (a key of the
                      module-level ``species`` dictionary).
        Returns:
            A tuple of (specie_name, KMeans model).

    The Species object's getVectorRDD() result is distributed with
    sc.parallelize before training. Despite the method's name, it therefore
    presumably returns a local collection of vectors (TODO confirm in classes.py).
    """
    points = species[specie].getVectorRDD()
    distributed = sc.parallelize(points)
    model = kmeans(distributed)
    return specie, model


    
def error(point, model=None):
    """Return the Euclidean distance from ``point`` to its assigned cluster center.

    Adapted from the Spark MLlib k-means example; it is not called anywhere in
    this code. The example summed these values over all points, e.g.
    ``parsedData.map(lambda p: error(p, model)).reduce(lambda x, y: x + y)``.

        Keyword Arguments:
            point -- The vector to score: anything with ``toArray()`` (an MLlib
                     vector) or a plain sequence of floats.
            model -- Object exposing ``centers`` and ``predict`` (e.g. a trained
                     KMeansModel). Defaults to the module-level name ``clusters``,
                     as in the example code.
        Returns:
            The distance as a Python float.
    """
    import numpy

    if model is None:
        # NOTE(review): in the main cell `clusters` is a list of (name, model)
        # tuples, not a model, so pass `model` explicitly there.
        model = clusters
    center = numpy.asarray(model.centers[model.predict(point)], dtype=float)
    coords = point.toArray() if hasattr(point, "toArray") else point
    return float(numpy.linalg.norm(numpy.asarray(coords, dtype=float) - center))

# Comparing clusters

In [6]:
from itertools import combinations
import numpy 
def centerdiffs(centerA, centerB, DIFF=0.1):
    """Decide whether two cluster centers overlap.

        Keyword Arguments:
            centerA -- A numpy array of the longtitude and lattitude as floats
            centerB -- A numpy array of the longtitude and lattitude as floats
            DIFF -- Tolerance applied to each coordinate separately (exclusive bound)
        Returns:
            True when both the index-0 and the index-1 differences lie strictly
            between -DIFF and DIFF, otherwise False.
    """
    deltas = (
        numpy.subtract(centerA[0], centerB[0]),  # longitude difference
        numpy.subtract(centerA[1], centerB[1]),  # latitude difference
    )
    # all() yields a plain Python bool even when the comparisons give numpy bools.
    return all(-DIFF < delta < DIFF for delta in deltas)
    

def findCombinations(rare_specie, all_species_centers):
    """Compare a rare species with all other species and report overlapping clusters.

        Keyword Arguments:
            rare_specie -- Tuple of (name, cluster centers),
                    format: (u'Cuculus canorus', Array())
            all_species_centers -- Iterable of (name, [cluster centers]) tuples for
                    all other species; the centers sit inside a one-element list.
        Yields:
            One tab separated line "rare_name, other_name, longitude, latitude\\n"
            per overlapping pair of centers. The coordinates are those of the
            rare species' center. The same line can be yielded more than once.
    """
    rare_name = rare_specie[0]
    rare_centers = rare_specie[1]
    line_template = "{}\t{}\t{}\t{}\n"
    for other in all_species_centers:
        other_name = other[0]
        for rare_center in rare_centers:
            for other_center in other[1][0]:
                if not centerdiffs(rare_center, other_center):
                    continue
                yield line_template.format(rare_name, other_name,
                                           str(rare_center[0]), str(rare_center[1]))