In [None]:
import os 
print(os.getcwd())

In [None]:
import sys
from utils.pipeline import Context, Step
from utils.pipeline_lib import *
from tqdm import tqdm
from sqlalchemy import create_engine
from math import floor

# generated file paths
saved_context_ct_path = "data/ss/ss_v10_Ctx"
export_seq_id_path = "data/ss/"

# RSCU, dinucleotides and other attributes

In [None]:
ct = Context()
ct_path = saved_context_ct_path

# DATA
class Data(Step):
    def run(self):
        return {
            'output': pd.read_csv(...),         # replace with your data
            'metadata_feature_names': [...],    # the list of metadata column names from your file
            'data_feature_names': ["CDS"]       
        }
S_global_data = Data(name_alias="H1N1")
S_global_data_1 = Input.AssignHostType(depends_on=S_global_data)
S_global_data_2 = Input.RSCU(depends_on=S_global_data_1, ctx=ct)
S_transformation_1 = DataTransform.LogBySynCount(depends_on=S_global_data)
S_transformation_2 = DataTransform.PlainAndLogDinucleotide(depends_on=S_transformation_1, ctx=ct)

S_partition_data = WindowSelector(date_range=DateRange.from_iso_weeks('2008-01', '2010-10'), start="2008-01", end="2010-10", depends_on=S_transformation_2, name_alias="NarrowGlobalData", ctx=ct)
S_ordered_data = AbsoluteSortByCollectionDateAndIndex(depends_on=S_partition_data, ctx=ct)
S_global_geolocated_sorted_bmc_annotated = InputH1N1.AttachBMC_GenomicsCluster(depends_on=S_ordered_data)
result_input_data = S_global_geolocated_sorted_bmc_annotated.materialize()
# Add Week name
result_input_data.output['Week'] = result_input_data.output['Collection_Date'].dt.strftime("%G-%V")

# Assert the DF is still sorted by Sort_Key
pd.testing.assert_series_equal(result_input_data.output.Sort_Key, result_input_data.output.Sort_Key.sort_values())

# del ct['GeoapifyBatchJobRequest']
ct.store(ct_path)
print(result_input_data.output.shape)
print(S_ordered_data.materialize().output.shape)

# Data Filtering

In [89]:
result_input_data.output[['bmc_cluster_label']].value_counts(dropna=False).sort_index(level=[0])

bmc_cluster_label
0                    2642
1                     663
3                      68
4                       3
Name: count, dtype: int64

In [90]:
swine_df = result_input_data.output[result_input_data.output.Host_Type == "swine"]
human_df = result_input_data.output[result_input_data.output.Host_Type == "human"]

# final partitions
human_bmc_cluster_1 = human_df[(human_df.bmc_cluster_label == 1)]
human_bmc_cluster_0 = human_df[(human_df.bmc_cluster_label == 0)]

In [92]:
# select time period, sorting and renaming
normal_df = human_bmc_cluster_1[
        (human_bmc_cluster_1.Collection_Date >= pd.to_datetime("2009-01-25"))          # first sequences in late april-may
        & (human_bmc_cluster_1.Collection_Date <= pd.to_datetime("2009-03-14"))
    ].sort_values(by="Collection_Date", ascending=True)
outlier_df = human_bmc_cluster_0[
        (human_bmc_cluster_0.Collection_Date >= pd.to_datetime("2008-07-28")) 
        & (human_bmc_cluster_0.Collection_Date <= pd.to_datetime("2010-01-10"))        # last sequences in studied period
        & ~(pd.isna(human_bmc_cluster_0.Lineage))
    ].sort_values(by="Collection_Date", ascending=True).iloc[-100:,:]

print("Normal sequences", normal_df.shape[0])
print("Outlier sequences", outlier_df.shape[0])
print("Total", normal_df.shape[0] + outlier_df.shape[0])

Normal sequences 168
Outlier sequences 100
Total 268


In [97]:
normal_df['true_outlier'] = False
outlier_df['true_outlier'] = True
final_data = pd.concat([normal_df, outlier_df])

Collection Date range for 
- 100 normal sequences
- 100 outlier sequences
- additional 30 normal sequences

In [98]:
def print_range(dataset_df):
    d = pd.to_datetime(dataset_df.Collection_Date.describe()[['min', 'max']])
    print(" ".join(
        d.dt.strftime("%Y/%m/%d").tolist() + [str((d['max'] - d['min']).days), "days"]
    ))
