In [89]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from pyspark.sql.functions import col
from pyspark.sql.types import LongType, IntegerType, StructType, StructField
from pyspark.sql.functions import lit

In [86]:
spark = (
    SparkSession
    .builder
    .appName("leash belka3")
    .config("spark.driver.memory", "48g")  # Increased driver memory
    .config("spark.executor.memory", "48g")  # Increased executor memory
    .config("spark.executor.instances", "16")  # 16 executors
    .config("spark.executor.cores", "4")  # 4 cores per executor
    .config("spark.driver.maxResultSize", "4g")  # Driver result size limit
    .config("spark.local.dir", "temp")  # Specify a directory with enough space
    .config("spark.shuffle.file.buffer", "128k")  # Shuffle buffer size
    .config("spark.memory.fraction", "0.8")  # Memory fraction for tasks
    .config("spark.shuffle.memoryFraction", "0.6")  # Shuffle memory fraction
    .config("spark.executor.javaOptions", "-Xmx48g")  # JVM heap size for executors
    .master("local[64]")  # Use all 64 cores on the machine
    .getOrCreate()
)

spark

24/12/24 20:46:19 WARN Utils: Your hostname, kanjur resolves to a loopback address: 127.0.1.1; using 10.119.2.14 instead (on interface eno3)
24/12/24 20:46:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/24 20:46:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/24 20:46:20 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [87]:
df = spark.read.format('parquet').load('test.parquet')

print(df.rdd.getNumPartitions())
print(df.count())
df.show()

                                                                                

8


                                                                                

1674896


                                                                                

+---------+---------------------+---------------------+---------------------+--------------------+------------+
|       id|buildingblock1_smiles|buildingblock2_smiles|buildingblock3_smiles|     molecule_smiles|protein_name|
+---------+---------------------+---------------------+---------------------+--------------------+------------+
|295246830| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|        BRD4|
|295246831| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|         HSA|
|295246832| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|         sEH|
|295246833| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| CC(O)Cn1cnc2c(N)n...|C#CCCC[C@H](Nc1nc...|        BRD4|
|295246834| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| CC(O)Cn1cnc2c(N)n...|C#CCCC[C@H](Nc1nc...|         HSA|
|295246835| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| CC(O)Cn1cnc2c(N)n...|C#CCCC[C@H](Nc1nc...|      

In [93]:
df = df.withColumn('binds', lit(2))
df.show()

+---------+---------------------+---------------------+---------------------+--------------------+------------+-----+
|       id|buildingblock1_smiles|buildingblock2_smiles|buildingblock3_smiles|     molecule_smiles|protein_name|binds|
+---------+---------------------+---------------------+---------------------+--------------------+------------+-----+
|295246830| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|        BRD4|    2|
|295246831| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|         HSA|    2|
|295246832| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1|       C=Cc1ccc(N)cc1|C#CCCC[C@H](Nc1nc...|         sEH|    2|
|295246833| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| CC(O)Cn1cnc2c(N)n...|C#CCCC[C@H](Nc1nc...|        BRD4|    2|
|295246834| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| CC(O)Cn1cnc2c(N)n...|C#CCCC[C@H](Nc1nc...|         HSA|    2|
|295246835| C#CCCC[C@H](NC(=O...|       C=Cc1ccc(N)cc1| 

In [95]:
df = df.repartition(1)

In [96]:
print(df.rdd.getNumPartitions())

1


In [98]:
df.write.format('parquet').mode('overwrite').option('header', True).save('test2.parquet')

                                                                                

///////////////////////////////////////////////////////////

In [4]:
import pandas as pd
import numpy as np
import os

import dask
import dask.dataframe as dd

from collections import Counter
import re

import joblib
from tqdm.auto import trange, tqdm
from IPython.display import display

from torch.utils.data import IterableDataset

from rich.progress import Progress, BarColumn, TaskProgressColumn, TimeRemainingColumn, TimeElapsedColumn, MofNCompleteColumn
import gc

