In [1]:
SparkContext

pyspark.context.SparkContext

In [2]:
!pwd

/home/almalinux/eda1-coursework/src/merizo_pipeline


In [3]:
!pip3 install -r requirements.txt

Defaulting to user installation because normal site-packages is not writeable
Collecting torch==2.0.1+cpu
  Using cached https://download.pytorch.org/whl/cpu/torch-2.0.1%2Bcpu-cp39-cp39-linux_x86_64.whl (195.4 MB)


In [4]:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [5]:
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [6]:
sc.master

'local[4]'

In [7]:
os.getcwd()

'/home/almalinux/eda1-coursework/src/merizo_pipeline'

In [8]:
input_dir = "/UP000000625_83333_ECOLI_v4/"

In [9]:
file_rdd = sc.wholeTextFiles(input_dir + "*.pdb")
file_paths_rdd = file_rdd.map(lambda x: (x[0], os.path.basename(x[0])))

In [10]:
from pyspark import SparkFiles
local_file_path = SparkFiles.get(input_dir + "/AF-Q46839-F1-model_v4.pdb")
local_file_path

'/UP000000625_83333_ECOLI_v4/AF-Q46839-F1-model_v4.pdb'

In [11]:
!ls

AF-P0A6S3-F1-model_v4.pdb_search.tsv   AF-Q47319-F1-model_v4.pdb_segment.tsv
AF-P0A6S3-F1-model_v4.pdb_segment.tsv  merizo_pipeline
AF-P28246-F1-model_v4.pdb_segment.tsv  pipeline_job.py
AF-P76097-F1-model_v4.pdb.parsed       pipeline_playground.ipynb
AF-P76097-F1-model_v4.pdb_search.tsv   remove_requirements.txt
AF-P76097-F1-model_v4.pdb_segment.tsv  requirements.txt
AF-P77672-F1-model_v4.pdb.parsed       results_parser.py
AF-P77672-F1-model_v4.pdb_search.tsv   setup.py
AF-P77672-F1-model_v4.pdb_segment.tsv  test.bin
AF-Q47319-F1-model_v4.pdb.parsed       test_requirements.txt
AF-Q47319-F1-model_v4.pdb_search.tsv   tmp-6c3f26804cda43c89b65f248cc62cd16


In [12]:
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
import os

In [13]:
input_dir = "/UP000000625_83333_ECOLI_v4/"
# input_dir = "/"
# Failed example : AF-P0DSE5-F1-model_v4.pdb
# Success example: AF-P75975-F1-model_v4.pdb
file_rdd = sc.binaryFiles(input_dir + "*.pdb")
file_rdd = file_rdd.sample(withReplacement=False, fraction=0.005)
file_content_rdd = file_rdd.map(lambda x: (os.path.basename(x[0]), x[1]))

In [14]:
def delete_local_file(file_path):
    try:
        os.remove(file_path)
        print(f"{file_path} local file has been deleted.")
    except FileNotFoundError:
        print(f"{file_path} does not exist.")
    except PermissionError:
        print(f"Permission denied to delete {file_path}.")
    except Exception as e:
        print(f"An error occurred: {e}")

