In [0]:
import pandas as pd
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from glow.levels.linear_model import RidgeReducer, RidgeRegression
import pyspark.sql.functions as f

In [0]:
spark = SparkSession.builder.appName('levels').getOrCreate()
spark.conf.set('spark.sql.execution.arrow.enabled', 'true')

In [0]:
test_data_root = '/Users/leland.barnard/glow/glow-wgr/test-data/levels/ridge-regression' #path to glow levels test data

We need three objects to get started:
* A Spark DataFrame representing the block genotype matrix
* A Spark DataFrame containing a mapping of sample block ID to corresponding list of sample ids
* A Pandas DataFrame containing phenotypes

In [0]:
blockdf_lvl0 = spark.read.parquet(f'{test_data_root}/blockedGT.snappy.parquet') #block genotype matrix
indexdf = spark.read.parquet(f'{test_data_root}/groupedIDs.snappy.parquet') #sample block ID to sample list mapping
labeldf =  pd.read_csv(f'{test_data_root}/pts.csv').set_index('sample_id') #phenotype data

#### The block genotype matrix as a DataFrame
If we imagine the block genotype matrix conceptually, we think of an *NxM* matrix *X* where each row *n* represents an individual sample, each column *m* represents a variant, and each cell *(n, m)* contains a genotype value for sample *n* at variant *m*.  We then imagine laying a coarse grid on top of this matrix such that matrix cells within the same coarse grid cell are all assigned to the same block *x*.  Each block *x* is indexed by a sample block ID (corresponding to a list of rows belonging to the block) and a header block ID (corresponding to a list of columns belonging to the block).  The sample block IDs are generally just integers 0 through the number of sample blocks.  The header block IDs are strings of the form 'chr_C_block_B', which refers to the Bth block on chromosome C.  The Spark DataFrame representing this block matrix can be thought of as the transpose of each block *xT* all stacked one atop another.  Each row represents the values from a particular column from *X*, for the samples corresponding to a particular sample block.  The fields in the DataFrame are:
* header:  Corresponds to a column name in the conceptual matrix *X*.
* size:  If the matrix is sparse (so that genotype values of 0 are implicit), the values for this header are represented as a sparse vector, and this column contains the size of that sparse vector.  Corresponds to the number of individuals in the sample block for the row.
* indices:  (Optional, present of the matrix is sparse) Indices of the non-zero entries in the sparse vector for this header in this sample block.
* values:  Genotype values for this header in this sample block.  If the matrix is sparse, contains only non-zero values.
* header_block:  An ID assigned to the block *x* containing this header.
* sample_block:  An ID assigned to the block *x* containing the group of samples represented on this row.
* position:  An integer assigned to this header that specifies the correct sort order for the headers in this block.
* mu:  The mean of the genotype calls for this header
* sig:  The standard deviation of the genotype calls for this header

In [0]:
blockdf_lvl0.show(4)

#### The sample block mapping
This is a comparitively simple key-value store where each key is a sample block ID and each value is a list of sample IDs contained in that sample block.  As a Spark DataFrame, this is represented as a two column DataFrame with the following fields:
* sample_block:  ID for a sample block
* sample_ids: Array of sample IDs for the samples in this sample block.  The order of these IDs must match the order of the values arrays in the block genotype DataFrame.

In [0]:
indexdf.show(4)

#### The phenotype data
The phenotype data is represented as a Pandas DataFrame indexed by the sample ID.  Each column represents a single phenotype, and it is assumed that there are no missing phenotype values, and that the phenotypes mean centered at 0.

In [0]:
labeldf.head()

#### Reducer model fitting
The first step in the fitting procedure is to apply a dimensionality reduction to the block matrix *X* using the `RidgeReducer`.  This is accomplished by fitting multiple ridge models within each block *x* and producing a new block matrix where each column represents the prediction of one ridge model applied within one block.  This approach to model building is generally referred to as **stacking**.  We will call the block genotype matrix we started with the **level 0** matrix in the stack *X0*, and the output of the ridge reduction step the **level 1** matrix *X1*.  The `RidgeReducer` class is used for this step, which is initiallized with a list of ridge regularization values (referred to here as alpha).  Since ridge models are indexed by these alpha values, the `RidgeReducer` will generate one ridge model per value of alpha provided, which in turn will produce one column per block in *X0*, so the final dimensions of matrix *X1* will be *Nx(LxK)*, where *L* is the number of header blocks in *X0* and *K* is the number of alpha values provided to the `RidgeReducer`.  In practice, we can estimate a span of alpha values in a reasonable order of magnitude based on guesses at the heritability of the phenotype we are fitting, but here we will just pick some values.

