In [3]:
import pandas as pd
import os, sys, io
from io import StringIO
import pkg_resources
import ast
from datetime import datetime

<h1>Link Code to be Tested</h1>

In [4]:
import devanalyst.simulation.businessObjects as bo
from devanalyst.simulation.businessObjects import UserStoriesRepo, TicketsRepo, ScrumTeamsRepo
import devanalyst.simulation.generateTimecards as timecard
from devanalyst.simulation.generateTimecards import IdCounter

<h1>Common Context Across Tests</h1>

<h2>Immutable Globals</h2>

In [5]:
# Globals used in this test. After they are initialized in 'loadTestResources()', they don't change.
# 'loadTestResources()' reads PM_DF from the 'PMs' sheet and DEV_DF from the 'Dev' sheet of the packaged
# Team.xlsx workbook; until it runs, both are None.
PM_DF = None
DEV_DF = None

<h2>Mutable Globals</h2>

In [6]:
# Global variable used to have a single counter for user story ids as they get generated in multiple calls.
# Mutable state: 'initTestData()' rebinds it to a fresh IdCounter every time it is called.
NEXT_USER_STORY_ID = None


# Dictionaries of test results. Each key is the string name of a test and the value is the output of that test,
# expected or actual. 'testOK()' looks a test name up in both dictionaries and compares the two values.
EXPECTED = {}
ACTUAL = {}

<h2>Test Utilities</h2>

In [1]:
def initTestData(developers_df, productManagers_df, releaseDuration, sprintDuration, modelsConfig):
    """Create scrum teams with their user story backlogs and tabulate the generated stories.

    Exercises the user story creation code by constructing the stories and arranging them in a dataframe
    that can be displayed to evidence visually that the code works as it should.

    Side effect: rebinds the global NEXT_USER_STORY_ID to a fresh IdCounter, which is then shared by every
    call to timecard.createUserStoryBacklog made here.

    Returns the tuple (teams_df, stories_df, teamsRepo, storiesRepo, ticketsRepo), where stories_df has one
    row per generated story with the columns 'User Story Id', 'Scrum Team', 'Product Manager', 'Developer'
    and 'Estimate'.
    """
    global NEXT_USER_STORY_ID
    storiesRepo = UserStoriesRepo([])
    teamsRepo = ScrumTeamsRepo([])
    ticketsRepo = TicketsRepo()
    NEXT_USER_STORY_ID = IdCounter()

    # One list per output column; the key order here is the column order of the resulting dataframe.
    story_columns = {
        'User Story Id': [],
        'Scrum Team': [],
        'Product Manager': [],
        'Developer': [],
        'Estimate': [],
    }

    teams_df = bo.createTeamsDF(developers_df, productManagers_df)

    for team in teams_df['Scrum Team']:
        stories, backlog = timecard.createUserStoryBacklog(
            team, releaseDuration, sprintDuration, NEXT_USER_STORY_ID, modelsConfig)
        storiesRepo.stories.extend(stories)
        teamsRepo.teams.append(team)
        for story in stories:
            story_columns['Scrum Team'].append(team.teamId)
            story_columns['User Story Id'].append(story.userStoryId)
            story_columns['Developer'].append(story.developer)
            story_columns['Product Manager'].append(story.productManager)
            story_columns['Estimate'].append(story.originalEstimate)
        team.backlog = backlog

    return teams_df, pd.DataFrame(story_columns), teamsRepo, storiesRepo, ticketsRepo

In [8]:
def loadTestResources():
    """Populate the globals PM_DF and DEV_DF from the Team.xlsx workbook packaged with 'devanalyst'.

    PM_DF is read from the 'PMs' sheet and DEV_DF from the 'Dev' sheet.
    """
    global PM_DF, DEV_DF
    workbook_resource = '/'.join(['Resources', 'Simulation', 'Team.xlsx'])
    workbook_path = pkg_resources.resource_filename('devanalyst', workbook_resource)
    PM_DF = pd.read_excel(workbook_path, sheet_name='PMs')
    DEV_DF = pd.read_excel(workbook_path, sheet_name='Dev')

loadTestResources()

In [9]:
def createExpectedOutput(expected_df, testName):
    """Save expected_df as the expected output of the test called testName.

    The dataframe is written as CSV to Resources/Tests/Simulation/<testName>_EXPECTED.csv inside the
    'devanalyst' package. Returns whatever DataFrame.to_csv returns for that path.
    """
    csv_name = testName + '_EXPECTED.csv'
    csv_resource = '/'.join(['Resources', 'Tests', 'Simulation', csv_name])
    csv_path = pkg_resources.resource_filename('devanalyst', csv_resource)
    return expected_df.to_csv(csv_path)

