# GenoSpark for Singapore population genotypic frequency

## First, generate the input files for each chromosome with the commands below (replace `[#]` with the chromosome number):
```
$ vcftools --gzvcf chr[#].consolidate.eff.PPH.vcf.gz --freq --chr [#] --out chr[#]_analysis
$ bcftools query -f '%CHROM\t%POS\t%ID\n' chr[#].consolidate.eff.PPH.vcf.gz -o chr[#]_rsID
```

#### Note: you may use freqGenerator.sh to generate the above files.

In [1]:
#!/usr/bin/env python3

__author__ = 'mdc_hk'
version = '1.0'

# Description: Build the population frequency table as a PySpark DataFrame by
#              joining the per-chromosome `chr[#]_analysis.frq` and
#              `chr[#]_rsID` files for each ethnic group.
# Usage: -
# Example: -

In [2]:
import datetime, multiprocessing, os, re, shutil, sys, subprocess, time, logging

import pyspark.sql.types as typ
from pyspark.sql.functions import lit


# Specify schemas
#
# The three schemas share their column declarations, so each column is
# declared once and the schemas are assembled from those declarations.

# Columns read from the `chr[#]_analysis.frq` files.
# NOTE(review): the two ALLELE_FREQ columns are kept as strings - presumably
# because each cell holds an allele label together with its frequency; confirm
# against the vcftools output.
_freq_fields = [
    typ.StructField("CHROM", typ.IntegerType(), False),
    typ.StructField("POS", typ.IntegerType(), False),
    typ.StructField("N_ALLELES", typ.IntegerType(), False),
    typ.StructField("N_CHR", typ.IntegerType(), False),
    typ.StructField("ALLELE_FREQ_1", typ.StringType(), False),
    typ.StructField("ALLELE_FREQ_2", typ.StringType(), False),
]

# Variant identifier column; the only nullable column.
_id_field = typ.StructField("ID", typ.StringType(), True)

# Schema of a `chr[#]_analysis.frq` file.
schema_Freq = typ.StructType(list(_freq_fields))

# Schema of a `chr[#]_rsID` file (CHROM, POS, ID).
schema_rsID = typ.StructType(_freq_fields[:2] + [_id_field])

# Schema of the joined result: every frequency column followed by ID.
schema_Freq_DF = typ.StructType(_freq_fields + [_id_field])


In [21]:
# Setting up File Paths and Lists

from hdfs import Config
client = Config().get_client('dev')

workingFolder_Indian = "SgIndian_vcf/dataFreeze_Feb2013/SNP/biAllele/"

workingFolder_Malay = "SgMalay_vcf/2012_05/snps/"

# List each working folder once and reuse the listing: every client.list()
# call is a request to HDFS, and the same folder was previously listed twice.

folderListing_Indian = client.list(workingFolder_Indian)
folderListing_Malay = client.list(workingFolder_Malay)

# Per-chromosome frequency files and rsID files found in each working folder.

freqFiles_Indian = [f for f in folderListing_Indian if re.match(r'chr\d+_analysis\.frq', f)]
rsIDFiles_Indian = [f for f in folderListing_Indian if re.match(r'chr\d+_rsID', f)]
freqFiles_Malay = [f for f in folderListing_Malay if re.match(r'chr\d+_analysis\.frq', f)]
rsIDFiles_Malay = [f for f in folderListing_Malay if re.match(r'chr\d+_rsID', f)]

# Chromosome prefixes ("chr1", "chr2", ...) taken from the Indian frequency
# file names. Each entry is the list returned by findall() (e.g. ['chr10']),
# so consumers read the prefix as entry[0].
freqFilesID_pre = re.compile(r'(chr\d+)_analysis\.frq')
freqFilesID = [freqFilesID_pre.findall(fileName) for fileName in freqFiles_Indian]

print(freqFilesID)

[['chr10'], ['chr11'], ['chr12'], ['chr13'], ['chr14'], ['chr15'], ['chr16'], ['chr17'], ['chr18'], ['chr19'], ['chr1'], ['chr20'], ['chr21'], ['chr22'], ['chr2'], ['chr3'], ['chr4'], ['chr5'], ['chr6'], ['chr7'], ['chr8'], ['chr9']]


In [23]:
# Obtain dataset

# suffixFreqID = ['_analysis.frq', '_rsID']


