In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
import json
import time
import sys
import math #Needed for floor/ceil
from scipy.stats import fisher_exact, ttest_ind
import numpy as np

conf = (SparkConf()
         .setMaster("local[8]")
       )
sc.stop()
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")


DataFrame[key: string, value: string]

In [2]:
#Input is vector patient, chr, pos, ref, alt, gene_symbol, zygosity
def createKey_VariantGene(variantData):
    #ID is chr:pos:ref:alt
    ID=variantData[1]+":"+str(variantData[2])+":"+variantData[3]+":"+variantData[4]
    
    #return ID, gene_symbol, patient, zygosity
    zygosity=1
    if variantData[6]=="Homozygous":
    #if variantData[6]==2:
        zygosity=2
    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientIndex=patientsID_dictionnary[variantData[0]]
    return ((ID,variantData[5]),(patientIndex,zygosity))

#variantGeneEntry: key is (variantID,gene), value is (patientIndex,zygosity)
def geneAsKey(variantGeneEntry):    
    return (variantGeneEntry[0][1],(variantGeneEntry[0][0],variantGeneEntry[1]))

def getVariantID(key_VariantGene):
    return key_VariantGene[0]

def f(splitIndex ,v): 
    return [(splitIndex,list(v))]

def toSQLString(strList):
    strList="','".join(strList)
    return "('"+strList+"')"

In [3]:
#Transform sparse data (list of (sample_id,zygozity)) into vector z_i
def vectorize(genotypeDataList):
    genotypeDataList=list(genotypeDataList)
    genotypeVector=[0]*len(patientsID_dictionnary_b.value)
    if len(genotypeDataList)>0:
        for j in range(0,len(genotypeDataList)):
            genotypeVector[genotypeDataList[j][0]]=genotypeDataList[j][1]
        
        sumCase=float(sum([int(x>0) for x in genotypeVector[0:patientsID_split_index_b.value]]))
        sumControl=float(sum([int(x>0) for x in genotypeVector[patientsID_split_index_b.value:len(patientsID_dictionnary_b.value)]]))
    
        ratioCase=sumCase/patientsID_split_index_b.value
        ratioControl=sumControl/(len(patientsID_dictionnary_b.value)-patientsID_split_index_b.value)
        
        if (ratioCase>float(caseMAF_b.value)) or (ratioControl>float(controlMAF_b.value)):
            genotypeVector=[0]*len(patientsID_dictionnary_b.value)
        
    return genotypeVector        


In [4]:
#Compute burden for variantList
def burden(geneID_variantList):
    (geneID,variantList)=geneID_variantList
    variantList=list(variantList)
    burden=[0]*len(patientsID_dictionnary_b.value)
    
    if len(variantList)>0:
        #Go through list of variants
        for i in range(0,len(variantList)):
            #Get variant ID, and list of sample_index,genotype
            (variantID,genotypeDataList)=variantList[i]
            #if genotypeDataList.__class__==tuple:
            #    genotypeDataList=[genotypeDataList]
            #else:
            #    genotypeDataList=list(genotypeDataList)
            
            #Get genotype vector for current variantID
            genotypeDataVector=vectorize(genotypeDataList)
            #And sum with previous genotype vectors
            burden=[x+y for x,y in zip(burden,genotypeDataVector)]
    
    return (geneID,burden)

