In [None]:
import Pkg

# Third-party packages used by this notebook.
pkgs = ["DataFrames", "uCSV", "ProgressMeter", "StatsPlots", "StatsBase", "FASTX"]

# Install them, then `import` each one by name so that every call site in the
# notebook stays module-qualified (e.g. `DataFrames.DataFrame`).
Pkg.add(pkgs)
for package_name in pkgs
    # Same expression that parsing "import <name>" would produce, built directly.
    eval(Expr(:import, Expr(:., Symbol(package_name))))
end

In [None]:
# function parse_blast_report(blast_report)
#     # example header line 
#     # "# Fields: query id, subject id, subject acc., subject acc.ver, subject title, query length, subject length, q. start, q. end, s. start, s. end, evalue, bit score, score, alignment length, % identity, identical, mismatches, subject tax id"
#     header_lines = collect(Iterators.filter(x -> occursin(r"# Fields:", x), eachline(blast_report)))
#     if isempty(header_lines)
#         @info "no hits found, returning empty table"
#         return DataFrames.DataFrame()
#     end
#     header_line = first(header_lines)
#     header = split(last(split(header_line, ": ")), ", ")
#     blast_col_types = Dict(
#         "query id" => String,
#         "query title" => String,
#         "subject id" => String,
#         "subject gi" => String,
#         "subject acc." => String,
#         "subject acc.ver" => String,
#         "subject title" => String,
#         "query length" => Int,
#         "subject length" => Int,
#         "q. start" => Int,
#         "q. end" => Int,
#         "s. start" => Int,
#         "s. end" => Int,
#         "evalue" => Float64,
#         "bit score" => Float64,
#         "score" => Float64,
#         "alignment length" => Int,
#         "% identity" => Float64,
#         "identical" => Int,
#         "mismatches" => Int,
#         "subject tax id" => Int,
#         "subject sci name" => String,
#         "subject com names" => String,
#         "subject blast name" => String,
#         "subject super kingdom" => String,
#         "subject tax ids" => String,
#         "subject sci names" => String,
#         "subject com names" => String,
#         "subject blast names" => String,
#         "subject super kingdoms" => String,
#         "subject title" => String,
#         "subject titles" => String
#     )
#     data, _ = uCSV.read(
#         blast_report,
#         delim='\t',
#         comment='#',
#         # skipmalformed=true,
#         allowmissing=true,
#         encodings=Dict("N/A" => missing),
#         types=[blast_col_types[h] for h in header])
#     return DataFrames.DataFrame(data, header, makeunique=true)
# end

In [None]:
# Set of integer taxonomy IDs printed by `taxonkit list` for taxon 10239 (one ID
# per non-empty output line); used to decide whether a BLAST hit is viral.
viral_tax_ids = Set{Int}(
    parse(Int, tax_id) for
    tax_id in readlines(`conda run -n taxonkit taxonkit list --ids 10239 --indent ""`) if
    !isempty(tax_id)
)

In [None]:
# Root directory of the exposome data; the batch directories scanned for BLAST
# reports are resolved relative to it.
basedir = "/labs/mpsnyder/share/exposome_data"

In [None]:
# Collect the full paths of the BLAST reports found directly inside each batch
# directory, split by search method according to the file-name suffix.
megablast_results = String[]
blastn_results = String[]
for batch_directory in map(
    name -> joinpath(basedir, name),
    ("extracted1", "Expo_pliot1_extracted_hifi_fastqs"),
)
    append!(
        megablast_results,
        [
            path for path in readdir(batch_directory, join=true) if
            occursin(r"\.nt\.megablast\.txt$", path)
        ],
    )
    append!(
        blastn_results,
        [
            path for path in readdir(batch_directory, join=true) if
            occursin(r"\.nt\.blastn\.txt$", path)
        ],
    )
end

In [None]:
# Accumulator table: the megablast and blastn loops append their per-report
# viral hits to it.
joint_blast_results_table = DataFrames.DataFrame()