In [0]:
alphas_lvl0 = np.logspace(2, 5, 10)
stack_lvl0 = RidgeReducer(alphas_lvl0)

When the `RidgeReducer` is initialized, it will assign names to the provided alphas and store them in a dict accessible as `RidgeReducer.alphas`.  This is mostly just to give an easily readable and sortable name to the models produced for each ridge value.

In [0]:
stack_lvl0.alphas

The `RidgeReducer.fit(blockdf, labeldf, indexdf)` method generates a Spark DataFrame representing the model that we can use to reduce *X0* to *X1*.

In [0]:
modeldf_lvl0 = stack_lvl0.fit(blockdf_lvl0, labeldf, indexdf)

In explicit terms, the reduction of a block *x0* from *X0* to the corresponding block *x1* from *X1* is accomplished by the matrix multiplication *x0 * B = x1*, where *B* is a coefficient matrix of size *mxK*, where *m* is the number of columns in block *x0* and *K* is the number of alpha values used in the reduction.  As an added wrinkle, if the ridge reduction is being performed against multiple phenotypes at once, each phenotype will have its own *B*, and for convenience we panel these next to each other in the output into a single matrix, so *B* in that case has dimensions *mx(K*P)* where *P* is the number of phenotypes.  Each matrix *B* is specific to a particular block in *X0*, so the Spark DataFrame produced by the `RidgeReducer` can be thought of all of as the matrices *B* from all of the blocks stacked one atop another.  The fields in the model DataFrame are:
* header_block:  An ID assigned to the block *x0* corresponding to the coefficients in this row.
* sample_block:  An ID assigned to the block *x0* corresponding to the coefficients in this row.
* header:  The name of a column from the conceptual matrix *X0* that correspond with a particular row from the coefficient matrix *B*.
* alphas:  List of alpha names corresponding to the columns of *B*.
* labels:  List of label (i.e., phenotypes) corresponding to the columns of *B*. 
* coefficients:  List of the actual values from a row in *B*

In [0]:
modeldf_lvl0.show(4)

#### Reducer transformation
After fitting, the `RidgeReducer.transform(blockdf, labeldf, modeldf)` method can be used to generate `X1` from `X0`.

In [0]:
blockdf_lvl1 = stack_lvl0.transform(blockdf_lvl0, labeldf, modeldf_lvl0)

The output of the transformation is closely analogous to the block matrix DataFrame we started with.  The main difference is that, rather than representing a single block matrix, it really represents multiple block matrices, with one such matrix per label (phenotype).  Comparing the schema of this block matrix DataFrame (`blockdf_lvl1`) with the DataFrame we started with (`blockdf_lvl0`), the new columns are:
* alpha:  This is the name of the alpha value used in fitting the model that produced the values in this row.
* label:  This is the label corresponding to the values in this row.  Since the genotype block matrix *X0* is phenotype-agnostic, the rows in `blockdf_lvl0` were not restricted to any label/phenotype, but the level 1 block matrix *X1* represents ridge model predictions for the labels the reducer was fit with, so each row is associated with a specific label.

In [0]:
blockdf_lvl1.show(4)

The headers in the *X1* block matrix are derived from a combination of the source block in *X0*, the alpha value used in fitting the ridge model, and the label they were fit with.  These headers are assigned to header blocks that correspond to the chromosome of the source block in *X0*.

In [0]:
blockdf_lvl1.select('header', 'header_block').show(4, False)

#### Regression fitting
The block matrix *X1* can be used to fit a final predictive model that can generate phenotype predictions *y_hat* using the `RidgeRegression` class.  As with the `RidgeReducer` class, this class is initialized with a list of alpha values.

In [0]:
alphas_lvl1 = np.logspace(1, 4, 10)
estimator_lvl1 = RidgeRegression(alphas_lvl1)

In [0]:
modeldf_lvl1_est, cvdf_lvl1 = estimator_lvl1.fit(blockdf_lvl1, labeldf, indexdf)

The `RidgeRegression.fit(blockdf, labeldf, indexdf)` works in much the same way as the `RidgeReducer.fit(blockdf, labeldf, indexdf)` method, except that it returns two DataFrames:

A model DataFrame analogous to the model DataFrame provided by the `RidgeReducer`.  An important difference is that the header block ID for all rows will be 'all', indicating that all headers from all blocks have been used in a single fit, rather than fitting within blocks.