In [5]:
#variantList is [(locusID,[genotype])]
def scoreVariant(ID_genotypeDataList):
    ((variantID,geneID),genotypeDataList)=ID_genotypeDataList
    #genotypeList=list(value_GenotypeList)
    
    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientsID_split_index=patientsID_split_index_b.value
    
    genotypeDataVector=vectorize(genotypeDataList)
    
    #sumCase=float(sum([int(x>0) for x in genotypeDataVector[0:patientsID_split_index]]))
    #sumControl=float(sum([int(x>0) for x in genotypeDataVector[patientsID_split_index:len(patientsID_dictionnary)]]))
    sumCase=float(sum([x for x in genotypeDataVector[0:patientsID_split_index]]))
    sumControl=float(sum([x for x in genotypeDataVector[patientsID_split_index:len(patientsID_dictionnary)]]))
    
    ratioCase=sumCase/patientsID_split_index
    ratioControl=sumControl/(len(patientsID_dictionnary)-patientsID_split_index)
        
    score=ratioCase-ratioControl
    #pvalue=fisher_exact([[sumCase,patientsID_split_index-sumCase],[sumControl,len(patientsID_dictionnary)-patientsID_split_index]],'greater')[1]
    pvalue=ttest_ind(genotypeDataVector[0:patientsID_split_index],genotypeDataVector[patientsID_split_index:len(patientsID_dictionnary)])[1]/2
    
    
    if score>0:
        return (variantID,(score,pvalue,ratioCase,ratioControl,sumCase,sumControl))#variantList is [(locusID,[genotype])]


In [6]:
def scoreVariantPair(variantIDpair,value_GenotypeListPair):
    
    genotypeListPair=list(value_GenotypeListPair)
    
    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientsID_split_index=patientsID_split_index_b.value
    
    score=0
    if len(genotypeListPair)==2:
        (variantID,genotypeList1)=genotypeListPair[0]
        (variantID,genotypeList2)=genotypeListPair[1]
        
        variantID1=variantID[0]
        variantID2=variantID[1]
        
        genotypeList1=list(genotypeList1)
        genotypeList2=list(genotypeList2)
        
        genotypeVector1=getGenotypeVector(genotypeList1)
        genotypeVector2=getGenotypeVector(genotypeList2)
        
        genotypeVector=[int(x>0 and y>0) for x,y in zip(genotypeVector1,genotypeVector2)]
        
        sumCase=float(sum([int(x>0) for x in genotypeVector[0:patientsID_split_index]]))
        ratioCase=sumCase/patientsID_split_index
        sumControl=float(sum([int(x>0) for x in genotypeVector[(patientsID_split_index+1):len(patientsID_dictionnary)]]))
        ratioControl=sumControl/(len(patientsID_dictionnary)-patientsID_split_index)
        
        score=ratioCase-ratioControl
        pvalue=fisher_exact([[sumCase,patientsID_split_index-sumCase],[sumControl,len(patientsID_dictionnary)-patientsID_split_index]],'greater')[1]
        
        #if score>0:
        return (variantIDpair,((variantID1,variantID2),score,pvalue,ratioCase,ratioControl,sumCase,sumControl))



In [7]:
def scoreVariantPair(block_i,block_k,i,k):
    block_k=list(block_k)
    len_i=len(block_i)
    len_k=len(block_k)
    scores=[]

    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientsID_split_index=patientsID_split_index_b.value
    
    start_k=0
    skip_last=0
    if i==k:
        skip_last=1
        if len_i==1:
            len_i=0
    
    if len_i>0 and len_k>0:
        for it_i in range(0,len_i-skip_last):
            if i==k:
                start_k=it_i+1
            for it_k in range(start_k,len_k):
                listLoadBlock_i=block_i[it_i]
                listLoadBlock_k=block_k[it_k]
                
                genoSum=[int(x>0 and y>0) for x,y in zip(vectorize(listLoadBlock_i[1]),vectorize(listLoadBlock_k[1]))]
                sumCase=float(sum([int(x>0) for x in genoSum[0:patientsID_split_index]]))
                sumControl=float(sum([int(x>0) for x in genoSum[(patientsID_split_index):len(patientsID_dictionnary)]]))
        
                ratioCase=sumCase/patientsID_split_index
                ratioControl=sumControl/(len(patientsID_dictionnary)-patientsID_split_index)
        
                score=ratioCase-ratioControl
                pvalue=fisher_exact([[sumCase,patientsID_split_index-sumCase],[sumControl,len(patientsID_dictionnary)-patientsID_split_index]],'greater')[1]
        
                if score>=0:
                    scores.append(((listLoadBlock_i[0],listLoadBlock_k[0]),(score,pvalue,ratioCase,ratioControl,sumCase,sumControl)))
    return scores


