In [ ]:
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContextExtensions._
import scala.reflect.runtime.universe._
import comp.bio.aging.playground.extras.uniprot._
import org.apache.spark.storage.StorageLevel

import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContextExtensions._
import scala.reflect.runtime.universe._
import comp.bio.aging.playground.extras.uniprot._
import org.apache.spark.storage.StorageLevel


In [ ]:
def sparkHadoopConf(sc: SparkContext, acountName: String, accountKey: String) = {
  sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
  sc.hadoopConfiguration.set("fs.azure.account.key." + acountName + ".blob.core.windows.net", accountKey)
  sc
}

sparkHadoopConf: (sc: org.apache.spark.SparkContext, acountName: String, accountKey: String)org.apache.spark.SparkContext


In [ ]:
def azurize(container: String, accountName: String, blobFile: String): String = "wasbs://"+container+"@"+accountName+".blob.core.windows.net/"+blobFile 

def writeText2Azure[T]( rdd: RDD[T], container: String, accountName: String, blobFile: String ): String =
{
  val url = azurize(container, accountName, blobFile)
  rdd.saveAsTextFile(url)
  url
}

def writeTsv2Azure( df: DataFrame, container: String, accountName: String, blobFile: String ): String =
{
  val url = azurize(container, accountName, blobFile)
  df.write.option("sep","\t").option("header","true").csv(url)
  url
}

azurize: (container: String, accountName: String, blobFile: String)String
writeText2Azure: [T](rdd: org.apache.spark.rdd.RDD[T], container: String, accountName: String, blobFile: String)String
writeTsv2Azure: (df: org.apache.spark.sql.DataFrame, container: String, accountName: String, blobFile: String)String


In [ ]:
val connString = "DefaultEndpointsProtocol=https;AccountName=pipelines1;AccountKey=;EndpointSuffix=core.windows.net"
val account = "pipelines1"
val key = ""
def az(path: String): String = azurize("storage", account, path)

connString: String = DefaultEndpointsProtocol=https;AccountName=pipelines1;AccountKey=;EndpointSuffix=core.windows.net
account: String = pipelines1
key: String = 
az: (path: String)String


In [ ]:
sparkHadoopConf(sparkContext, account, key)
  
val spark = SparkSession
  .builder()
  .appName("mapping_models")
  .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@70287b35


In [ ]:
import org.apache.spark.sql.functions._
import spark.implicits._

val toDouble = udf[Double, String]( _.toDouble)

import org.apache.spark.sql.functions._
import spark.implicits._
toDouble: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))


In [ ]:
//val base = "/batch/quant"
//val base = "/storage/expressions/models"
val base = "/samples"

base: String = /samples


In [ ]:
//val human_liver_path = az(base + "/Homo sapiens_totalRNA_GSM1698568_quant.sf")
//val human_kidney_path = az(base + "/Homo sapiens_totalRNA_GSM1698570_quant.sf")
def quant(gse: String, gsm: String) = az(base + s"/${gse}/${gsm}/transcripts_quant/quant.sf")

val human_liver_path = quant("GSE69360", "GSM1698568")
val human_kidney_path = quant("GSE69360", "GSM1698570")

val mouse_liver_totalRNA_path  = quant("GSE58089", "GSM1400574")
val mouse_kidney_totalRNA_path = quant("GSE83144", "GSM2195188")
val mouse_liver_mRNA_path  = quant("GSE108990", "GSM2927683")
val mouse_kidney_mRNA_path = quant("GSE108990", "GSM2927750")

val cow_liver_mRNA_path = quant("GSE77020", "GSM2042593")
val cow_kidney_mRNA_path = quant("GSE77020", "GSM2042596")
val cow_liver_totalRNA_path = quant("GSE41637","GSM1020724")
val cow_kidney_totalRNA_path = quant("GSE41637","GSM1020723")