print("Collection_Date range of 100 G1 normal sequences")
print_range(normal_df.iloc[:100])
print("Collection_Date range of 100 G2 outlier sequences")
print_range(outlier_df.iloc[:100])
print("Collection_Date range of 30 G1 noise/normal sequences")
print_range(normal_df.iloc[100:130])

Collection_Date range of 100 G1 normal sequences
2009/01/26 2009/02/20 25 days
Collection_Date range of 100 G2 outlier sequences
2009/12/14 2010/01/10 27 days
Collection_Date range of 30 G1 noise/normal sequences
2009/02/20 2009/03/01 9 days


Export of sequence IDs

In [None]:
def generate_dataset_id(df):
    return df[['Isolate_Id', 'Isolate_Name']]

generate_dataset_id(normal_df.iloc[:100]).to_csv(f"{export_seq_id_path}/g1_dataset.csv", index=False)
generate_dataset_id(outlier_df.iloc[:100]).to_csv(f"{export_seq_id_path}/g2_dataset.csv", index=False)
generate_dataset_id(normal_df.iloc[100:130]).to_csv(f"{export_seq_id_path}/noise_dataset.csv", index=False)

# Helper data stucture

In [None]:
try:
    del ct['Data']
except KeyError:
    pass
S_data = Step.stepify({
    'output': final_data, 
    'data_feature_names': result_input_data.data_feature_names, 
    'metadata_feature_names': result_input_data.metadata_feature_names}
    , name_alias="Data", ctx=ct)
_ = S_data.materialize()

In [None]:
input_exported_columns = ['Lineage', 'Clade', 'Location', 'Host', 'Collection_Date', 'Submission_Date', 'Host_Type', 'Flu_Season', 'Sort_Key', 'bmc_cluster_label', 'Week', 'true_outlier']

# Measure Warnings based on Window, K, Noise

In [None]:
print("Save base context in", ct_path)
ct.store(ct_path)

