In [None]:
## Packages

import csv
import re
import pandas as pd
import os
import sys
import argparse





In [1]:
## -------------------------------
##  DEBUGGING & LOCAL RUNS
## -------------------------------
## Hard-coded parameters used when running/debugging this notebook on a local machine.
## NOTE(review): the directory and sheet name below are specific to one developer machine;
## os.chdir() raises if the directory does not exist. Adjust (or comment out) before running elsewhere.
os.chdir('/Users/david/tmp/') ## use if local
sheet_name = 'CTG_SampleSheet.labsheet.test.csv' ## use if local

# Get batchid from labsheet name - add to demux-samplesheet (currently disabled)
# batchid_sheetname=sheet_name.split(".")[2]

# The Sample_Project, Sample_ID, and Sample_Name columns accept alphanumeric characters, hyphens (-), and underscores (_).
# force_Sample_Name: whether to force the Sample_Name(s) supplied in the [Data] section to be the same as Sample_ID.
force_Sample_Name = True  # force Sample_Name == Sample_ID
# fastq_suffix: suffix needed to auto-generate the fastq file names produced by bcl2fastq.
# NOTE(review): the original comment said "If NULL no bam file names will be generated" - presumably
# this should read "fastq file names" (copy-paste from bam_suffix); TODO confirm.
fastq_suffix = "_001.fastq.gz" # suffix of bcl2fastq-generated fastq files
# bam_suffix: suffix needed to auto-generate bam file names (typically generated by STAR).
# If NULL, no bam file names will be generated.
bam_suffix = "_Aligned.sortedByCoord.out.bam"  ## suffix of aligned bam files
# force_fastq_names: set to True to overwrite fastq file names. By default the fastq_1/fastq_2 columns
# are not overwritten if present (even though fastq_suffix is supplied).
force_fastq_names = False # overwrite existing fastq_1/fastq_2 values?
# force_bam_names: set to True to overwrite bam file names. By default the bam column
# is not overwritten if present (even though bam_suffix is supplied).
force_bam_names = False # overwrite existing bam values?

## ADD UNIQUE FASTQ IF MULTIPLE LANES / collapse lanes.
## Special cases when the same sample is distributed over multiple lanes within a single project.
# allow_dups_over_lanes: whether to allow duplicates (within one project) on multiple lanes. Rare on NovaSeq
# but can be found for S4 with lane divider; one sample may be run on both lanes 1/2 or on lanes 3/4.
allow_dups_over_lanes = True # allow the same sample on several lanes within one project
# collapse_lanes: like allow_dups_over_lanes, this affects the project-specific sample sheets, NOT the demux sheet.
# Special case: a single (same) sample is present in multiple lanes AND --noLaneSplitting is True in bcl2fastq.
# The SampleSheet should then be collapsed from lane level to individual sample (fastq R1/R2 files).
collapse_lanes = True ## collapse per-lane rows into one row per sample
#force_fastq_names = False

# cwd holds only the last path component (basename) of the current working directory.
cwd = os.path.basename(os.getcwd())
runfolder_root="/projects/fs1/nas-sync/upload/"

In [2]:
## -------------------------------------
##   LOOKUP DICTIONARIES & VARIABLES
## -------------------------------------

## Pipeline dict: allowed PipelineName values (keys) and the PipelineProfile values permitted for each (values).
lookup_pipelines = {
    'seqonly': ['bcl2fastq_default','fastq_demux','rawdata_runfolder'],
    'ctg-rnaseq': ['rnaseq_mrna','rnaseq_total','uroscan','fastq_demux','rawdata_runfolder','rawdata'],
    'dna-dragen': ['panel_twist_comprehensive_dragen','panel_gmck_dragen','panel_gms_dragen','bam_alignment_dragen','wgs_dragen'],
    'demux-runfolder': ['bcl2fastq_default']
    }



