In [None]:
import dxpy #是 DNAnexus 平台的官方 Python SDK。在 Notebook 中直访问、管理DNAnexus 上的项目和数据文件。
import dxdata #是 DNAnexus 提供的 数据接口层（data access layer）。它主要用于加载大型数据集
import pandas as pd #处理小规模表格数据（比如几千行）
import pyspark #是 Apache Spark 的 Python 接口（API）。在Python环境里使用Spark 的分布式计算功能。
import re

In [54]:
###############################phenotype data##############################
#a. Initialize Spark
sc = pyspark.SparkContext() #启动 Spark 的计算引擎（SparkContext
spark = pyspark.sql.SparkSession(sc) #创建一个 SparkSession 对象，它是你和 Spark 交互的"入口”。

#b. Load dataset description：
dispensed_dataset = dxpy.find_one_data_object( typename = "Dataset", name = "app*.dataset",folder = "/",name_mode = "glob")#在你的 DNAnexus 项目里搜索符合条件的dataset
dispensed_dataset_id = dispensed_dataset["id"] #提取出那个数据集的唯一 ID
dataset = dxdata.load_dataset(id = dispensed_dataset_id)#加载这个数据集
dir(dataset)#查看这个数据集的所有属性与方法
dataset.entities_by_name #查看包含所有的entity

#c. Select entity and load cohort
participant = dataset['participant'] #从dataset取出名为participant entity(就是一张表)。
participant.fields #可以看到所有的field ID
case = dxdata.load_cohort("urticaria_case") #从 DNAnexus 项目中加载名队列（cohort）对象。
cont = dxdata.load_cohort("urticaria_control") # 加载对照组。

#d. Select fields
#要从 participant对象中提取的字段，字段可以在建立cohort时看到。
field_ids = ['31', '22001','22006','22019','22021','21022','41270']

#function用来找到每个field id的field name
def fields_for_id(field_id):
    from distutils.version import LooseVersion
    field_id = str(field_id)
    fields = participant.find_fields(name_regex=r'^p{}(_i\d+)?(_a\d+)?$'.format(field_id))
    return sorted(fields, key=lambda f: LooseVersion(f.name))

# 找出所有field_ids对应的field name
fields = [participant.find_field(name='eid')] + [fields_for_id(f)[0] for f in field_ids] + [participant.find_field(name='p20160_i0')]

#转化为可读表格
field_description = pd.DataFrame({
    'Field': [f.name for f in fields],
    'Title': [f.title for f in fields],
    'Coding' : [f.coding.codes if f.coding is not None else '' for f in fields]
})

#e. Retrieve fields and concatenate cohorts
case_df = participant.retrieve_fields(fields = fields, filter_sql = case.sql, engine = dxdata.connect()).toPandas()
#提取病例组的参与者数据
cont_df = participant.retrieve_fields(fields = fields, filter_sql = cont.sql, engine = dxdata.connect(dialect = 'hive+pyspark', connect_args = {'config': {'spark.kryoserializer.buffer.max': '256m', 'spark.sql.autoBroadcastJoinThreshold':'-1'}
})).toPandas()
#提取对照组的参与者数据，并且用Spark方式来执行。
cont_only = cont_df[~cont_df['eid'].isin(case_df['eid'])]#在提取对照组的cohort时，把case的患者放进来了，所以增加了这步
df = pd.concat([case_df, cont_only])
df.shape

#f. Create phenotype variable
#在 df 里新建了一列叫 urticaria_cc，并且给这一列的所有行都赋初值为 0。
df['urticaria_cc'] = 0
#把那些在 case_df里的参与者标记为 1，其他保持 0。
df.loc[df.eid.isin(case_df.eid),'urticaria_cc'] = 1
#统计列中每个取值（0 和 1）出现的次数。
df.urticaria_cc.value_counts()
df.head()

#g. filter samples
df_qced = df [
(df['p31'] == df['p22001']) & #Check gender and genetic sex are the same
(df['p22006'] == 1)& #Check participant has white british ancestry
(df['p22019'].isnull())& # Check No sex chromosome aneuploidy
(df['p22021'] == 0) #No kinship found
]
df_qced.urticaria_cc.value_counts() #查看筛选后的participant数量

#h. select and rename phenotype and covariate columns
df_qced = df_qced.rename(columns={
    'eid': 'IID',
    'p31': 'sex',
    'p21022': 'age',
    'p20160_i0': 'ever_smoked',
    'p22006': 'ethnic_group',
    'p22019': 'sex_chromosome_aneuploidy',
    'p22021': 'kinship_to_other_participants'
}) #重命名列

df_qced['FID'] = df_qced['IID'] #新增FID列，Regenie要求输入表中必须包含 FID 和 IID 两列
df_phenotype = df_qced[['FID', 'IID', 'urticaria_cc', 'sex', 'age', 'ever_smoked']] #从 QC 过的数据中挑出真正要用于 GWAS 的关键变量，生成最终的 phenotype 表。

df_qced.head()
df_qced.shape

(246311, 11)

In [58]:
################Select only samples that have WES data available##############
# Get WES（.fam 是 PLINK 格式的一种文件，主要存放样本的家族结构和性别等基本信息。）
exome_folder = 'Population level exome OQFE variants, PLINK format - final release'#再此用的是final release，可以有自由选择。
exome_field_id = '23158' #这个文件夹里所有文件的field id都是23158
output_dir = '/Data/'

#只读了c1_b0_v1.fam，是因为所有的.fam文件都是一样的，可以查看md5来验证,详见最下方。
path_to_family_file = f'/mnt/project/Bulk/Exome sequences/{exome_folder}/ukb{exome_field_id}_c1_b0_v1.fam'
plink_fam_df = pd.read_csv(path_to_family_file,  sep=r'\s+', dtype='object',                           
                           names = ['FID','IID','Father ID','Mother ID', 'sex', 'Pheno'], engine='python')

# Intersect the phenotype file and the WES .fam file
# to generate phenotype DataFrame for the participants
urticaria_wes_df = df_phenotype.join(plink_fam_df.set_index('IID'), on='IID', rsuffix='_fam', how='inner')

# Drop unuseful columns from .fam file
urticaria_wes_df.drop(
    columns=['FID_fam','Father ID','Mother ID','sex_fam', 'Pheno'], axis=1, inplace=True, errors='ignore')

# Write phenotype files to a TSV file
urticaria_wes_df.to_csv('urticaria_wes.phe', sep='\t', na_rep='NA', index=False, quoting=3)
urticaria_wes_df.shape

(237209, 6)

In [59]:
############### Upload the geno-pheno intersect phenotype file back to the RAP project###########
%%bash -s "$output_dir"
dx upload urticaria_wes.phe -p --path $1 --brief

file-J3xZ4Q8J3g3XFjKvf9Gp3YVP


In [49]:
#校验文件fam文件是否一致
import glob, pandas as pd, hashlib

fam_paths = sorted(glob.glob(f'/mnt/project/Bulk/Exome sequences/{exome_folder}/ukb{exome_field_id}_c*_b0_v1.fam'))
# 读取第一个 fam 作为样本清单
plink_fam_df = pd.read_csv(
    fam_paths[0],
    sep=r'\s+',       #空格和tab都可以识别
    dtype='object',
    names=['FID','IID','Father ID','Mother ID','sex','Pheno']
)

# 一致性校验
def md5(p): 
    with open(p, 'rb') as f: 
        return hashlib.md5(f.read()).hexdigest()

assert len({md5(p) for p in fam_paths}) == 1, "Not all .fam files are identical!"