In [None]:
import pandas
import re
import tempfile
import os
import gzip
import shutil
import subprocess
from pyspark.sql.functions import udf, col, lit
from pyspark.sql import functions as F

# ----------------------------------------------------------------------
# Import Parquet As a DataFrame
# ----------------------------------------------------------------------

# Location of the analyzed-sequence parquet dataset in the public S3 bucket.
parquet_s3 = "s3://steichenetalpublicdata/analyzed_sequences/parquet"

# `spark` is not created anywhere in the visible cells -- presumably the
# SparkSession injected by the notebook kernel (TODO confirm).
df_spark = spark.read.parquet(parquet_s3)

# Sanity check: the row count is the cell output (this runs a Spark job).
df_spark.count()

## Make a query class
The query class holds the parameters of a Spark query until it is time to execute it.

In [None]:
class Query():

    '''Hold the parameters of one sequence search and apply them to a Spark dataframe.

    A query always filters on CDR3 amino-acid length (``cdr3_aa`` strictly
    longer than ``length``). Every other parameter is optional and is only
    applied when it is non-empty.

    Parameters
    ----------
    q_name : label for the query, stored as ``query_name``
    length : CDR3 length threshold (exclusive); required, ``0`` is allowed
    v_fam, v_gene, d_gene, j_gene, ez_donor : exact-match values for the
        dataframe columns of the same name
    regex : regular expression matched against ``cdr3_aa`` with ``rlike``

    Raises
    ------
    ValueError (an ``Exception`` subclass) when ``length`` is not supplied.
    '''

    # Attributes that are compared for equality against the dataframe column
    # carrying the same name.
    _EQUALITY_FILTERS = ('v_fam', 'v_gene', 'd_gene', 'j_gene', 'ez_donor')

    def __init__(self,q_name,length='',v_fam="",v_gene="",d_gene="",j_gene="",regex="",ez_donor=""):
        # A numeric zero is a legitimate threshold ("longer than 0"), so it must
        # not be confused with "nothing supplied". Every other falsy value
        # ('' default, None, ...) is still rejected.
        is_numeric_zero = isinstance(length, (int, float)) and not isinstance(length, bool)
        if not length and not is_numeric_zero:
            raise ValueError("Length must be supplied")

        self.query_name = q_name
        self.length = length
        self.v_fam = v_fam
        self.v_gene = v_gene
        self.d_gene = d_gene
        self.j_gene = j_gene
        self.ez_donor = ez_donor
        self.regular_expression = regex

    def apply(self,df):

        '''Filter a Spark dataframe with this query's parameters.

        The length filter is always applied; equality filters and the regular
        expression are added only when the corresponding parameter is set.
        All filters are combined with AND.

        Side effects: stores the result on ``self.queried_dataframe`` and
        prints the number of matching rows (``count()`` runs a Spark job).

        Returns the filtered dataframe.
        '''
        # Strictly greater than: length=21 keeps CDR3s of 22 residues or more.
        filtered = df.filter(F.length(df.cdr3_aa) > self.length)

        # Optional exact-match filters, skipped when the value is empty.
        for column in self._EQUALITY_FILTERS:
            wanted = getattr(self, column)
            if wanted:
                filtered = filtered.filter(filtered[column] == wanted)

        # Optional regular expression on the CDR3 amino-acid sequence.
        if self.regular_expression:
            filtered = filtered.filter(filtered.cdr3_aa.rlike(self.regular_expression))

        self.queried_dataframe = filtered
        print("Found {} sequences".format(filtered.count()))
        return filtered

## Frequency 1: ≥22

In [None]:
# Build the query (CDR3 longer than 21 residues) and run it on the full dataset
my_query_1 = Query('BG18_search.1',length=21)
queried1_df = my_query_1.apply(df_spark)

# Bring only the id and donor columns to the driver for the per-donor tally
pandas1_df = queried1_df.select('_id','ez_donor').toPandas()

