## 0.-  Get data

### Install libraries

In [0]:
%pip install bioframe

Python interpreter will be restarted.
Python interpreter will be restarted.


### Import libraries

In [0]:
import pandas as pd
import bioframe
import glob
import os



### Functions

In [0]:
def download_data(url_list: list, path: str):
    """
    Download files from website
        param:
            - url_list: List url of files 
            - path: Where save files downloading
        return:
            None
    """
    for url in url_list:
        os.system(f"""(cd {path} && curl -O -L {url})""")

In [0]:
def get_name_file_from_folder(path: str):
    """
    Get the full path of the files in the folder
        param:
            - path: Path folder
    return:
        - A list with full path of the files in the folder
    """
    return glob.glob(path+"/*")

[All schemas](https://github.com/open2c/bioframe/blob/main/bioframe/io/schemas.py)

In [0]:
def create_dataframe(path_files: str, schema_name="bed9"):
    """
    Concat all data of files in the folder
        param:
            - path_files: A list with full path of the files in the folder
            - schema_name: Data schema you need
        return:
            - A pandas dataframe with all data of files in the folder
    """
    df_aux = pd.DataFrame()
    for file in path_files:
        df = bioframe.read_table(
            file,
            schema=schema_name
            )
        df_aux = pd.concat([df_aux, df])
    return df_aux.reset_index()

### Create spark dataframe HDFs

In [0]:
def create_spark_dataframe(df: pd.DataFrame):
    """
    Transform pandas dataframe to spark dataframe
        param:
            - df: Pandas dataframe you need to transform
    return:
        - A spark to dataframe
    """
    spark_dataframe = spark.createDataFrame(df)
    return spark_dataframe

### Save table spark dataframe

In [0]:
def save_table(spark_dataframe, table_name: str, temp_table=""):
    """
    Save spark dataframe in delta table, parquet and temporary table
        param:
            - spark_dataframe: Spark dataframe you need to transform
            - table_name: Name to save the table
            - temp_table: Name to save the temporary table by 
                          default is saved with the name of the table
    """
    if temp_table == "":
        temp_table = table_name
    spark_dataframe.createOrReplaceTempView(temp_table)
    spark_dataframe.write.format("delta").mode("overwrite").save(f"/user/hive/warehouse/data/{table_name}")
    #Best practice save all data or dimensions in a parquet file
    spark_dataframe.repartition(1).write.format('parquet').mode('overwrite').save(f"data/dimensions")

### Process
* Download data
* Decompress data

### Create a folder

In [0]:
%sh 
mkdir data/
mkdir data/genomics/
mkdir data/genomics/chip-seq
mkdir data/genomics/dnase

### Download the DNase data

In [0]:
url_dnase_list = [
    "https://www.encodeproject.org/files/ENCFF001UVC/@@download/ENCFF001UVC.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001UWQ/@@download/ENCFF001UWQ.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001WEI/@@download/ENCFF001WEI.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001UVQ/@@download/ENCFF001UVQ.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001SOM/@@download/ENCFF001SOM.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001UVU/@@download/ENCFF001UVU.bed.gz"
]
PATH_DNASE = "data/genomics/dnase"

In [0]:
download_data(url_dnase_list, PATH_DNASE)
!ls data/genomics/dnase

ENCFF001SOM.bed.gz  ENCFF001UVQ.bed.gz	ENCFF001UWQ.bed.gz
ENCFF001UVC.bed.gz  ENCFF001UVU.bed.gz	ENCFF001WEI.bed.gz


### Download the ChIP-seq

In [0]:
url_chip_seq_list = [
    "https://www.encodeproject.org/files/ENCFF001VED/@@download/ENCFF001VED.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001VMZ/@@download/ENCFF001VMZ.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001XMU/@@download/ENCFF001XMU.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001XQU/@@download/ENCFF001XQU.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001USC/@@download/ENCFF001USC.bed.gz",
    "https://www.encodeproject.org/files/ENCFF001XRC/@@download/ENCFF001XRC.bed.gz",
]
PATH_CHIP_SEQ = "data/genomics/chip-seq"

In [0]:
download_data(url_chip_seq_list, PATH_CHIP_SEQ)
!ls data/genomics/chip-seq

ENCFF001USC.bed.gz  ENCFF001VMZ.bed.gz	ENCFF001XQU.bed.gz
ENCFF001VED.bed.gz  ENCFF001XMU.bed.gz	ENCFF001XRC.bed.gz


### Create dataframe

In [0]:
path_files_chip_seq = get_name_file_from_folder(PATH_CHIP_SEQ)
path_files_dnase = get_name_file_from_folder(PATH_DNASE)

In [0]:
df_chip_seq = create_dataframe(path_files_chip_seq)
df_dnase = create_dataframe(path_files_dnase)

  return func(*args, **kwargs)


In [0]:
spark_dnase = create_spark_dataframe(df_dnase) 
spark_chip_seq =  create_spark_dataframe(df_chip_seq) 

In [0]:
# Create a hdfs table, temp table and parquet
save_table(spark_dnase, "dnase", "temp_dnase") 
save_table(spark_chip_seq, "chip_seq", "temp_chip_seq") 

## 1.- View data

### View tables

In [0]:
%sql
SHOW TABLES;

database,tableName,isTemporary
default,dnase,False
,temp_chip_seq,True
,temp_dnase,True


### Basic queries (SQL)

In [0]:
%sql
SELECT * FROM temp_chip_seq

index,chrom,start,end,name,score,strand,thickStart,thickEnd,rgb
0,chrX,10087209,10088161,.,1000,.,91.15329,300.0,300.0
1,chr12,32907970,32909598,.,1000,.,76.52205,300.0,300.0
2,chr12,50360675,50361606,.,1000,.,149.86319,300.0,300.0
3,chr12,56617729,56618612,.,1000,.,180.71755,300.0,300.0
4,chr12,82751977,82753177,.,1000,.,108.63797,300.0,300.0
5,chr12,122231602,122233291,.,1000,.,122.64868,300.0,300.0
6,chr11,189544,190474,.,1000,.,118.98055,300.0,300.0
7,chr11,58938845,58939735,.,1000,.,138.27267,300.0,300.0
8,chr11,62553854,62555271,.,1000,.,136.69627,300.0,300.0
9,chr11,64780906,64782157,.,1000,.,189.39302,300.0,300.0


In [0]:
%sql
SELECT chrom, COUNT(chrom) AS num_chrom
FROM temp_dnase
GROUP BY chrom

chrom,num_chrom
chr1,82014
chr10,38543
chr12,37211
chr13,18415
chr11,41197
chr14,22958
chr15,25177
chr17,44830
chr16,33722
chr2,59964


In [0]:
%sql
SELECT name, COUNT(chrom)
FROM temp_dnase
GROUP BY name

name,count(chrom)
chr1.258,4
chr1.561,4
chr1.707,4
chr1.822,4
chr1.1121,4
chr1.1214,4
chr1.1280,4
chr1.1473,4
chr1.1498,4
chr1.2653,4


In [0]:
%sql
SELECT COUNT(distinct name) AS total_name
FROM temp_dnase

total_name
137628


### Basic queries (Spark_SQL)

In [0]:
spark.sql("""
SELECT
    chrom,
    start,
    end,
    name
FROM
    temp_dnase
""").show()

+-----+------+------+-------+
|chrom| start|   end|   name|
+-----+------+------+-------+
| chr1|713835|714424| chr1.1|
| chr1|752775|753050| chr1.2|
| chr1|753373|753448| chr1.3|
| chr1|762057|763210| chr1.4|
| chr1|793358|793612| chr1.5|
| chr1|805101|805538| chr1.6|
| chr1|839626|840461| chr1.7|
| chr1|842125|842534| chr1.8|
| chr1|846334|846381| chr1.9|
| chr1|846545|846937|chr1.10|
| chr1|848219|848588|chr1.11|
| chr1|851784|852887|chr1.12|
| chr1|853228|853414|chr1.13|
| chr1|856335|856934|chr1.14|
| chr1|858910|861473|chr1.15|
| chr1|864323|864424|chr1.16|
| chr1|866939|867506|chr1.17|
| chr1|867659|867895|chr1.18|
| chr1|868382|868481|chr1.19|
| chr1|868918|869332|chr1.20|
+-----+------+------+-------+
only showing top 20 rows



In [0]:
spark.sql("""
SELECT
    chrom,
    start,
    end,
    name
FROM
    temp_dnase
ORDER BY chrom DESC
""").show()

+-----+-------+-------+-------+
|chrom|  start|    end|   name|
+-----+-------+-------+-------+
| chrY|2685240|2685390|      .|
| chrY|2655336|2655515| chrY.1|
| chrY|7242660|7242810|      .|
| chrY|4859732|4860163|chrY.21|
| chrY|2689760|2689910|      .|
| chrY|2655752|2655873| chrY.2|
| chrY|2709520|2709670|      .|
| chrY|2657974|2658081| chrY.3|
| chrY|2725840|2725990|      .|
| chrY|2665305|2665362| chrY.4|
| chrY|2727940|2728090|      .|
| chrY|2706083|2706297| chrY.5|
| chrY|2728720|2728870|      .|
| chrY|2709507|2709878| chrY.6|
| chrY|2742420|2742570|      .|
| chrY|2725845|2725944| chrY.7|
| chrY|2755600|2755750|      .|
| chrY|2781305|2781396| chrY.8|
| chrY|2756120|2756270|      .|
| chrY|2802876|2804346| chrY.9|
+-----+-------+-------+-------+
only showing top 20 rows



In [0]:
spark.sql("""
SELECT
    *
FROM
    temp_chip_seq
-- WHERE rgb < 300 AND rgb > 100
ORDER BY rgb ASC
""").show()

+-----+-----+---------+---------+----------+-----+------+----------+--------+----+
|index|chrom|    start|      end|      name|score|strand|thickStart|thickEnd| rgb|
+-----+-----+---------+---------+----------+-----+------+----------+--------+----+
| 9180|chr11| 64110000| 64110150|         .|    0|     .|      25.0| 12.2144|-1.0|
|    9| chr1|   840100|   840250|         .|    0|     .|      38.0| 41.7674|-1.0|
|   18| chr1|   994800|   994950|         .|    0|     .|       8.0|  6.9648|-1.0|
|   29| chr1|  1058380|  1058530|         .|    0|     .|       8.0| 5.05584|-1.0|
|20766|chr15| 84506758| 84507040|chr15.1757| 1000|     .|    1.0336|    16.0|-1.0|
|   10| chr1|   848340|   848490|         .|    0|     .|      14.0| 10.3093|-1.0|
|   38| chr1|  1376380|  1376530|         .|    0|     .|      74.0| 120.708|-1.0|
|   13| chr1|   860360|   860510|         .|    0|     .|      13.0| 11.8292|-1.0|
|19323|chr17| 65830120| 65830270|         .|    0|     .|      11.0|   13.08|-1.0|
|   

### Basic queries (PySpark)

In [0]:
spark_dnase.select("chrom","index").show(5)

+-----+-----+
|chrom|index|
+-----+-----+
| chr1|    0|
| chr1|    1|
| chr1|    2|
| chr1|    3|
| chr1|    4|
+-----+-----+
only showing top 5 rows



In [0]:
spark_dnase.printSchema()

root
 |-- index: long (nullable = true)
 |-- chrom: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)
 |-- strand: string (nullable = true)
 |-- thickStart: double (nullable = true)
 |-- thickEnd: double (nullable = true)
 |-- rgb: long (nullable = true)