protein_map = {'BRD4': 1, 'HSA': 2, 'sEH': 3}
vocab = {'C': 6825082866, '#': 81527490, '@': 511451694, 'H': 456489972, '=': 1406606874, 'O': 2554179786, 'N': 2469595230, 
         'c': 12257477022, '-': 438483636, '.': 216945504, 'l': 491088828, 'B': 123330132, 'r': 121915914, 'n': 1997759694, 
         'D': 295246830, 'y': 295246830, 'o': 67918650, 's': 156618468, 'S': 90662574, 'F': 492710238, '+': 65206260, 
         'i': 1414026, '/': 11547096, 'I': 23972994}

In [5]:
def make_vocab(dff, update=None):
    letter_counts = Counter(update) if update else Counter()
    l = dff.drop(columns=['id', 'protein_name', 'binds']).to_numpy().flatten()
    l = np.char.replace(l, r'[\d()\[\]{}]+', '', regex=True)
    letter_counts.update(''.join(l))
    return dict(letter_counts)

def make_counter(l):
    l = re.sub(r'[\d()\[\]{}]+', '', ''.join(l))
    return dict(Counter(l))

def allign_counter_to_vocab(counter, vocab):
    return {key: counter.get(key, 0) for key in vocab.keys()}

def make_features(df, vocab):
    id = df['id'].to_numpy()
    smiles = df.drop(columns=['id', 'protein_name', 'binds']).to_numpy()
    protein = df['protein_name'].to_numpy()
    y = df['binds'].to_numpy()

    df_features = {'id':[], 'bb1':[], 'bb2':[], 'bb3':[], 'molecule':[], 'protein':[], 'y':[]}
    for i in trange(len(id), desc='making features'):
        df_features['id'].append(id[i])

        counter = make_counter(smiles[i][0])
        df_features['bb1'].append(allign_counter_to_vocab(counter, vocab))

        counter = make_counter(smiles[i][1])
        df_features['bb2'].append(allign_counter_to_vocab(counter, vocab))

        counter = make_counter(smiles[i][2])
        df_features['bb3'].append(allign_counter_to_vocab(counter, vocab))

        counter = make_counter(smiles[i][3])
        df_features['molecule'].append(allign_counter_to_vocab(counter, vocab))

        df_features['protein'].append(protein[i])
        df_features['y'].append(y[i])

    return df_features

def check_df_allignment(dff_features, vocab):
    flag = True
    for i in trange(len(dff_features['bb1'])):
        if dff_features['bb1'][i].keys() != vocab.keys():
            print(dff_features['bb1'][i].keys())
            print(vocab.keys())
            flag = False
            break
    return flag


def df_vectors(dff_features, vocab, protein_map):
    op = np.empty((100,7))
    for i in trange(0,len(dff_features['id']),100, desc='Making vector df'):
        df = pd.DataFrame({
            'id': dff_features['id'][i:i+100],
            'bb1': dff_features['bb1'][i:i+100],
            'bb2': dff_features['bb2'][i:i+100],
            'bb3': dff_features['bb3'][i:i+100],
            'molecule': dff_features['molecule'][i:i+100],
            'protein': dff_features['protein'][i:i+100],
            'y': dff_features['y'][i:i+100]
        })

        df.bb1 = df.bb1.apply(lambda x: list(x.values()))
        df.bb2 = df.bb2.apply(lambda x: list(x.values()))
        df.bb3 = df.bb3.apply(lambda x: list(x.values()))
        df.molecule = df.molecule.apply(lambda x: list(x.values()))
        df.protein = df.protein.map(protein_map)

        op = np.concatenate((op, df.to_numpy()))

    return op[100:]


def process_row(row, protein_map=protein_map):
    return {
             'id': row['id'],
             'bb1': list(allign_counter_to_vocab(make_counter(row['buildingblock1_smiles']), vocab).values()),
             'bb2': list(allign_counter_to_vocab(make_counter(row['buildingblock2_smiles']), vocab).values()),
             'bb3': list(allign_counter_to_vocab(make_counter(row['buildingblock3_smiles']), vocab).values()),
             'molecule': list(allign_counter_to_vocab(make_counter(row['molecule_smiles']), vocab).values()),
             'protein': protein_map[row['protein_name']],
             'y': row['binds']
        }

