In [1]:
import pandas as pd
import numpy as np
import util as util

In [2]:
initial_setup = {
    'n_villagers' : 3,
    'resources' : {
        'food' : 200,
        'wood' : 200,
        'gold' : 100,
        'stone' : 200
    }
}

In [3]:
class CONSTANTS:
    occupations = ['lumberjack', 'farmer' , 'fisherman', 'hunter', 'shepherd', 'forager', 'gold miner', 'stone miner', 'builder', 'idle', 'walking']
    resource_names =  ['wood', 'food' , 'gold', 'stone']

    resources_2_occupations_map = {
        'wood' : ['lumberjack'],
        'food' : ['farmer', 'fisherman', 'hunter', 'shepherd', 'forager'],
        'gold' : ['gold miner'],
        'stone' : ['stone miner']
    }

In [51]:
CONSTANTS = CONSTANTS

class generic_economy:

    def __init__(self, initial_setup):

        # Get basic data
        gather_rate_base_csv = pd.read_csv('data/gathering_rate.csv')
        self.gather_rate_base_dc = dict(zip(gather_rate_base_csv['villager_type'], gather_rate_base_csv['rate']))
        self.unit_stats = pd.read_csv('data/unit_statistics.csv', index_col = 0)
        self.building_stats = pd.read_csv('data/building_statistics.csv', index_col = 0)
        self.tech_stats = pd.read_csv('data/technology_statistics.csv', index_col = 0)

        # Initialize inputs
        self.initial_setup = initial_setup
        
        # Initialize tables that log economy dynamics
        self.villager_distribution = self._create_villager_distribution_df(initial_setup['n_villagers'])
        self._gather_rate_dynamics = self._create_gather_rate_dynamics(self.gather_rate_base_dc)
        self.resources = self._create_resources_table(initial_setup)
        self.expenditure = self._create_expenditure_table()

        # (TO DO) This table logs the actions and notes you take
        self.events_table = None

    # Functions that create economy dynamics log tables
    def _create_villager_distribution_df(self, vil_count_0 = 3):
        '''
        Create dataframe to track what each villager is doing at each second
        '''
        # Create a villager distribution
        occupations = CONSTANTS.occupations
        df_vil_dist = pd.DataFrame(columns = ['time']+occupations)
        df_vil_dist['time'] = range(0,60*60+1)
        df_vil_dist.iloc[:,1:] = 0
        df_vil_dist.loc[:,'idle'] = vil_count_0
        return df_vil_dist

    def _create_resources_table(self, initial_setup = initial_setup):
        '''
        Create dataframe to track what each villager is doing at each second
        '''
        resources = CONSTANTS.resource_names
        resources_0 = initial_setup['resources']
        df_resources = pd.DataFrame(columns = ['time']+resources)
        # populate table
        df_resources['time'] = range(0,60*60+1)
        df_resources.loc[:, resources] = [resources_0[res] for res in resources]

        return df_resources

    def _create_gather_rate_dynamics(self, gather_rate_base: dict):
        occupations = CONSTANTS.occupations
        df_out = pd.DataFrame(columns = ['time']+occupations)
        df_out['time'] = range(0,60*60+1)
        df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]

        return df_out
    
    def _create_expenditure_table(self):
        '''
        Create table that logs expenditure at a given point in time
        '''
        # Create table
        df_expenditure = pd.DataFrame(columns = ['time']+CONSTANTS.resource_names)
        df_expenditure['time'] = range(0,60*60+1)
        df_expenditure.loc[:,CONSTANTS.resource_names] = 0

        return df_expenditure

    ### Functions that update economy dynamics log tables
    def _update_resources_table(self):
        '''
        Update resources table based on villager distribution and gather rate dynamics
        '''
        
        gather_rate_df = self._gather_rate_dynamics
        villager_distribution_df = self.villager_distribution

        # Get intitial resources
        resources_0 = initial_setup['resources']

        # How much we collect from each occupation at any point in time
        instant_collection_df = gather_rate_df.loc[:,CONSTANTS.occupations].values * villager_distribution_df.loc[:,CONSTANTS.occupations].values
        instant_collection_df = pd.DataFrame(instant_collection_df, columns = CONSTANTS.occupations)

        # How much RESOURCES we collect at any point in time
        # Create an empty DataFrame with 3601 rows
        instant_resources_df = pd.DataFrame(np.zeros((len(instant_collection_df), len(CONSTANTS.resource_names))), columns=CONSTANTS.resource_names)

        for res in CONSTANTS.resource_names:
            # row 0 has the initial resources
            instant_resources_df.loc[0,res] = resources_0[res]
            instant_resources_df.loc[1:,res] = instant_collection_df.loc[1:,CONSTANTS.resources_2_occupations_map[res]].sum(axis = 1)

        # How much resourcers we spend at any point in time
        instant_expenditure_df = self.expenditure.loc[:,CONSTANTS.resource_names]

        # How much resources we have at any point in time
        instant_resources_net_df = instant_resources_df - instant_expenditure_df

        # Update Resources 
        self.resources.loc[:,CONSTANTS.resource_names] = instant_resources_net_df.cumsum(axis = 0)

    ### Actions that you can take

    def _create_unit_at_time(self, time, unit_name):
        '''
        Create a unit at a given time and occupation
        - time: time in seconds
        - unit name from standardized unit names
     '''
        # standardize unit name
        unit_name = unit_name.lower()

        # error check if unit name not in unit stats
        if unit_name not in self.unit_stats.index:
            print(f'Error: unit name {unit_name} not in unit stats table')
            return None

        # get cost of the unit
        unit_cost = self.unit_stats.loc[unit_name, CONSTANTS.resource_names]

        # Update expenditure table
        for res in CONSTANTS.resource_names:
            self.expenditure.loc[time, res] -= unit_cost[res]

        # Update resource table
        self._update_resources_table()

        # Update notes
        # COMPLETE

    def _create_villager_at_time(self, time, to_oc = 'idle'):
        '''
        Create a villager at a given time and occupation
        - time: time in seconds
        - to_oc: occupation of new villager created
        '''
        # Update villager distribution taking into account training time
        training_time = self.unit_stats.loc['villager', 'training time']
        self.villager_distribution.loc[(time+training_time):, to_oc] += 1

        # The rest is the same as creating a generic unit
        self._create_unit_at_time(time, 'villager')

        # Update notes
        # COMPLETE

    def _build_at_time(self, time, building_name):
        '''
        Build a building at a given time
        - time: time in seconds
        - unit name from standardized unit names
     '''

        # standardize building name
        building_name = building_name.lower()

        # error check if unit name not in unit stats
        if building_name not in self.building_stats.index:
            print(f'Error: unit name {building_name} not in building stats table')
            return None

        # get cost of the building
        cost = self.building_stats.loc[building_name.lower(), CONSTANTS.resource_names]

        # Update expenditure table
        for res in CONSTANTS.resource_names:
            self.expenditure.loc[time, res] -= cost[res]

        # Update resource table
        self._update_resources_table()

        # Update notes
        # COMPLETE

    def _move_villager_at_time(self, time, from_oc, to_oc):
        '''
        Create a villager at a given time and occupation
        - time: time in seconds
        - occupation: occupation of new villager created
        '''
        # Update villager distribution
        self.villager_distribution.loc[time:, to_oc] += 1
        self.villager_distribution.loc[time:, from_oc] -= 1

        # Update gather rate dynamics
        self._update_resources_table()

        # Update notes
        # COMPLETE