def loadExpectedOutput(testName, literal_cols=None):
    """Load the expected output of the test called testName from its CSV resource.

    Reads Resources/Tests/Simulation/<testName>_EXPECTED.csv from the 'devanalyst' package.

    Parameters:
        testName: name of the test; determines the CSV file that is read.
        literal_cols: optional iterable of column names whose cells hold the string form of a Python
            literal (for example a list). Each such cell is replaced by the parsed value. Defaults to
            no columns.

    Returns the loaded dataframe, without the spurious 'Unnamed: 0' index column if the CSV had one.
    """
    resource_path = '/'.join(('Resources', 'Tests', 'Simulation', testName + '_EXPECTED.csv'))
    resource_package = 'devanalyst'
    fullpath = pkg_resources.resource_filename(resource_package, resource_path)
    df = pd.read_csv(fullpath)

    # Saving a dataframe with its index and reading it back yields an extra unnamed column; remove it, if any
    spurious_col = 'Unnamed: 0'
    if spurious_col in df.columns:
        df = df.drop(columns=[spurious_col])

    # Each column in literal_cols was saved as the string representation of a Python literal, so replace
    # the contents of that column with the value obtained by parsing the string.
    # None (the default) means "no such columns"; a None default avoids sharing one mutable list across calls.
    for col in (literal_cols or ()):
        df[col] = df[col].apply(lambda x: ast.literal_eval(str(x)))
    return df

In [10]:
def matches(actual, expected):
    """Tell whether an actual test output matches the expected one.

    Strings are compared with ==. Any other kind of output is compared by calling its own equals()
    method (as pandas dataframes and series provide).
    """
    if not isinstance(actual, str):
        return actual.equals(expected)
    return actual == expected

In [11]:
def mismatchButOK(old_df, new_df, new_cols):
    """Verify that two dataframes differ only by the new columns added to the newest dataframe.

    Returns True when old_df and new_df are not equal, yet old_df equals new_df once the columns listed
    in new_cols are dropped from it. new_df itself is not modified.
    """
    if old_df.equals(new_df):
        # Identical dataframes are not a mismatch at all
        return False
    without_new_cols = new_df.drop(columns=new_cols)
    return old_df.equals(without_new_cols)

In [2]:
def find_mismatches(df1, df2):
    """List the index labels of the rows that differ between df1 and df2.

    Both dataframes are expected to have equivalent indices and columns. Rows are visited in the order
    of df1's index, and a row counts as a mismatch when df1's row does not equal df2's row with the
    same label.
    """
    return [label for label in df1.index if not df1.loc[label].equals(df2.loc[label])]

In [12]:
def testOK(testname):
    """Tell whether the recorded actual output of the test 'testname' matches its expected output.

    Looks the test name up in the global ACTUAL and EXPECTED dictionaries and compares the two values
    with matches().
    """
    actual_output = ACTUAL[testname]
    expected_output = EXPECTED[testname]
    return matches(actual_output, expected_output)