def split(path, frac):
    dask_df = dd.read_parquet(path)
    train_fraction = frac
    train_df, val_df = dask_df.random_split([train_fraction, 1 - train_fraction], random_state=42)
    print(f"Train size: {train_df.shape[0].compute()}")
    print(f"Validation size: {val_df.shape[0].compute()}")
    train_df.to_parquet("train_split.parquet", write_index=False)
    val_df.to_parquet("val_split.parquet", write_index=False)

def split2(path, frac=0.5):
    dask_df = dd.read_parquet(path)
    train_fraction = frac

    f1, f2 = dask_df.random_split([train_fraction, 1 - train_fraction], random_state=42)
    f3, f4 = f1.random_split([train_fraction, 1 - train_fraction], random_state=42)
    f5, f6 = f2.random_split([train_fraction, 1 - train_fraction], random_state=42)

    f7, f8 = f3.random_split([train_fraction, 1 - train_fraction], random_state=42)
    f9, f10 = f4.random_split([train_fraction, 1 - train_fraction], random_state=42)
    f11, f12 = f5.random_split([train_fraction, 1 - train_fraction], random_state=42)
    f13, f14 = f6.random_split([train_fraction, 1 - train_fraction], random_state=42)

    print(f"Split-1: {f7.shape[0].compute()}")
    f7.to_parquet("Train_Full_Split-1.parquet", write_index=False)
    print(f"Split-2: {f8.shape[0].compute()}")
    f8.to_parquet("Train_Full_Split-2.parquet", write_index=False)
    print(f"Split-3: {f9.shape[0].compute()}")
    f9.to_parquet("Train_Full_Split-3.parquet", write_index=False)
    print(f"Split-4: {f10.shape[0].compute()}")
    f10.to_parquet("Train_Full_Split-4.parquet", write_index=False)
    print(f"Split-5: {f11.shape[0].compute()}")
    f11.to_parquet("Train_Full_Split-5.parquet", write_index=False)
    print(f"Split-6: {f12.shape[0].compute()}")
    f12.to_parquet("Train_Full_Split-6.parquet", write_index=False)
    print(f"Split-7: {f13.shape[0].compute()}")
    f13.to_parquet("Train_Full_Split-7.parquet", write_index=False)
    print(f"Split-8: {f14.shape[0].compute()}")
    f14.to_parquet("Train_Full_Split-8.parquet", write_index=False)



class ParquetDataset(IterableDataset):
    def __init__(self, dask_df, vocab=vocab, protein_map=protein_map, transform=None):
        self.dask_df = dask_df
        self.partitions = self.dask_df.to_delayed()
        self.vocab = vocab
        self.protein_map = protein_map
        self.transform = transform
        

    def __iter__(self):
        for partition in self.partitions:
            chunk = partition.compute()
            for _, row in chunk.iterrows():
                yield self.process_row(row)

    def process_row(self, row):
        data = {
            'id': row['id'],
            'bb1': list(allign_counter_to_vocab(make_counter(row['buildingblock1_smiles']), self.vocab).values()),
            'bb2': list(allign_counter_to_vocab(make_counter(row['buildingblock2_smiles']), self.vocab).values()),
            'bb3': list(allign_counter_to_vocab(make_counter(row['buildingblock3_smiles']), self.vocab).values()),
            'molecule': list(allign_counter_to_vocab(make_counter(row['molecule_smiles']), self.vocab).values()),
            'protein': self.protein_map[row['protein_name']],
            'y': row['binds']
        }
        if self.transform:
            data = self.transform(data)
        return data

In [6]:
dask_df = dd.read_parquet("/home/23m1521/ashish/kaggle/test2.parquet")

df_len = dask_df.shape[0].compute()
print(f"Number of rows: {df_len}")

