# Odczyty uliniowione

W tej części zajmiemy się analiża uliniowionych odczytów (BAM) poprzez platformę Spark. Tym razem zamiast korzystać z API DataFrame posłużymy się językiem SQL.

In [2]:
! gsutil ls gs://edugen-lab-${USER}2/bam

gs://edugen-lab-tgambin2/bam/mother.bam


## Przygotowanie sesji Spark
Zainicjowanie sesji Spark oraz stworzenie schematu bazy danych z której będziemy korzystać.

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

In [8]:
import os                               # moduł OS języka Python
user_name = os.environ.get('USER')      # pobieramy zmienną środowiskową USER
bucket = f"gs://edugen-lab-{user_name}2" # konstruujemy sciezke dostepowa do pliku
print(bucket)

gs://edugen-lab-tgambin2


Będziemy korzystać z modułu Spark SQL. Możemy nasze dane traktować jako dane tabelaryczne. Zdefiniujmy tabelę.
Tym razem korzystamy z DataSource -> BAMDataSource

In [9]:
table_bam =  'alignments'

alignments_path = f"{bucket}/bam/mother.bam"

spark.sql(f'DROP TABLE IF EXISTS {table_bam}')
spark.sql(f'CREATE TABLE IF NOT EXISTS {table_bam} \
USING org.biodatageeks.sequila.datasources.BAM.BAMDataSource \
OPTIONS(path "{alignments_path}")')


DataFrame[]

In [10]:
spark.sql(f"describe {table_bam}").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
|sample_id|   string|   null|
|    qname|   string|   null|
|     flag|      int|   null|
|   contig|   string|   null|
|      pos|      int|   null|
|pos_start|      int|   null|
|  pos_end|      int|   null|
|     mapq|      int|   null|
|    cigar|   string|   null|
|    rnext|   string|   null|
|    pnext|      int|   null|
|     tlen|      int|   null|
|      seq|   string|   null|
|     qual|   string|   null|
|   tag_AM|      int|   null|
|   tag_AS|      int|   null|
|   tag_BC|   string|   null|
|   tag_BQ|   string|   null|
|   tag_BZ|   string|   null|
|   tag_CB|   string|   null|
+---------+---------+-------+
only showing top 20 rows



Żeby dostać się do danych w tabeli, bedziemy korzystać z język SQL (standard ANSI). 
Struktura polecen SQL jest nastepująca: 

```sql
SELECT kolumny
FROM tabela
WHERE warunki
GROUP BY wyrazenia grupujace
HAVING warunki na grupy
ORDER BY wyrazenie sortujące
```
Zapytanie może mieć podzapytania zagnieżdzone. W zapytaniu można korzystać ze złączeń z innymi tabeli poprzez JOIN.

# Wybór kolumn (klauzula SELECT)

In [11]:
spark.sql(f"select * from {table_bam}").show()

+---------+--------------------+----+------+--------+---------+--------+----+------------+-----+--------+----+--------------------+--------------------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+--------------------+------+------+------+------+------+------+-------+--------------------+------+--------------+------+------+------+------+------+------+------+-------+------+------+------+------+------+------+
|sample_id|               qname|flag|contig|     pos|pos_start| pos_end|mapq|       cigar|rnext|   pnext|tlen|                 seq|                qual|tag_AM|tag_AS|tag_BC|tag_BQ|tag_BZ|tag_CB|tag_CC|tag_CG|tag_CM|tag_CO|tag_CP|tag_CQ|tag_CR|tag_CS|tag_CT|tag_CY|tag_E2|tag_FI|tag_FS|tag_FZ|tag_H0|tag_H1|tag_H2|tag_HI|tag_IH|tag_LB| tag_MC|              tag_MD|tag_MI|tag_MQ|tag_NH|tag_NM|tag_OA|tag_OC| tag_OP|              tag_OQ|tag_OX|   

In [12]:
spark.sql(f"select sample_id, contig, pos, mapq from {table_bam}").show()

+---------+------+--------+----+
|sample_id|contig|     pos|mapq|
+---------+------+--------+----+
|   mother|    20| 9999879|  60|
|   mother|    20| 9999879|  60|
|   mother|    20| 9999896|  60|
|   mother|    20| 9999905|  60|
|   mother|    20| 9999906|  60|
|   mother|    20| 9999909|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999945|  60|
|   mother|    20| 9999976|  60|
|   mother|    20| 9999979|  60|
|   mother|    20| 9999982|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999997|  70|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999999|  60|
|   mother|    20|10000023|  60|
|   mother|    20|10000027|  60|
+---------+------+--------+----+
only showing top 20 rows



