# MDAnalysis and Pilot-In-Memory


The main performance bottleneck of the current MDAnalysis implementation is the construction of the graph with NetworkX, which takes ~78% of the overall runtime (line 59 of the profile below).


**Beckstein Profiling:**

    47        10           33      3.3      0.0      if adj is None:
    48        10        66544   6654.4      0.0          x = atoms.positions

    54        10     58689221 5868922.1     18.8          adj = (MDAnalysis.core.parallel.distances.distance_array(x, x, box=box) < cutoff)
    
    58        10           78      7.8      0.0      adjk = adj if Nmax is None else adj[:Nmax, :Nmax] 
    59        10    243009076 24300907.6   77.9      graph = nx.Graph(adjk)
    60        10      4346636 434663.6      1.4      subgraphs = nx.connected_components(graph)
    61        49        83597   1706.1      0.0      indices = [np.sort(g) for g in subgraphs]
    62        49      5694698 116218.3      1.8      return [atoms[group].residues for group in indices]



## 1. LeafletFinder NetworkX Implementation Profiling

see https://code.google.com/p/mdanalysis/

Profile the default implementation, which is based on [NetworkX](https://networkx.github.io/).

In [None]:
# Path to the small comma-separated edge list that the NetworkX cells below read.
FILENAME="../data/mdanalysis/small/graph_edges_95_215.csv"
# IPython shell escape: show the first five lines of that file.
!head -n 5 {FILENAME}

In [None]:
%matplotlib inline
%time
import networkx as NX
import time
import datetime
import sys

# Time how long NetworkX takes to parse the comma-separated edge list into a graph.
start = time.time()
nxg = NX.read_edgelist(FILENAME, delimiter=",")
end_read = time.time()
# Draw the graph with a spring layout; this runs after end_read, so it is outside the timed interval.
NX.draw(nxg, pos=NX.spring_layout(nxg))

In [None]:
import matplotlib.pyplot as plt

# Node degrees in descending order. dict(...) accepts both the plain dict that
# older NetworkX releases return from degree() and the (node, degree) view
# returned by newer releases, which has no .values() method.
degree_sequence = sorted(dict(NX.degree(nxg)).values(), reverse=True)
# Single-argument print calls produce the same output on Python 2 and Python 3.
print("Degree sequence %s" % (degree_sequence,))
print("Length: %d" % len(degree_sequence))

dmax = max(degree_sequence)

# Log-log plot of the sorted degree sequence (degree against rank).
plt.loglog(degree_sequence, 'b-', marker='o')
plt.title("Degree Histogram")
plt.ylabel("Degree")
plt.xlabel("Node")

In [None]:
# Time the connected-components search on the NetworkX graph.
start = time.time()
components = NX.connected_components(nxg)
end_created = time.time()

# Print every component (members sorted) and count them along the way.
count = 0
for count, component in enumerate(components, 1):
    print(str(sorted(component)))
end_connected = time.time()

# NOTE(review): "Graph Creation Runtime" covers only the connected_components()
# call between `start` and `end_created`; the graph itself is loaded in an earlier cell.
summary = (
    ("Number of Nodes: ", NX.number_of_nodes(nxg)),
    ("Number of Edges: ", NX.number_of_edges(nxg)),
    ("Connected Components: ", count),
    ("Runtime: ", end_connected - start),
    ("Graph Creation Runtime: ", end_created - start),
    ("Connected Components Runtime: ", end_connected - end_created),
)
for label, value in summary:
    print(label + str(value))

In [None]:
import os
from pilot_hadoop import PilotComputeService
from IPython.display import HTML

# Set SAGA_VERBOSE in this process's environment before the pilots below are created.
# NOTE(review): presumably this raises SAGA's log verbosity — confirm against the SAGA documentation.
os.environ["SAGA_VERBOSE"]="100"

## 2. Pilot-Spark and Pilot-InMemory Implementation

Set up a Spark cluster on the local machine or on an HPC resource. Execute **either** 2.1.1 or 2.1.2.

### 2.1.1 Start Spark Cluster using Pilot-Spark

see https://github.com/drelu/saga-hadoop

In [None]:
# Description of a single-core Spark pilot forked on the local machine.
pilot_compute_description = {
    "resource_url": "fork://localhost",
    "number_cores": 1,
    "cores_per_node": 1,
    "type": "spark",
}
pilot = PilotComputeService.create_pilot(pilot_compute_description)

# print out details of Pilot-Spark
details = pilot.get_details()
# As the last expression of the cell, the HTML link to the Spark web UI is rendered as the cell output.
HTML("<a target='blank' href='%s'>Spark Web UI</a>"%details["web_ui_url"])

### 2.1.2 Start Spark Cluster inside YARN

In [1]:
%run env.py
%run util/init_spark.py

from pilot_hadoop import PilotComputeService as PilotSparkComputeService

# Pilot description: connect to the YARN cluster in client mode and request 16 processes.
pilotcompute_description = dict(
    service_url="yarn-client://yarn-aws.radical-cybertools.org",
    number_of_processes=16,
)

# Show which Spark installation and Python path the pilot will pick up.
print("SPARK HOME: %s" % os.environ["SPARK_HOME"])
print("PYTHONPATH: %s" % os.environ["PYTHONPATH"])

# Create the pilot and keep its SparkContext as `sc` for the cells below.
pilot_spark = PilotSparkComputeService.create_pilot(
    pilotcompute_description=pilotcompute_description
)
sc = pilot_spark.get_spark_context()

SPARK Home: /usr/hdp/2.3.2.0-2950/spark-1.5.2-bin-hadoop2.6
SPARK HOME: /usr/hdp/2.3.2.0-2950/spark-1.5.2-bin-hadoop2.6
PYTHONPATH: /usr/hdp/2.3.2.0-2950/spark-1.5.2-bin-hadoop2.6/python:/usr/hdp/2.3.2.0-2950/spark-1.5.2-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip


### 2.2. Utilize Spark Native

In [2]:
!hadoop fs -ls /data/mdanalysis/large

Found 1 items
-rw-r--r--   3 luckow hdfs   29855041 2015-11-25 00:03 /data/mdanalysis/large/graph_edges_145746_1012872.csv


In [5]:
# NOTE: sc.version is a bare expression that is not the last statement of the cell,
# so its value is evaluated but not displayed.
sc.version
# Open the large edge-list CSV from HDFS as an RDD with one element per line.
rdd = sc.textFile("/data/mdanalysis/large/graph_edges_145746_1012872.csv")

In [6]:
# Count the lines of the edge-list file.
rdd.count()

1012872

## 2.3 Distance Computation

In [2]:
import numpy as np
# Load the coordinates from a whitespace-delimited text file into a NumPy array.
# NOTE(review): the file name suggests 145746 rows (one per atom) — confirm.
coord = np.loadtxt("vesicle_1_5M_373_P_145746.np_txt")

In [4]:
import pyspark.mllib.linalg.distributed
# Distribute the coordinate rows over 16 partitions and wrap the resulting RDD in an MLlib RowMatrix.
coord_matrix=pyspark.mllib.linalg.distributed.RowMatrix(sc.parallelize(coord, 16))

In [5]:
# RDD holding the rows of the coordinate matrix.
row_rdd=coord_matrix.rows

In [6]:
# Draw a ~1% sample of the rows without replacement (seed 81) and count it.
sample=row_rdd.sample(False, 0.01, 81)
sample.count()

1458

In [9]:
start = time.time()
# Lazily pair every row with every row and keep the squared distance of each pair.
# NOTE(review): the filter keeps *squared* distances *greater* than 15.0, whereas the
# profiled MDAnalysis code keeps distances *below* a cutoff — confirm this is intended.
distances = row_rdd.cartesian(row_rdd)\
    .map(lambda pair: pair[0].squared_distance(pair[1]))\
    .filter(lambda squared: squared > 15.0)
# saveAsTextFile() returns None, so it is called on its own line to keep
# `distances` bound to the RDD. It fails if the output directory already exists.
distances.saveAsTextFile("distances.csv")
print("ComputeDistance, %.2f" % (time.time() - start))

Py4JJavaError: An error occurred while calling o108.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://radical-5:8020/user/luckow/Distances.csv already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1430)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1409)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:522)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:47)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


In [10]:
!hadoop fs -rmr Distances.csv

rmr: DEPRECATED: Please use 'rm -r' instead.
15/11/26 03:48:14 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://radical-5:8020/user/luckow/Distances.csv' to trash at: hdfs://radical-5:8020/user/luckow/.Trash/Current


## 2.4 Leaflet Finder Pilot-InMemory Implementation (Graph)

In [None]:
from distributed_inmem.dataunit_spark import DistributedInMemoryDataUnit
import time

FILENAME="../data/mdanalysis/small/graph_edges_95_215.csv"

FILENAME_ALL_EDGES="../data/mdanalysis/small/graph_edges_95_215_alledges.csv"
du = DistributedInMemoryDataUnit(name="LeafletFinderGraph", sc=sc)

#DistributedInMemoryDataUnit.spark_context.version

# Read the edge list (one line per list entry) and load it into the data unit.
# The context manager closes the file even if reading or loading raises.
with open(FILENAME_ALL_EDGES) as f:
    graph = f.readlines()
    du.load(graph)

def identityMapper(edge, args):
    """Parse the textual form of an edge back into a Python object.

    Parameters:
        edge: the edge record; it is converted with str() and stripped of
            surrounding whitespace before parsing, e.g. "(0, 14)".
        args: unused; accepted so the function fits the mapper call signature.

    Returns:
        The Python literal encoded by the text, e.g. the tuple (0, 14).

    Raises:
        ValueError or SyntaxError: if the text is not a plain Python literal.
    """
    import ast

    # literal_eval accepts only literals (numbers, strings, tuples, lists, ...),
    # so unlike eval() it cannot execute code embedded in the input data.
    return ast.literal_eval(str(edge).strip())

def groupByVertex(data):
    """Debug helper: print the data handed to a reduce step and return None."""
    message = "Call reduce on: " + str(data)
    print(message)
    

new_iteration_needed = du.sc.accumulator(0)

# check for smaller keys in each set
def process_vertex(vertex):
    """Rewrite one vertex's adjacency list relative to its smallest neighbour.

    Parameters:
        vertex: textual record of a vertex and its adjacent vertices,
            e.g. "('19', ['19', '7', '9', '41'])". Ids may be ints or
            numeric strings; they are converted with int().

    Returns:
        A list of (key, value) integer pairs. If the source id is not larger
        than its smallest neighbour, the pairs are (source, neighbour) for
        every distinct neighbour. Otherwise every other distinct neighbour is
        paired with the smallest neighbour in both directions, and
        (source, smallest) is appended when the source is smaller than the
        largest neighbour.

    Side effects:
        Adds 1 to the module-level accumulator ``new_iteration_needed`` for
        every neighbour that is re-pointed to the smallest neighbour, and
        prints progress messages.

    Raises:
        IndexError: if the adjacency list is empty.
        ValueError or SyntaxError: if the record is not a plain Python literal.
    """
    import ast

    # literal_eval parses the record without executing code embedded in it.
    vertex = ast.literal_eval(vertex)
    source = int(vertex[0])
    # Sorted so that dest[0] is the smallest neighbour and duplicates are adjacent.
    dest = sorted(int(i) for i in vertex[1])

    first_edge_destination = dest[0]
    # True when the source id is not larger than any of its neighbours.
    local_max = source <= first_edge_destination
    new_vertices = []
    print("*********Source: %d First Edge Dest: %d" % (source, first_edge_destination))
    if local_max:
        new_vertices.append((source, first_edge_destination))

    print("Process: " + str(vertex) + " Local Max: " + str(local_max))
    last_edge_destination = first_edge_destination

    # Walk the *sorted* list: comparing against the previous element only skips
    # the smallest neighbour and repeated ids when equal ids are adjacent.
    for current_destination in dest:
        print("Current destination: %s" % str(current_destination))
        if current_destination == last_edge_destination:
            continue

        if local_max:
            new_vertices.append((source, current_destination))
        else:
            new_vertices.append((first_edge_destination, current_destination))
            new_vertices.append((current_destination, first_edge_destination))
            print("Add 1 to accumulator")
            new_iteration_needed.add(1)

        last_edge_destination = current_destination

    # After the loop last_edge_destination is the largest neighbour.
    if (not local_max) and (source < last_edge_destination):
        new_vertices.append((source, first_edge_destination))

    print("Return new vertices: " + str(new_vertices))
    return new_vertices


#process_vertex("('19', ['19', '7', '9', '41'])")
num_iterations = 0
start = time.time()
# Repeat map + reduce until an iteration no longer increases the accumulator.
while True:
    old_accum_value = new_iteration_needed.value
    print("*********** Start iteration: %d " % num_iterations)
    future_result = du.map_pilot(identityMapper, None, number_of_compute_units=2)
    result_du = future_result.result()[0]
    future_result = result_du.reduce_pilot(process_vertex, number_of_compute_units=2)
    output = future_result.result()
    output.export()
    # The reduce output becomes the input of the next iteration.
    du = output
    num_iterations = num_iterations + 1
    print("New iteration accum: %d old value: %d" % (new_iteration_needed.value, old_accum_value))
    # Stop when the accumulator did not grow during this iteration.
    if new_iteration_needed.value <= old_accum_value:
        break
end = time.time()
print("Final results: ")
# One key per component remains; this count runs after `end`, so it is not part of the reported time.
num_components = du.data.groupByKey().count()
print("Finished after %d Iterations. Found %d components. Time: %.2f" % (num_iterations, num_components, (end-start)))


In [None]:
# Number of distinct keys in the data unit (the same expression the previous cell reports as the component count).
du.data.groupByKey().count()

## 2.5 Native Spark Implementation

#### 2.5.1 Load data from text file

In [8]:
%%time
FILENAME="/data/mdanalysis/small/graph_edges_95_215.csv"
# Each line "a,b" becomes the integer list [a, b].
edges = sc.textFile(FILENAME).map(lambda line: list(map(int, line.split(","))))
# add backward edges
data = edges.flatMap(lambda edge: ((edge[0], edge[1]), (edge[1], edge[0])))

#data.saveAsTextFile("../data/mdanalysis/small/graph_edges_95_215_alledges.csv")
#data = data.filter(lambda v: v[0] != v[1])
#print data.collect()

# Per vertex: the sorted list of its distinct neighbours.
data_grouped = data.groupByKey().mapValues(lambda neighbours: sorted(set(neighbours)))
print(data_grouped.collect())

[(0, [0, 14, 67]), (2, [2, 13, 34, 62]), (4, [4, 27, 33, 68, 94]), (6, [6, 57, 64, 67]), (8, [8, 46, 69, 88, 93]), (10, [10, 31, 40, 71]), (12, [7, 12, 30, 50, 91]), (14, [0, 14, 48, 64]), (16, [16, 40]), (18, [18]), (20, [20, 85]), (22, [22, 54]), (24, [24, 62]), (26, [26, 49, 57, 70]), (28, [11, 17, 28, 69]), (30, [9, 12, 23, 30, 50]), (32, [32, 53]), (34, [2, 34]), (36, [36, 47, 84, 92]), (38, [9, 23, 38]), (40, [10, 16, 31, 40, 71]), (42, [42, 73]), (44, [44, 68]), (46, [8, 46]), (48, [14, 48, 49]), (50, [12, 23, 30, 50]), (52, [29, 52, 79, 89]), (54, [22, 54, 87]), (56, [56, 76]), (58, [58, 81]), (60, [17, 60, 69, 81]), (62, [2, 24, 25, 62, 79, 89]), (64, [6, 14, 57, 64, 67]), (66, [51, 66]), (68, [4, 44, 68, 72, 94]), (70, [26, 70, 79, 86]), (72, [68, 72]), (74, [43, 47, 53, 74, 82]), (76, [56, 76]), (78, [78, 92]), (80, [43, 80]), (82, [74, 77, 82]), (84, [36, 63, 84]), (86, [70, 79, 86]), (88, [8, 69, 88, 93]), (90, [11, 55, 75, 90]), (92, [36, 47, 78, 92]), (94, [3, 4, 33, 59,

#### 2.5.2 Connected Component Implementation

In [9]:
new_iteration_needed = sc.accumulator(0)
# check for smaller keys in each set
def process_vertex(vertex):
    """Rewrite one vertex's adjacency list relative to its smallest neighbour.

    Parameters:
        vertex: pair of a vertex id and its adjacent vertex ids,
            e.g. (0, [0, 67, 14]). The neighbours are sorted here, so the
            caller does not have to pre-sort them.

    Returns:
        A list of (key, value) pairs. If the source id is not larger than its
        smallest neighbour, the pairs are (source, neighbour) for every
        distinct neighbour. Otherwise every other distinct neighbour is paired
        with the smallest neighbour in both directions, and
        (source, smallest) is appended when the source is smaller than the
        largest neighbour.

    Side effects:
        Adds 1 to the module-level accumulator ``new_iteration_needed`` for
        every neighbour that is re-pointed to the smallest neighbour, and
        prints progress messages.

    Raises:
        IndexError: if the adjacency list is empty.
    """
    source = vertex[0]
    # Sorting puts the smallest neighbour first and makes duplicates adjacent;
    # input that is already sorted is left unchanged.
    destinations = sorted(vertex[1])

    first_edge_destination = destinations[0]
    # True when the source id is not larger than any of its neighbours.
    local_max = source <= first_edge_destination
    new_vertices = []
    # Single-argument print calls behave the same on Python 2 and Python 3.
    print("*********Source: %d First Edge Dest: %d" % (source, first_edge_destination))
    if local_max:
        new_vertices.append((source, first_edge_destination))

    print("Process: " + str(vertex) + " Local Max: " + str(local_max))
    last_edge_destination = first_edge_destination

    for current_destination in destinations:
        # Skips the smallest neighbour itself and any repeated id.
        if current_destination == last_edge_destination:
            continue

        if local_max:
            new_vertices.append((source, current_destination))
        else:
            new_vertices.append((first_edge_destination, current_destination))
            new_vertices.append((current_destination, first_edge_destination))
            print("Add 1 to accumulator")
            new_iteration_needed.add(1)

        last_edge_destination = current_destination

    # After the loop last_edge_destination is the largest neighbour.
    if (not local_max) and (source < last_edge_destination):
        new_vertices.append((source, first_edge_destination))

    return new_vertices


#process_vertex((19, [7, 9, 19, 41]))
num_iterations = 0
cc = data_grouped
start = time.time()
iterate = True
# Keep relabelling until an iteration leaves the accumulator unchanged.
while iterate:
    old_accum_value = new_iteration_needed.value
    print("*********** Start iteration: %d " % num_iterations)
    relabelled = cc.flatMap(process_vertex)
    cc = relabelled.groupByKey().mapValues(lambda members: sorted(set(members)))
    # collect() is an action: it runs the job, so the accumulator is updated before the comparison below.
    cc.collect()
    num_iterations += 1
    iterate = old_accum_value < new_iteration_needed.value
end = time.time()

print("Finished after %d Iterations. Found %d components. Time: %.2f" % (num_iterations, cc.count(), (end-start)))

*********** Start iteration: 0 
*********** Start iteration: 1 
*********** Start iteration: 2 
*********** Start iteration: 3 
*********** Start iteration: 4 
*********** Start iteration: 5 
Finished after 6 Iterations. Found 10 components. Time: 2.39


## 3. Benchmark

---
## 4. Scratch Space

In [None]:
#start = df_grouped
# NOTE(review): `start` must be an RDD of pairs and `start_index` a vertex id;
# neither is defined in this cell — bind them before running it.
# Flatten every pair into its two elements and count how many are smaller than start_index.
result = start.flatMap(lambda v: (v[0], v[1])).map(lambda v: v < start_index).countByValue()

print(str(result))

# countByValue() maps each distinct value (True / False) to its number of occurrences.
has_smaller_index = True in result
local_max = not has_smaller_index

print("Local Max: " + str(local_max) + " Smaller Index: " + str(has_smaller_index))

In [None]:
# The schema classes are used below but not imported by any earlier cell of this notebook.
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

# Flat edge list: one (source, destination) row per directed edge.
schema = StructType([
    StructField("source", IntegerType(), True),
    StructField("destination", IntegerType(), True),
])
# NOTE(review): `sqlCtx` is not created in this notebook — presumably provided by util/init_spark.py; confirm.
df = sqlCtx.createDataFrame(data, schema)
df.explain()

# Grouped form: one row per vertex holding the array of its neighbours.
schema_grouped = StructType([
    StructField("source", IntegerType(), True),
    StructField("destination", ArrayType(IntegerType()), True),
])
df_grouped = sqlCtx.createDataFrame(data_grouped, schema_grouped)

In [None]:
from pyspark.sql.functions import udf, lit
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType

# The return-type classes are not covered by the imports at the top of this cell.
from pyspark.sql.types import IntegerType, StringType

# UDF that renders a value as its string form.
t = udf(lambda s: str(s), StringType())
# UDF returning the length of its argument. A UDF has to return a plain Python
# value matching its declared return type, not a Column object.
slen = udf(lambda s: len(s), IntegerType())

#df.groupBy("source").collect()
#df.groupBy("source").agg(df.source, t(df.source))

# NOTE(review): `df.destination` is declared as an integer column, so len() is likely to
# fail on it, and slen is not an aggregate function — `df_grouped` may be the intended input; confirm.
c = df.groupBy(df.source).agg(col("source"), slen(df.destination))

#c = df.agg(col("source"), t(df.destination).alias('counts'))
c.head(5)


#c = df.groupBy(df.source).lit(df.destination)


In [None]:
# Every vertex id that occurs as a source or as a destination ...
vertices = df.select(df["source"]).unionAll(df.select(df["destination"]))
# ... with duplicates removed.
vertices = vertices.distinct()

print("Number of vertices: %d" % (vertices.count()))

# GraphLab

In [None]:
from graphlab import SGraph, SFrame
from graphlab import connected_components

# Available input graphs, keyed by problem size.
PROBLEM = {
    "small": "./data/mdanalysis/small/graph_edges_95_215.csv",
    "medium": "./data/mdanalysis/medium/graph_edges_24056_71826.csv",
}

d = datetime.datetime.now()
RESULTSFILE = "results-" + d.strftime("%Y%m%d-%H%M%S") + ".csv"
REPEATS = 5

# Input for this run; choose another PROBLEM key to use a different graph.
filename = PROBLEM["small"]

start = time.time()
# The CSV has no header row; the columns are addressed below as X1 and X2.
data = SFrame.read_csv(filename, header=False)
sg = SGraph().add_edges(data, src_field="X1", dst_field="X2")
end_read = time.time()
cc = connected_components.create(sg)
s = cc["component_size"]
end_connected = time.time()
print(cc)
print(s)