# aggregating by go

This is a text cell. Start editing!

In [1]:
import org.apache.spark._
import org.apache.spark.sql.types._
import scala.reflect.runtime.universe._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import group.research.aging.spark.extensions._
import group.research.aging.spark.extensions.functions._
import kernel.display.html

In [2]:
val whalePath = "/data/results/gray-whale/"
val expressionsPath = whalePath + "Expressions/"
val unirefPath = expressionsPath + "uniref90/"
val transcriptsPath = expressionsPath + "Transcripts/"
val codingPath = transcriptsPath + "coding/"

val comparisonsPath = expressionsPath + "Comparisons/"
val comparisonsUniref = comparisonsPath + "uniref90_comparisons/"
val annotationsPath = comparisonsPath + "annotations/"

In [3]:
def loadTranscripts(subpath: String, prefix: String) = {
    val path = if(subpath.startsWith("/")) subpath else transcriptsPath + subpath
    spark.readTSV(path, header=true).select($"Name".as("transcript"), $"NumReads".as(prefix + "_reads"), $"TPM".as(prefix + "_TPM")).cache 
}

Opening Uniref90 mapping file




In [16]:
val mapping_cols = Seq("UniProtKB-AC","UniProtKB-ID","Entrez","RefSeq","GI","PDB","GO",
"UniRef100","UniRef90","UniRef50","UniParc","PIR",
"NCBI-taxon","MIM","UniGene","PubMed",
"EMBL","EMBL-CDS","Ensembl","Ensembl_TRS","Ensembl_PRO","Additional PubMed")

In [17]:
val mapping = spark.readTSV("/data/indexes/uniprot/idmapping_selected.tab").toDF(mapping_cols:_*)
mapping.limit(20).show(20, 1000)

+------------+------------+-------+-----------+-------------------------------+----+----------------------------------+----------------+---------------+---------------+-------------+----+----------+----+-------+------------------+--------+----------+-------+-----------+-----------+-----------------+
|UniProtKB-AC|UniProtKB-ID| Entrez|     RefSeq|                             GI| PDB|                                GO|       UniRef100|       UniRef90|       UniRef50|      UniParc| PIR|NCBI-taxon| MIM|UniGene|            PubMed|    EMBL|  EMBL-CDS|Ensembl|Ensembl_TRS|Ensembl_PRO|Additional PubMed|
+------------+------------+-------+-----------+-------------------------------+----+----------------------------------+----------------+---------------+---------------+-------------+----+----------+----+-------+------------------+--------+----------+-------+-----------+-----------+-----------------+
|      Q6GZX4|  001R_FRG3G|2947773|YP_031579.1|             81941549; 49237298|null|             

### Processing Bat expressions




In [8]:
val quants_base = "/data/samples/de_novo/bat/quants/ours"
val bat_liver_1 = spark.readTSV(quants_base + "/quant_bat_liver_active_1/quant.sf", header = true)
val bat_kidney_1 = spark.readTSV(quants_base + "/quant_bat_kidney_active_1/quant.sf", header = true)
val bat_liver_2 = spark.readTSV(quants_base + "/quant_bat_liver_active_2/quant.sf", header = true)
val bat_kidney_2 = spark.readTSV(quants_base + "/quant_bat_kidney_active_2/quant.sf", header = true)
bat_liver_1.show(10,10000)

+-----------------------------------------+------+---------------+---------+--------+
|                                     Name|Length|EffectiveLength|      TPM|NumReads|
+-----------------------------------------+------+---------------+---------+--------+
| NODE_1_length_17988_cov_580.222903_g0_i0| 17988|      25996.137| 2.849032|2131.249|
|  NODE_2_length_17947_cov_75.308981_g1_i0| 17947|      23591.296| 5.768584|3916.057|
|  NODE_3_length_17149_cov_48.936455_g2_i0| 17149|      16480.623| 2.802971|1329.291|
| NODE_4_length_16953_cov_143.141987_g3_i0| 16953|      16648.804|11.308167|5417.553|
|  NODE_5_length_16927_cov_28.452499_g4_i0| 16927|      15432.613| 1.503753| 667.796|
|  NODE_6_length_16768_cov_80.455725_g5_i0| 16768|      22827.061| 0.206226| 135.463|
|  NODE_7_length_16714_cov_42.414072_g6_i0| 16714|       13824.43| 1.240009| 493.287|
|  NODE_8_length_16515_cov_85.093674_g5_i1| 16515|      22310.009| 0.971752| 623.855|
|  NODE_9_length_16383_cov_85.107160_g5_i2| 16383|    

In [12]:
val bat_mappings = spark.read.parquet("/data/results/gray-whale/best_bap_mappings.parquet")
bat_mappings.count

41829

In [13]:
import org.apache.spark.sql.functions._
val bat_unirefs = bat_mappings.select($"uniref90".as("UniRef90"),$"bat_liver_1",$"bat_kidney_1",$"bat_liver_2",$"bat_kidney_2")
.groupBy("UniRef90").agg(
    sum($"bat_liver_1").as("bat_liver_1"), 
    sum($"bat_kidney_1").as("bat_kidney_1"), 
    sum($"bat_liver_2").as("bat_liver_2"), 
    sum($"bat_kidney_2").as("bat_kidney_2")
    ).orderBy($"bat_liver_1".desc, $"bat_kidney_1".desc)
bat_unirefs.show(10,1000)

