In [1]:
import re
import os
from globals import *
from collections import defaultdict
import logging
import pandas as pd

ImportError: attempted relative import with no known parent package

In [32]:
def check_extension(df):
    """Return the distinct values of the ``ext`` column of the sample table.

    Used while generating the sample table to make sure every file shares one
    extension.  If more than one distinct value is present an error is logged
    and the interpreter is asked to exit with status 1.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with an ``ext`` column.

    Returns
    -------
    numpy.ndarray
        The unique values of ``df['ext']`` (zero or one element).
    """
    found_exts = df['ext'].unique()
    if len(found_exts) <= 1:
        return found_exts

    # mixed extensions: report and abort
    logging.error(f"{RED}Your input directory has multible fastq file extensions, please check directory.{NC}")
    exit(1)

def check_PE(df):
    """Return the distinct values of the ``PE`` column of the sample table.

    Used while generating the sample table to make sure the files are not a
    mix of paired and single layouts.  If more than one distinct value is
    present an error is logged and the interpreter is asked to exit with
    status 1.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``PE`` column.

    Returns
    -------
    numpy.ndarray
        The unique values of ``df['PE']`` (zero or one element).
    """
    layouts = df['PE'].unique()
    if len(layouts) <= 1:
        return layouts

    # both layouts present: report and abort
    logging.error(f"{RED}Your input directory has both Paired and single files, please check directory.{NC}")
    exit(1)

def check_R(df):
    """Return the distinct values of the ``read_num`` column of the sample table.

    Used while generating the sample table to make sure every file uses the
    same read token.  If more than one distinct value is present an error is
    logged and the interpreter is asked to exit with status 1.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``read_num`` column.

    Returns
    -------
    numpy.ndarray
        The unique values of ``df['read_num']`` (zero or one element).
    """
    read_tokens = df['read_num'].unique()
    if len(read_tokens) <= 1:
        return read_tokens

    # inconsistent read tokens: report and abort
    logging.error(f"{RED}Your input directory has multible fastq file naming patterns, please check directory.{NC}")
    exit(1)

def check_pattern(df):
    """Return the distinct values of the ``matched_pattern`` column of the sample table.

    Used while generating the sample table to make sure every file follows
    the same naming pattern.  If more than one distinct value is present an
    error is logged and the interpreter is asked to exit with status 1.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``matched_pattern`` column.

    Returns
    -------
    numpy.ndarray
        The unique values of ``df['matched_pattern']`` (zero or one element).
    """
    pattern_names = df['matched_pattern'].unique()
    if len(pattern_names) <= 1:
        return pattern_names

    # several naming patterns in one directory: report and abort
    logging.error(f"{RED}Your input directory has multible fastq file naming patterns, please check directory.{NC}")
    exit(1)

In [None]:
def recogize_pattern(file_name):
    """Match a FASTQ file name against the supported naming patterns.

    The patterns are tried in the order "illumina", "SRR", "general" and the
    first one that matches wins.  Matching is anchored at the start of
    ``file_name`` only (``re.match``), so characters after the extension are
    ignored.

    Parameters
    ----------
    file_name : str
        Name of the file to classify.

    Returns
    -------
    dict
        Keys ``file_name``, ``sample_name``, ``sample_id``, ``read_num``,
        ``lane``, ``tail``, ``ext`` and ``matched_pattern``.  ``lane`` and
        ``tail`` are empty strings for the "SRR" and "general" patterns.
        When nothing matches (or ``file_name`` is not a string) every value,
        including ``matched_pattern``, is ``None``.
    """
    # Raw strings: "\d" and "\." are invalid escape sequences in normal literals.
    patterns = {
        "illumina": r"(((.+)_(S\d+)_(L00\d))_(R1|R2|r1|r2|read1|read2)_(00\d)\.(fastq\.gz|fastq|fq\.gz|fq))",
        "SRR": r"(((SRR)(\d+))(_|\.)(1|2|R1|R2|r1|r2|read1|read2)\.(fastq\.gz|fastq|fq\.gz|fq))",
        "general": r"(((.+))(_|\.)(1|2|R1|R2|r1|r2|read1|read2)\.(fastq\.gz|fastq|fq\.gz|fq))",
    }

    matched_pattern = None
    groups = ()
    for ptrn_name, pattern in patterns.items():
        try:
            matched = re.match(pattern, file_name)
        except TypeError:
            # a non-string name cannot match any pattern; report "no match"
            break

        if matched:
            matched_pattern = ptrn_name
            groups = matched.groups()
            break

    # Extract the sample information; group indices follow the parentheses
    # of the pattern that matched.
    if matched_pattern == "illumina":
        file_name, sample_name, sample_id = groups[0], groups[1], groups[2]
        read_num, lane, tail, ext = groups[5], groups[4], groups[6], groups[7]

    elif matched_pattern == "SRR":
        file_name, sample_name, sample_id = groups[0], groups[1], groups[3]
        read_num, lane, tail, ext = groups[5], "", "", groups[6]

    elif matched_pattern == "general":
        file_name, sample_name, sample_id = groups[0], groups[1], groups[1]
        read_num, lane, tail, ext = groups[4], "", "", groups[5]

    else:
        file_name = sample_name = sample_id = read_num = lane = tail = ext = None

    return {
        "file_name": file_name,
        "sample_name": sample_name,
        "sample_id": sample_id,
        "read_num": read_num,
        "lane": lane,
        "tail": tail,
        "ext": ext,
        # report the pattern that actually matched, not the last one tried
        "matched_pattern": matched_pattern,
    }


