# Dependencies, definitions, and configuration
Triggered automatically as required. **No need to run manually.**

In [7]:
# @title Dependencies code block
# @markdown Imported libraries and authentication
# Base dependencies
import csv
import io
import re
import sys
import time

""" default gspread version 3.4.2 needs updating each runtime 🙄
    try/except block removes need to reinstall in same runtime """
try:
  if gspread.__version__ < '5.12.0':
    !pip install --upgrade gspread
    import gspread
  else:
    import gspread
except: # gspread not defined
    !pip install --upgrade gspread
    import gspread

from google.auth import default
from google.colab import files
from google.colab import drive
from google.colab import auth
from itertools import combinations
from IPython.display import display, HTML, Javascript
import ipywidgets as widgets
from decimal import Decimal, ROUND_HALF_UP

# Establish Google authentication session
# NOTE(review): authenticate_user() presumably prompts the Colab user for
# consent the first time it runs in a runtime - confirm against Colab docs.
auth.authenticate_user()
# default() returns a pair; only the first element (the credentials) is used
creds, _ = default()
# gc is the module-level gspread client used by retrieve_routes, int_output
# and ext_output
gc = gspread.authorize(creds)

In [8]:
# @title Configuration code block
# @markdown These configuration options may require changing at a future point. The variables defined are:
# @markdown * Use of best attempt averages for compensation
# @markdown * File/folder IDs for programme route sheet, checking sheet, and Board report template
# @markdown * Lists of covid and pre-covid OLs
# @markdown * Grade behaviour - defines how grades are interpreted when checking whether SPRs are ready to process. At present:
# @markdown   * SMRs with SD and WD are excluded (the SPR is retained)
# @markdown   * For ALL other SMRs, any grades that are not P* or F* lead to the exclusion of the whole SPR record

# compensation vs condonation
""" global toggles record whether to apply 3rd compensation rule
    3rd rule = requires best att average >= 50
    + With 3rd rule, we're using COMPENSATION
    + Without 3rd rule, we're using CONDONATION
    Therefore, (COMPENSATION = False) == (CONDONATION = true) """
# Both toggles are read by SPR_RECORD.calculate_award and by the combination
# check inside SPR_RECORD.award_PGDIP_PGCER
COV_COMPENSATION = True
""" NB NEW_COMPENSATION governs students with ruleset=best """
NEW_COMPENSATION = True

# define NEW_COMPENSATION attribute
""" if NEW_COMPENSATION is true, use the following to calculate averages """
""" valid options are 'best_attempt' or 'module_mark' """
# The value is used as an SMR_RECORD attribute name (via getattr)
NEW_COMP_ATTR = 'best_attempt'

# Grade behaviour
# grades that should result in the exclusion of the SMR only
gex_SMR = ['SD', 'WD']
# if ANY SMR grade does NOT match the following, exclude the whole SPR
""" Uses regex to create P* or F*; used as nand P* F* """
# Applied with re.match, which anchors at the start only; '.?' is optional,
# so any grade beginning with 'P' or 'F' matches
gex_SPR = re.compile(r'^(P|F).?')

# file and folder ids
""" File ID for programme route sheet """
# opened by retrieve_routes (via run_ac) and by ext_output
prog_route_id = '1ud_IrC8hDteiVTLs434EHTCZEFoheuXMAf9iLHCv4Uk'
""" File ID for INTERNAL output spreadsheet """
# opened by int_output
output_id = '1dJ5kDk5kpFxG4nfeykt8Kgsx7vl_sE-Vh3-fGS5HABA'
""" File ID for EXTERNAL output template """
# copied by ext_output to create each Board report
board_template_id = '180eEE4HY-k0VI2vMHvl7WztpDRYiFhqBXkE32vjU1OE'

# covid and old ols
""" these OLs are fixed, hence hard-coded lists """
# Entries have the form '<academic year> <first 3 characters of the period
# slot>', matching the string built in SPR_RECORD.determine_ruleset
covid_ols = ['2019/0 OL4','2019/0 OL5','2019/0 OL6',
            '2020/1 OL1','2020/1 OL2','2020/1 OL3',
            '2020/1 OL4','2020/1 OL5','2020/1 OL6',
            '2021/2 OL1','2021/2 OL2','2021/2 OL3',
            '2021/2 OL4','2021/2 OL5','2021/2 OL6',
            '2022/3 OL1','2022/3 OL2','2022/3 OL3',
            '2022/3 OL4','2022/3 OL5','2022/3 OL6']

old_ols = ['2017/8 OL6','2018/9 OL1', '2018/9 OL2','2018/9 OL3',
          '2018/9 OL4','2018/9 OL5','2018/9 OL6',
          '2019/0 OL1','2019/0 OL2','2019/0 OL3']

# AIT checking
""" AIT_CHECK determines whether we are checking AIT status """
# read by SPR_RECORD.ready_to_process
AIT_CHECK = True
""" ait_field is the column name in SPR export """
# read by store_SPRs to populate SPR_RECORD.ait_status
ait_field = 'User Defined Field 11'
""" ait_value is the value to look for """
# any other value is reported as an error unless the student status is 'WDN'
ait_value = 'PASS'

