In [12]:
import re
import os
import numpy as np
from numpy import array
from math import sqrt, log
from seaborn import color_palette
from lightning import Lightning
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.mllib.clustering import KMeans, KMeansModel

conf = SparkConf()
conf.setAppName("Mgf Spark KNN").set("spark.executor.memory", "6g")
# Stop any previous SC
try:
    sc.stop()
except NameError:
    print "SparkContext not defined"

sc = SparkContext(conf=conf)
sc

<pyspark.context.SparkContext at 0x520c9d0>

In [13]:
host='http://daas:daastesting1234@hwlogin.labs.uninett.no:3000'
lgn = Lightning(ipython=True, host=host)
lgn.create_session('mgfspark_knn')

Session number: 46, name: mgfspark_knn

In [14]:
trainPath='hdfs://daas/promec/mgf/train'
testPath='hdfs://daas/promec/mgf/test'
partitions=45
iterations = 1
runs = 1
knnmode='random'

In [22]:
_comments = '#;!/'

def parseMGF(m_data):
    data = m_data.split('\n')
    reading_spectrum = False
    params = {}
    masses = []
    intensities = []
    charges = []
    out = {}
    cnt = 0
    fname = None
    pep_mass = 0
    pep_intensity = 0
    for line in data:
        if not reading_spectrum:
            if line.strip() == 'BEGIN IONS':
                reading_spectrum = True
                # otherwise we are not interested; do nothing, just move along
        else:
            if not line.strip() or any(
                    line.startswith(c) for c in _comments):
                pass
            elif line.strip() == 'END IONS':
                reading_spectrum = False
                if fname is None:
                    title = params['title'].split()[0]
                    fname = title.split('.')[0]
                    out[fname] = {}
                if 'pepmass' in params:
                    try:
                        pl = params['pepmass'].split()
                        if len(pl) > 1:
                            pep_mass = float(pl[0])
                            pep_intensity = float(pl[1])
                        elif len(pl) == 1:
                            pep_mass = float(pl[0])
                    except ValueError:
                        print "Error in parsing pepmass value"
                out[fname][cnt] = {'pep_mass': pep_mass,
                                   'pep_intensity': pep_intensity,
                                   'rtinseconds': params['rtinseconds'],
                                   'title': params['title'],
                                   'mz_array': np.array(masses),
                                   'intensity_array': np.array(intensities)}
                
                pep_mass = 0
                pep_intensity = 0
                params = {}
                masses = []
                intensities = []
                charges = []
                cnt += 1
            else:
                l = line.split('=', 1)
                if len(l) > 1:  # spectrum-specific parameters!
                    params[l[0].lower()] = l[1].strip()
                elif len(l) == 1:  # this must be a peak list
                    l = line.split()
                    if len(l) >= 2:
                        try:
                            masses.append(float(l[0]))  # this may cause
                            intensities.append(float(l[1]))  # exceptions...
                            # charges.append(aux._parse_charge(l[2]) if len(l) > 2 else 0)
                        except ValueError:
                            print "Error in parsing line "+line
        
    return out


def process_data(raw_data):
    mgf_data = raw_data.map(lambda x: parseMGF(x[1]))
    return mgf_data

def get_parsedata(mgf, key):
    m =mgf.values()[0]
    return map(lambda c: m[c][key], range(len(m)))


In [16]:
def knn_data(path):
    raw_data = sc.wholeTextFiles(path, use_unicode=False)
    mgf_data = process_data(raw_data).repartition(partitions).cache()
    parseMass = mgf_data.flatMap(lambda x: get_parsedata(x, 'pep_mass'))
    parseIntensity = mgf_data.flatMap(lambda x: get_parsedata(x, 'pep_intensity'))
    return parseMass.zip(parseIntensity).cache()
    # massMean = parseMass.mean()
    # massStd = parseMass.stdev()
    # intensityMean = parseIntensity.mean()
    # intensityStd = parseIntensity.stdev()
    # scaledMass = parseMass.map(lambda x: (x-massMean)/massStd)
    # scaledIntensity = parseIntensity.map(lambda x: (x-intensityMean)/intensityStd)
    # scaledData = scaledMass.zip(scaledIntensity).cache()
    # sampleMass = scaledMass.take(100)
    # sampleIntensity = scaledIntensity.take(100)


trainData = knn_data(trainPath)

In [17]:
def error(clusters,point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

def wssse_kmeans(data, k):
    clusters = KMeans.train(data, k, maxIterations=iterations, runs=runs, initializationMode=knnmode)
    return data.map(lambda point: error(clusters,point)).reduce(lambda x, y: x + y)

In [21]:
wssse = {}
k = 1
cluster_k = range(1, k+1)
#wssse = map(lambda k: wssse_kmeans(sampleData, k, iterations, runs), cluster_k)
for k in cluster_k:
    wssse[k] = wssse_kmeans(trainData, k, iterations, runs)

TypeError: wssse_kmeans() takes exactly 2 arguments (4 given)

In [20]:
lgn.line(wssse)

In [None]:
model = KMeans.train(trainData, 23, maxIterations=iterations, runs=runs, initializationMode=knnmode)

In [None]:
def test_knn(model):
    testData = knn_data(testPath)
    predict = testData.map(lambda x: model.predict(x))
    return predict.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)

testedMgf = test_knn(model).collect()

In [None]:
testedMgf[0][1]

In [None]:
#lgn.scatter(clusters.clusterCenters)
r = range(len(model.clusterCenters))
mass_cluster = map(lambda l: model.clusterCenters[l][0], r)
intensity_cluster = map(lambda l: model.clusterCenters[l][1], r)
sizeC = map(lambda l: float(testedMgf[l][1]), r)
maxS = float(max(sizeC))
scaledSizeC = map(lambda l: (sizeC[l]/maxS)*150, r)
scaledSizeC
#clusters.clusterCenters

In [19]:
lgn.scatter(mass_cluster, intensity_cluster, size=scaledSizeC)

NameError: name 'mass_cluster' is not defined