Preload of a personal library

In [8]:
sc.addPyFile("https://raw.githubusercontent.com/creggian/creggian-python/master/dist/creggian.zip")
from creggian.main import coords2bin, to_tsv_line
from creggian.bed import leftjoin_overlap, overlaps_any
from creggian.parse import split_tsv

### Local preprocessing

- Download BAM file from encode, eg: https://www.encodeproject.org/files/ENCFF753KWK/@@download/ENCFF753KWK.bam
- Calculate the coverage BDG file using bedtools 'bam2bdg.sh' (bedtools genomecov -bga -split ...). '-bga' option will fill non-covered regions with 0 values, hence the entire chromosome is represented in the resulting files. Chromosome border coverage are special cases that we can neglect for the purpose of this project. Output format: chr, start, end, value. 0-based coordinate system.
- Calculate the delta coverages using bedtools 'bdg2deltacoverage.sh' (bedtools closest -io -iu ...). Output format: chr, pos_0based, delta, distance. If 'pos_0based' is 10001, then its correspondence to 'pos_1based' is 10001-10002. Value 'delta' corresponds to prev_region - next_region (left - right, coordinate-wise). Value 'distance' is indeed the distance between the two regions, it has always to be 1, otherwise there is an error.
- Move everything to HDFS, that means: bam, bdg, bdg.dc
- Use ADAM framework to transform bam, bdg, bdg.dc files into parquet files ('bam2adam.sh', 'bdg2parquet.sh', 'bdgdc2parquet.sh')

### Load the data

In [2]:
def hdfs_ls(basepath, regex='.*'):
    import re
    
    hdfsResult = !hdfs dfs -ls {basepath}
    hdfsResultList = hdfsResult.list

    fullpaths = []
    for x in hdfsResultList:
        pos = x.find(basepath)
        if (pos >= 0):
            fullpaths += [x[pos:len(x)]]

    filenames = []
    for x in fullpaths:
        fname = x.split("/")[-1]
        if re.search(regex, fname):
            filenames += [fname]

    return filenames

basepath = '/user/creggian/ENCODE/hg19/brain/fetal/'
parquets = hdfs_ls(basepath, regex='bdg.dc.parquet')
parquets

['ENCFF015DEA.bdg.dc.parquet',
 'ENCFF041THY.bdg.dc.parquet',
 'ENCFF113PDT.bdg.dc.parquet',
 'ENCFF220QDT.bdg.dc.parquet',
 'ENCFF226SQK.bdg.dc.parquet',
 'ENCFF229XLD.bdg.dc.parquet',
 'ENCFF243QVH.bdg.dc.parquet',
 'ENCFF286ZEF.bdg.dc.parquet',
 'ENCFF331ZDV.bdg.dc.parquet',
 'ENCFF391XTO.bdg.dc.parquet',
 'ENCFF397QLJ.bdg.dc.parquet',
 'ENCFF434OIG.bdg.dc.parquet',
 'ENCFF513CAU.bdg.dc.parquet',
 'ENCFF602BYA.bdg.dc.parquet',
 'ENCFF633PEY.bdg.dc.parquet',
 'ENCFF677URC.bdg.dc.parquet',
 'ENCFF741AGL.bdg.dc.parquet',
 'ENCFF746HOG.bdg.dc.parquet',
 'ENCFF753KWK.bdg.dc.parquet',
 'ENCFF803AAV.bdg.dc.parquet',
 'ENCFF892QFE.bdg.dc.parquet',
 'ENCFF907ILS.bdg.dc.parquet',
 'ENCFF927WKN.bdg.dc.parquet',
 'ENCFF954GHY.bdg.dc.parquet']

In [3]:
bdg_ss_dict = {}
for filename in parquets:
    data = spark.read.parquet(basepath + filename)
    bdg_ss_dict[filename] = data
    
bdg_ss_dict["ENCFF015DEA.bdg.dc.parquet"].show()