In [0]:
spark_chip_seq.filter(spark_chip_seq["rgb"]>100).show()

+-----+-----+---------+---------+----+-----+------+----------+--------+-----+
|index|chrom|    start|      end|name|score|strand|thickStart|thickEnd|  rgb|
+-----+-----+---------+---------+----+-----+------+----------+--------+-----+
|    0| chrX| 10087209| 10088161|   .| 1000|     .|  91.15329|   300.0|300.0|
|    1|chr12| 32907970| 32909598|   .| 1000|     .|  76.52205|   300.0|300.0|
|    2|chr12| 50360675| 50361606|   .| 1000|     .| 149.86319|   300.0|300.0|
|    3|chr12| 56617729| 56618612|   .| 1000|     .| 180.71755|   300.0|300.0|
|    4|chr12| 82751977| 82753177|   .| 1000|     .| 108.63797|   300.0|300.0|
|    5|chr12|122231602|122233291|   .| 1000|     .| 122.64868|   300.0|300.0|
|    6|chr11|   189544|   190474|   .| 1000|     .| 118.98055|   300.0|300.0|
|    7|chr11| 58938845| 58939735|   .| 1000|     .| 138.27267|   300.0|300.0|
|    8|chr11| 62553854| 62555271|   .| 1000|     .| 136.69627|   300.0|300.0|
|    9|chr11| 64780906| 64782157|   .| 1000|     .| 189.39302|  

