In [18]:
#### ####
import findspark

findspark.init("/opt/homebrew/Cellar/apache-spark/3.3.0/libexec")

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

from psutil import virtual_memory
from pyspark import SparkFiles
from pyspark.conf import SparkConf
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


def detect_spark_memory_limit():
    """Return the amount of memory, in whole GiB, that Spark is allowed to use.

    Spark does not automatically use all available memory on a machine. When working on large datasets, this may
    cause Java heap space errors, even though there is plenty of RAM available. To fix this, we detect the total
    amount of physical memory and allow Spark to use (almost) all of it.

    Returns:
        int: roughly 90% of the physical memory in GiB, rounded down, but never less than 1 so that the
        value can always be formatted into a non-zero "<n>g" memory setting.
    """
    # Bytes -> whole GiB (shift by 30 bits, rounding down).
    total_gib = virtual_memory().total >> 30
    # On hosts with less than 2 GiB the 90% share truncates to 0; floor it at 1 GiB.
    return max(1, int(total_gib * 0.9))


# Memory budget (in GiB) shared by the driver and executor settings below.
spark_mem_limit = detect_spark_memory_limit()

# All Spark options are collected as (key, value) pairs and applied in one go.
# (An override for spark.executor.heartbeatInterval = "3600s" is left disabled.)
spark_conf = SparkConf().setAll(
    [
        ("spark.driver.memory", f"{spark_mem_limit}g"),
        ("spark.executor.memory", f"{spark_mem_limit}g"),
        ("spark.driver.maxResultSize", "0"),
        ("spark.debug.maxToStringFields", "2000000000"),
        ("spark.sql.execution.arrow.maxRecordsPerBatch", "500000"),
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ("spark.ui.showConsoleProgress", "false"),
    ]
)


# Local session using every available core, bound to the loopback interface.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config(conf=spark_conf)
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")  # run locally
    .getOrCreate()
)


In [97]:
### Load target parquet files (version 23.06) and keep only the id -> symbol mapping,
### renamed so it can be joined on the "TARGET" column of the supplementary tables.
targets_path = "/Users/juanr/Desktop/DepMap_2ndGen/targets"
target = spark.read.parquet(targets_path)
targetSymbols = target.select(
    F.col("id").alias("targetId"),
    F.col("approvedSymbol").alias("TARGET"),
)

### Load target prioritisation factors.
### Parquet files are self-describing: the CSV-only options `sep` / `header` that used to be
### passed here are dropped by the parquet reader, so they are no longer supplied.
targetPrioritisationFactorsPath = "/Users/juanr/Desktop/DepMap_2ndGen/targetsPriorisation"
targetPrior = spark.read.parquet(targetPrioritisationFactorsPath)
targetPrior.show(truncate=False)

### Load Supplementary Tables 12 and 13 (tab-separated text files with a header row).
### Both tables are tagged with a "geneType_J" label and harmonised to share the
### "Common_essential" and "cancerType" column names.
secondGenMapTable12_path = "/Users/juanr/Desktop/SupplementaryTable12.txt"
table12 = (
    spark.read.csv(secondGenMapTable12_path, sep=r"\t", header=True)
    .withColumn("geneType_J", F.lit("specificCancer"))
    .withColumnRenamed("CE", "Common_essential")
    .withColumnRenamed("Cancer Type", "cancerType")
)

secondGenMapTable13_path = "/Users/juanr/Desktop/SupplementaryTable13.txt"
table13 = (
    spark.read.csv(secondGenMapTable13_path, sep=r"\t", header=True)
    .withColumn("geneType_J", F.lit("panCancer"))
    .withColumnRenamed("Common essential", "Common_essential")
    # Table 13 is tagged with a constant cancerType so it lines up with Table 12.
    .withColumn("cancerType", F.lit("panCancer"))
)