Pokaż unikalne wartości flag jakie przyjmują wartości z tabeli

In [13]:
spark.sql(f"select distinct flag from {table_bam} order by flag ASC").show()  # distinct

+----+
|flag|
+----+
|  65|
|  69|
|  73|
|  81|
|  83|
|  89|
|  97|
|  99|
| 101|
| 113|
| 129|
| 133|
| 137|
| 145|
| 147|
| 153|
| 161|
| 163|
| 165|
| 177|
+----+
only showing top 20 rows



`Czym (jakim typem danych) są dane zwracane przez spark.sql ("SELECT..")?`

## Sortowanie (klauzula ORDER BY)

In [14]:
df = spark.sql(f"select distinct flag from {table_bam} order by flag ASC") 
print(type(df))
df.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- flag: integer (nullable = false)



### Filtrowanie wierszy 

In [17]:
spark.sql(f"select sample_id, contig, pos, mapq from {table_bam} where pos < 13858310").show()

+---------+------+--------+----+
|sample_id|contig|     pos|mapq|
+---------+------+--------+----+
|   mother|    20| 9999879|  60|
|   mother|    20| 9999879|  60|
|   mother|    20| 9999896|  60|
|   mother|    20| 9999905|  60|
|   mother|    20| 9999906|  60|
|   mother|    20| 9999909|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999945|  60|
|   mother|    20| 9999976|  60|
|   mother|    20| 9999979|  60|
|   mother|    20| 9999982|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999997|  70|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999999|  60|
|   mother|    20|10000023|  60|
|   mother|    20|10000027|  60|
+---------+------+--------+----+
only showing top 20 rows



In [19]:
spark.sql(f"select sample_id, contig, pos, mapq from {table_bam} where pos < 13858310 and flag NOT IN (113,117,121) and mapq > 50").show()

+---------+------+--------+----+
|sample_id|contig|     pos|mapq|
+---------+------+--------+----+
|   mother|    20| 9999879|  60|
|   mother|    20| 9999879|  60|
|   mother|    20| 9999896|  60|
|   mother|    20| 9999905|  60|
|   mother|    20| 9999906|  60|
|   mother|    20| 9999909|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999945|  60|
|   mother|    20| 9999976|  60|
|   mother|    20| 9999979|  60|
|   mother|    20| 9999982|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999997|  70|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999999|  60|
|   mother|    20|10000023|  60|
|   mother|    20|10000027|  60|
+---------+------+--------+----+
only showing top 20 rows



<div class="alert alert-block alert-warning">
<b>Zadanie 3_1:</b> 
Napisz zapytanie które zwróci unikalne wartości CIGAR na contigu 20 na pozycjach 1-1000000. Posortuj względem długości pola cigar malejąco. Jesli kilka CIGAR ma taką samą długość to posortuj rosnąco alfebatycznie. Ile jest takich unikalnych wartości?
    </div>


# Grupowanie danych

Pokaż ile jest wierszy o konkretnej wartości flagi

In [20]:
spark.sql(f"select flag, count(*) as cnt from {table_bam} group by flag order by cnt desc").show()  # AS - alias kolumny

+----+-----+
|flag|  cnt|
+----+-----+
|  99|28753|
| 147|28735|
| 163|28660|
|  83|28638|
|1123| 4334|
|1171| 4331|
|1187| 4284|
|1107| 4282|
| 165|  317|
| 133|  268|
|  89|  250|
|  73|  222|
|1113|   67|
|1097|   46|
|  65|    9|
| 129|    9|
| 101|    5|
|  69|    3|
| 137|    3|
| 177|    3|
+----+-----+
only showing top 20 rows



Mozna wykorzystac kilka funkcji agregujących (na roznych kolumnach) w jednym zapytaniu. 

Pokaz ile odczytow ma dana flage. Pokaz takze ich srednia jakosc mapowania oraz minimalna pozycje na ktorej wystepuja

In [21]:
spark.sql(f"select flag, count(*) as cnt, avg(mapq) as avg_m, min(pos_start) as min from {table_bam} group by flag order by cnt, avg_m desc").show()  # AS - alias kolumny