quant: (gse: String, gsm: String)String
human_liver_path: String = wasbs://storage@pipelines1.blob.core.windows.net//samples/GSE69360/GSM1698568/transcripts_quant/quant.sf
human_kidney_path: String = wasbs://storage@pipelines1.blob.core.windows.net//samples/GSE69360/GSM1698570/transcripts_quant/quant.sf
mouse_liver_totalRNA_path: String = wasbs://storage@pipelines1.blob.core.windows.net//samples/GSE58089/GSM1400574/transcripts_quant/quant.sf
mouse_kidney_totalRNA_path: String = wasbs://storage@pipelines1.blob.core.windows.net//samples/GSE83144/GSM2195188/transcripts_quant/quant.sf
mouse_liver_mRNA_path: String = wasbs://storage@pipelines1.blob.core.windows.net//samples/GSE108990/GSM2927683/transcripts_quant/quant.sf
mouse_kidney_mRNA_path: String = wasbs://storage@pipelines1.blob.core.w...

In [ ]:
def load_liver(path: String) = spark.readTSV(path, true).withColumn("liver", toDouble($"TPM")).drop("TPM").withColumnRenamed("Name","transcript")
def load_kidney(path: String) = spark.readTSV(path, true).withColumn("kidney", toDouble($"TPM")).drop("TPM").withColumnRenamed("Name","transcript")
def join_liver_kidney(liver: DataFrame, kidney: DataFrame): DataFrame =
    liver.join(kidney.withColumnRenamed("transcript", "kidney_transcript"), $"transcript" === $"kidney_transcript")
    .select($"transcript", $"liver", $"kidney", ($"liver" + $"kidney").as("avg_expression"))    
    .sort($"avg_expression".desc)

load_liver: (path: String)org.apache.spark.sql.DataFrame
load_kidney: (path: String)org.apache.spark.sql.DataFrame
join_liver_kidney: (liver: org.apache.spark.sql.DataFrame, kidney: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [ ]:
val human = join_liver_kidney(load_liver(human_liver_path), load_kidney(human_kidney_path))
val mouse_totalRNA = join_liver_kidney(load_liver(mouse_liver_totalRNA_path), load_kidney(mouse_kidney_totalRNA_path))
val mouse_mRNA = join_liver_kidney(load_liver(mouse_liver_mRNA_path), load_kidney(mouse_kidney_mRNA_path))
val cow_totalRNA = join_liver_kidney(load_liver(cow_liver_totalRNA_path), load_kidney(cow_kidney_totalRNA_path))
val cow_mRNA = join_liver_kidney(load_liver(cow_liver_mRNA_path), load_kidney(cow_kidney_mRNA_path))

human: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_totalRNA: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_mRNA: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
cow_totalRNA: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
cow_mRNA: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]


In [ ]:
/*
def simplify(dataFrame: DataFrame): Dataset[(String, Double, Double, Double)] = dataFrame.map{ row=>
  val tran = row.getAs[String]("transcript")
  val tr = tran.substring(0,  Math.min(tran.indexOf('|'), tran.length))
  (tr.substring(0, Math.min(tr.indexOf('.'),tr.length)),	row.getAs[Double]("liver"), row.getAs[Double]("kidney"), row.getAs[Double]("avg_expression"))
}
*/

def simplify(dataFrame: DataFrame): Dataset[(String, Double, Double, Double)] = dataFrame.map{ row=>
  val tran = row.getAs[String]("transcript")
  val i = tran.indexOf('|')
  val tr = if(i> 0) tran.substring(0, tran.indexOf('|')) else tran
  val transcript =  tr.substring(0, Math.min(tr.indexOf('.'),tr.length))
  (transcript,	(row.getAs[Double]("liver"), row.getAs[Double]("kidney"), row.getAs[Double]("avg_expression")))
}.rdd.reduceByKey{ case ((a1, b1, c1), (a2, b2,c2)) => (a1+a2, b1+b2, c1 + c2)}
    .map{ case (a, (b ,c, d)) => (a, b ,c ,d)}
  .toDS()

simplify: (dataFrame: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[(String, Double, Double, Double)]
human_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_totalRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_mRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]


In [ ]:
val human_simple = simplify(human).toDF("transcript", "liver", "kidney", "avg_expression")
val mouse_totalRNA_simple = simplify(mouse_totalRNA).toDF("transcript", "liver", "kidney", "avg_expression")
val mouse_mRNA_simple = simplify(mouse_mRNA).toDF("transcript", "liver", "kidney", "avg_expression")
val cow_totalRNA_simple = simplify(cow_totalRNA).toDF("transcript", "liver", "kidney", "avg_expression")
val cow_mRNA_simple = simplify(cow_mRNA).toDF("transcript", "liver", "kidney", "avg_expression")