In [None]:
def scoreVariantPair(block_i,block_k,i,k):

In [8]:
#variantList is [(locusID,[sample_index,genotype])]
def scoreGene(geneID_burden):
    (geneID,burden)=geneID_burden
    
    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientsID_split_index=patientsID_split_index_b.value
    
    sumCase=float(sum([int(x>0) for x in burden[0:patientsID_split_index]]))
    sumControl=float(sum([int(x>0) for x in burden[patientsID_split_index:len(patientsID_dictionnary)]]))
    
    ratioCase=sumCase/patientsID_split_index
    ratioControl=sumControl/(len(patientsID_dictionnary)-patientsID_split_index)
        
    score=ratioCase-ratioControl
    pvalue=fisher_exact([[sumCase,patientsID_split_index-sumCase],[sumControl,len(patientsID_dictionnary)-patientsID_split_index]],'greater')[1]
    #pvalue=ttest_ind(genotypeVectorByGene[0:patientsID_split_index],genotypeVectorByGene[patientsID_split_index:len(patientsID_dictionnary)])[1]/2
        
    if score>=0:
        return (geneID,(score,pvalue,ratioCase,ratioControl,sumCase,sumControl))

In [9]:
def scoreGenePair(block_i,block_k,i,k):
    block_k=list(block_k)
    len_i=len(block_i)
    len_k=len(block_k)
    scores=[]

    patientsID_dictionnary=patientsID_dictionnary_b.value
    patientsID_split_index=patientsID_split_index_b.value
    
    start_k=0
    skip_last=0
    if i==k:
        skip_last=1
        if len_i==1:
            len_i=0
    
    if len_i>0 and len_k>0:
        for it_i in range(0,len_i-skip_last):
            if i==k:
                start_k=it_i+1
            for it_k in range(start_k,len_k):
                listLoadBlock_i=block_i[it_i]
                listLoadBlock_k=block_k[it_k]
                genoSum=[int(x>0 and y>0) for x,y in zip(listLoadBlock_i[1],listLoadBlock_k[1])]
                sumCase=float(sum([int(x>0) for x in genoSum[0:patientsID_split_index]]))
                sumControl=float(sum([int(x>0) for x in genoSum[(patientsID_split_index):len(patientsID_dictionnary)]]))
        
                ratioCase=sumCase/patientsID_split_index
                ratioControl=sumControl/(len(patientsID_dictionnary)-patientsID_split_index)
        
                score=ratioCase-ratioControl
                pvalue=fisher_exact([[sumCase,patientsID_split_index-sumCase],[sumControl,len(patientsID_dictionnary)-patientsID_split_index]],'greater')[1]
        
                if score>=0:
                    scores.append(((listLoadBlock_i[0],listLoadBlock_k[0]),(score,pvalue,ratioCase,ratioControl,sumCase,sumControl)))
    return scores


In [54]:
[[0,1]]+[[2,3]]

[[0, 1], [2, 3]]

