# Ensembl downloader



<div><b>Getting species data from Ensembl</b><br></div>

In [2]:
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd._
import org.apache.spark.sql.functions._

In [3]:
import org.apache.spark.sql.expressions._
import group.research.aging.spark.extensions._
import group.research.aging.spark.extensions.functions._

In [4]:
val species = spark.readTSV("/data/ensembl/99/species_EnsemblVertebrates.txt",comment="", header=true)
.withColumnRenamed("#name", "name")
species.show()

+--------------------+--------------------+------------------+-----------+--------------------+------------------+--------------------+---------+-----------+---------------+-----------------+----------------+--------------------+----------+
|                name|             species|          division|taxonomy_id|            assembly|assembly_accession|           genebuild|variation|pan_compara|peptide_compara|genome_alignments|other_alignments|             core_db|species_id|
+--------------------+--------------------+------------------+-----------+--------------------+------------------+--------------------+---------+-----------+---------------+-----------------+----------------+--------------------+----------+
|       Spiny chromis|acanthochromis_po...|EnsemblVertebrates|      80966|         ASM210954v1|   GCA_002109545.1|2018-05-Ensembl/2...|        N|          N|              Y|                Y|               Y|acanthochromis_po...|         1|
|Eurasian sparrowhawk|     accipiter

**Preparing the download**<br>




In [6]:
def prefix(prefix: String, sufix: String) =  udf[String, String]{ str=> prefix + str + sufix}
//val b = prefix("wget -t 4 -m -nH --cut-dirs=100 ftp://ftp.ensembl.org/pub/release-96/gtf/", "/*")

def command(path: String, ftp: String) = udf[String, String]{ species => {
    val dir = path + "/" + species
    val wget = "wget -t 4 -m -nH --cut-dirs=100 -P " + dir + " "
    val and = " && "
    "mkdir -p " + dir + and + wget + ftp + "/gtf/" + species + "/*" + and + wget + ftp + "/fasta/" + species + "/*" + and + "gunzip -f " + dir + "/*.gz" 
    
}
}
val download = command("/data/ensembl/99/species", "ftp://ftp.ensembl.org/pub/release-99")


In [7]:
class Fixer{
    import ammonite.ops._
   def no_dot(assembly: String) = { assembly.lastIndexOf(".") match { case -1 => assembly ; case i => assembly.substring(0, i) } }
 
    //def assembly2(prefix: String) = udf[String, String, String]{ (one, two) => prefix + one + "/" + two }
    def fix(str: String, assembly: String) = {
         if(exists! Path(str)) str else {
             val str2 = str.replace(assembly, no_dot(assembly))
             if(exists! Path(str2)) str2 else {
                 if(str.contains(" "))
                    str.replace(" ", "_")
                 else str
             }
         }
    }
}

In [8]:
def genome =  udf[String, String, String] { (species, assembly) => 
   val f = new Fixer(); import f._
    import ammonite.ops._    
    val p = "/data/ensembl/99/species/" + species + "/" + s"${species.head.toUpper + species.tail}.${assembly}.dna.primary_assembly.fa".replace(" ", "")
     
    //if(new java.io.File(p).exists) p else  if(exists! Path(fix(p, assembly))) fix(p, assembly) else {
    
    if(exists! Path(p)) p else  if(exists! Path(fix(p, assembly))) fix(p, assembly) else {
    val str = "/data/ensembl/99/species/" + species + "/" + s"${species.head.toUpper + species.tail}.${assembly}.dna.toplevel.fa".replace(" ", "") 
    fix(str, assembly)
    }
}

def cdna = udf[String, String, String] { (species, assembly) => 
val f = new Fixer(); import f._
    val str = "/data/ensembl/99/species/" + species + "/" + s"${species.head.toUpper + species.tail}.${assembly}.cdna.all.fa".replace(" ", "") 
    fix(str, assembly)
}

def gtf = udf[String, String, String] { (species, assembly) => 
    val f = new Fixer(); import f._
    val str = "/data/ensembl/99/species/" + species + "/" + s"${species.head.toUpper + species.tail}.${assembly}.99.gtf".replace(" ", "") 
    fix(str, assembly)
}
def pep =  udf[String, String, String] { (species, assembly) => 
val f = new Fixer(); import f._

    val str = "/data/ensembl/99/species/" + species + "/" + s"${species.head.toUpper + species.tail}.${assembly}.pep.all.fa".replace(" ", "") 
    fix(str, assembly)
}

def check_file_simple = udf[Boolean, String] { str =>
    new java.io.File(str).exists 
   // exists! ammonite.ops.Path(str)
}