## 2.- Data modify

### Funciones Funciones definidas por el usuario (UDF)

**Steps**:

* Creating a UDF
* Used

[More examples](https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/)

In [0]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

@udf(returnType=StringType()) 
def upperCase(str):
    return str.upper()

spark_chip_seq.withColumn("Upper chrom", upperCase(col("chrom"))).show(truncate=False)

+-----+-----+---------+---------+----+-----+------+----------+--------+-----+-----------+
|index|chrom|start    |end      |name|score|strand|thickStart|thickEnd|rgb  |Upper chrom|
+-----+-----+---------+---------+----+-----+------+----------+--------+-----+-----------+
|0    |chrX |10087209 |10088161 |.   |1000 |.     |91.15329  |300.0   |300.0|CHRX       |
|1    |chr12|32907970 |32909598 |.   |1000 |.     |76.52205  |300.0   |300.0|CHR12      |
|2    |chr12|50360675 |50361606 |.   |1000 |.     |149.86319 |300.0   |300.0|CHR12      |
|3    |chr12|56617729 |56618612 |.   |1000 |.     |180.71755 |300.0   |300.0|CHR12      |
|4    |chr12|82751977 |82753177 |.   |1000 |.     |108.63797 |300.0   |300.0|CHR12      |
|5    |chr12|122231602|122233291|.   |1000 |.     |122.64868 |300.0   |300.0|CHR12      |
|6    |chr11|189544   |190474   |.   |1000 |.     |118.98055 |300.0   |300.0|CHR11      |
|7    |chr11|58938845 |58939735 |.   |1000 |.     |138.27267 |300.0   |300.0|CHR11      |
|8    |chr

