## Omics Analysis with Apache Spark
### GTEx eQTLデータ編 第2回

 前回はGTEx eQTLのAdipose_SubcutaneousをApache SparkのDataFrameに取り込みました。今回はいよいよそれを全tissueに拡大します。また、容量も大きくなってきましたので今回はmacではなくAWS上に構築したSpark Clusterを使用します。

### データの下準備
GTExのデータはgzipされていますがApache Sparkで分散処理を行うのにあたりbgzipに変換します。これで分散性能がかなり変わってきます。  
gsutilでファイルを取得  ->  gunzip  ->  bgzip  ->  s3へ配置  
という流れで前準備を行っています。


### 起動
あらかじめ設定しておいたAWSの"Service Catalog"からパラメータを入力し、欲しいサイズのSpark Clusterを起動していきます。  
今回の処理に必要最低限なオプションは以下のとおりです。  

|オプション|役割等|
|---|---|
|--packages| packages以下にコンマ区切りで依存関係にあるパッケージを指定しています。|
|io.projectglow:glow-spark3_2.12:1.1.2, <br> io.delta:delta-core_2.12:1.0.1, |この２行はdeltaを使うために必要です。|
|org.apache.hadoop:hadoop-aws:3.2.0, |s3へアクセスするために必要なものです。|
|--conf spark.driver.memory=12G| driverに12GBのメモリを割り当てています。<br>今回はClusterではないのですがその場合はdriverがexecutorの役割を担うためこのようにしています。|
|--conf spark.serializer=org.apache.spark.serializer.KryoSerializer|serializeにつかうclassを指定。推奨値です。|


In [None]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = ' \
 --conf spark.jars.packages=\
io.projectglow:glow-spark3_2.12:1.1.2,\
io.delta:delta-core_2.12:1.0.1,\
org.apache.hadoop:hadoop-aws:3.2.0, \
 --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec \
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
 --master yarn pyspark-shell '

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()#

spark
sc = spark.sparkContext

sc

|命令文|役割等|
|---|---|
|from pyspark import SparkContext, SparkConf, SQLContext<br>from pyspark.sql import SparkSession|SparkSessionとSparkContextをつくるのに必要です。|
|sparkConf = SparkConf()<br>sparkConf.setAppName('GTEx eQTL data test')| Sparkの設定を記述できます。<br>今回は必要な設定は先のPYSPARK_SUBMIT_ARGSに入れています。|
|spark = SparkSession.builder.config(conf = sparkConf).getOrCreate() |Spark Sessionを作成しています。|
|sc = spark._sc|作成したSparkSessionより、SparkContextを得ています。|



### データの変換
前回と同様に、variant_idは chr1_13550_G_A_b38のように複数の情報がひとつになっていてこのままでは利活用に支障があると思われるため、次のような列に変換をすることにします。
また、後で他のtissueも結合しますので新たに「tissue」を追加します。  
スキーマも前回と同じものを使えます。

|列名|内容|
|---|---|
|contigName|chr1|
|start|13550|
|referennceAllele|G|
|alternateAlleles|A|
|tissue|Adipose_Subcutaneous|


以降で使うモジュールをロードし、ファイル名とtissueを定義します。

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import lit,split,col,regexp_replace


ファイルのリストから、inputファイルのリストやoutputファイルのリストを作成しておきます。  
今回はs3とhdfsの両方に出力していますが本来はどちらか一方でよいと思います。  
処理の内容で中間ファイルとして集めるものが多ければhdfsの方が高速ですし、  
そういった要素が少なく、最後に結果を保存するのみであればs3だけでよさそうです。

In [None]:
import subprocess

list_eqtl_files_gz = subprocess.run(["aws s3 ls \
  --recursive s3://[Your buckets containing omics-data-raw] /GTEx_Analysis_v8_QTLs/GTEx_Analysis_v8_eQTL_all_associations/ \
  |grep bgz$|awk '{print \"s3://ut-omics-data-raw/\"$4}'"],
                                stdout=subprocess.PIPE,shell=True)

eqtl_files_gz = list_eqtl_files_gz.stdout.decode('utf-8').split()
# input fileのリスト
with open("gtex_eQTL_paths_in.txt", "w") as f:
    for eqtl_file in eqtl_files_gz:
        f.write(f"{eqtl_file}\n")