In [9]:
# @title Class definitions code block
# @markdown **Don't edit these.** (Don't even look at them 🙈)
# == DEFINE SPR_RECORD CLASS ==
""" Use e.g. "param=None" to assign default values """
class SPR_RECORD:
  """
  One SPR record from the SPR export, together with its SMR records.

  An instance is created with the values read from the SPR export and is
  then populated step by step by its methods: readiness checks, ruleset
  selection, credit counting / award determination and classification.
  Several methods read notebook-level configuration globals (gex_SMR,
  gex_SPR, AIT_CHECK, ait_value, COV_COMPENSATION, NEW_COMPENSATION,
  NEW_COMP_ATTR) and the notebook-level helper round_float.
  """
  def __init__(self,
               SPR_code, surname, forename, status, route, ait_status,
               award=None, passed_credits=0,
               marginal_credits=0, cpm_credits=0,
               rank=None, award_mark=None, department=None,
               outcome=None, ruleset=None, method=None,
               covid_rank=None, covid_am=None,
               new_am=None, new_rank=None, am_uplift=None
               ):
    """
    Store the SPR export values and initialise the calculated properties.

    fixed properties; full list preferable to **kwargs
    NB default values have been given to properties
    populated by methods
    """

    # Pulled from fSPRs:
    self.SPR_code = SPR_code
    self.surname = surname
    self.forename = forename
    self.status = status
    self.route = route
    # Generated by class methods:
    self.ruleset = ruleset
    self.award = award
    self.passed_credits = passed_credits
    self.marginal_credits = marginal_credits
    self.cpm_credits = cpm_credits
    self.rank = rank
    self.award_mark = award_mark
    self.method = method # ruleset used
    self.department = department # read from programmes sheet
    self.outcome = outcome # store calculator outcome (e.g. errors)
    self.covid_am = covid_am
    self.covid_rank = covid_rank
    self.new_am = new_am
    self.new_rank = new_rank
    self.ait_status = ait_status
    self.am_uplift = am_uplift
    self.SMRs = {} # dictionary of SMR_Record objects

  # add SMR to this instance
  def add_SMR_record(self, SMR):
    """ Store an SMR record, keyed by (module code, year, ol). """
    self.SMRs[(SMR.code, SMR.year, SMR.ol)] = SMR

  # Check SMRs and SPR for any issues
  def ready_to_process(self, errors, routes):
    """
    Check this SPR and its SMRs for problems that prevent processing.

    Each problem is appended to `errors` as a
    [SPR code, module or route code, message] row; nothing is returned.
    SMRs whose grade is listed in gex_SMR are removed from self.SMRs.
    Uses globals: gex_SMR, gex_SPR, AIT_CHECK, ait_value.
    """
    # An SPR whose route is missing from the routes dictionary cannot be
    # checked; report it instead of letting a KeyError abort the whole run
    route = routes.get(self.route)
    if route is None:
      errors.append([self.SPR_code, self.route, f"Route {self.route} not found in programme routes"])
      return

    SMR_codes = []

    # Remove SDs and WDs
    non_SD_codes = [SMR.code for SMR in self.SMRs.values() if not SMR.grade == 'SD']
    gex_SMR_list = []
    for key, record in self.SMRs.items():
      # SMRs on gex_SMR list? Yes = exclude SMR, continue
      if record.grade in gex_SMR:
        gex_SMR_list.append(key)
        # report SPR error if no non-SD SMRs
        if record.grade == 'SD' and non_SD_codes.count(record.code) == 0:
          errors.append([self.SPR_code, record.code, "All SMRs for this module have SD grades"])
        # report SPR error if WD is potentially credit-bearing.
        # SMR_RECORD keeps a mark as the raw string when it cannot be
        # converted to a number; such a mark cannot be compared with 39.5
        # and is treated as not credit-bearing
        if (record.grade == 'WD'
            and not isinstance(record.best_attempt, str)
            and record.best_attempt >= 39.5):
          errors.append([self.SPR_code, record.code, "A WD grade is potentially credit-bearing for this module"])

    for each in gex_SMR_list:
      self.SMRs.pop(each)

    # Perform checks on each remaining SMR
    for record in self.SMRs.values():

      # SMRs match gex_SPR? No = exclude SPR
      if not re.match(gex_SPR, record.grade):
        errors.append([self.SPR_code, record.code, f"Invalid grade {record.grade}"])

      # SMRs duplicated? Yes = exclude SPR
      if record.code in SMR_codes:
        errors.append([self.SPR_code, record.code, "Duplicate SMR"])

      SMR_codes.append(record.code)

      # SMRs belong to route? No = exclude SPR
      if record.code not in route.route_modules:
        errors.append([self.SPR_code, record.code, f"Module not in route {self.route}"])

    # Check route complete
    # using sets to exclude duplicates and resolve ordering
    route_SMRs = {module.module_code for module in route.route_modules.values()}
    route_complete = route_SMRs == set(SMR_codes)
    if not route_complete and not self.status == 'WDN':
      errors.append([self.SPR_code, self.route, f"Incomplete route; status {self.status}"])
    elif route_complete and self.status == 'WDN':
      errors.append([self.SPR_code, self.route, f"Student has completed, but status is WDN"])

    # Check AIT status, if required (students that are WDN are ignored)
    if AIT_CHECK and self.ait_status != ait_value and self.status != 'WDN':
      errors.append([self.SPR_code, self.route, f"Has not completed AIT. AIT status = {self.ait_status}; student status = {self.status}"])


  def record_department(self, routes):
    """ Copy the department of this SPR's route into self.department. """
    self.department = routes[self.route].route_department

  def determine_ruleset(self, covid_ols, old_ols, errors, SPR_records):
    """
    Set self.ruleset from the iterations of the attached SMRs.

    An SMR whose start year is 2023 or later counts as "new period";
    otherwise its '<year> <first 3 chars of ol>' string is looked up in
    covid_ols and then old_ols.
      covid + new periods present -> "best"
      covid period present        -> "covid"
      new or old period present   -> "new"
    Returns 0 on success. Returns 1, after appending a row to `errors`,
    when an iteration cannot be identified or no SMR is attached.
    SPR_records is not used; it is kept for interface compatibility.
    """
    covid_period = False
    new_period = False
    old_period = False

    for record in self.SMRs.values():
      try:
        start_year, _ = map(int, record.year.split("/"))
      except ValueError:
        # malformed academic year: same outcome as an unidentifiable iteration
        errors.append([self.SPR_code, record.code, f"Error in iteration {record.year} {record.ol}"])
        return 1

      ol_string = record.year + " " + record.ol[:3]
      if start_year >= 2023:
        new_period = True
      elif ol_string in covid_ols:
        covid_period = True
      elif ol_string in old_ols:
        old_period = True
      else:
        # error - iteration unidentifiable
        errors.append([self.SPR_code, record.code, f"Error in iteration {record.year} {record.ol}"])
        return 1

    if covid_period and new_period:
      ruleset = "best"
    elif covid_period:
      ruleset = "covid"
    elif new_period or old_period:
      ruleset = "new"
    else:
      # error - cannot calculate ruleset - exclude
      errors.append([self.SPR_code, self.route, f"Cannot determine ruleset - check iterations"])
      return 1

    self.ruleset = ruleset
    return 0


  def calculate_module_mark(self):
    """ Call SMR_RECORD.calculate_module_mark on every attached SMR. """
    for record in self.SMRs.values():
      record.calculate_module_mark()

  def store_credit_value(self, routes):
    """ Call SMR_RECORD.store_credit_value on every attached SMR. """
    for SMR in self.SMRs.values():
      SMR.store_credit_value(routes, self.route)

  def store_cpm(self, routes):
    """ Call SMR_RECORD.store_cpm on every attached SMR. """
    for SMR in self.SMRs.values():
      SMR.store_cpm(routes, self.route)

  def calculate_award(self, errors, SPR_records):
    """
    Count credits and determine the award (MSCOL, PGDIP or PGCER).

    The mark used is best_attempt for ruleset "covid" and module_mark
    otherwise. A mark >= 49.5 counts as passed (CPM credits are counted
    separately), a non-CPM mark >= 39.5 counts as marginal, and anything
    else marks the SMR as excluded (covid_ex / new_ex).
    Returns 0 when an award has been assigned. Returns 1, after appending
    a row to `errors`, when no award can be made.
    SPR_records is not used; it is kept for interface compatibility.
    Uses globals: NEW_COMP_ATTR, NEW_COMPENSATION, COV_COMPENSATION.
    """

    # define mark to use based on ruleset
    attr = 'best_attempt' if self.ruleset == "covid" else 'module_mark'

    # count credits (passed/marginal)
    pass_cred = 0; cpm_cred = 0; marg_cred = 0

    # separate _new sum needed to accommodate best attempt average
    w_mark_sum_cov = 0 # used for COV MSCOL compensation
    w_mark_sum_new = 0 # used for NEW MSCOL compensation

    for record in self.SMRs.values():
      mark = getattr(record, attr)
      new_comp_mark = getattr(record, NEW_COMP_ATTR)
      if mark >= 49.5:
        if record.cpm:
          cpm_cred += record.credit_value
        else:
          pass_cred += record.credit_value
          # NB CPM marks not included in compensation average
          w_mark_sum_cov += (mark * record.credit_value)
          w_mark_sum_new += (new_comp_mark * record.credit_value)
      # if mark 40-49 and NOT cpm, add to marg credits.
      elif mark >= 39.5 and not record.cpm:
        w_mark_sum_cov += (mark * record.credit_value)
        w_mark_sum_new += (new_comp_mark * record.credit_value)
        marg_cred += record.credit_value
      else:
        # failed module - exclude
        record.covid_ex = True
        record.new_ex = True

    # store credit values
    self.passed_credits = pass_cred
    self.marginal_credits = marg_cred
    self.cpm_credits = cpm_cred

    pass_cpm_cred = pass_cred + cpm_cred
    all_cred = pass_cpm_cred + marg_cred
    pass_marg_cred = pass_cred + marg_cred

    # determine POTENTIAL award based on credits
    # (uses >= 180 credits to catch excess credits)

    # Multi-step conditional. Checks comp/cond rules.
    if pass_cpm_cred >= 180 or (pass_cpm_cred >= 150 and all_cred >= 180):

      # check for NEW: awarded outright without compensation, otherwise
      # the credit-weighted average must round to at least 50
      if self.ruleset in ["new", "best"]:
        if (not NEW_COMPENSATION or
            round_float((w_mark_sum_new / pass_marg_cred), '1.') >= 50):
          self.award_MSCOL()
          return 0

      # check for COVID
      elif self.ruleset == "covid":
        if (not COV_COMPENSATION or
            round_float((w_mark_sum_cov / pass_marg_cred), '1.') >= 50):
          self.award_MSCOL()
          return 0

      # else default to PGDIP check

    # Pause to calc mod list and ex CPM
    SMR_list = []
    for record in self.SMRs.values():
      # Ignore excluded modules
      if record.covid_ex or record.new_ex:
        continue
      # Ignore CPM for Dips; exclude
      if record.cpm:
        record.covid_ex = True; record.new_ex = True
        continue
      SMR_list.append(record)

    if pass_cred >= 120 or (pass_cred >= 90 and pass_marg_cred >= 120):
      # potentially PGDIP. Check combinations.
      if self.award_PGDIP_PGCER(SMR_list, 8, 2):
        self.award = "PGDIP"
        return 0

    if pass_cred >= 60 or (pass_cred >= 45 and pass_marg_cred >= 60):
      # potentially PGCER. Check combinations.
      if self.award_PGDIP_PGCER(SMR_list, 4, 1):
        self.award = "PGCER"
        return 0

    # no award achieved - flag fail.
    # Reached both when the credit thresholds are not met and when no valid
    # module combination exists, so every record without an award is
    # reported. The row holds plain values (not set literals) so that it can
    # be processed by clear_errors and written to the sheet.
    errors.append([self.SPR_code, self.route, f"No award achievable? Credits = passed taught: {self.passed_credits}, cpm: {self.cpm_credits}, marginal: {self.marginal_credits}"])
    return 1

  def award_MSCOL(self):
    """ Assign full credit to every attached SMR and set the award to MSCOL. """
    # assign credit to all modules
    for record in self.SMRs.values():
      record.credits_passed = record.credit_value
    self.award = "MSCOL"
    return

  def award_PGDIP_PGCER(self, SMR_list, num_mods, max_marg):
    """
    Look for a valid combination of `num_mods` modules from SMR_list.

    A combination is valid when it has at most `max_marg` modules with
    best_attempt < 49.5 and, where compensation applies, an average that
    rounds to at least 50. The best combination is chosen for the "new"
    method (rulesets "new"/"best") and/or the "covid" method (rulesets
    "covid"/"best"); SMRs outside the chosen combination are flagged new_ex
    / covid_ex respectively.
    Returns True if at least one method found a valid combination.
    """

    def get_best(SMR_list, num_mods, max_marg, ruletype, ruleset):
      """
      Return the valid combination with the highest average, or [] if none.

      NB get_best is a brute-force algorithm
      Given the scale of our problem, it is completely viable
      A recursive backtrack algorithm would be more efficient, though
      """
      valid_com = [com for com in combinations(SMR_list, num_mods)
                   if meets_criteria(com, max_marg, ruleset)]

      # no combinations at all, or none valid
      if not valid_com:
        return []

      # index into calc_averages result:
      # 0 = best module_mark avg ("new"), 1 = best first_attempt avg
      x = 0 if ruletype == "new" else 1

      return max(valid_com, key=lambda n: calc_averages(n)[x])

    def calc_averages(com):
      """
      Return the credit-weighted averages of a combination as a 4-tuple:
      (module_mark avg, first_attempt avg, best_attempt avg, module_mark avg).
      A zero credit total raises ZeroDivisionError (shouldn't be possible).
      """
      # we could use len() as credit value is consistent, but less dynamic
      credit_total = sum(record.credit_value for record in com)
      first_att_sum = sum((record.first_attempt * record.credit_value)
                          for record in com)
      best_att_sum = sum((record.best_attempt * record.credit_value)
                         for record in com)
      mod_mark_sum = sum((record.module_mark * record.credit_value)
                         for record in com)

      return (round_float(mod_mark_sum/credit_total, '1.'),
              round_float(first_att_sum/credit_total, '1.'),
              round_float(best_att_sum/credit_total, '1.'),
              round_float(mod_mark_sum/credit_total, '1.'))

    def meets_criteria(com, max_marg, ruleset):
      """ True if the combination passes the compensation and marginal checks. """
      _, _, best_att_avg, mod_mark_avg = calc_averages(com)

      if NEW_COMP_ATTR == 'best_attempt':
        new_avg = best_att_avg
      else:
        new_avg = mod_mark_avg

      # compensation fails when it applies and the relevant average is < 50
      comp_failed = (
          (ruleset == "covid" and COV_COMPENSATION and best_att_avg < 50) or
          (ruleset in ["new", "best"] and NEW_COMPENSATION and new_avg < 50))

      marg_count = sum(1 for record in com if record.best_attempt < 49.5)
      return (not comp_failed) and marg_count <= max_marg

    best_new = []
    best_covid = []

    # Check ruleset and run combination functions accordingly
    if self.ruleset in ["new", "best"]:
      best_new = get_best(SMR_list, num_mods, max_marg, "new", self.ruleset)

      if best_new:
        # mark unused modules as new_ex = True
        best_codes = set(module.code for module in best_new)
        for module in self.SMRs.values():
          if module.code not in best_codes:
            module.new_ex = True

    if self.ruleset in ["covid", "best"]:
      best_covid = get_best(SMR_list, num_mods, max_marg, "covid", self.ruleset)

      if best_covid:
        # mark unused modules as covid_ex = True
        best_codes = set(module.code for module in best_covid)
        for module in self.SMRs.values():
          if module.code not in best_codes:
            module.covid_ex = True

    # if one or both have returned a valid list = award confirmed
    return bool(best_new or best_covid)

  # calculate classification
  def award_rank(self):
    """
    Set self.rank, self.award_mark and self.method.

    For ruleset "best" both methods are calculated and stored for
    comparison; the higher rank is used, and the "new" method is used when
    the ranks are equal. Otherwise only the method matching the ruleset is
    calculated ("new" for ruleset "new", "covid" for anything else).
    """

    if self.ruleset == "best":
      covid_am, covid_rank = self.award_rank_covid()
      new_am, new_rank = self.award_rank_new()
      # store covid and new outcomes for user comparison
      self.covid_am = covid_am
      self.covid_rank = covid_rank
      self.new_am = new_am
      self.new_rank = new_rank

      if new_rank == covid_rank:
        self.award_mark = new_am
        self.method = "new"
        self.rank = new_rank

      elif new_rank == "Distinction":
        self.award_mark = new_am
        self.rank = "Distinction"
        self.method = "new"

      elif covid_rank == "Distinction":
        self.award_mark = covid_am
        self.rank = "Distinction"
        self.method = "covid"

      elif new_rank == "Merit":
        self.award_mark = new_am
        self.rank = "Merit"
        self.method = "new"

      else:
        # only remaining case: new = Pass, covid = Merit
        self.award_mark = covid_am
        self.rank = "Merit"
        self.method = "covid"

    else:
      if self.ruleset == "new":
        am, rank = self.award_rank_new()
        self.method = "new"
      else:
        am, rank = self.award_rank_covid()
        self.method = "covid"

      self.award_mark = am
      self.rank = rank

    # For COVID method only: set am to 49.50 if below 50
    # (must be at least lowest value consistent with a pass)
    if self.method == "covid" and self.award_mark < 50:
      self.award_mark = 49.5
      self.am_uplift = 'Yes'

  def award_rank_new(self):
    """
    Return (award mark, rank) under the "new" method.

    The award mark is the credit-weighted module_mark average of all SMRs
    not flagged new_ex, rounded with round_float(..., '1.'). For MSCOL the
    CPM average must reach the same threshold as the award mark; PGDIP uses
    the award mark only; any other award is always "Pass".
    """

    # using module_mark
    award_mark_sum = 0; credit_total = 0
    cpm_mark_sum = 0; cpm_credit = 0

    for record in self.SMRs.values():
      if record.new_ex:
        continue

      cw_mark = record.module_mark * record.credit_value
      award_mark_sum += cw_mark
      credit_total += record.credit_value

      if record.cpm:
        cpm_mark_sum += cw_mark
        cpm_credit += record.credit_value

    award_mark = round_float((award_mark_sum/credit_total), '1.')
    if cpm_credit != 0:
      cpm_mark = round_float((cpm_mark_sum/cpm_credit), '1.')
    else:
      cpm_mark = 0

    # using rounding for rank, so award mark must be e.g. 69.5 or higher
    if self.award == "MSCOL":
      if award_mark >= 70 and cpm_mark >= 70:
        return award_mark, "Distinction"
      elif award_mark >= 60 and cpm_mark >= 60:
        return award_mark, "Merit"
      else:
        return award_mark, "Pass"

    elif self.award == "PGDIP":
      if award_mark >= 70:
        return award_mark, "Distinction"
      elif award_mark >= 60:
        return award_mark, "Merit"
      else:
        return award_mark, "Pass"

    else:
      return award_mark, "Pass"

  def award_rank_covid(self):
    """
    Return (award mark, rank) under the "covid" method.

    The award mark is the credit-weighted first_attempt average of all SMRs
    not flagged covid_ex, rounded with round_float(..., '1.'). The rank also
    depends on the taught (non-CPM) average, the CPM average, and the
    non-CPM credits with a first attempt below 49.5 (first_att_mf) and
    below 39.5 (first_att_of).
    """

    # using first_attempt
    award_mark_sum = 0; credit_total = 0
    cpm_mark_sum = 0; cpm_credit = 0
    taught_mark_sum = 0; taught_credit = 0
    first_att_of = 0; first_att_mf = 0

    for record in self.SMRs.values():

      if record.covid_ex:
        continue

      cw_mark = record.first_attempt * record.credit_value
      award_mark_sum += cw_mark
      credit_total += record.credit_value

      if record.cpm:
        cpm_mark_sum += cw_mark
        cpm_credit += record.credit_value
      else:
        taught_mark_sum += cw_mark
        taught_credit += record.credit_value

      if record.first_attempt < 49.5 and not record.cpm:
        first_att_mf += record.credit_value
      if record.first_attempt < 39.5 and not record.cpm:
        first_att_of += record.credit_value

    award_mark = round_float((award_mark_sum/credit_total),'1.')
    if cpm_credit != 0:
      cpm_mark = round_float((cpm_mark_sum/cpm_credit), '1.')
    else:
      cpm_mark = 0
    taught_avg = round_float((taught_mark_sum/taught_credit), '1.')

    if self.award == "MSCOL":
      if (award_mark >= 70 or taught_avg >= 70) and cpm_mark >= 50 and first_att_mf <= 15 and first_att_of == 0:
        return award_mark, "Distinction"
      elif (award_mark >= 60 or taught_avg >= 60) and cpm_mark >= 50 and first_att_mf <= 30 and first_att_of <= 15:
        return award_mark, "Merit"
      else:
        return award_mark, "Pass"

    elif self.award == "PGDIP":
      if award_mark >= 70 and first_att_mf <= 15 and first_att_of == 0:
        return award_mark, "Distinction"
      elif award_mark >= 60 and first_att_mf <= 30 and first_att_of <= 15:
        return award_mark, "Merit"
      else:
        return award_mark, "Pass"

    else:
      return award_mark, "Pass"


