# Protein feature extraction pipeline

This notebook walks through the pipeline that extracts features from protein sequences. It shows the pipeline's output without requiring you to run the `pipeline.py` file locally.

In [1]:
import pyarrow as pa
from fondant.pipeline import Pipeline
import os
from config import MOCK_DATA_PATH_FONDANT

## Generate Mock data

In [2]:
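import subprocess

# Pass the command as a list without shell=True: combining the two runs only "python" on POSIX systems.
# check=True raises an error if the script fails.
subprocess.run(["python", "utils/generate_mock_data.py"], check=True)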

CompletedProcess(args=['python', 'utils/generate_mock_data.py'], returncode=0)

In [3]:
# show content of the mock data
import pandas as pd
df = pd.read_parquet("." + MOCK_DATA_PATH_FONDANT)  # dot added to make it relative to the current directory
df

| | sequence | name |
|---|---|---|
| 0 | MNQRGMPIQSLVTNVKINRLEENDCIHTRHRVRPGRTDGKNLHAMM... | Seq1 |
| 1 | MAGLKPEVPLHDGINKFGKSDFAGQEGPKIVTTTDKALLVANGALK... | Seq2 |
| 2 | MVDLKKELKNFVDSDFPGSPKQEAQGIDVRILLSFNNAAFREALII... | Seq3 |
| 3 | MELILAKARLEFECDWGLLMLEPCVPPTKIFADRNYAVGVMFESDK... | Seq4 |
| 4 | MRVLCDGSTGYACAKNTRIRFREKVASVLAKIQGYEQTFPHHMPNM... | Seq5 |
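
The contents of `utils/generate_mock_data.py` are not shown in this notebook. As a rough illustration only, mock data of this shape could be produced along the following lines. The helper names `random_protein` and `make_mock_records`, the fixed seed, and the chosen lengths are assumptions made for this sketch, not the script's actual implementation.

```python
import random

# One-letter codes of the 20 standard amino acids.
AMINO_ACIDS = "ACDEFGHIKLMNPQRSTVWY"


def random_protein(length, rng):
    """Return a random sequence of `length` residues that starts with methionine (M)."""
    if length < 1:
        raise ValueError("length must be at least 1")
    return "M" + "".join(rng.choice(AMINO_ACIDS) for _ in range(length - 1))


def make_mock_records(lengths, seed=0):
    """Build one {'sequence', 'name'} record per requested length (hypothetical helper)."""
    rng = random.Random(seed)  # a fixed seed keeps the mock data reproducible
    return [
        {"sequence": random_protein(n, rng), "name": f"Seq{i}"}
        for i, n in enumerate(lengths, start=1)
    ]


# Illustrative lengths; the real script may choose them differently.
records = make_mock_records([600, 400, 550, 350, 420])
```

A list of records like this could then be turned into a dataframe with `pd.DataFrame(records)` and written with `to_parquet`.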


## Creating the pipeline

In [4]:
# Create a new pipeline

BASE_PATH = ".fondant"
PIPELINE_NAME = "feature_extraction_pipeline"

pipeline = Pipeline(
	name=PIPELINE_NAME,
	base_path=BASE_PATH,
	description="A pipeline to extract features from protein sequences."
)

## Loading the dataset

In [5]:
# Read the dataset

dataset = pipeline.read(
	"load_from_parquet",
	arguments={
		"dataset_uri": MOCK_DATA_PATH_FONDANT,
	},
	produces={
		"sequence": pa.string()
	}
)

## Components

The following components are available for the pipeline. The cell at the end of this section applies only the first three:

- `generate_protein_sequence_checksum_component`: Generates a checksum for each protein sequence.

- `biopython_component`: Extracts features from the protein sequence using Biopython.

- `iFeatureOmega_component`: Extracts features from the protein sequence using iFeature Omega. The `descriptors` argument selects which feature types to extract.

- `filter_pdb_component`: Filters out PDB files that have already been predicted, so the pipeline doesn't need to predict them again. Set the following arguments before running the pipeline:
```json
"storage_type": "local",
"pdb_path": "/data/pdb_files/",
"bucket_name": "your-bucket-name",
"project_id": "your-project-id",
"google_cloud_credentials_path": "/data/google_cloud_credentials.json"
```

If you only use `local` storage, you can leave `bucket_name`, `project_id` and `google_cloud_credentials_path` as empty strings. `remote` storage requires a Google Cloud Storage bucket, a project ID, and credentials that can access them.

- `predict_protein_3D_structure_component`: Predicts the 3D structure of the protein using ESMFold.

- `store_pdb_component`: Stores the PDB files in the location selected by `storage_type`. Set the following arguments before running the pipeline:
```json
"storage_type": "local",
"pdb_path": "/data/pdb_files/",
"bucket_name": "your-bucket-name",
"project_id": "your-project-id",
"google_cloud_credentials_path": "/data/google_cloud_credentials.json"
```

The same rule as for `filter_pdb_component` applies: with `local` storage, `bucket_name`, `project_id` and `google_cloud_credentials_path` can stay empty strings, while `remote` storage needs all three.
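
The `local`/`remote` rule for these arguments can be checked before the pipeline starts. The sketch below is one possible way to do that; `validate_storage_arguments` is a hypothetical helper written for this notebook, not part of the components or of Fondant, and it assumes that `local` and `remote` are the only accepted values of `storage_type`.

```python
def validate_storage_arguments(args):
    """Check a storage argument dict against the local/remote rule (hypothetical helper)."""
    storage_type = args.get("storage_type")
    if storage_type not in ("local", "remote"):
        raise ValueError(f"storage_type must be 'local' or 'remote', got {storage_type!r}")

    if storage_type == "remote":
        # Remote storage needs every Google Cloud setting to be a non-empty string.
        required = ("bucket_name", "project_id", "google_cloud_credentials_path")
        missing = [key for key in required if not args.get(key)]
        if missing:
            raise ValueError(f"remote storage needs: {', '.join(missing)}")
    return args


# With local storage the Google Cloud settings may stay empty.
local_arguments = validate_storage_arguments({
    "storage_type": "local",
    "pdb_path": "/data/pdb_files/",
    "bucket_name": "",
    "project_id": "",
    "google_cloud_credentials_path": "",
})
```

The validated dictionary could then be passed as the `arguments` of the corresponding `.apply(...)` call.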

In [6]:
# Apply the components to the dataset

_ = dataset.apply(
	"./components/biopython_component"
).apply(
	"./components/generate_protein_sequence_checksum_component"
).apply(
	"./components/iFeatureOmega_component",
	# currently forcing the number of rows to 5, see readme for more info
	input_partition_rows=5,
	arguments={
		"descriptors": ["AAC", "CTDC", "CTDT"]
	}
)
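
To give an idea of what a descriptor such as `AAC` (amino acid composition) represents, here is a minimal pure-Python sketch that computes the fraction of each standard amino acid in a sequence. It is not the iFeature Omega implementation, and the `AAC_<letter>` key names are an assumption made for illustration. The example input is the first 20 residues of `Seq2` from the mock data.

```python
from collections import Counter

# One-letter codes of the 20 standard amino acids.
AMINO_ACIDS = "ACDEFGHIKLMNPQRSTVWY"


def amino_acid_composition(sequence):
    """Return the fraction of each standard amino acid in `sequence`."""
    if not sequence:
        raise ValueError("sequence must not be empty")
    counts = Counter(sequence.upper())
    # Key names are hypothetical; the real component may label columns differently.
    return {f"AAC_{aa}": counts[aa] / len(sequence) for aa in AMINO_ACIDS}


composition = amino_acid_composition("MAGLKPEVPLHDGINKFGKS")
```

For a sequence made up only of standard residues, the 20 fractions sum to 1.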





## Run the pipeline

The `pipeline.py` file needs to be run using the command line. The following command will run the pipeline:

```bash
fondant < full_path_to_pipeline.py >\data:/data
```

In [20]:
from fondant.pipeline.runner import DockerRunner

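# Mount the local "data" directory at /data inside the containers ("host_path:container_path").
# Build the string directly: os.path.join would insert a path separator before ":/data".
mounted_data = f"{os.path.abspath('data')}:/data"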

DockerRunner().run(input=pipeline, extra_volumes=mounted_data)

[2024-04-18 15:34:25,726 | root | INFO] Found reference to un-compiled pipeline... compiling
[2024-04-18 15:34:25,728 | fondant.pipeline.compiler | INFO] Compiling feature_extraction_pipeline to .fondant/compose.yaml
[2024-04-18 15:34:25,731 | fondant.pipeline.compiler | INFO] Base path found on local system, setting up .fondant as mount volume
[2024-04-18 15:34:25,733 | fondant.pipeline.pipeline | INFO] Sorting pipeline component graph topologically.
[2024-04-18 15:34:25,742 | fondant.pipeline.pipeline | INFO] All pipeline component specifications match.
[2024-04-18 15:34:25,746 | fondant.pipeline.compiler | INFO] Compiling service for load_from_parquet
[2024-04-18 15:34:25,749 | fondant.pipeline.compiler | INFO] Compiling service for biopython_component
[2024-04-18 15:34:25,751 | fondant.pipeline.compiler | INFO] Found Dockerfile for biopython_component, adding build step.
[2024-04-18 15:34:25,753 | fondant.pipeline.compiler | INFO] Compiling service for generate_protein_sequence_che

Starting pipeline run...
Finished pipeline run.


## Results

The results below are read from the pipeline output in the `.fondant` directory, which holds the output of each component together with the cache of the previous run. The pipeline does not yet use the `write_to_file` component, so the results are collected from each component's output individually.

In [21]:
import glob

# Pick the most recent run folder: BASE_PATH/PIPELINE_NAME/PIPELINE_NAME-<timestamp>
matching_folders = glob.glob(f"{BASE_PATH}/{PIPELINE_NAME}/{PIPELINE_NAME}-*")

if not matching_folders:
    # Raise instead of calling exit(), which would stop the notebook kernel
    raise FileNotFoundError(f"No run folders found under {BASE_PATH}/{PIPELINE_NAME}")

output_folder = max(matching_folders, key=os.path.getctime)
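
The cell above selects the newest run by file creation time. If the `<timestamp>` suffix of the run folders is a fixed-width, sortable timestamp (an assumption, since the exact naming scheme is not shown here), the newest run can also be found by comparing folder names, which does not depend on filesystem metadata. The sketch below demonstrates the idea on a temporary directory with made-up run names; `latest_run_folder` is a hypothetical helper.

```python
import os
import tempfile


def latest_run_folder(base_path, pipeline_name):
    """Return the run folder with the greatest name suffix, or None if there is none."""
    pipeline_dir = os.path.join(base_path, pipeline_name)
    if not os.path.isdir(pipeline_dir):
        return None
    prefix = f"{pipeline_name}-"
    runs = [
        entry for entry in os.listdir(pipeline_dir)
        if entry.startswith(prefix) and os.path.isdir(os.path.join(pipeline_dir, entry))
    ]
    if not runs:
        return None
    # Assumes a fixed-width timestamp suffix, so string order equals time order.
    return os.path.join(pipeline_dir, max(runs))


# Demonstration on a throwaway directory tree with made-up run names.
with tempfile.TemporaryDirectory() as base:
    for stamp in ("20240418153425", "20240417090000"):
        os.makedirs(os.path.join(base, "demo_pipeline", f"demo_pipeline-{stamp}"))
    newest = os.path.basename(latest_run_folder(base, "demo_pipeline"))
    missing = latest_run_folder(base, "other_pipeline")
```

Returning `None` instead of exiting leaves it to the caller to decide how to handle a pipeline that has not been run yet.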

In [22]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Merge the Parquet output of every component into one dataframe, joining on the "sequence" column
def merge_parquet_folders(folder_path):
	merged_df = pd.DataFrame()

	# Each component writes its partitions to its own sub-folder of the run folder
	for folder in os.listdir(folder_path):
		parquet_partitions = os.path.join(folder_path, folder)
		if not os.path.isdir(parquet_partitions):
			continue  # skip stray files in the run folder

		# Read all partitions first and concatenate once, instead of repeatedly
		# concatenating onto an empty dataframe (recent pandas versions warn about that)
		partitions = [
			pq.read_table(os.path.join(parquet_partitions, file)).to_pandas()
			for file in os.listdir(parquet_partitions)
			if file.endswith(".parquet")
		]
		if not partitions:
			continue
		folder_df = pd.concat(partitions)

		if merged_df.empty:
			merged_df = folder_df.copy()
		else:
			merged_df = pd.merge(merged_df, folder_df, how='outer', on='sequence')

	return merged_df

merged_df = merge_parquet_folders(output_folder)


In [23]:
merged_df

| | sequence | sequence_length | molecular_weight | aromaticity | isoelectric_point | instability_index | gravy | helix | turn | sheet | ... | CTDT_polarity.Tr2332 | CTDT_polarizability.Tr1221 | CTDT_polarizability.Tr1331 | CTDT_polarizability.Tr2332 | CTDT_secondarystruct.Tr1221 | CTDT_secondarystruct.Tr1331 | CTDT_secondarystruct.Tr2332 | CTDT_solventaccess.Tr1221 | CTDT_solventaccess.Tr1331 | CTDT_solventaccess.Tr2332 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | MAGLKPEVPLHDGINKFGKSDFAGQEGPKIVTTTDKALLVANGALK... | 400 | 43254.8112 | 0.055 | 5.964593 | 38.948025 | -0.26375 | 0.335 | 0.3225 | 0.335 | ... | 0.230576 | 0.265664 | 0.145363 | 0.177945 | 0.190476 | 0.265664 | 0.170426 | 0.26817 | 0.230576 | 0.152882 |
| 1 | MELILAKARLEFECDWGLLMLEPCVPPTKIFADRNYAVGVMFESDK... | 350 | 39615.9422 | 0.091429 | 4.825028 | 40.802857 | -0.248 | 0.38 | 0.248571 | 0.357143 | ... | 0.191977 | 0.255014 | 0.160458 | 0.243553 | 0.266476 | 0.206304 | 0.148997 | 0.275072 | 0.212034 | 0.160458 |
| 2 | MNQRGMPIQSLVTNVKINRLEENDCIHTRHRVRPGRTDGKNLHAMM... | 600 | 66369.0679 | 0.06 | 5.397908 | 38.074 | -0.334833 | 0.385 | 0.268333 | 0.328333 | ... | 0.215359 | 0.313856 | 0.123539 | 0.223706 | 0.265442 | 0.277129 | 0.126878 | 0.292154 | 0.186978 | 0.165275 |
| 3 | MRVLCDGSTGYACAKNTRIRFREKVASVLAKIQGYEQTFPHHMPNM... | 420 | 47355.5634 | 0.088095 | 5.392736 | 42.44 | -0.518333 | 0.319048 | 0.283333 | 0.32619 | ... | 0.229117 | 0.26969 | 0.167064 | 0.212411 | 0.250597 | 0.250597 | 0.116945 | 0.26969 | 0.214797 | 0.155131 |
| 4 | MVDLKKELKNFVDSDFPGSPKQEAQGIDVRILLSFNNAAFREALII... | 550 | 60158.722 | 0.072727 | 5.349652 | 38.161636 | -0.183273 | 0.341818 | 0.296364 | 0.34 | ... | 0.222222 | 0.300546 | 0.162113 | 0.185792 | 0.222222 | 0.256831 | 0.151184 | 0.28051 | 0.209472 | 0.145719 |
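
To help read the `aromaticity` and `gravy` columns, the sketch below computes both values in plain Python. It assumes the conventional definitions also used by Biopython's `ProteinAnalysis`: aromaticity is the relative frequency of Phe, Trp and Tyr, and GRAVY is the mean Kyte-Doolittle hydropathy of the residues. It is an illustration of those definitions, not the code of `biopython_component`.

```python
# Kyte-Doolittle hydropathy values for the 20 standard amino acids.
KYTE_DOOLITTLE = {
    "A": 1.8, "R": -4.5, "N": -3.5, "D": -3.5, "C": 2.5,
    "Q": -3.5, "E": -3.5, "G": -0.4, "H": -3.2, "I": 4.5,
    "L": 3.8, "K": -3.9, "M": 1.9, "F": 2.8, "P": -1.6,
    "S": -0.8, "T": -0.7, "W": -0.9, "Y": -1.3, "V": 4.2,
}


def aromaticity(sequence):
    """Relative frequency of the aromatic residues F, W and Y."""
    return sum(sequence.count(aa) for aa in "FWY") / len(sequence)


def gravy(sequence):
    """Grand average of hydropathy: mean Kyte-Doolittle value over all residues."""
    return sum(KYTE_DOOLITTLE[aa] for aa in sequence) / len(sequence)


# Short made-up peptide, used only to exercise the two functions.
example = "MAGLKPEVPLHDGINKFGKS"
example_aromaticity = aromaticity(example)
example_gravy = gravy(example)
```

Negative GRAVY values, as in every row of the table, indicate sequences that are hydrophilic on average under this scale.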


In [24]:
# Export the merged dataframe to a Parquet file

output_path = os.path.join(os.path.abspath("data"), "export")
os.makedirs(output_path, exist_ok=True)  # create the export folder if it does not exist yet

merged_df.to_parquet(os.path.join(output_path, "results.parquet"))