+----------------------+---------+-----------+-----+------------+-----------------------+-----------+--------------------------------------------------------------+---------------------------------------------------------+---------------------------------------+---------------------+------------+-------+----------------+-------------+--------------+
|cancerType            |TARGET   |PRIORITY   |GROUP|TRACTABILITY|MOLTYPE                |MARKERCLASS|ASSOCIATION_EFFECT                                            |MARKER                                                   |ANOVA_table_entry                      |INDICATION           |PPI_distance|PPI_min|Common_essential|Network score|geneType_J    |
+----------------------+---------+-----------+-----+------------+-----------------------+-----------+--------------------------------------------------------------+---------------------------------------------------------+---------------------------------------+---------------------+------------

In [64]:
### 302 distinct targets come from the cancer-type specific analyses (Supplementary Table 12).
### Uses `table12` as loaded above; the previous name `table12depMap` is not defined in this notebook.

table12.select("TARGET").distinct().count()

302

In [65]:
### 196 distinct targets are pan-cancer (Supplementary Table 13).
### Uses `table13` as loaded above; the previous name `table13depMap` is not defined in this notebook.

table13.select("TARGET").distinct().count()

196

In [128]:
### Stack the cancer-type specific rows (Table 12) on top of the pan-cancer rows (Table 13).
### Columns are matched by NAME (unionByName) instead of by position: a plain `union` would
### silently put values under the wrong header if the two tables ever differed in column order,
### whereas this fails loudly when a column is missing.
jointTable = table12.unionByName(
    table13.select(
        "cancerType",
        "TARGET",
        "PRIORITY",
        "GROUP",
        "TRACTABILITY",
        "MOLTYPE",
        "MARKERCLASS",
        "ASSOCIATION_EFFECT",
        "MARKER",
        "ANOVA_table_entry",
        "INDICATION",
        "PPI_distance",
        "PPI_min",
        "Common_essential",
        "Network score",
        "geneType_J",
    )
)

In [145]:
### Attach the target id (Ensembl id, per the targets dataset) and the prioritisation factors.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Left joins keep every row of the union even when no symbol / prioritisation match exists.
joinTableEns = jointTable.join(targetSymbols, on="TARGET", how="left")
completeTable = (
    joinTableEns
    .join(targetPrior, on="targetId", how="left")
    # The CSV reader was given no schema, so the column is cast to integer explicitly.
    .withColumn("TRACTABILITY", F.col("TRACTABILITY").cast(IntegerType()))
)

In [168]:
# Total number of rows in the joined table.
completeTable.count()

713

In [167]:
# Number of distinct TARGET values across both supplementary tables.
completeTable.select("TARGET").distinct().count()

370

In [152]:
# Distinct TARGET count per geneType_J label (panCancer vs specificCancer).
completeTable.groupBy("geneType_J").agg(F.countDistinct("TARGET")).show()

+--------------+-------------+
|    geneType_J|count(TARGET)|
+--------------+-------------+
|     panCancer|          196|
|specificCancer|          302|
+--------------+-------------+



In [None]:
# Collect the full table to the driver as a pandas DataFrame and write it to a CSV file
# in the current working directory.
# NOTE(review): to_csv is called with its defaults, so the pandas index is written too — confirm this is wanted.
completeTable.toPandas().to_csv("completeTableGenesDepMap2nd.csv")

In [160]:
# Columns from the supplementary tables that are left out of the reduced export.
removeColumns = [
    "MOLTYPE",
    "MARKERCLASS",
    "ASSOCIATION_EFFECT",
    "MARKER",
    "ANOVA_table_entry",
    "INDICATION",
    "PPI_distance",
    "PPI_min",
    "Common_essential",
    "Network score",
]

In [163]:

# Keep every column of completeTable, in its existing order, except those listed in removeColumns.
selectedTable = completeTable.select(
    *filter(lambda name: name not in removeColumns, completeTable.columns)
)

In [164]:
# Collect the reduced table to the driver as a pandas DataFrame and write it to a CSV file
# in the current working directory.
# NOTE(review): to_csv is called with its defaults, so the pandas index is written too — confirm this is wanted.
selectedTable.toPandas().to_csv("selectedTable.csv")