# == DEFINE SMR_RECORD CLASS ==

class SMR_RECORD:
  """
  One SMR (module result) row from the SMR export.

  Holds the values read from the export plus the properties that are
  filled in later by the methods below (credit value, CPM flag, module
  mark) and the exclusion flags set during award calculation.
  """
  def __init__(self,
               code, year, ol, first_attempt, best_attempt, grade,
               cu, co, status, credits_passed=0, credit_value=None,
               module_mark=None, covid_ex=False, new_ex=False, cpm=False
               ):
    """
    Store the export values; marks are divided by 100 and rounded with
    round_float(..., '.01'). A mark that float() rejects with ValueError is
    kept exactly as supplied.
    """

    # values taken directly from fSMRs
    self.code = code
    self.year = year
    self.ol = ol
    self.grade = grade
    self.cu = cu
    self.co = co
    self.status = status

    # marks: numeric where possible, otherwise the raw value
    try:
      self.first_attempt = round_float(float(first_attempt)/100, '.01')
    except ValueError:
      self.first_attempt = first_attempt
    try:
      self.best_attempt = round_float(float(best_attempt)/100, '.01')
    except ValueError:
      self.best_attempt = best_attempt

    # values produced later by the methods (defaults supplied by caller)
    self.credits_passed = credits_passed
    self.credit_value = credit_value
    self.cpm = cpm
    self.module_mark = module_mark
    self.covid_ex = covid_ex
    self.new_ex = new_ex

  def calculate_module_mark(self):
    """
    Derive module_mark from the two attempt marks:
    equal marks -> that mark; best attempt above 50 -> 50;
    otherwise -> the best attempt.
    """
    first, best = self.first_attempt, self.best_attempt
    if first == best:
      self.module_mark = first
    elif best > 50:
      self.module_mark = 50
    else:
      self.module_mark = best

  def store_credit_value(self, routes, route):
    """ Copy this module's credit value from the given route. """
    route_module = routes[route].route_modules[self.code]
    self.credit_value = route_module.credit_value

  def store_cpm(self, routes, route):
    """ Set cpm to True when the route marks this module as CPM. """
    route_module = routes[route].route_modules[self.code]
    if route_module.cpm:
      self.cpm = True


