# Instructions

Click on the *Runtime* menu and then on *Run all* to execute the program.

The output can be seen at the end of the document after the *Run Analysis* code section.



# CSV data parsing functions

In [0]:
def downloadFile(url):
  '''Download a csv file and return its content as a text stream.

  Args:
    url: address of the file, passed as-is to urllib.request.urlopen.

  Returns:
    An io.StringIO holding the whole response body decoded as UTF-8.
  '''
  import urllib.request
  import io
  # The with-block closes the response as soon as the body has been read,
  # so the connection is not left open.
  with urllib.request.urlopen(url) as response:
    data = response.read()
  return io.StringIO(data.decode('utf-8'))


def parse_question(question, yes_n, no_n):
  '''Parse one question from the csv.

  Args:
    question: question code (e.g. "D", "SP", "PI").
    yes_n: first count attached to the question; parsed with int().
    no_n: second count attached to the question; parsed with int().

  Returns:
    A dict with keys 'type' ('pattern', 'shape' or 'color'), 'value' (the
    question code) and 'single', or None when the code is one of the ignored
    codes or is not recognised at all.
    'single' is True only when both counts parse as integers and at least
    one of them equals 1.
  '''
  # Both counts must be parsed before they are compared: one unparsable
  # count makes the question "not single" even if the other one is 1.
  try:
    int_yes_n = int(yes_n)
    int_no_n = int(no_n)
  except (TypeError, ValueError):
    # ValueError: non-numeric text; TypeError: missing cell (None)
    single = False
  else:
    single = (int_yes_n == 1 or int_no_n == 1)

  # Codes that are explicitly skipped
  if question in ("PH", "GH", "YW", "PW", "BB", "RB"):
    return None

  codes_by_type = (
      ('pattern', ("D", "ND")),
      ('shape', ("SP", "SQ", "C", "T")),
      ('color', ("PI", "R", "PU", "B", "W", "G", "O", "Y")),
  )
  for question_type, codes in codes_by_type:
    if question in codes:
      return {'type': question_type, 'value': question, 'single': single}

  # Unknown code
  return None

  
def parse_csv(file):
  '''Group the parsed questions of a csv file by participant id.

  Reads the columns "ID", "Question", "Y", "N", "Condition" and
  "Age (Months)". Rows whose question parses to None are skipped.

  Returns a dict keyed by the "ID" value; each entry holds the "condition"
  and "age" taken from the first kept row of that id, and the list of
  parsed "questions" in file order.
  '''
  import csv
  by_uid = {}
  for line in csv.DictReader(file, delimiter=','):
    uid = line["ID"]
    parsed = parse_question(line["Question"], line["Y"], line["N"])

    # nothing to store for ignored / unknown questions
    if parsed is None:
      continue

    if uid in by_uid:
      by_uid[uid]["questions"].append(parsed)
    else:
      # first kept question of this id: create the record
      by_uid[uid] = {
          "condition": line["Condition"],
          "age": line["Age (Months)"],
          "questions": [parsed]
      }
  return by_uid

