In [1]:
import duckdb
import pandas as pd
# import findspark
# findspark.init()
import pyspark
from pyspark.conf import SparkConf

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, FloatType, IntegerType, StructType, StructField, StringType, LongType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Word2Vec
from imblearn.over_sampling import SMOTE
import rdkit
from rdkit import Chem
from rdkit.Chem import AllChem

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score
# from sklearn.preprocessing import OneHotEncoder

import mlflow

import numpy as np

import torch
from torch.utils.data import Dataset, DataLoader

import sklearn
import time
# import h2o
# from h2o.estimators import H2OGradientBoostingEstimator
# from h2o.frame import H2OFrame
# from pysparkling import H2OContext
# import pysparkling

In [2]:
# schema = StructType([
#     StructField("id", LongType(), True),
#     StructField("buildingblock1_smiles", StringType(), True),
#     StructField("buildingblock2_smiles", StringType(), True),
#     StructField("buildingblock3_smiles", StringType(), True),
#     StructField("molecule_smiles", StringType(), True),
#     StructField("protein_name", StringType(), True),
#     StructField("binds", LongType(), True)
# ])
# train_1 = spark.read.parquet("train_1.parquet", schema=schema)
# train_0 = spark.read.parquet("train_0.parquet", schema=schema)

In [2]:
def vector_to_array(v):
    return v.toArray().tolist()
vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

def smiles_to_fingerprint(smiles, radius = 3, nBits = 25):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits) # 10, 15
        return [bit for bit in fp]
    else:
        return [0] * nBits

# Register the UDF
smiles_to_fingerprint_udf = udf(smiles_to_fingerprint, ArrayType(IntegerType()))

# Register the UDF using pandas_udf
@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def smiles_to_fingerprint_udf(smiles_series: pd.Series) -> pd.Series:
    return smiles_series.apply(smiles_to_fingerprint)



# ### Distinct Counts
# # buildingblock 1: 271
# # buildingblock 2: 693
# # buildingblock 3: 872
# # molecule_smiles: 29,656
# # 4 repeats for buildingblock triplets but they are binded to different target proteins



In [3]:
spark = SparkSession.builder.appName('belka') \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "4g") \
    .config("spark.cores.max", "16") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memoryOverhead", "2g") \
    .getOrCreate()
    # .config("spark.ext.h2o.backend.cluster.mode", "internal")\
    # .config("spark.executor.instances", "1")\
    # .config("spark.executor.memory", "2g")\
    # .config("spark.driver.memory", "2g")\
train_data = spark.read.parquet("train.parquet") # train_data.count() # 295,246,830
sample_data = train_data.sample(.0001)

train_1 = spark.read.parquet("train_1.parquet")

# pandas_sample = sample_data.toPandas()
# train_rdd = train_data.rdd
# h2o.init()
# hc = H2OContext.getOrCreate()

# bind1_data = train_data.where(F.col("binds") == 1)
# bind0_data = train_data.where(F.col("binds") == 0)

24/06/23 04:47:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
### Vectorize inputs with Morgan Fingerprint Bit Embedding
radius = 3
vector_size = 25
encoded_data = (
    train_1.withColumn("encoded_buildingblock1", smiles_to_fingerprint_udf(F.col("buildingblock1_smiles") ))
                .withColumn("encoded_buildingblock2", smiles_to_fingerprint_udf(F.col("buildingblock2_smiles") ))
                .withColumn("encoded_buildingblock3", smiles_to_fingerprint_udf(F.col("buildingblock3_smiles")))
                .withColumn("encoded_molecule_vector", smiles_to_fingerprint_udf(F.col("molecule_smiles")))
                .withColumn("encoded_sEH", F.when(F.col("protein_name") == "sEH", 1).otherwise(0))
                .withColumn("encoded_HSA", F.when(F.col("protein_name") == "HSA", 1).otherwise(0))
                .withColumn("encoded_BRD4", F.when(F.col("protein_name") == "BRD4", 1).otherwise(0))
)