# == DEFINE PROGRAMME_ROUTE CLASS ==

class PROGRAMME_ROUTE:
  """ A programme route and the modules that belong to it. """
  def __init__(self, programme_code, route_code, department, shortname):
    """ Store the route details; modules are added with add_route_module. """
    self.route_modules = {} # module code -> module object
    self.programme_code = programme_code
    self.route_code = route_code
    self.route_shortname = shortname
    self.route_department = department

  def add_route_module(self, module):
    """ Register a module under its module_code (replaces any existing entry). """
    code = module.module_code
    self.route_modules[code] = module


# == DEFINE MODULE CLASS ==

class MODULE:
  """ A module as listed on a programme route sheet. """
  def __init__(self, module_code, credit_value, cpm):
    """ Store the module code, its credit value and its CPM flag. """
    self.module_code = module_code
    self.credit_value = credit_value
    self.cpm = cpm

In [10]:
# @title Function definitions code block
# @markdown **Don't edit these either** 🙏

def run_ac(b):
  """
  Run the whole calculation: load data, check it, process it, output it.

  `b` is not used in the body.
  NOTE(review): presumably the widget passed in by a button click handler -
  confirm against the caller.

  Reads notebook globals: report_period, report_type, fupload_SPRs,
  fupload_SMRs, prog_route_id, covid_ols, old_ols. The route data is
  retrieved once and cached in the global `routes`.
  """

  start_time = time.time()
  print("AC started!")

  # interpret user values
  r_period = report_period.value
  INTERNAL = report_type.value == 'Internal checking'

  if not r_period and not INTERNAL:
    r_period = 'PLACEHOLDER - PLEASE UPDATE'

  # retrieve route data, unless already retrieved in this runtime
  global routes
  if 'routes' not in globals():
    routes = retrieve_routes(prog_route_id)

  # Retrieve SPR data from uploaded files
  # (chardet id'd encoding as ISO-8859-1, which is an alias for latin_1)
  fname_SPRs = list(fupload_SPRs.keys())[0]
  fdata_SPRs = fupload_SPRs[fname_SPRs].decode('latin_1')
  fSPRs = io.StringIO(fdata_SPRs)

  SPR_records = store_SPRs(fSPRs)

  # Retrieve SMR data from uploaded files
  fname_SMRs = list(fupload_SMRs.keys())[0]
  fdata_SMRs = fupload_SMRs[fname_SMRs].decode('latin_1')
  fSMRs = io.StringIO(fdata_SMRs)
  store_SMRs(fSMRs, SPR_records)

  # Check for errors
  errors = check_ready(SPR_records, routes)

  # Process for each record (uses SPR_RECORD methods)
  for record in SPR_records.values():
    try:
      record.record_department(routes)
      do_rs = record.determine_ruleset(covid_ols, old_ols, errors, SPR_records)
      if do_rs != 0:
        # returned error code - skip record
        continue
      record.calculate_module_mark()
      record.store_credit_value(routes)
      record.store_cpm(routes)
      do_ca = record.calculate_award(errors, SPR_records)
      if do_ca != 0:
        # returned error code - skip record
        continue
      record.award_rank()
    except Exception as e:
      print(f"Error encountered with {record.SPR_code}.\n Error details: {e}")
      # Also record the failure, so that the half-processed record is
      # removed by clear_errors and reported, rather than being written to
      # the results with missing values
      errors.append([record.SPR_code, record.route, f"Processing error: {e}"])

  # clear new errors from SPR_records
  clear_errors(errors, SPR_records)

  # output results
  output_results(SPR_records, routes, errors, INTERNAL, r_period)

  print(f"AC complete 😅\nIt tooks {int(round((time.time() - start_time),0))} seconds!")