## params_dict is used to find the [Data] column that corresponds to a [Header] param.
## key = [Header] param name
##  DataCol: [Data] column name
##  Catenate: boolean, whether to collapse multiple entries. If the [Data] column contains multiple
##            (non-unique) values, whether to join these in the [Header] section with semicolons.
##  RegExp: regexp matching the characters that are NOT allowed for this entry.
##          Leave blank ('') to use the default character set defined in default_regexp.
##  Controlled: whether the entry has a controlled vocabulary (not yet implemented).
##
## All patterns are raw strings: sequences such as "\_" or "\." are not valid Python string escapes and
## trigger a SyntaxWarning on Python >= 3.12 when written in a normal string literal.
## The resulting string values are identical to the previous non-raw literals.
default_regexp = r'''[^0-9a-zA-Z\_\.\-\+\@\(\)\;\,'"\| ]+'''

params_dict = {
    'ProjectID': {'DataCol': 'Sample_Project','Catenate': True,'RegExp': r'[^0-9a-zA-Z\_\|]+','Controlled': False},
    'PipelineName': {'DataCol': 'PipelineName','Catenate': False,'RegExp': '','Controlled': True},
    'PipelineVersion': {'DataCol': 'PipelineVersion','Catenate': False,'RegExp': '','Controlled': True},
    'PipelineProfile': {'DataCol': 'PipelineProfile','Catenate': False,'RegExp': '','Controlled': True},
    'Species': {'DataCol': 'Sample_Species','Catenate': False,'RegExp': '','Controlled':False},
    'ReferenceGenome': {'DataCol': 'Sample_ReferenceGenome','Catenate': False,'RegExp': '','Controlled':False},
    'email-ctg-lab': {'DataCol': 'email_ctg_lab','Catenate': False,'RegExp': r'[^0-9a-zA-Z\.\-\_\@\|]+','Controlled': False},
    'email-ctg-bnf': {'DataCol': 'email_ctg_bnf','Catenate': False,'RegExp': r'[^0-9a-zA-Z\.\-\_\@\|]+','Controlled': False},
    'email-ctg-all': {'DataCol': 'email_ctg_all','Catenate': False,'RegExp': r'[^0-9a-zA-Z\.\-\_\@\|]+','Controlled': False},
    'name-pi': {'DataCol': 'name_pi','Catenate': False,'RegExp': '','Controlled':False},
    'email-customer': {'DataCol': 'email_customer','Catenate': False,'RegExp': r'[^0-9a-zA-Z\.\-\_\@\|]+','Controlled': False},
    'Assay': {'DataCol': 'Assay','Catenate': False,'RegExp': '','Controlled': True},
    'IndexAdapters': {'DataCol': 'IndexAdapters','Catenate': False,'RegExp': '','Controlled': True},
    'Strandness': {'DataCol': 'Sample_Strandness','Catenate': False,'RegExp': '','Controlled': True},
    'FragmentationTime': {'DataCol': 'fragmentation_time','Catenate': False,'RegExp': '','Controlled': True},
    'PCR-cycles': {'DataCol': 'pcr_cycles','Catenate': False,'RegExp': '','Controlled': True},
    'PairedEnd': {'DataCol': 'Sample_PairedEnd','Catenate': False,'RegExp': '','Controlled': True},
    'PoolConcNovaSeq': {'DataCol': 'Pool_Conc_NovaSeq','Catenate': False,'RegExp': '','Controlled': True},
    'PoolMolarityNovaSeq': {'DataCol': 'Pool_Molarity_NovaSeq','Catenate': False,'RegExp': '','Controlled': True}}

# params_datacols - reverse lookup dictionary: [Data] column name -> [Header] param name
params_datacols = {spec['DataCol']: header_param for header_param, spec in params_dict.items()}

print(params_datacols)