df_dataset = ParquetDataset(dask_df)

# Output directory for chunk files
output_dir = 'chunks_output_test'
os.makedirs(output_dir, exist_ok=True)

chunk_size = 1674896  # Size of each chunk
chunk_data = []

with Progress(
    "[cyan]{task.description}",
    BarColumn(),
    TaskProgressColumn(),
    MofNCompleteColumn(),
    TimeElapsedColumn(),
    TimeRemainingColumn(),
) as progress:
    task = progress.add_task("Processing...", total=df_len)
    
    for i, data in enumerate(df_dataset):
        progress.update(task, advance=1)
        chunk_data.append(data)
        
        if (i + 1) % chunk_size == 0:
            # Save the chunk to a separate parquet file
            chunk_file = os.path.join(output_dir, f"chunk_{(i + 1) // chunk_size}.parquet")
            df = pd.DataFrame(chunk_data)
            df.to_parquet(chunk_file, engine='pyarrow', compression='snappy', index=False)
            print(f"Saved {chunk_file}")
            
            # Free RAM
            del chunk_data, df
            chunk_data = []
            gc.collect()

# Save remaining data
if chunk_data:
    chunk_file = os.path.join(output_dir, f"chunk_{(df_len // chunk_size) + 1}.parquet")
    df = pd.DataFrame(chunk_data)
    df.to_parquet(chunk_file, engine='pyarrow', compression='snappy', index=False)
    print(f"Saved {chunk_file}")
    
    # Free RAM
    del chunk_data, df
    gc.collect()

//////////////////////////////////////////////////////////////

### Feature Dataset Creation

In [1]:
train_len = 295246830
one_len = 1589906
zero_len = 293656924
protein_map = {'BRD4': 1, 'HSA': 2, 'sEH': 3}
vocab = {'C': 6825082866, '#': 81527490, '@': 511451694, 'H': 456489972, '=': 1406606874, 'O': 2554179786, 
         'N': 2469595230, 'c': 12257477022, '-': 438483636, '.': 216945504, 'l': 491088828, 'B': 123330132, 
         'r': 121915914, 'n': 1997759694, 'D': 295246830, 'y': 295246830, 'o': 67918650, 's': 156618468, 
         'S': 90662574, 'F': 492710238, '+': 65206260, 'i': 1414026, '/': 11547096, 'I': 23972994}

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from pyspark.sql.functions import col
from pyspark.sql.types import LongType, IntegerType, StructType, StructField

In [3]:
# for 128 Gb and 32 Cores
# spark = (
#     SparkSession
#     .builder
#     .appName("leash belka3")
#     .config("spark.driver.memory", "16g")
#     .config("spark.executor.memory", "16g")
#     .config("spark.executor.instances", "4")
#     .config("spark.executor.cores", "4")
#     .config("spark.driver.maxResultSize", "4g")
#     .master("local[*]")
#     .getOrCreate()
# )

# spark

# for 256 Gb and 64 Cores
spark = (
    SparkSession
    .builder
    .appName("leash belka3")
    .config("spark.driver.memory", "48g")  # Increased driver memory
    .config("spark.executor.memory", "48g")  # Increased executor memory
    .config("spark.executor.instances", "16")  # 16 executors
    .config("spark.executor.cores", "4")  # 4 cores per executor
    .config("spark.driver.maxResultSize", "4g")  # Driver result size limit
    .config("spark.local.dir", "temp")  # Specify a directory with enough space
    .config("spark.shuffle.file.buffer", "128k")  # Shuffle buffer size
    .config("spark.memory.fraction", "0.8")  # Memory fraction for tasks
    .config("spark.shuffle.memoryFraction", "0.6")  # Shuffle memory fraction
    .config("spark.executor.javaOptions", "-Xmx48g")  # JVM heap size for executors
    .master("local[64]")  # Use all 64 cores on the machine
    .getOrCreate()
)

spark