def retrieve_routes(id):
  """
  Read the programme route spreadsheet and build the routes dictionary.

  `id` is the spreadsheet key. Routes are read from the 'Routes' tab; the
  modules of each route are read from the tab named after its route code.
  Returns a dictionary of route code -> PROGRAMME_ROUTE, each populated
  with its MODULE objects. Uses the global gspread client `gc`.
  """

  # Get programme route ss by key
  routes_ss = gc.open_by_key(id)

  # Read and store route data
  # (get_all_records returns a list of dictionaries)
  routes_sh = routes_ss.worksheet('Routes')
  routes_data = routes_sh.get_all_records()

  # store route objects in routes dictionary
  routes = {}

  # initialise and store routes
  for row in routes_data:
    route_code = row['Route Code']
    # truthy check eliminates blank entries; first entry wins on duplicates
    if route_code and route_code not in routes:
      routes[route_code] = PROGRAMME_ROUTE(row['Programme Code'],
                                           route_code,
                                           row['Department'],
                                           row['Short name'])

  # use routes to id required tab
  # store modules in relevant route
  for route in routes.values():
    modules_sh = routes_ss.worksheet(route.route_code)
    for row in modules_sh.get_all_records():
      module_code = row['Module Code']
      # same blank-entry guard as for routes: a blank row would otherwise
      # add a module with an empty code to the route
      if not module_code:
        continue

      # str() and strip() tolerate non-text cells and stray whitespace
      cpm = str(row['CPM']).strip().lower() == 'yes'

      route.add_route_module(MODULE(module_code,
                                    row['Credit Value'],
                                    cpm))

  return routes


