In [5]:
#!/usr/bin/env python3

"""pyllelic: a tool for detection of allelic-specific variation
   in reduced representation bisulfate DNA sequencing.
"""

import os
import signal
import sys
from multiprocessing import Pool, cpu_count
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union, NamedTuple

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pysam
from Bio import pairwise2
from scipy import stats
from tqdm.notebook import tqdm

# from . import quma
# from .config import Config

# import logging

# logging.basicConfig(filename="pyllelic_test.log", level=logging.DEBUG)

# # Initialize shared configuration object
# config = Config()

# # Initialized multiprocessing limits
# NUM_THREADS = cpu_count() - 1


# def set_up_env_variables(
#     base_path: str,
#     prom_file: str,
#     prom_start: str,
#     prom_end: str,
#     chrom: str,
#     offset: int,
# ) -> None:
#     """Helper method to set up all our environmental variables, such as for testing.

#     Args:
#         base_path (str): directory where all processing will occur, put .bam files
#                          in "test" sub-directory in this folder
#         prom_file (str): filename of genmic sequence of promoter region of interest
#         prom_start (str): start position to analyze in promoter region
#         prom_end (str): final position to analyze in promoter region
#         chrom (str): chromosome promoter is located on
#         offset (int): genomic position of promoter to offset reads
#     """

#     config.base_directory = Path(base_path)
#     config.promoter_file = Path(base_path) / prom_file
#     config.results_directory = Path(base_path) / "results"
#     config.bam_directory = Path(base_path) / "bam_output"
#     config.analysis_directory = Path(base_path) / "test"
#     config.promoter_start = prom_start
#     config.promoter_end = prom_end
#     config.chromosome = chrom
#     config.offset = offset


##################################################################################
##################################################################################

import config
def main() -> None:
    """Run a given set of Pyllelic analysis, using values from
    supplied environmental variables.
    """

    if sys.argv[1]:
        filename: str = sys.argv[1]
    else:  # pragma: no cover
        filename = "output.xlsx"

    files_set: List[str] = make_list_of_bam_files()
    positions: List[str] = index_and_fetch(files_set)
    genome_parsing()
    cell_types: List[str] = extract_cell_types(files_set)
    df_list: Dict[str, pd.DataFrame] = run_quma_and_compile_list_of_df(
        cell_types, filename
    )
    means_df: pd.DataFrame = process_means(df_list, positions, cell_types)
    modes_df: pd.DataFrame = process_modes(df_list, positions, cell_types)
    diffs_df: pd.DataFrame = find_diffs(means_df, modes_df)
    write_means_modes_diffs(means_df, modes_df, diffs_df, filename)


##################################################################################
##################################################################################


def genome_range(
    position: str, genome_string: str, offset: Optional[int] = None
) -> str:
    """Helper to return a genome string (e.g., "ATCGACTAG")
    given a position and an entire string.

    Args:
        position: genomic position on chromesome
        genome_string: string representation of genomic promoter known sequence
        offset: genomic position of promoter to offset reads

    Returns:
        str: genomic bases for indicated read / position
    """

    # OFFSET: int = 1298163  # TERT offset

    if not offset:
        offset = config.offset

    start: int = offset - (int(position) + 30)
    end: int = offset - (int(position) + 1)

    return genome_string[start:end]


def make_list_of_bam_files() -> List[str]:
    """Check analysis directory for all valid .bam files.

    Returns:
        list[str]: list of files
    """
    return [f.name for f in config.analysis_directory.iterdir() if f.suffix == ".bam"]


def index_and_fetch(files_set: List[str], process: bool = True) -> List[str]:
    """Wrapper to call processing of each sam file.

    Args:
        files_set (list[str]): list of bam/sam files
        process (bool): process and write files flag. Default True.

    Returns:
        list[str]: list of genomic positions analyzed
    """

    sam_path: List[Path] = [config.base_directory / "test" / f for f in files_set]

    all_pos: Set = set()
    for sams in tqdm(sam_path):
        pos: pd.Index = run_sam_and_extract_df(sams, process)
        all_pos.update(pos)

    return sorted(list(all_pos))


def run_sam_and_extract_df(sams: Path, process: bool = True) -> pd.Index:
    """Process samfiles, pulling out sequence and position data
    and writing to folders/files.

    Args:
        sams (Path): path to a samfile
        process (bool): process and write files flag. Default True.

    Returns:
        pd.Index: list of unique positions in the samfile
    """

    # Index samfile if index file doesn't exist
    if not sams.with_suffix(".bai").exists():
        _: bool = pysam_index(sams)  # we don't care what the output is

    # Grab the promoter region of interest
    samm: pysam.AlignmentFile = pysam.AlignmentFile(sams, "rb")
    itern = samm.fetch(
        config.chromosome, int(config.promoter_start), int(config.promoter_end)
    )

    position: List = []
    sequence: List = []

    for x in itern:
        cols = str(x).split()
        position.append(cols[3])
        sequence.append(cols[9])

    df: pd.DataFrame = pd.DataFrame(
        list(zip(position, sequence)), columns=["positions", "sequence"]
    )

    df2: pd.DataFrame = df.set_index("positions")
    # will set the inital index (on the leftmost column) to be position
    df3: pd.DataFrame = df2.stack()
    # if confused, see: https://www.w3resource.com/pandas/dataframe/dataframe-stack.php

    if process:
        write_bam_output_files(sams, df2.index.unique(), df3)

    return df2.index.unique()


def write_bam_output_files(sams: Path, positions: List[str], df: pd.DataFrame) -> None:
    """Extract alignments from sequencing reads and output text files
       in bam directory.

    Args:
        sams (Path): path to a sam file
        positions (List[str]): list of unique positions
        df (pd.DataFrame): dataframe of sequencing reads
    """

    sam_name: str = sams.name

    # Make sure bam_output directory and sam subdirectories exist
    config.base_directory.joinpath("bam_output", sam_name).mkdir(
        parents=True, exist_ok=True
    )
    for each1 in positions:
        alignments: List = []

        # # Set up query using alignment algorithm
        # query_sequence: List[str] = df.loc[each1].head(1).tolist()[0]
        # query: StripedSmithWaterman = StripedSmithWaterman(query_sequence)

        # # Set up sequences to check for alignment
        # target_sequences: List[str] = df.loc[each1].tolist()
        # for target_sequence in target_sequences:
        #     alignment = query(target_sequence)
        #     alignments.append(alignment)

        # read_file: List[str] = []
        # for index, each in enumerate(alignments):
        #     read_file.append(str(">read" + str(index)))
        #     read_file.append(each.aligned_target_sequence)
        #     # returns aligned target sequence

        # Set up query using alignment algorithm
        query_sequence: List[str] = df.loc[each1].head(1).tolist()[0]

        # Set up sequences to check for alignment
        target_sequences: List[str] = df.loc[each1].tolist()
        for target_sequence in target_sequences:
            alignment = pairwise2.align.localms(
                query_sequence, target_sequence, 2, -3, -5, -2
            )
            aligned_segment = alignment[0].seqA[alignment[0].start : alignment[0].end]
            alignments.append(aligned_segment)

        read_file: List[str] = []
        for index, each in enumerate(alignments):
            read_file.append(str(">read" + str(index)))
            read_file.append(each)
            # returns aligned target sequence

        write_individual_bam_file(sam_name, each1, read_file)
        print(read_file)

ModuleNotFoundError: No module named 'config'