24/12/24 21:13:06 WARN Utils: Your hostname, kanjur resolves to a loopback address: 127.0.1.1; using 10.119.2.14 instead (on interface eno3)
24/12/24 21:13:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/24 21:13:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/24 21:13:07 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [4]:
df = spark.read.format('parquet').load('chunks_output_test')

print(df.rdd.getNumPartitions())
print(df.count())
df.show()

                                                                                

7
1674896
+---------+--------------------+--------------------+--------------------+--------------------+-------+---+
|       id|                 bb1|                 bb2|                 bb3|            molecule|protein|  y|
+---------+--------------------+--------------------+--------------------+--------------------+-------+---+
|295246830|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[2, 0, 0, 0, 1, 0...|[11, 1, 1, 1, 3, ...|      1|  2|
|295246831|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[2, 0, 0, 0, 1, 0...|[11, 1, 1, 1, 3, ...|      2|  2|
|295246832|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[2, 0, 0, 0, 1, 0...|[11, 1, 1, 1, 3, ...|      3|  2|
|295246833|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[3, 0, 0, 0, 0, 1...|[12, 1, 1, 1, 2, ...|      1|  2|
|295246834|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[3, 0, 0, 0, 0, 1...|[12, 1, 1, 1, 2, ...|      2|  2|
|295246835|[10, 1, 1, 1, 2, ...|[2, 0, 0, 0, 1, 0...|[3, 0, 0, 0, 0, 1...|[12, 1, 1, 1, 2, ...|      3|  2|
|295246836|[10, 1,

In [5]:
cols = []
for i in range(24):
    cols.append(col('bb1').getItem(i).alias(f'a{i+1}'))
    cols.append(col('bb2').getItem(i).alias(f'b{i+1}'))
    cols.append(col('bb3').getItem(i).alias(f'c{i+1}'))
    cols.append(col('molecule').getItem(i).alias(f'd{i+1}'))

schema = StructType([
    StructField('id', LongType(), True),
    StructField('protein', IntegerType(), True),
    *[StructField(f'a{i+1}', IntegerType(), True) for i in range(24)],
    *[StructField(f'b{i+1}', IntegerType(), True) for i in range(24)],
    *[StructField(f'c{i+1}', IntegerType(), True) for i in range(24)],
    *[StructField(f'd{i+1}', IntegerType(), True) for i in range(24)],
    StructField('y', IntegerType(), True)
])

df = df.select('id', 'protein', *cols, 'y')
df = spark.createDataFrame(df.rdd, schema)

In [6]:
df.first()

24/12/24 21:13:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Row(id=295246830, protein=1, a1=10, a2=2, a3=2, a4=11, a5=1, a6=0, a7=0, a8=1, a9=1, a10=0, a11=0, a12=1, a13=1, a14=0, a15=0, a16=1, a17=2, a18=1, a19=1, a20=3, a21=4, a22=0, a23=0, a24=1, b1=1, b2=1, b3=1, b4=4, b5=12, b6=6, b7=6, b8=15, b9=1, b10=0, b11=0, b12=0, b13=0, b14=0, b15=0, b16=0, b17=0, b18=0, b19=0, b20=0, b21=0, b22=0, b23=0, b24=0, c1=0, c2=0, c3=0, c4=0, c5=0, c6=0, c7=0, c8=3, c9=0, c10=0, c11=0, c12=1, c13=0, c14=0, c15=0, c16=1, c17=0, c18=0, c19=0, c20=0, c21=0, c22=0, c23=0, c24=0, d1=0, d2=0, d3=0, d4=0, d5=0, d6=0, d7=0, d8=0, d9=0, d10=0, d11=0, d12=0, d13=0, d14=0, d15=0, d16=0, d17=0, d18=0, d19=0, d20=0, d21=0, d22=0, d23=0, d24=0, y=2)

In [7]:
print(df.rdd.getNumPartitions())

7


In [8]:
df = df.repartition(1)

In [9]:
print(df.rdd.getNumPartitions())



1


In [10]:
df.write.format('parquet').mode('overwrite').option('header', True).save('test_features.parquet')

                                                                                