+----+---+------------------+--------+
|flag|cnt|             avg_m|     min|
+----+---+------------------+--------+
|1201|  1|              60.0|16037158|
| 145|  1|              60.0|16019143|
| 161|  1|              60.0|15966265|
|  97|  1|              60.0|16031210|
|1137|  1|               0.0|16034939|
|  81|  1|               0.0|15960755|
|1153|  2|              60.0|10065628|
|1089|  2|              60.0|10065594|
|1177|  2|              20.0|10101679|
| 177|  3|              40.0|10144245|
| 113|  3|              40.0|10144245|
| 153|  3|              10.0|10101686|
| 137|  3|               0.0|15863674|
|  69|  3|               0.0|15863674|
| 101|  5|               0.0|10101679|
| 129|  9| 52.22222222222222|10000816|
|  65|  9|49.111111111111114|10000670|
|1097| 46| 59.67391304347826|10019625|
|1113| 67| 56.35820895522388|10003067|
|  73|222|56.509009009009006|10000788|
+----+---+------------------+--------+
only showing top 20 rows



Funkcje mozemy zagniezdzac. Np: round(avg(kolumna))

In [23]:
# Uzycie funkcji floor na średniej
spark.sql(f"select flag, count(*) as cnt, floor(avg(mapq)) as avg_m, min(pos_start) as min from {table_bam} group by flag order by cnt, avg_m desc").show()  # AS - alias kolumny

+----+---+-----+--------+
|flag|cnt|avg_m|     min|
+----+---+-----+--------+
|1201|  1|   60|16037158|
| 145|  1|   60|16019143|
| 161|  1|   60|15966265|
|  97|  1|   60|16031210|
|1137|  1|    0|16034939|
|  81|  1|    0|15960755|
|1153|  2|   60|10065628|
|1089|  2|   60|10065594|
|1177|  2|   20|10101679|
| 177|  3|   40|10144245|
| 113|  3|   40|10144245|
| 153|  3|   10|10101686|
| 137|  3|    0|15863674|
|  69|  3|    0|15863674|
| 101|  5|    0|10101679|
| 129|  9|   52|10000816|
|  65|  9|   49|10000670|
|1097| 46|   59|10019625|
|1113| 67|   56|10003067|
|  73|222|   56|10000788|
+----+---+-----+--------+
only showing top 20 rows



<div class="alert alert-block alert-warning">
<b>Zadanie 3_2:</b> 
Wiedząc, że jako wyrażenie grupujące można wykorzystać wywołanie funkcji na kolumnie napisz zapytanie które policzy ile odczytów ma określoną długość CIGARa. Dla tych grup policz także zaokrągloną (ROUND) wartość jakości mapowania odczytów. Wynik zawęź tylko do odczytów występujących na chromosomie 20 lub 21. Posortuj malejaco liczności grup.
    </div>




# Filtrowanie grup danych

Warunki na pojedyncze wiersze nakładamy za pomocą klauzuli WHERE. Jeśli dokonujemy grupowania danych i chcemy w wynikach uzyskać jedynie informacje o grupach, które spełniają określone warunki używamy klauzuli HAVING. (HAVING moze wystapic wylacznie z GROUP BY).

In [24]:
# Zrob podsumowanie dla grup odczytów o zgodnym flag. Policz srednie mapq oraz minmalna pozycje
# Pokaz jedynie informacja o flagach dla ktorych jest ponad 1000 odczytów.
spark.sql(f"select flag, count(*) as cnt, floor(avg(mapq)) as avg_m, min(pos_start) as min \
            from {table_bam} \
            group by flag \
            having count(*) > 1000 \
            order by cnt, avg_m desc").show()

+----+-----+-----+--------+
|flag|  cnt|avg_m|     min|
+----+-----+-----+--------+
|1107| 4282|   59|10000397|
|1187| 4284|   59|10000189|
|1171| 4331|   59|10000037|
|1123| 4334|   59| 9999879|
|  83|28638|   59|10000096|
| 163|28660|   59| 9999896|
| 147|28735|   59|10000037|
|  99|28753|   59| 9999879|
+----+-----+-----+--------+



`Czy w klauzuli HAVING możemy użyc aliasu kolumny cnt?`

Warunki w HAVING też mogą być złożone (złączone AND, OR). Ale warunki w HAVING mogą się odnosić jedynie do kolumn grupujących lub funkcji agregujących

In [25]:
spark.sql(f"select flag, count(*) as cnt, floor(avg(mapq)) as avg_m, min(pos_start) as min \
            from {table_bam} \
            group by flag \
            having count(*) > 1000 and flag != 99 \
            order by cnt, avg_m desc").show()

+----+-----+-----+--------+
|flag|  cnt|avg_m|     min|
+----+-----+-----+--------+
|1107| 4282|   59|10000397|
|1187| 4284|   59|10000189|
|1171| 4331|   59|10000037|
|1123| 4334|   59| 9999879|
|  83|28638|   59|10000096|
| 163|28660|   59| 9999896|
| 147|28735|   59|10000037|
+----+-----+-----+--------+