encoded_data = encoded_data.select(*[(F.col("encoded_buildingblock1")[i]).alias(f"buildingblock1_{i}") for i in range(25)],
                   *[(F.col("encoded_buildingblock2")[i]).alias(f"buildingblock2_{i}") for i in range(25)],
                   *[(F.col("encoded_buildingblock3")[i]).alias(f"buildingblock3_{i}") for i in range(25)],
                   *[(F.col("encoded_molecule_vector")[i]).alias(f"molecule_{i}") for i in range(25)],
                   "encoded_sEH", "encoded_HSA", "encoded_BRD4", "binds").withColumn("row_id", F.monotonically_increasing_id())


# print(bind1_data.count()) # 1_589_906
# print(bind0_data.count()) # 293_656_924

# 293656924 / 1589906 # 184.70080872705682

In [9]:
# for i in range(12,101): # 10
#     start = time.time()
#     print(i)
#     sample_data = encoded_data.where(F.col("row_id").between(i * 15899, (i + 1) * 15899))
#     sample_data.write.mode("overwrite").parquet(f"train_1_encoded_shards/shard_{i}.parquet")
#     final_df = pd.concat([final_df, sample_data.toPandas()])
#     end = time.time()

#     print("Time: ", end - start)



12


24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
24/06/23 04:50:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,

NameError: name 'final_df' is not defined

In [10]:
full_train_1 = spark.read.parquet("train_1_encoded_full.parquet")

In [19]:
full_train_pd = full_train_1.toPandas()

                                                                                

Py4JJavaError: An error occurred while calling o441.collectToPython.
: java.lang.OutOfMemoryError: Java heap space


In [28]:
final_df

Unnamed: 0,buildingblock1_0,buildingblock1_1,buildingblock1_2,buildingblock1_3,buildingblock1_4,buildingblock1_5,buildingblock1_6,buildingblock1_7,buildingblock1_8,buildingblock1_9,...,molecule_20,molecule_21,molecule_22,molecule_23,molecule_24,encoded_sEH,encoded_HSA,encoded_BRD4,binds,row_id
0,1,0,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,22527
1,1,0,1,1,0,1,1,1,1,1,...,1,1,1,1,1,0,0,1,1,22528
2,1,0,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,22529
3,1,0,1,1,0,1,1,1,1,1,...,1,1,1,1,1,0,1,0,1,22530
4,1,0,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,22531
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10936,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,456108
10937,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,0,1,0,1,456109
10938,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,456110
10939,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,456111


In [11]:
dummy_row = full_train_1.limit(1).collect()[0]
dummy_row = dummy_row.asDict()
dummy_row["binds"] = 0
# sample_pd = sample_pd.append(dummy_row, ignore_index=True)



In [38]:
smote_x

Unnamed: 0,buildingblock1_0,buildingblock1_1,buildingblock1_2,buildingblock1_3,buildingblock1_4,buildingblock1_5,buildingblock1_6,buildingblock1_7,buildingblock1_8,buildingblock1_9,...,molecule_19,molecule_20,molecule_21,molecule_22,molecule_23,molecule_24,encoded_sEH,encoded_HSA,encoded_BRD4,row_id
0,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,169987
1,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,292146
2,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,101790
3,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,449638
4,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,266158
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
299996,1,1,1,1,0,1,1,1,1,1,...,1,1,0,1,1,1,0,0,1,17179946304
299997,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,368201
299998,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,0,1,0,0,336635
299999,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,0,1,0,8589953699


In [48]:

# 58492, 58492

Unnamed: 0,binds
0,0
1,0
2,0
3,0
4,0
...,...
58487,1
58488,1
58489,1
58490,1


In [50]:
smote_x['binds'] = smote_y
smote_x

  smote_x['binds'] = smote_y


Unnamed: 0,buildingblock1_0,buildingblock1_1,buildingblock1_2,buildingblock1_3,buildingblock1_4,buildingblock1_5,buildingblock1_6,buildingblock1_7,buildingblock1_8,buildingblock1_9,...,molecule_19,molecule_20,molecule_21,molecule_22,molecule_23,molecule_24,encoded_sEH,encoded_HSA,encoded_BRD4,binds
0,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,0
1,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,0
2,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,0
3,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,1,0,0
4,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
58487,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,0,0,0,1
58488,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,1
58489,1,1,1,1,0,1,1,1,1,1,...,1,1,1,1,1,1,0,0,1,1
58490,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,0,0,1


In [24]:
# ### Vectorize Inputs
# vector_size = 20

