# Train logistic regression with Keras
Martin Kircher provided two training_data files: one is human readable and the other one is one-hot-encoded.
Now we need to check that the information is actually in the same order.

In [1]:
import pandas as pd
import numpy as np
import pickle
import sklearn
import csv
import sys
from keras.models import Model, Sequential
from keras.layers import Input, Dense
from multiprocessing.dummy import Pool as ThreadPool
from Crypto.Random.random import randint
import dask.dataframe as dd
from itertools import islice
from random import shuffle
from sklearn.model_selection import train_test_split
from sklearn.model_selection import ShuffleSplit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix

Using TensorFlow backend.


## 0. How many lines does the dataset have?
Let's create a shuffled index list and store it in our system, to be able to create batches of data to pass to the fit_generator

In [2]:
training_imputed = "/s/project/kipoi-cadd/data/raw/v1.3/training_data/training_data.imputed.csv"
shuffled_index_file = "/s/project/kipoi-cadd/data/raw/v1.3/training_data/shuffle_splits/shuffled_index.pickle"
training = "/s/project/kipoi-cadd/data/raw/v1.3/training_data/training_data.tsv"

In [None]:
%%time
size_training_set = !cat {training_imputed}  | wc -l # 35043061
size_training_set = int(size_training_set[0]) - 1 # we remove the header line

In [22]:
%%time
def onthefly(n):
    numbers=np.arange(n,dtype=np.uint32)
    for i in range(n):
        j = randint(i, n-1)
        numbers[i], numbers[j] = numbers[j], numbers[i]
        yield numbers[i]

gen = onthefly(size_training_set)
shuffled_index = []
for i in range(size_training_set):
    shuffled_index.append(next(gen))
shuffled_index[:20]

with open(training_imputed, 'wb') as f:
    pickle.dump(shuffled_index, f)

In [3]:
with open(shuffled_index_file, 'rb') as f:
    shuffled_index = pickle.load(f)

## 1. Select idex list for batch and extract with dask

In [4]:
def subset_df(df, list_index):
    print(df.shape)
    print(df.index.values[:5])
    subset = set(list_index) & set(df.index)
    return(df.loc[subset])