human_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_totalRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
mouse_mRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
cow_totalRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]
cow_mRNA_simple: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]


In [ ]:
val headers = List("uniprot_ac", "uniprot_id", "entrez", "refSeq", "gi", "pdb", "go", 
  "uniref100", "uniref90", "uniref50", "uniparc", "pir", 
  "taxon", "mim", "unigene", "pubmed", "embl", "embl_cds", 
  "ensembl", /*"ensembl_trs"*/ "transcript", "ensembl_pro", "additional_pubmed") 

headers: List[String] = List(uniprot_ac, uniprot_id, entrez, refSeq, gi, pdb, go, uniref100, uniref90, uniref50, uniparc, pir, taxon, mim, unigene, pubmed, embl, embl_cds, ensembl, transcript, ensembl_pro, additional_pubmed)


In [ ]:
val columns = List("transcript", "uniref90", "go", "liver","kidney", "avg_expression", "uniprot_ac", "taxon", "uniprot_id", "entrez", "refSeq", "gi", "uniparc", "pubmed", "embl")

def joinMapping(mapping: Dataset[UniprotMapping], df: DataFrame): DataFrame = {
  val mdf = mapping.flatMap{ u=> if(u.ensembl_trs == null) Nil else  u.ensembl_trs.split(';').map(trs=>u.copy(ensembl_trs = trs)) }.toDF(headers:_*)
  mdf.join(df, "transcript").select(columns.head, columns.tail:_*)
}