# word2Vec1 = Word2Vec(vectorSize=vector_size, minCount=0, inputCol="buildingblock1_array", outputCol="buildingblock1_vector")
# model1 = word2Vec1.fit(sample_train_1)
# result1 = model1.transform(sample_train_1)

# print("Done 1")

# word2Vec2 = Word2Vec(vectorSize=vector_size, minCount=0, inputCol="buildingblock2_array", outputCol="buildingblock2_vector")
# model2 = word2Vec2.fit(result1)
# result2 = model2.transform(result1)

# print("Done 2")

# word2Vec3 = Word2Vec(vectorSize=vector_size, minCount=0, inputCol="buildingblock3_array", outputCol="buildingblock3_vector")
# model3 = word2Vec3.fit(result2)
# result3 = model3.transform(result2)

# print("Done 3")

# word2Vec4 = Word2Vec(vectorSize=vector_size, minCount=0, inputCol="molecule_array", outputCol="molecule_vector")
# model4 = word2Vec4.fit(result3)
# result4 = model4.transform(result3)

# print("Done 4")



# vectorized_samples = result4.select('buildingblock1_vector', 'buildingblock2_vector', 'buildingblock3_vector', 
#                                     'molecule_vector', "encoded_sEH", "encoded_HSA", "encoded_BRD4", 'binds')

# array_samples =(
#     vectorized_samples
#     .withColumn("buildingblock1_vector", vector_to_array_udf(vectorized_samples["buildingblock1_vector"]))
#     .withColumn("buildingblock2_vector", vector_to_array_udf(vectorized_samples["buildingblock2_vector"]))
#     .withColumn("buildingblock3_vector", vector_to_array_udf(vectorized_samples["buildingblock3_vector"]))
#     .withColumn("molecule_vector", vector_to_array_udf(vectorized_samples["molecule_vector"]))
# )

# pandas_vector_df = array_samples.toPandas()

# ### Unpacking arrays
# rename_block1 = {i:f"buildingblock1_feature_{i}" for i in range(vector_size)}
# values_df = pd.DataFrame(pandas_vector_df['buildingblock1_vector'].tolist(), index=pandas_vector_df.index)
# result_df = pd.concat([pandas_vector_df.drop(columns=['buildingblock1_vector']), values_df], axis=1)
# result_df = result_df.rename(columns=rename_block1)

# rename_block2 = {i:f"buildingblock2_feature_{i}" for i in range(vector_size)}
# values_df = pd.DataFrame(result_df['buildingblock2_vector'].tolist(), index=result_df.index)
# result_df = pd.concat([result_df.drop(columns=['buildingblock2_vector']), values_df], axis=1)
# result_df = result_df.rename(columns=rename_block2)

# rename_block3 = {i:f"buildingblock3_feature_{i}" for i in range(vector_size)}
# values_df = pd.DataFrame(result_df['buildingblock3_vector'].tolist(), index=result_df.index)
# result_df = pd.concat([result_df.drop(columns=['buildingblock3_vector']), values_df], axis=1)
# result_df = result_df.rename(columns=rename_block3)

# rename_molecule = {i:f"molecule_feature_{i}" for i in range(vector_size)}
# values_df = pd.DataFrame(result_df['molecule_vector'].tolist(), index=result_df.index)
# result_df = pd.concat([result_df.drop(columns=['molecule_vector']), values_df], axis=1)
# result_df = result_df.rename(columns=rename_molecule)
# result_df

In [19]:
binds_distribution = train_data.select('binds').groupby('binds').count()
binds_distribution.show()
"""
Imbalanced labels. Most of the attempts do not bind at all.
0: 293656924
1: 1589906
"""

[Stage 22:>                                                       (0 + 14) / 28]

+-----+---------+
|binds|    count|
+-----+---------+
|    0|293656924|
|    1|  1589906|
+-----+---------+





'\nImbalanced labels. Most of the attempts do not bind at all.\n'

In [16]:
result_distribution = train_data.groupby(['protein_name', 'binds']).count()
result_distribution.show()
"""
sEH tends to be more likely to have a bind than the other two.
"""



+------------+-----+--------+
|protein_name|binds|   count|
+------------+-----+--------+
|         HSA|    0|98007200|
|         sEH|    1|  724532|
|         sEH|    0|97691078|
|        BRD4|    1|  456964|
|        BRD4|    0|97958646|
|         HSA|    1|  408410|
+------------+-----+--------+



