In [None]:
import pandas
import re
import tempfile
import os
import gzip
import shutil
import subprocess
from pyspark.sql.functions import udf, col, lit
from pyspark.sql import functions as F

"""
Import Parquet As a DataFrame
"""

##Read in parquet file from public S3 bucket
parquet_s3 = "s3://steichenetalpublicdata/analyzed_sequences/parquet"
df_spark = spark.read.parquet(parquet_s3)

##Verify count 
df_spark.count()

## Make a query class
The query class can hold our spark query until it's time to execute

In [None]:
class Query():
    
    '''An example query class to hold query parameters'''
    
    def __init__(self,q_name,length_min='',length_max='',v_fam="",v_gene="",d_gene="",j_gene="",regex="",ez_donor=""):
        self.query_name = q_name
        self.v_fam = v_fam
        self.v_gene = v_gene
        self.j_gene = j_gene
        self.d_gene = d_gene
        self.ez_donor = ez_donor
        
        if not length_min:
            raise Exception("Minimum length must be supplied")
        self.length_min = length_min
        self.length_max = length_max
        self.regular_expression = regex
    
    
    
    def apply(self,df):
        
        '''Apply function will take in spark dataframe and apply query parameters to it if they exist
        
           Returns a filtered dataframe
        '''
        self.queried_dataframe = ""
        
        ##Lets get length
        
        self.queried_dataframe = df.filter(F.length(df.cdr3_aa) > self.length_min)
        self.queried_dataframe = df.filter(F.length(df.cdr3_aa) < self.length_max)
        
        ##If the rest of these were specified, add them to the filter
        if self.v_fam:
            self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.v_fam == self.v_fam)
        
        if self.v_gene:
            self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.v_gene == self.v_gene)
     
        if self.d_gene:
            self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.d_gene == self.d_gene)       
        
        if self.j_gene:
            self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.j_gene == self.j_gene)       
        

        if self.regular_expression:
             self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.cdr3_aa.rlike(self.regular_expression))
        
        if self.ez_donor:
             self.queried_dataframe = self.queried_dataframe.filter(self.queried_dataframe.ez_donor == self.ez_donor)
            
        print("Found {} sequences".format(self.queried_dataframe.count()))
        return self.queried_dataframe

## Frequency 1: BCRs have a DR motif at the middle of the 19 - 30 aa HCDR3

In [None]:
# make query class and pass the input object from above to apply
my_query_1 = Query('DR',length_min=18,length_max=31,regex=r'........DR........$')
queried1_df = my_query_1.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas1_df = queried1_df.select('_id','ez_donor','d_gene').toPandas()
counts = pandas1_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 2: BCRs have a RD motif at the middle of the 19 - 30 aa HCDR3

In [None]:
# make query class and pass the input object from above to apply
my_query_2 = Query('RD',length_min=18,length_max=31,regex=r'........RD........$')
queried2_df = my_query_2.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas2_df = queried2_df.select('_id','ez_donor','d_gene').toPandas()
counts = pandas2_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 3: Precursors use same D genes in the same reading frame, match the regular expression, and contain D genes flanked by an equal or greater number of amino acids compared to DA03E17 BCRs with 19–30 amino acid HCDR3

In [None]:
# make query class and pass the input object from above to apply
my_query_3 = Query('DA03E17_all',length_min=18,length_max=31, d_gene="IGHD3-10",regex=r'....GSG............')
queried3_df = my_query_3.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas3_df = queried3_df.select('_id','ez_donor').toPandas()
counts = pandas3_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 4: Precursors met the Frequency 3 criteria, with the DR motif positioned at the same location relative to the D gene as found in DA03E17

In [None]:
# make query class and pass the input object from above to apply
my_query_4 = Query('DA03E17_DR',length_min=18,length_max=31, d_gene="IGHD3-10",regex=r'....GSG..DR........')
queried4_df = my_query_4.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas4_df = queried4_df.select('_id','ez_donor').toPandas()
counts = pandas4_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 5: Precursors use same D genes in the same reading frame, match the regular expression, and contain D genes flanked by an equal or greater number of amino acids compared to 1G05 BCRs with 19–30 amino acid HCDR3

In [None]:
# make query class and pass the input object from above to apply
my_query_5 = Query('1G05_all',length_min=18,length_max=31, d_gene="IGHD5-12",regex=r'....YSGYD..........')
queried5_df = my_query_5.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas5_df = queried5_df.select('_id','ez_donor').toPandas()
counts = pandas5_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 6: Precursors met the Frequency 5 criteria, with the DR motif positioned at the same location relative to the D gene as found in 1G05

In [None]:
# make query class and pass the input object from above to apply
my_query_6 = Query('1G05_DR',length_min=18,length_max=31, d_gene="IGHD5-12",regex=r'....YSGYDR.........')
queried6_df = my_query_6.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas6_df = queried6_df.select('_id','ez_donor').toPandas()
counts = pandas6_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 7: Precursors use same D genes in the same reading frame, match the regular expression, and contain D genes flanked by an equal or greater number of amino acids compared to Z2B3 BCRs with 25–30 amino acid HCDR3

In [None]:
# make query class and pass the input object from above to apply
my_query_7A = Query('DA03E17_all',length_min=24,length_max=31, d_gene="IGHD5-18",regex=r'.....DT.MV...............')
my_query_7B = Query('DA03E17_all',length_min=24,length_max=31, d_gene="IGHD5-5",regex=r'.....DT.MV...............')
queried7A_df = my_query_7A.apply(df_spark)
queried7B_df = my_query_7B.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas7A_df = queried7A_df.select('_id','ez_donor').toPandas()
pandas7B_df = queried7B_df.select('_id','ez_donor').toPandas()
pandas7_df = pandas.concat([pandas7A_df, pandas7B_df], axis=0)
counts = pandas7_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]

## Frequency 8: Precursors met the Frequency 7 criteria, with the DR motif positioned at the same location relative to the D gene as found in Z2B3

In [None]:
# make query class and pass the input object from above to apply
my_query_8A = Query('DA03E17_all',length_min=24,length_max=31, d_gene="IGHD5-18",regex=r'.....DT.MVDR.............')
my_query_8B = Query('DA03E17_all',length_min=24,length_max=31, d_gene="IGHD5-5",regex=r'.....DT.MVDR.............')
queried8A_df = my_query_8A.apply(df_spark)
queried8B_df = my_query_8B.apply(df_spark)

# Turn it into a pandas dataframe for frequency calculation
pandas8A_df = queried8A_df.select('_id','ez_donor').toPandas()
pandas8B_df = queried8B_df.select('_id','ez_donor').toPandas()
pandas8_df = pandas.concat([pandas8A_df, pandas8B_df], axis=0)
counts = pandas8_df.groupby('ez_donor').count().rename({'_id':'count'},axis=1).sort_values('count')
df_count = df_spark.groupby('ez_donor').count().toPandas()
merged = counts.reset_index().merge(df_count, on='ez_donor')
merged['normal'] = merged['count_x']/merged['count_y']*1000000
merged.sort_values('ez_donor')[['ez_donor','normal']]