{'Sample_Project': 'ProjectID', 'PipelineName': 'PipelineName', 'PipelineVersion': 'PipelineVersion', 'PipelineProfile': 'PipelineProfile', 'Sample_Species': 'Species', 'Sample_ReferenceGenome': 'ReferenceGenome', 'email_ctg_lab': 'email-ctg-lab', 'email_ctg_bnf': 'email-ctg-bnf', 'email_ctg_all': 'email-ctg-all', 'name_pi': 'name-pi', 'email_customer': 'email-customer', 'Assay': 'Assay', 'IndexAdapters': 'IndexAdapters', 'Sample_Strandness': 'Strandness', 'fragmentation_time': 'FragmentationTime', 'pcr_cycles': 'PCR-cycles', 'Sample_PairedEnd': 'PairedEnd', 'Pool_Conc_NovaSeq': 'PoolConcNovaSeq', 'Pool_Molarity_NovaSeq': 'PoolMolarityNovaSeq'}


In [None]:
## -------------------------------------
##     FUNCTIONS 
## -------------------------------------

## get csv type (if semicolon or comma)
def find_csv_delimiter(sheet_name=None):
    """Determine whether a csv file uses comma or semicolon as separator.

    Simply counts every ',' and ';' in the file; the more frequent character wins.
    A tie (including a file containing neither character) resolves to ','.

    Args:
        sheet_name: path to the csv file.

    Returns:
        The separator character, either ',' or ';'.
    """
    print(f' ... determining csv file separator')
    print(f' ... ... Reading: {sheet_name} ')

    count_comma = 0
    count_semic = 0
    # 'with' guarantees the file handle is closed even if reading fails
    with open(sheet_name, "r") as csvfile:
        for line in csvfile:
            count_comma += line.count(',')
            count_semic += line.count(';')
    print(f' ... ... {count_comma} commas vs  {count_semic} semicolons')
    # Previously both branches returned ',' so semicolon separated sheets were never detected.
    return_char = ',' if count_comma >= count_semic else ';'
    print(f' ... ... returning "{return_char}" ' )
    return return_char


def get_param(param_name=None, myDict=None):
    """Look up a parameter and return the second element of its stored value.

    Args:
        param_name: key to look up in myDict.
        myDict: dictionary whose values are indexable (element [1] is returned).

    Returns:
        myDict[param_name][1] if param_name is a key of myDict, otherwise ''.
    """
    if param_name not in myDict.keys():
        return ''
    stored_value = myDict[param_name]
    return stored_value[1]


## harmonize params ...
def harmonize_header_params(input_row=None, data_mat=None, data_col=None, allowMultiple=None, ingoreBlanks=None):
    """Harmonize a [Header] parameter with the values found in its [Data] column.

    [Header] and [Data] param pairs often do not have identical names. The main principle is to
    look at the values in the [Data] column and replace the [Header] value with them:
      - exactly one unique value in the [Data] column: the [Header] value is replaced by it
      - otherwise: the [Header] value is set to 'multiple' (or an error is raised, see allowMultiple)
    If data_col is not a column of data_mat, the row is returned untouched.

    Args:
        input_row: list-like [<param name>, <value>, ...]. NOTE: updated in place; the same object is returned.
        data_mat: pandas DataFrame holding the [Data] section.
        data_col: name of the [Data] column that corresponds to the [Header] param.
        allowMultiple: if False, more than one unique value in data_col raises ValueError.
        ingoreBlanks: currently unused.

    Returns:
        input_row, with element [1] harmonized.

    Raises:
        ValueError: if the [Data] column does not hold exactly one unique value and allowMultiple is False.
    """
    return_row = input_row
    if data_col in data_mat.columns.tolist():
        # Remember the value before it is overwritten. return_row is the very same list object as
        # input_row, so comparing the two after the update (as done before) could never detect a change.
        original_value = input_row[1]
        unique_values = data_mat[data_col].unique()
        if len(unique_values) == 1:
            return_row[1] = data_mat[data_col].tolist()[0]
        else:
            return_row[1] = 'multiple'
            if allowMultiple==False:
                raise ValueError(f'Error: Multiple values found in [Data] column "{data_col}" when harmonizing [Header] and [Data] params. Multiple values are not allowed within one and the same project as defined by the "params_dict" object in this python script. Values found were:  {unique_values}' )
        if not return_row[1]==original_value:
            print(f' ... ... Harmonizing values. [Header] param "{input_row[0]}" changed from "{original_value}" to [Data] "{data_col}" columns value: {return_row[1]}')

    return return_row

