<a href="https://colab.research.google.com/github/DPariser/machinelearning/blob/main/Read_gz_df.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Environment Setups

## System info - use watermark

* Show version of the python system

In [None]:
!pip install watermark -q

%load_ext watermark
%watermark -v -p torch

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m54.3 MB/s[0m eta [36m0:00:00[0m
[?25hPython implementation: CPython
Python version       : 3.9.16
IPython version      : 7.9.0

torch: 1.13.1+cu116



## Google drive - Mount

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Common functions




## Memory usage - Dataframe

In [None]:
import pandas as pd
import gzip
import io

def report_mem(df):
  memory_used = df.memory_usage(deep=True).sum() / (1024 * 1024)
  print('>>>>>')
  print(f"Memory usage of dataframe: {memory_used:.2f} MB")
  print('>>>>>')

# Reading csv - in gz

* We want to read the gz file without extracting it. It makes sense to do so for large files

## Small file - read into Dataframe

In [None]:
# Read GSE158055 cell annotation csv
path = "/content/drive/My Drive/LungMk/Data files/GSE158055_cell_annotation.csv.gz"

df = pd.read_csv(path, compression = 'gzip')
report_mem(df)

>>>>>
Memory usage of dataframe: 379.56 MB
>>>>>


In [None]:
df.head()

Unnamed: 0,cellName,sampleID,celltype,majorType
0,AACAGGGGTCGGATTT-0,S-S070-1,Mono_c1-CD14-CCL3,Mono
1,AACCAACGTCCGAAAG-0,S-S070-1,B_c02-MS4A1-CD27,B
2,AACCTTTGTAGCACGA-0,S-S070-1,B_c01-TCL1A,B
3,AAGCATCTCTATCGCC-0,S-S070-1,Mono_c2-CD14-HLA-DPB1,Mono
4,AATCACGGTCATAAAG-0,S-S070-1,B_c01-TCL1A,B


## Large file - check line count

In [None]:
def count_lines(fp):
  with gzip.open(fp, 'rb') as f:
      num_total_rows = sum(1 for line in f)
      return num_total_rows

import time
start_time = time.time()

# Print the total number of rows in the file
print(f"Total number of rows: {count_lines(path)}")
print("Time taken %s seconds" % (time.time() - start_time))

Total number of rows: 1462703
Time taken 1.1179499626159668 seconds


## Large file - read part into DF

In [None]:
df = pd.read_csv(path, compression = 'gzip', nrows=1000, skiprows=range(1, 3))
report_mem(df)
df.head()

>>>>>
Memory usage of dataframe: 0.26 MB
>>>>>


Unnamed: 0,cellName,sampleID,celltype,majorType
0,AACCTTTGTAGCACGA-0,S-S070-1,B_c01-TCL1A,B
1,AAGCATCTCTATCGCC-0,S-S070-1,Mono_c2-CD14-HLA-DPB1,Mono
2,AATCACGGTCATAAAG-0,S-S070-1,B_c01-TCL1A,B
3,AATCACGGTGGTTCTA-0,S-S070-1,B_c02-MS4A1-CD27,B
4,AATGAAGCAACTGGTT-0,S-S070-1,Macro_c2-CCL3L1,Macro


# Read mtx - in gz

## Process line ranges

* For very large files, we can pick the lines we are interested to work on.
* Here we are not loading the file into any objects, just process selected lines as we read them into memory.

In [None]:
mtx_fp = '/content/drive/My Drive/Data files/GSE158055_covid19_counts.mtx.gz'

def read_mtx(fp, callback, start=0, end=-1):
  with gzip.open(fp, 'rt') as f:
    bound = f
    if end > 0:
      bound = range(end)
    for i in bound:
          line = next(f)
          if i >= start:
            callback(line.strip())

read_mtx(mtx_fp, lambda l: print(l), 3, 8)

18558 1 62
18565 1 8
18564 1 29
18562 1 96
18563 1 8


### Split into chunks

* The below split function can split the file into chunks.
* The different bound identifying function and chunk processor can be plugged in to achieve the need to break the file into chunks or process the chunks in memory.
* The end of the cell show 3 different use-cases.
* Use-case 1 split the file into column-based chunks and saves the chunks in separate files. The assumption is that the mtx file is sorted by column.
* Use-case 2 split the file into size-based chunks and save the chunks in separate files.
* Use-case 3 split the file into colmn-based chunks and make a generator out of it so that it can be process in parallel efficiently later.

In [None]:
import gzip


def split(fname, search_bound, max_chunk, chunk_processor, skip=3, encoder='utf-8'):
    comp = fname.split('.')
    suffix = '.'.join(comp[1:])
    prefix = comp[0]
    global col_val
    col_val = None


    with gzip.open(fname, 'rt') as f:
        chunk = 1
        buffer = []
        chunk_size = 0
        header_size = 0
        for line_number, line in enumerate(f):
            if line_number < skip:
                chunk_size += len(line.encode(encoder))
                header_size = chunk_size
                buffer.append(line)
                continue

            # print(f"Chunk {chunk}, Line: {line_number}", end='\r')

            result = None
            if search_bound(line) is False:
                buffer.append(line)
                chunk_size += len(line.encode(encoder))
            else:
                # Create new chunk
                result = chunk_processor(prefix, suffix, chunk, buffer)
                chunk += 1
                buffer = buffer[0:skip]
                buffer.append(line)
                chunk_size = header_size + len(line.encode(encoder))

            if chunk_size >= max_chunk:
                result = chunk_processor(prefix, suffix, chunk, buffer)
                chunk += 1
                buffer = buffer[0:skip]
                chunk_size = header_size

            if result is not None:
                yield result

        # Write the remainder of the file
        if buffer:
            chunk_processor(prefix, suffix, chunk, buffer)


class UnsupportedFileFormatError(Exception):
    pass


# Chunk by column
def column_based(line):
    global col_val
    if line[0:1] == '%':
        # These are header rows
        return False
    else:
        vals = line.split()
        new_val = vals[1]
        if col_val is None:
            col_val = new_val
        if int(col_val) > int(new_val):
            print(f"col_val={col_val} and new_val={new_val}")
            raise UnsupportedFileFormatError("Columns are not sorted in ascending order")
        if col_val == new_val:
            return False
        else:
            # We found the boundary, start the next chunk
            col_val = new_val
            return True


# Chunk purely by chunk size given
def chunk_based(line):
    return False


# Save chunk - this is the default chunk processor
def save_chunk_gz(prefix, suffix, chunk, buffer):
    filename = f"{prefix}.chunk{chunk}.{suffix}"
    with gzip.open(filename, 'wt') as file:
        file.writelines(buffer)
    return None


# Save chunk - this is the default chunk processor
def save_chunk(prefix, suffix, chunk, buffer):
    ss = suffix
    if ss[-3] == '.gz':
        ss = suffix[0:-3]
    filename = f"{prefix}.chunk{chunk}.{ss}"
    with open(filename, 'wt') as file:
        file.writelines(buffer)
    return None


def dispense(prefix, suffix, chunk, buffer):
    return buffer


infile = '/content/drive/My Drive/Data files/GSE158055_covid19_counts.mtx.gz'
max_chunk_size = 1024 * 1024 * 100  # 100 MB

use_example = 3
# Example 1: split file as column-based chunks.
if use_example == 1:
    split(infile, column_based, max_chunk_size, chunk_processor=save_chunk)

# Example 2: split file as chunks, the size of each chunk does not exceed the max chunk size
if use_example == 2:
    split(infile, chunk_based, max_chunk_size, chunk_processor=save_chunk_gz)

# Example 3: split file into column-based chunks and yield one chunk at a time
if use_example == 3:
  count = 0
  for lines in split(infile, column_based, max_chunk_size, chunk_processor=dispense):
    count += 1
    print(f"Processed another chunk of {len(lines)} lines.")
    if count > 3:
      break


Processed another chunk of 4207 lines.
Processed another chunk of 1318 lines.
Processed another chunk of 3559 lines.
Processed another chunk of 4283 lines.


### Parse mtx chunk

* This function extends on use-case 3 to process the chunks by parse it into column, rows and values to prepare it for sparse matrix construction.

In [None]:
import numpy as np

def parse_mtx_chunk(lines, skip=3):
  if len(lines) < 3:
    return (-1, [], [])

  row_indices = []
  col_index = int(lines[-1].split()[1])
  values = []
  for line in lines[skip:]:
    cells = line.split()
    row_indices.append(int(cells[0]))
    values.append(int(cells[2]))
  return (col_index, row_indices, values)

for lines in split(infile, column_based, max_chunk_size, chunk_processor=dispense):
  # We just process one chunk for demo purpose
  col_index, row_indices, values = parse_mtx_chunk(lines)
  if col_index==-1:
    print('No records found!')
  else:
    print(f"col_index={col_index}, rows={len(row_indices)}, max_value={np.amax(values)}")
  break


col_index=1, rows=4204, max_value=1995


### Build COO Matrix

* Use this to construct sparse matrix to process the rows and columns interested. 

In [None]:
from typing_extensions import Self
import numpy as np
from scipy.sparse import coo_matrix

def dim_floor(array):
  mina = min(array)
  array = list(map(lambda x: x - mina, array))
  return array

class CooMatrixBuilder:
  def __init__(self):
    self.chunks = []

  def append(self, chunk):
    self.chunks.append(chunk)
    return Self

  def build(self, shape=None):
    cols = []
    rows = []
    vals = []
    for chunk in self.chunks:
      col_index, row_indices, values = parse_mtx_chunk(chunk)
      rows.extend(row_indices)
      cols.extend(np.full(len(row_indices), col_index))
      vals.extend(values)

    if shape is None:
      # Only focus on what we have
      rows = dim_floor(rows)
      cols = dim_floor(cols)
      shape = (max(rows)+1, max(cols)+1)

    return coo_matrix((vals, (rows, cols)), shape=shape)


# We will build a coo matrix for 3 chunks. In reality, we can combine any 
# random chunks into a COO Matrix and then subsequently do matrix calculation.
count = 0
builder = CooMatrixBuilder()
for lines in split(infile, column_based, max_chunk_size, chunk_processor=dispense):
  # We just process one chunk for demo purpose
  builder.append(lines)
  count += 1
  if(count > 2):
    # For demo purpose only. In reality, adjust the exit condition 
    break

matrix = builder.build()
print(matrix.data.max())
print(matrix.shape)

1995
(27929, 3)


# Use Dask

* Dask API is a subset of pandas and it is intended for handling very large dataset
* To use Dask, simple drop-in-replace pandas with dask.

## Large csv file in gz

* WIP: It seems that we need to unzip the file first in order to be able to use the blocksize.

In [None]:
import dask.dataframe as dd

path = "/content/drive/My Drive/LungMk/Data files/GSE158055_cell_annotation.csv.gz"
blocksize=50e6
x = dd.read_csv(path, blocksize=blocksize, compression = 'gzip')  
x.head()

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(


Unnamed: 0,cellName,sampleID,celltype,majorType
0,AACAGGGGTCGGATTT-0,S-S070-1,Mono_c1-CD14-CCL3,Mono
1,AACCAACGTCCGAAAG-0,S-S070-1,B_c02-MS4A1-CD27,B
2,AACCTTTGTAGCACGA-0,S-S070-1,B_c01-TCL1A,B
3,AAGCATCTCTATCGCC-0,S-S070-1,Mono_c2-CD14-HLA-DPB1,Mono
4,AATCACGGTCATAAAG-0,S-S070-1,B_c01-TCL1A,B


## Large mtx file in gz

* Can we treat is like a csv and read it using read_csv?
* WIP: also need to unzip it to use the blocksize

In [None]:
path = '/content/drive/My Drive/Data files/GSE158055_covid19_counts.mtx.gz'
x = dd.read_csv(path, blocksize=blocksize, sep=' ', skiprows=3, header=None, compression = 'gzip')  
x.head(5)

## Large mtx file in gz

* WIP: Need to modify this to read by chank and use together with generator
* Or we can use the very cusom Build COO Matrix implementation above.

In [None]:
path = '/content/drive/My Drive/Data files/GSE158055_covid19_counts.mtx.gz'

import dask.array as da
import scipy.sparse as sp
from dask import delayed

# Define a delayed function to read the sparse matrix
@delayed
def read_sparse_matrix(filename):
    with gzip.open(filename, 'r') as f:
        # Read the header and extract the matrix dimensions
        for i in range(3):
            f.readline()

        header = f.readline()
        num_rows, num_cols, num_entries = map(int, header.split())

        # Read the sparse matrix data and create a scipy.sparse.coo_matrix
        rows = []
        cols = []
        data = []
        for line in f:
            row, col, val = map(float, line.split())
            rows.append(int(row) - 1)
            cols.append(int(col) - 1)
            data.append(val)
        coo_matrix = sp.coo_matrix((data, (rows, cols)), shape=(num_rows, num_cols))

        # Convert the scipy.sparse.coo_matrix to a Dask array
        dask_array = da.from_delayed(delayed(coo_matrix.toarray)(), shape=(num_rows, num_cols), dtype=float)
        return dask_array

# Call the delayed function to read the sparse matrix and create a Dask array
data = read_sparse_matrix(path)

# Preview the first few rows of the array
print(data.head(5).compute())