In [82]:
def ranking(sqlCase,sqlControl,scale,scope,p):
    start_time = time.time()
    nvariants=0
    l_blocks=[]
    
    variants_case = sqlContext.sql("SELECT sample_id,chr,pos,ref,alt,gene_symbol,zygosity FROM variantData "+sqlCase)
    variants_control= sqlContext.sql("SELECT sample_id,chr,pos,ref,alt,gene_symbol,zygosity FROM variantData "+sqlControl)

    variants=variants_control.unionAll(variants_case)
    variants_grouped=variants.rdd.map(createKey_VariantGene).groupByKey(p)

    if scope=='monogenic':
        if scale=='variant':
            ntests=variants_grouped.count()
            nvariants=ntests
            finish_load_time=time.time()
            runtime_load=finish_load_time - start_time
            scores=variants_grouped.map(scoreVariant).filter(lambda x:x is not None).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)

        if scale=='gene':
            variants_grouped_by_gene=variants_grouped.map(geneAsKey).groupByKey(p)
            ntests=variants_grouped_by_gene.count()
            finish_load_time=time.time()
            runtime_load=finish_load_time - start_time
            burden_by_gene=variants_grouped_by_gene.map(burden)
            burden_by_gene.count()
            finish_burden_time=time.time()
            runtime_burden=finish_burden_time - finish_load_time
            scores=burden_by_gene.map(scoreGene).filter(lambda x:x is not None).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)

    if scope=='digenic':
        if scale=='variant':
            ntests=variants_grouped.count()
            nvariants=ntests
            finish_load_time=time.time()
            runtime_load=finish_load_time - start_time
            
            variants_grouped_with_partitions=variants_grouped.mapPartitionsWithIndex(lambda splitIndex,v: [(splitIndex,list(v))])
            scores=[]
            
            n_pair_blocks=float(p*(p+1)/2)
            bound_low=int(math.floor(n_pair_blocks/p))
            bound_high=int(math.ceil(n_pair_blocks/p))

            for i in range(0,p):
                block_i=variants_grouped_with_partitions.filter(lambda (k,v):k==i).collect()[0][1]
                if i<bound_low:
                    list_blocks_to_pair=range(0,i+1)+range(p-bound_high+i+1,p)
                else:
                    list_blocks_to_pair=range(i-bound_low+1,i+1)
                l_blocks=[l_blocks]+[list_blocks_to_pair]
                score=variants_grouped_with_partitions.filter(lambda (k,v):k in list_blocks_to_pair).flatMap(lambda (k,v):scoreVariantPair(block_i,v,i,k)).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)
                scores=scores+score
            scores=sc.parallelize(scores,p).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)
            ntests=ntests*(ntests+1)/2
   
        if scale=='gene':
            variants_grouped_by_gene=variants_grouped.map(geneAsKey).groupByKey(p)
            ntests=variants_grouped_by_gene.count()
            finish_load_time=time.time()
            runtime_load=finish_load_time - start_time
            burden_by_gene=variants_grouped_by_gene.map(burden)
            burden_by_gene.count()
            finish_burden_time=time.time()
            runtime_burden=finish_burden_time - finish_load_time
            burden_by_gene_with_partitions=burden_by_gene.mapPartitionsWithIndex(lambda splitIndex,v: [(splitIndex,list(v))])
            #burden_by_gene_with_partitions.cache()
            scores=[]
            for i in range(0,p):
                block_i=burden_by_gene_with_partitions.filter(lambda (k,v):k==i).collect()[0][1]
                score=burden_by_gene_with_partitions.filter(lambda (k,v):k>=i).flatMap(lambda (k,v):scoreGenePair(block_i,v,i,k)).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)
                scores=scores+score
            scores=sc.parallelize(scores,p).takeOrdered(1000, key=lambda (k,(v1,v2,v3,v4,v5,v6)): -v1)
            ntests=ntests*(ntests+1)/2
    
    end_time=time.time()
    if scale=='variant':
        runtime_score=end_time - finish_load_time
        all_times=[runtime_load,0,runtime_score]
    if scale=="gene":
        runtime_score=end_time - finish_burden_time
        all_times=[runtime_load,runtime_burden,runtime_score]

    return (all_times,scores,ntests,nvariants,l_blocks)


In [11]:
caseMAF=1.0
controlMAF=1.0

#pathVariants='/user/hive/warehouse/digest.db/exomes_1000g_p'
pathVariants='/Users/yalb/Projects/Github/digest/exomes_1000g_p'
parquetFile = sqlContext.read.parquet(pathVariants)
parquetFile.registerTempTable("variantData");