In [15]:
def upload_file_to_hdfs(local_file_path, hdfs_file_path):
    hdfs_put_cmd = ['/home/almalinux/hadoop-3.4.0/bin/hdfs', 'dfs', '-put', local_file_path, hdfs_file_path]
    print(f'STEP 3: UPLOADING ANALYSIS OUTPUT TO HDFS: {" ".join(hdfs_put_cmd)}')
    p = Popen(hdfs_put_cmd, stdin=PIPE,stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    # Decode the byte output to string
    print("Output:")
    print(out.decode("utf-8"))  # Decode and print the standard output
    
    if err:
        print("Error:")
        print(err.decode("utf-8"))  # Decode and print the standard  

In [16]:
def run_parser(input_file):
    """
    Run the results_parser.py over the hhr file to produce the output summary
    """
    search_file = input_file+"_search.tsv"
    print("search_file: ", search_file)
    cmd = ['python3', './results_parser.py', search_file]
    print(f'STEP 2: RUNNING PARSER: {" ".join(cmd)}')
    p = Popen(cmd, stdin=PIPE,stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    # Decode the byte output to string
    print("Output:")
    print(out.decode("utf-8"))  # Decode and print the standard output
        
    if err:
        print("Error:")
        print(err.decode("utf-8"))  # Decode and print the standard error

In [17]:
def run_merizo_search(file_name, file_content):
    print(f"File Name: {file_name}")
    # Create a temporary file to hold the content
    with NamedTemporaryFile(delete=True, mode='wb') as temp_file:
        temp_file.write(file_content)
        temp_file_path = temp_file.name
        cmd = ['python3',
           '/home/almalinux/merizo_search/merizo_search/merizo.py',
           'easy-search',
           temp_file_path,
           '/home/almalinux/data/cath-4.3-foldclassdb',
           file_name,
           'tmp',
           '--iterate',
           '--output_headers',
           '-d',
           'cpu',
           '--threads',
           '2'
          ]
        print(f'STEP 1: RUNNING MERIZO: {" ".join(cmd)}')
        p = Popen(cmd, stdin=PIPE,stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        # Decode the byte output to string
        print("Output:")
        print(out.decode("utf-8"))  # Decode and print the standard output
        
        if err:
            print("Error:")
            print(err.decode("utf-8"))  # Decode and print the standard 


In [18]:
def upload_analysis_outputs_to_hdfs(file_name):
    # upload anaylsis output files to hdfs and clean local files
    local_files_paths = [ file_name + '_segment.tsv', file_name + '_search.tsv', file_name + '.parsed']
    hdfs_file_path = '/analysis_outputs/'
    for local_file_path in local_files_paths:
        upload_file_to_hdfs(local_file_path, hdfs_file_path)
        delete_local_file(local_file_path)

In [19]:
def combine_parsed_dict_and_mean(analysis_output1, analysis_output2):
    cath_counts_dict1, mean_plddt1 = analysis_output1
    cath_counts_dict2, mean_plddt2 = analysis_output2

    for key in cath_counts_dict2:
        if key in cath_counts_dict1:
            cath_counts_dict1[key] +=cath_counts_dict2[key]
        else:
            cath_counts_dict1[key] = cath_counts_dict2[key]

    mean_plddt = (mean_plddt1 + mean_plddt2) / 2.0

    return cath_counts_dict1, mean_plddt

analysis_output1 = ({'3.40.710.10': 1}, 98.4244)
analysis_output2 = ({'3.40.710.10': 3, '2.30.810.20': 2}, 88.4244)

combine_parsed_dict_and_mean(analysis_output1, analysis_output2)

({'3.40.710.10': 4, '2.30.810.20': 2}, 93.4244)

In [20]:
def combine_parsed_dict(cath_counts_dict1, cath_counts_dict2):

    for key in cath_counts_dict2:
        if key in cath_counts_dict1:
            cath_counts_dict1[key] +=cath_counts_dict2[key]
        else:
            cath_counts_dict1[key] = cath_counts_dict2[key]

    return cath_counts_dict1

cath_counts_dict1 = {'3.40.710.10': 1}
cath_counts_dict2 = {'3.40.710.10': 3, '2.30.810.20': 2}

combine_parsed_dict(cath_counts_dict1, cath_counts_dict2)

{'3.40.710.10': 4, '2.30.810.20': 2}

In [25]:
import os 

def read_parsed_file(file_name):
    """
    Reads a .parsed file and extracts:
    1. A dictionary with 'cath_id' as keys and their counts as values.
    2. The mean pLDDT value.
    
    Args:
        file_path (str): Path to the .parsed file.
        
    Returns:
        tuple: (dict of cath_id counts, mean pLDDT value)
    """
    cath_counts = {}
    mean_plddt = 0

    file_path = file_name + '.parsed'

    if not os.path.exists(file_path):
        return mean_plddt, cath_counts
    
    with open(file_path, 'r') as file:
        lines = file.readlines()
        
        # Extract mean pLDDT from the header
        if len(lines) < 1:
            return cath_counts, mean_plddt

        first_line = lines[0]
        if "mean plddt:" in first_line:
            mean_plddt = float(first_line.split("mean plddt:")[1].strip())
        
        # Skip the header and process the data rows
        for line in lines[2:]:  # Assuming data rows start from the 3rd line
            if not line.strip():
                continue # Ignore empty lines
            cath_id, count = line.strip().split(',')
            cath_counts[cath_id] = int(count)
    
    return mean_plddt, cath_counts

# # Apply the function to the uploaded file
# file_name = "AF-P00811-F1-model_v4.pdb"
# mean_plddt, cath_counts = read_parsed_file(file_name)
# mean_plddt, cath_counts

In [26]:
def pipeline(file_tuple):
    file_name, file_content = file_tuple
    # STEP 1
    run_merizo_search(file_name, file_content)
    # STEP 2
    run_parser(file_name)
    # STEP 3
    mean_plddt, cath_counts_dict = read_parsed_file(file_name)
    # STEP 4
    upload_analysis_outputs_to_hdfs(file_name)
    return mean_plddt, cath_counts_dict 

In [27]:
pipeline_rdd = file_content_rdd.map(pipeline)

In [None]:
results_rdd = pipeline_rdd.collect()

In [29]:
from pyspark.sql.types import StructType, StructField, FloatType, StringType, MapType, IntegerType

schema = StructType([
    StructField("mean_plddt", FloatType(), False),
    StructField("cath_counts_dict", MapType(StringType(), IntegerType()), False)
])
df = spark.createDataFrame(results_rdd, schema)
df.show()

+----------+--------------------+
|mean_plddt|    cath_counts_dict|
+----------+--------------------+
|   95.1058|{6.10.140.1350 ->...|
|   95.7457|  {3.40.50.620 -> 1}|
|   97.7731|{3.40.50.720 -> 1...|
|  98.40875|{3.40.50.1970 -> ...|
|       0.0|                  {}|
|   90.7456|{1.10.287.1060 ->...|
|   97.2253|{3.90.25.10 -> 1,...|
|       0.0|                  {}|
|   93.1894|  {3.40.50.300 -> 1}|
|       0.0|                  {}|
|  93.13824|{2.30.30.280 -> 1...|
|   97.6179|{3.30.360.20 -> 1...|
|   93.2427|{UNASSIGNED -> 1,...|
|       0.0|                  {}|
|   96.0342|   {3.40.47.10 -> 2}|
|   92.0031|  {1.20.950.20 -> 1}|
|   93.2138|    {1.10.8.50 -> 1}|
+----------+--------------------+



In [30]:
import math
from collections import Counter
import time

# Define the zero value for the accumulator: (float_sum, float_sum_of_squares, float_count, combined_dict)
zero_value = (0.0, 0.0, 0, {})

# Function to combine a single record with the accumulator
def seq_op(accum, value):
    float_sum, float_sum_of_squares, float_count, combined_dict = accum
    new_float, new_dict = value
    return (
        float_sum + new_float,
        float_sum_of_squares + new_float ** 2,
        float_count + 1,
        dict(Counter(combined_dict) + Counter(new_dict)),
    )

# Function to merge two accumulators
def comb_op(accum1, accum2):
    float_sum1, float_sum_of_squares1, float_count1, dict1 = accum1
    float_sum2, float_sum_of_squares2, float_count2, dict2 = accum2
    return (
        float_sum1 + float_sum2,
        float_sum_of_squares1 + float_sum_of_squares2,
        float_count1 + float_count2,
        dict(Counter(dict1) + Counter(dict2)),
    )

# Use aggregate to compute the result
start_time = time.time()
result = pipeline_rdd.aggregate(zero_value, seq_op, comb_op)

# Extract results
float_sum, float_sum_of_squares, float_count, combined_dict = result
mean = float_sum / float_count

# std_dev = math.sqrt(float_sum_of_squares / (float_count - mean) ** 2)
# sample_variance = (float_sum_of_squares - (float_sum ** 2) / float_count) / (float_count - 1)
# sample_std_dev = math.sqrt(sample_variance) if float_count > 1 else 0.0
population_variance = (float_sum_of_squares - (float_sum ** 2) / float_count) / float_count
population_std_dev = math.sqrt(population_variance)

end_time = time.time()
total_time = end_time - start_time
print(f"Done with the pipeline in {total_time:.2f} seconds")  

# Final results
print("Combined Dictionary:", combined_dict)
print("Mean of Float Values:", mean)
print("Standard Deviation of Float Values:", population_std)


File Name: AF-P25714-F1-model_v4.pdb
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmpwg5vy85r /home/almalinux/data/cath-4.3-foldclassdb AF-P25714-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
File Name: AF-P77409-F1-model_v4.pdb
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmptljtctku /home/almalinux/data/cath-4.3-foldclassdb AF-P77409-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
File Name: AF-P05719-F1-model_v4.pdb
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmp8epzrgs5 /home/almalinux/data/cath-4.3-foldclassdb AF-P05719-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
Output:

Error:
2024-12-25 18:56:29,449 | INFO | Starting easy-search with command: 

/home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmptljtctku /home/almalinux/data/

Done with the pipeline in 195.10 seconds
Combined Dictionary: {'3.90.220.20': 2, '6.10.140.1350': 1, '3.40.50.620': 2, '3.40.50.720': 2, '3.30.360.10': 1, '3.40.50.1970': 1, '1.20.1090.10': 1, '1.10.287.1060': 1, '3.40.1380.10': 1, '3.90.25.10': 1, '3.40.50.300': 1, '2.30.30.280': 1, '2.40.30.10': 1, '3.30.110.140': 1, '3.30.360.20': 1, '3.30.70.1560': 1, 'UNASSIGNED': 1, '3.40.47.10': 2, '1.20.950.20': 1, '1.10.8.50': 1}
Mean of Float Values: 72.55550490196077


                                                                                

NameError: name 'sample_std_dev' is not defined

In [31]:
# Final results
print("Combined Dictionary:", combined_dict)
print("Mean of Float Values:", mean)
print("Standard Deviation of Float Values:", population_std_dev)


Combined Dictionary: {'3.90.220.20': 2, '6.10.140.1350': 1, '3.40.50.620': 2, '3.40.50.720': 2, '3.30.360.10': 1, '3.40.50.1970': 1, '1.20.1090.10': 1, '1.10.287.1060': 1, '3.40.1380.10': 1, '3.90.25.10': 1, '3.40.50.300': 1, '2.30.30.280': 1, '2.40.30.10': 1, '3.30.110.140': 1, '3.30.360.20': 1, '3.30.70.1560': 1, 'UNASSIGNED': 1, '3.40.47.10': 2, '1.20.950.20': 1, '1.10.8.50': 1}
Mean of Float Values: 72.55550490196077
Standard Deviation of Float Values: 40.29962422764759


In [39]:
# Convert results to DataFrames
# 1. Statistics DataFrame
stats_df = spark.createDataFrame(
    [("human", mean, population_std_dev), ("ecoli", mean, population_std_dev)],
    schema=StructType([
        StructField("organism", StringType(), True),
        StructField("mean plddt", FloatType(), True),
        StructField("plddt std", FloatType(), True),
    ])
).coalesce(1)

stats_df.show()

# 2. Dictionary DataFrame
dict_df = spark.createDataFrame(
    [(key, value) for key, value in combined_dict.items()],
    schema=StructType([
        StructField("cath_code", StringType(), True),
        StructField("count", IntegerType(), True),
    ])
).coalesce(1)

dict_df.show()


+--------+----------+---------+
|organism|mean plddt|plddt std|
+--------+----------+---------+
|   human| 72.555504|40.299625|
|   ecoli| 72.555504|40.299625|
+--------+----------+---------+

+-------------+-----+
|    cath_code|count|
+-------------+-----+
|  3.90.220.20|    2|
|6.10.140.1350|    1|
|  3.40.50.620|    2|
|  3.40.50.720|    2|
|  3.30.360.10|    1|
| 3.40.50.1970|    1|
| 1.20.1090.10|    1|
|1.10.287.1060|    1|
| 3.40.1380.10|    1|
|   3.90.25.10|    1|
|  3.40.50.300|    1|
|  2.30.30.280|    1|
|   2.40.30.10|    1|
| 3.30.110.140|    1|
|  3.30.360.20|    1|
| 3.30.70.1560|    1|
|   UNASSIGNED|    1|
|   3.40.47.10|    2|
|  1.20.950.20|    1|
|    1.10.8.50|    1|
+-------------+-----+



In [41]:
# Write DataFrames to HDFS as CSV
stats_df.write.option("header","true").mode("overwrite").csv("/summary_outputs/pIDDT_means")
dict_df.write.option("header","true").mode("overwrite").csv("/summary_outputs/ecoli_cath_summary")

In [52]:
def write_df_to_hdfs_csv(df, hdfs_path, csv_file_name):
    print(f'WRITING ANALYSIS SUMMARY OUTPUT {csv_file_name} TO HDFS...')
    write_path = hdfs_path + csv_file_name
    df.write.option("header","true").mode("overwrite").csv(write_path)
    hdfs_mv_cmd = ['hdfs', 'dfs', '-mv', write_path + '/part-00000-*.csv', write_path + '.csv']
    p = Popen(hdfs_mv_cmd, stdin=PIPE,stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    # Decode the byte output to string
    print("Output:")
    print(out.decode("utf-8"))  # Decode and print the standard output
    
    if err:
        print("Error:")
        print(err.decode("utf-8"))  # Decode and print the standard  
    

In [53]:
write_df_to_hdfs_csv(stats_df, "/summary_outputs/", "pIDDT_means")
write_df_to_hdfs_csv(dict_df, "/summary_outputs/", "ecoli_cath_summary")

WRITING ANALYSIS SUMMARY OUTPUT pIDDT_means TO HDFS...
Output:

WRITING ANALYSIS SUMMARY OUTPUT ecoli_cath_summary TO HDFS...
Output:



In [36]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, FloatType, StringType, MapType, IntegerType

def process_with_dataframes(rdd):
    """
    Convert RDD of (float, dict) to DataFrame and calculate statistics
    
    Args:
        rdd: RDD containing tuples of (mean_plddt, cath_counts_dict)
    Returns:
        Tuple of (stats_dict, combined_counts_dict)
    """
    # Define schema for our data
    schema = StructType([
        StructField("mean_plddt", FloatType(), False),
        StructField("cath_counts_dict", MapType(StringType(), IntegerType()), False)
    ])
    
    # Convert RDD to DataFrame
    df = spark.createDataFrame(rdd, schema)
    df.show()
    
    # Calculate statistics for float values
    stats_df = df.select(
        F.mean("mean_plddt").alias("mean"),
        F.stddev_pop("mean_plddt").alias("std_dev")
    )
    stats_df.show()
    
    # Combine dictionaries using explode and aggregation
    counts_df = df.select(
        F.explode("cath_counts_dict").alias("id", "count")
    ).groupBy(
        "id"
    ).agg(
        F.sum("count").alias("total_count")
    )

    counts_df.show()
    
    # # Convert results to Python dictionaries
    # statistics = {
    #     "mean": stats_df["mean"],
    #     "std_dev": stats_df["std_dev"] if stats_df["std_dev"] is not None else 0.0
    # }
    
    # combined_counts = dict(counts_df.collect())
    
    return stats_df, counts_df

# Example usage:
"""
# Assuming your map function returns (mean_plddt, cath_counts_dict)
mapped_rdd = rdd.map(your_map_function)
stats, counts = process_with_dataframes(mapped_rdd)

print(f"Mean: {stats['mean']}")
print(f"Standard Deviation: {stats['std_dev']}")
print("Combined counts:", counts)
"""

'\n# Assuming your map function returns (mean_plddt, cath_counts_dict)\nmapped_rdd = rdd.map(your_map_function)\nstats, counts = process_with_dataframes(mapped_rdd)\n\nprint(f"Mean: {stats[\'mean\']}")\nprint(f"Standard Deviation: {stats[\'std_dev\']}")\nprint("Combined counts:", counts)\n'

In [35]:
start_time = time.time()
results_rdd = pipeline_rdd.collect()
stats, combined_counts = process_with_dataframes(results_rdd)
end_time = time.time()
total_time = end_time - start_time
print(f"Done with the pipeline using collect in {total_time:.2f} seconds")
stats, combined_counts

File Name: AF-P25714-F1-model_v4.pdb
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmpcmjlqvta /home/almalinux/data/cath-4.3-foldclassdb AF-P25714-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
File Name: AF-P77409-F1-model_v4.pdb                                (0 + 3) / 7]
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmpxjvdqft6 /home/almalinux/data/cath-4.3-foldclassdb AF-P77409-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
File Name: AF-P05719-F1-model_v4.pdb
STEP 1: RUNNING MERIZO: python3 /home/almalinux/merizo_search/merizo_search/merizo.py easy-search /tmp/tmpg0v_6_6p /home/almalinux/data/cath-4.3-foldclassdb AF-P05719-F1-model_v4.pdb tmp --iterate --output_headers -d cpu --threads 2
Output:

Error:
2024-12-25 19:03:54,816 | INFO | Starting easy-search with command: 

/home/almalinux/merizo_search/merizo_search/merizo.py easy-s

+----------+--------------------+
|mean_plddt|    cath_counts_dict|
+----------+--------------------+
|   95.1058|{6.10.140.1350 ->...|
|   95.7457|  {3.40.50.620 -> 1}|
|   97.7731|{3.40.50.720 -> 1...|
|  98.40875|{3.40.50.1970 -> ...|
|       0.0|                  {}|
|   90.7456|{1.10.287.1060 ->...|
|   97.2253|{3.90.25.10 -> 1,...|
|       0.0|                  {}|
|   93.1894|  {3.40.50.300 -> 1}|
|       0.0|                  {}|
|  93.13824|{2.30.30.280 -> 1...|
|   97.6179|{3.30.360.20 -> 1...|
|   93.2427|{UNASSIGNED -> 1,...|
|       0.0|                  {}|
|   96.0342|   {3.40.47.10 -> 2}|
|   92.0031|  {1.20.950.20 -> 1}|
|   93.2138|    {1.10.8.50 -> 1}|
+----------+--------------------+

+-----------------+------------------+
|             mean|           std_dev|
+-----------------+------------------+
|72.55550474279067|41.539901793132884|
+-----------------+------------------+

+-------------+-----------+
|           id|total_count|
+-------------+-----------+
|  3.

(DataFrame[mean: double, std_dev: double],
 DataFrame[id: string, total_count: bigint])

In [None]:
def combine_parsed_dict_and_mean(analysis_output1, analysis_output2):
    cath_counts_dict1, mean_plddt1 = analysis_output1
    cath_counts_dict2, mean_plddt2 = analysis_output2

    for key in cath_counts_dict2:
        if key in cath_counts_dict1:
            cath_counts_dict1[key] +=cath_counts_dict2[key]
        else:
            cath_counts_dict1[key] = cath_counts_dict2[key]

    mean_plddt = (mean_plddt1 + mean_plddt2) / 2.0

    return cath_counts_dict1, mean_plddt

analysis_output1 = ({'3.40.710.10': 1}, 98.4244)
analysis_output2 = ({'3.40.710.10': 3, '2.30.810.20': 2}, 88.4244)

combine_parsed_dict_and_mean(analysis_output1, analysis_output2)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, FloatType, MapType, StringType, IntegerType
import time
import random
from typing import Tuple, Dict
from statistics import mean, stdev

def generate_test_data(n: int) -> list[Tuple[float, Dict[str, int]]]:
    """Generate sample data for testing"""
    return [
        (
            random.uniform(1, 100),  # float value
            {f"id_{random.randint(1, 10)}": random.randint(1, 100) for _ in range(5)}  # count dict
        )
        for _ in range(n)
    ]

def benchmark_dataframe_approach(spark, data):
    """Benchmark DataFrame approach"""
    
    # Define schema
    schema = StructType([
        StructField("float_value", FloatType(), False),
        StructField("count_dict", MapType(StringType(), IntegerType()), False)
    ])
    
    # Create DataFrame
    df = spark.createDataFrame(data, schema)
    start_time = time.time()
    # Calculate statistics
    stats = df.select(
        F.mean("float_value").alias("mean"),
        F.stddev("float_value").alias("std_dev")
    ).first()
    
    # Combine dictionaries
    counts_df = df.select(
        F.explode("count_dict").alias("id", "count")
    ).groupBy(
        "id"
    ).agg(
        F.sum("count").alias("total_count")
    )
    
    combined_counts = dict(counts_df.collect())
    
    end_time = time.time()
    return end_time - start_time, stats, combined_counts

def benchmark_reduce_approach(spark, data):
    """Benchmark reduce approach"""
    start_time = time.time()
    
    rdd = spark.sparkContext.parallelize(data)
    
    # First reduction for float values
    float_values = rdd.map(lambda x: x[0]).collect()
    float_stats = {
        'mean': mean(float_values),
        'std_dev': stdev(float_values) if len(float_values) > 1 else 0.0
    }
    
    # Second reduction for dictionaries
    combined_counts = rdd.map(lambda x: x[1]) \
        .reduce(lambda acc, curr: {
            k: acc.get(k, 0) + curr.get(k, 0) 
            for k in set(acc) | set(curr)
        })
    
    end_time = time.time()
    return end_time - start_time, float_stats, combined_counts

def run_benchmark(sizes=[1000, 10000, 100000]):
    """Run benchmark with different data sizes"""
    spark = SparkSession.builder \
        .appName("BenchmarkTest") \
        .getOrCreate()
    
    results = []
    
    for size in sizes:
        print(f"\nBenchmarking with {size} records:")
        data = generate_test_data(size)
        
        # Warm up Spark
        spark.sparkContext.parallelize([1]).collect()
        
        # Run DataFrame benchmark
        df_time, df_stats, df_counts = benchmark_dataframe_approach(spark, data)
        print(f"DataFrame approach took: {df_time:.2f} seconds")
        print(f"DataFrame stats: Mean={df_stats['mean']:.2f}, StdDev={df_stats['std_dev']:.2f}")
        
        # Run Reduce benchmark
        reduce_time, reduce_stats, reduce_counts = benchmark_reduce_approach(spark, data)
        print(f"Reduce approach took: {reduce_time:.2f} seconds")
        print(f"Reduce stats: Mean={reduce_stats['mean']:.2f}, StdDev={reduce_stats['std_dev']:.2f}")
        
        results.append({
            'size': size,
            'dataframe_time': df_time,
            'reduce_time': reduce_time,
            'df_stats': df_stats,
            'reduce_stats': reduce_stats
        })
    
    return results

In [None]:
def benchmark_dataframe_approach(spark, data):

    # Define schema
    schema = StructType([
        StructField("float_value", FloatType(), False),
        StructField("count_dict", MapType(StringType(), IntegerType()), False)
    ])
    
    # Start timing after DataFrame creation to be fair
    df = spark.createDataFrame(data, schema)
    
    start_time = time.time()
    stats = df.select(
        F.mean("float_value").alias("mean"),
        F.stddev("float_value").alias("std_dev")
    ).first()
    
    counts_df = df.select(
        F.explode("count_dict").alias("id", "count")
    ).groupBy("id").agg(F.sum("count").alias("total_count"))
    
    combined_counts = dict(counts_df.collect())
    end_time = time.time()
    
    return end_time - start_time, stats, combined_counts

def benchmark_reduce_approach(spark, data):
    rdd = spark.sparkContext.parallelize(data)
    
    start_time = time.time()
    
    # Initialize with first element
    first = data[0]
    initial = ([first[0]], first[1])
    
    def combine_fn(acc, curr):
        values, counts = acc
        values.append(curr[0])
        for k, v in curr[1].items():
            counts[k] = counts.get(k, 0) + v
        return values, counts
        
    values, counts = rdd.reduce(combine_fn)
    
    stats = {
        'mean': mean(values),
        'std_dev': stdev(values) if len(values) > 1 else 0.0
    }
    end_time = time.time()
    
    return end_time - start_time, stats, counts

In [None]:
if __name__ == "__main__":
    results = run_benchmark()
    
    # Print summary
    print("\nSummary:")
    print("Size\tDataFrame(s)\tReduce(s)\tDifference(%)")
    for r in results:
        diff_percent = ((r['reduce_time'] - r['dataframe_time']) / r['dataframe_time']) * 100
        print(f"{r['size']}\t{r['dataframe_time']:.2f}\t\t{r['reduce_time']:.2f}\t\t{diff_percent:.1f}%")

Output:

AF-P05719-F1-model_v4.pdb_search.tsv local file has been deleted.
STEP 3: UPLOADING ANALYSIS OUTPUT TO HDFS: hdfs dfs -put AF-P05719-F1-model_v4.pdb.parsed /analysis_outputs/