columns: List[String] = List(transcript, uniref90, go, liver, kidney, avg_expression, uniprot_ac, taxon, uniprot_id, entrez, refSeq, gi, uniparc, pubmed, embl)
joinMapping: (mapping: org.apache.spark.sql.Dataset[comp.bio.aging.playground.extras.uniprot.UniprotMapping], df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [ ]:
val human_tax_id = "9606"
val mouse_tax_id = "10090"
val cow_tax_id = "9913"

val ind = az("/indexes/uniprot")
val human_mapping = spark.readTypedTSV[UniprotMapping](ind + "/HUMAN_9606_idmapping_selected.tab")
val mouse_mapping = spark.readTypedTSV[UniprotMapping](ind + "/MOUSE_10090_idmapping_selected.tab")
val cow_mapping = spark.readTypedTSV[UniprotMapping](ind + s"/COW_${cow_tax_id}_idmapping_selected.tab")
val all_mapping =  spark.readTypedTSV[UniprotMapping]("file:///pipelines/indexes/uniprot/idmapping_selected.tab")

human_tax_id: String = 9606
mouse_tax_id: String = 10090
cow_tax_id: String = 9913
ind: String = wasbs://storage@pipelines1.blob.core.windows.net//indexes/uniprot
human_mapping: org.apache.spark.sql.Dataset[comp.bio.aging.playground.extras.uniprot.UniprotMapping] = [uniprot_ac: string, uniprot_id: string ... 20 more fields]
mouse_mapping: org.apache.spark.sql.Dataset[comp.bio.aging.playground.extras.uniprot.UniprotMapping] = [uniprot_ac: string, uniprot_id: string ... 20 more fields]
cow_mapping: org.apache.spark.sql.Dataset[comp.bio.aging.playground.extras.uniprot.UniprotMapping] = [uniprot_ac: string, uniprot_id: string ... 20 more fields]
all_mapping: org.apache.spark.sql.Dataset[comp.bio.aging.playground.extras.uniprot.UniprotMapping] = [uniprot_ac: string, uniprot_id: string ... 20...

In [ ]:
(human_mapping.count, mouse_mapping.count, cow_mapping.count)

res30: (Long, Long, Long) = (162191,83601,32206)


In [ ]:
import org.bdgenomics.adam.rdd.ADAMContextExtensions._
import comp.bio.aging.playground.extras.uniprot._
def writeTSV(dataFrame: DataFrame, path: String, header: Boolean = true, sep: String = "\t"): Unit =
      dataFrame.write.option("sep", sep).option("header",header).csv(path)
//val cow_mapping = all_mapping.filter(u=>u.taxon == cow_tax_id)
//writeTSV(cow_mapping.toDF().coalesce(1), ind + s"/COW_${cow_tax_id}_idmapping_selected.tab")

import org.bdgenomics.adam.rdd.ADAMContextExtensions._
import comp.bio.aging.playground.extras.uniprot._
writeTSV: (dataFrame: org.apache.spark.sql.DataFrame, path: String, header: Boolean, sep: String)Unit


In [ ]:
val humanExpressions = joinMapping(human_mapping, human_simple)
humanExpressions.count

humanExpressions: org.apache.spark.sql.DataFrame = [transcript: string, uniref90: string ... 13 more fields]
res37: Long = 68560


In [ ]:
val mouse_totalRNA_Expressions = joinMapping(mouse_mapping, mouse_totalRNA_simple)
val mouse_mRNA_Expressions = joinMapping(mouse_mapping, mouse_mRNA_simple)
(mouse_totalRNA_Expressions.count , mouse_mRNA_Expressions.count)

mouse_totalRNA_Expressions: org.apache.spark.sql.DataFrame = [transcript: string, uniref90: string ... 13 more fields]
mouse_mRNA_Expressions: org.apache.spark.sql.DataFrame = [transcript: string, uniref90: string ... 13 more fields]
res39: (Long, Long) = (50956,50956)


In [ ]:
val cow_totalRNA_Expressions = joinMapping(cow_mapping, cow_totalRNA_simple)
val cow_mRNA_Expressions = joinMapping(cow_mapping, cow_mRNA_simple)
(cow_totalRNA_Expressions.count, cow_mRNA_Expressions.count)


cow_totalRNA_Expressions: org.apache.spark.sql.DataFrame = [transcript: string, uniref90: string ... 13 more fields]
cow_mRNA_Expressions: org.apache.spark.sql.DataFrame = [transcript: string, uniref90: string ... 13 more fields]
res91: (Long, Long) = (21857,21857)


In [ ]:
human_simple

res31: org.apache.spark.sql.DataFrame = [transcript: string, liver: double ... 2 more fields]


In [ ]:
writeTSV(humanExpressions.coalesce(1), az("/expressions/transcripts/" + "human"+"_transcripts_all.tab"), true)
writeTSV(mouse_totalRNA_Expressions.coalesce(1), az("/expressions/transcripts/" + "mouse"+"_totalRNA"+"_transcripts_all.tab"), true)
writeTSV(mouse_mRNA_Expressions.coalesce(1), az("/expressions/transcripts/" + "mouse"+"_mRNA"+"_transcripts_all.tab"), true)
writeTSV(cow_totalRNA_Expressions.coalesce(1), az("/expressions/transcripts/" + "cow"+"_totalRNA"+"_transcripts_all.tab"), true)
writeTSV(cow_mRNA_Expressions.coalesce(1), az("/expressions/transcripts/" + "cow"+"_mRNA"+"_transcripts_all.tab"), true)

In [ ]:
//val go_inner = spark.readTSV(az("/expressions/go/gray_whale_with_bowhead_with_minke_with_NMR_small_full_inner.tsv"), true)
val go_outer_counts = spark.readTSV(az("/expressions/go/gray_whale_with_bowhead_with_minke_with_NMR_full_outer_counts.tsv"), true)

go_outer_counts: org.apache.spark.sql.DataFrame = [go: string, label: string ... 13 more fields]


In [ ]:
def byGo(df: DataFrame, species: String) =
  df.select("go", "liver", "kidney").flatMap{row=>
    val g = row.getAs[String]("go")
    val goes = if(g==null) Array[String]() else g.split(";").map(go=>go.trim).filter(go=>go!="")
    goes.map{ go =>
      go -> (1, row.getAs[Double]("liver"), row.getAs[Double]("kidney"))
    }
  }.rdd.reduceByKey{ case ((i1, l1, k1), (i2, l2, k2)) => (i1 + i2, l1 + l2, k1 + k2)}
    .map{ case (go, (i, liver, kidney)) => (go, i,  liver, kidney)}
    .toDF("go", s"${species}_transcripts_count", s"${species}_liver", s"${species}_kidney")

byGo: (df: org.apache.spark.sql.DataFrame, species: String)org.apache.spark.sql.DataFrame


In [ ]:
val goHuman = byGo(humanExpressions, "human")
val goMouse_totalRNA = byGo(mouse_totalRNA_Expressions, "mouse_totalRNA")
.withColumnRenamed("mouse_totalRNA_transcripts_count", "mouse_transcripts_count")
val goMouse_mRNA = byGo(mouse_mRNA_Expressions, "mouse_mRNA").drop("mouse_mRNA_transcripts_count")
val goCow_totalRNA = byGo(cow_totalRNA_Expressions, "cow_totalRNA")
 .withColumnRenamed("cow_totalRNA_transcripts_count", "cow_transcripts_count")
val goCow_mRNA = byGo(cow_mRNA_Expressions, "cow_mRNA").drop("cow_mRNA_transcripts_count")


goHuman: org.apache.spark.sql.DataFrame = [go: string, human_transcripts_count: int ... 2 more fields]
goMouse_totalRNA: org.apache.spark.sql.DataFrame = [go: string, mouse_transcripts_count: int ... 2 more fields]
goMouse_mRNA: org.apache.spark.sql.DataFrame = [go: string, mouse_mRNA_liver: double ... 1 more field]
goCow_totalRNA: org.apache.spark.sql.DataFrame = [go: string, cow_transcripts_count: int ... 2 more fields]
goCow_mRNA: org.apache.spark.sql.DataFrame = [go: string, cow_mRNA_liver: double ... 1 more field]


In [ ]:
//val go_inner_ext = go_inner.join(goHuman, Seq("go"), "inner").join(goMouse, Seq("go"), "inner").sort($"gray_whale_avg_expression".desc)
val toDouble = udf[Double, String]( v=> if(v=="" || v==null) 0.0 else v.toDouble)

val go_outer_ext = go_outer_counts.na.fill("")
  .join(goHuman, Seq("go"), "full_outer")
  .join(goMouse_totalRNA, Seq("go"), "full_outer")
  .join(goMouse_mRNA, Seq("go"), "full_outer")
  .join(goCow_totalRNA, Seq("go"), "full_outer")
  .join(goCow_mRNA, Seq("go"), "full_outer")
  .withColumn("gray_whale_average", toDouble($"gray_whale_avg"))
  .drop("gray_whale_avg")
  .sort($"gray_whale_average".desc)

toDouble: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))
go_outer_ext: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [go: string, label: string ... 26 more fields]