patientsID_all = sqlContext.sql("SELECT distinct sample_id FROM variantData order by sample_id asc").collect()
patientsID_all = [patient[0] for patient in patientsID_all]

genes_all = sqlContext.sql("SELECT distinct gene_symbol FROM variantData order by gene_symbol asc").collect()
genes_all = [gene[0] for gene in genes_all]
genes_all.pop(0) ##First element is NULL


In [111]:
np.random.seed(133)

n_genes=30
n_samples=50

scale='variant'
scope='digenic'

p=5

runtimes=[]
n_variants=[]
    
for n_genes in [n_genes]:
    #np.random.shuffle(genes_all)
    genes=toSQLString(genes_all[0:n_genes])
    
    #np.random.shuffle(patientsID_all)
    patientsID_case=patientsID_all[0:n_samples]
    patientsID_control=patientsID_all[n_samples:(2*n_samples)]
                     
    patientsID=patientsID_case+patientsID_control
    patientsID_dictionnary=dict(zip(patientsID,range(len(patientsID))))
        
    patientsID_split_index_b = sc.broadcast(len(patientsID_case))
    patientsID_dictionnary_b = sc.broadcast(patientsID_dictionnary)

    controlMAF_b=sc.broadcast(controlMAF)
    caseMAF_b=sc.broadcast(caseMAF)

    sqlCase="where sample_id in "+toSQLString(patientsID_case)+" and gene_symbol in "+genes
    sqlControl="where sample_id in "+toSQLString(patientsID_control)+" and gene_symbol in "+genes
#    sqlCase="where sample_id in "+toSQLString(patientsID_case)+" and chr='1'"
#    sqlControl="where sample_id in "+toSQLString(patientsID_control)+" and chr='1'"
    
    for i in range(0,1):
        (all_times,scores,ntests,nvariants,l_blocks)=ranking(sqlCase,sqlControl,scale,scope,p)
        runtimes.append(all_times)
        n_variants.append(nvariants)
        if all_times[2]>3600:
            break


In [105]:
nvariants

709

In [112]:
all_times

[1.1071438789367676, 0, 28.229923009872437]

In [98]:
all_times

[1.0633659362792969, 0, 21.333357095718384]

In [99]:
scores[0:3]

[(((u'1:12776344:A:T', u'AADACL3'), (u'12:9248332:T:C', u'A2M')),
  (0.26, 0.00095976017759529094, 0.42, 0.16, 21.0, 8.0)),
 (((u'1:12776344:A:T', u'AADACL3'), (u'22:43088549:C:T', u'A4GALT')),
  (0.26, 0.00081907089681844901, 0.46, 0.2, 23.0, 10.0)),
 (((u'12:9021053:C:CGT', u'A2ML1'), (u'12:9248332:T:C', u'A2M')),
  (0.26, 0.00081907089681844901, 0.46, 0.2, 23.0, 10.0))]

In [93]:
scores[0:3]

[(((u'12:9013755:C:T', u'A2ML1'), (u'16:8832515:C:T', u'ABAT')),
  (0.33999999999999997, 1.9039589026397727e-05, 0.6, 0.26, 30.0, 13.0)),
 (((u'16:8841834:G:T', u'ABAT'), (u'16:2349371:T:C', u'ABCA3')),
  (0.32, 0.00010536058650236864, 0.46, 0.14, 23.0, 7.0)),
 (((u'12:9013755:C:T', u'A2ML1'), (u'16:8832485:G:T', u'ABAT')),
  (0.31999999999999995, 4.5621735126025119e-05, 0.58, 0.26, 29.0, 13.0))]

In [73]:
variants_case = sqlContext.sql("SELECT sample_id,chr,pos,ref,alt,gene_symbol,zygosity FROM variantData "+sqlCase+" and pos=9021053")

In [74]:
variants_case.collect()