def write_dic_to_csv(csv_file, csv_columns, dict_data):
    '''Write a list of dictionaries to a csv file.

    Args:
        csv_file: path of the file to create (overwritten if it exists).
        csv_columns: column names, written as the header row.
        dict_data: iterable of dicts, one per row, keyed by column name.

    An I/O error is reported on standard output instead of being raised.
    '''
    import csv
    try:
        # newline='' lets the csv module control the line endings itself
        with open(csv_file, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            writer.writeheader()
            writer.writerows(dict_data)
    except IOError as strerror:
        # The placeholder must be {0}/{}: format() receives one argument only
        print("I/O: {}".format(strerror))

# Constants

In [0]:
# Possible values of each monster feature
PATTERNS = ["D", "ND"]
SHAPES = ["SP", "SQ", "C", "T"]
COLORS = ["PI", "R", "PU", "B", "W", "G", "O", "Y"]

# Initial hypothesis space, keyed by the feature value that names the hyp.
# "level" is 0 for a pattern hyp, 1 for a shape hyp and 2 for a color hyp;
# each hyp also records the shape/pattern it sits under (None when unset).
ALL_HYPS = {
  # --- pattern "D" and the hyps below it ---
  "D":  {"level": 0, "pattern": "D", "shape": None, "color": None},
  "T":  {"level": 1, "pattern": "D", "shape": "T", "color": None},
  "SP": {"level": 1, "pattern": "D", "shape": "SP", "color": None},
  "G":  {"level": 2, "pattern": "D", "shape": "T", "color": "G"},
  "R":  {"level": 2, "pattern": "D", "shape": "T", "color": "R"},
  "B":  {"level": 2, "pattern": "D", "shape": "SP", "color": "B"},
  "W":  {"level": 2, "pattern": "D", "shape": "SP", "color": "W"},

  # --- pattern "ND" and the hyps below it ---
  "ND": {"level": 0, "pattern": "ND", "shape": None, "color": None},
  "C":  {"level": 1, "pattern": "ND", "shape": "C", "color": None},
  "SQ": {"level": 1, "pattern": "ND", "shape": "SQ", "color": None},
  "O":  {"level": 2, "pattern": "ND", "shape": "C", "color": "O"},
  "PU": {"level": 2, "pattern": "ND", "shape": "C", "color": "PU"},
  "Y":  {"level": 2, "pattern": "ND", "shape": "SQ", "color": "Y"},
  "PI": {"level": 2, "pattern": "ND", "shape": "SQ", "color": "PI"},
}

# Debug output switch (off by default)
DEBUG = False

# Hypothesis space

In [0]:
import math

class Space:
  '''Represents the hypothesis space.

  The space starts with every hyp of ALL_HYPS. Each asked question is scored
  with its infogain and then used to remove the hyps its answer invalidates.

  Attributes:
    hyps: list of the hyp dicts still in the space.
    questions: list of question dicts (with 'type' and 'value' keys) to ask;
      asking a question adds the 'infogain' and 'answer' keys to it.
    questionCounter: index in `questions` of the question being asked
      (-1 until ask_all_questions starts).
  '''

  def __init__(self):
    self.hyps = list(ALL_HYPS.values())
    self.questions = []
    self.questionCounter = -1

  def get_size(self):
    '''Get number of hyps in the space'''
    return len(self.hyps)

  def count_hyps(self, feature, values, levels = (0, 1, 2)):
    '''Count remaining hyps in the space where feature in values at levels

    feature is a hyp key ('pattern', 'shape' or 'color'); values and levels
    are containers tested with `in`. The default for levels is a tuple so
    that no mutable object is shared between calls.
    '''
    return sum(
        1 for x in self.hyps
        if x[feature] in values and x['level'] in levels
    )

  def ask_all_questions(self):
    '''Ask all questions, in order.

    questionCounter is set from the position of the question, so the last
    question is still detected correctly if this method is called again.
    '''
    for index, question in enumerate(self.questions):
      self.questionCounter = index
      self.ask_question(question)

  def ask_question(self, question):
    '''Ask a question and update the hyp space.

    Stores the question infogain in question['infogain'] and the computed
    answer in question['answer'], then removes the invalidated hyps.
    '''

    # compute n prior (space size before asking question)
    n_prior = self.get_size()

    # compute n_post_no, n_post_some and n_post_yes by simulating the removal
    # of hyps for each possible answer. remove_hyps reads the answer from the
    # question itself, hence the temporary assignment.
    n_post = {}
    for answer in ('NO', 'SOME', 'YES'):
      question['answer'] = answer
      n_post[answer] = self.remove_hyps(question, simulated=True)
    n_post_no = n_post['NO']
    n_post_some = n_post['SOME']
    n_post_yes = n_post['YES']

    # DEBUG
    if DEBUG:
      print()
      if n_post_no + n_post_some + n_post_yes != n_prior:
        print('######### SUM(n_post_X) != n_prior !!!!!!!!!!!')
      print('question_type: {} | question_value: {}'.format(
          question['type'], question['value']
      ))

    # compute question infogain
    question['infogain'] = self.compute_infogain(n_prior, n_post_no, n_post_some, n_post_yes)

    # compute question real answer (NO, SOME or YES),
    # set it and remove invalidated hyps from the space (this time for real)
    question['answer'] = self.compute_answer(question)
    self.remove_hyps(question)

    # DEBUG
    if DEBUG:
      print('question_answer: {}'.format(question['answer']))

  def compute_infogain(self, n_prior, n_post_no, n_post_some, n_post_yes):
    '''Compute question infogain value.

    The infogain is log2(n_prior) minus the average of log2(n_post_X) over
    the three answers, each weighted by n_post_X / n_prior. An empty space
    (n_prior == 0) yields 0.
    '''

    if n_prior == 0:
      infogain = 0
    else:
      # compute h_prior
      h_prior = abs(math.log2(n_prior))

      # compute probabilities
      p_no = n_post_no / n_prior
      p_some = n_post_some / n_prior
      p_yes = n_post_yes / n_prior

      # compute h_post (log2 is undefined for 0, which contributes nothing)
      h_no = abs(math.log2(n_post_no)) if n_post_no > 0 else 0
      h_some = abs(math.log2(n_post_some)) if n_post_some > 0 else 0
      h_yes = abs(math.log2(n_post_yes)) if n_post_yes > 0 else 0
      h_post = (p_no * h_no) + (p_some * h_some) + (p_yes * h_yes)

      infogain = h_prior - h_post

    # DEBUG
    if DEBUG:
      print('n_prior: {} | n_post_no: {} | n_post_some: {} | n_post_yes: {} \ninfogain: {}'.format(
          n_prior, n_post_no, n_post_some, n_post_yes, infogain
      ))

    return infogain

  def compute_answer(self, question):
    '''Compute the answer to a question. Returns NO, SOME or YES'''

    # if asking about pattern X and no other patterns remaining in space (at any level)
    # --> Answer: SOME
    if question['type'] == 'pattern':
      other_patterns = [x for x in PATTERNS if x != question['value']]
      if self.count_hyps('pattern', other_patterns) == 0:
        return 'SOME'

    # if asking about shape X and no more other shapes remaining in space (at any level)
    # --> Answer: SOME
    if question['type'] == 'shape':
      other_shapes = [x for x in SHAPES if x != question['value']]
      if self.count_hyps('shape', other_shapes) == 0:
        return 'SOME'

    # if this is the last asked question --> the answer is YES
    if self.questionCounter == (len(self.questions) - 1):
      return 'YES'

    # in every other case --> the answer is NO
    return 'NO'

  def remove_hyps(self, question, simulated = False):
    '''Remove from the space the hyps that are invalidated by
    the question/answer.

    The question level is looked up in ALL_HYPS from question['value'];
    the answer is read from question['answer'].

    If simulated == True it only simulates the removal.

    Returns the number of hyps left in the space after the removal.
    A level/answer combination with no rule below (e.g. a color question
    answered SOME) leaves no hyp.
    '''

    asked = ALL_HYPS[question['value']]
    answer = question['answer']
    value = question['value']
    level = asked['level']

    def is_pattern_hyp(x):
      '''x is the level 0 hyp with the pattern of the asked hyp'''
      return x['pattern'] == asked['pattern'] and x['level'] == 0

    def is_shape_hyp(x):
      '''x is the level 1 hyp with the shape of the asked hyp'''
      return x['shape'] == asked['shape'] and x['level'] == 1

    def is_color_hyp(x):
      '''x is the level 2 hyp with the color of the asked hyp'''
      return x['color'] == asked['color'] and x['level'] == 2

    new_hyps = []

    #================== PATTERN QUESTION ==================#

    # If question type is PATTERN and answer is NO:
    # --> remove all hyps with that pattern (at any level)
    if level == 0 and answer == 'NO':
      new_hyps = [x for x in self.hyps if x['pattern'] != value]

    # If question type is PATTERN and answer is SOME:
    # --> remove hyp with same pattern at level 0
    # --> remove all hyps with the other pattern (at any level)
    elif level == 0 and answer == 'SOME':
      other_patterns = [p for p in PATTERNS if p != value]
      new_hyps = [x for x in self.hyps if not (
          (x['pattern'] == value and x['level'] == 0)
          or (x['pattern'] in other_patterns)
      )]

    # If question type is PATTERN and answer is YES:
    # --> keep only itself
    elif level == 0 and answer == 'YES':
      new_hyps = [x for x in self.hyps if is_pattern_hyp(x)]

    #================== SHAPE QUESTION ==================#

    # If question type is SHAPE and answer is NO:
    # --> remove hyp with same pattern at level 0
    # --> remove all hyps with that shape (at any level, which is lvl 1 and 2)
    elif level == 1 and answer == 'NO':
      new_hyps = [x for x in self.hyps if not (
          is_pattern_hyp(x)
          or (x['shape'] == value)
      )]

    # If question type is SHAPE and answer is SOME:
    # --> remove hyps with same pattern at levels 0,1
    # --> remove hyps with same pattern and different shape at level 2
    # --> remove all hyps with the other pattern (at any level)
    elif level == 1 and answer == 'SOME':
      other_patterns = [p for p in PATTERNS if p != asked['pattern']]
      new_hyps = [x for x in self.hyps if not (
          (x['pattern'] == asked['pattern'] and (x['level'] in [0, 1]))
          or (x['pattern'] == asked['pattern'] and x['shape'] != asked['shape'] and x['level'] == 2)
          or (x['pattern'] in other_patterns)
      )]

    # If question type is SHAPE and answer is YES:
    # keep only itself and its ancestors, which means:
    # --> keep hyp with same shape at level 1 (itself)
    # --> keep hyp with same pattern at level 0
    elif level == 1 and answer == 'YES':
      new_hyps = [x for x in self.hyps if (
          is_pattern_hyp(x) or is_shape_hyp(x)
      )]

    #================== COLOR QUESTION ==================#

    # If question type is COLOR and answer is NO:
    # --> remove itself and its ancestors at any level
    elif level == 2 and answer == 'NO':
      new_hyps = [x for x in self.hyps if not (
          is_pattern_hyp(x) or is_shape_hyp(x) or is_color_hyp(x)
      )]

    # If question type is COLOR and answer is YES:
    # keep only itself and its ancestors, which means:
    # --> keep hyp with same color at level 2 (itself)
    # --> keep hyp with same shape at level 1
    # --> keep hyp with same pattern at level 0
    elif level == 2 and answer == 'YES':
      new_hyps = [x for x in self.hyps if (
          is_pattern_hyp(x) or is_shape_hyp(x) or is_color_hyp(x)
      )]

    # remove invalidated hyps from space (if not simulated)
    if not simulated:
      self.hyps[:] = new_hyps

    # return the new hypothesis space size
    return len(new_hyps)
  	

# Run Analysis

In [0]:
from google.colab import files
import time

# DEBUG values are printed while this is True; set it to False to silence them
DEBUG = True

print('START')

print('Downloading input data...')
# Get data file and extract records
passwd = ''  # NOTE(review): not used anywhere in this section
url = 'https://CSV-URL-HERE.csv'
csv_file = downloadFile(url)

print('Parsing input data...')
records = parse_csv(csv_file)
results = []


def _mean_or_none(total, count):
  '''Return total / count, or None when there is nothing to average.

  None is written by the csv module as an empty cell.
  '''
  if count > 0:
    return total / count
  return None


# For each record...
for key, record in records.items():

  # print user id
  if DEBUG:
    print('\n\nUID: {}'.format(key))
    print('==================')
  else:
    print('Analyzing uid {}...'.format(key))

  # create new hypothesis space
  # and ask all the questions
  space = Space()
  space.questions = record["questions"]
  space.ask_all_questions()

  # initial stats values
  tot_ig = 0
  tot_ig_no_zero = 0
  n_questions_no_zero = 0
  nquestions = len(space.questions)
  nquestion_zeroig = 0
  questions_zeroig_values = []

  # For each asked question...
  for idx, question in enumerate(space.questions):
    infogain = question['infogain']

    # update total_ig
    tot_ig += infogain

    # the remaining stats exclude the last question
    if idx == nquestions - 1:
      continue

    if infogain > 0:
      # infogain > 0 --> update corresponding stats
      tot_ig_no_zero += infogain
      n_questions_no_zero += 1
    elif infogain == 0:
      # infogain == 0 --> update corresponding stats
      nquestion_zeroig += 1
      questions_zeroig_values.append('[{}:{};single:{}]'.format(
          question['type'], question['value'], question['single']
      ))

  # A participant with a single question has no "last excluded" average, and
  # one without any positive-infogain question has no "no zeros" average:
  # those cells are left empty instead of dividing by zero.
  results.append({
    "id": key,
    "age": record["age"],
    "condition": record["condition"],
    "ig_med": _mean_or_none(tot_ig, nquestions),
    "ig_med_last_exluded": _mean_or_none(tot_ig, nquestions - 1),
    "ig_med_last_exluded_no_zeros": _mean_or_none(tot_ig_no_zero, n_questions_no_zero),
    "ig_last_question": record["questions"][-1]['infogain'],
    "ig_first_question": record["questions"][0]['infogain'],
    "nquestions_zeroig_last_exluded": nquestion_zeroig,
    "questions_zeroig_last_exluded": ",".join(questions_zeroig_values)
  })

# export results as csv (the columns are the keys of a result row,
# so there is nothing to export when no record was analyzed)
if results:
  csv_columns = list(results[-1].keys())

  print('Exporting CSV file...')
  timestr = time.strftime("%Y%m%d_%H%M%S")
  file_name = 'wiz_analysis_out_' + timestr + '.csv'
  write_dic_to_csv(file_name, csv_columns, results)
  files.download(file_name)
  print('CSV file downloaded.')
else:
  print('No records found: nothing to export.')
print('END')