In [0]:
modeldf_lvl1_est.show(4)

A cross validation (cv) report DataFrame, which reports the results of the hyperparameter (i.e., alpha) value optimization routine.
* label:  This is the label corresponding to the cross cv results on the row.
* alpha:  The name of the optimal alpha value
* r2_mean:  The mean out of fold r2 score for the optimal alpha value

In [0]:
cvdf_lvl1.show(4)

#### Producing phenotype predictions *y_hat*
After fitting the `RidgeRegression` model, the model DataFrame and cv DataFrame are used to apply the model to the block matrix DataFrame to produce predictions (*y_hat*) for each label in each sample block using the `RidgeRegression.transform(blockdf, labeldf, modeldf, cvdf)` method

In [0]:
y_hat_lvl1 = estimator_lvl1.transform(blockdf_lvl1, labeldf, modeldf_lvl1_est, cvdf_lvl1)

The resulting *y_hat* DataFrame has the following fields:
* sample_block:  The sample block ID for the samples corresponding to the *y_hat* values on this row.
* label:  The label corresponding to the *y_hat* values on this row
* alpha:  The name of the alpha value used to fit the model that produced the *y_hat* values on this row.
* values:  The array of *y_hat* values for the samples in the sample block for this row.

In [0]:
y_hat_lvl1.show(4)

#### Fitting a second round of ridge reduction instead of ridge regression
After fitting the first ridge reduction step and producing *X1* from *X0*, we can go directly into fitting the final ridge regression model, as we have just seen.  Alternatively, we can fit a second round of ridge reduction to squeeze *X1* into an even smaller feature matrix, which we will call the **level 2** matrix *X2*.  This has some advantages when it comes to generating the leave-one-chromosome-out versions of the *y_hat*s and does not come at much additional cost.  The procedure for fitting the second round of ridge reduction is identical to the first (we will reuse the same alphas we chose for the ridge regression fit above):

In [0]:
stack_lvl1 = RidgeReducer(alphas_lvl1)

In [0]:
modeldf_lvl1 = stack_lvl1.fit(blockdf_lvl1, labeldf, indexdf)
blockdf_lvl2 = stack_lvl1.transform(blockdf_lvl1, labeldf, modeldf_lvl1)

The **level 2** block matrix DataFrame produced here has an identical schema to the **level 1** block matrix.  A key difference is that the header block ID for all headers is now "all" for all headers, indicating that there are now no more blocks to collapse.

In [0]:
blockdf_lvl2.show(4)

The headers for each column now follow the name convention 'all_block_B_alpha_A_label_L', which refer to the ridge model prediction using alpha A and for label L fit using the features from header block B from block matrix *X1*.  Since the blocks in *X1* refer to chromosomes, the block number B here can be interpreted as a chromosome.  The 'all' token reflects the fact that we are not assigning the columns in *X2* to any new blocks (i.e, *X2* only has sample blocks, but there is only one header block which encompasses the entire matrix).

In [0]:
blockdf_lvl2.select('header').show(4, False)

We can now fit a ridge regression model as we did above, except that we will use the matrix *X2* instead of *X1*

In [0]:
alphas_lvl2 = np.logspace(0, 3, 10)
estimator_lvl2 = RidgeRegression(alphas_lvl2)

In [0]:
modeldf_lvl2_est, cvdf_lvl2 = estimator_lvl2.fit(blockdf_lvl2, labeldf, indexdf)

In [0]:
modeldf_lvl2_est.show(4)

In [0]:
cvdf_lvl2.show(4)

In [0]:
y_hat_lvl2 = estimator_lvl2.transform(blockdf_lvl2, labeldf, modeldf_lvl2_est, cvdf_lvl2)

In [0]:
y_hat_lvl2.show(4)

For producing the LOCO versions of the *y_hat* vectors, it is only necessary to filter out rows from `blockdf_lvl2` corresponding to the chromosome we wish to drop before applying the transformation.  For example, if we wanted to produce *y_hat* with chromosome 1 left out (recall that the chromosomes constitute the source blocks for the headers in `blockdf_lvl2`, so headers from chromosome 1 will have headers like %block_1%):

In [0]:
y_hat_lvl2_loco1 = estimator_lvl2.transform(blockdf_lvl2.filter(f'header NOT LIKE "%block_1%"'), labeldf, modeldf_lvl2_est, cvdf_lvl2)

In [0]:
y_hat_lvl2_loco1.show(4)