def get_batch_indexes(index_list, batch_size, file, output, num_batches=None, sep=','):
    num_indexes = len(index_list)
    amount_batches = (num_indexes // batch_size) + 1
    num_loops = min(num_batches, amount_batches) if num_batches else amount_batches
    batch_indexes = []
    
    print(amount_batches, num_loops)
    
    for i in range(num_loops):
        start = (i)*batch_size
        end = min(num_indexes, start + batch_size)
        extraction = {
            'file': file,
            'output': output+str(i+1)+".pickle",
            'sep': sep,
            'index_list': set(index_list[start:end])
        }
        batch_indexes.append(extraction)
    
    return(batch_indexes)
        

def generate_batch(file, index_list, batch_size, num_batches, sep=','):
    with open(file) as input_file:
        reader = csv.reader(input_file, delimiter=sep)
        header = next(reader)

        for batch in range(num_batches):
            start = (batch)*batch_size
            end = min(len(index_list), start + batch_size)
            batch_indexes = set(index_list[start:end])

            desired_rows = [row for row_number, row in enumerate(reader)
                            if row_number in batch_indexes]
            
            rows_df = pd.DataFrame(desired_rows, index=batch_indexes, columns=header)
            rows_df.y = [0 if r == -1 else r for r in rows_df.y]
            yield (rows_df.iloc[:,1:], rows_df.y)

            if end == len(index_list):
                break


def generate_one_batch(file, index_list, sep=','):
    with open(file) as input_file:
        reader = csv.reader(input_file, delimiter=sep)
        header = next(reader)

        desired_rows = [row for row_number, row in enumerate(reader)
                        if row_number in index_list]

        yield pd.DataFrame(desired_rows, index=index_list, columns=header)


def yield_one_line(file, index_list, batch_size, sep=','):
    with open(file) as input_file:
        reader = csv.reader(input_file, delimiter=sep)
        header = next(reader)
        
        for i in range(batch_size):
            ix = index_list[i]
            row = next(islice(reader, int(ix), None))
            # yield row
            row[0] = 0 if row[0] == -1 else 1
            row_df = pd.DataFrame([row], index=[ix], columns=header)
            yield (row_df.iloc[:,1:], row_df.y)
            reader = next(next(csv.reader(input_file, delimiter=sep)))


def generate_one_batch_noattributes(extraction):
    file = extraction.get('file')
    output = extraction.get('output')
    sep = extraction.get('sep')
    index_list = extraction.get('index_list')

    with open(file) as input_file:
        reader = csv.reader(input_file, delimiter=sep)
        header = next(reader)

        desired_rows = [row for row_number, row in enumerate(reader)
                        if row_number in index_list]

        df = pd.DataFrame(desired_rows, index=index_list, columns=header)
        with open(output, 'wb') as f:
            pickle.dump(df, f)


def generate_one_batch_singlethreaded(extraction):
    for ex in extraction:
        file = ex.get('file')
        output = ex.get('output')
        sep = ex.get('sep')
        index_list = ex.get('index_list')

        with open(file) as input_file:
            reader = csv.reader(input_file, delimiter=sep)
            header = next(reader)

            desired_rows = [row for row_number, row in enumerate(reader)
                            if row_number in index_list]

            df = pd.DataFrame(desired_rows, index=index_list, columns=header)
            with open(output, 'wb') as f:
                pickle.dump(df, f)


In [None]:
batches = 

In [82]:
"""
Threads = 1, num_batches = 2
CPU times: user 8min 14s, sys: 32.2 s, total: 8min 46s
Wall time: 8min 46s

Threads = 2, num_batches = 2
CPU times: user 9min 21s, sys: 46.3 s, total: 10min 7s
Wall time: 9min 41s

Threads = 5, num_batches = 2
CPU times: user 9min 21s, sys: 44.1 s, total: 10min 5s
Wall time: 9min 41s

Threads = 3, num_batches = 3
CPU times: user 16min 54s, sys: 1min 46s, total: 18min 41s
Wall time: 16min 38s

Threads = 1, num_batches = 3
CPU times: user 12min 21s, sys: 47.8 s, total: 13min 9s
Wall time: 13min 9s
"""

'\nThreads = 1, num_batches = 2\nCPU times: user 8min 14s, sys: 32.2 s, total: 8min 46s\nWall time: 8min 46s\n\nThreads = 2, num_batches = 2\nCPU times: user 9min 21s, sys: 46.3 s, total: 10min 7s\nWall time: 9min 41s\n\nThreads = 5, num_batches = 2\nCPU times: user 9min 21s, sys: 44.1 s, total: 10min 5s\nWall time: 9min 41s\n'

In [87]:
%%time
# Test paralellism
pool = ThreadPool(3)
results = pool.map(generate_one_batch_noattributes, batch_indexes) # 9min 35s
pool.close() 
pool.join()

CPU times: user 16min 54s, sys: 1min 46s, total: 18min 41s
Wall time: 16min 38s


In [88]:
%%time
generate_one_batch_singlethreaded(batch_indexes)

CPU times: user 12min 21s, sys: 47.8 s, total: 13min 9s
Wall time: 13min 9s


## 1. Use single threaded generator and feed it to keras.models.fit_generator()
Previous tests have "demonstrated" that multithreading only makes things slower. Now, we use a generator that yields a single line, as required by the `fit_generator` method from `keras.models`. Right now, I'm taking inspiration from this tutorial in Medium: [Simple Logistic Regression using Keras](https://medium.com/@the1ju/simple-logistic-regression-using-keras-249e0cc9a970).

In [11]:
%%time
# batch_generator = yield_one_line(training, shuffled_index, sep='\t')
batch_generator = generate_batch(training_imputed, shuffled_index, 32, 1, sep=',')

CPU times: user 8 µs, sys: 0 ns, total: 8 µs
Wall time: 15.3 µs


In [7]:
%%time
sample = next(batch_generator) # CPU times: user 28min 30s, sys: 50.1 s, total: 29min 21s. Wall time: 29min 19s
print(sample[0].shape)

(32, 1063)
CPU times: user 28min 30s, sys: 50.1 s, total: 29min 21s
Wall time: 29min 19s


In [12]:
%%time
# Build the model
output_dim = 1 # One binary class
input_dim = 1063 # number of features of the input (102 for training, and 1063 for training_imputed)
model = Sequential() 
model.add(Dense(output_dim, input_dim=input_dim, activation='softmax'))
batch_size = 32
nb_epoch = 10

# Compile the model
model.compile(optimizer='sgd', loss='binary_crossentropy', metrics=['accuracy']) 
# history = model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch,verbose=1, validation_data=(X_test, Y_test)) 
history = model.fit_generator(batch_generator, steps_per_epoch=1, epochs=nb_epoch, workers=4, use_multiprocessing=True, shuffle=False)
score = model.evaluate(X_test, Y_test, verbose=0) 
print('Test score:', score[0]) 
print('Test accuracy:', score[1])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10


StopIteration: 

## 1. Read in csv by batch

In [80]:
training = "/s/project/kipoi-cadd/data/raw/v1.3/training_data/training_data.tsv"
training_imputed = "/s/project/kipoi-cadd/data/raw/v1.3/training_data/training_data.imputed.csv"

def get_first_n_batches(file, num_batches, delimiter=','):
    with open(file, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter)
        header = next(reader) # skip header

        batch_size = 10000
        batch = []
        count = 0
        count_batches = 0

        for row in reader:
            if count >= batch_size:
                yield pd.DataFrame(batch, columns=header)
                count_batches += 1
                batch = []
                count = 0

            batch.append(row)
            count += 1
            
            if count_batches >= num_batches:
                break


def get_batch_from_line_isslice(file, start_line, delimiter=','):
    with open(file, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter)
        header = next(reader) # skip header

        batch_size = 1000000
        batch = []
        count = 0
        line = start_line
        index = []
        

        for row in islice(reader, start_line, None):
            if count >= batch_size:
                yield pd.DataFrame(batch, columns=header, index=index)
                break

            batch.append(row)
            index.append(line)
            count += 1
            line += 1

def get_batch_from_line_skip(file, start_line, delimiter=','):
    with open(file, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter)
        header = next(reader) # skip header

        batch_size = 1000000
        batch = []
        count = 0
        line = 0

        for _ in range(start_line): # skip the first start_line rows
            next(reader)
        
        for row in reader:
            if count >= batch_size:
                yield pd.DataFrame(batch, columns=header)
                break

            batch.append(row)
            count += 1

In [66]:
def find_positive_class(file, num_pos, delimiter=','):
    with open(file, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=delimiter)
        header = next(reader) # skip header
        positive_examples = []
        count_pos = 0
        index = []
        line_num = -1
        
        for row in reader:
            line_num += 1
            if count_pos >= num_pos:
                yield pd.DataFrame(positive_examples, columns=header, index=index)
                # yield row[0]
                break
            if row[0] != '0':
                positive_examples.append(row)
                index.append(line_num)
                count_pos += 1
            else:
                pass

In [67]:
%%time
pos_examples = next(find_positive_class(training, 10, delimiter='\t'))

CPU times: user 2min 39s, sys: 7.26 s, total: 2min 46s
Wall time: 2min 46s


In [68]:
pos_examples

Unnamed: 0,y,Chrom,Pos,Ref,Alt,Type,Length,isTv,Consequence,GC,...,SIFTcat,SIFTval,mirSVR-Score.na,targetScan.na,cDNApos.na,CDSpos.na,protPos.na,Grantham.na,PolyPhenVal.na,SIFTval.na
17521530,1,1,887521,T,G,SNV,0,1,CS,0.58,...,UD,0,1,1,1,1,1,1,1,1
17521531,1,1,887553,G,T,SNV,0,1,I,0.55,...,UD,0,1,1,1,1,1,1,1,1
17521532,1,1,887952,C,T,SNV,0,0,NS,0.59,...,deleterious,0,1,1,0,0,0,0,0,0
17521533,1,1,887985,G,A,SNV,0,0,S,0.6,...,UD,0,1,1,1,1,1,1,1,1
17521534,1,1,887987,A,G,SNV,0,0,S,0.6,...,UD,0,1,1,1,1,1,1,1,1
17521535,1,1,888184,T,C,SNV,0,0,I,0.68,...,UD,0,1,1,1,1,1,1,1,1
17521536,1,1,888213,C,T,SNV,0,0,I,0.67,...,UD,0,1,1,1,1,1,1,1,1
17521537,1,1,888503,C,T,SNV,0,0,I,0.62,...,UD,0,1,1,1,1,1,1,1,1
17521538,1,1,888504,G,A,SNV,0,0,I,0.62,...,UD,0,1,1,1,1,1,1,1,1
17521539,1,1,888537,C,T,SNV,0,0,I,0.62,...,UD,0,1,1,1,1,1,1,1,1


In [42]:
%%time
myframe = next(get_first_n_batches(training, 1, delimiter='\t'))
# myframe.head()

CPU times: user 249 ms, sys: 46.8 ms, total: 295 ms
Wall time: 295 ms


In [83]:
%%time
# myotherframe = next(get_batch_from_line_skip(training, 3000000, delimiter='\t')) # 29.3 s
# myotherframe = next(get_batch_from_line_isslice(training, 17000000, delimiter='\t')) # 4min 5s
# myotherframe = next(get_batch_from_line_isslice(training_imputed, 17000000)) # 23min 59s
# myotherframe = next(get_batch_from_line(3000000))

CPU times: user 21min 45s, sys: 2min 13s, total: 23min 59s
Wall time: 24min 1s


In [85]:
# np.unique(myframe.y)
#np.unique(myotherframe.y)
#myotherframe.shape
myotherframe.tail()

Unnamed: 0,y,RefxA,RefxC,RefxG,RefxT,RefxN,AltxA,AltxC,AltxG,AltxT,...,YxM,YxN,YxP,YxQ,YxR,YxS,YxT,YxV,YxW,YxY
17999995,1,0,0,1,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,0
17999996,1,0,0,1,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,0
17999997,1,0,1,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
17999998,1,0,0,1,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
17999999,1,0,0,1,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,0


In [105]:
y = training_df.y
X = training_imp_df.drop(columns="y")

# Inserting some artificial positive examples
np.random.seed(10)
msk = np.random.rand(len(y)) < 0.2
y[msk] = 1

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


In [106]:
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8, test_size=0.2)