In [ ]:
val go_outer_ext_counts = go_outer_ext.select("go", "label", "gray_whale_average", 
"uniref90_count","taxons_count",  "transcripts_count", "human_transcripts_count", "mouse_transcripts_count", "cow_transcripts_count", "type", 
"gray_whale_liver", "bowhead_whale_liver", "minke_liver", "NMR_liver", "human_liver", 
                                              "mouse_totalRNA_liver", "mouse_mRNA_liver", "cow_totalRNA_liver", "cow_mRNA_liver",
"gray_whale_kidney", "bowhead_whale_kidney", "minke_kidney", "NMR_kidney", "human_kidney",  
                                              "mouse_totalRNA_kidney", "mouse_mRNA_kidney",  "cow_totalRNA_kidney", "cow_mRNA_kidney")

go_outer_ext_counts: org.apache.spark.sql.DataFrame = [go: string, label: string ... 26 more fields]


In [ ]:
go_outer_ext_counts

res170: org.apache.spark.sql.DataFrame = [go: string, label: string ... 26 more fields]


In [ ]:
//writeTSV(go_inner_ext.coalesce(1), az("/expressions/go/gray_whale_with_bowhead_with_minke_with_NMR_with_human_with_mouse_small_full_inner.tsv"), true)
writeTSV(go_outer_ext_counts.coalesce(1), 
         az("/expressions/go/gray_whale_with_bowhead_with_minke_with_NMR_with_human_with_mouse_with_cow_full_outer_counts.tsv"), true)
