In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import json
import time
import sys

#analysisName=sys.argv[1]
#indexControl=int(sys.argv[2])
#scope=sys.argv[3]
#scale=sys.argv[4]

sqlCase="sample_id IN('ZH128472','ZH136914','ZH141272','ZH141483','ZH108301','ZH135914','ZH136155','ZH136587','ZH137071','ZH141455','ZH135614') AND snpeff_impact = 'HIGH'"
sqlControl="sample_id IN('ZH136913','ZH136915','ZH141389','ZH141390','ZH142274','ZH142276','ZH1428','ZH1429','ZH135907','ZH135909','ZH136156','ZH136157','ZH136586','ZH137070','ZH137072','ZH137703','ZH142270','ZH96867','ZH141454','ZH141456','ZH135613','ZH135615') AND snpeff_impact = 'HIGH'"

nPartitions=4
conf = (SparkConf()
         .setMaster("local["+str(nPartitions)+"]")
         .setAppName("Ranking")
#         .set("spark.executor.memory", "5g")
#         .set("spark.driver.memory", "5g")
#         .set("spark.python.worker.memory", "5g")
       )
sc.stop()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

parquetFile = sqlContext.read.parquet("/user/hive/warehouse/gvr4.db/variantsulb")
parquetFile.registerTempTable("parquetFile");

In [2]:
analysisName="neurodev_control_digenic"
group1name="ulb_control_damaging"
group2name="ulb_neurodev_damaging"
indexControl=5
scope="digenic"
scale="gene"
nPartitions=4

In [3]:
def splitByVariantID(variantData):
    ID=variantData[1]+":"+str(variantData[2])+":"+variantData[3]+":"+variantData[4]
    return (ID,(variantData[5],variantData[0],variantData[6]))

def buildVariantVector(ID,variantData,samplesID):
    variantData=list(variantData)
    result=[0]*len(samplesID)
    
    #Get sampleID/Genotype for each variant
    sID=[]
    geno=[]
    for i in range(0,len(variantData)):
        if variantData[i][2]=="Homozygous":
            geno.append(2)
        else:
            geno.append(1)
        sID.append(variantData[i][1])
    
    #Sort according to sampleID
    sIDsorted=[v for v in sorted(enumerate(sID), key=lambda x:x[1])]
    sID=[v[1] for v in sIDsorted]
    geno=[geno[v[0]] for v in sIDsorted]                             
    
    curID=0
    for i in range(0,len(samplesID)):
        if sID[curID]==samplesID[i]:
            result[i]=geno[curID]
            curID=curID+1
        if curID==len(sID):
            break;
    
    return ((ID,variantData[0][0]),result)



In [4]:
def splitValues(variantData):    
    return (variantData[0][1],(variantData[0][0],variantData[1]))

def makePairParts(k,v,nbPart):
    result=[]
    for i in range(0,nbPart):
        result.append(((k,i),v))
        
    return [(str(sorted([k,i])),(v)) for i in range(0,nbPart)]

def f(splitIndex ,v): 
    return [(splitIndex,list(v))]

In [5]:
def scoreVariantUnivariate(k,variantData):
    variantData=list(variantData)
    
    score=0
    sumControl=0
    
    sumCase=sum([int(x>0) for x in variantData[0]])
    sumControl=sum([int(x>0) for x in variantData[1]])
    
    score=sumCase#-sumControl
    if sumControl>0:
        score=0
        
    if score>0:
        return (k,(score,(sumCase,sumControl),variantData))

In [6]:
def scoreGeneUnivariate(k,variantList):
    variantList=list(variantList)
    result=[k,(0,0,0)]
    sumCase=0
    sumControl=0
    score=0
    genosumcase=[]
    genosumcontrol=[]
    if len(variantList)>0:
        for i in range(0,len(variantList)):
            (locus,geno)=variantList[i]
            if genosumcase==[]:
                genosumcase=[int(x) for x in geno[0]]
                genosumcontrol=[int(x) for x in geno[1]]
            else:
                genosumcase=[int(x)+int(y) for x,y in zip(genosumcase,geno[0])]
                genosumcontrol=[int(x)+int(y) for x,y in zip(genosumcontrol,geno[1])]
                
        sumCase=sum([int(x>0) for x in genosumcase])
        sumControl=sum([int(x>0) for x in genosumcontrol])
        score=sumCase-sumControl

    if score>0:
        result=[k,(score,sumCase,sumControl)]
        return result