# Matching sequences per donor
counts = (
    pandas1_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 2: ≥22, D3-3

In [None]:
# Build the query (CDR3 longer than 21 residues, D gene IGHD3-3) and run it
my_query_2 = Query('BG18_search.2',d_gene='IGHD3-3',length=21)
queried2_df = my_query_2.apply(df_spark)

# Bring only the id and donor columns to the driver for the per-donor tally
pandas2_df = queried2_df.select('_id','ez_donor').toPandas()

# Matching sequences per donor
counts = (
    pandas2_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 3: ≥22, D3-3, FGV anywhere

In [None]:
# Build the query (CDR3 longer than 21 residues, D gene IGHD3-3, CDR3 contains
# "FGV" at any offset) and run it
my_query_3 = Query('BG18_search.3',d_gene='IGHD3-3',length=21, regex=r'FGV')
queried3_df = my_query_3.apply(df_spark)

# Bring only the id and donor columns to the driver for the per-donor tally
pandas3_df = queried3_df.select('_id','ez_donor').toPandas()

# Matching sequences per donor
counts = (
    pandas3_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 4: ≥22, D3-3, FGV starting at position 7, 8, or 9

In [None]:
# One query per allowed FGV start: the 6, 7 or 8 leading wildcards after the
# '^' anchor place the F at position 7, 8 or 9 of the CDR3.
my_query_4_7 = Query('BG18_search.4.7',d_gene='IGHD3-3',length=21, regex=r'^......FGV')
my_query_4_8 = Query('BG18_search.4.8',d_gene='IGHD3-3',length=21, regex=r'^.......FGV')
my_query_4_9 = Query('BG18_search.4.9',d_gene='IGHD3-3',length=21, regex=r'^........FGV')

# Run the three queries against the full dataset
queried4_7_df = my_query_4_7.apply(df_spark)
queried4_8_df = my_query_4_8.apply(df_spark)
queried4_9_df = my_query_4_9.apply(df_spark)

# Bring only the id and donor columns of each result to the driver
pandas47_df = queried4_7_df.select('_id','ez_donor').toPandas()
pandas48_df = queried4_8_df.select('_id','ez_donor').toPandas()
pandas49_df = queried4_9_df.select('_id','ez_donor').toPandas()

# Stack the three position-specific results into one frame
pandas4_df = pandas.concat([pandas47_df,pandas48_df,pandas49_df])

# Matching sequences per donor (all three positions together)
counts = (
    pandas4_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 5: ≥22, D3-3, FGV starting at position 7, 8, or 9, E seven positions after the F of FGV (example: `.......FGV....E`)

In [None]:
# One query per allowed FGV start: the 6, 7 or 8 leading wildcards after the
# '^' anchor place the F at position 7, 8 or 9 of the CDR3; four wildcards
# after FGV then require an E.
my_query_5_7 = Query('BG18_search.5.7',d_gene='IGHD3-3',length=21, regex=r'^......FGV....E')
my_query_5_8 = Query('BG18_search.5.8',d_gene='IGHD3-3',length=21, regex=r'^.......FGV....E')
my_query_5_9 = Query('BG18_search.5.9',d_gene='IGHD3-3',length=21, regex=r'^........FGV....E')

# Run the three queries against the full dataset
queried5_7_df = my_query_5_7.apply(df_spark)
queried5_8_df = my_query_5_8.apply(df_spark)
queried5_9_df = my_query_5_9.apply(df_spark)

# Bring only the id and donor columns of each result to the driver
pandas57_df = queried5_7_df.select('_id','ez_donor').toPandas()
pandas58_df = queried5_8_df.select('_id','ez_donor').toPandas()
pandas59_df = queried5_9_df.select('_id','ez_donor').toPandas()

# Stack the three position-specific results into one frame
pandas5_df = pandas.concat([pandas57_df,pandas58_df,pandas59_df])

# Matching sequences per donor (all three positions together)
counts = (
    pandas5_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 6: ≥22, D3-3, FGV anywhere, E seven positions after the F of FGV (example: `.......FGV....E`, but with FGV allowed at any offset)

In [None]:
# Build the query (CDR3 longer than 21 residues, D gene IGHD3-3, "FGV" at any
# offset followed by four arbitrary residues and then an E) and run it
my_query_6 = Query('BG18_search.6',d_gene='IGHD3-3',length=21, regex=r'FGV....E')
queried6_df = my_query_6.apply(df_spark)

# Bring only the id and donor columns to the driver for the per-donor tally
pandas6_df = queried6_df.select('_id','ez_donor').toPandas()

# Matching sequences per donor
counts = (
    pandas6_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 7: D3-3

In [None]:
# Build the D-gene query and run it on the full dataset.
# NOTE(review): length=1 still filters on length(cdr3_aa) > 1, so CDR3s of
# length 0 or 1 are excluded even though the heading names only D3-3 -- confirm.
my_query_7 = Query('BG18_search.7',d_gene='IGHD3-3',length=1)
queried7_df = my_query_7.apply(df_spark)

# Bring only the id and donor columns to the driver for the per-donor tally
pandas7_df = queried7_df.select('_id','ez_donor').toPandas()

# Matching sequences per donor
counts = (
    pandas7_df
    .groupby('ez_donor')
    .count()
    .rename(columns={'_id': 'count'})
    .sort_values('count')
)

# Total sequences per donor, counted in Spark
df_count = df_spark.groupby('ez_donor').count().toPandas()

# After the merge 'count_x' is the match count and 'count_y' the donor total;
# 'normal' is matches per million sequences.
# NOTE(review): this is an inner merge, so a donor with zero matches is absent
# from the result rather than reported as 0 -- confirm that is intended.
merged = (
    counts
    .reset_index()
    .merge(df_count, on='ez_donor')
    .assign(normal=lambda hits: hits['count_x'] / hits['count_y'] * 1000000)
)
merged.sort_values('ez_donor')[['ez_donor','normal']]