In [34]:
def parse_samples(inpath):
    """Build the sample table for a directory of paired-end FASTQ files.

    Every regular file in ``inpath`` whose name contains "fastq" or "fq" is
    classified with ``recogize_pattern``.  Read-1 files are paired with their
    read-2 mate, which must exist in the same directory; the resulting record
    is stored under its ``sample_id``.

    Parameters
    ----------
    inpath : str
        Directory containing the FASTQ files.

    Returns
    -------
    pandas.DataFrame
        One row per sample, indexed by ``sample_id`` and sorted by the
        ``sample_id`` column, with the fields returned by
        ``recogize_pattern`` plus ``file2`` and ``PE``.  An empty frame with
        those columns is returned when no sample is found.

    Notes
    -----
    If a read-1 file has no mate, a message is printed and the interpreter is
    asked to exit with status 1.

    NOTE(review): records are keyed by ``sample_id``, so two files that yield
    the same ``sample_id`` (presumably several lanes of one sample) overwrite
    each other -- confirm this is intended.
    """
    path = os.path.abspath(inpath)
    all_files = os.listdir(path)
    # set gives constant-time mate lookups instead of scanning the list per file
    known_files = set(all_files)
    samples = {}

    for file_name in all_files:
        if not os.path.isfile(os.path.join(path, file_name)):
            continue
        if "fastq" not in file_name and "fq" not in file_name:
            continue

        sample_info = recogize_pattern(file_name)
        read_num = sample_info["read_num"]
        if read_num is None:
            # the name contains "fastq"/"fq" but follows no supported pattern
            logging.warning(f"Skipping '{file_name}': name does not match a supported fastq naming pattern.")
            continue
        if "1" not in read_num:
            # only read-1 files start a record
            continue

        mate_num = read_num.replace("1", "2")
        if sample_info["matched_pattern"] == "illumina":
            read_1 = f"{read_num}_{sample_info['tail']}.f"
            read_2 = f"{mate_num}_{sample_info['tail']}.f"
        else:
            read_1 = f"{read_num}.f"
            read_2 = f"{mate_num}.f"

        # Swap only the last occurrence of the read token: the greedy regex
        # matched the right-most one, and the sample name may contain the
        # same text earlier in the file name.
        head, found, rest = file_name.rpartition(read_1)
        f2 = head + read_2 + rest if found else file_name

        if f2 in known_files:
            sample_info["file2"] = f2
            sample_info["PE"] = True
            samples[sample_info["sample_id"]] = sample_info
        else:
            print(f"{RED}GUAP doesn't support single ended analysis at the moment{NC}")
            exit(1)

    if not samples:
        # sort_values would raise KeyError on a frame without columns
        logging.warning(f"No paired fastq samples found in '{path}'.")
        return pd.DataFrame(
            columns=[
                "file_name", "sample_name", "sample_id", "read_num", "lane",
                "tail", "ext", "matched_pattern", "file2", "PE",
            ]
        )

    return pd.DataFrame(samples).T.sort_values(by=['sample_id'])

In [35]:
# Build the sample table from the raw read directory.
# NOTE(review): this is an absolute, machine-specific path; the cell will fail
# on any other machine -- consider a configurable data directory.
samples = parse_samples("/home/abdelrahman/Desktop/RNAseq/data/samples")

