In [1]:
chr22RDD =  sc.textFile('data/ALL.chr22.phase3_1000.vcf.bz2', 4)

In [2]:
chr22RDD.take(100)

[u'##fileformat=VCFv4.1',
 u'##FILTER=<ID=PASS,Description="All filters passed">',
 u'##fileDate=20150218',
 u'##reference=ftp://ftp.1000genomes.ebi.ac.uk//vol1/ftp/technical/reference/phase2_reference_assembly_sequence/hs37d5.fa.gz',
 u'##source=1000GenomesPhase3Pipeline',
 u'##contig=<ID=1,assembly=b37,length=249250621>',
 u'##contig=<ID=2,assembly=b37,length=243199373>',
 u'##contig=<ID=3,assembly=b37,length=198022430>',
 u'##contig=<ID=4,assembly=b37,length=191154276>',
 u'##contig=<ID=5,assembly=b37,length=180915260>',
 u'##contig=<ID=6,assembly=b37,length=171115067>',
 u'##contig=<ID=7,assembly=b37,length=159138663>',
 u'##contig=<ID=8,assembly=b37,length=146364022>',
 u'##contig=<ID=9,assembly=b37,length=141213431>',
 u'##contig=<ID=10,assembly=b37,length=135534747>',
 u'##contig=<ID=11,assembly=b37,length=135006516>',
 u'##contig=<ID=12,assembly=b37,length=133851895>',
 u'##contig=<ID=13,assembly=b37,length=115169878>',
 u'##contig=<ID=14,assembly=b37,length=107349540>',
 u'##c

In [3]:
header = chr22RDD.filter(lambda line: line.startswith("#CHROM")).map(lambda line:line.split()).first()
print(header[0:10])
columnNames = header[0:9]
sampleNames = header[9:]
print(sampleNames[0:10])

[u'#CHROM', u'POS', u'ID', u'REF', u'ALT', u'QUAL', u'FILTER', u'INFO', u'FORMAT', u'HG00096']
[u'HG00096', u'HG00097', u'HG00099', u'HG00100', u'HG00101', u'HG00102', u'HG00103', u'HG00105', u'HG00106', u'HG00107']


In [4]:
from pyspark.ml.linalg import Vectors, VectorUDT

encodeGenotype = lambda genotype: float(sum( allele!='0' for allele in  genotype.split('|')))
encodeGenotypes = lambda genotypes: Vectors.dense(map(encodeGenotype, genotypes))

def splitGenotypeLine(line):
    columns = line.split()
    return columns[0:9] + [ encodeGenotypes(columns[9:]) ]

df = spark.createDataFrame(chr22RDD.filter(lambda line: not line.startswith("#")).map(splitGenotypeLine), 
                          schema = columnNames + ['encoded_genotypes'])
df.cache()

df.count()

1000

In [5]:
df.printSchema()

root
 |-- #CHROM: string (nullable = true)
 |-- POS: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- REF: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- QUAL: string (nullable = true)
 |-- FILTER: string (nullable = true)
 |-- INFO: string (nullable = true)
 |-- FORMAT: string (nullable = true)
 |-- encoded_genotypes: vector (nullable = true)



In [6]:
display(df.limit(10))

Unnamed: 0,#CHROM,POS,ID,REF,ALT,QUAL,FILTER,INFO,FORMAT,encoded_genotypes
0,22,16050075,rs587697622,A,G,100,PASS,AC=1;AF=0.000199681;AN=5008;NS=2504;DP=8012;EA...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,22,16050115,rs587755077,G,A,100,PASS,AC=32;AF=0.00638978;AN=5008;NS=2504;DP=11468;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,22,16050213,rs587654921,C,T,100,PASS,AC=38;AF=0.00758786;AN=5008;NS=2504;DP=15092;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,22,16050319,rs587712275,C,T,100,PASS,AC=1;AF=0.000199681;AN=5008;NS=2504;DP=22609;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,22,16050527,rs587769434,C,A,100,PASS,AC=1;AF=0.000199681;AN=5008;NS=2504;DP=23591;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
5,22,16050568,rs587638893,C,A,100,PASS,AC=2;AF=0.000399361;AN=5008;NS=2504;DP=21258;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,22,16050607,rs587720402,G,A,100,PASS,AC=5;AF=0.000998403;AN=5008;NS=2504;DP=20274;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,22,16050627,rs587593704,G,T,100,PASS,AC=2;AF=0.000399361;AN=5008;NS=2504;DP=21022;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,22,16050646,rs587670191,G,T,100,PASS,AC=1;AF=0.000199681;AN=5008;NS=2504;DP=22073;E...,GT,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,22,16050654,esv3647175;esv3647176;esv3647177;esv3647178,A,"<CN0>,<CN2>,<CN3>,<CN4>",100,PASS,"AC=9,87,599,20;AF=0.00179712,0.0173722,0.11960...",GT,"[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."


In [7]:
from pyspark.ml.clustering import KMeans
kMeans = KMeans(featuresCol='encoded_genotypes', k=10, initMode='random')
kMeansModel = kMeans.fit(df)

In [22]:
import pandas as pd
clusterCentersPD = pd.DataFrame.from_records(kMeansModel.clusterCenters()).T
clusterCentersPD.insert(0, 'Sample ID', sampleNames)
clusterCentersPD


Unnamed: 0,Sample ID,0,1,2,3,4,5,6,7,8,9
0,HG00096,0.000000,0.000000,0.000000,0.003282,0.00,0.000000,0.5625,1.00,0.863636,0.0
1,HG00097,0.000000,0.083333,0.800000,0.006565,0.00,0.000000,0.1250,0.00,1.409091,0.0
2,HG00099,0.000000,0.000000,0.800000,0.006565,0.00,0.000000,0.1250,0.00,1.136364,0.0
3,HG00100,0.666667,0.000000,0.000000,0.002188,0.00,0.000000,1.0000,0.00,1.727273,1.0
4,HG00101,0.777778,0.000000,0.000000,0.005470,0.00,0.000000,1.0000,0.00,1.954545,1.0
5,HG00102,0.000000,0.083333,0.933333,0.003282,0.00,0.000000,0.1250,0.00,1.363636,0.0
6,HG00103,0.000000,0.000000,0.000000,0.004376,0.00,0.000000,0.9375,1.00,1.227273,0.0
7,HG00105,0.000000,0.166667,0.000000,0.006565,0.00,0.000000,0.6875,0.00,0.909091,0.0
8,HG00106,0.666667,0.000000,0.333333,0.006565,0.00,0.000000,0.8125,0.00,1.727273,1.0
9,HG00107,0.888889,0.000000,0.000000,0.006565,0.00,0.000000,0.7500,1.00,1.727273,1.0


In [23]:
clusterCentersDF = spark.createDataFrame(clusterCentersPD)
clusterCentersDF.printSchema()

root
 |-- Sample ID: string (nullable = true)
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)



In [24]:
clusterCentersDF.coalesce(1).write.csv(path='output/cluster_centers.csv', mode='overwrite', header=True)