# Dataset Management - Combined Version using PySpark

This notebook combines the functionality of `manage1.ipynb` and `manage1(vers2).ipynb` into a single workflow, using PySpark for the data processing.


In [None]:
# Initialize PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

# Build (or reuse) the Spark session; adaptive query execution and
# partition coalescing are both switched on through the builder config
spark = (
    SparkSession.builder
    .appName("EpitopeDatasetManagement")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Limit Spark's own log output to warnings and above
spark.sparkContext.setLogLevel("WARN")
print("Spark session created successfully")


## Part 1: Load and Preprocess Data (from manage1.ipynb)


In [None]:
# Read the two raw tables with pandas: positives from CSV, negatives from Excel
pos_pd = pd.read_csv('positif.csv')
neg_pd = pd.read_excel('negatif.xlsx')

# Hand both pandas frames over to Spark (positive first, then negative)
pos_df, neg_df = (spark.createDataFrame(frame) for frame in (pos_pd, neg_pd))

# Report how many rows each table contributed
print(f"Positive samples: {pos_df.count()}")
print(f"Negative samples: {neg_df.count()}")


In [None]:
# Tag every row with its class symbol ('E' for the positive table, '.' for the
# negative one); 'actual' repeats that symbol once per character of the
# `Epitope - Name` string
pos_df = (
    pos_df
    .withColumn('label', lit('E'))
    .withColumn('actual', expr("repeat('E', length(`Epitope - Name`))"))
)
neg_df = (
    neg_df
    .withColumn('label', lit('.'))
    .withColumn('actual', expr("repeat('.', length(`Epitope - Name`))"))
)

# Keep only the columns used downstream, in the same order for both tables
# (union below matches columns by position)
sel_col = [
    'Epitope - Starting Position',
    'Epitope - Ending Position',
    'Epitope - Name',
    'label',
    'actual',
]
pos_df = pos_df.select(*sel_col)
neg_df = neg_df.select(*sel_col)

# Stack positives on top of negatives
df1_merge = pos_df.union(neg_df)
print(f"Combined dataset size: {df1_merge.count()}")


In [None]:
# Define protein preprocessing function
def protein_preprocessing(sequence):
    """Keep only the characters of ``sequence`` that are amino-acid letters.

    A character is kept (in its original case) when its upper-cased form is
    one of the 20 letters in ``ACDEFGHIKLMNPQRSTVWY``; everything else is
    dropped.

    Args:
        sequence: The raw string to filter, or ``None``.

    Returns:
        The filtered string, or ``None`` when ``sequence`` is ``None``
        (Spark passes nulls to Python UDFs as ``None``).
    """
    # Nulls must be handled inside the function: iterating over None raises.
    if sequence is None:
        return None

    # Set membership instead of substring search, so a character whose
    # upper-case form is several letters (e.g. the ligature U+FB06 -> "ST")
    # cannot slip through by matching a run of the alphabet string.
    amino_acids = frozenset("ACDEFGHIKLMNPQRSTVWY")
    return ''.join(char for char in sequence if char.upper() in amino_acids)

# Register UDF
from pyspark.sql.functions import udf
preprocess_udf = udf(protein_preprocessing, StringType())

# Apply preprocessing
df1_merge = df1_merge.withColumn('Epitope - Name', preprocess_udf(col('Epitope - Name')))

# 'actual' was built from the length of the raw name; now that characters may
# have been stripped, rebuild it so it has one label symbol per kept character
df1_merge = df1_merge.withColumn(
    'actual',
    expr("repeat(label, length(`Epitope - Name`))")
)

# Drop rows with a missing start or end position. Missing numerics coming from
# pandas can surface in Spark as NaN rather than null, so both are excluded
# (the cast keeps isnan valid for integer-typed position columns too).
df1_merge = df1_merge.filter(
    col('Epitope - Starting Position').isNotNull() &
    col('Epitope - Ending Position').isNotNull() &
    ~isnan(col('Epitope - Starting Position').cast('double')) &
    ~isnan(col('Epitope - Ending Position').cast('double'))
)

print(f"After preprocessing: {df1_merge.count()} rows")
df1_merge.show(5)


In [None]:
# Collect the merged Spark frame to the driver as pandas, then write it out
# as a CSV without the pandas index column
df1_merge_pd = df1_merge.toPandas()
df1_merge_pd.to_csv(
    'dataset_type_1.csv',
    index=False,
)
print("Saved dataset_type_1.csv")


In [None]:
# Define function to create position sequence
def create_position_sequence(start, end):
    """Return the positions ``start, start + 1, ..., end`` as a list of floats.

    Args:
        start: First position (int or float), or ``None``.
        end: Last position, inclusive (int or float), or ``None``.

    Returns:
        A list of ``int(end - start + 1)`` floats beginning at ``start``
        (empty when ``end < start``), or ``None`` when either bound is
        missing (``None``, NaN) or infinite.
    """
    import math  # local import so the function is self-contained when shipped as a UDF

    # A missing bound arrives as None (Spark null) or NaN (pandas missing
    # value); int() would raise on NaN/inf, so return null instead.
    if start is None or end is None:
        return None
    if not (math.isfinite(start) and math.isfinite(end)):
        return None

    return [float(start + i) for i in range(int(end - start + 1))]

# Register the position-sequence UDF (returns an array of doubles)
from pyspark.sql.functions import explode, arrays_zip, split, lit, col

position_sequence_udf = udf(create_position_sequence, ArrayType(DoubleType()))

# Create arrays for amino acids, labels, and positions:
#   amino_array    - the cleaned sequence split into single characters
#   label_array    - the 'actual' label string split into single characters
#   position_array - the positions from start to end
df1_merge = df1_merge.withColumn(
    'amino_array',
    split(col('Epitope - Name'), '')
).withColumn(
    'label_array',
    split(col('actual'), '')
).withColumn(
    'position_array',
    position_sequence_udf(
        col('Epitope - Starting Position'),
        col('Epitope - Ending Position')
    )
)

# Explode arrays to create individual rows.
# arrays_zip names the struct fields after the zipped columns, so the fields
# are read back by column name rather than by positional index.
df2_merge = df1_merge.select(
    explode(arrays_zip(
        col('amino_array'),
        col('label_array'),
        col('position_array')
    )).alias('zipped')
).select(
    col('zipped.amino_array').alias('amino'),
    col('zipped.label_array').alias('label'),
    col('zipped.position_array').alias('Position')
)

# Filter out empty amino acids (a null amino, produced when the zipped arrays
# differ in length, also fails this comparison and is dropped)
df2_merge = df2_merge.filter(col('amino') != '')

print(f"Dataset type 2 size: {df2_merge.count()}")
df2_merge.show(10)


## Part 2: Add Hydrophobicity Scales (from manage1(vers2).ipynb)


In [None]:
# Define hydrophobicity scales
def kyte_doolittle_scale(aa):
    """Look up the Kyte-Doolittle value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    kd_values = {
        'A': 1.8, 'C': 2.5, 'D': -3.5, 'E': -3.5, 'F': 2.8,
        'G': -0.4, 'H': -3.2, 'I': 4.5, 'K': -3.9, 'L': 3.8,
        'M': 1.9, 'N': -3.5, 'P': -1.6, 'Q': -3.5, 'R': -4.5,
        'S': -0.8, 'T': -0.7, 'V': 4.2, 'W': -0.9, 'Y': -1.3,
    }
    return kd_values.get(aa, 0.0)

def hopp_woods_scale(aa):
    """Look up the Hopp-Woods value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    hw_values = {
        'A': -0.5, 'C': -1.0, 'D': 3.0, 'E': 3.0, 'F': -2.5,
        'G': 0.0, 'H': -0.5, 'I': -1.8, 'K': 3.0, 'L': -1.8,
        'M': -1.3, 'N': 0.2, 'P': 0.0, 'Q': 0.2, 'R': 3.0,
        'S': 0.3, 'T': -0.4, 'V': -1.5, 'W': -3.4, 'Y': -2.3,
    }
    return hw_values.get(aa, 0.0)

def cornette_scale(aa):
    """Look up the Cornette value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    cornette_values = {
        'A': 0.2, 'C': 4.1, 'D': -3.1, 'E': -1.8, 'F': 4.4,
        'G': 0.0, 'H': 0.5, 'I': 4.8, 'K': -3.1, 'L': 5.7,
        'M': 4.2, 'N': -0.5, 'P': -2.2, 'Q': -2.8, 'R': 1.4,
        'S': -0.5, 'T': -1.9, 'V': 4.7, 'W': 1.0, 'Y': 3.2,
    }
    return cornette_values.get(aa, 0.0)

def eisenberg_scale(aa):
    """Look up the Eisenberg value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    eisenberg_values = {
        'A': 0.62, 'C': 0.29, 'D': -0.90, 'E': -0.74, 'F': 1.19,
        'G': 0.48, 'H': -0.40, 'I': 1.38, 'K': -1.50, 'L': 1.06,
        'M': 0.64, 'N': -0.78, 'P': 0.12, 'Q': -0.85, 'R': -2.53,
        'S': -0.18, 'T': -0.05, 'V': 1.08, 'W': 0.81, 'Y': 0.26,
    }
    return eisenberg_values.get(aa, 0.0)

def rose_scale(aa):
    """Look up the Rose value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    rose_values = {
        'A': 0.74, 'C': 0.91, 'D': 0.62, 'E': 0.62, 'F': 0.88,
        'G': 0.72, 'H': 0.78, 'I': 0.88, 'K': 0.52, 'L': 0.85,
        'M': 0.85, 'N': 0.63, 'P': 0.64, 'Q': 0.62, 'R': 0.64,
        'S': 0.66, 'T': 0.70, 'V': 0.86, 'W': 0.85, 'Y': 0.76,
    }
    return rose_values.get(aa, 0.0)

def janin_scale(aa):
    """Look up the Janin value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    janin_values = {
        'A': 0.30, 'C': 0.90, 'D': -0.60, 'E': -0.70, 'F': 0.50,
        'G': 0.30, 'H': -0.10, 'I': 0.70, 'K': -1.80, 'L': 0.50,
        'M': 0.40, 'N': -0.50, 'P': -0.30, 'Q': -0.70, 'R': -1.40,
        'S': -0.10, 'T': -0.20, 'V': 0.60, 'W': 0.30, 'Y': -0.40,
    }
    return janin_values.get(aa, 0.0)

def engelman_ges_scale(aa):
    """Look up the Engelman GES value for one upper-case residue letter.

    Any key not in the table (including ``None`` and lower-case letters)
    yields 0.0.
    """
    ges_values = {
        'A': 1.60, 'C': 2.00, 'D': -9.20, 'E': -8.20, 'F': 3.70,
        'G': 1.00, 'H': -3.00, 'I': 3.10, 'K': -8.80, 'L': 2.80,
        'M': 3.40, 'N': -4.80, 'P': -0.20, 'Q': -4.10, 'R': -12.3,
        'S': 0.60, 'T': 1.20, 'V': 2.60, 'W': 1.90, 'Y': -0.70,
    }
    return ges_values.get(aa, 0.0)

# Wrap each scale lookup function as a Spark UDF that returns a double
kyte_doolittle_udf = udf(kyte_doolittle_scale, DoubleType())
hopp_woods_udf = udf(hopp_woods_scale, DoubleType())
cornette_udf = udf(cornette_scale, DoubleType())
eisenberg_udf = udf(eisenberg_scale, DoubleType())
rose_udf = udf(rose_scale, DoubleType())
janin_udf = udf(janin_scale, DoubleType())
engelman_ges_udf = udf(engelman_ges_scale, DoubleType())

# Output column name paired with the UDF that fills it, in column order
scale_columns = [
    ('Kyte-Doolittle', kyte_doolittle_udf),
    ('Hopp-Woods', hopp_woods_udf),
    ('Cornette', cornette_udf),
    ('Eisenberg', eisenberg_udf),
    ('Rose', rose_udf),
    ('Janin', janin_udf),
    ('Engelman GES', engelman_ges_udf),
]

# Append one column per scale, each computed from the 'amino' column
for scale_name, scale_udf in scale_columns:
    df2_merge = df2_merge.withColumn(scale_name, scale_udf(col('amino')))

df2_merge.show(5)


In [None]:
# Standardise Position: subtract the global mean, divide by the global stddev
from pyspark.sql.window import Window

# One aggregation over the whole frame; the single result Row is pulled back
# to the driver
stats_rows = df2_merge.agg(
    avg('Position').alias('mean_pos'),
    stddev('Position').alias('std_pos'),
).collect()
pos_stats = stats_rows[0]

mean_pos, std_pos = pos_stats['mean_pos'], pos_stats['std_pos']

# The two statistics are injected as literals, so no join or window is needed
centered_position = col('Position') - lit(mean_pos)
df2_merge = df2_merge.withColumn('Position z-score', centered_position / lit(std_pos))

print(f"Position mean: {mean_pos:.2f}, std: {std_pos:.2f}")
df2_merge.select('amino', 'Position', 'Position z-score', 'Kyte-Doolittle').show(10)


In [None]:
# Full export: residue, position, label, the seven scale columns and the z-score
df2_final = df2_merge.select(
    'amino',
    'Position',
    'label',
    'Kyte-Doolittle',
    'Hopp-Woods',
    'Cornette',
    'Eisenberg',
    'Rose',
    'Janin',
    'Engelman GES',
    'Position z-score',
)

# Collect to pandas on the driver and write the CSV without the index column
df2_final_pd = df2_final.toPandas()
df2_final_pd.to_csv(
    'dataset_type_2_vers2_hidropobicity.csv',
    index=False,
)
print(f"Saved dataset_type_2_vers2_hidropobicity.csv with {len(df2_final_pd)} rows")

# Basic export: the same rows restricted to residue, position and label
df2_basic = df2_merge.select('amino', 'Position', 'label')
df2_basic_pd = df2_basic.toPandas()
df2_basic_pd.to_csv(
    'dataset_type_2.csv',
    index=False,
)
print(f"Saved dataset_type_2.csv with {len(df2_basic_pd)} rows")


In [None]:
# Row counts: overall, then per label symbol ('E' and '.')
print("Dataset Summary:")
print(f"Total rows: {df2_final.count()}")

is_epitope = col('label') == 'E'
print(f"Epitope labels: {df2_final.filter(is_epitope).count()}")

is_non_epitope = col('label') == '.'
print(f"Non-epitope labels: {df2_final.filter(is_non_epitope).count()}")

# Per-column descriptive statistics as produced by DataFrame.describe()
print("\nColumn Statistics:")
column_stats = df2_final.describe()
column_stats.show()

# Shut the Spark session down; no Spark cell can run after this one
spark.stop()
print("\nSpark session stopped. Processing complete!")