+----------+---+-----+--------+
|       chr|pos|delta|distance|
+----------+---+-----+--------+
|ERCC-00002|  1|   -5|       1|
|ERCC-00002|  2|  -24|       1|
|ERCC-00002|  3|  -19|       1|
|ERCC-00002|  4|   -5|       1|
|ERCC-00002|  7|   -3|       1|
|ERCC-00002|  9|  -21|       1|
|ERCC-00002| 10|  -12|       1|
|ERCC-00002| 11|   -5|       1|
|ERCC-00002| 12|   -2|       1|
|ERCC-00002| 13|   -6|       1|
|ERCC-00002| 14|   -4|       1|
|ERCC-00002| 15|   -5|       1|
|ERCC-00002| 16|   -6|       1|
|ERCC-00002| 18|   -3|       1|
|ERCC-00002| 19|   -8|       1|
|ERCC-00002| 20|  -72|       1|
|ERCC-00002| 21|  -57|       1|
|ERCC-00002| 22| -111|       1|
|ERCC-00002| 23| -145|       1|
|ERCC-00002| 24|  -99|       1|
+----------+---+-----+--------+
only showing top 20 rows



In [4]:
bdg_ss_view_appendix = "_bdg_ss"
for filename in parquets:
    bam_filename = filename.split(".")[0]  # ENCFF513CAU.bdg.parquet => ENCFF513CAU
    bdg_ss_dict[filename].createOrReplaceTempView(bam_filename + bdg_ss_view_appendix)

Filter according to the delta coverage and distance. Create the auxiliary column to group by: "chr_pos"

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *


def df_ss_filtering(fview, dc=0, dist=1):
    query  = "SELECT * FROM " + fview
    query += " WHERE abs(delta) >= " + str(dc)
    query += " AND distance == " + str(dist)
    
    return spark.sql(query)


def concat_chr_pos(chrom, pos):
    return str(chrom) + "_" + str(pos)


def is_neg_delta(delta):
    ret = 0
    if delta < 0:
        ret = 1
    return ret


def is_pos_delta(delta):
    ret = 0
    if delta > 0:
        ret = 1
    return ret


udf_concat_chr_pos = udf(concat_chr_pos, StringType())
udf_is_neg_delta = udf(is_neg_delta, LongType())
udf_is_pos_delta = udf(is_pos_delta, LongType())

bdg_ss_dict_targets = {}
for filename in parquets:
    fview = filename.split(".")[0] + bdg_ss_view_appendix
    bdg_ss_dict_target = df_ss_filtering(fview, dc=20, dist=1)
    bdg_ss_dict_target = bdg_ss_dict_target.withColumn("chr_pos", udf_concat_chr_pos("chr", "pos"))
    bdg_ss_dict_target = bdg_ss_dict_target.withColumn("is_neg_delta", udf_is_neg_delta("delta"))
    bdg_ss_dict_target = bdg_ss_dict_target.withColumn("is_pos_delta", udf_is_pos_delta("delta"))
    bdg_ss_dict_targets[filename] = bdg_ss_dict_target

bdg_ss_dict_targets["ENCFF041THY.bdg.dc.parquet"].show()

+----+-----+-----+--------+----------+------------+------------+
| chr|  pos|delta|distance|   chr_pos|is_neg_delta|is_pos_delta|
+----+-----+-----+--------+----------+------------+------------+
|chr1|10000|  -29|       1|chr1_10000|           1|           0|
|chr1|10001|  -79|       1|chr1_10001|           1|           0|
|chr1|10002|  -22|       1|chr1_10002|           1|           0|
|chr1|10003| -124|       1|chr1_10003|           1|           0|
|chr1|10004| -179|       1|chr1_10004|           1|           0|
|chr1|10005|  -79|       1|chr1_10005|           1|           0|
|chr1|10006|  -26|       1|chr1_10006|           1|           0|
|chr1|10007|  -85|       1|chr1_10007|           1|           0|
|chr1|10009|  -24|       1|chr1_10009|           1|           0|
|chr1|10013|  -24|       1|chr1_10013|           1|           0|
|chr1|10016|  -44|       1|chr1_10016|           1|           0|
|chr1|10034|  -21|       1|chr1_10034|           1|           0|
|chr1|10077|   27|       