In [7]:
def scoreDigenicGene(k,variantLists):
    variantLists=list(variantLists)
    result=[]
    geno1sumcase=[]
    geno1sumcontrol=[]
    geno2sumcase=[]
    geno2sumcontrol=[]
    score=0
    gene1=""
    gene2=""
    sumCase=-1
    sumControl=-1
    if len(variantLists)==2:
        (genes,variantList1)=list(variantLists[0])
        (genes,variantList2)=list(variantLists[1])
        gene1=genes[0]
        gene2=genes[1]
        variantList1=list(variantList1)
        variantList2=list(variantList2)
        for i in range(0,len(variantList1)):
            (locus1,geno1)=variantList1[i]
            if geno1sumcase==[]:
                geno1sumcase=[int(x) for x in geno1[0]]
                geno1sumcontrol=[int(x) for x in geno1[1]]
            else:
                geno1sumcase=[int(x)+int(y) for x,y in zip(geno1sumcase,geno1[0])]
                geno1sumcontrol=[int(x)+int(y) for x,y in zip(geno1sumcontrol,geno1[1])]
                
        for i in range(0,len(variantList2)):
            (locus2,geno2)=variantList2[i]
            if geno2sumcase==[]:
                geno2sumcase=[int(x) for x in geno2[0]]
                geno2sumcontrol=[int(x) for x in geno2[1]]
            else:
                geno2sumcase=[int(x)+int(y) for x,y in zip(geno2sumcase,geno2[0])]
                geno2sumcontrol=[int(x)+int(y) for x,y in zip(geno2sumcontrol,geno2[1])]
                
        genosumcase=[x+y for x,y in zip(geno1sumcase,geno2sumcase)]
        genosumcontrol=[x+y for x,y in zip(geno1sumcontrol,geno2sumcontrol)]
        sumCase=sum([int(x>0) for x in genosumcase])
        sumControl=sum([int(x>0) for x in genosumcontrol])
        score=sumCase-sumControl
        #if sumControl>0:
        #    score=0
        
    if score>0:
        return (k,((gene1,gene2),score,sumCase,sumControl))

def getGene(variantData):
    variantGene=variantData[0][1]
    
    return (variantGene)

def createPairsGenes(k,v,genes):
    return [(str(sorted([k,gene])),(sorted([k,gene]),v)) for gene in genes]

def fillMissing(k,v):
    v=list(v)
    if v[1] is None:
        v[1]=[0]*len(sample_id_control_b.value)
        
    return (k,v)

In [8]:


RDDcase = sqlContext.sql("SELECT sample_id,chr,position,reference,alternative,gene_symbol,zygosity FROM parquetFile where "+sqlCase)
RDDcontrol= sqlContext.sql("SELECT sample_id,chr,position,reference,alternative,gene_symbol,zygosity FROM parquetFile where "+sqlControl)

sample_id_case=sorted(RDDcase.map(lambda v:v[0]).distinct().collect())
sample_id_control=sorted(RDDcontrol.map(lambda v:v[0]).distinct().collect())

sample_id_case_b = sc.broadcast(sample_id_case)
sample_id_control_b = sc.broadcast(sample_id_control)

genoMatCase=RDDcase.map(splitByVariantID).groupByKey()
genoMatCase=genoMatCase.map(lambda (k,v):buildVariantVector(k,v,sample_id_case))

genoMatControl=RDDcontrol.map(splitByVariantID).groupByKey()
genoMatControl=genoMatControl.map(lambda (k,v):buildVariantVector(k,v,sample_id_control))

genoMat=genoMatCase.leftOuterJoin(genoMatControl).map(lambda (k,v): fillMissing(k,v))

In [9]:
genoMat.count()

1946

In [20]:
genoMat2=sc.parallelize(genoMat.take(1000))

In [23]:
start_time = time.time()


if scope=='monogenic':
    if scale=='variant':
        scores=genoMat.map(lambda (k,v):scoreVariantUnivariate(k,v)).filter(lambda x:x is not None).takeOrdered(1000, key=lambda (k,(v1,v2,v3)): -v1)

    if scale=='gene':
        scores=genoMat.map(splitValues).groupByKey().map(lambda (k,v):scoreGeneUnivariate(k,v)).filter(lambda x:x is not None).takeOrdered(1000, key=lambda (k,(v1,v2,v3)): -v1)
    
if scope=='digenic':
    genes=genoMat.map(getGene).distinct().takeOrdered(10000)#.flatMap(lambda (k,v):scoreCompound(k,v)).takeOrdered(100000, key=lambda (k,v1,v2,v3): -v1)
    scores=genoMat.map(splitValues).groupByKey().flatMap(lambda (k,v):createPairsGenes(k,v,genes)).groupByKey().map(lambda (k,v):scoreDigenicGene(k,v)).filter(lambda x:x is not None).takeOrdered(1000, key=lambda (k,(genes,v1,v2,v3)): -v1)

end_time=time.time()
runtime=end_time - start_time
print(runtime)


32.8956751823


In [24]:
len(genes)

1326

In [25]:
sample_id_control

[u'ZH135613',
 u'ZH135615',
 u'ZH135907',
 u'ZH135909',
 u'ZH136156',
 u'ZH136157',
 u'ZH136586',
 u'ZH136913',
 u'ZH136915',
 u'ZH137070',
 u'ZH137072',
 u'ZH137703',
 u'ZH141389',
 u'ZH141390',
 u'ZH141454',
 u'ZH141456',
 u'ZH142270',
 u'ZH142274',
 u'ZH142276',
 u'ZH1428',
 u'ZH1429',
 u'ZH96867']

In [26]:
scores=[analysisName,scale,scope,start_time,end_time,runtime,scores,sample_id_case,sample_id_control,group1name,group2name]

with open("analyses/"+analysisName+'.txt', 'w') as outfile:
    json.dump(scores, outfile)
    

In [None]:
sc.stop()