# Challenge 1: Distributed Computing on Cloud
- Enrique Almazán Sánchez
- Victor Miguel Álvarez Camarero
- Javier Villoldo Fernández

Distributed computing in the cloud makes large-scale data processing scalable and efficient. In this practice we use Azure cloud computing to analyze genomic data. The input is a Variant Call Format (VCF) dataset stored as Parquet, a columnar format suited to large datasets, and we process it with Spark so that the analysis scales to very large genomic files.

One of the key objectives is to identify the most common variants associated with a particular disease, in this case Parkinson's disease. Additionally, we explore the integration of regression analysis and machine learning algorithms to extract patterns from the data and improve our understanding of the genetic factors underlying diseases.

## Imports


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, udf, desc, count, expr, when, array_contains
from pyspark.sql.types import StringType

StatementMeta(enrique, 2, 17, Finished, Available)

## 1. Download the data

The first step is to load the data. Working in Azure Synapse Analytics, we access the dataset stored in the container 'blob1' of the Azure Data Lake Storage Gen2 account 'massivelabpractices'. The file is addressed with an `abfss://` URI (the secure scheme of the Azure Blob File System driver) and read directly into a Spark DataFrame.

The file is a Snappy-compressed Parquet file, which stores the genomic data in a structured, columnar layout that Spark can ingest efficiently. The resulting DataFrame is the basis for all subsequent steps.

In [17]:
df = spark.read.parquet('abfss://blob1@massivelabpractices.dfs.core.windows.net/part-00000-bf776855-7d75-4bbf-9448-deabb543dd3a-c000.snappy.parquet')

StatementMeta(enrique, 2, 18, Finished, Available)
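The path used above follows the general `abfss://<container>@<account>.dfs.core.windows.net/<path>` pattern. As a minimal sketch, the hypothetical helper `abfss_uri` below (not part of the original notebook) assembles such a URI from its parts; the file name is a placeholder, while the container and account names are the ones quoted in this section.

```python
def abfss_uri(container: str, account: str, path: str) -> str:
    """Build an abfss:// URI for an object in an ADLS Gen2 account."""
    # A leading slash in the path is dropped so the URI has exactly one separator
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


# 'example.snappy.parquet' is a hypothetical file name used for illustration
uri = abfss_uri("blob1", "massivelabpractices", "example.snappy.parquet")
print(uri)
```

The resulting string could then be passed to `spark.read.parquet(...)` inside a Synapse notebook session.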

## 2. Information about the data

Next, the structure, types, and contents of the data are examined. The data type of each column and the DataFrame itself are displayed to get a visual overview. We also count the rows to quantify how many variant records the dataset holds. This inspection lays the groundwork for the deeper analysis that follows.

In [18]:
# Data types of each column
df.dtypes

StatementMeta(enrique, 2, 19, Finished, Available)

[('CHROM', 'string'),
 ('POS', 'string'),
 ('ID', 'string'),
 ('REF', 'string'),
 ('ALT', 'string'),
 ('QUAL', 'string'),
 ('FILTER', 'string'),
 ('INFO', 'string')]

In [19]:
# Dataframe structure
display(df)

StatementMeta(enrique, 2, 20, Finished, Available)

SynapseWidget(Synapse.DataFrame, ea8b0e9b-7f2c-46d2-b894-b49e7848d7ae)

In [20]:
# Number of rows (variant records) in the dataset
df.count()

StatementMeta(enrique, 2, 21, Finished, Available)

1168603000

## 3. Preprocessing of the data

In this section, we preprocess the INFO column of the VCF data to extract variant information related to the disease of interest, Parkinson's disease. The INFO column contains semi-structured metadata about each variant as `;`-separated `key=value` pairs. One of them is the CLNDN field, which holds the name of the disease associated with the variant. To facilitate the analysis, we define a user-defined function (UDF) in PySpark that extracts this field.

The `extract_field` function isolates the CLNDN field from the INFO column. It parses each INFO entry, searches for the CLNDN field, and extracts its corresponding value.

In [21]:
# Define a UDF to extract the value of the field of interest
@udf(StringType())
def extract_field(info):
    """
    Extracts the value of the CLNDN field from the INFO column.

    Args:
        info (str): The value of the INFO column.

    Returns:
        str: The value of the CLNDN field, or None if it is absent.
    """

    # Key prefix of the field of interest
    prefix = "CLNDN="

    # Rows with a null INFO value have nothing to extract
    if info is None:
        return None

    # Iterate over the ';'-separated fields inside the 'INFO' column
    for f in info.split(';'):

        # Match the key exactly so that similar keys (e.g. CLNDNINCL) are skipped
        if f.startswith(prefix):
            return f[len(prefix):]

    # The CLNDN field is not present in this row
    return None

StatementMeta(enrique, 2, 22, Finished, Available)
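The parsing logic can be checked locally, without a Spark session. The sketch below uses a hypothetical helper, `parse_info`, and a made-up INFO string in the ClinVar style. It illustrates why the key should be matched exactly: a plain substring test such as `"CLNDN" in f` would also match a key like `CLNDNINCL` if it happened to come first.

```python
def parse_info(info):
    """Split a VCF INFO string into a dict of key -> value (flags map to None)."""
    parsed = {}
    if not info:
        return parsed
    for item in info.split(";"):
        # partition splits on the first '=' only, so values containing '=' survive
        key, sep, value = item.partition("=")
        parsed[key] = value if sep else None
    return parsed


# Hypothetical INFO value, for illustration only
sample_info = (
    "ALLELEID=12345;CLNDNINCL=Some_included_condition;"
    "CLNDN=Parkinson_disease;CLNSIG=Pathogenic"
)

clndn = parse_info(sample_info).get("CLNDN")
print(clndn)
```

Looking the key up in a dictionary returns the CLNDN value regardless of where it appears in the INFO string.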

### STEP 1: Dataset filtering

The DataFrame is filtered to keep only the rows whose "INFO" column contains the string "parkinson". This is done with the `filter` function together with the `contains` method applied to the "INFO" column, which checks whether the given substring is present in each value. Note that `contains` is case-sensitive, so values spelled with a capital "P" (for example "Parkinson") are not matched by this filter. The resulting DataFrame contains only the rows that meet the condition.

In [22]:
# Dataset filtering, obtaining those rows that contain parkinson in the 'INFO' column
d_df = df.filter(col("INFO").contains("parkinson"))

StatementMeta(enrique, 2, 23, Finished, Available)
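Because the substring match is case-sensitive, it may be worth checking how much the result depends on capitalization. The following plain-Python sketch, with a hypothetical helper `mentions` and made-up INFO fragments, contrasts a strict match with a case-insensitive one. In Spark, the relaxed variant would typically be written as `lower(col("INFO")).contains("parkinson")`; whether that changes the row count on this dataset is not something the notebook shows.

```python
def mentions(info, term, ignore_case=False):
    """Return True when `term` occurs in `info`; None never matches."""
    if info is None:
        return False
    if ignore_case:
        return term.lower() in info.lower()
    return term in info


# Hypothetical INFO fragments, for illustration only
rows = [
    "CLNDN=Parkinson_disease",
    "CLNDN=Juvenile_parkinsonism",
    "CLNDN=not_provided",
]

strict = [r for r in rows if mentions(r, "parkinson")]
relaxed = [r for r in rows if mentions(r, "parkinson", ignore_case=True)]
print(len(strict), len(relaxed))
```

Here the strict match keeps only the lowercase spelling, while the relaxed match keeps both disease-related rows.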

### STEP 2: Adding CLNDN column

A new column called "CLNDN" is created in the DataFrame. Its values are obtained by applying the `extract_field` function to the "INFO" column, which extracts the "CLNDN" field from the information contained there.

In [23]:
# Adding a new column, with the CLNDN information stored in the 'INFO' column
d_df = d_df.withColumn("CLNDN", extract_field(d_df["INFO"]))

StatementMeta(enrique, 2, 24, Finished, Available)
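As an alternative to a Python UDF, the same field can be captured with a regular expression. The sketch below tests one possible pattern with Python's `re` module on a made-up INFO string; the names `CLNDN_PATTERN` and `clndn_from_info` are hypothetical. The same pattern could presumably be passed to Spark's built-in `regexp_extract(col("INFO"), pattern, 1)`, which avoids the overhead of a Python UDF but returns an empty string rather than null when nothing matches.

```python
import re

# The key must start the string or follow a ';', so CLNDNINCL is not matched
CLNDN_PATTERN = r"(?:^|;)CLNDN=([^;]*)"


def clndn_from_info(info):
    """Return the CLNDN value from an INFO string, or None if it is absent."""
    match = re.search(CLNDN_PATTERN, info or "")
    return match.group(1) if match else None


# Hypothetical INFO value, for illustration only
example = "CLNDNINCL=x;CLNDN=Parkinson_disease;CLNSIG=Pathogenic"
print(clndn_from_info(example))
```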

### STEP 3: Showing some information about the new DataFrame

Several properties of the new DataFrame can now be inspected: it can be displayed to see the extracted disease names, and its rows can be counted to get the number of matching variant records.

In [24]:
# The new dataset can be displayed
display(d_df)

StatementMeta(enrique, 2, 25, Finished, Available)

SynapseWidget(Synapse.DataFrame, e0064f0b-d31c-4f9a-b3f7-7699c1aad683)

In [25]:
# The new dataset can be counted
d_df.count()

StatementMeta(enrique, 2, 26, Finished, Available)

35

## 4. Top 10 variants

Finally, the 10 most common variants associated with Parkinson's disease are identified from the filtered variant records.

### STEP 1: Grouping by variants through the IDs

The rows are grouped by variant using the 'ID' column, and the frequency of each variant is computed.

In [26]:
# Group by variant ID and compute the frequency of each one
value_counts = d_df.groupBy("id").agg(count("id").alias("frequency"))

StatementMeta(enrique, 2, 27, Finished, Available)

In [27]:
# Show each different variant and their frequency
value_counts.show()

StatementMeta(enrique, 2, 28, Finished, Available)

+------------+---------+
|          id|frequency|
+------------+---------+
|  rs63750756|        3|
| rs377402921|        3|
|  rs63751392|        3|
|  rs63751273|        3|
|  rs63750424|        3|
|rs1553122918|        1|
|rs2105608859|        1|
|rs2105613409|        1|
|rs2105604954|        1|
| rs774629025|        1|
|rs2105605124|        1|
|   rs8042919|        1|
| rs753306031|        1|
| rs909275553|        1|
| rs766432479|        1|
| rs397518480|        1|
| rs142013283|        1|
|rs2033909793|        1|
| rs368024152|        1|
|rs1060499594|        1|
+------------+---------+
only showing top 20 rows



### STEP 2: Sorting variants

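The variants are sorted by frequency in descending order, so that the most frequent ones come first and the top 10 can be selected in the next step. Variants with the same frequency are returned in no guaranteed relative order.

In [28]:
# Sort in descending order of variant frequency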
sorted_counts = value_counts.orderBy(desc("frequency"))

StatementMeta(enrique, 2, 29, Finished, Available)

### STEP 3: Obtaining the top 10 variants

In [29]:
# Keep only the 10 most frequent variants
top_10_variants = sorted_counts.limit(10)

StatementMeta(enrique, 2, 30, Finished, Available)

In [30]:
# Displaying these 10 variants also allows plotting a chart of their frequencies
display(top_10_variants)

StatementMeta(enrique, 2, 31, Finished, Available)

SynapseWidget(Synapse.DataFrame, 566fb5df-7438-4d3d-929e-4914dad82cec)
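The group, sort, and limit steps of this section can be mirrored on a small local sample to sanity-check the logic. The sketch below uses `collections.Counter` on hypothetical variant IDs (not taken from the dataset). One difference is worth keeping in mind: `Counter.most_common` orders ties by first appearance, whereas Spark gives no such guarantee unless a secondary sort key, for example the ID itself, is added to `orderBy`.

```python
from collections import Counter

# Hypothetical variant IDs standing in for the 'ID' column
ids = ["rsA", "rsB", "rsA", "rsC", "rsA", "rsB"]

# Equivalent of groupBy("ID") + count
frequency = Counter(ids)

# Equivalent of orderBy(desc("frequency")) + limit(2)
top_2 = frequency.most_common(2)
print(top_2)
```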