# Prepare Data for DESeq2
aedavids@ucsc.edu 10/24/21

create a master count matrix and estimated scaling factors, we can use to test /private/home/aedavids/extraCellularRNA/terra/deseq/R/ DESeq scripts

The reason we use spark for these tasks is that it required us to read the entire data set into memory. Something that would not be possible using a monolithic tool like DESeq2.

ref: [sql-programming-guide.htm](https://spark.apache.org/docs/latest/sql-programming-guide.html)

## <span style="color:red">TODO make a production vesion so we can get better data and see if parts works or not
</span>

In [1]:
TODO make a production vesion so we can get better data and see if parts works or not

SPARK_HOME="../../sparkBin/spark-3.1.2-bin-hadoop3.2"
import findspark
findspark.init( SPARK_HOME )

In [2]:
import numpy as np
import pandas as pd
import pathlib as Path

In [3]:
!pwd

dataRoot = Path.Path("testData/sparkDESeqTest2")
rootOutDir = dataRoot.joinpath('output')
rootOutDir.mkdir(exist_ok=True)

/private/home/aedavids/extraCellularRNA/juypterNotebooks/spark


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("DESeqMasterETL") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

# .config("spark.some.config.option", "some-value") \

In [5]:
from pyspark.sql import functions as sqlFunc

# Step 1) Load Salmon Quant files

In [6]:
def loadSalmonQuantFiles( fileList, debug=False ):
    
    # pre allocate slots to store data frames in
    quantSDFs = [None] * len(fileList)
    
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "

    for i in range( len(quantSDFs) ):
        quantFile =  fileList[i] 
        
        if debug:
            print(quantFile)
            
        df = spark.read.load( quantFile.as_posix(), format="csv", sep="\t", 
                                 schema=quantSchema, header="true")
        quantSDFs[i] = df    
        
    return quantSDFs

In [7]:
# order is important, must match the colData row name
sampleNames = [ "ctrl_1", "ctrl_2", "ctrl_3", "kras_1", "kras_2", "kras_3" ]
# fileList = [ dataRoot + "/" + name + ".quant.sf" for name in sampleNames ]
fileList = [ dataRoot.joinpath(name + ".quant.sf") for name in sampleNames ]

quantSparkDFList = loadSalmonQuantFiles( fileList, debug=True )

testData/sparkDESeqTest2/ctrl_1.quant.sf
testData/sparkDESeqTest2/ctrl_2.quant.sf
testData/sparkDESeqTest2/ctrl_3.quant.sf
testData/sparkDESeqTest2/kras_1.quant.sf
testData/sparkDESeqTest2/kras_2.quant.sf
testData/sparkDESeqTest2/kras_3.quant.sf


# Step 2) combine the quant files into a single count data frame

In [8]:
def createCountDF( quantSparkDFList, sampleNamesList, debug=False ):
    # initialize master counts spark data frame
    firstSampleName = sampleNamesList[0]
    rawCountsSDF = quantSparkDFList[0].select( ["Name", "NumReads"] )\
                            .withColumnRenamed( "NumReads", firstSampleName )

    # Register the DataFrame as a SQL temporary view
    rawCountsSDF.createOrReplaceTempView("rawCounts")
    
    if debug:
        print(firstSampleName)

    for i in range( 1, len(sampleNamesList)):
        sampleName = sampleNamesList[i]
        
        if debug:
            print(sampleName)

        # select the key and counts from the sample. 
        sampleSDF = quantSparkDFList[i].select( ["Name", "NumReads", ] )\
            .withColumnRenamed( "NumReads", sampleName )
        
        sampleSDF.createOrReplaceTempView("sample")

        sqlStmt = ' select rc.*, {}  \n\
                          from \n\
                              rawCounts as rc, \n\
                              sample  \n\
                          where \n\
                              rc.Name == sample.Name  \n'.format(sampleName)
        #print(sqlStmt)

        rawCountsSDF = spark.sql( sqlStmt )
        rawCountsSDF.createOrReplaceTempView("rawCounts")
        #masterCountSDF.show()
        
    return rawCountsSDF

In [9]:
rawCountSparkDF = createCountDF( quantSparkDFList, sampleNames, debug=True )
rawCountSparkDF.createOrReplaceTempView("rawCounts")

rawCountSparkDF.show( truncate=False )

ctrl_1
ctrl_2
ctrl_3
kras_1
kras_2
kras_3
+-------+------+------+------+------+------+------+
|Name   |ctrl_1|ctrl_2|ctrl_3|kras_1|kras_2|kras_3|
+-------+------+------+------+------+------+------+
|txId_1 |0.0   |0.1   |0.2   |0.0   |0.1   |0.2   |
|txId_2 |11.0  |11.1  |11.2  |110.0 |110.1 |110.2 |
|txId_3 |12.0  |12.1  |0.0   |120.0 |120.1 |120.2 |
|txId_4 |13.0  |13.1  |13.2  |130.0 |130.1 |130.2 |
|txId_5 |14.0  |14.1  |14.2  |140.0 |140.1 |140.2 |
|txId_6 |15.0  |15.1  |15.2  |150.0 |150.1 |150.2 |
|txId_7 |16.0  |16.1  |16.2  |160.0 |160.1 |160.2 |
|txId_8 |17.0  |17.1  |17.2  |170.0 |170.1 |170.2 |
|txId_9 |18.0  |18.1  |18.2  |180.0 |180.1 |180.2 |
|txId_10|19.0  |19.1  |19.2  |190.0 |190.1 |190.2 |
+-------+------+------+------+------+------+------+



# Step 3, group by gene id and count

In [10]:
def groupByGeneAndSum( txId2GeneIdFile, rawCountSDF, debug=False):
    tx2geneSchema = " `txId` STRING, `geneId` STRING "
    txt2geneSparkDF = spark.read.load( txId2GeneIdFile, format="csv",  
                                    schema=tx2geneSchema, header="false")
    if debug:
        print(txt2geneSparkDF)
        txt2geneSparkDF.show()
    
    txt2geneSparkDF.createOrReplaceTempView("txt2gene")

    sqlStmt = ' select geneId, rc.*  \n\
                      from \n\
                          rawCounts as rc, \n\
                          txt2gene  \n\
                      where \n\
                          rc.Name == txt2gene.txId  \n'

    rawCountSDF = spark.sql( sqlStmt )
    rawCountSDF.createOrReplaceTempView("rawCounts")
    
    retSparkDF = rawCountSDF.groupBy("geneId").sum()# do not sort it does not matter .sort("geneId").show() 
    
    return retSparkDF

In [11]:
csvFile = dataRoot.joinpath("txId2GeneId.csv")
countsSparkDF = groupByGeneAndSum(csvFile.as_posix(), rawCountSparkDF, debug=True)

DataFrame[txId: string, geneId: string]
+-------+------+
|   txId|geneId|
+-------+------+
|   txId|geneId|
| txId_1|gene_1|
| txId_2|gene_2|
| txId_3|gene_3|
| txId_4|gene_4|
| txId_5|gene_5|
| txId_6|gene_6|
| txId_7|gene_7|
| txId_8|gene_8|
| txId_9|gene_8|
|txId_10|gene_8|
+-------+------+



In [12]:
print("txId_8, txId_9, txId_10 map to gene_8")
countsSparkDF.show(truncate=False)

txId_8, txId_9, txId_10 map to gene_8
+------+-----------+------------------+------------------+-----------+-----------+-----------------+
|geneId|sum(ctrl_1)|sum(ctrl_2)       |sum(ctrl_3)       |sum(kras_1)|sum(kras_2)|sum(kras_3)      |
+------+-----------+------------------+------------------+-----------+-----------+-----------------+
|gene_2|11.0       |11.1              |11.2              |110.0      |110.1      |110.2            |
|gene_7|16.0       |16.1              |16.2              |160.0      |160.1      |160.2            |
|gene_6|15.0       |15.1              |15.2              |150.0      |150.1      |150.2            |
|gene_4|13.0       |13.1              |13.2              |130.0      |130.1      |130.2            |
|gene_1|0.0        |0.1               |0.2               |0.0        |0.1        |0.2              |
|gene_5|14.0       |14.1              |14.2              |140.0      |140.1      |140.2            |
|gene_3|12.0       |12.1              |0.0           

In [13]:
def testCountsSparkDF( df ):
    expectedRowStr = "Row(geneId='gene_8', sum(ctrl_1)=54.0, sum(ctrl_2)=54.300000000000004, sum(ctrl_3)=54.599999999999994, sum(kras_1)=540.0, sum(kras_2)=540.3, sum(kras_3)=540.5999999999999)"
    retRow = df.filter( df.geneId == "gene_8").take(1)[0]
    assert str(retRow) == expectedRowStr , "group by gene count is wrong"
    
testCountsSparkDF( countsSparkDF )

# Step 4) convert counts to integers

In [14]:
from pyspark.sql.functions import col, round
# columns[1:], skip geneId col
# https://mrpowers.medium.com/performing-operations-on-multiple-columns-in-a-pyspark-dataframe-36e97896c378
# https://treyhunner.com/2018/10/asterisks-in-python-what-they-are-and-how-to-use-them/
# countsSparkDF = countsSparkDF.select(*(col(c).cast("integer").alias(c) for c in countsSparkDF.columns[1:]))

# round before casting to int to match R implementation

countsSparkDF = countsSparkDF.select( col('geneId') , \
        *(round(col(c)).alias(c) for c in countsSparkDF.columns[1:]) )

countsSparkDF = countsSparkDF.select( col('geneId') , \
        *(col(c).cast("integer").alias(c) for c in countsSparkDF.columns[1:]) )

countsSparkDF.show()

def testConvertToInt( df ):
    expectedPDFDict = {'geneId': {0: 'gene_2', 1: 'gene_7', 2: 'gene_6', 3: 'gene_4', 4: 'gene_1', 5: 'gene_5', 6: 'gene_3', 7: 'gene_8'}, 
                       'sum(ctrl_1)': {0: 11, 1: 16, 2: 15, 3: 13, 4: 0, 5: 14, 6: 12, 7: 54}, 
                       'sum(ctrl_2)': {0: 11, 1: 16, 2: 15, 3: 13, 4: 0, 5: 14, 6: 12, 7: 54}, 
                       'sum(ctrl_3)': {0: 11, 1: 16, 2: 15, 3: 13, 4: 0, 5: 14, 6: 0, 7: 55}, 
                       'sum(kras_1)': {0: 110, 1: 160, 2: 150, 3: 130, 4: 0, 5: 140, 6: 120, 7: 540}, 
                       'sum(kras_2)': {0: 110, 1: 160, 2: 150, 3: 130, 4: 0, 5: 140, 6: 120, 7: 540}, 
                       'sum(kras_3)': {0: 110, 1: 160, 2: 150, 3: 130, 4: 0, 5: 140, 6: 120, 7: 541}}

    expectedPDF = pd.DataFrame( expectedPDFDict )
#     print("\n expected")
#     print(expectedPDF)
    retPDF = df.toPandas()
   

    # spark df is int32, pandas from dict is int64
    pd.testing.assert_frame_equal( retPDF, expectedPDF, check_dtype=False )
    
testConvertToInt( countsSparkDF )

+------+-----------+-----------+-----------+-----------+-----------+-----------+
|geneId|sum(ctrl_1)|sum(ctrl_2)|sum(ctrl_3)|sum(kras_1)|sum(kras_2)|sum(kras_3)|
+------+-----------+-----------+-----------+-----------+-----------+-----------+
|gene_2|         11|         11|         11|        110|        110|        110|
|gene_7|         16|         16|         16|        160|        160|        160|
|gene_6|         15|         15|         15|        150|        150|        150|
|gene_4|         13|         13|         13|        130|        130|        130|
|gene_1|          0|          0|          0|          0|          0|          0|
|gene_5|         14|         14|         14|        140|        140|        140|
|gene_3|         12|         12|          0|        120|        120|        120|
|gene_8|         54|         54|         55|        540|        540|        541|
+------+-----------+-----------+-----------+-----------+-----------+-----------+



# Step 5) pre filter
saves a lot of memory

<span style="color:red">Not needed. is this computationly efficent. does this mess up row sums?</span>

In [15]:
from functools import reduce
from operator import add
from pyspark.sql.functions import col

def rowSums( countsSparkDF, columnNames ):
    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill(0).withColumn("rowSum" ,reduce(add, [col(x) for x in columnNames]))
    return retDF

# Step 6) estimate Scaling Factors for each sample
account for differences in library size and library composition.

library composition issue, we want to compare two sample with the same size. Assume in disease/knock out one gene is not expessed. This results in the remaining genes getting more counts than otherwise. I.e. the extra reads for the knock out gene get distributed to the remaining genes

## DESeq implementation</span>
<span style="color:red"> This will not work in spark</span>
```
a) calculate the log of all values
    i) logs are no easily swad by outliners
    ii) we may have zero counts, log is undefined
    
b) calculate the 'geometic average' of the rows
    i) mean( rowsum )
    
c) filter out genes with average = +/- infinity 
    i) removes genes with zero in one or more samples. 
    that are type specific. I.e. we want to focus on the house keeping genes.
    These are genes that are trascripted at simpilar levels regradless of tissue type
    
d) subtract the avereage log values from the log(counts)
    i) this is equal to log( numRead_x / average numRead_x)
    
e) calculate the median of the ratio for each sample 
    i) median is robust
    
f) convert the medians back to linear scale
```

## Spark implementation
DESeq is implemented in R and uses an R implementation detail to remove genes with zero in one or more samples. This will not work in spark. R defines log(0) = -inf. All arithimetic operations on -inf = -inf. DESeq calculates log of all values then calculates the row means. It then removes rows where row mean = -inf.

In spark log(0) is defined to null. It seems like spark will treat null as zero. That is to say that if some genes have zero counts the rowSums will not be -inf. We will need to filter these genes out before calculating row means.

```
a) calculate the log of all values
    i) logs are no easily swad by outliners
    ii) we may have zero counts, log is undefined. These values will be null
    
b) filter out genes with one or more nulls
   i) removes genes with zero in one or more samples. 
    that are type specific. I.e. we want to focus on the house keeping genes.
    These are genes that are trascripted at simpilar levels regradless of tissue type
    
c) calculate the 'geometic average' of the rows
    i) mean( rowsum )

d) subtract the avereage log values from the log(counts)
    i) this is equal to log( numRead_x / average numRead_x)
    
e) calculate the median of the ratio for each sample 
    i) median is robust
    
f) convert the medians back to linear scale
```

In [16]:
# 6.a) calculate log of all counts
from pyspark.sql.functions import log

# skip first column, i.e. gene_id
columns = countsSparkDF.columns[1:]
colNameAndAlias = ( (c, c.replace('sum', "log"))  for c in columns )

logCountsSparkDF = countsSparkDF.select( col('geneId') ,
                                        *(log(c).alias(a) for c,a in colNameAndAlias ) ) 
                                           
logCountsSparkDF.show()

+------+------------------+------------------+------------------+------------------+------------------+------------------+
|geneId|       log(ctrl_1)|       log(ctrl_2)|       log(ctrl_3)|       log(kras_1)|       log(kras_2)|       log(kras_3)|
+------+------------------+------------------+------------------+------------------+------------------+------------------+
|gene_2|2.3978952727983707|2.3978952727983707|2.3978952727983707| 4.700480365792417| 4.700480365792417| 4.700480365792417|
|gene_7| 2.772588722239781| 2.772588722239781| 2.772588722239781| 5.075173815233827| 5.075173815233827| 5.075173815233827|
|gene_6|  2.70805020110221|  2.70805020110221|  2.70805020110221|5.0106352940962555|5.0106352940962555|5.0106352940962555|
|gene_4|2.5649493574615367|2.5649493574615367|2.5649493574615367| 4.867534450455582| 4.867534450455582| 4.867534450455582|
|gene_1|              null|              null|              null|              null|              null|              null|
|gene_5|2.639057

In [17]:
def testConvertToLog( df ):
    testPDF = df.select(['geneId', 'log(ctrl_1)', 'log(kras_3)']).toPandas()
#     print(testPDF)
#     print(testPDF.to_dict())

    expectedPDFDict = {
     'geneId': {0: 'gene_2', 1: 'gene_7', 2: 'gene_6', 3: 'gene_4', 4: 'gene_1', 5: 'gene_5', 6: 'gene_3', 7: 'gene_8'}, 
     'log(ctrl_1)': {0: 2.3978952727983707, 1: 2.772588722239781, 2: 2.70805020110221,   3: 2.5649493574615367, 4: 'nan', 5: 2.6390573296152584, 6: 2.4849066497880004, 7: 3.9889840465642745}, 
     'log(kras_3)': {0: 4.700480365792417,  1: 5.075173815233827, 2: 5.0106352940962555, 3: 4.867534450455582,  4: 'nan', 5: 4.941642422609304,  6: 4.787491742782046,  7: 6.293419278846481}
    }

    import numpy as np

    # print(expectedPDF)
    expectedPDF = pd.DataFrame( expectedPDFDict )
    selectRows = expectedPDF.loc[:,"geneId"] == "gene_1"
    expectedPDF.loc[selectRows, ["log(ctrl_1)", "log(kras_3)"] ] = np.NaN #pd.NaN
    #print(expectedPDF)

    # expectedPDF

    pd.testing.assert_frame_equal( testPDF, expectedPDF, check_dtype=False )
    
testConvertToLog( logCountsSparkDF )

In [18]:
# 6.b) filter out genes with one or more nulls
#    i) removes genes with zero in one or more samples. 
#     that are type specific. I.e. we want to focus on the house keeping genes.
#     These are genes that are trascripted at simpilar levels regradless of tissue type

filteredDF = logCountsSparkDF.na.drop()

def testFilter( df ) :
    filteredPDF = df.select("geneId").toPandas()
    # expectedDict = retPDF.to_dict()
    # print(expectedDict)
    expectedDict = {'geneId': {0: 'gene_2', 1: 'gene_7', 2: 'gene_6', 3: 'gene_4', 4: 'gene_5', 5: 'gene_8'}}

    expectedPDF = pd.DataFrame( expectedDict )
    pd.testing.assert_frame_equal(filteredPDF , expectedPDF )
    
testFilter( filteredDF )

In [19]:
# 6.c) calculate the mean of the row sum
# skip gene_id
columns = filteredDF.columns[1:]
rowSumsDF = rowSums( filteredDF, columns )
rowSumsDF.show()

# rowSumsDF.select(["geneId", "rowSum"]).show()
# rowSumsPDF = rowSumsDF.select(["geneId", "rowSum"]).toPandas()
#print(rowSumsPDF.to_dict())

def testRowSum( df ):
    df.select(["geneId", "rowSum"]) #.show()
    pdf = rowSumsDF.select(["geneId", "rowSum"]).toPandas()
    #print(pdf.to_dict())
    
    expectedDict =  {
        'geneId': {0: 'gene_2', 1: 'gene_7', 2: 'gene_6', 3: 'gene_4', 4: 'gene_5', 5: 'gene_8'}, 
        'rowSum': {0: 21.29512691577236, 1: 23.54328761242082, 2: 23.156056485595393, 3: 22.297451423751358, 4: 22.74209925667369, 5: 30.861858836324142}
    }
    
    expectedPDF = pd.DataFrame( expectedDict )

    pd.testing.assert_frame_equal(pdf , expectedPDF )
    
testRowSum(rowSumsDF)

+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|geneId|       log(ctrl_1)|       log(ctrl_2)|       log(ctrl_3)|       log(kras_1)|       log(kras_2)|       log(kras_3)|            rowSum|
+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|gene_2|2.3978952727983707|2.3978952727983707|2.3978952727983707| 4.700480365792417| 4.700480365792417| 4.700480365792417| 21.29512691577236|
|gene_7| 2.772588722239781| 2.772588722239781| 2.772588722239781| 5.075173815233827| 5.075173815233827| 5.075173815233827| 23.54328761242082|
|gene_6|  2.70805020110221|  2.70805020110221|  2.70805020110221|5.0106352940962555|5.0106352940962555|5.0106352940962555|23.156056485595393|
|gene_4|2.5649493574615367|2.5649493574615367|2.5649493574615367| 4.867534450455582| 4.867534450455582| 4.867534450455582|22.297451423751358|
|gene_

In [20]:
n = len( rowSumsDF.columns ) -2 # do not count geneId or rowSum columns
rowMeansDF = rowSumsDF.withColumn("rowMean", (rowSumsDF.rowSum / n ))
rowMeansDF.show()

# AEDWIP should be " use RStudio symbolic debugger"
# # Browse[7]> loggeomeans
# #   gene_1   gene_2   gene_3    gene_4    gene_5    gene_6    gene_7    gene_8 
# #     -Inf 3.549188     -Inf   3.716242 3.790350  3.859343  3.923881  5.143643

# fix unit test. we corrected teh bug but did not fix the unit test becase we wanted to
# fix the int( round(count) ) bug first to save time

def testRowMeans( df ):
    df.select(["geneId", "rowSum", "rowMean"]) #.show() 
    rowMeansPDF = df.select(["geneId", "rowSum", "rowMean"]).toPandas()
    
    expectedDict = rowMeansPDF.to_dict()
#     print(expectedDict)

#     expectedDict = {
#         'geneId':  {0: 'gene_2',          1: 'gene_7',           2: 'gene_6',           3: 'gene_4',           4: 'gene_5',           5: 'gene_8'}, 
#         'rowSum':  {0: 21.29512691577236, 1: 23.54328761242082,  2: 23.156056485595393, 3: 22.297451423751358, 4: 22.74209925667369,  5: 30.841659558367784}, 
#         'rowMean': {0: 2.661890864471545, 1: 2.9429109515526024, 2: 2.894507060699424,  3: 2.7871814279689198, 4: 2.8427624070842112, 5: 3.855207444795973}}

    expectedDict = {
        'geneId':  {0: 'gene_2',           1: 'gene_7',          2: 'gene_6',           3: 'gene_4',           4: 'gene_5',          5: 'gene_8'}, 
        'rowSum':  {0: 21.29512691577236,  1: 23.54328761242082, 2: 23.156056485595393, 3: 22.297451423751358, 4: 22.74209925667369, 5: 30.861858836324142}, 
        'rowMean': {0: 3.5491878192953936, 1: 3.923881268736803, 2: 3.859342747599232,  3: 3.7162419039585597, 4: 3.790349876112282, 5: 5.143643139387357}}
   
    expectedPDF = pd.DataFrame( expectedDict )
    pd.testing.assert_frame_equal(rowMeansPDF , expectedPDF )
    
testRowMeans( rowMeansDF )

+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|geneId|       log(ctrl_1)|       log(ctrl_2)|       log(ctrl_3)|       log(kras_1)|       log(kras_2)|       log(kras_3)|            rowSum|           rowMean|
+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|gene_2|2.3978952727983707|2.3978952727983707|2.3978952727983707| 4.700480365792417| 4.700480365792417| 4.700480365792417| 21.29512691577236|3.5491878192953936|
|gene_7| 2.772588722239781| 2.772588722239781| 2.772588722239781| 5.075173815233827| 5.075173815233827| 5.075173815233827| 23.54328761242082| 3.923881268736803|
|gene_6|  2.70805020110221|  2.70805020110221|  2.70805020110221|5.0106352940962555|5.0106352940962555|5.0106352940962555|23.156056485595393| 3.859342747599232|
|gene_4|2.5649493574615367|2.56494

In [21]:
# 6.d) subtract the avereage log values from the log(counts)
#     i) this is equal to log( numRead_x / average numRead_x)

def subtractRowMeanFromLogCounts( sparkDF, columnNames ):
    colNameAndAlias = ( (c, c.replace('log(','').replace(')',''))  for c in columns )
    retDF = sparkDF.select(  col('geneId'),
                           *((col(c) - col('rowMean')).alias(a) for c,a in colNameAndAlias) )
    return retDF


# skip the first and last 2 columns, ie. geneId, rowSum, rowMean
columnNames = rowMeansDF.columns[1:-2]
print(columnNames)

ratioDF = subtractRowMeanFromLogCounts( rowMeansDF, columnNames )
ratioDF.show()

def testRatio( df ):
    ratioPDF = df.toPandas()
    expectedDict = ratioPDF.to_dict()
#     print(expectedDict)
    
    expectedDict = {
        'geneId': {0: 'gene_2', 1: 'gene_7', 2: 'gene_6', 3: 'gene_4', 4: 'gene_5', 5: 'gene_8'}, 
        'ctrl_1': {0: -1.151292546497023, 1: -1.151292546497022, 2: -1.151292546497022, 3: -1.151292546497023, 4: -1.1512925464970234, 5: -1.1546590928230822}, 
        'ctrl_2': {0: -1.151292546497023, 1: -1.151292546497022, 2: -1.151292546497022, 3: -1.151292546497023, 4: -1.1512925464970234, 5: -1.1546590928230822}, 
        'ctrl_3': {0: -1.151292546497023, 1: -1.151292546497022, 2: -1.151292546497022, 3: -1.151292546497023, 4: -1.1512925464970234, 5: -1.1363099541548856}, 
        'kras_1': {0: 1.151292546497023, 1: 1.1512925464970234, 2: 1.1512925464970234, 3: 1.1512925464970225, 4: 1.151292546497022, 5: 1.1479260001709637}, 
        'kras_2': {0: 1.151292546497023, 1: 1.1512925464970234, 2: 1.1512925464970234, 3: 1.1512925464970225, 4: 1.151292546497022, 5: 1.1479260001709637}, 
        'kras_3': {0: 1.151292546497023, 1: 1.1512925464970234, 2: 1.1512925464970234, 3: 1.1512925464970225, 4: 1.151292546497022, 5: 1.1497761394591244}}
    
    expectedPDF = pd.DataFrame( expectedDict )

    pd.testing.assert_frame_equal(ratioPDF , expectedPDF )
    
testRatio(ratioDF)

['log(ctrl_1)', 'log(ctrl_2)', 'log(ctrl_3)', 'log(kras_1)', 'log(kras_2)', 'log(kras_3)']
+------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|geneId|             ctrl_1|             ctrl_2|             ctrl_3|            kras_1|            kras_2|            kras_3|
+------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|gene_2| -1.151292546497023| -1.151292546497023| -1.151292546497023| 1.151292546497023| 1.151292546497023| 1.151292546497023|
|gene_7| -1.151292546497022| -1.151292546497022| -1.151292546497022|1.1512925464970234|1.1512925464970234|1.1512925464970234|
|gene_6| -1.151292546497022| -1.151292546497022| -1.151292546497022|1.1512925464970234|1.1512925464970234|1.1512925464970234|
|gene_4| -1.151292546497023| -1.151292546497023| -1.151292546497023|1.1512925464970225|1.1512925464970225|1.1512925464970225|
|gene_5|-1.1512925464970234

In [22]:
# 6.calculate the median of the ratio for each sample 
#     i) median is robust

def median( sparkDataFrame, columnNames ):
    '''
    https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.percentile_approx.html
    if you have an odd number of rows it does not calculate the value between the 2 middle values
    '''
    
    retDF = sparkDataFrame.select( *(sqlFunc.percentile_approx( c, 0.5, accuracy=1000000 ) 
                                     for c in columnNames) )
    return retDF

def testMedian() :
    def singleRowDFToNumpyArray( sparkDF ):
        pdf = sparkDF.toPandas()
        np = pdf.values[0]  
        return np

    # test even number of rows
    evenPDF = pd.DataFrame( {
        "a" : [i * 1.0 for i in range(6)],
        "b" : [i * 2.0 for i in range(6)],
    })
    evenSparkDF = spark.createDataFrame( evenPDF )
    resultDF = median( evenSparkDF, evenSparkDF.columns )
    resultMediansNP = singleRowDFToNumpyArray( resultDF )

    # test odd number of rows
    expectMedianNP = np.array( [2., 4.] )
    np.testing.assert_array_equal(resultMediansNP, expectMedianNP)

    oddPDF = pd.DataFrame( {
        "c" : [i * 1.0 for i in range(7)],
        "d" : [i * 2.0 for i in range(7)],
    })
    oddSparkDF = spark.createDataFrame( oddPDF )
    resultDF = median( oddSparkDF, oddSparkDF.columns )
    resultMediansNP = singleRowDFToNumpyArray( resultDF )
    expectMedianNP = np.array( [3., 6.] )
    np.testing.assert_array_equal(resultMediansNP, expectMedianNP)
    
testMedian()

In [23]:
# skip geneId
columnNames = ratioDF.columns[1:]
estimatedScalingFactorsDF = median( ratioDF, columnNames )
#print(estimatedScalingFactorsDF.columns)

# rename columns
def getSampleNames(colName):
    #x = "percentile_approx(ctrl_1, 0.5, 1000000)"
    #name = x.split("(")[1].split(",")[0]
    ret = colName.split("(")[1].split(",")[0]
#     print(ret)
    return ret

newColNames = [getSampleNames(c) for c in estimatedScalingFactorsDF.columns]
logScalingFactorsDF = estimatedScalingFactorsDF.toDF( *newColNames )

def testLogScaleFactor( df ):
    logScalingFactorsPDF = df.toPandas()
#     print(logScalingFactorsPDF.to_dict())
    
    expectedDict = {'ctrl_1': {0: -1.151292546497023}, 'ctrl_2': {0: -1.151292546497023}, 
                     'ctrl_3': {0: -1.151292546497023}, 'kras_1': {0: 1.1512925464970225}, 
                     'kras_2': {0: 1.1512925464970225}, 'kras_3': {0: 1.1512925464970225}}
    
    expectedPDF = pd.DataFrame( expectedDict )
    pd.testing.assert_frame_equal(logScalingFactorsPDF , expectedPDF )
    
testLogScaleFactor( logScalingFactorsDF )

In [24]:
# 6.f) convert the medians back to linear scale
print("logScalingFactorsDF")
logScalingFactorsDF.show()

scalingFactorsDF = logScalingFactorsDF.select( *(sqlFunc.exp( c ) 
                                     for c in logScalingFactorsDF.columns) )
print("\nscalingFactorsDF")
scalingFactorsDF.show()

logScalingFactorsDF
+------------------+------------------+------------------+------------------+------------------+------------------+
|            ctrl_1|            ctrl_2|            ctrl_3|            kras_1|            kras_2|            kras_3|
+------------------+------------------+------------------+------------------+------------------+------------------+
|-1.151292546497023|-1.151292546497023|-1.151292546497023|1.1512925464970225|1.1512925464970225|1.1512925464970225|
+------------------+------------------+------------------+------------------+------------------+------------------+


scalingFactorsDF
+------------------+------------------+------------------+-----------------+-----------------+-----------------+
|       EXP(ctrl_1)|       EXP(ctrl_2)|       EXP(ctrl_3)|      EXP(kras_1)|      EXP(kras_2)|      EXP(kras_3)|
+------------------+------------------+------------------+-----------------+-----------------+-----------------+
|0.3162277660168379|0.3162277660168379|0.3

In [25]:
# write to a single file
# estimatedScalingFactorsOutFile = rootOutDir.joinpath("estimatedScalingFactors.csv")
# scalingFactorsDF.coalesce(1).write.csv( estimatedScalingFactorsOutFile.as_posix(), 
#                                                 mode='overwrite', header=True)


def testScalingFactor( df ):
    scalingFactorsPDF = scalingFactorsDF.toPandas()
    expectedDict = scalingFactorsPDF.to_dict()
    #print( expectedDict)

    
    # DESeq scaling factors. 
    # used RStudio symbolic debuger
    # Browse[7]> sf
    # [1] 0.3162278 0.3162278 0.3162278 3.1622777 3.1622777 3.1622777  
    
    expectedDict = {'EXP(ctrl_1)': {0: 0.3162277660168379}, 'EXP(ctrl_2)': {0: 0.3162277660168379}, 
                    'EXP(ctrl_3)': {0: 0.3162277660168379}, 'EXP(kras_1)': {0: 3.162277660168378}, 
                    'EXP(kras_2)': {0: 3.162277660168378},  'EXP(kras_3)': {0: 3.162277660168378}}

    expectedPDF = pd.DataFrame( expectedDict )
    pd.testing.assert_frame_equal(scalingFactorsPDF , expectedPDF )

testScalingFactor(scalingFactorsDF)

## Save scaling factors

In [26]:
def saveScalingFactors( df, outfile ):
    def getSampleNames(pdf):
        #x = "EXP(ctrl_1)"
        columnNames = pdf.columns.to_list()
        ret = [ c.replace( "EXP(", "" ).replace( ")","" ) for c in columnNames ] 
    
        return ret
        
    # this should easily fit into driver memory
    # we want to save as 2 columns. It easier to transpose in pandas
    scalingFactorsPDF = df.toPandas()
    sampleNames = getSampleNames( scalingFactorsPDF )    
#     print("sampleNames:{}".format(sampleNames))
    scalingFactorsPDF = scalingFactorsPDF.transpose()
    
    # replace "EXP(sample name ) row names with sample name"
    scalingFactorsPDF = scalingFactorsPDF.set_axis( sampleNames, axis='index')
    
    scalingFactorsPDF.to_csv(scalingFactorsOutFile, sep="\t", index=True, header=False)
    
scalingFactorsOutFile = rootOutDir.joinpath("estimatedScalingFactors.tsv")
saveScalingFactors(scalingFactorsDF , scalingFactorsOutFile)

In [27]:
! cat $scalingFactorsOutFile

ctrl_1	0.3162277660168379
ctrl_2	0.3162277660168379
ctrl_3	0.3162277660168379
kras_1	3.162277660168378
kras_2	3.162277660168378
kras_3	3.162277660168378


## to scale, divide  all original read counts by scalling factors
we need to scale all the genes in original count matrix. This includes
genes we filtered out when calculated our scaling factors

# Create master count spark data frame

# invoke an action
This will cause the optimized query plan to run

In [28]:
#masterCountSDF.show(n=3)

# Save gene count

In [29]:
outFile = rootOutDir.joinpath("txidGroupedByGeneidCounts")
partsOutFile = rootOutDir.joinpath("txidGroupedByGeneidCountsParts")

In [30]:
def fixColNames( countDF ) :
    '''
    fix column names. the should be sample names
    # 'sum(ctrl_1)'' should be 'ctrl_1'
    '''
    
    # skip the the geneId colummn It is value like gene_2
    oldNames = countDF.columns[1:]

    sampleNames = ( (c, c.replace('sum(','').replace(')',''))  for c in oldNames )
    # print(*sampleColNames)

    retDF = countDF.select( col('geneId'), 
                           *((col(c).alias(a) for c,a in sampleNames) ) ) 
    return( retDF )
                           
countsSparkDF = fixColNames(countsSparkDF)

In [31]:
# countsSparkDF.rdd.getNumPartitions()
countsSparkDF.rdd.getNumPartitions()

200

In [32]:
# write to 2 files
# deseq balks if there are three files
# Error in estimateDispersionsFit(object, fitType = fitType, quiet = quiet) : 
#   all gene-wise dispersion estimates are within 2 orders of magnitude
#   from the minimum value, and so the standard curve fitting techniques will not work.
#   One can instead use the gene-wise estimates as final estimates:
#   dds <- estimateDispersionsGeneEst(dds)
#   dispersions(dds) <- mcols(dds)$dispGeneEst
#   ...then continue with testing using nbinomWaldTest or nbinomLRT

numFiles = 3
# countsSparkDF.coalesce(numFiles) \
hack = 5 # when we get here there is only one partition because of show()
countsSparkDF.repartition(hack).coalesce(numFiles) \
    .write \
    .csv( partsOutFile.as_posix(), mode='overwrite', sep='\t', header=True)

print("saved: {}".format(partsOutFile))
# masterCountSDF.show()
countsSparkDF.show()

saved: testData/sparkDESeqTest2/output/txidGroupedByGeneidCountsParts
+------+------+------+------+------+------+------+
|geneId|ctrl_1|ctrl_2|ctrl_3|kras_1|kras_2|kras_3|
+------+------+------+------+------+------+------+
|gene_2|    11|    11|    11|   110|   110|   110|
|gene_7|    16|    16|    16|   160|   160|   160|
|gene_6|    15|    15|    15|   150|   150|   150|
|gene_4|    13|    13|    13|   130|   130|   130|
|gene_1|     0|     0|     0|     0|     0|     0|
|gene_5|    14|    14|    14|   140|   140|   140|
|gene_3|    12|    12|     0|   120|   120|   120|
|gene_8|    54|    54|    55|   540|   540|   541|
+------+------+------+------+------+------+------+



In [33]:
# masterCountSDF.rdd.getNumPartitions()
countsSparkDF.rdd.getNumPartitions()

200

In [34]:
# write to one file
# write to a single file
# masterCountSDF.coalesce(1).write.csv( masterCountsOutFile.as_posix(), mode='overwrite', sep='\t', header=True)
countsSparkDF.coalesce(1).write.csv( outFile.as_posix(), mode='overwrite', sep='\t', header=True)

print("saved: {}".format(outFile))

saved: testData/sparkDESeqTest2/output/txidGroupedByGeneidCounts


In [35]:
ugly hack to try and get data into file DESeq will not complain about
the partitions are not of equal size

does not fix bug

# copy the last row from one file to the other

$ tail -n 1 part-00000-3cb6ac58-373a-44ad-90ea-62712a2bc6b8-c000.csv >> \
    part-00001-3cb6ac58-373a-44ad-90ea-62712a2bc6b8-c000.csv 

# remove the last row
$ wc -l part-00000-b3fd4a3f-bcf3-4dcf-8eea-03f2742ff9b3-c000.csv 
6 part-00000-b3fd4a3f-bcf3-4dcf-8eea-03f2742ff9b3-c000.csv

$ head -n 5 part-00000-b3fd4a3f-bcf3-4dcf-8eea-03f2742ff9b3-c000.csv > \
    part-00000-b3fd4a3f-bcf3-4dcf-8eea-03f2742ff9b3-c000.csv


SyntaxError: invalid syntax (<ipython-input-35-3f5b518eb8f8>, line 1)