def replace_characters_foo(list_in=None, RegExp=r'''[^0-9a-zA-Z\_\.\-\+\@\(\)\;\,'"\| ]+''', my_sub=''):
    """Replace non-allowed characters in every string of a list.

    Args:
        list_in: list of strings. NOTE: modified in place; the same list object is returned.
        RegExp: regular expression matching the text to replace (by default: any run of characters
            outside the allowed set). Written as a raw string because sequences such as "\\_" are not
            valid Python string escapes; the pattern value is unchanged.
        my_sub: replacement text for every match.

    Returns:
        list_in, with each element that contained a match replaced by its substituted version.
    """
    pattern = re.compile(RegExp)
    for position, original_text in enumerate(list_in):
        cleaned_text = pattern.sub(my_sub, original_text)
        if cleaned_text != original_text:
            print(f' ... ... ... Illegal character in "{original_text}". replacing with "{cleaned_text}"')
            list_in[position] = cleaned_text
    return list_in

def replace_booleans_foo(list_in):
    """Lowercase boolean-like strings so they fit bash/nextflow scripting.

    Any element that equals 'true' or 'false' when lowercased is replaced by that lowercase form.
    The list is modified in place and the same list object is returned.
    """
    for position, value in enumerate(list_in):
        lowered = value.lower()
        if lowered in ('true', 'false'):
            list_in[position] = lowered
    return list_in


