## Transformer learning process

In [15]:
import gc
from pyspark.sql import SparkSession
from enum import Enum

import pyspark
import pandas as pd
from pyspark.sql import functions as sf

from tqdm import tqdm
class ResidueType(Enum):
    ADEINE = 1
    CYTHOSINE = 2
    URACIL = 3
    GUANINE = 4

In [16]:
spark = SparkSession.builder \
        .appName("ribonanza")\
        .config("spark.driver.memory", "10g") \
        .getOrCreate()

train_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/adamczykb/projects/ribonanza/data/csv/train_data_QUICK_START.csv")
    
# train_df = train_df[train_df["SN_filter"].values > 0]
train_df = train_df.drop(*[c for c in train_df.columns if "_error_" in c])

df_2A3 = train_df.filter(train_df.experiment_type == "2A3_MaP")
df_DMS = train_df.filter(train_df.experiment_type == "DMS_MaP")

pk50_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/adamczykb/projects/ribonanza/data/csv/PK50_silico_predictions.csv") \
    .withColumnRenamed('hotknots_mfe', 'hotknots') \
    ["sequence","hotknots"]
pk90_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/adamczykb/projects/ribonanza/data/csv/PK90_silico_predictions.csv") \
    .withColumnRenamed('hotknots_mfe', 'hotknots') \
    ["sequence","hotknots"]
r1_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/adamczykb/projects/ribonanza/data/csv/R1_silico_predictions.csv") \
    ["sequence","hotknots"]
gpn15k_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/home/adamczykb/projects/ribonanza/data/csv/GPN15k_silico_predictions.csv") \
    ["sequence","hotknots"]

pairing = pk50_df.union(pk90_df).union(r1_df).union(gpn15k_df)


df_2A3 = df_2A3.join(pairing, on='sequence')
df_DMS = df_DMS.join(pairing, on='sequence')

del pk50_df,pk90_df,r1_df,gpn15k_df,pairing,train_df
gc.collect()
    # return df_2A3, df_DMS
    # _2a3_csv_path = process_structure(df_2A3)
    # dms_csv_path = process_structure(df_DMS)
     

24/03/17 18:55:47 WARN Utils: Your hostname, workstation resolves to a loopback address: 127.0.1.1; using 192.168.0.17 instead (on interface enp11s0)
24/03/17 18:55:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/17 18:55:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

1033

In [None]:
df_2A3.show()

In [17]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import arrays_zip, col, explode,concat_ws,split
cols=["reactivity_00{:02d}".format(i) for i in range(1,27)]
test = df_2A3.withColumn("sequence", sf.expr("substr(sequence, 26, 999)"))\
    .drop(*cols)
df=test\
    .withColumn("reactivity", sf.concat_ws(",", *[sf.col(x) for x in test.columns if "reactivity_" in x]))\
    .withColumn("reactivity", sf.split(sf.col("reactivity"),","))\
    .withColumn("sequence", sf.split(sf.col("sequence"),""))\
    .withColumn("hotknots", sf.split(sf.col("hotknots"),""))\
    .withColumn("triplet", sf.arrays_zip("sequence", "reactivity","hotknots")) \
    .withColumn("triplet", sf.explode("triplet")) \
    .select("sequence_id",  sf.col("triplet").sequence.alias('nucleotide'),sf.col("triplet").reactivity.cast("float").alias('reactivity'),sf.col("triplet").hotknots.alias('pairing'))\
    .withColumn("reactivity", sf.when(sf.col("reactivity") < 0, 0).otherwise(col("reactivity"))) \
    .replace({'.': '0','(':'1',')':'1','{':'2','}':'2','[':'3',']':'3','<':'4','>':'4','A':'5','a':'5','B':'6','b':'6'},subset=['pairing'])\
    .replace({'A':str(ResidueType.ADEINE.value),'C':str(ResidueType.CYTHOSINE.value),'G':str(ResidueType.GUANINE.value),'U':str(ResidueType.URACIL.value)},subset=['nucleotide'])\
    .withColumn("nucleotide", col("nucleotide").cast(IntegerType()))


In [18]:
df2 = (
        df.select("sequence_id", "nucleotide", "pairing", "reactivity")
        .groupby("sequence_id")
        .agg(
            sf.collect_list(sf.struct("nucleotide", "pairing")).alias("tokens"),
            sf.collect_list("reactivity").alias("reactivity"),
        )
        .withColumn("length",sf.size("tokens"))
        .select("sequence_id", "tokens", "reactivity","length")
        .sort(sf.asc("length"))
    )