+----------------------+------------------+-----------------+------------------+------------------+
|              UniRef90|       bat_liver_1|     bat_kidney_1|       bat_liver_2|      bat_kidney_2|
+----------------------+------------------+-----------------+------------------+------------------+
|       UniRef90_G1PM73|19697.715009000003|         5.018056|19624.926098999997| 9.537035000000001|
|       UniRef90_S7PWY9|      19688.750414|         5.501245|       15423.90906|          8.739993|
|       UniRef90_S7NKH8|13626.893756000001|         6.533802|14379.784097999996|         11.232051|
|       UniRef90_L5LNJ6|      13496.735069|         0.464551|15308.018013000003|0.6780390000000001|
|       UniRef90_S7MRF2|       8993.596224|         5.323449|       8062.678561|          5.772969|
|UniRef90_UPI000CCC3436|       7970.125527|         4.011104|       8671.107356|          6.352832|
|       UniRef90_S7MHE7|       6585.148337|         1.531405|       5983.654563|          2.328961|


In [9]:
import group.research.aging.spark.extensions.functions.ConcatenateString
val con = new ConcatenateString(";")
val partial_bat_uniref_mappings = mapping.select("UniRef90", "GO").join(bat_unirefs.select("UniRef90"), "UniRef90").distinct.na.fill("")
.groupBy($"UniRef90").agg(con($"GO").as("GO")).cache
partial_bat_uniref_mappings.show(10,1000)

+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|           UniRef90|                                                                                                                                                                                                                                                                                                                                                                    GO|
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
(partial_bat_uniref_mappings.count, partial_bat_uniref_mappings.select("UniRef90").distinct.count, bat_unirefs.count,bat_unirefs.select("UniRef90").distinct.count)

(18459,18459,25503,25503)

Aggregating GOes by Uniref90

In [20]:
def flatGo = udf[Array[String], String]{ str => str.trim.replaceAll("^;", "".replaceAll(" +", "")).split(";").distinct}
val partial_bat_go_mappings = partial_bat_uniref_mappings
.filter($"GO" =!=";")
.withColumn("GO", flatGo($"GO"))
.join(bat_unirefs, Seq("Uniref90"))
.as[(String, scala.collection.mutable.ArrayBuffer[String], Double, Double, Double, Double)]
partial_bat_go_mappings.show(10,1000)

+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------------+------------------+------------------+
|           UniRef90|                                                                                                                                                                                                                                                                                                                     GO|bat_liver_1|      bat_kidney_1|       bat_liver_2|      bat_kidney_2|
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
val bat_go_mappings = partial_bat_go_mappings
.flatMap{ case (a, arr, l1, k1, l2, k2) => arr.toList.filter(f=>f.trim!="").distinct.map(aa=> (aa.trim, l1, k1, l2, k2 ))}
.toDF("GO", "bat_liver_1", "bat_kidney_1", "bat_liver_2", "bat_kidney_2")
.groupBy($"GO")
.agg(
    sum($"bat_liver_1").as("bat_liver_1"), 
    sum($"bat_kidney_1").as("bat_kidney_1"), 
    sum($"bat_liver_2").as("bat_liver_2"), 
    sum($"bat_kidney_2").as("bat_kidney_2")
    )
.orderBy($"bat_liver_1".desc,$"bat_kidney_1".desc)
.withColumnRenamed("GO", "go")
bat_go_mappings.show(10,100)

+----------+------------------+------------------+------------------+------------------+
|        go|       bat_liver_1|      bat_kidney_1|       bat_liver_2|      bat_kidney_2|
+----------+------------------+------------------+------------------+------------------+
|GO:0005615|104813.53711199995| 9633.763691999999|105668.70030599993| 9218.175467000005|
|GO:0005737|      45889.810949| 39124.75226900001| 43063.92459099999| 37351.04243300002|
|GO:0016021| 40258.52125200002| 49371.60719699998| 33799.39783599998| 50314.59441799999|
|GO:0005829|      39701.526823| 48503.28607600002| 35938.73908299998|48467.252029999996|
|GO:0005634|      28991.795084|38791.502254000006|26034.121111999997|36985.395011999986|
|GO:0016020|28193.223352999994|21701.492759999997|28454.926921999995|21109.941520000015|
|GO:0005576|26396.602805999995| 7023.994042999998|      25132.769251| 6699.110292999995|
|GO:0005524|26232.830242999997| 23228.16708199999|22413.050668999993| 22506.39408900001|
|GO:0004869|17508.984

In [23]:
//previos (13694,13694,13694)
(bat_go_mappings.count, bat_go_mappings.select("go").count, bat_go_mappings.select("go").distinct.count)

(12981,12981,12981)

In [21]:
bat_go_mappings.select("go").groupBy("go").agg(count($"go")).orderBy($"count(go)".desc).show(100)

+----------+---------+
|        go|count(go)|
+----------+---------+
|GO:0019900|        1|
|GO:0004087|        1|
|GO:0005827|        1|
|GO:0045738|        1|
|GO:0050890|        1|
|GO:0051571|        1|
|GO:0032689|        1|
|GO:0031491|        1|
|GO:0010390|        1|
|GO:0034012|        1|
|GO:0072053|        1|
|GO:0042535|        1|
|GO:1990314|        1|
|GO:0045639|        1|
|GO:0098779|        1|
|GO:0090240|        1|
|GO:1903531|        1|
|GO:0015293|        1|
|GO:0060702|        1|
|GO:0033186|        1|
|GO:0046512|        1|
|GO:0045910|        1|
|GO:1990456|        1|
|GO:1902260|        1|
|GO:2001106|        1|
|GO:0032049|        1|
|GO:0005372|        1|
|GO:1990726|        1|
|GO:0004723|        1|
|GO:0017075|        1|
|GO:0070384|        1|
|GO:0042053|        1|
|GO:0008297|        1|
|GO:0006501|        1|
|GO:0009032|        1|
|GO:0070724|        1|
|GO:0021692|        1|
|GO:0003165|        1|
|GO:0003976|        1|
|GO:0002025|        1|
|GO:0031085