def new_index = udf[String, String] { str => "/data/indexes/salmon/1.1.0/ensembl_99/" + str.head.toUpper + str.tail }
    

<div><b>Setting up Salmon index</b></div><div>---------------------------------<br></div>

In [10]:
def gentrome( species: String, 
    genome: String, 
    transcriptome: String,
    version: String,
    subversion: String) = 
     s"""
     {
      "species": "${species}",
      "genome": "${genome}",
      "transcriptome": "${transcriptome}",
      "version": "${version}",
      "subversion": "${subversion}"
    }"""

def salmonIndex(
    species: String, 
    genome: String, 
    transcriptome: String,
    version: String,
    subversion: String,
    folder: String = ""    
    ) = s"""
{""" +
  (if(folder!="") s""" "quant_index.indexes_folder": "${folder}", """ else "") +
  s""" "quant_index.references": [
    ${gentrome(species, genome, transcriptome, version, subversion)}
  ]
}
""".replace("\t", "  ")
salmonIndex("Homo sapiens", "/data/ensembl/99/species/homo_sapiens/Homo_sapiens.GRCh38.dna.primary_assembly.fa", 
"/data/ensembl/97/species/homo_sapiens/Homo_sapiens.GRCh38.cdna.all.fa", "GRCh38", 
"ensembl_97" ,"/data/indexes/salmon/1.1.0/ensembl_99"
)