In [None]:
class StrayMixedNormalOutlierWindows(Step):
    """
    Train and test in this context are interpreted as follows:
    Fixed size partitions may need data points that are not included in the given input_data (i.e., data points preceeding the first one). Such partitions are ignored (the output range_names etc. do not contain such partitions).
    """
    def __init__(self, train_size=99, test_size=1, window_shift=1, n_wrong_outliers=0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.train_size = train_size
        self.test_size = test_size
        self.window_size = train_size + test_size
        self.window_shift = window_shift
        self.n_wrong_outlier = n_wrong_outliers

    def run(self, input_data: ResultType) -> ResultType:
        data = input_data.output
        data_len = data.shape[0]
        assert not data.empty, "Input DataFrame is empty. Cannot extract any partition."
        assert data['true_outlier'].sum() > 0, "Input DataFrame do not contain any outlier. Cannot extract any partition."
        assert (~data['true_outlier']).sum() > 0, "Input DataFrame do not contain any non-outlier data. Cannot extract any partition."
    
        normal_data = data[~data.true_outlier].iloc[:self.window_size,:]
        outlier_data = data[data.true_outlier].iloc[-self.window_size:,:,]#.iloc[::-1]
        
        rand_samples_idx = np.array([])
        if self.n_wrong_outlier > 0:
            outlier_data = outlier_data.reset_index()
            rand_samples_idx  = np.random.choice(int(floor(outlier_data.shape[0])), size=self.n_wrong_outlier, replace=False)
            outlier_data = outlier_data.drop(index=rand_samples_idx)
            wrong_outliers = data[~data.true_outlier].reset_index().iloc[100:100+self.n_wrong_outlier,:].set_index(pd.Index(rand_samples_idx))
            outlier_data = pd.concat([outlier_data, wrong_outliers]).sort_index().set_index("index")
            try:
                assert (
                    outlier_data['true_outlier'].sum() == (self.window_size - self.n_wrong_outlier) 
                    and (~outlier_data['true_outlier']).sum() == self.n_wrong_outlier 
                    and outlier_data.shape[0] == self.window_size), "Contaimation gone wrong"
            except AssertionError:
                raise
        idx_stray_window_with_noise = 1 + rand_samples_idx
        # find position of stray window where quantity >= non-outlier quantity
        idx_g1_equal_g2_size = 0
        for w_idx in range(self.window_size):
            g1_wo_noise_size = self.window_size - w_idx
            g1_noise = (idx_stray_window_with_noise <= w_idx).sum()
            g1_w_noise_size = g1_wo_noise_size + g1_noise
            g2_size = self.window_size - g1_w_noise_size
            if g2_size >= g1_w_noise_size:
                idx_g1_equal_g2_size = w_idx
                break
        idx_stray_window_with_noise = np.sort(idx_stray_window_with_noise)
        if len(idx_stray_window_with_noise) > 0:
            last_idx_stray_window_with_noise = idx_stray_window_with_noise[-1].item()
            idx_stray_windows_with_noise_within_half_window = idx_stray_window_with_noise[idx_stray_window_with_noise < idx_g1_equal_g2_size]
            idx_stray_windows_with_noise_within_half_window = idx_stray_windows_with_noise_within_half_window[-1].item() if len(idx_stray_windows_with_noise_within_half_window) > 0 else None
        else:
            last_idx_stray_window_with_noise = None
            idx_stray_windows_with_noise_within_half_window = None

        output_data = pd.concat([normal_data, outlier_data])

        window_start_ranges = np.arange(0, self.window_size+1 , 1)
        window_ranges = np.vstack([window_start_ranges, window_start_ranges+self.window_size]).T
        assert window_ranges.shape[0] == self.window_size+1, "Should otuput one window with all normal data, then one window with one outlier, them with two, ...., finally a window with all outliers"

        iterator = iter(window_ranges)

        def next_range():
            return next(iterator)
        
        return {
            'data': output_data,
            'data_feature_names': input_data.data_feature_names, 
            'metadata_feature_names': input_data.metadata_feature_names,
            'window_ranges': window_ranges,
            '.next_range': next_range, 
            'noise_quantity': len(idx_stray_window_with_noise),
            'noise_positions': idx_stray_window_with_noise.tolist(),
            'last_idx_stray_window_with_noise': last_idx_stray_window_with_noise,
            'idx_g1_equal_g2_size': idx_g1_equal_g2_size,
            # 'injected_seq': ["O" if x else "N" for x in outlier_data.true_outlier],
            'last_idx_stray_window_with_noise_within_half': idx_stray_windows_with_noise_within_half_window
        }
    

def run_windows(S_data, stray_window_size, contaminazione):
    ct = Context.load(ct_path)
    n_wrong_outliers = int(floor(stray_window_size * contaminazione))
    S_partitioner = StrayMixedNormalOutlierWindows(train_size=stray_window_size-1, test_size=1, window_shift=1, n_wrong_outliers=n_wrong_outliers,
                                                        depends_on=S_data, ctx=ct)
    # GET WINDOW NAMES
    result_partitioner = S_partitioner.materialize()
    window_ranges = result_partitioner.window_ranges

    # GET PARTITIONS
    S_train_plus_test_partitions = [NumericIndexBasedPartition(partitioner_function_name='.next_range', 
                                                            range_start=s, range_end=e,
                                                            depends_on=S_partitioner, ctx=ct,
                                                            name_alias=f"WindowRange_{s}_{e}") for s,e in window_ranges]

    return {
        'window_ranges': window_ranges,
        'S_train_plus_test_partitions': S_train_plus_test_partitions,
        'S_partitioner': S_partitioner,
        'noise_quantity': result_partitioner.noise_quantity,
        'noise_positions': result_partitioner.noise_positions,
        'last_idx_stray_window_with_noise': result_partitioner.last_idx_stray_window_with_noise,
        'idx_g1_equal_g2_size': result_partitioner.idx_g1_equal_g2_size,
        'last_idx_stray_window_with_noise_within_half': result_partitioner.last_idx_stray_window_with_noise_within_half
        }
    
    
class AnnotateStrayWindowsInDF2(Step):
    def run(self, partitioner_output, *original_data_enriched: list[pd.DataFrame]):
        df_list = [x.output for x in original_data_enriched]
        shared_index = partitioner_output.data.index.tolist()
        table = np.zeros(( len(shared_index), len(df_list) ), dtype=bool)
        window_size = df_list[0].shape[0]
        for w_num, df in enumerate(df_list):
            assert len(df['outlier']) == window_size, f"Window {w_num} has {len(df['outlier'])} window size instead of {window_size}"
            assert df.index.tolist() == shared_index[w_num:w_num+window_size], f"Window {w_num}: DF index is {df.index} while expected is {shared_index[w_num:w_num+window_size]}"            
            try:
                table[w_num:w_num+window_size,w_num] = df['outlier']
            except:
                raise ValueError("Check window n. ", w_num)

        stray_outliers = pd.DataFrame(table, columns=list(range(len(df_list))), index=shared_index)
        return {
            'output': stray_outliers,
            'data_feature_names': np.arange(len(df_list)).tolist, 
            'metadata_feature_names': []
        }
    
class CountWarnings(Step):
    def __init__(self, k: int, idx_g1_equal_g2_size: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.k = k
        self.idx_g1_equal_g2_size = idx_g1_equal_g2_size

    def run(self, annotated_stray_window_in_df: ResultType[AnnotateStrayWindowsInDF2]):
        stray_window_df = annotated_stray_window_in_df.output
        n_runs = stray_window_df.shape[1]
        self.w = n_runs - 1

        n_warnings = 0
        plw = 0 # position last warning
        n_warnings_half = 0
        plw_half = 0

        for w_num in range(n_runs):
            warnins_in_window = stray_window_df.iloc[w_num:w_num+self.w,w_num]
            is_warning_last_seq = warnins_in_window.iloc[-1]

            if is_warning_last_seq:
                if w_num < self.idx_g1_equal_g2_size:
                    n_warnings_half += 1
                    plw_half = w_num
                n_warnings += 1
                plw = w_num
            
        return {
            'n_warnings_global': n_warnings,
            'plw_global': plw,
            'n_warnings': n_warnings_half,
            'plw': plw_half
        }

def run_stray4k(window_ranges, S_windows, S_partitioner, stray_k, idx_g1_equal_g2_size):
    # STRAY
    S_outlier_detection = [Stray.OutlierDetection(k=stray_k, depends_on=x, name_alias=f"OutlierDetection{s}_{e}") for x,(s,e) in zip(S_windows,window_ranges)]
    S_train_plus_eval_partitions_enriched = [Stray.MapOutliersToOriginalData(depends_on=[s1,s2], 
                                                                            name_alias=f"MapOutliersToOriginalData_{s}_{e}") for s1,s2,(s,e) in zip(S_windows,S_outlier_detection,window_ranges)]
    
    # tabella che indica per ciascuna sequenza (riga) se è un outlier o no nelle diverse run di stray (colonne). Una run per moving window
    S_final_matrix = AnnotateStrayWindowsInDF2(depends_on=[S_partitioner, *S_train_plus_eval_partitions_enriched])
    S_experiment_metrix = CountWarnings(stray_k, idx_g1_equal_g2_size, depends_on=S_final_matrix)
    
    # ##########  RUN
    head = S_experiment_metrix
    result = head.materialize()
    return result

def loop(window_sizes, ks, noise, repetitions):
    global kwargs
    tabular_results = []
    combinazioni = [(w,k,n) for w in window_sizes for k in ks for n in noise if k < w and (n * w) == int(floor(n*w))] * repetitions   
    print(len(combinazioni))
    for window_size,k,n in tqdm(combinazioni):
        kwargs = run_windows(S_data, window_size, n)

        for k in ks:
            if k >= window_size:
                continue
        
            try:
                result_unformatted = run_stray4k(
                    window_ranges=kwargs['window_ranges'], 
                    S_windows=kwargs['S_train_plus_test_partitions'], 
                    S_partitioner=kwargs['S_partitioner'], 
                    stray_k=k, 
                    idx_g1_equal_g2_size=kwargs['idx_g1_equal_g2_size'])
                
            except Exception as e:
                print(f"Error while testing combination: {window_size} {k} {n}")
                raise e
            
            tabular_results.append([
                window_size, k, n, kwargs['noise_quantity'], 
                result_unformatted['plw'], result_unformatted['n_warnings'], result_unformatted['plw_global'], result_unformatted['n_warnings_global'],
                kwargs['last_idx_stray_window_with_noise_within_half'], kwargs['last_idx_stray_window_with_noise'], kwargs['idx_g1_equal_g2_size'], 
                kwargs['noise_positions']
                ])             
            

    tabular_results = pd.DataFrame(tabular_results, columns=["N", "K", "Noise_perc", "Noise", 
                                                             "PLW", "N.Warnings", "PLW_global", "N.Warnings_global", 
                                                             "LNP", "LNP_global", "idx_G1>=G2", "NP"])
    return tabular_results

kwargs = None
tabular_results = loop(window_sizes=(5,10,50,100), ks=(1,3,5,10,15), noise=(0.0, 0.1, 0.2, 0.3), repetitions=10)     # this will repeat 10 times the experiment on window_size, noise  * len(ks), so 50 repetitions actually

In [105]:
tabular_results.to_parquet(f"{export_seq_id_path}/sensitivity_specificity10.parquet")