In [56]:
## Initialize economy class
economy = generic_economy(initial_setup = initial_setup)

time = 0 
unit_name = 'archer'
building_name = 'town center'

# economy.unit_stats.loc['villager', 'training time']
# .values[0]

# row 0 has the initial resources
# How much RESOURCES we collect at any point in time
# instant_resources_df = pd.DataFrame(columns = CONSTANTS.resource_names)

# res = 'wood'
# resources_0 = initial_setup['resources']
# instant_resources_df.loc[0,res] = resources_0[res]
# instant_resources_df.loc[1:,res]

# instant_resources_df.loc[1:,res] = instant_collection_df.loc[1:,CONSTANTS.resources_2_occupations_map[res]].sum(axis = 1)

# economy._create_unit_at_time(time, unit_name)
# economy._create_villager_at_time(0, to_oc = 'farmer')

economy.resources.head(30)
# economy.villager_distribution.head(30)






  df_vil_dist.iloc[:,1:] = 0
  df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]
  df_resources.loc[:, resources] = [resources_0[res] for res in resources]
  df_expenditure.loc[:,CONSTANTS.resource_names] = 0


Unnamed: 0,time,wood,food,gold,stone
0,0,200,200,100,200
1,1,200,200,100,200
2,2,200,200,100,200
3,3,200,200,100,200
4,4,200,200,100,200
5,5,200,200,100,200
6,6,200,200,100,200
7,7,200,200,100,200
8,8,200,200,100,200
9,9,200,200,100,200


In [6]:
import unittest