In [13]:
class ExpectedOutputCleaner:
    """Utilities to clean the expected output when it is loaded from a CSV file.

    The process of saving a dataframe to CSV and then loading it results in some cosmetic changes that
    would cause spurious test failures if they are not cleaned.

    Every utility is a static method, so it can be called on the class or on an instance. Each one
    receives dictionaries keyed by testlet name whose values are dataframes, and modifies those
    dataframes in place.
    """

    def __init__(self):
        return

    @staticmethod
    def cleanRoundingNoise(sensitive_fields, testlets, test_EXPECTED, test_ACTUAL):
        """Round the given float-valued fields to 6 decimal places in both EXPECTED and ACTUAL.

        Float-valued fields may have many decimal places that get truncated when saving or loading a
        CSV file. To avoid spurious errors, all values are rounded to just a few decimal places that
        are not part of the truncation.
        """
        for field in sensitive_fields:
            for testlet in testlets:
                for results in (test_EXPECTED, test_ACTUAL):
                    frame = results[testlet]
                    frame[field] = frame[field].apply(lambda x: round(x, 6))  # Round to 6 decimal places

    @staticmethod
    def standardizeDates(date_fields, testlets, test_EXPECTED):
        """Convert the given EXPECTED fields from 'YYYY-MM-DD' strings into pd.Timestamps.

        Dates are loaded from CSV as strings, not pd.Timestamps, so this avoids spurious mismatches
        between ACTUAL (which represents dates as pd.Timestamps) and EXPECTED.
        """
        for field in date_fields:
            for testlet in testlets:
                frame = test_EXPECTED[testlet]
                frame[field] = frame[field].apply(lambda x: pd.Timestamp(datetime.strptime(x, '%Y-%m-%d')))

    @staticmethod
    def alignColumns(testlets, test_EXPECTED, test_ACTUAL):
        """Overwrite the column labels of each EXPECTED dataframe with those of its ACTUAL counterpart."""
        for testlet in testlets:
            test_EXPECTED[testlet].columns = test_ACTUAL[testlet].columns

    @staticmethod
    def alignIndex(testlets, test_EXPECTED, test_ACTUAL):
        """Overwrite the index of each EXPECTED dataframe with that of its ACTUAL counterpart."""
        for testlet in testlets:
            test_EXPECTED[testlet].index = test_ACTUAL[testlet].index

    @staticmethod
    def destringify(fields, testlets, test_EXPECTED):
        """Turn digit-only strings in the given EXPECTED fields into the corresponding integers.

        Used for columns where some values are non-negative integers, and others are strings. If the
        non-negative integers are represented as strings, they are replaced with the corresponding
        non-negative integer value. Other values in the column are left alone, including values that
        are not strings at all (for example missing values or numbers).
        """
        def _to_int_if_digits(value):
            # Only strings can be digit-only; anything else (NaN, None, real numbers) passes through
            if isinstance(value, str) and value.isdigit():
                return int(value)
            return value

        for field in fields:
            for testlet in testlets:
                frame = test_EXPECTED[testlet]
                frame[field] = frame[field].apply(_to_int_if_digits)

In [1]:
def taintFailuresToStop(original_df, grouping_column, finishing_line_column):
    """Flag rows that form a spurious tail of non-positive values within each group.

    Detects errors in a dataframe that is NOT supposed to have a tail of 0 values for the
    'finishing_line_column' when grouped by the 'grouping_column'.

    For example, a timecard that is grouped by user story id should not have a tail of 0's for the effort
    spent: if timecard generation is working properly, once the user story is completed then there should
    not be additional spurious entries for the user story, all with 0 for effort spent (since no more effort
    is required, since the user story is completed). This is useful to detect anomalies in timecards, as may
    happen if rounding errors make it appear that there is an epsilon amount of work still left, so the user
    story keeps re-appearing in the timecard entries, but with 0 time entered against it. If that happens,
    it is a bug as timecard generation should not leave spurious epsilon-sized work for the future.

    Parameters:
        original_df: dataframe to inspect.
        grouping_column: column used to split original_df into groups.
        finishing_line_column: column whose trailing non-positive values are flagged.

    Returns the dataframe produced by applying _detect to each group: the group rows plus the columns
    'TAINTED' (True for rows that have such spurious 0's), 'PRIOR' and 'ACCUM'.
    """
    groups = original_df.groupby(grouping_column)
    return groups.apply(_detect, finishing_line_column=finishing_line_column)

# Helper method used in taintFailuresToStop
def _detect(df, finishing_line_column):
    """Flag the rows of df that come after its last positive value in finishing_line_column.

    Adds three columns to df in place and returns that same dataframe:
      'TAINTED': True for every row located after the last row whose finishing_line_column value is
                 greater than 0.0, False for the rest. If no value is positive, every row is tainted.
      'PRIOR':   index label of the preceding row, or None for the first row.
      'ACCUM':   0.0 for every row.

    "After" is decided by row position rather than by comparing index labels, so the result is also
    correct when the labels are not numbers or are not in increasing order.
    """
    values = df[finishing_line_column].tolist()
    labels = df.index.tolist()
    row_count = len(labels)

    # Position of the last row with a positive value; -1 means that no row has one
    last_positive_pos = -1
    for pos, value in enumerate(values):
        if value > 0.0:
            last_positive_pos = pos

    tainted_flags = [pos > last_positive_pos for pos in range(row_count)]
    # Shift the labels down by one row so that each row sees the label of the row before it
    prior_labels = ([None] + labels)[:row_count]

    # Explicit dtypes keep 'TAINTED' boolean and keep None/labels in 'PRIOR' from being coerced to floats
    df['TAINTED'] = pd.Series(tainted_flags, dtype=bool).to_numpy()
    df['PRIOR'] = pd.Series(prior_labels, dtype=object).to_numpy()
    df['ACCUM'] = 0.0
    return df