In [None]:
    # Column type for each BLAST tabular-report field name, looked up by the
    # names parsed from a report's "# Fields:" header line.
    #
    # Each key appears exactly once. The singular "subject com name" sits with
    # the other singular per-hit fields; without it a report that has that
    # column raises a KeyError when its header is mapped to types.
    blast_col_types = Dict{String,DataType}(
        "query id" => String,
        "query title" => String,
        "subject id" => String,
        "subject gi" => String,
        "subject acc." => String,
        "subject acc.ver" => String,
        "subject title" => String,
        "query length" => Int,
        "subject length" => Int,
        "q. start" => Int,
        "q. end" => Int,
        "s. start" => Int,
        "s. end" => Int,
        "evalue" => Float64,
        "bit score" => Float64,
        "score" => Float64,
        "alignment length" => Int,
        "% identity" => Float64,
        "identical" => Int,
        "mismatches" => Int,
        "subject tax id" => Int,
        "subject sci name" => String,
        "subject com name" => String,
        "subject blast name" => String,
        "subject super kingdom" => String,
        "subject tax ids" => String,
        "subject sci names" => String,
        "subject com names" => String,
        "subject blast names" => String,
        "subject super kingdoms" => String,
        "subject titles" => String
    )

In [None]:
# For every megablast report: keep only the hits whose subject tax IDs include a
# viral taxon and append them to `joint_blast_results_table`, tagged with the
# search method and the sample name (report basename minus its suffix).
# Reports without a "# Fields:" header or without any viral hit are skipped
# instead of aborting the whole run.
ProgressMeter.@showprogress for blast_report in megablast_results
    header = nothing
    viral_lines = String[]
    # One complete pass per report: running `eachline` to exhaustion lets it
    # close the file it opened.
    for line in eachline(blast_report)
        if startswith(line, "#")
            # Only lines that *begin* with '#' are comments; a '#' inside a
            # data row (e.g. in a subject title) must not drop the row.
            if header === nothing && occursin(r"# Fields:", line)
                header = split(last(split(line, ": ")), ", ")
            end
            continue
        end
        isempty(line) && continue
        # NOTE(review): assumes the last tab-separated field holds the
        # ';'-separated subject tax IDs — confirm against the report format.
        tax_id_field = last(split(line, '\t'))
        is_viral = any(split(tax_id_field, ';')) do tax_id
            # `tryparse` tolerates non-numeric entries instead of throwing.
            parsed = tryparse(Int, tax_id)
            parsed !== nothing && parsed in viral_tax_ids
        end
        is_viral && push!(viral_lines, line)
    end
    if header === nothing || isempty(viral_lines)
        @info "no viral hits found, skipping $(blast_report)"
    else
        data, _ = uCSV.read(IOBuffer(join(viral_lines, '\n')), delim='\t')
        results_table = DataFrames.DataFrame(data, header)
        results_table[!, "method"] .= "megablast"
        results_table[!, "sample"] .= replace(basename(blast_report), ".bam.fasta.blastn.nt.megablast.txt" => "")
        append!(joint_blast_results_table, results_table, promote=true)
    end
end

