In [2]:
import pyspark.pandas as ps
import json

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.driver.memory", "4g")  # Set to your desired heap size

spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()


## Load genotypes

In [3]:
vcfLikePath='./als_1kg.exon.num.txt'

genotypes = ps.read_csv(vcfLikePath, sep='\t', index_col=['chrom', 'position'])

In [4]:
genotypes.shape

                                                                                

(654394, 5923)

In [5]:
genotypes.columns

Index(['Gene', 'Func.knownGene', 'Func.refGene', 'Func.ensGene', 'ExonicFunc',
       'AAChange.knownGene', 'AAChange.refGene', 'AAChange.ensGene', 'FILTER',
       'REF',
       ...
       'CGND-HDA-02442', 'CGND-HDA-02438', 'CGND-HDA-02445', 'CGND-HDA-02693',
       'CGND-HDA-02446', 'CGND-HDA-02444', 'CGND-HDA-02439', 'CGND-HDA-02462',
       'CGND-HDA-02688', 'CGND-HDA-02440'],
      dtype='object', length=5923)

## Load gene sets

In [6]:
def readJsonFile(path):
    with open(path) as f:
        data = json.load(f)
    return data

In [7]:
geneSets = {
    'alsod': ps.read_csv('./alsodList.csv', sep='\t')['Gene symbol'].tolist(),
    'cardiac': readJsonFile('./BIOCARTA_ALK_PATHWAY.v2023.1.Hs.json')['BIOCARTA_ALK_PATHWAY']['geneSymbols'] 
        + readJsonFile('./BIOCARTA_ACE2_PATHWAY.v2023.1.Hs.json')['BIOCARTA_ACE2_PATHWAY']['geneSymbols'] 
        + readJsonFile('./BIOCARTA_AT1R_PATHWAY.v2023.1.Hs.json')['BIOCARTA_AT1R_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_CARDIACEGF_PATHWAY.v2023.1.Hs.json')['BIOCARTA_CARDIACEGF_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_FLUMAZENIL_PATHWAY.v2023.1.Hs.json')['BIOCARTA_FLUMAZENIL_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_GCR_PATHWAY.v2023.1.Hs.json')['BIOCARTA_GCR_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_HDAC_PATHWAY.v2023.1.Hs.json')['BIOCARTA_HDAC_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_NFAT_PATHWAY.v2023.1.Hs.json')['BIOCARTA_NFAT_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_PGC1A_PATHWAY.v2023.1.Hs.json')['BIOCARTA_PGC1A_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_PITX2_PATHWAY.v2023.1.Hs.json')['BIOCARTA_PITX2_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_AMI_PATHWAY.v2023.1.Hs.json')['BIOCARTA_AMI_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_P53HYPOXIA_PATHWAY.v2023.1.Hs.json')['BIOCARTA_P53HYPOXIA_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_NO1_PATHWAY.v2023.1.Hs.json')['BIOCARTA_NO1_PATHWAY']['geneSymbols']
        + readJsonFile('./BIOCARTA_HIF_PATHWAY.v2023.1.Hs.json')['BIOCARTA_HIF_PATHWAY']['geneSymbols']
        }



In [8]:
for set in geneSets:
    print(geneSets[set])

['AGT', 'ALAD', 'ALS2', 'ALS3', 'ALS7', 'ANG', 'ANXA11', 'APEX1', 'APOE', 'AR', 'ARHGEF28', 'ARPP21', 'ATXN1', 'ATXN2', 'B4GALT6', 'BCL11B', 'BCL6', 'C9orf72', 'CAMTA1', 'CAV1', 'CAV2', 'CCNF', 'CCS', 'CDH13', 'CDH22', 'CFAP410', 'CHCHD10', 'CHGB', 'CHMP2B', 'CNTF', 'CNTN4', 'CNTN6', 'CRIM1', 'CRYM', 'CSNK1G3', 'CST3', 'CX3CR1', 'CYP2D6', 'DAO', 'DCTN1', 'DIAPH3', 'DISC1', 'DNAJC7', 'DNMT3A', 'DNMT3B', 'DOC2B', 'DPP6', 'DYNC1H1', 'EFEMP1', 'ELP3', 'ENAH', 'EphA3', 'EPHA4', 'ERBB4', 'ERLIN1', 'EWSR1', 'FEZF2', 'FGGY', 'FIG4', 'FUS', 'GARS', 'GLE1', 'GLT8D1', 'GPX3', 'GRB14', 'GRN', 'HEXA', 'HFE', 'HNRNPA1', 'HNRNPA2B1', 'ITPR2', 'KDR', 'KIF5A', 'KIFAP3', 'LIF', 'LIPC', 'LMNB1', 'LOX', 'LUM', 'MAOB', 'MAPT', 'MATR3', 'MOBP', 'MTND2P1', 'NAIP', 'NEFH', 'NEFL', 'NEK1', 'NETO1', 'NIPA1', 'NT5C1A', 'ODR4', 'OGG1', 'OMA1', 'OPTN', 'PARK7', 'PCP4', 'PFN1', 'PLEKHG5', 'PNPLA6', 'PON1', 'PON2', 'PON3', 'PRPH', 'PSEN1', 'PVR', 'RAMP3', 'RBMS1', 'RFTN1', 'RNASE2', 'RNF19A', 'SARM1', 'SCFD1', 'SCN7

## Filter gene sets

In [9]:
filteredGeneSets = {}
for setName in geneSets:
    filteredGeneSets[setName] = genotypes[(genotypes['Gene'].isin(geneSets[setName]) & (genotypes['ExonicFunc'] == 'nonsynonymous_SNV'))]


In [11]:
import subprocess

for setName in filteredGeneSets:
    print(f"{setName}: {filteredGeneSets[setName].shape}")
    filteredGeneSets[setName].reset_index().to_csv(f'./{setName}Genotypes', sep="\t", index=False)
    
   # Construct and run the concatenation command with awk to remove redundant headers
    cat_command = f"awk '(NR == 1) || (FNR > 1)' ./{setName}Genotypes/part*.csv > ./{setName}Genotypes.csv"
    subprocess.run(cat_command, shell=True, check=True)
    
    # Construct and run the removal command
    rm_command = f"rm -r ./{setName}Genotypes"
    subprocess.run(rm_command, shell=True, check=True)



                                                                                

alsod: (3021, 5923)


23/10/08 15:37:09 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
                                                                                

cardiac: (3319, 5923)


23/10/08 15:38:49 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/10/08 15:39:00 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 436)]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.invoke.DirectMethodHandle.allocateInstance(DirectMethodHandle.java:520)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.newInvokeSpecial(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:476)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:405)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
	at org.apache.spark.sql.catalyst.expre

Py4JError: py4j does not exist in the JVM

In [10]:
import subprocess

filteredGeneSets['cardiac'].reset_index().to_csv(f'./cardiacGenotypes', sep="\t", index=False)
# Construct and run the concatenation command with awk to remove redundant headers
cat_command = f"awk '(NR == 1) || (FNR > 1)' ./cardiacGenotypes/part*.csv > ./cardiacGenotypes.csv"
subprocess.run(cat_command, shell=True, check=True)

# Construct and run the removal command
rm_command = f"rm -r ./cardiacGenotypes"
subprocess.run(rm_command, shell=True, check=True)

23/10/08 15:34:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/10/08 15:34:09 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
                                                                                

CompletedProcess(args='rm -r ./cardiacGenotypes', returncode=0)