def read_samplesheet_section(sheet_name=None, sheet_section=None, section_is_dataframe=False, allowMultiple=False, RegExp=r'''[^0-9a-zA-Z\_\.\-\+\@\(\)\;\,'"\| ]+'''):
    """Read one section of an IEM style sample sheet.

    Each section is defined by a bracketed header such as [Header] or [Data]. A section spans from
    the row after its header to the first blank row (a blank row always marks the end of a section).
    Both comma and semicolon separated sheets are accepted (determined by find_csv_delimiter).

    Every cell read within the section is cleaned: commas and runs of whitespace are replaced by a
    single space, characters matched by RegExp are removed, and 'true'/'false' in any casing are
    lowercased.

    Args:
        sheet_name: path to the csv sample sheet.
        sheet_section: section to extract, including brackets (e.g. '[Data]').
        section_is_dataframe: if False, rows are read as <parameter>,<value> pairs; if True, the first
            row of the section is the column header and the following rows are the data.
        allowMultiple: currently unused.
        RegExp: regular expression matching the characters that are NOT allowed within this section
            (matches are removed). The default is fairly inclusive; apply stricter filtering afterwards.

    Returns:
        - section_is_dataframe=True: pandas DataFrame (columns taken from the first section row;
          empty DataFrame if the section holds no rows).
        - section_is_dataframe=False: dict {parameter: value}. Parameters without a value are kept
          with value ''.
        - '' if the section header was not found.

    Raises:
        ValueError: if a parameter occurs more than once in a non-dataframe section.
    """
    ## Count the lines of the file (only used for the log message below)
    with open(sheet_name) as sheet_file:
        n_rows = len(sheet_file.readlines())
    print(f' ... Fetching {sheet_section} section from "{sheet_name}"')
    print(f' ... ... ')
    sheet_delim = find_csv_delimiter(sheet_name=sheet_name) # determine delimiter

    myLine = 0             # line counter (1-based), used in log messages
    s_index = 0            # row index within a data frame section; 0 is the column header row
    read_section = False   # True while positioned between the section header and the next blank row
    found_section = False  # must be initialised: it is tested below even when the header never occurs
    myDict = {}            # dictionary in which to store the section that is read

    with open(sheet_name, 'r', encoding='utf-8-sig') as csvfile:
        allines = csv.reader(csvfile, delimiter=(sheet_delim), quotechar='"', skipinitialspace=True)

        ## main approach is to read between the given section header and the 1st blank line
        ## -------------------------------
        print(f' ... ... Reading lines in SampleSheet ...')
        print(f' ... ... (File is {n_rows} rows )')
        for row in allines:
            myLine += 1
            if len(row) < 2: # quickfix for rows without proper separators, e.g. a param that is not followed by a separator (and a value)
                row.append('') # Append a blank value so that a lone param gets a (blank) value in row[1]

            ## Section header identified: start storing from the next line (until blank line)
            if row[0] == sheet_section:
                read_section = True
                found_section = True
                print(f' .. ... found supplied sheet_section header: {sheet_section} at line {myLine}')
                print(f' ... ... ... reading data')
                continue
            ## First blank row after the section header ends the section
            if read_section and all(elem == '' for elem in row):
                read_section = False
                print(f' ... ... stopped reding at blank line: {myLine}')
                continue
            ## Rows outside the section of interest are ignored
            if not read_section:
                continue

            ## In between [section] and blank row: parse this line and store it in the output dict
            row = replace_characters_foo(list_in=row, RegExp=r'[\,]', my_sub=' ') ## replace commas (if present) with space
            row = replace_characters_foo(list_in=row, RegExp=r'[\s]+', my_sub=' ') ## replace any whitespace run (including illegal spaces) with one regular space
            row = replace_characters_foo(list_in=row, RegExp=RegExp, my_sub='') ## remove all characters that are not allowed in this section

            # set any booleans to lowercase (true/false)
            row = replace_booleans_foo(row)

            if not section_is_dataframe:
                ## Not a matrix/data frame: expect one value per parameter (row)
                ## <parameter>,<value>, ... i.e. row[0] is the parameter and row[1] is the value
                if row[0] in myDict.keys():
                    raise ValueError(f' ... ... ... Error: Duplicate {sheet_section} parameter found: "{row[0]}"')
                myDict[row[0]] = row[1]
                print(f' ... ... ... read param:  {row[0]}')
            else:
                ## Data frame type of section: store rows keyed by s_index.
                ## The first row becomes the column header - no special characters other than underscore allowed there.
                if s_index == 0:
                    print(f' ... ... ... reading header row for data frame. ')
                    print(f' ... ... ... ... allow ony underscore as special character')
                    row = replace_characters_foo(list_in=row, RegExp=r'[^0-9a-zA-Z\_+]', my_sub='') ## remove everything but alphanumerics, '_' and '+'
                    print(row)
                myDict[s_index] = row
                ## Count section rows only. Counting every row of the file (as before) meant that
                ## s_index was never 0 here unless the section header was the first line of the file,
                ## so the header row was never cleaned.
                s_index += 1
        print(f' ... ... ok ')

    ## All data is read. Return according to section type.
    if not found_section:
        print(f' ... section header {sheet_section} not found. returning ""')
        return ''

    if not section_is_dataframe:
        print(f' ... returning dictionary')
        return myDict

    # generate Pandas Data Frame
    print(f' ... ... section_is_dataframe set to true:')
    print(f' ... ... ... generating pandas dataframe')
    if not myDict:
        ## Section header present but directly followed by a blank row / end of file
        print(f' ... ... ... no rows found in section. returning empty data frame')
        return pd.DataFrame()
    df = pd.DataFrame(myDict).transpose() # one dict entry per row -> transpose from dict
    df = df.rename(columns=df.iloc[0]) # first section row holds the column headers
    df = df.iloc[1: , :]
    print(f' ... returning data frame with dimensions: {df.shape}')
    return df