def store_SPRs(data):
  """
  Build the SPR_records dictionary from the SPR export.

  `data` is a file-like object containing the CSV export. Returns a
  dictionary of SPR code -> SPR_RECORD. When an SPR code appears more than
  once (should be impossible), only its first row is used.
  The AIT status is read from the column named by the global ait_field.
  """
  records_by_code = {}

  for row in csv.DictReader(data):
    code = row['SPR Code']
    if code in records_by_code:
      # duplicate SPR - keep the record created from the first row
      continue
    records_by_code[code] = SPR_RECORD(code,
                                       row['Surname'],
                                       row['Forename 1'],
                                       row['Student Status code'],
                                       row['Route code'],
                                       row[ait_field])

  return records_by_code


def store_SMRs(data, SPR_records):
  """
  Read the SMR export and attach each SMR to its SPR record.

  `data` is a file-like object containing the CSV export; SPR_records is
  the dictionary returned by store_SPRs and is updated in place.
  Rows with a blank SPR code are skipped. Rows whose SPR code is not in
  SPR_records are skipped with a printed warning, instead of stopping the
  whole run with a KeyError.
  """
  # Iterate over fSMRs; initialise and store SMR_record
  reader = csv.DictReader(data)
  for row in reader:
    SPR_id = row['SPR Code']
    if not SPR_id:
      continue

    if SPR_id not in SPR_records:
      print(f"Warning: SMR for module {row.get('Module code')} skipped - SPR {SPR_id} not found in SPR data")
      continue

    # Create and store SMR record
    SMR = SMR_RECORD(row['Module code'],
                     row['Academic Year code'],
                     row['Period Slot code'],
                     row['First Attempt Mark'],
                     row['Agreed mark - DB field'],
                     row['Agreed grade'],
                     row['Current Attempt'],
                     row['Completed number'],
                     row['Process Status'])
    SPR_records[SPR_id].add_SMR_record(SMR)


def check_ready(SPR_records, routes):
  """
  Run the readiness checks on every SPR record.

  Returns the list of error rows collected. Every SPR that has at least
  one error is removed from SPR_records (via clear_errors).
  """
  found = []
  for spr in SPR_records.values():
    spr.ready_to_process(found, routes)

  # drop the SPRs that reported errors
  clear_errors(found, SPR_records)

  return found