class TestGenericEconomy(unittest.TestCase):

    def setUp(self):
        self.economy = generic_economy(initial_setup)

    def test_create_villager_distribution_df(self):
        vil_dist_df = self.economy._create_villager_distribution_df()
        self.assertIsInstance(vil_dist_df, pd.DataFrame)
        self.assertEqual(vil_dist_df.shape[0], 60*60+1)
        self.assertEqual(vil_dist_df.iloc[0]['idle'], initial_setup['n_villagers'])

    def test_create_resources_table(self):
        resources_df = self.economy._create_resources_table(initial_setup)
        self.assertIsInstance(resources_df, pd.DataFrame)
        self.assertEqual(resources_df.shape[0], 60*60+1)

        for res in CONSTANTS.resource_names:
            self.assertEqual(resources_df.iloc[0][res], initial_setup['resources'][res])

    def test_create_gather_rate_dynamics(self):
        gather_rate_df = self.economy._create_gather_rate_dynamics(self.economy.gather_rate_base_dc)
        self.assertIsInstance(gather_rate_df, pd.DataFrame)
        self.assertEqual(gather_rate_df.shape[0], 60*60+1)

    def test_create_expenditure_table(self):
        expenditure_df = self.economy._create_expenditure_table()
        self.assertIsInstance(expenditure_df, pd.DataFrame)
        self.assertEqual(expenditure_df.shape[0], 60*60+1)

    def test_create_unit_at_time(self):
        time = 10
        unit_name = 'Archer'
        self.economy._create_unit_at_time(time, unit_name)
        # You can add more assertions to check if the unit was created successfully

    def test_create_villager_at_time(self):
        time = 10
        to_oc = 'lumberjack'
        self.economy._create_villager_at_time(time, to_oc)
        # You can add more assertions to check if the villager was created successfully

    def test_move_villager_at_time(self):
        time = 10
        from_oc = 'idle'
        to_oc = 'lumberjack'
        self.economy._move_villager_at_time(time, from_oc, to_oc)
        # You can add more assertions to check if the villager was moved successfully

if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestGenericEconomy)
    unittest.TextTestRunner(verbosity=2).run(suite)

  df_vil_dist.iloc[:,1:] = 0
  df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]
  df_resources.loc[:, resources] = [resources_0[res] for res in resources]
  df_expenditure.loc[:,CONSTANTS.resource_names] = 0
  df_expenditure.loc[:,CONSTANTS.resource_names] = 0
ok
  df_vil_dist.iloc[:,1:] = 0
  df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]
  df_resources.loc[:, resources] = [resources_0[res] for res in resources]
  df_expenditure.loc[:,CONSTANTS.resource_names] = 0
  df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]
ok
  df_vil_dist.iloc[:,1:] = 0
  df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]
  df_resources.loc[:, resources] = [resources_0[res] for res in resources]
  df_expenditure.loc[:,CONSTANTS.resource_names] = 0
  df_resources.loc[:, resources] = [resources_0[res] for res in resources]
ok


In [7]:
import pytest
import ipytest

# Configure ipytest to work with the current notebook
ipytest.autoconfig()

@pytest.fixture
def economy():
    return generic_economy(initial_setup)

def test_create_villager_distribution_df(economy):
    vil_dist_df = economy._create_villager_distribution_df()
    assert isinstance(vil_dist_df, pd.DataFrame)
    assert vil_dist_df.shape[0] == 60 * 60 + 1
    assert vil_dist_df.iloc[0]['idle'] == initial_setup['n_villagers']

def test_create_resources_table(economy):
    resources_df = economy._create_resources_table(initial_setup)
    assert isinstance(resources_df, pd.DataFrame)
    assert resources_df.shape[0] == 60 * 60 + 1

    for res in CONSTANTS.resource_names:
        assert resources_df.iloc[0][res] == initial_setup['resources'][res]

def test_create_gather_rate_dynamics(economy):
    gather_rate_df = economy._create_gather_rate_dynamics(economy.gather_rate_base_dc)
    assert isinstance(gather_rate_df, pd.DataFrame)
    assert gather_rate_df.shape[0] == 60 * 60 + 1

def test_create_expenditure_table(economy):
    expenditure_df = economy._create_expenditure_table()
    assert isinstance(expenditure_df, pd.DataFrame)
    assert expenditure_df.shape[0] == 60 * 60 + 1

def test_create_unit_at_time(economy):
    time = 10
    unit_name = 'Archer'
    economy._create_unit_at_time(time, unit_name)
    # You can add more assertions to check if the unit was created successfully

def test_create_villager_at_time(economy):
    time = 10
    to_oc = 'lumberjack'
    economy._create_villager_at_time(time, to_oc)
    # You can add more assertions to check if the villager was created successfully

def test_move_villager_at_time(economy):
    time = 10
    from_oc = 'idle'
    to_oc = 'lumberjack'
    economy._move_villager_at_time(time, from_oc, to_oc)
    # You can add more assertions to check if the villager was moved successfully

# Run the tests
ipytest.run()




[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[33m                                                                               [100%][0m
    df_vil_dist.iloc[:,1:] = 0

    df_out.loc[:, occupations] = [gather_rate_base.get(occupation,0) for occupation in occupations]

    df_resources.loc[:, resources] = [resources_0[res] for res in resources]

    df_expenditure.loc[:,CONSTANTS.resource_names] = 0

    self.resources.loc[:,CONSTANTS.resource_names] = instant_resources_net_df.cumsum(axis = 0)



<ExitCode.OK: 0>