In [None]:
# NOTE(review): outpath, args, path, tail, all_mem and yaml are not defined or
# imported in the visible cells (GUAP_DIR may come from the star-import of
# globals) -- confirm they exist before running this cell on a fresh kernel.

# Reduce each per-file column of the sample table to the single value shared
# by all files; every check_* helper aborts when the column is not uniform.
pattern = str(check_pattern(samples)[0])
ext = str(check_extension(samples)[0])
PE = bool(check_PE(samples))
R = str(check_R(samples)[0])

# Compressed inputs are flagged so they can be gunzipped; EXT is the
# extension with the ".gz" part removed.
compressed = ".gz" in ext
EXT = ext.replace(".gz", "") if compressed else ext

# Write the sample table once; a table left by an earlier run is kept.
if not os.path.exists(outpath + "/" + "samples.tsv"):
    samples.to_csv(outpath + "/" + "samples.tsv", sep='\t')
else:
    logging.warning(f"\033[;33;1mFound an exsiting sample.tsv file in output directory, will not override.\033[;39;m")

# Create the config file: the parsed arguments first, then the derived values.
# NOTE(review): R holds a complete read token from the table, so "_{R}1" may
# expand to something like "_R11" -- verify the intended R1/R2 patterns.
with open('config.yaml', 'w') as yaml_file:
    yaml.safe_dump(vars(args), yaml_file, default_flow_style=False, sort_keys=False)
    print(f"path: {path}", file=yaml_file)
    print(f"working_dir: {outpath}", file=yaml_file)
    print(f"ext: {ext}", file=yaml_file)
    print(f"tail: {tail}", file=yaml_file)
    print(f"R: {R}", file=yaml_file)
    print(f"R1_pattern: _{R}1{tail}{EXT}", file=yaml_file)
    print(f"R2_pattern: _{R}2{tail}{EXT}", file=yaml_file)
    print(f"compressed: {compressed}", file=yaml_file)
    print(f"total_mem: {all_mem}", file=yaml_file)
    # last entry is written without a trailing newline
    print(f"GUAP_DIR: {GUAP_DIR}", end="", file=yaml_file)



In [None]:
import logging
from .globals import *

class Logger:
    """Thin wrapper around the root logger with coloured convenience methods.

    Creating an instance attaches a console handler to the root logger and
    sets the root logger's level.  ``add_file_handler`` can attach a log file
    afterwards.

    Parameters
    ----------
    verbose : bool, optional
        When true the root logger level is set to ``logging.INFO``; otherwise
        it stays at ``logging.DEBUG``.

    Attributes
    ----------
    formatter : logging.Formatter
        Formatter shared by the console and file handlers.
    logger : logging.Logger
        The root logger.
    """

    def __init__(self, verbose=False):
        self.formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

        # define console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(self.formatter)

        # setLevel() returns None, so the logger must be fetched first and
        # configured in a separate statement.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)

        # add the console handler to the logger
        self.logger.addHandler(console_handler)

        # set the logging level based on verbose argument
        # NOTE(review): verbose=True raises the threshold from DEBUG to INFO,
        # i.e. fewer records are shown -- confirm the intended levels.
        if verbose:
            self.logger.setLevel(logging.INFO)

    def prnt_info(self, message):
        """Log ``message`` at INFO level wrapped in the GRY/NC colour codes."""
        self.logger.info(f"{GRY}{message}{NC}")

    def prnt_warning(self, message):
        """Log ``message`` at WARNING level wrapped in the YEL/NC colour codes."""
        self.logger.warning(f"{YEL}{message}{NC}")

    def prnt_error(self, message):
        """Log ``message`` at ERROR level wrapped in the RED/NC colour codes."""
        self.logger.error(f"{RED}{message}{NC}")

    def add_file_handler(self, file_dir):
        """Attach a file handler writing INFO-and-above records to ``file_dir``.

        Parameters
        ----------
        file_dir : str
            Path of the log file, passed to ``logging.FileHandler``.
        """
        # create a file handler
        file_handler = logging.FileHandler(file_dir)
        file_handler.setLevel(logging.INFO)
        # reuse the console format for the file
        file_handler.setFormatter(self.formatter)

        # add the file handler to the logger
        self.logger.addHandler(file_handler)