# Demonstrate and profile insertion

**Set environment**

In [1]:
#######################################################
### Set environment
###++++++++++++++++++++++++++++++++++++++++++++++++++++

### import common packages
import numpy  as np
import itertools as it
import sys, os, gzip
from   functools import reduce

### update print: rebind the name `print` to a partial of the built-in
### with flush=True pre-filled, so every later print call flushes its stream
from functools import partial
print = partial(print, flush=True)

### set working directories
### root results directory; the database and input paths are built from it
FD_RES = "/gpfs/fs1/data/reddylab/Kuei/out/proj_combeffect"

### import specific packages
import sqlite3
# Teach sqlite3 to bind numpy integer scalars as plain Python ints.
# Background:
# https://stackoverflow.com/questions/49456158/integer-in-python-pandas-becomes-blob-binary-in-sqlite
sqlite3.register_adapter(np.int64, int)
sqlite3.register_adapter(np.int32, int)


### import packages for benchmark performance
import cProfile, pstats, time, timeit
import matplotlib.pyplot as plt

**Global variables of database and file**

In [2]:
#######################################################
### parse arguments
###++++++++++++++++++++++++++++++++++++++++++++++++++++
### hard-coded values; the trailing comments name the argument each
### one stands in for
CHROM   = "chr17"                          #args.chrom
FD_OUT  = os.path.join(FD_RES, "database") #args.fout
FD_INP  = os.path.join(FD_RES, "nuc")      #args.finp
PREFIX  = "test_insert"                    #args.prefix
VERBOSE = True                             #args.verbose

#######################################################
### Global variables and I/O
###++++++++++++++++++++++++++++++++++++++++++++++++++++

### file path of fragment database: <FD_OUT>/<PREFIX>_<CHROM>.db
fdiry  = FD_OUT
fname  = f"{PREFIX}_{CHROM}.db"
FP_DTB = os.path.join(fdiry, fname)

### file path of fragment table: <FD_INP>/<sample>/<CHROM>.bed.gz
### the file name is derived from CHROM so that the input file always
### matches the chromosome the database is named after
sample  = "Input1_20x"
fdiry   = os.path.join(FD_INP, sample)
fname   = f"{CHROM}.bed.gz"
FP_FRG  = os.path.join(fdiry, fname)

### show info
if VERBOSE:
    print("Global variables:")
    print("Chromsome:  ", CHROM)
    print("Database:   ", FP_DTB)
    print("Input file: ", FP_FRG)
    print()

Global variables:
Chromsome:   chr17
Database:    /gpfs/fs1/data/reddylab/Kuei/out/proj_combeffect/database/test_insert_chr17.db
Input file:  /gpfs/fs1/data/reddylab/Kuei/out/proj_combeffect/nuc/Input1_20x/chr17.bed.gz



## Set up helper functions for reading files

In [3]:
##################################################
### Helper functions
### ++++++++++++++++++++++++++++++++++++++++++++++

### helper function to get a chunk of file
def get_chunks(gen, rows=10):
    """Yield successive lists holding at most `rows` items taken from `gen`.

    The last list may be shorter than `rows`; an empty input yields nothing.
    """
    source = iter(gen)
    ### two-argument iter(): keep slicing until an empty list comes back
    yield from iter(lambda: list(it.islice(source, rows)), [])

### helper function to process each row
def prep_line(line):
    """Turn one raw tab-separated line (bytes) into a list of string fields.

    The returned list starts with a key made of the first three fields
    joined by "_", followed by those three fields and then the fields from
    index 4 up to, but not including, the last one.
    """
    ### decode and split into fields
    fields = line.decode('ASCII').strip().split('\t')

    ### the first three fields form the key and are also kept as values;
    ### field 3 and the final field are dropped
    head = fields[:3]
    return ["_".join(head), *head, *fields[4:-1]]

def gen_line(file, n_chunksize=None, n_lines=None):
    """Return a lazy iterator over the parsed lines of `file`.

    The first line of `file` is read immediately and discarded as a header.
    Every following line is parsed with prep_line on demand. When `n_lines`
    is given, at most that many parsed lines are produced. When
    `n_chunksize` is given, the parsed lines are grouped into lists of that
    size via get_chunks.
    """
    ### drop the header line; the rest of the file holds the records
    file.readline()

    ### parse each remaining line lazily
    records = map(prep_line, file)

    ### optionally cap the number of parsed lines
    if n_lines is not None:
        records = it.islice(records, n_lines)

    ### optionally group the parsed lines into chunks
    return records if n_chunksize is None else get_chunks(records, n_chunksize)

