Commit

Added AAT
piotrszul committed Dec 23, 2015
1 parent f52c7c2 commit 45b8aa1
Showing 3 changed files with 120 additions and 0 deletions.
38 changes: 38 additions & 0 deletions py-tca-rf.py
@@ -0,0 +1,38 @@
import csv

import numpy as np
import scipy.sparse
import sklearn.ensemble

# Coordinate (COO) buffers for building the sparse feature matrix.
rows = []
cols = []
vals = []

llabels = []

header = None
irow = 0
icol = 0
with open('data/tcga_csv_1k.csv') as f:
    reader = csv.reader(f)
    for row in reader:
        if header:
            # Column 1 holds the class label; columns 2 onwards the features.
            llabels.append(int(float(row[1])))
            data = [int(float(s)) for s in row[2:]]
            # Record only the non-zero entries and their coordinates.
            for i, v in enumerate(data):
                if v != 0:
                    rows.append(irow)
                    cols.append(i)
                    vals.append(v)
            irow += 1
        else:
            header = row
            icol = len(row) - 2

mtx = scipy.sparse.csr_matrix((vals, (rows, cols)), shape=(irow, icol))
print(mtx)
labels = np.array(llabels, dtype=int)
print(labels)

rf = sklearn.ensemble.RandomForestClassifier(n_estimators=500, oob_score=True,
                                             verbose=1, n_jobs=4)
# This scikit-learn version expects a dense array, hence .toarray().
rf.fit(mtx.toarray(), labels)
print(rf.oob_score_)
print(rf.feature_importances_)
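
The script densifies the matrix with mtx.toarray() before fitting, which can exhaust memory on wide genomic data. A minimal alternative sketch, assuming a scikit-learn version (0.17 or later) whose tree ensembles accept scipy.sparse input directly:

# Sketch only: assumes sparse-input support in tree ensembles
# (present from scikit-learn 0.17 onward), avoiding densification.
rf_sparse = sklearn.ensemble.RandomForestClassifier(
    n_estimators=500, oob_score=True, verbose=1, n_jobs=4)
rf_sparse.fit(mtx, labels)  # pass the CSR matrix as-is, no .toarray()
print(rf_sparse.oob_score_)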

17 changes: 17 additions & 0 deletions py-test-sparse.py
@@ -0,0 +1,17 @@
import numpy as np
import scipy.sparse

# COO-style triplets: entry k of the matrix is (row[k], col[k]) = data[k].
row = np.array([0, 0, 1, 2, 2, 2])
col = np.array([0, 2, 2, 0, 1, 2])
data = np.array([1, 2, 3, 4, 5, 6])
mtx = scipy.sparse.csr_matrix((data, (row, col)), shape=(3, 3))

print(row)
print(len(row))
print(col)
print(len(col))
print(data)
print(len(data))
print(mtx)
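
print(mtx) lists the stored entries as "(row, col)  value" pairs. A short companion sketch (same mtx as above) inspecting the dense view and the three internal CSR arrays:

print(mtx.toarray())  # dense view: [[1 0 2], [0 0 3], [4 5 6]]
print(mtx.indptr)     # row pointers: [0 2 3 6]
print(mtx.indices)    # column index of each stored value: [0 2 2 0 1 2]
print(mtx.data)       # stored values: [1 2 3 4 5 6]
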
@@ -0,0 +1,65 @@
package au.csiro.obr17q.variantspark

import au.csiro.obr17q.variantspark.CommonFunctions._
import au.csiro.pbdava.sparkle.LoanUtils
import breeze.linalg.DenseMatrix
import com.github.tototoshi.csv.CSVWriter
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD

object SparseWideAAT extends SparkApp {
  conf.setAppName("VCF cluster")

  /**
   * Accumulates the outer product v * v^T of one sparse vector into m,
   * touching only the non-zero entries. Folding this over all rows of
   * the data yields its Gram matrix.
   */
  def aggTTA(colNo: Int)(m: DenseMatrix[Int], v: Vector): DenseMatrix[Int] = {
    val sv = v.asInstanceOf[SparseVector]
    for (i <- 0 until sv.indices.length; j <- 0 until sv.indices.length) {
      val row = sv.indices(i)
      val col = sv.indices(j)
      m(row, col) += sv.values(i).toInt * sv.values(j).toInt
    }
    m
  }

  def main(args: Array[String]) {

    if (args.length < 2) {
      println("Usage: SparseWideAAT <input-path> <output-path>")
      sys.exit(1)
    }

    val inputFiles = args(0)
    val output = args(1)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val sparseVariant = sqlContext.parquetFile(inputFiles)
    println(sparseVariant.schema)

    // Rebuild MLlib sparse vectors from the (size, indices, values) columns.
    val data: RDD[Vector] =
      sparseVariant.rdd
        .map(r => Vectors.sparse(r.getInt(1),
          r.getSeq[Int](2).toArray, r.getSeq[Double](3).toArray))
    val test = data.cache().count()
    println(test)

    val colNo = data.first().size
    println(s"Col no: ${colNo}")
    // Fold the per-row outer products into a single colNo x colNo matrix.
    val result = data.aggregate(DenseMatrix.zeros[Int](colNo, colNo))(
      SparseWideAAT.aggTTA(colNo), (m1, m2) => m1 += m2)
    LoanUtils.withCloseable(CSVWriter.open(output)) { csvWriter =>
      for (row <- 0 until colNo) {
        csvWriter.writeRow(result(row, ::).inner.toArray)
      }
    }
  }
}
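
For reference: aggTTA accumulates, for each sparse vector v, the outer product v * v^T over its non-zero indices, so folding it over all stored rows yields the Gram matrix A^T * A (for the wide, transposed layout the object name suggests, this is the AAT of the original matrix). A small numpy sketch, on a made-up 3x3 matrix, checking that the same nested-loop accumulation reproduces the Gram matrix:

import numpy as np

# Hypothetical toy input; each row stands in for one Spark SparseVector.
A = np.array([[1, 0, 2],
              [0, 3, 0],
              [4, 0, 5]])

gram = np.zeros((3, 3), dtype=int)
for v in A:
    nz = np.nonzero(v)[0]      # plays the role of sv.indices
    for i in nz:               # mirrors m(row, col) += values(i) * values(j)
        for j in nz:
            gram[i, j] += v[i] * v[j]

assert (gram == A.T.dot(A)).all()  # the fold computes A^T * A
print(gram)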