# hdfsへのoutput fileのリスト
with open("gtex_eQTL_paths_hdfs_out.txt", "w") as f:
    for eqtl_file in eqtl_files_gz:
        eqtl_file_out = eqtl_file.replace("s3://[Your buckets containing omics-data-raw] ", "/user/beowulf/gtex-delta").replace(".bgz", ".delta")
        f.write(f"{eqtl_file_out}\n")


# すべてのeQTLデータをまとめたDataFrameの保存先
gtex_eqtl_all_delta_s3_path = \
  's3a://[User buckets for omics-data-output ]/gtex-delta/GTEx_Analysis_v8_QTLs/GTEx_Analysis_v8_eQTL_all_associations/all_delta'



リストをもとにbgzファイルを読み込み、前回つくったSchemaを使ってDataFrameに変換します。    
同時に、sparkのunionでつなげていき、1つの大きなDataFrameにしています。  
個々のtissueはそれぞれdelta形式でs3へ保存しています。  

In [None]:
%%time
from pyspark.sql.types import *
from pyspark.sql.functions import lit,split,col,regexp_replace

with open("gtex_eQTL_paths_in.txt") as f:
    eqtl_files = f.read().splitlines()

for gtex_eqtl_tmp_path in eqtl_files:
    name = "GTEx_eQTL_allpairs_" + gtex_eqtl_tmp_path.split(".")[0].split("/")[-1]
    print(name)
    tissue =  gtex_eqtl_tmp_path.split(".")[0].split("/")[-1]
    gtex_eqtl_tmp_delta_hdfs_path = f"gtex-delta/GTEx_Analysis_v8_QTLs/GTEx_Analysis_v8_eQTL_all_associations/{tissue}_delta"
    schema = StructType([
      StructField("gene_id", StringType(), False),
      StructField("variant_id", StringType(), False),
      StructField("tss_distance", IntegerType(), False),
      StructField("ma_samples", IntegerType(), False),
      StructField("ma_count", IntegerType(), False),
      StructField("maf", DoubleType(), False),
      StructField("pval_nominal", DoubleType(), False),
      StructField("slope", DoubleType(), False),
      StructField("slope_se", DoubleType(), False),
    ])
    gtex_eqtl_tmp = spark.read.option("inferSchema",False) \
                    .option("delimiter","\t") \
                    .csv(gtex_eqtl_tmp_path, header=True,schema=schema)
    gtex_eqtl_tmp = gtex_eqtl_tmp.\
        withColumn("tissue",lit(tissue)).\
        withColumn("contigName",split(col("variant_id"), "_").getItem(0)).\
        withColumn("start",split(col("variant_id"), "_").getItem(1).cast(IntegerType())).\
        withColumn("referenceAllele",split(col("variant_id"), "_").getItem(2)).\
        withColumn("alternateAlleles",split(col("variant_id"), "_").getItem(3)).\
        select("tissue",regexp_replace(col("contigName"),"chr","").\
               alias("contigName"),"start","referenceAllele","alternateAlleles","gene_id","variant_id","tss_distance",\
              "ma_samples","ma_count","maf","pval_nominal","slope","slope_se")
    if 'df' in locals():
        df = df.union(gtex_eqtl_tmp)
    else:
        df = gtex_eqtl_tmp

### ディスクへの書き込み、読み込み
##### S3
結合したeQTLデータをdelta形式でs3へ保存します

In [None]:
%%time
df.write.format("delta").mode("overwrite").save(gtex_eqtl_all_delta_s3_path)

s3から読み込む場合は、このようにします。

In [None]:
df_s3 = spark.read.format("delta").load(gtex_eqtl_all_delta_s3_path).cache()

読み込んだDataFrameを確認します

In [None]:
df_s3.show(5)

In [None]:
df_s3.printSchema()

In [None]:
df_s3.rdd.getNumPartitions()

In [None]:
df_s3.count()

### DataFrame操作
読み込んだDataFrameは次のようにFilterなどの操作ができます。  
ここでは8番染色体のみに絞り込み、pval_nominalをsortしています。

In [None]:
%%time
df_s3.filter(df_s3.contigName == 8).orderBy("pval_nominal").show()

filterを追加してmaf < 0.1のみを表示させてみます。

In [None]:
%%time
df_s3.filter(df_s3.contigName == 8).orderBy("pval_nominal").filter(df_s3.maf < 0.1).show()

参考で、2回目以降はメモリに載ってくるので高速になります。

In [None]:
%%time
df_s3.filter(df_s3.contigName == 8).orderBy("pval_nominal").show()