def _load_population_freq(working_folder, chrom_ids):
    """Build one population's frequency DataFrame from its per-chromosome files.

    For every chromosome prefix, the `<prefix>_analysis.frq` file (with a
    header row) and the `<prefix>_rsID` file (without a header row) are read
    from `working_folder`, joined on POS, and appended to the result.

    Parameters:
        working_folder: folder path prefix; it is concatenated directly with
            the file name, so it must end with a path separator.
        chrom_ids: iterable of single-element lists such as ['chr10'] (the
            shape of `freqFilesID`); element [0] is the chromosome prefix.

    Returns:
        A DataFrame with the columns of `schema_Freq_DF`; it has no rows when
        `chrom_ids` is empty.
    """
    # Start from an empty frame so the result always carries schema_Freq_DF.
    population_df = spark.createDataFrame([], schema_Freq_DF)
    for chrom_id in chrom_ids:
        file_prefix = working_folder + chrom_id[0]
        df1 = spark.read.csv(file_prefix + "_analysis.frq", header=True, schema=schema_Freq, sep='\t').alias('df1')
        df2 = spark.read.csv(file_prefix + "_rsID", header=False, schema=schema_rsID, sep='\t').alias('df2')
        # Keep every frequency column and attach the ID from the rsID file.
        # union() matches columns by position, so this select order must
        # follow schema_Freq_DF (frequency columns first, ID last).
        chrom_df = df2.join(df1, df2.POS == df1.POS).select('df1.*', 'df2.ID')
        population_df = population_df.union(chrom_df)
    return population_df


# NOTE: freqFilesID is derived from the Indian folder only and is reused for
# the Malay folder, so both folders are expected to hold the same chromosomes.
freqDF_Indian = _load_population_freq(workingFolder_Indian, freqFilesID)
freqDF_Malay = _load_population_freq(workingFolder_Malay, freqFilesID)
    

In [24]:
# Tag every row with the population it came from, then stack the two
# populations (Malay rows first, Indian rows second) into one DataFrame.
freqDF_Malay_working, freqDF_Indian_working = (
    populationDF.withColumn("ETHNIC", lit(ethnicLabel))
    for populationDF, ethnicLabel in ((freqDF_Malay, "Malay"), (freqDF_Indian, "Indian"))
)
freqDF_working = freqDF_Malay_working.union(freqDF_Indian_working)

In [None]:
# Preview the first 10 rows of the combined table.
freqDF_working.show(n=10)
# Example lookup of a single variant by ID:
#freqDF_working.select("ETHNIC", "ID", "CHROM", "ALLELE_FREQ_1", "ALLELE_FREQ_2").filter("ID == 'rs4109479'").show()

In [None]:
# spark.sql("select * from freqDF_working").show(10)

In [None]:
# Report the total number of rows, then print the column names and types.
rowCount = freqDF_working.count()
print('Count of rows: {0}'.format(rowCount))
freqDF_working.printSchema()

In [None]:
# Look up one variant (rs4109479) in three equivalent ways.

# 1. Column objects.
freqDF_working.select(freqDF_working.ID, freqDF_working.ALLELE_FREQ_1, freqDF_working.ALLELE_FREQ_2).filter(freqDF_working.ID == "rs4109479").show()

# 2. Column names with a SQL-style filter expression.
freqDF_working.select("ID", "CHROM", "ALLELE_FREQ_1", "ALLELE_FREQ_2").filter("ID == 'rs4109479'").show()

# 3. Spark SQL. A SQL query can only see a DataFrame that is registered as a
# view; without this registration the query fails because "freqDF_working"
# is not a known table. Re-running the cell simply replaces the view.
freqDF_working.createOrReplaceTempView("freqDF_working")
spark.sql("select ID, ALLELE_FREQ_1, ALLELE_FREQ_2 from freqDF_working where ID = 'rs4109479'").show()

In [1]:
# print('Count of rows: {0}'.format(freqDF_working.count()))
# print('Count of distinct rows: {0}'.format(freqDF_working.distinct().count()))
# freqDF_working = freqDF_working.dropDuplicates()
# print('Count of rows: {0}'.format(freqDF_working.count()))
# print('Count of distinct rows: {0}'.format(freqDF_working.distinct().count()))
# print('Count of IDs: {0}'.format(freqDF_working.count()))
# print('Count of distinct IDs: {0}'.format(freqDF_working.select(freqDF_working.ID).distinct().count()))
# freqDF_working.where("ID = '.'").show(10)