def clear_errors(errors, SPR_records):
  """
  Remove from SPR_records every SPR named in the first column of `errors`.

  SPR_records is modified in place; `errors` is left untouched. Codes that
  are not (or no longer) in SPR_records are ignored.
  """
  # a set, so that each SPR code is handled once
  flagged_codes = {row[0] for row in errors}
  for code in flagged_codes:
    SPR_records.pop(code, None)


def output_results(SPR_records, routes, errors, internal, period):
  """
  Group the records by department and send them to the right report.

  internal = True            -> internal report
  internal = False + errors  -> alert the user, then internal report
  internal = False, no errors -> external (Board) report for `period`
  """

  # group by department (organise data to minimise write actions)
  by_department = {}
  for record in SPR_records.values():
    by_department.setdefault(record.department, []).append(record)

  # EXTERNAL is only possible when there are no errors
  if not internal and not errors:
    ext_output(by_department, routes, period)
    return

  if not internal:
    # not set to internal, but errors found. Alert and fall back
    # (using simple error message to avoid extra dependency)
    popup("Errors detected: Defaulting to internal error report")

  int_output(by_department, errors)


def int_output(output_records, errors):
  """
  Write the internal checking report to the output spreadsheet.

  output_records is a dictionary of department -> list of SPR_RECORD; each
  department is written to the tab of the same name (created from a
  template tab if missing). `errors` is a list of error rows and is written
  to the 'Errors' tab; the list itself is not modified.
  In both cases the tab's existing first row is kept as the header row and
  everything else is replaced.
  Uses the global gspread client `gc` and the global output_id.
  """

  # Retrieve output sheet
  # (gspread authorisation still in effect in client variable gc)
  ss = gc.open_by_key(output_id)

  # Export output_records

  s_list = [s.title for s in ss.worksheets()]
  # sheet ids of the tabs duplicated for a new department tab / Errors tab
  template_id = '1009091024'
  ex_id = '236518903'

  for key, records in output_records.items():

    if key not in s_list:
      # copy template; rename; add name to s_list
      ss.duplicate_sheet(source_sheet_id = template_id,
                        new_sheet_name = key)
      s_list.append(key)
    s = ss.worksheet(key)

    # headers are read before the sheet is cleared, then written back first
    headers = s.row_values(1)
    write = [headers]

    for record in records:
      write.append([record.SPR_code,
                    record.forename,
                    record.surname,
                    record.status,
                    record.route,
                    record.ruleset,
                    record.award,
                    record.rank,
                    record.award_mark,
                    record.passed_credits,
                    record.cpm_credits,
                    record.marginal_credits,
                    record.method,
                    record.am_uplift,
                    record.covid_rank,
                    record.covid_am,
                    record.new_rank,
                    record.new_am
                    ])

    s.clear()
    s.append_rows(write)

  # Export errors
  # try to find 'Errors' tab. If it doesn't exist, create it from template
  try:
    s_ex = ss.worksheet('Errors')
  except gspread.exceptions.WorksheetNotFound:
    s_ex = ss.duplicate_sheet(source_sheet_id = ex_id, new_sheet_name = 'Errors')

  headers = s_ex.row_values(1)

  s_ex.clear()
  # build a new list rather than inserting the headers into the caller's
  # errors list
  s_ex.append_rows([headers] + errors)


def ext_output(output_records, routes, period):
  """ Create one Board report spreadsheet per department.

      For each department in output_records that has records, the Board
      report template is copied into that department's folder (looked up
      on the 'Departments' tab of the programme sheet). One tab per route
      is then added, holding a module header row and one row per record
      sorted by award, rank and descending award mark. Finally the
      'template' tab is hidden. Departments without records are skipped.

      output_records: dict mapping department name -> list of SPR records
      routes:         dict mapping route code -> route object providing
                      route_shortname and route_modules
      period:         text appended to the report title

      Raises ValueError if a department with records has no row on the
      'Departments' tab; this is checked before any report is created.

      Relies on the notebook globals gc, prog_route_id and
      board_template_id. """

  # Get folder IDs from programme sheet
  prog_ss = gc.open_by_key(prog_route_id)
  dept_sh = prog_ss.worksheet('Departments')
  dept_data = dept_sh.get_all_records()

  # first matching row wins if a department is listed more than once
  dept_folders = {}
  for row in dept_data:
    dept_folders.setdefault(row.get('Department'), row.get('Folder ID'))

  # departments with nothing to report do not get a spreadsheet
  pending = {key: records for key, records in output_records.items() if records}

  # Fail before anything is created: without this check an unmatched
  # department would reuse the folder of the previous department
  missing = [key for key in pending if key not in dept_folders]
  if missing:
    raise ValueError(
        f"No row on the 'Departments' tab for: {', '.join(map(str, missing))}")

  # sort order: award, then rank, then award_mark (highest first)
  award_order = {'MSCOL': 0, 'PGDIP': 1, 'PGCER': 2}
  rank_order = {'Distinction': 0, 'Merit': 1, 'Pass': 2}

  def sort_key(record):
    # float('inf') = positive infinity. Used as default value so that
    # unknown awards/ranks sort last
    return (award_order.get(record.award, float('inf')),
            rank_order.get(record.rank, float('inf')),
            -record.award_mark)

  for key, records in pending.items():

    # create new sheet inside this department's folder
    # NB gspread version must be 3.5 or higher to use folder_id
    #    (version is enforced under dependencies)
    ss_name = f"{key} Board Report: {period}"
    ss = gc.copy(board_template_id,
                 folder_id=dept_folders[key],
                 title=ss_name,
                 copy_comments=False)

    # split output_records by route (for this dept)
    route_records = {}
    for entry in records:
      route_records.setdefault(entry.route, []).append(entry)

    # for each route: create a new tab using shortname attribute
    template_sheet = ss.worksheet('template')
    for code, route_entries in route_records.items():
      sh_name = routes[code].route_shortname
      sh = template_sheet.duplicate(insert_sheet_index=0, new_sheet_name=sh_name)

      # create module header row and enter into row 1, starting at column D
      # each module code is followed by two blanks because every module
      # takes up three columns in the record rows below
      route_modules = [module.module_code for module in routes[code].route_modules.values()]
      route_modules_blanks = [value for val in route_modules for value in (val, "", "")]
      sh.update(values=[route_modules_blanks], range_name='D1')

      write = []
      # convert each SPR record of this route into write format
      for record in sorted(route_entries, key=sort_key):
        record_write = [record.SPR_code, record.forename, record.surname]

        # for each route module: first attempt, module mark and period,
        # or dashes when the record has no result for that module
        for module in route_modules:
          matching_result = next(
            (result for result in record.SMRs.values() if result.code == module),
            None
          )

          if matching_result:
            oltp = f"{matching_result.year} {matching_result.ol}"
            record_write.extend([matching_result.first_attempt, matching_result.module_mark, oltp])
          else:
            record_write.extend(['-', '-', '-'])

        # name of the exclusion flag that belongs to the method used
        ex = 'covid_ex' if record.method == 'covid' else 'new_ex'

        if record.award == 'MSCOL':
          used_mod_str = 'All'
        else:
          # equality test kept on purpose: only flags equal to False count
          used_modules = [smr.code for smr in record.SMRs.values() if getattr(smr, ex) == False]
          used_mod_str = ', '.join(used_modules)

        record_write.extend([record.ruleset, record.award, record.rank, record.award_mark, record.method, record.am_uplift, used_mod_str])
        write.append(record_write)

      sh.append_rows(write)

    template_sheet.hide()

