In [None]:
import io
import os
import time
import warnings

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from botocore import UNSIGNED
from botocore.config import Config

## **Data Pipeline Process**

A data pipeline typically follows an ETL-like (Extract, Transform, Load) workflow. In this case, data is first extracted from AWS S3 using Boto3. It is then parsed into a structured Pandas DataFrame for processing.

The pipeline filters relevant data, extracting attributes such as `gene_id` and `gene_name`. Finally, the cleaned dataset is converted into Apache Arrow format and stored as a Parquet file for efficient storage and retrieval. Using Arrow and Parquet significantly improves read/write performance compared to traditional formats like CSV. This structured approach ensures scalability, data integrity, and quick access for analytics and machine learning.

In [None]:
# Silence FutureWarning messages so they do not drown out the cell outputs.
warnings.simplefilter("ignore", FutureWarning)
# Turn off pandas' chained-assignment (SettingWithCopy) check for this session.
pd.set_option("mode.chained_assignment", None)

# The data is public, so requests are sent unsigned: no AWS credentials needed.
s3 = boto3.client(
    "s3",
    config=Config(signature_version=UNSIGNED),
)

In [3]:
def load_gtf_data(bucket_name, file_key):
    """Download one object from S3 and return its contents as text.

    The request goes through the module-level ``s3`` client.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        file_key: Key of the object inside the bucket.

    Returns:
        The object body decoded as UTF-8, or ``None`` when fetching or
        decoding fails (the error is printed instead of being raised).
    """
    try:
        body_stream = s3.get_object(Bucket=bucket_name, Key=file_key)['Body']
        raw_bytes = body_stream.read()
        return raw_bytes.decode('utf-8')
    except Exception as err:
        # Best-effort: report the problem and let the caller test for None.
        print(f"Error fetching data from S3: {err}")
        return None

In [4]:
def parse_gtf_data(gtf_text):
    """Turn raw GTF text into a pandas DataFrame.

    The text is read as tab-separated values without a header row; ``#``
    starts a comment, and the nine columns receive the names listed below.

    Args:
        gtf_text: Full contents of a GTF file as a single string.

    Returns:
        A DataFrame with one row per parsed line and the nine named columns.
    """
    gtf_columns = [
        "seqname", "source", "feature",
        "start", "end", "score",
        "strand", "frame", "attribute",
    ]
    text_buffer = io.StringIO(gtf_text)
    return pd.read_csv(
        text_buffer,
        sep='\t',
        comment='#',
        names=gtf_columns,
        low_memory=False,
    )

In [5]:
def filter_genes(df):
    """Keep only the rows whose ``feature`` column equals ``"gene"``.

    Dropping every other feature type reduces the dataset size. The
    original index labels of the surviving rows are kept.
    """
    is_gene_row = df['feature'].eq('gene')
    return df[is_gene_row]

In [6]:
def extract_attributes(df):
    """Build a cleaned gene table from the GTF ``attribute`` column.

    ``gene_id``, ``gene_name`` and ``gene_biotype`` are pulled out of the
    free-text ``attribute`` column with the pattern ``<key> "<value>"``.
    Rows where a key is absent get a missing value for that column.

    The input frame is not modified: the result is assembled from a column
    subset, so calling this on a filtered slice neither triggers pandas'
    chained-assignment warning nor adds columns to the caller's frame.

    Args:
        df: DataFrame containing at least ``seqname``, ``start``, ``end``,
            ``strand`` and a string ``attribute`` column.

    Returns:
        A new DataFrame with the columns ``seqname``, ``start``, ``end``,
        ``strand``, ``gene_id``, ``gene_name``, ``gene_biotype`` (in that
        order) and the same index as ``df``.
    """
    attribute_text = df['attribute']
    # expand=False makes str.extract return a Series for the single capture
    # group instead of a one-column DataFrame.
    extracted = {
        key: attribute_text.str.extract(rf'{key} "(.*?)"', expand=False)
        for key in ('gene_id', 'gene_name', 'gene_biotype')
    }
    # Selecting with a list yields a new frame; assign appends the extracted
    # columns in the dict's insertion order.
    return df[['seqname', 'start', 'end', 'strand']].assign(**extracted)

## PyArrow and Data Parsing

PyArrow is a powerful library, widely used in data-science and data-engineering (DE) projects, that provides efficient in-memory data structures based on the Apache Arrow columnar format. It enables zero-copy reads, which means data can be processed without unnecessary duplication, making access significantly faster than with traditional row-based formats.

PyArrow works well with Parquet-based files. Parquet, a columnar storage format, is optimized for analytics and reduces I/O overhead by enabling selective column reads. Compared to loading row-oriented text formats such as CSV with Pandas, the combination of PyArrow and Parquet dramatically improves read/write speeds, particularly for large datasets. PyArrow also integrates seamlessly with cloud storage and big data frameworks, making it ideal for scalable, high-performance data pipelines. That said, Pandas can also use PyArrow on the backend when a suitable engine is selected (e.g. `engine="pyarrow"`), allowing it to read Parquet files quickly as well.

In [7]:
def convert_to_arrow(df, filename="data/genes_filtered.parquet"):
    """Write a DataFrame to a Parquet file via an Arrow table.

    Args:
        df: DataFrame to store. Its index is written along with the columns
            (pyarrow's default for ``Table.from_pandas``).
        filename: Destination path of the Parquet file. Missing parent
            directories are created first.

    Returns:
        ``filename``, unchanged, so the call can feed a loader directly.
    """
    # The write fails if the target directory is missing, which is the case
    # for the default "data/" folder on a fresh checkout.
    if isinstance(filename, (str, os.PathLike)):
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    arrow_table = pa.Table.from_pandas(df)  # pandas -> Arrow
    pq.write_table(arrow_table, filename)  # Arrow -> Parquet on disk
    print(f"Data saved as Parquet: {filename}")
    return filename