## Set up SQL queries and functions for creating and populating the Fragment table

In [4]:
##################################################
### Set SQL query
### ++++++++++++++++++++++++++++++++++++++++++++++

### drop the Fragment table if it exists, so a run can start from scratch
query_reset_table = "DROP TABLE IF EXISTS Fragment"

### schema variant 1: the `fragment` column is declared as the primary key
query_table_frag = ("""
    CREATE TABLE IF NOT EXISTS Fragment(
        fragment TEXT PRIMARY KEY, 
        chrom    TEXT,
        start    INTEGER,
        end      INTEGER,
        pct_at   REAL,
        pct_gc   REAL,
        num_A    INTEGER,
        num_C    INTEGER,
        num_G    INTEGER,
        num_T    INTEGER,
        num_N    INTEGER,
        num_oth  INTEGER
    );""")

### schema variant 2: same columns, but no primary key is declared
query_table_auto = ("""
    CREATE TABLE IF NOT EXISTS Fragment(
        fragment TEXT, 
        chrom    TEXT,
        start    INTEGER,
        end      INTEGER,
        pct_at   REAL,
        pct_gc   REAL,
        num_A    INTEGER,
        num_C    INTEGER,
        num_G    INTEGER,
        num_T    INTEGER,
        num_N    INTEGER,
        num_oth  INTEGER
    );""")

### parameterized insert with one `?` placeholder per column (12 in total)
### OR IGNORE: a row that violates a constraint (e.g. a duplicate primary
### key in schema variant 1) is skipped instead of raising an error
query_insert = ("""
    INSERT OR IGNORE INTO Fragment
        (fragment, chrom, start, end, pct_at, pct_gc,
         num_A, num_C, num_G, num_T, num_N, num_oth) 
    VALUES 
        (?,?,?,?,?,?,?,?,?,?,?,?)
    """)

In [5]:
def fun_insert_line_by_line(cursor, query, lines):
    """Execute `query` once for every row in `lines`; return the cursor.

    This is the routine measured when profiling line-by-line insertion.
    Each row supplies the parameters for one `cursor.execute` call.
    """
    for row in lines:
        cursor = cursor.execute(query, row)
    return cursor

def fun_insert_line_by_chunk(cursor, query, chunks):
    """Execute `query` once per batch in `chunks`; return the cursor.

    This is the routine measured when profiling batch insertion. Each batch
    is a sequence of rows handed to a single `cursor.executemany` call.
    """
    for batch in chunks:
        cursor = cursor.executemany(query, batch)
    return cursor

In [6]:
##################################################
### Set database function
### ++++++++++++++++++++++++++++++++++++++++++++++

def refresh(query_table, fpath_database):
    """
    Reset the Fragment table of a database.

    Runs the module-level `query_reset_table` statement and then creates
    the table again with `query_table`.

    Parameters:
        query_table:    SQL statement that creates the table
        fpath_database: path of the SQLite database file
    """
    conn = sqlite3.connect(fpath_database)
    try:
        ### `with conn` commits on success and rolls back on error, but it
        ### does not close the connection; that is done explicitly below
        with conn:
            cursor = conn.cursor()

            ### reset table
            cursor.execute(query_reset_table)

            ### create table
            cursor.execute(query_table)
    finally:
        conn.close()
        
def insert_line_by_line(n_lines, query_table, fpath_database, fpath_table):
    """
    Rebuild the Fragment table and fill it one row per statement.

    Parameters:
        n_lines:        maximum number of data lines to insert
                        (None is passed through to gen_line, which then
                        applies no limit)
        query_table:    SQL statement that creates the table
        fpath_database: path of the SQLite database file
        fpath_table:    path of the gzip-compressed input table
    """
    ### init a new table
    refresh(query_table, fpath_database)

    conn = sqlite3.connect(fpath_database)
    try:
        ### `with conn` commits the inserts on success (rolls back on
        ### error) but does not close the connection; see `finally`
        with conn, gzip.open(fpath_table, "rb") as file:
            cursor = conn.cursor()

            ### generate lines
            lines = gen_line(file, n_lines=n_lines)

            ### insert line by line
            fun_insert_line_by_line(cursor, query_insert, lines)
    finally:
        conn.close()
                