df2.show()

24/03/17 18:55:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+------------+--------------------+--------------------+------+
| sequence_id|              tokens|          reactivity|length|
+------------+--------------------+--------------------+------+
|00c09b2f5db5|[{1, 0}, {3, 0}, ...|[1.364, 0.346, 0....|   115|
|2577ae738ad5|[{1, 0}, {3, 0}, ...|[0.109, 0.011, 0....|   115|
|2b498d53e42e|[{1, 1}, {4, 1}, ...|[0.044, 0.014, 0....|   115|
|65235960e5d3|[{1, 0}, {3, 0}, ...|[1.837, 0.405, 0....|   115|
|0e32f3defde4|[{1, 1}, {4, 1}, ...|[0.186, 1.13, 0.1...|   115|
|29fe5407b168|[{1, 1}, {1, 1}, ...|[0.062, 0.193, 0....|   115|
|5dd225e9bdb7|[{1, 1}, {1, 1}, ...|[0.103, 0.954, 0....|   115|
|23905c52de44|[{1, 0}, {1, 0}, ...|[0.031, 1.479, 0....|   115|
|2e4891e0dd14|[{1, 0}, {3, 0}, ...|[0.304, 0.041, 0....|   115|
|2a6e753b286b|[{1, 0}, {3, 0}, ...|[1.599, 0.27, 0.4...|   115|
|2c58166daa45|[{1, 0}, {3, 1}, ...|[0.527, 0.012, 0....|   115|
|052c39e93e98|[{1, 0}, {3, 0}, ...|[0.151, 0.017, 0....|   115|
|2c67c8f4d724|[{1, 1}, {3, 1}, ...|[1.40

24/03/17 18:56:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [1]:
import h5py
file = h5py.File("/home/adamczykb/projects/ribonanza/data/parsed_dms.h5")

In [4]:
!pip3 install tables --break-system-packages

Defaulting to user installation because normal site-packages is not writeable
Collecting tables
  Downloading tables-3.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.3 kB)
Collecting numexpr>=2.6.2 (from tables)
  Downloading numexpr-2.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.9 kB)
Collecting py-cpuinfo (from tables)
  Using cached py_cpuinfo-9.0.0-py3-none-any.whl.metadata (794 bytes)
Collecting blosc2>=2.3.0 (from tables)
  Downloading blosc2-2.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.2 kB)
Collecting ndindex>=1.4 (from blosc2>=2.3.0->tables)
  Using cached ndindex-1.8-py3-none-any.whl.metadata (3.4 kB)
Downloading tables-3.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.5 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.5/7.5 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m0:01[0m:01[0m
[?25hDownloading blosc2-2.5.1-c

In [2]:
# import pandas as pd
# a=pd.read_hdf("/home/adamczykb/projects/ribonanza/data/parsed_dms.h5","sequence_id")

import tables
a=tables.open_file("/home/adamczykb/projects/ribonanza/data/parsed_dms.h5",'r')

In [16]:
!pip3 install h5torch --break-system-packages

Defaulting to user installation because normal site-packages is not writeable
Collecting h5torch
  Downloading h5torch-0.2.14-py3-none-any.whl.metadata (6.4 kB)
Downloading h5torch-0.2.14-py3-none-any.whl (11 kB)
Installing collected packages: h5torch
Successfully installed h5torch-0.2.14


In [3]:
import pandas as pd
a=pd.read_hdf("/home/adamczykb/projects/ribonanza/data/parsed_dms.h5")

In [16]:
a['reactivity'][0]

[0.2840000092983246,
 0.15199999511241913,
 0.035999998450279236,
 0.0020000000949949026,
 0.026000000536441803,
 0.31700000166893005,
 0.32100000977516174,
 0.9559999704360962,
 2.9110000133514404,
 0.7630000114440918,
 0.7210000157356262,
 0.013000000268220901,
 0.2809999883174896,
 0.14900000393390656,
 0.019999999552965164,
 0.0689999982714653,
 0.027000000700354576,
 0.07800000160932541,
 0.07800000160932541,
 0.0560000017285347,
 0.26499998569488525,
 0.02199999988079071,
 0.2029999941587448,
 0.10599999874830246,
 0.6600000262260437,
 0.14000000059604645,
 1.4259999990463257,
 1.4420000314712524,
 0.032999999821186066,
 0.019999999552965164,
 0.17299999296665192,
 0.05299999937415123,
 0.10700000077486038,
 1.5880000591278076,
 0.0949999988079071,
 0.13699999451637268,
 0.3240000009536743,
 0.12099999934434891,
 0.03500000014901161,
 0.03799999877810478,
 0.13899999856948853,
 0.9860000014305115,
 0.3889999985694885,
 0.39800000190734863,
 0.05999999865889549,
 0.841000020503997

In [8]:
q = df_2A3.approxQuantile('reactivity', [0.25, 0.5, 0.75], 0) 
upper_limit = q[2] + 1.5*(q[2]-q[0])


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `reactivity` cannot be resolved. Did you mean one of the following? [`reactivity_0001`, `reactivity_0002`, `reactivity_0003`, `reactivity_0004`, `reactivity_0005`].;
'Project ['reactivity]
+- Project [sequence#1948, sequence_id#1947, experiment_type#1949, dataset_name#1950, reactivity_0001#1951, reactivity_0002#1952, reactivity_0003#1953, reactivity_0004#1954, reactivity_0005#1955, reactivity_0006#1956, reactivity_0007#1957, reactivity_0008#1958, reactivity_0009#1959, reactivity_0010#1960, reactivity_0011#1961, reactivity_0012#1962, reactivity_0013#1963, reactivity_0014#1964, reactivity_0015#1965, reactivity_0016#1966, reactivity_0017#1967, reactivity_0018#1968, reactivity_0019#1969, reactivity_0020#1970, ... 187 more fields]
   +- Join Inner, (sequence#1948 = sequence#3008)
      :- Filter (experiment_type#1949 = 2A3_MaP)
      :  +- Project [sequence_id#1947, sequence#1948, experiment_type#1949, dataset_name#1950, reactivity_0001#1951, reactivity_0002#1952, reactivity_0003#1953, reactivity_0004#1954, reactivity_0005#1955, reactivity_0006#1956, reactivity_0007#1957, reactivity_0008#1958, reactivity_0009#1959, reactivity_0010#1960, reactivity_0011#1961, reactivity_0012#1962, reactivity_0013#1963, reactivity_0014#1964, reactivity_0015#1965, reactivity_0016#1966, reactivity_0017#1967, reactivity_0018#1968, reactivity_0019#1969, reactivity_0020#1970, ... 186 more fields]
      :     +- Relation [sequence_id#1947,sequence#1948,experiment_type#1949,dataset_name#1950,reactivity_0001#1951,reactivity_0002#1952,reactivity_0003#1953,reactivity_0004#1954,reactivity_0005#1955,reactivity_0006#1956,reactivity_0007#1957,reactivity_0008#1958,reactivity_0009#1959,reactivity_0010#1960,reactivity_0011#1961,reactivity_0012#1962,reactivity_0013#1963,reactivity_0014#1964,reactivity_0015#1965,reactivity_0016#1966,reactivity_0017#1967,reactivity_0018#1968,reactivity_0019#1969,reactivity_0020#1970,... 392 more fields] csv
      +- Union false, false
         :- Project [sequence#3008, hotknots#3069]
         :  +- Project [id#3007, sequence#3008, notes#3009, eterna_nupack#3010, eterna_eternafold+threshknot#3011, vienna2_mfe#3012, contrafold2_mfe#3013, eternafold_mfe#3014, e2efold_mfe#3015, hotknots_mfe#3016 AS hotknots#3069, ipknots_mfe#3017, knotty_mfe#3018, pknots_mfe#3019, spotrna_mfe#3020, vienna[threshknot]_mfe#3021, vienna[hungarian]_mfe#3022, eternafold[threshknot]_mfe#3023, eternafold[hungarian]_mfe#3024, contrafold[threshknot]_mfe#3025, contrafold[hungarian]_mfe#3026, nupack[threshknot]_mfe#3027, nupack[hungarian]_mfe#3028, shapify_mfe#3029, eternafold+hfold_1#3030, ... 7 more fields]
         :     +- Relation [id#3007,sequence#3008,notes#3009,eterna_nupack#3010,eterna_eternafold+threshknot#3011,vienna2_mfe#3012,contrafold2_mfe#3013,eternafold_mfe#3014,e2efold_mfe#3015,hotknots_mfe#3016,ipknots_mfe#3017,knotty_mfe#3018,pknots_mfe#3019,spotrna_mfe#3020,vienna[threshknot]_mfe#3021,vienna[hungarian]_mfe#3022,eternafold[threshknot]_mfe#3023,eternafold[hungarian]_mfe#3024,contrafold[threshknot]_mfe#3025,contrafold[hungarian]_mfe#3026,nupack[threshknot]_mfe#3027,nupack[hungarian]_mfe#3028,shapify_mfe#3029,eternafold+hfold_1#3030,... 7 more fields] csv
         :- Project [sequence#3125, hotknots#3191]
         :  +- Project [id#3121, title#3122, name#3123, body#3124, sequence#3125, eterna_nupack#3126, eterna_eternafold+threshknot#3127, vienna2_mfe#3128, contrafold2_mfe#3129, eternafold_mfe#3130, e2efold_mfe#3131, hotknots_mfe#3132 AS hotknots#3191, ipknots_mfe#3133, knotty_mfe#3134, pknots_mfe#3135, spotrna_mfe#3136, vienna[threshknot]_mfe#3137, vienna[hungarian]_mfe#3138, eternafold[threshknot]_mfe#3139, eternafold[hungarian]_mfe#3140, contrafold[threshknot]_mfe#3141, contrafold[hungarian]_mfe#3142, nupack[threshknot]_mfe#3143, nupack[hungarian]_mfe#3144, ... 11 more fields]
         :     +- Relation [id#3121,title#3122,name#3123,body#3124,sequence#3125,eterna_nupack#3126,eterna_eternafold+threshknot#3127,vienna2_mfe#3128,contrafold2_mfe#3129,eternafold_mfe#3130,e2efold_mfe#3131,hotknots_mfe#3132,ipknots_mfe#3133,knotty_mfe#3134,pknots_mfe#3135,spotrna_mfe#3136,vienna[threshknot]_mfe#3137,vienna[hungarian]_mfe#3138,eternafold[threshknot]_mfe#3139,eternafold[hungarian]_mfe#3140,contrafold[threshknot]_mfe#3141,contrafold[hungarian]_mfe#3142,nupack[threshknot]_mfe#3143,nupack[hungarian]_mfe#3144,... 11 more fields] csv
         :- Project [sequence#3251, hotknots#3259]
         :  +- Relation [rowID#3247,id#3248,name#3249,body#3250,sequence#3251,title#3252,vienna2_mfe#3253,vienna2_time#3254,contrafold2_mfe#3255,contrafold2_time#3256,eternafold_mfe#3257,eternafold_time#3258,hotknots#3259,hotknots_time#3260,ipknots#3261,ipknots_time#3262,knotty#3263,knotty_time#3264,spotrna#3265,spotrna_time#3266,nupack_pk#3267,nupack_pk_time#3268,vienna_2[threshknot]#3269,vienna_2[threshknot]_time#3270,... 20 more fields] csv
         +- Project [sequence#3357, hotknots#3364]
            +- Relation [rowID#3355,seqID#3356,sequence#3357,vienna2_mfe#3358,vienna2_time#3359,contrafold2_mfe#3360,contrafold2_time#3361,eternafold_mfe#3362,eternafold_time#3363,hotknots#3364,hotknots_time#3365,ipknots#3366,ipknots_time#3367,knotty#3368,knotty_time#3369,spotrna#3370,spotrna_time#3371,nupack_pk#3372,nupack_pk_time#3373,vienna_2[threshknot]#3374,vienna_2[threshknot]_time#3375,vienna_2[hungarian]#3376,vienna_2[hungarian]_time#3377,eternafold[threshknot]#3378,... 13 more fields] csv


In [8]:
upper_limit

1.4265000484883785

In [21]:
aaa=df2.select(sf.col("tokens")).collect()

                                                                                

In [25]:
a=df2.select(sf.col("tokens")).rdd.map(tuple)


                                                                                

In [29]:
a.zipWithIndex().filter(lambda x: x[1] == 1).map(lambda x: x[0]).collect()[0]


                                                                                

TypeError: 'PipelinedRDD' object is not subscriptable