In [8]:
def load_arrow_table(filename):
    """Read a Parquet file into an Arrow table and print its schema.

    Args:
        filename: Path of the Parquet file to read.

    Returns:
        The Arrow table read from ``filename``.
    """
    table = pq.read_table(filename)
    # Header and schema are printed on separate lines.
    print("Arrow Table Loaded:", table.schema, sep="\n")
    return table

In [9]:
# Full pipeline execution: fetch from s3; parse, clean, filter and extract attributes; store in parquet
# Every name assigned below is module-level, so it remains defined after this cell has run.
if __name__ == "__main__":
    bucket_name = "sg-nex-data"
    file_key = "data/data_tutorial/annotations/hg38_chr22.gtf"

    # Extract: load_gtf_data prints the error and returns None when the download fails.
    gtf_text = load_gtf_data(bucket_name, file_key)
    # A falsy result (None or an empty string) skips the rest of the pipeline without further output.
    if gtf_text:
        # Transform: text -> DataFrame -> rows with feature == "gene" -> cleaned gene columns.
        gtf_df = parse_gtf_data(gtf_text)
        filtered_genes_df = filter_genes(gtf_df)
        processed_df = extract_attributes(filtered_genes_df)

        # Load: write to the default Parquet path; the path is returned for the read-back below.
        parquet_file = convert_to_arrow(processed_df)

        # Read the file back as an Arrow table (this also prints its schema).
        arrow_table = load_arrow_table(parquet_file)

Data saved as Parquet: data/genes_filtered.parquet
Arrow Table Loaded:
seqname: int64
start: int64
end: int64
strand: string
gene_id: string
gene_name: string
gene_biotype: string
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1118


In [10]:
# Re-open the Parquet file written by the pipeline and inspect it through Arrow.
arrow_table = pq.read_table("data/genes_filtered.parquet")

print("Schema of the Arrow Table:", arrow_table.schema, sep="\n")

# Column names and the raw schema metadata.
print(f"\nColumn Names: {arrow_table.column_names}")
print(f"\nMetadata: {arrow_table.schema.metadata}")

# Convert to pandas so the same data can be compared in DataFrame form.
df = arrow_table.to_pandas()
print("\nFirst 5 rows of the Pandas DataFrame:", df.head(), sep="\n")

# Arrow can project a subset of columns directly from the table.
selected_columns = arrow_table.select(["gene_id", "gene_name"])
print("\nArrow Table with Selected Columns:", selected_columns, sep="\n")

Schema of the Arrow Table:
seqname: int64
start: int64
end: int64
strand: string
gene_id: string
gene_name: string
gene_biotype: string
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1118

Column Names: ['seqname', 'start', 'end', 'strand', 'gene_id', 'gene_name', 'gene_biotype', '__index_level_0__']

Metadata: {b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "seqname", "field_name": "seqname", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}, {"name": "start", "field_name": "start", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}, {"name": "end", "field_name": "end", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}, {"name": "strand", "field_name": "strand", "pandas_type": "unicode", "numpy_type"

In [11]:
# Round-trip through pandas' own Parquet API (backed by the PyArrow engine):
# write the DataFrame out, then read it straight back.
df.to_parquet(
    "data/genes_pandas.parquet",
    engine="pyarrow",
)
df_pandas = pd.read_parquet(
    "data/genes_pandas.parquet",
    engine="pyarrow",
)

print("\nData loaded back using Pandas (via PyArrow):", df_pandas.head(), sep="\n")


Data loaded back using Pandas (via PyArrow):
    seqname     start       end strand          gene_id   gene_name  \
0        22  10736171  10736283      -  ENSG00000277248          U2   
3        22  10939388  10961338      -  ENSG00000283047  CU459211.1   
14       22  11066418  11068174      +  ENSG00000279973  CU104787.1   
18       22  11124337  11125705      +  ENSG00000226444    ACTR3BP6   
22       22  11249809  11249959      -  ENSG00000276871   5_8S_rRNA   

              gene_biotype  
0                    snRNA  
3   unprocessed_pseudogene  
14                 lincRNA  
18    processed_pseudogene  
22                    rRNA  


In [12]:
# Compare how long pandas and PyArrow take to read the same Parquet file.
# time.perf_counter() is used rather than time.time(): it is the clock with the
# highest available resolution for measuring short durations, whereas the
# wall-clock can be too coarse for reads that take only a few milliseconds.
# NOTE(review): each reader is timed once, and the file is read by pandas first,
# so the second read may benefit from a warm file cache. Treat the numbers as
# indicative only.

# Measure Pandas read time
start_time = time.perf_counter()
df_pandas = pd.read_parquet("data/genes_pandas.parquet", engine="pyarrow")
print(f"Pandas read time: {time.perf_counter() - start_time} seconds")

# Measure Arrow read time
start_time = time.perf_counter()
arrow_table = pq.read_table("data/genes_pandas.parquet")
print(f"Arrow read time: {time.perf_counter() - start_time} seconds")

Pandas read time: 0.010181903839111328 seconds
Arrow read time: 0.00826406478881836 seconds
