# Taxonomy tables 

Make a table that has rows as taxa, columns as samples, and values as abundance. \
Because the taxonomic classification is hierarcical we need to sum abundance from the rank and all lower rank. \
Let's say we had this: 


| Order     | Family      | Genus     | Species     | Abundance| 
| ----------| ------------|-----------| ------------|----------|   
| Bryales   | Bryaceae    | Bryum     | capillare   | 10       |
| Bryales   | Bryaceae    | Bryum     |             | 23       |
| Bryales   | Bryaceae    |           |             | 45       |
| Bryales   |             |           |             | 123      |  

So the abundances we want to record for each rank are: \
Order   Bryales   123+45+23+10 \
Family  Bryaceae  45+23+10 \
Genus   Bryum     23+10 \
Species Bryum     10

Do this for all named taxa at all ranks

Also normalise data within each Superkingdom
Normalisation - maximum absolute scaling. The maximum absolute scaling rescales each feature between -1 and 1 by dividing every observation by its maximum absolute value. We can apply the maximum absolute scaling in Pandas using the .max() and .abs() methods, as shown below.
[link](https://www.geeksforgeeks.org/data-normalization-with-pandas)

### Load the tables

In [1]:
import os
import io
import math
import duckdb
import numpy
from duckdb import CatalogException, BinderException
import pandas as pd
from pandasql import sqldf
from minio import Minio, S3Error
from pathlib import PurePath, Path

creds = json.load(open("credentials.json"))

client = Minio("10.4.1.4:9000",
               secure=False,
               access_key=creds["accessKey"],
               secret_key=creds["secretKey"],
               )


def get_object(bucket_name, file_format, file_name, verbose=True):
    if verbose:
        print(f"{bucket_name=} - {file_format=} - {file_name=}")
    try:
        response = client.get_object(bucket_name, file_name)
        buffer = io.BytesIO(response.read())
    except S3Error:
        raise
    finally:
        if file_format == "parquet":
            df = pd.read_parquet(buffer, engine='pyarrow')
        elif file_format == "csv":
            df = pd.read_csv(buffer)
        else:
            raise ValueError(f"Unknown {file_format=}")
        response.close()
        response.release_conn()
        if verbose:
            print(f"Downloaded {file_name} into dataframe")
    return df

In [13]:
TABLE_VERSION = "v2"

# MGF parquet tables to dataframes
bucket_name = "emo-bon-tables"
objects = client.list_objects(bucket_name, recursive=True)
mgf_parquet_dfs = {}

for obj in objects:
    # print(f"{obj.object_name=}")
    # Get v2 objects
    if obj.object_name.split("/")[0] == TABLE_VERSION:
        name = obj.object_name.split(".")[-2]
        # print(f"Getting... {name}")
        df = get_object(bucket_name, "parquet", obj.object_name, verbose=False)
        mgf_parquet_dfs[name] = df

# Sample metadata
# Get the latest Batch combined logsheets file
# Remember we are downloading from MinIO
# This breaks when new versions are available
batch_file = "Batch1and2_combined_logsheets_2024-09-11.csv"
sample_metadata = ("emo-bon-metadata-tables", "csv", batch_file)
sample_metadata = get_object(*sample_metadata, verbose=False)

# Observatory metadata - from the GoogleSheets
observatory_metadata = ("emo-bon-metadata-tables", "csv",
                        "Observatory_combined_logsheets_validated.csv")
observatory_metadata = get_object(*observatory_metadata, verbose=False)

# Into duckdb
try:
    duckdb.sql("DROP TABLE SAMPLE_METADATA")
    duckdb.sql("DROP TABLE OBS_METADATA")
    for table_name in mgf_parquet_dfs:
        cmd = f"DROP TABLE {table_name}"
        duckdb.sql(cmd)
except CatalogException:
    pass
duckdb.sql("CREATE TABLE SAMPLE_METADATA AS SELECT * FROM sample_metadata")
duckdb.sql("SELECT COUNT(*) FROM SAMPLE_METADATA")
duckdb.sql("CREATE TABLE OBS_METADATA AS SELECT * FROM observatory_metadata")
duckdb.sql("SELECT COUNT(*) FROM OBS_METADATA")
for table_name in mgf_parquet_dfs:
    df = mgf_parquet_dfs[table_name]
    cmd = f"CREATE TABLE {table_name} AS SELECT * FROM df"
    duckdb.sql(cmd)

duckdb.sql("SHOW TABLES")
# duckdb.sql("COPY LSU TO 'LSU_output.csv' (HEADER, DELIMITER ',');")

┌─────────────────┐
│      name       │
│     varchar     │
├─────────────────┤
│ LSU             │
│ OBS_METADATA    │
│ SAMPLE_METADATA │
│ SSU             │
│ go              │
│ go_slim         │
│ ips             │
│ ko              │
│ pfam            │
└─────────────────┘

### Create tables by superkingdom

In [None]:
import duckdb
import pandas as pd

sk_dfs = {}
for sk in ["Eukaryota", "Bacteria", "Archaea"]:
    query = f"""
            SELECT
            *
            FROM LSU
            WHERE LSU.superkingdom = '{sk}'
            """
    # duckdb.sql(query).show(max_width=12, max_rows=4)
    df = duckdb.sql(query).to_df()
    sk_dfs[sk] = df

for name, df in sk_dfs.items():
    df.drop('superkingdom', axis=1, inplace=True)
    duckdb.sql(f"CREATE TABLE LSU_{name} AS SELECT * FROM df")

In [16]:
duckdb.sql("SHOW TABLES")

┌─────────────────┐
│      name       │
│     varchar     │
├─────────────────┤
│ LSU             │
│ LSU_Archaea     │
│ LSU_Bacteria    │
│ LSU_Eukaryota   │
│ OBS_METADATA    │
│ SAMPLE_METADATA │
│ SSU             │
│ go              │
│ go_slim         │
│ ips             │
│ ko              │
│ pfam            │
├─────────────────┤
│     12 rows     │
└─────────────────┘

In [20]:
query = """
SELECT class, sum(abundance) FROM LSU_Bacteria
WHERE class = 'Thermoanaerobaculia' 
GROUP BY class
"""
duckdb.sql(query).show(max_width=250, max_rows=117)

┌─────────────────────┬────────────────┐
│        class        │ sum(abundance) │
│       varchar       │     double     │
├─────────────────────┼────────────────┤
│ Thermoanaerobaculia │         8445.0 │
└─────────────────────┴────────────────┘



##### Testing SELECT query

In [68]:
rank = "family"

query = f"""
SELECT
ref_code,
{rank},
sum(abundance) AS sum_abundance
FROM LSU_Archaea
GROUP BY {rank}, ref_code
ORDER BY {rank}, sum(abundance) DESC
"""
duckdb.sql(query).show(max_width=250, max_rows=117)
# r = duckdb.sql(query).to_df()

┌─────────────┬────────────────────────────────┬───────────────┐
│  ref_code   │             family             │ sum_abundance │
│   varchar   │            varchar             │    double     │
├─────────────┼────────────────────────────────┼───────────────┤
│ EMOBON00121 │ Archaeoglobaceae               │           2.0 │
│ EMOBON00138 │ Archaeoglobaceae               │           1.0 │
│ EMOBON00136 │ Candidatus_Methanoperedenaceae │           2.0 │
│ EMOBON00237 │ Candidatus_Methanoperedenaceae │           1.0 │
│ EMOBON00226 │ Candidatus_Methanoperedenaceae │           1.0 │
│ EMOBON00238 │ Candidatus_Methanoperedenaceae │           1.0 │
│ EMOBON00138 │ Candidatus_Methanoperedenaceae │           1.0 │
│ EMOBON00142 │ Cenarchaeaceae                 │          11.0 │
│ EMOBON00238 │ Cenarchaeaceae                 │           8.0 │
│ EMOBON00094 │ Cenarchaeaceae                 │           7.0 │
│ EMOBON00195 │ Cenarchaeaceae                 │           5.0 │
│ EMOBON00243 │ Cenarchae

### Sum abundances per taxon in LSU per superkingdom, pivot, normalise, and take sqrt()

In [69]:
# Need to load imports and LSU table above

def create_sqrt_normalised_table_of_rank_abundances(table_name,  write_tables=True, to_duckdb=False):
    """ Create tables of sqrt(normalised(summed abundance)) for each taxon of each rank across all samples"""

    # The rank name "order" interfers with ORDER being reserved word in SQL, so we need to change it
    RANKS = ["kingdom", "phylum", "class",
             "orderx", "family", "genus", "species"]

    # Check if column 'order' exists prior to changing, ie on first read
    try:
        duckdb.sql(f"ALTER TABLE {table_name} RENAME COLUMN 'order' TO orderx")
    except BinderException as e:
        # Exception occurs when alread
        assert str(
            e) == f'Binder Error: Table "{table_name}" does not have a column with name "order"'
        pass

    dfs = {}
    for rank in RANKS:
        # if rank == "phylum" and table_name in ["LSU_Archaea", "LSU_Bacteria"]:
        #    print(f"Skipping {rank} for {table_name}")
        #    continue
        # print(f"{rank=}")
        QUERY = f"""
        PIVOT (
        SELECT
        ref_code,
        {rank},
        sum(abundance) AS sum_abundance
        FROM {table_name}
        GROUP BY {rank}, ref_code
        ORDER BY {rank}, sum(abundance) DESC   
        )
        ON ref_code
        USING sum(sum_abundance)
        ORDER BY {rank}
        """
        duckdb.sql(QUERY)
        df = duckdb.sql(QUERY).to_df()
        df.set_index(f"{rank}")
        dfs[f"{table_name}_{rank}"] = df

    # Normalise and sqrt using loop over dataframe (can this be vectorised?)
    def normalise_and_sqrt(dfs):
        new_dfs = {}
        for table_n, df in dfs.items():
            new_df = df.copy()
            print(f"Normalising and converting data frame to sqrt: {table_n}")
            # in-place normalisation
            for column in new_df.iloc[:, 1:].columns:
                new_df[column] = numpy.sqrt(
                    new_df[column] / new_df[column].abs().max())
            new_dfs[table_n] = new_df
        return new_dfs
    ns_dfs = normalise_and_sqrt(dfs)

    # Write out normalised and sqrt tables
    if write_tables:
        for table_n, df in ns_dfs.items():
            save_dir = Path("transformed_tables")
            outfile_name = f"{table_n}.csv"
            df.to_csv(PurePath(save_dir, outfile_name))
            print(f"Written {PurePath(save_dir, outfile_name)}")

    if to_duckdb:
        for table_n, df in ns_dfs.items():
            duckdb.sql(f"CREATE TABLE {table_n} AS SELECT * FROM df")
            print(f"Created {table_n} in duckdb")


# For each superkingdom table, create a table of sqrt(normalised(summed abundance))
for table_name in ["LSU_Archaea", "LSU_Eukaryota", "LSU_Bacteria"]:
    create_sqrt_normalised_table_of_rank_abundances(
        table_name, write_tables=True, to_duckdb=False)

Normalising and converting data frame to sqrt: LSU_Archaea_kingdom
Normalising and converting data frame to sqrt: LSU_Archaea_phylum
Normalising and converting data frame to sqrt: LSU_Archaea_class
Normalising and converting data frame to sqrt: LSU_Archaea_orderx
Normalising and converting data frame to sqrt: LSU_Archaea_family
Normalising and converting data frame to sqrt: LSU_Archaea_genus
Normalising and converting data frame to sqrt: LSU_Archaea_species
Written transformed_tables/LSU_Archaea_kingdom.csv
Written transformed_tables/LSU_Archaea_phylum.csv
Written transformed_tables/LSU_Archaea_class.csv
Written transformed_tables/LSU_Archaea_orderx.csv
Written transformed_tables/LSU_Archaea_family.csv
Written transformed_tables/LSU_Archaea_genus.csv
Written transformed_tables/LSU_Archaea_species.csv
Normalising and converting data frame to sqrt: LSU_Eukaryota_kingdom
Normalising and converting data frame to sqrt: LSU_Eukaryota_phylum
Normalising and converting data frame to sqrt: LSU_