In [107]:
# Drop constant columns
constant_cols = set()
dataset = X
for col in dataset:
    # print(dataset.shape, type(dataset))
    # print(len(np.unique(dataset[col])))
    if len(np.unique(dataset[col])) < 20 and col != 'y':
        constant_cols.add(col)
print("Droped", len(constant_cols), "constant cols.")

for dataset in [X_train, X_test, y_train, y_test]:
    dataset.drop(columns=list(constant_cols), inplace=True, errors='ignore')

X.drop(columns=list(constant_cols), inplace=True, errors='ignore')

Droped 824 constant cols.


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  errors=errors)


## 2. Split dataset into train and test

In [108]:
lr = LogisticRegression(penalty='l2', solver='lbfgs', n_jobs=10)
lr.fit(X_train, y_train)

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='warn', n_jobs=10,
          penalty='l2', random_state=None, solver='lbfgs', tol=0.0001,
          verbose=0, warm_start=False)

In [109]:
y_train_pred = lr.predict(X_train)

In [110]:
print("Read like \n tn, fp, \n fn, tp \n")
tn, fp, fn, tp = confusion_matrix(y_train, y_train_pred).ravel()
confusion_matrix(y_train, y_train_pred)

Read like 
 tn, fp, 
 fn, tp 



array([[6304,    0],
       [1695,    1]])