<a href="https://colab.research.google.com/github/dinisrferreira/Pseudodiagnosticity-in-a-continuous-learning-environment-Reis-2020-/blob/main/Data_Generator_for_DiagnosticNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [69]:
import random
import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from collections import Counter

In [70]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [71]:
class ConditionDataset:
    def __init__(self, environment="wicked", num_samples=1000, prob_no_decision=0.02, random_state=None):
        self.num_samples = num_samples
        self.prob_no_decision = prob_no_decision
        self.num_no_decision_samples = int(self.num_samples * self.prob_no_decision)
        self.data = []
        self.random_state = random_state

        # Set the random state if provided
        if self.random_state is not None:
            random.setstate(self.random_state)

        self._generate_data(environment)

        # Save the random state after data generation
        self.random_state = random.getstate()


    def _generate_data(self, environment):
        initial_choices = ['a', 'b', 'c', 'd']
        samples_per_type = self.num_samples - self.num_no_decision_samples

        if environment == "wicked":
            for _ in range(samples_per_type):
                initial_cell = random.choice(initial_choices)
                values = self._generate_wicked_values(initial_cell)
                input_vector = [0, 0, 0, 0]
                cell_index = {'a': 0, 'b': 1, 'c': 2, 'd': 3}
                input_vector[cell_index[initial_cell]] = values[initial_cell]
                ##########################################################################################################################################
                diagnosis, golden_label = self._calculate_diagnosis(values, initial_cell)
                ##########################################################################################################################################
                self.data.append({
                    'input': input_vector,
                    'full_values': [values['a'], values['b'], values['c'], values['d']],
                    'label': diagnosis,
                    'golden_label':golden_label,
                    'initial_choice': cell_index[initial_cell],
                    'environment': 'wicked'
                })
        elif environment == "kind":
            for _ in range(samples_per_type):
                initial_cell = random.choice(initial_choices)
                values = self._generate_kind_values(initial_cell)
                values = self._adjust_kind_values(initial_cell, values)
                input_vector = [0, 0, 0, 0]
                cell_index = {'a': 0, 'b': 1, 'c': 2, 'd': 3}
                input_vector[cell_index[initial_cell]] = values[initial_cell]
                ##########################################################################################################################################
                diagnosis, golden_label = self._calculate_diagnosis(values, initial_cell)
                ##########################################################################################################################################
                self.data.append({
                    'input': input_vector,
                    'full_values': [values['a'], values['b'], values['c'], values['d']],
                    'label': diagnosis,
                    'golden_label':golden_label,
                    'initial_choice': cell_index[initial_cell],
                    'environment': 'kind'
                })

        # Generate no-decision cases
        for _ in range(self.num_no_decision_samples):
            if random.choice([True, False]):
                a = random.randint(52, 94)
                b = random.randint(10, 43)
                values = {
                    'a': a,
                    'b': b,
                    'c': b,  # Ensuring a = d and b = c
                    'd': a
                }
            else:
                a = random.randint(10, 43)
                b = random.randint(52, 94)
                values = {
                    'a': a,
                    'b': b,
                    'c': b,  # Ensuring a = d and b = c
                    'd': a
                }

            initial_cell = random.choice(initial_choices)
            input_vector = [0, 0, 0, 0]
            cell_index = {'a': 0, 'b': 1, 'c': 2, 'd': 3}
            input_vector[cell_index[initial_cell]] = values[initial_cell]

            ##########################################################################################################################################
            diagnosis, golden_label = self._calculate_diagnosis(values, initial_cell)
            ##########################################################################################################################################

            # Append the no-decision example to the dataset
            self.data.append({
                'input': input_vector,
                'full_values': [values['a'], values['b'], values['c'], values['d']],
                'label': diagnosis,
                'golden_label': golden_label,
                'initial_choice': cell_index[initial_cell],
                'environment': 'no-decision'
            })

    def _generate_wicked_values(self, initial_cell):
        values = {}
        high_value = random.randint(56, 90)
        low_value = random.randint(10, 43)

        if initial_cell in ['a', 'b']:
            values[initial_cell] = high_value if random.choice([True, False]) else low_value
            if initial_cell == 'a':
                values['b'] = 100 - values['a']
                values['c'] = random.randint(56, 90) if values['a'] > 50 else random.randint(10, 43)
                values['d'] = 100 - values['c']
            elif initial_cell == 'b':
                values['a'] = 100 - values['b']
                values['d'] = random.randint(56, 90) if values['b'] > 50 else random.randint(10, 43)
                values['c'] = 100 - values['d']
        else:
            values[initial_cell] = high_value if random.choice([True, False]) else low_value
            if initial_cell == 'c':
                values['d'] = 100 - values['c']
                values['a'] = random.randint(56, 90) if values['c'] > 50 else random.randint(10, 43)
                values['b'] = 100 - values['a']
            elif initial_cell == 'd':
                values['c'] = 100 - values['d']
                values['b'] = random.randint(56, 90) if values['d'] > 50 else random.randint(10, 43)
                values['a'] = 100 - values['b']

        return values


    def _generate_kind_values(self, initial_cell):
            values = {}
            high_value = random.randint(56, 90)
            low_value = random.randint(10, 43)
            values[initial_cell] = high_value if random.choice([True, False]) else low_value
            #Setting same-row-as-first-choice's values randomly
            if initial_cell == 'a':
                while True:
                  values['b'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                  if values['b']!= values['a']: break

                values['c'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                values['d'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
            elif initial_cell == 'b':
                while True:
                  values['a'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                  if values['a']!= values['b']: break

                values['c'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                values['d'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
            elif initial_cell == 'c':
                while True:
                  values['d'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                  if values['d']!= values['c']: break

                values['a'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                values['b'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)

            elif initial_cell == 'd':
                while True:
                  values['c'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                  if values['c']!= values['d']: break

                values['a'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)
                values['b'] = random.randint(10, 43) if random.choice([True, False]) else random.randint(56, 90)

            return values

    def _adjust_kind_values(self, initial_cell, values):
    # Determine the pattern based on the initial values

        if initial_cell in ['a', 'b']:
            if values['a'] > values['b']:

                while True:
                    values['c'] = random.randint(11,90)
                    if values['c']!=50: break

                while True:
                    values['d'] = random.randint(10,values['c']-1)
                    if values['d']!=50: break

            elif values['a'] < values['b']:
                while True:
                    values['d'] = random.randint(11,90)
                    if values['d']!=50: break

                while True:
                    values['c'] = random.randint(10,values['d']-1)
                    if values['c']!=50: break

        elif initial_cell in ['c', 'd']:
            if values['c'] > values['d']:
                while True:
                    values['a'] = random.randint(11,90)
                    if values['a']!=50: break

                while True:
                    values['b'] = random.randint(10,values['a']-1)
                    if values['b']!=50: break

            elif values['c'] < values['d']:

                while True:
                    values['b'] = random.randint(11,90)
                    if values['b']!=50: break

                while True:
                    values['a'] = random.randint(10,values['b']-1)
                    if values['a']!=50: break

        return values



    def _calculate_diagnosis(self, values, initial_choice):
      label = 3  # Something wrong if it's kept at 3
      golden_label = [0.0, 0.0, 0.0, 0.0]  # Something wrong if it's kept at all zeros

      row_strategy_works = False
      column_strategy_works = False

      prob_disease_A = (values['a'] / (values['a'] + values['b'])) * (values['c'] / (values['c'] + values['d']))
      prob_disease_B = (values['b'] / (values['a'] + values['b'])) * (values['d'] / (values['c'] + values['d']))

      # Convert initial_choice from string to index
      initial_choice_index = {'a': 0, 'b': 1, 'c': 2, 'd': 3}[initial_choice]
      row_initial, col_initial = divmod(initial_choice_index, 2)

      index_to_key = {0: 'a', 1: 'b', 2: 'c', 3: 'd'}

      for second_choice in range(4):
          if second_choice == initial_choice_index:
              continue

          row_second, col_second = divmod(second_choice, 2)
          second_choice_key = index_to_key[second_choice]

          if row_initial == row_second:  # Row strategy
              if initial_choice in ['a','c']:
                if (values[initial_choice] > values[second_choice_key] and prob_disease_A > prob_disease_B) or \
                  (values[initial_choice] < values[second_choice_key] and prob_disease_A < prob_disease_B):
                    row_strategy_works = True

              elif initial_choice in ['b','d']:
                if (values[initial_choice] > values[second_choice_key] and prob_disease_A < prob_disease_B) or \
                  (values[initial_choice] < values[second_choice_key] and prob_disease_A > prob_disease_B):
                    row_strategy_works = True

          if col_initial == col_second:  # Column strategy
              if col_initial == 0:  # 'a' and 'c'
                  if (values[initial_choice] > 50 and values[second_choice_key] > 50 and prob_disease_A > prob_disease_B) or \
                    (values[initial_choice] < 50 and values[second_choice_key] < 50 and prob_disease_A < prob_disease_B):
                      column_strategy_works = True
              elif col_initial == 1:  # 'b' and 'd'
                  if (values[initial_choice] > 50 and values[second_choice_key] > 50 and prob_disease_A < prob_disease_B) or \
                    (values[initial_choice] < 50 and values[second_choice_key] < 50 and prob_disease_A > prob_disease_B):
                      column_strategy_works = True

      if row_strategy_works and column_strategy_works:

          golden_label = [0.0, 0.5, 0.5, 0.0] if initial_choice in ['a', 'c'] else [0.5, 0.0, 0.0, 0.5]

      elif row_strategy_works:
          if initial_choice == 'a':
              golden_label = [0.0, 1.0, 0.0, 0.0]
          elif initial_choice == 'b':
              golden_label = [1.0, 0.0, 0.0, 0.0]
          elif initial_choice == 'c':
              golden_label = [0.0, 0.0, 0.0, 1.0]
          elif initial_choice == 'd':
              golden_label = [0.0, 0.0, 1.0, 0.0]

      elif column_strategy_works:
          print("Error: Only column strategy works, which should not happen.")

      if prob_disease_A == prob_disease_B:
          golden_label = [0.33, 0.33, 0.33, 0.33]
          label = 0
      else:
          label = -1 if prob_disease_A > prob_disease_B else 1

      return label, golden_label




    def get_data(self):
        return self.data

    def get_initial_random_state(self):
        return self.random_state


In [72]:
# Create datasets for wicked and kind environments
wicked_dataset = ConditionDataset(environment="wicked", num_samples=1000)
kind_dataset = ConditionDataset(environment="kind", num_samples=1000)
universal_kind_dataset = ConditionDataset(environment="kind", num_samples=1000)


wicked_data = wicked_dataset.get_data()
kind_data = kind_dataset.get_data()
universal_kind_data = universal_kind_dataset.get_data()

In [73]:
# Example usage:
for sample in wicked_data[:5]:  # Display the first 5 samples for verification
    print(sample)
print("\n \n")
# Example usage:
for sample in kind_data[-10:]:  # Display the first 5 samples for verification
    print(sample)


{'input': [0, 0, 0, 24], 'full_values': [67, 33, 76, 24], 'label': -1, 'golden_label': [0.5, 0.0, 0.0, 0.5], 'initial_choice': 3, 'environment': 'wicked'}
{'input': [57, 0, 0, 0], 'full_values': [57, 43, 88, 12], 'label': -1, 'golden_label': [0.0, 0.5, 0.5, 0.0], 'initial_choice': 0, 'environment': 'wicked'}
{'input': [0, 74, 0, 0], 'full_values': [26, 74, 21, 79], 'label': 1, 'golden_label': [0.5, 0.0, 0.0, 0.5], 'initial_choice': 1, 'environment': 'wicked'}
{'input': [0, 68, 0, 0], 'full_values': [32, 68, 22, 78], 'label': 1, 'golden_label': [0.5, 0.0, 0.0, 0.5], 'initial_choice': 1, 'environment': 'wicked'}
{'input': [0, 29, 0, 0], 'full_values': [71, 29, 64, 36], 'label': -1, 'golden_label': [0.5, 0.0, 0.0, 0.5], 'initial_choice': 1, 'environment': 'wicked'}

 

{'input': [0, 0, 0, 12], 'full_values': [12, 82, 82, 12], 'label': 0, 'golden_label': [0.33, 0.33, 0.33, 0.33], 'initial_choice': 3, 'environment': 'no-decision'}
{'input': [0, 0, 0, 35], 'full_values': [35, 92, 92, 35], 'l

In [74]:
def count_diagnoses(data):
    diagnosis_counts = Counter([example['label'] for example in data])
    return diagnosis_counts

def count_wicked_examples(data):
    wicked_counts = len([example['golden_label'] for example in data if set(example['golden_label']) == {0.0,0.5}]) # Convert the list to a tuple
    return wicked_counts

def count_kind_examples(data):
    kind_counts = len([example['golden_label'] for example in data if set(example['golden_label']) == {0.0,1.0}]) # Convert the list to a tuple
    return kind_counts

def count_no_decision_examples(data):
    no_decision_counts = len([example['golden_label'] for example in data if set(example['golden_label']) == {0.33}]) # Convert the list to a tuple
    return no_decision_counts


def distribution_of_cell_values(data):
    cell_values = {'a': [], 'b': [], 'c': [], 'd': []}
    for example in data:
        values = example['full_values']
        cell_values['a'].append(values[0])
        cell_values['b'].append(values[1])
        cell_values['c'].append(values[2])
        cell_values['d'].append(values[3])
    return cell_values

def count_initial_choices(data):
    initial_choice_counts = Counter([example['initial_choice'] for example in data])
    return initial_choice_counts

def plot_cell_value_distribution(cell_values):
    for cell, values in cell_values.items():
        sns.histplot(values, bins=20, kde=True)
        plt.title(f'Distribution of Values for Cell {cell.upper()}')
        plt.xlabel('Value')
        plt.ylabel('Frequency')
        plt.show()

In [75]:
print("Diagnosis Counts:")
print("Correct diseases in Wicked: ", count_diagnoses(wicked_dataset.get_data()))

print("Correct diseases in Wicked: ", count_diagnoses(kind_dataset.get_data()))

print("\n")

print("                        Wicked Dataset:                   ")
print("Wicked examples in Wicked: ",count_wicked_examples(wicked_dataset.get_data()))
print("Kind examples in Wicked: ",count_kind_examples(wicked_dataset.get_data()))
print("No decision examples in Wicked: ",count_no_decision_examples(wicked_dataset.get_data()))

print("\n")

print("                        Kind Dataset:                   ")
print("Wicked examples in Kind: ", count_wicked_examples(kind_dataset.get_data()))
print("Kind examples in Kind: ", count_kind_examples(kind_dataset.get_data()))
print("No decision examples in Kind: ", count_no_decision_examples(kind_dataset.get_data()))
print("\n")


Diagnosis Counts:
Correct diseases in Wicked:  Counter({1: 508, -1: 472, 0: 20})
Correct diseases in Wicked:  Counter({1: 518, -1: 462, 0: 20})


                        Wicked Dataset:                   
Wicked examples in Wicked:  980
Kind examples in Wicked:  0
No decision examples in Wicked:  20


                        Kind Dataset:                   
Wicked examples in Kind:  497
Kind examples in Kind:  483
No decision examples in Kind:  20




In [76]:
cell_values = distribution_of_cell_values(kind_data)
#plot_cell_value_distribution(cell_values)

diagnosis_counts = count_diagnoses(kind_data)
initial_choice_counts = count_initial_choices(kind_data)

A conversão em percentagens acontece depois do dataset ser criado porque há vantagens em ter operações com inteiros na criação de condições

In [77]:
def convert_percentages(data):
    for item in data:
        # Check if the first value in 'input' is already a float with two decimal places
        if isinstance(item['input'][0], float) and round(item['input'][0], 2) == item['input'][0]:
            continue  # Skip conversion if values are already in the desired format

        # Convert 'input' and 'full_values' lists
        item['input'] = [x / 100.0 for x in item['input']]
        item['full_values'] = [x / 100.0 for x in item['full_values']]
    return data

In [78]:
# Conversão em Percentagens
wicked_data = convert_percentages(wicked_data)
kind_data = convert_percentages(kind_data)
universal_kind_data = convert_percentages(universal_kind_data)

In [79]:
# Step 2: Tidying the data into a table format
wicked_df = pd.DataFrame(wicked_data)
kind_df = pd.DataFrame(kind_data)
universal_kind_df = pd.DataFrame(universal_kind_data)

In [80]:
# Stratified split for wicked dataset
wicked_df['stratify_label'] = wicked_df['label'].apply(lambda x: 1 if x == 0 else 0)
stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in stratified_split.split(wicked_df, wicked_df['stratify_label']):
    wicked_train_set = wicked_df.loc[train_index].drop(columns=['stratify_label'])
    wicked_test_set = wicked_df.loc[test_index].drop(columns=['stratify_label'])

# Stratified split for kind dataset
kind_df['stratify_label'] = kind_df['label'].apply(lambda x: 1 if x == 0 else 0)
stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in stratified_split.split(kind_df, kind_df['stratify_label']):
    kind_train_set = kind_df.loc[train_index].drop(columns=['stratify_label'])
    kind_test_set = kind_df.loc[test_index].drop(columns=['stratify_label'])

# Stratified split for universal kind dataset
universal_kind_df['stratify_label'] = universal_kind_df['label'].apply(lambda x: 1 if x == 0 else 0)
stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in stratified_split.split(universal_kind_df, universal_kind_df['stratify_label']):
    universal_kind_train_set = universal_kind_df.loc[train_index].drop(columns=['stratify_label'])
    universal_kind_test_set = universal_kind_df.loc[test_index].drop(columns=['stratify_label'])

# Now you have stratified train and test sets for both wicked and kind datasets
print(f'Wicked train set: {wicked_train_set.shape}')
print(f'Wicked test set: {wicked_test_set.shape}')
print(f'Kind train set: {kind_train_set.shape}')
print(f'Kind test set: {kind_test_set.shape}')
print(f'Universal Kind train set: {universal_kind_train_set.shape}')
print(f'Universal Kind test set: {universal_kind_test_set.shape}')

Wicked train set: (800, 6)
Wicked test set: (200, 6)
Kind train set: (800, 6)
Kind test set: (200, 6)
Universal Kind train set: (800, 6)
Universal Kind test set: (200, 6)


In [81]:
wicked_train_set.head()

Unnamed: 0,input,full_values,label,golden_label,initial_choice,environment
374,"[0.0, 0.0, 0.0, 0.34]","[0.85, 0.15, 0.66, 0.34]",-1,"[0.5, 0.0, 0.0, 0.5]",3,wicked
197,"[0.0, 0.65, 0.0, 0.0]","[0.35, 0.65, 0.43, 0.57]",1,"[0.5, 0.0, 0.0, 0.5]",1,wicked
81,"[0.0, 0.0, 0.2, 0.0]","[0.26, 0.74, 0.2, 0.8]",1,"[0.0, 0.5, 0.5, 0.0]",2,wicked
473,"[0.0, 0.0, 0.13, 0.0]","[0.11, 0.89, 0.13, 0.87]",1,"[0.0, 0.5, 0.5, 0.0]",2,wicked
795,"[0.0, 0.0, 0.89, 0.0]","[0.65, 0.35, 0.89, 0.11]",-1,"[0.0, 0.5, 0.5, 0.0]",2,wicked


In [83]:
# Get the current timestamp
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

# Define the folder path
folder_path = '/content/drive/My Drive/Data for DiagnosticNet/'

# Save the dataframes to CSV files with a timestamp
wicked_train_set.to_csv(f'{folder_path}wicked_train_set_{timestamp}.csv', index=False)
wicked_test_set.to_csv(f'{folder_path}wicked_test_set_{timestamp}.csv', index=False)
kind_train_set.to_csv(f'{folder_path}kind_train_set_{timestamp}.csv', index=False)
kind_test_set.to_csv(f'{folder_path}kind_test_set_{timestamp}.csv', index=False)
universal_kind_train_set.to_csv(f'{folder_path}universal_kind_train_set_{timestamp}.csv', index=False)
universal_kind_test_set.to_csv(f'{folder_path}universal_kind_test_set_{timestamp}.csv', index=False)