{ "quant_index.indexes_folder": "/data/indexes/salmon/1.1.0/ensembl_99",  "quant_index.references": [
    
     {
      "species": "Homo sapiens",
      "genome": "/data/ensembl/99/species/homo_sapiens/Homo_sapiens.GRCh38.dna.primary_assembly.fa",
     

In [11]:
def index_input =  udf[String, String, String, String, String, String, String] {   (folder: String, species: String, genome: String, transcriptome: String, version: String, subversion: String) =>
  s"""
{
  "quant_index.indexes_folder": "${folder}",
  "quant_index.references": [
    {
      "species": "${species}",
      "genome": "${genome}",
      "transcriptome": "${transcriptome}",
      "version": "${version}",
      "subversion": "${subversion}"
    }
  ]
}
""".replace("\t", " ").replace("\n", "  ")
}

**Preparing the main table**




Computing genomes table

------------------------------------<br>




In [14]:
import org.apache.spark.sql.functions._
val genomes = species
  .withColumn("download", download($"species"))
  .withColumn("index", new_index($"species"))
  .withColumn("genome", genome($"species", $"assembly"))
  .withColumn("cdna", cdna($"species", $"assembly"))
  .withColumn("index_input", index_input(lit("/data/indexes/salmon/1.1.0/ensembl_99"), $"species", $"genome", $"cdna", $"assembly", lit("ensembl_99")))
  .select("name", "species", "download", "assembly", "genome", "cdna", "index", "index_input") 
  .withColumn("index_exists", check_file_simple($"index"))
  .withColumn("genome_exists", check_file_simple($"genome"))
  .withColumn("cdna_exists", check_file_simple($"cdna"))
  .withColumn("gtf", gtf($"species", $"assembly"))
  .withColumn("gtf_exists", check_file_simple($"gtf"))
  .withColumn("pep", pep($"species", $"assembly"))
  .withColumn("pep_exists", check_file_simple($"pep"))
  .select("name", "index", "index_exists", "species", "download", "assembly", "genome", "genome_exists", "cdna", "cdna_exists", "gtf", "gtf_exists", "pep", "pep_exists")
  
 genomes.show(100,1000)

+------------------------------+----------------------------------------------------------------------+------------+--------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------+-------------------------------------------------------------------------------------------------------------------------+-------------+---------------------------------------------------------------------------------------------------------------------+-----------+-----------------------------------------------------------------

In [15]:
genomes.where($"genome_exists" === false).show(1000, 10000)

+---------------+------------------------------------------------------------+------------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+---------------------------------------------------------------------------------------------------+-------------+-----------------------------------------------------------------------------------------------+-----------+------------------------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------+----------+
|  

In [16]:
(genomes.select("index", "index_exists").where($"index_exists" === true).count, 
genomes.select("index", "index_exists").where($"index_exists" === false).count)

(264,3)

In [17]:
genomes.select("index", "assembly", "index_exists").withColumn("index2", concat($"index" , lit("/") , $"assembly")).where($"index_exists" === false).show(100,1000)

+---------------------------------------------------------------+----------+------------+--------------------------------------------------------------------------+
|                                                          index|  assembly|index_exists|                                                                    index2|
+---------------------------------------------------------------+----------+------------+--------------------------------------------------------------------------+
|/data/indexes/salmon/1.1.0/ensembl_99/Erpetoichthys_calabaricus|fErpCal1.1|       false|/data/indexes/salmon/1.1.0/ensembl_99/Erpetoichthys_calabaricus/fErpCal1.1|
|              /data/indexes/salmon/1.1.0/ensembl_99/Salmo_salar| ICSASG_v2|       false|               /data/indexes/salmon/1.1.0/ensembl_99/Salmo_salar/ICSASG_v2|
|         /data/indexes/salmon/1.1.0/ensembl_99/Tupaia_belangeri|   tupBel1|       false|            /data/indexes/salmon/1.1.0/ensembl_99/Tupaia_belangeri/tupBel1|
+---------

In [18]:
genomes.columns.toList

List(name, index, index_exists, species, download, assembly, genome, genome_exists, cdna, cdna_exists, gtf, gtf_exists, pep, pep_exists)

**Computing table of files to download**




In [20]:
import ammonite.ops._
val to_download = genomes.where($"cdna_exists" === false).select("download").as[String] 
val already_downloaded = genomes.where($"cdna_exists" === true).select("download").as[String] 
println("ALREADY DOWNLOADED: "+already_downloaded.count)
println("TO DOWNLOAD: "+to_download.count)
val str = to_download.as[String].collect.toList.mkString("\n")
genomes.where($"genome_exists" === false).select("species").sort($"species".asc).show(1000)

ALREADY DOWNLOADED: 260
TO DOWNLOAD: 7
+--------------------+
|             species|
+--------------------+
| erinaceus_europaeus|
|  loxodonta_africana|
|    poecilia_formosa|
|sarcophilus_harrisii|
|       sorex_araneus|
|tetraodon_nigrovi...|
|    tupaia_belangeri|
+--------------------+



In [21]:
write.over(Path("/data/ensembl/99/selected_download.sh"), str)

<div>Writing json-s for salmon indexes</div>

In [23]:
val already_indexed = genomes.where($"index_exists" === true)
val to_index = genomes.where($"index_exists" === false)
println("ALREADY INDEXED: "+already_indexed.count)
println("TO INDEX: "+to_index.count)


ALREADY INDEXED: 264
TO INDEX: 3


In [24]:
val data =  species
  .withColumn("download", download($"species"))
  .withColumn("index", new_index($"species"))
  .withColumn("genome", genome($"species", $"assembly"))
  .withColumn("cdna", cdna($"species", $"assembly"))
  .withColumn("index", concat($"index" , lit("/") , $"assembly"))
  .select("name", "species", "download", "assembly", "genome", "cdna") 
  .as[(String, String, String, String, String , String)].collect.toList
  data

List((Spiny chromis,acanthochromis_polyacanthus,mkdir -p /data/ensembl/99/species/acanthochromis_polyacanthus && wget -t 4 -m -nH --cut-dirs=100 -P /data/ensembl/99/species/acanthochromis_polyacanthus ftp://ftp.ensembl.org/pub/release-99/gtf/acanthochrom

Salmon indexes inputs
-----------------------

In [26]:
val by_name = data.map{ case (name, species, download, assembly, genome, cdna) => s"/data/ensembl/99/inputs/salmon/by_name/${name.replace("/","-")}/${species}_${assembly}.json".replace(" ", "_") ->  salmonIndex("/data/indexes/salmon/1.1.0/ensembl_99", species, genome, cdna, assembly, "ensembl_99") }.toMap
val by_species = data.map{ case (name, species, download, assembly, genome, cdna) => s"/data/ensembl/99/inputs/salmon/by_species/${species}_${assembly}.json".replace(" ", "_") ->  salmonIndex("/data/indexes/salmon/1.1.0/ensembl_99", species, genome, cdna, assembly, "ensembl_99") }.toMap


In [27]:
by_name.take(2)

Map(/data/ensembl/99/inputs/salmon/by_name/Cow/bos_taurus_ARS-UCD1.2.json -> 
{ "quant_index.indexes_folder": "ensembl_99",  "quant_index.references": [
    
     {
      "species": "/data/indexes/salmon/1.1.0/ensembl_99",
      "genome": "bos_taurus",
 

In [28]:
import ammonite.ops._
for{(d, i) <- by_species} write.over(Path(d), i, createFolders = true)
for{(d, i) <- by_name} write.over(Path(d), i, createFolders = true)


In [29]:
by_species.keySet

Set(/data/ensembl/99/inputs/salmon/by_species/fundulus_heteroclitus_Fundulus_heteroclitus-3.0.2.json, /data/ensembl/99/inputs/salmon/by_species/oreochromis_niloticus_O_niloticus_UMD_NMBU.json, /data/ensembl/99/inputs/salmon/by_species/haplochromis_burton

<div><b>Salmon indexes batches</b></div><div>---------------------------------<br></div>

In [31]:
val sl = 3
val batches = data.map{ case (name, species, download, assembly, genome, cdna) => 
    gentrome(species, genome, cdna, assembly, "ensembl_99") 
}.sliding(sl, sl).map{case b=>
val str = b.mkString("[", ",", "]")
s"""
{
  "quant_index_batch.indexes_folder": "/data/indexes/salmon/1.1.0/ensembl_99",
  "quant_index_batch.threads_per_index": ${32 / sl},
  "quant_index_batch.references": ${str}",
  "quant_index_batch.memory_per_index": "24G"
}
"""
}.toList


In [32]:
println(batches.head)


{
  "quant_index_batch.indexes_folder": "/data/indexes/salmon/1.1.0/ensembl_99",
  "quant_index_batch.threads_per_index": 10,
  "quant_index_batch.references": [
     {
      "species": "acanthochromis_polyacanthus",
      "genome": "/data/ensembl/99/species/acanthochromis_polyacanthus/Acanthochromis_polyacanthus.ASM210954v1.dna.toplevel.fa",
      "transcriptome": "/data/ensembl/99/species/acanthochromis_polyacanthus/Acanthochromis_polyacanthus.ASM210954v1.cdna.all.fa",
      "version": "ASM210954v1",
      "subversion": "ensembl_99"
    },
     {
      "species": "accipiter_nisus",
      "genome": "/data/ensembl/99/species/accipiter_nisus/Accipiter_nisus.Accipiter_nisus_ver1.0.dna.toplevel.fa",
      "transcriptome": "/data/ensembl/99/species/accipiter_nisus/Accipiter_nisus.Accipiter_nisus_ver1.0.cdna.all.fa",
      "version": "Accipiter_nisus_ver1.0",
      "subversion": "ensembl_99"
    },
     {
      "species": "ailuropoda_melanoleuca",
      "genome": "/data/ensembl/99/species/

Preparing defaults<br>

In [34]:
import spark.implicits._
val gs = genomes.withColumn("index", concat($"index", lit("/"), $"assembly", lit("_ensembl_99"))).select("species", "index", "gtf").as[(String, String, String)].collect().toList

In [37]:
val salmon_indexes_str = gs.collect{
    case (sp, i, _) => "\"" + sp.head.toUpper + sp.tail.replace("_", " ") +  "\"" +" : " +  "\"" + i  + "\""
}.mkString(",\n")
val salmon_gtfs_str = gs.collect{
    case (sp, _, gtf) => "\"" + sp.head.toUpper + sp.tail.replace("_", " ") +  "\"" +" : " +  "\"" + gtf  + "\""
}.mkString(",\n")
//salmon_gtfs_str

In [39]:
val quant_sample_default = s"""
    {
  "quant_sample.key": "0a1d74f32382b8a154acacc3a024bdce3709",
  "quant_sample.samples_folder": "/data/samples/species",
  "quant_sample.salmon_indexes" : {
    ${salmon_indexes_str}   
  },
  "quant_sample.transcripts2genes" : {
     ${salmon_gtfs_str}
  }
}
"""
write.over(Path("/data/pipelines/quant_sample/quant_sample_default.json"), quant_sample_default, createFolders = true)

In [38]:
val quant_by_runs_default = s"""
    {
  "quant_by_runs.key": "0a1d74f32382b8a154acacc3a024bdce3709",
  "quant_by_runs.samples_folder": "/data/samples/species",
  "quant_by_runs.salmon_indexes" : {
    ${salmon_indexes_str}   
  },
  "quant_by_runs.transcripts2genes" : {
     ${salmon_gtfs_str}
  }
}
"""
write.over(Path("/data/pipelines/quant_by_runs/quant_by_runs_default.json"), quant_by_runs_default, createFolders = true)

In [35]:
val quantification_def = s"""
    {
  "quantification.key": "0a1d74f32382b8a154acacc3a024bdce3709",
  "quantification.samples_folder": "/data/samples/species",
  "quantification.salmon_indexes" : {
    ${salmon_indexes_str}   
  },
  "quantification.transcripts2genes" : {
     ${salmon_gtfs_str}
  }
}
"""
write.over(Path("/data/pipelines/quantification/quantification_default.json"), quantification_def, createFolders = true)