In [0]:
@udf(returnType=StringType()) 
def removeLetters(str):
    return str.replace('chr', '')

spark_chip_seq.withColumn("Upper chrom", removeLetters(col("chrom"))).show(truncate=False)

+-----+-----+---------+---------+----+-----+------+----------+--------+-----+-----------+
|index|chrom|start    |end      |name|score|strand|thickStart|thickEnd|rgb  |Upper chrom|
+-----+-----+---------+---------+----+-----+------+----------+--------+-----+-----------+
|0    |chrX |10087209 |10088161 |.   |1000 |.     |91.15329  |300.0   |300.0|X          |
|1    |chr12|32907970 |32909598 |.   |1000 |.     |76.52205  |300.0   |300.0|12         |
|2    |chr12|50360675 |50361606 |.   |1000 |.     |149.86319 |300.0   |300.0|12         |
|3    |chr12|56617729 |56618612 |.   |1000 |.     |180.71755 |300.0   |300.0|12         |
|4    |chr12|82751977 |82753177 |.   |1000 |.     |108.63797 |300.0   |300.0|12         |
|5    |chr12|122231602|122233291|.   |1000 |.     |122.64868 |300.0   |300.0|12         |
|6    |chr11|189544   |190474   |.   |1000 |.     |118.98055 |300.0   |300.0|11         |
|7    |chr11|58938845 |58939735 |.   |1000 |.     |138.27267 |300.0   |300.0|11         |
|8    |chr

In [0]:
keys = spark.sql("""
SELECT chrom
FROM temp_chip_seq
GROUP BY chrom
""").rdd.map(lambda x: x[0]).collect()

In [0]:
dictionary = {key: value for key, value in zip(keys, range(0, len(keys)))}

In [0]:
@udf(returnType=StringType()) 
def replaceByCode(str):
    return dictionary[str]

spark_chip_seq.withColumn("Code", replaceByCode(col("chrom"))).show(truncate=False)

+-----+-----+---------+---------+----+-----+------+----------+--------+-----+----+
|index|chrom|start    |end      |name|score|strand|thickStart|thickEnd|rgb  |Code|
+-----+-----+---------+---------+----+-----+------+----------+--------+-----+----+
|0    |chrX |10087209 |10088161 |.   |1000 |.     |91.15329  |300.0   |300.0|6   |
|1    |chr12|32907970 |32909598 |.   |1000 |.     |76.52205  |300.0   |300.0|0   |
|2    |chr12|50360675 |50361606 |.   |1000 |.     |149.86319 |300.0   |300.0|0   |
|3    |chr12|56617729 |56618612 |.   |1000 |.     |180.71755 |300.0   |300.0|0   |
|4    |chr12|82751977 |82753177 |.   |1000 |.     |108.63797 |300.0   |300.0|0   |
|5    |chr12|122231602|122233291|.   |1000 |.     |122.64868 |300.0   |300.0|0   |
|6    |chr11|189544   |190474   |.   |1000 |.     |118.98055 |300.0   |300.0|16  |
|7    |chr11|58938845 |58939735 |.   |1000 |.     |138.27267 |300.0   |300.0|16  |
|8    |chr11|62553854 |62555271 |.   |1000 |.     |136.69627 |300.0   |300.0|16  |
|9  