Union all dataframes

In [6]:
def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]
    
dfs = union_all(bdg_ss_dict_targets.values())
#dfs.count()  # 15641227

In [7]:
dfs.show()

+----------+---+-----+--------+--------------+------------+------------+
|       chr|pos|delta|distance|       chr_pos|is_neg_delta|is_pos_delta|
+----------+---+-----+--------+--------------+------------+------------+
|ERCC-00002| 22|  -23|       1| ERCC-00002_22|           1|           0|
|ERCC-00002| 25|  -37|       1| ERCC-00002_25|           1|           0|
|ERCC-00002| 26|  -29|       1| ERCC-00002_26|           1|           0|
|ERCC-00002| 64|  -31|       1| ERCC-00002_64|           1|           0|
|ERCC-00002| 68|  -20|       1| ERCC-00002_68|           1|           0|
|ERCC-00002| 71|  -37|       1| ERCC-00002_71|           1|           0|
|ERCC-00002| 91|  -21|       1| ERCC-00002_91|           1|           0|
|ERCC-00002| 93|  -21|       1| ERCC-00002_93|           1|           0|
|ERCC-00002|101|  -22|       1|ERCC-00002_101|           1|           0|
|ERCC-00002|123|   21|       1|ERCC-00002_123|           0|           1|
|ERCC-00002|127|   22|       1|ERCC-00002_127|     

Group by "chr_pos" and filter for presence of splicing size greater than 0.5

In [28]:
threshold = 0.5


def split_chr_pos_chr(chr_pos):
    return str(chr_pos).split("_")[0]


def split_chr_pos_pos(chr_pos):
    return int(float(str(chr_pos).split("_")[1]))


def reduce_neg_pos_delta(x, y):
    if x[0] != y[0]:
        raise Exception("In 'reduce_neg_pos_delta' reduced 'pos' are not the same !")
    return (x[0], x[1] + y[1], x[2] + y[2])


def sort_rbk(x):
    chr_pos, is_delta = x
    poss, is_neg_delta, is_pos_delta = is_delta
    
    strand = "+"
    n = is_pos_delta
    if (int(is_neg_delta) > int(is_pos_delta)):
        strand = "-"
        n = is_neg_delta
    
    chrom = split_chr_pos_chr(chr_pos)
    pos = split_chr_pos_pos(chr_pos)
    return (chrom, int(pos), strand, int(n))


def filter_threshold(x, th):
    n = int(x[3])
    return n >= th


# reduceByKey => (chr1_10006, (10006, 1, 0))
th = len(bdg_ss_dict_targets)*threshold
dfs_threshold = dfs.rdd\
    .map(lambda x: (x[4], (x[1], x[5], x[6])))\
    .reduceByKey(reduce_neg_pos_delta)\
    .map(sort_rbk)\
    .filter(lambda x: filter_threshold(x, th))\
    .map(to_tsv_line)\
    .saveAsTextFile("/user/creggian/datarepo/fetalBrain_ss_dc20_freq05_20170223.bed")
    
#dfs_threshold.first()  # 434043

In [None]:
#threshold = 0.5
#
#dfs_threshold = dfs.groupBy("chr_pos")\
#    .count()\
#    .withColumnRenamed("count", "n")\
#    .filter("n >= " + str(len(bdg_ss_dict_targets)*threshold))\
#    .withColumn("chr", udf_split_chr_pos_chr("chr_pos"))\
#    .withColumn("pos", udf_split_chr_pos_pos("chr_pos"))
#    
#dfs_threshold.count()  # 434043