In [None]:
# Bledna konstrukcja - W HAVING mamy warunek na kolumny niegrupujace
# spark.sql(f"select flag, count(*) as cnt, floor(avg(mapq)) as avg_m, min(pos_start) as min \
#             from {table_bam} \
#             group by flag \
#             having count(*) > 1000 and sample_id ='mother' \
#             order by cnt, avg_m desc").show()

<div class="alert alert-block alert-warning">
<b>Zadanie 3_3:</b> 
Wiedząc, że jako wyrażenie grupujące można wykorzystać wywołanie funkcji na kolumnie napisz zapytanie które policzy ile odczytów ma określoną długość CIGARa. 
Dla tych grup policz także zaokrągloną (ROUND) wartość jakości mapowania odczytów. 
Wynik zawęź tylko do odczytów występujących na chromosomie 20 lub 21. Pokaż tylko grupy mające średnia jakosc mapowania powyzej 30. Posortuj malejaco liczności grup.
    </div>

## Podzapytania zagnieżdżone

Chcemy zobaczyć, jaka jest najwyższa wartość mapq w naszym zbiorze danych.

In [26]:
spark.sql(f"select max(mapq) from {table_bam} ").show()

+---------+
|max(mapq)|
+---------+
|       70|
+---------+



Zobaczmy zatem jakie są dane tych odczytów które charakteryzują się najwyższą wartością jakosci mapowania

In [27]:
spark.sql(f"select sample_id, contig, pos, mapq from {table_bam} where mapq=60 ").show()

+---------+------+--------+----+
|sample_id|contig|     pos|mapq|
+---------+------+--------+----+
|   mother|    20| 9999879|  60|
|   mother|    20| 9999879|  60|
|   mother|    20| 9999896|  60|
|   mother|    20| 9999905|  60|
|   mother|    20| 9999906|  60|
|   mother|    20| 9999909|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999930|  60|
|   mother|    20| 9999945|  60|
|   mother|    20| 9999976|  60|
|   mother|    20| 9999979|  60|
|   mother|    20| 9999982|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999987|  60|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999997|  60|
|   mother|    20| 9999999|  60|
|   mother|    20|10000023|  60|
|   mother|    20|10000027|  60|
|   mother|    20|10000037|  60|
+---------+------+--------+----+
only showing top 20 rows



Teraz sprobujmy to zapisac bez zaszywania wartości 60 w zapytaniu.

In [28]:
spark.sql(f"select sample_id, contig, pos, mapq from {table_bam} where mapq=(select max(mapq) from {table_bam}) ").show()

+---------+------+--------+----+
|sample_id|contig|     pos|mapq|
+---------+------+--------+----+
|   mother|    20| 9999997|  70|
|   mother|    20|10006697|  70|
|   mother|    20|10006697|  70|
|   mother|    20|10006698|  70|
|   mother|    20|10006698|  70|
|   mother|    20|10006819|  70|
|   mother|    20|10008145|  70|
|   mother|    20|10008682|  70|
|   mother|    20|10008684|  70|
|   mother|    20|10008759|  70|
|   mother|    20|10008759|  70|
|   mother|    20|10010396|  70|
|   mother|    20|10010433|  70|
|   mother|    20|10010481|  70|
|   mother|    20|10010481|  70|
|   mother|    20|10011484|  70|
|   mother|    20|10015532|  70|
|   mother|    20|10015534|  70|
|   mother|    20|10015539|  70|
|   mother|    20|10015546|  70|
+---------+------+--------+----+
only showing top 20 rows



<div class="alert alert-block alert-warning">
<b>Zadanie 3_4:</b> 

1) Sprawdź ile jest odczytów mających maksymalna wartosc mapq. 

2) Zobacz ile jest odczytów ktorych mapq jest w pierwszej 3 wartosci mapq ze zbioru danych.

 * podpowiedzi: ograniczenie liczby zwracanych wierszy (LIMIT). 
 * przemyślenie operatora porównania wartosci mapq oraz wartosci zwracanych przez podzapytanie
    </div>



# Wykorzystanie funkcji wbudowanych

Niektóre funkcje do wykorzystania:
ROUND, FLOOR, SIGN, POW, LEAST, LOG
UPPER, LOWER, SUBSTR, 
NVL
CURRENT_DATE
MIN, MAX, SUM, STDDEV, AVG

In [29]:
spark.sql(f"select distinct sample_id, upper(sample_id), current_date() from {table_bam} ").show()