def insert_line_by_chunk(n_lines, n_chunksize, query_table, fpath_database, fpath_table):
    """
    Rebuild the Fragment table and fill it in batches of rows.

    Parameters:
        n_lines:        maximum number of data lines to insert
                        (None is passed through to gen_line, which then
                        applies no limit)
        n_chunksize:    number of lines handed to each batch insert
        query_table:    SQL statement that creates the table
        fpath_database: path of the SQLite database file
        fpath_table:    path of the gzip-compressed input table
    """
    ### init a new table
    refresh(query_table, fpath_database)

    conn = sqlite3.connect(fpath_database)
    try:
        ### `with conn` commits the inserts on success (rolls back on
        ### error) but does not close the connection; see `finally`
        with conn, gzip.open(fpath_table, "rb") as file:
            cursor = conn.cursor()

            ### generate chunks
            chunks = gen_line(file, n_chunksize=n_chunksize, n_lines=n_lines)

            ### insert chunk by chunk
            fun_insert_line_by_chunk(cursor, query_insert, chunks)
    finally:
        conn.close()

In [7]:
def check_table_size(n_lines, fpath_database=FP_DTB):
    """
    Compare the number of rows in the Fragment table with `n_lines`.

    Prints "Check table size:  passed!" when the counts are equal and
    "Check table size:  failed." otherwise. Nothing is returned.

    Parameters:
        n_lines:        expected number of rows
        fpath_database: path of the SQLite database file
    """
    conn = sqlite3.connect(fpath_database)
    try:
        cursor = conn.cursor()
        cursor.execute("select count(*) from Fragment")

        ### count(*) yields exactly one row holding one value
        counts = cursor.fetchone()[0]
    finally:
        ### sqlite3's own context manager never closes the connection,
        ### so close it explicitly
        conn.close()

    if counts == n_lines:
        print("Check table size:  passed!")
    else:
        print("Check table size:  failed.")
    

def check_table_lines(fpath_database=FP_DTB, fpath_table=FP_FRG):
    """
    Compare the rows of the Fragment table with the lines of the input file.

    Rows and lines are paired in order and compared element by element
    after converting both sides with str(). The comparison stops at the
    end of the shorter source, so when the file has more lines than the
    table has rows only the leading lines are checked.

    Prints "Check table lines: failed." at the first mismatch and
    "Check table lines: passed!" when no mismatch is found. Nothing is
    returned.

    Parameters:
        fpath_database: path of the SQLite database file
        fpath_table:    path of the gzip-compressed input table
    """
    conn = sqlite3.connect(fpath_database)
    try:
        ### rows as stored in the database
        ### NOTE(review): the query has no ORDER BY, so this relies on the
        ### rows coming back in insertion order -- confirm this is intended
        cursor     = conn.cursor()
        lines_base = cursor.execute("select * from Fragment")

        ### lines as parsed from the file; the file is closed on exit,
        ### including the early return below
        with gzip.open(fpath_table, "rb") as file:
            lines_file = gen_line(file)

            ### compare line by line
            for line_base, line_file in zip(lines_base, lines_file):
                ### values from the file are strings, hence the str()
                pairs = zip(line_base, line_file)
                if not all(str(x) == str(y) for x, y in pairs):
                    print("Check table lines: failed.")
                    return
    finally:
        ### sqlite3's own context manager never closes the connection,
        ### so close it explicitly
        conn.close()

    ### all compared lines are equal
    print("Check table lines: passed!")

## Test function

In [8]:
### small settings for a quick functional test of both insertion helpers
n_lines        = 10      # number of data lines to insert
n_chunksize    = 3       # lines per batch for chunked insertion
fpath_database = FP_DTB  # database file
fpath_table    = FP_FRG  # input table file

**Fragment as Primary Key; insert line by line**

In [9]:
### rebuild the table with `fragment` as primary key; insert row by row
insert_line_by_line(n_lines, query_table_frag, fpath_database, fpath_table)

### verify the row count, then compare stored rows with the input file
check_table_size(n_lines)
check_table_lines()

Check table size:  passed!
Check table lines: passed!


**Fragment as Primary Key; insert line by chunk**

In [10]:
### rebuild the table with `fragment` as primary key; insert in batches
insert_line_by_chunk(n_lines, n_chunksize, query_table_frag, fpath_database, fpath_table)

### verify the row count, then compare stored rows with the input file
check_table_size(n_lines)
check_table_lines()

Check table size:  passed!
Check table lines: passed!


**Primary key is autogenerated; insert line by line**

In [11]:
### rebuild the table without a declared primary key; insert row by row
insert_line_by_line(n_lines, query_table_auto, fpath_database, fpath_table)

### verify the row count, then compare stored rows with the input file
check_table_size(n_lines)
check_table_lines()

Check table size:  passed!
Check table lines: passed!


**Primary key is autogenerated; insert line by chunk**