def popup(message):
    """ Show message to the user in a browser alert box.

        message: text to show; converted with str() first

        The text is encoded with json.dumps so that quotes, backslashes and
        line breaks inside the message end up as a valid JavaScript string
        literal instead of breaking the alert() call. """
    import json  # local import: only needed here

    display(Javascript(f'alert({json.dumps(str(message))});'))

def round_float(float_value, pattern):
  """ Round float_value half-up to the precision given by pattern.

      float_value: number to round
      pattern:     string giving the target precision, e.g. '0.01' or '1'

      Returns the rounded number as a float. Needed because the built-in
      round() rounds ties to the nearest even digit ("bankers rounding"). """
  # going through str() makes Decimal see the printed value of the float
  # rather than its full binary expansion
  as_decimal = Decimal(str(float_value))
  return float(as_decimal.quantize(Decimal(pattern), rounding=ROUND_HALF_UP))

---
# **Award Calculator** - *Run me!*
---
## Instructions

### Step 1 - Export

Once you have a list of SPR codes, you need to export .csv files from the student record system from the SPR and SMR screens:

* Generate a list of SPRs (including the /1, /2 etc.) using [linebeak](https://linebeak.com/)
* Go to the SPR and SMR screens and retrieve your SPR records. Export these in the required format (see below).

**Important:** when exporting, note the following strict requirements:
* Exports must be in CSV format
* Include headers (these are ticked by default)
* Do not edit default file encoding or order of data
* Export all fields, **EXCEPT**:
  * For SPRs ONLY: exclude the **SPR_NOTE** field (found at the very bottom of the list of fields)
  * For SMRs ONLY: exclude **AYR1_CODE.SMR** (the second "Academic Year Code" field)

Once exported, check the files to make sure they are correct.

<br>

### Step 2 - Run AC

Once you've exported SPR and SMR records as described above, **hit the play button below.**

* If you've run the programme once already, the play button may look like a number in square brackets (e.g. [5])

* If you're warned that "This notebook was not authored by Google, do you want to run anyway?", select "Run anyway".

The "Dependencies, definitions, and configuration" code above will now run. When run for the first time in a session, you will be asked to give access via your Google credentials.

Next - *in the space below this text* - you will be asked to:
1. Upload your SPR file
2. Upload your SMR file
3. Choose your output type ("Internal checking" or "Board report")
  * If you choose "Board report" but errors are detected, the programme will default to "Internal checking"
4. Define the OLTP academic period (e.g. 23/24 OL1) - *this is only needed if your output type is "Board report"*

Once you're happy, **hit the "Run AC" button!**

The programme will let you know it's started and will confirm when it's done.



In [11]:
# @title
# ==== AC User "interface" ====
""" run this code block (i.e. hit the play button) to get started! """

# Input widgets: report format selector, free-text OLTP period and the
# button that starts the run. Nothing is shown until display() is called.
# NOTE(review): 'Ouput format:' looks like a typo for 'Output format:'
report_options = ['Internal checking', 'Board report']
report_type = widgets.Dropdown(options=report_options, value='Internal checking', description='Ouput format:')
report_period = widgets.Text(value='', description='OLTP:')
button = widgets.Button(description="Run AC 😎")

# Step 1: ask for the SPR export; the upload result is kept in fupload_SPRs
display(HTML(f'<span style="font-size:14px;">First upload <span style="color:#185abc; font-weight:bold;">SPRs</span> (student records) - .csv format please</span>'))
fupload_SPRs = files.upload()

# Step 2: ask for the SMR export; the upload result is kept in fupload_SMRs
display(HTML(f'<br><span style="font-size:14px;">\nNow upload <span style="color:#185abc; font-weight:bold;">SMRs</span> (module records) - .csv format please</span>'))
fupload_SMRs = files.upload()
print(f"\n")

# Step 3: show the report options and the run button below the upload prompts
display(report_type)
display(report_period)
display(HTML(f'<span style="font-size: 14px; font-style:italic;">Period is only required when reporting for Board.</span><br>'))
display(button)

# Clicking the button calls run_ac (defined outside this cell);
# presumably it reads the uploads and widget values above - TODO confirm
button.on_click(run_ac)

Saving Example SPRs.CSV to Example SPRs (2).CSV


Saving Example SMRs.CSV to Example SMRs (2).CSV




Dropdown(description='Ouput format:', options=('Internal checking', 'Board report'), value='Internal checking'…

Text(value='', description='OLTP:')

Button(description='Run AC 😎', style=ButtonStyle())

AC started!
AC complete 😅
It tooks 5 seconds!