+---------+----------------+--------------+
|sample_id|upper(sample_id)|current_date()|
+---------+----------------+--------------+
|   mother|          MOTHER|    2022-05-13|
+---------+----------------+--------------+



# Wykorzystanie funkcji rozszerzonych

Istnieją rozszerzenia Sparka do obslugi genomicznych danych. Na przykład pakiet sequila dostarcza metod do rozproszonego wyliczenia glebokosci pokrycia i pileup.

http://biodatageeks.org/sequila/

In [30]:
# zeby skorzystac z rozszerzen importujemy modul i tworzymy obiekt ss, opakowanie na sesje sparkową
from pysequila import SequilaSession
ss = SequilaSession(spark)

Coverage jest dostępny jako funkcja tabelaryczna, czyli funkcja zwracająca tabelę.

In [33]:
# konstruujemy zapytanie
cov_query = f"select * from coverage ('{table_bam}', 'mother', 'blocks')"


In [34]:
ss.sql(cov_query).show()

+------+---------+--------+--------+
|contig|pos_start| pos_end|coverage|
+------+---------+--------+--------+
|    20|  9999879| 9999895|       1|
|    20|  9999896| 9999904|       2|
|    20|  9999905| 9999905|       3|
|    20|  9999906| 9999908|       4|
|    20|  9999909| 9999929|       5|
|    20|  9999930| 9999944|       6|
|    20|  9999945| 9999975|       7|
|    20|  9999976| 9999978|       8|
|    20|  9999979| 9999981|       9|
|    20|  9999982| 9999986|      10|
|    20|  9999987| 9999996|      12|
|    20|  9999997| 9999998|      15|
|    20|  9999999|10000022|      16|
|    20| 10000023|10000026|      17|
|    20| 10000027|10000027|      18|
|    20| 10000028|10000036|      17|
|    20| 10000037|10000044|      18|
|    20| 10000045|10000053|      16|
|    20| 10000054|10000054|      15|
|    20| 10000055|10000061|      14|
+------+---------+--------+--------+
only showing top 20 rows



<div class="alert alert-block alert-warning">
<b>Zadanie 3_5:</b> 
Pokaż tylko pozycje, które mają głębokość pokrycia większą niż 5.
    </div>


<div class="alert alert-block alert-warning">
<b>Zadanie 3_6:</b> 
Korzystając z opcji uruchomienia funkcji coverage ktora zwraca pokrycie dla kazdej pozycji niezaleznie (bases zamiast blocks) napisz  zapytanie które zwróci ile pozycji ma daną głębokość pokrycia. Posortuj po liczności malejąco.
    </div>



In [35]:
ss.stop()

# Wykrywanie wariantów
 W wersji rozproszonej można użyć GATK HaplotypeCaller
 

In [40]:
#! gatk-hpt-caller-k8s.sh \
  #--conf "spark.executor.memory=1g" \
  #--conf "spark.driver.memory=1g" \
  #--conf "spark.executor.instances=2" \
  #--conf "spark.hadoop.fs.gs.block.size=8388608" 
  #-R /mnt/data/mapping/ref/ref.fasta \
  #-I gs://edugen-lab-$USER/bam/mother10.bam \
  #-O gs://edugen-lab-$USER/vcf/mother10.vcf

In [41]:
! cat /usr/local/bin/gatk-hpt-caller-k8s.sh  # podejrzymy konfiguracje

#!/usr/bin/env bash
echo "####Running GATK with users' params: $@ on Kubernetes"
gatk HaplotypeCallerSpark \
  --spark-runner SPARK \
  --spark-master k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT \
  --conf spark.jars=/tmp/gcs-connector-${GCS_CONNECTOR_VERSION}-shaded.jar,/tmp/google-cloud-nio-${GCS_NIO_VERSION}-shaded.jar \
  --conf spark.hadoop.google.cloud.auth.service.account.enable=true \
  --conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=$GOOGLE_APPLICATION_CREDENTIALS \
  --conf spark.kubernetes.driverEnv.GCS_PROJECT_ID=$PROJECT_ID \
  --conf spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS \
  --conf spark.executorEnv.GCS_PROJECT_ID=$PROJECT_ID \
  --conf spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS=$GOOGLE_APPLICATION_CREDENTIALS \
  --conf spark.kubernetes.driver.secrets.$SERVICE_ACCOUNT-secret=$SECRETS_MOUNT_DIR \
  --conf spark.kubernetes.executor.secrets.$SERVICE_ACCOUNT-secret=$SECRETS_MOUNT