In [12]:
### rebuild the table without a declared primary key; insert in batches
insert_line_by_chunk(n_lines, n_chunksize, query_table_auto, fpath_database, fpath_table)

### verify the row count, then compare stored rows with the input file
check_table_size(n_lines)
check_table_lines()

Check table size:  passed!
Check table lines: passed!


## Profile function

In [13]:
### settings shared by the two profiling cells
n_lines        = 100000            # number of data lines to insert
n_chunksize    = 1000              # lines per batch for chunked insertion
fpath_database = FP_DTB            # database file
fpath_table    = FP_FRG            # input table file
query_table    = query_table_frag  # schema with `fragment` as primary key

**Profile insertion line by line**

In [14]:
### init a new table
refresh(query_table, fpath_database)

conn = sqlite3.connect(fpath_database)
try:
    ### `with conn` commits the inserts on success (rolls back on error)
    ### but does not close the connection; see `finally` at the bottom
    with conn, gzip.open(fpath_table, "rb") as file:
        ### initiation
        cursor = conn.cursor()
        query  = query_insert

        ### generate lines; gen_line consumes the header right here,
        ### so that read is not part of the profile
        lines = gen_line(file, n_lines=n_lines)

        ### profile only the insertion; the profiler is stopped even
        ### when the insertion raises
        pr = cProfile.Profile()
        pr.enable()
        try:
            cursor = fun_insert_line_by_line(cursor, query, lines)
        finally:
            pr.disable()

        ### report
        ps = pstats.Stats(pr).print_stats()
finally:
    conn.close()

         1011912 function calls in 1.415 seconds

   Random listing order was used

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      794    0.001    0.000    0.001    0.000 {method 'read' of '_io.BufferedReader' objects}
   100000    0.084    0.000    0.160    0.000 {method 'readline' of '_io.BufferedReader' objects}
     4764    0.001    0.000    0.001    0.000 {built-in method builtins.len}
   100000    0.065    0.000    0.065    0.000 {method 'split' of 'str' objects}
   100000    0.022    0.000    0.022    0.000 {method 'join' of 'str' objects}
   100000    0.021    0.000    0.021    0.000 {method 'strip' of 'str' objects}
      794    0.001    0.000    0.001    0.000 {method 'cast' of 'memoryview' objects}
   100000    0.040    0.000    0.040    0.000 {method 'decode' of 'bytes' objects}
   100000    0.180    0.000    0.329    0.000 /tmp/ipykernel_49165/1093682430.py:16(prep_line)
        1    0.152    0.152    1.415    1.415 /tmp/ipykernel_49165/35687

**Profile insertion by chunk**

In [15]:
### init a new table
refresh(query_table, fpath_database)

conn = sqlite3.connect(fpath_database)
try:
    ### `with conn` commits the inserts on success (rolls back on error)
    ### but does not close the connection; see `finally` at the bottom
    with conn, gzip.open(fpath_table, "rb") as file:
        ### initiation
        cursor = conn.cursor()
        query  = query_insert

        ### generate chunks; gen_line consumes the header right here,
        ### so that read is not part of the profile
        chunks = gen_line(file, n_chunksize=n_chunksize, n_lines=n_lines)

        ### profile only the insertion; the profiler is stopped even
        ### when the insertion raises
        pr = cProfile.Profile()
        pr.enable()
        try:
            cursor = fun_insert_line_by_chunk(cursor, query, chunks)
        finally:
            pr.disable()

        ### report
        ps = pstats.Stats(pr).print_stats()
finally:
    conn.close()

         912114 function calls in 1.187 seconds

   Random listing order was used

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      794    0.002    0.000    0.002    0.000 {method 'read' of '_io.BufferedReader' objects}
   100000    0.060    0.000    0.135    0.000 {method 'readline' of '_io.BufferedReader' objects}
        1    0.000    0.000    0.000    0.000 {built-in method builtins.iter}
     4764    0.001    0.000    0.001    0.000 {built-in method builtins.len}
   100000    0.060    0.000    0.060    0.000 {method 'split' of 'str' objects}
   100000    0.020    0.000    0.020    0.000 {method 'join' of 'str' objects}
   100000    0.017    0.000    0.017    0.000 {method 'strip' of 'str' objects}
      794    0.001    0.000    0.001    0.000 {method 'cast' of 'memoryview' objects}
   100000    0.029    0.000    0.029    0.000 {method 'decode' of 'bytes' objects}
      101    0.087    0.001    0.736    0.007 /tmp/ipykernel_49165/1093682430.py:6(get_chu