In [None]:
# For every blastn report: keep only the hits whose subject tax IDs include a
# viral taxon, parse them with the column types from `blast_col_types`, and
# append them to `joint_blast_results_table`, tagged with the search method and
# the sample name (report basename minus its suffix).
# Processing is best-effort: a report that fails is skipped, and the reason is
# logged rather than silently discarded.
ProgressMeter.@showprogress for blast_report in blastn_results
    try
        header = nothing
        viral_lines = String[]
        # One complete pass per report: running `eachline` to exhaustion lets
        # it close the file it opened.
        for line in eachline(blast_report)
            if startswith(line, "#")
                # Only lines that *begin* with '#' are comments; a '#' inside a
                # data row (e.g. in a subject title) must not drop the row.
                if header === nothing && occursin(r"# Fields:", line)
                    header = split(last(split(line, ": ")), ", ")
                end
                continue
            end
            isempty(line) && continue
            # NOTE(review): assumes the last tab-separated field holds the
            # ';'-separated subject tax IDs — confirm against the report format.
            tax_id_field = last(split(line, '\t'))
            is_viral = any(split(tax_id_field, ';')) do tax_id
                # `tryparse` tolerates non-numeric entries, so one odd row no
                # longer discards every hit in the report.
                parsed = tryparse(Int, tax_id)
                parsed !== nothing && parsed in viral_tax_ids
            end
            is_viral && push!(viral_lines, line)
        end
        if header === nothing || isempty(viral_lines)
            @info "no viral hits found, skipping $(blast_report)"
        else
            data, _ = uCSV.read(IOBuffer(join(viral_lines, '\n')), delim='\t', types=[blast_col_types[h] for h in header])
            results_table = DataFrames.DataFrame(data, header)
            results_table[!, "method"] .= "blastn"
            results_table[!, "sample"] .= replace(basename(blast_report), ".bam.fasta.blastn.nt.blastn.txt" => "")
            append!(joint_blast_results_table, results_table, promote=true)
        end
    catch err
        # A user interrupt must stop the loop, not be treated as a bad report.
        err isa InterruptException && rethrow()
        @warn "skipping $(blast_report)" exception = (err, catch_backtrace())
    end
end

In [None]:
# Summary statistics of the "alignment length" column over all collected hits.
StatsBase.describe(joint_blast_results_table[!, "alignment length"])

In [None]:
# StatsPlots.histogram(joint_blast_results_table[!, "alignment length"])

In [None]:
# Reorder the table in place, longest alignments first.
sort!(joint_blast_results_table, "alignment length", rev=true)

In [None]:
# Copy of the hits table restricted to rows whose alignment length exceeds 1000.
joint_blast_results_table_filtered = let alignment_lengths = joint_blast_results_table[!, "alignment length"]
    joint_blast_results_table[alignment_lengths .> 1000, :]
end

In [None]:
# Distinct query IDs among the long-alignment hits; `Set` deduplicates on its
# own and gives constant-time membership checks when scanning the FASTA files.
sequence_identifiers_of_interest = Set(joint_blast_results_table_filtered[!, "query id"])

In [None]:
# Pull the FASTA records whose identifier is in
# `sequence_identifiers_of_interest` out of the FASTA file behind each
# megablast report (the report path minus its ".blastn.nt.megablast.txt"
# suffix).
fasta_sequences = FASTX.FASTA.Record[]
ProgressMeter.@showprogress for fasta_file in replace.(megablast_results, ".blastn.nt.megablast.txt" => "")
    reader = FASTX.FASTA.Reader(open(fasta_file))
    try
        for record in reader
            if FASTX.identifier(record) in sequence_identifiers_of_interest
                push!(fasta_sequences, record)
            end
        end
    finally
        # Close the reader (and the file it wraps) even if reading fails, so
        # that scanning many files does not leak file handles.
        close(reader)
    end
end
fasta_sequences

In [None]:
# Notes per index `i` into `fasta_sequences` (presumably what each record was
# identified as — confirm with the author):
# i = 1 labrador retriever
# i = 2 unknown
# i = 3 bacteriophage
# i = 4 bacteriophage
# i = 5 dog
# i = 6 dog
# i = 7 dog
# i = 8 dog
# i = 9 dog
# i = 10 dog
# i = 11 e. coli or e. coli phage
# i = 12 e. coli or e. coli phage
# i = 13 e. coli or e. coli phage
# i = 14 e. coli or e. coli phage
# i = 15 e. coli or e. coli phage
# i = 16 e. coli or e. coli phage

# Index of the record to print. Assigned here so the cell runs on its own;
# edit the value to inspect another record.
i = 1
record = fasta_sequences[i]
println(record)

In [None]:
# for x in sort(unique(joint_blast_results_table_filtered[!, "subject title"]))
#     println(x)
# end