[Row(sample_id=u'HG03446', chr=u'12', pos=9021053, ref=u'C', alt=u'CGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG03446', chr=u'12', pos=9021053, ref=u'C', alt=u'CGTGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG03446', chr=u'12', pos=9021053, ref=u'C', alt=u'CGTGTGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG03446', chr=u'12', pos=9021053, ref=u'C', alt=u'T', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02952', chr=u'12', pos=9021053, ref=u'C', alt=u'CGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02952', chr=u'12', pos=9021053, ref=u'C', alt=u'CGTGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02952', chr=u'12', pos=9021053, ref=u'C', alt=u'CGTGTGT', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02952', chr=u'12', pos=9021053, ref=u'C', alt=u'T', gene_symbol=u'A2ML1', zygosity=u'Heterozygous'),
 Row(sample_id=u

In [68]:
variants_case.collect()

[Row(sample_id=u'NA19435', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02577', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG02484', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'NA21090', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG03826', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'HG04002', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'NA18634', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Homozygous'),
 Row(sample_id=u'HG02682', chr=u'12', pos=9248332, ref=u'T', alt=u'C', gene_symbol=u'A2M', zygosity=u'Heterozygous'),
 Row(sample_id=u'NA19792', chr=u'12', pos=9248332, ref=u'T

In [63]:
l_blocks

[[[[[[], [0, 3, 4]], [0, 1, 4]], [0, 1, 2]], [1, 2, 3]], [2, 3, 4]]

In [None]:
def expe_gene():
    caseMAF=1.0
    controlMAF=1.0

    pathVariants='/user/hive/warehouse/digest.db/exomes_1000g_p'
    parquetFile = sqlContext.read.parquet(pathVariants)
    parquetFile.registerTempTable("variantData");

    patientsID_all = sqlContext.sql("SELECT distinct sample_id FROM variantData order by sample_id asc").collect()
    genes_all = sqlContext.sql("SELECT distinct gene_symbol FROM variantData order by gene_symbol asc").collect()
    patientsID_all = [patient[0] for patient in patientsID_all]
    genes_all = [gene[0] for gene in genes_all]
    genes_all.pop(0) ##First element is NULL
    
    np.random.seed(133)

    n_genes=10
    n_samples=10

    scale='gene'
    scope='digenic'

    p=100

    
    runtimes=[]
    
    for n_genes in [1000]:
        np.random.shuffle(genes_all)
        genes=toSQLString(genes_all[0:n_genes])
    
        np.random.shuffle(patientsID_all)
        patientsID_case=patientsID_all[0:n_samples]
        patientsID_control=patientsID_all[n_samples:(2*n_samples)]
                     
        patientsID=patientsID_case+patientsID_control
        patientsID_dictionnary=dict(zip(patientsID,range(len(patientsID))))
        
        patientsID_split_index_b = sc.broadcast(len(patientsID_case))
        patientsID_dictionnary_b = sc.broadcast(patientsID_dictionnary)

        controlMAF_b=sc.broadcast(controlMAF)
        caseMAF_b=sc.broadcast(caseMAF)

        sqlCase="where sample_id in "+toSQLString(patientsID_case)+" and gene_symbol in "+genes
        sqlControl="where sample_id in "+toSQLString(patientsID_control)+" and gene_symbol in "+genes

        for i in range(0,5):
            (all_times,scores,ntests)=ranking(sqlCase,sqlControl,scale,scope,p)
            runtimes.append(all_times)
            if all_times[2]>3600:
                break


In [None]:
results=(runtimes,n_variants,sqlCase,sqlControl)
results_rdd=sc.parallelize(results,1)

params=sc._conf.getAll()
app_name=''
for i in range(0,len(params)):
    if params[i][0]=="spark.app.name":
        app_name=params[i][1]

results_rdd.saveAsTextFile("hdfs:/user/yleborgn/"+app_name+"_"+str(time.time()))
   

In [None]:
sc.stop()