In [31]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import PercentFormatter
import pandas as pd
import ipywidgets as widgets
from IPython.display import clear_output 
# autoreload modules when code is run
%load_ext autoreload
%autoreload 2

# APIs
from fredapi import Fred
from dstapi import DstApi

# plotting
import matplotlib.pyplot as plt
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
plt.rcParams.update({'axes.grid':True,'grid.color':'black','grid.alpha':'0.25','grid.linestyle':'--'})
plt.rcParams.update({'font.size': 14})

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Calculate Elasticities

In [32]:

import numpy as np

from scipy.optimize import root_scalar




# ---------- shared preferences (same for both consumers) ----------

PREF = {"b1": 0., "b2": 0.3,
"w": 0.5}   # subsistence + taste weight on good 1

TARGET_ELAST = 2.0

EPS = 1e-4




def ces_price(p1, p2, w, sigma):

    if sigma == 1.0:

        return (p1**w) * (p2** (1 - w))

    return (w * p1**(1 - sigma) + (1 - w) * p2**(1 - sigma)) ** (1 / (1 - sigma))



def demand(p1, p2, E, sigma, b1, b2, w):

    M = E - (p1*b1 + p2*b2)  # supernumerary spending

    if M <= 0:

        return b1, b2

    P = ces_price(p1, p2, w, sigma)

    c1 = b1 + w * (p1 / P) ** ( - sigma) * M

    c2 = b2 + (1-w) * (p2 / P) ** ( - sigma) * M

    return c1, c2




def agg_ratio(p1, p2, sigma, E_list, pref):

    C1 = C2 = 0.0

    for E in E_list:

        c1, c2 = demand(p1, p2, E, sigma, pref["b1"], pref["b2"], pref["w"])

        C1 += c1; C2 += c2

    return C1 / C2




def implied_elast(sigma, E_list, pref, p1=1.0, p2=1.0, eps=EPS):

    r0 = agg_ratio(p1, p2, sigma, E_list, pref)

    r1 = agg_ratio(p1*(1+eps), p2, sigma, E_list, pref)

    el = (np.log(r1) - np.log(r0)) / np.log(1+eps)  # d log(C1/C2) / d log(p1/p2)

    return np.abs(el)




def calibrate_sigma(E_list, pref, target=TARGET_ELAST, bracket=(0.2, 50.0)):

    f = lambda s: implied_elast(s, E_list, pref) - target

    sol = root_scalar(f, bracket=bracket, method="brentq")

    if not sol.converged:

        raise RuntimeError("Root finder did not converge.")

    return float(sol.root)




# ---- example: same preferences, different total spending ----

E_list = [0.7, 1.5, 2.0 , 2.5, 3]  # spending of consumer 1 and 2 (only heterogeneity)




sigma_star = calibrate_sigma(E_list, PREF, target=2.0)

print("sigma* =", sigma_star)

print("implied elasticity =", implied_elast(sigma_star,E_list, PREF))




def ces_price(p1, p2, w, sigma):

    if sigma == 1.0:

        return (p1**w) * (p2** (1 - w))

    return (w * p1**(1 - sigma) + (1 - w) * p2**(1 - sigma)) ** (1 / (1 - sigma))



def demand(p1, p2, E, sigma, b1, b2, w):

    M = E - (p1*b1 + p2*b2)  # supernumerary spending

    if M <= 0:

        return b1, b2

    P = ces_price(p1, p2, w, sigma)

    c1 = b1 + w * (p1 / P) ** ( - sigma) * M

    c2 = b2 + (1-w) * (p2 / P) ** ( - sigma) * M

    return c1, c2




def agg_ratio(p1, p2, sigma, E_list, pref):

    C1 = C2 = 0.0

    for E in E_list:

        c1, c2 = demand(p1, p2, E, sigma, pref["b1"], pref["b2"], pref["w"])

        C1 += c1; C2 += c2

    return C1 / C2




def implied_elast(sigma, E_list, pref, p1=1.0, p2=1.0, eps=EPS):

    r0 = agg_ratio(p1, p2, sigma, E_list, pref)

    r1 = agg_ratio(p1*(1+eps), p2, sigma, E_list, pref)

    el = (np.log(r1) - np.log(r0)) / np.log(1+eps)  # d log(C1/C2) / d log(p1/p2)

    return np.abs(el)




def calibrate_sigma(E_list, pref, target=TARGET_ELAST, bracket=(0.2, 50.0)):

    f = lambda s: implied_elast(s, E_list, pref) - target

    sol = root_scalar(f, bracket=bracket, method="brentq")

    if not sol.converged:

        raise RuntimeError("Root finder did not converge.")

    return float(sol.root)




# ---- example: same preferences, different total spending ----

E_list = [0.7, 1.5]  # spending of consumer 1 and 2 (only heterogeneity)




sigma_star = calibrate_sigma(E_list, PREF, target=2.0)

print("sigma* =", sigma_star)

print("implied elasticity =", implied_elast(sigma_star, E_list, PREF))


sigma* = 2.309251570970585
implied elasticity = 1.999999999999665
sigma* = 2.5454024865327813
implied elasticity = 2.0000000000013305


## Calculating income share

In [33]:
INC = DstApi('IFOR32')
INC.tablesummary(language='en') 

Table IFOR32: Avg. equivalised disposable Income in decile groups, by decile average, municipality and time
Last update: 2025-12-01T08:00:00


Unnamed: 0,variable name,# values,First value,First value label,Last value,Last value label,Time variable
0,DECILGEN,10,1DC,First decil,10DC,Tenth decil,False
1,KOMMUNEDK,99,000,All Denmark,851,Aalborg,False
2,Tid,38,1987,1987,2024,2024,True


In [34]:
INC.variable_levels('DECILGEN', language='en')


Unnamed: 0,id,text
0,1DC,First decil
1,2DC,Second decil
2,3DC,Third decil
3,4DC,Fourth decil
4,5DC,Fifth decil
5,6DC,Sixth decil
6,7DC,Seventh decil
7,8DC,Eigth decil
8,9DC,Ninth decil
9,10DC,Tenth decil


In [35]:
INC.variable_levels('KOMMUNEDK', language='en')

Unnamed: 0,id,text
0,000,All Denmark
1,101,Copenhagen
2,147,Frederiksberg
3,155,Dragør
4,185,Tårnby
...,...,...
94,773,Morsø
95,840,Rebild
96,787,Thisted
97,820,Vesthimmerlands


Gathering data on income shares by deciles from the IFOR32 table. The 'DECILGEN' variable represents the deciles of income distribution, and 'KOMMUNEDK' represents the municipality code. I will focus on the year 2024, which is the latest data point, and all of Denmark.



In [36]:
def load_IFOR32(TYPE, varname):
    params = {
        'table': 'IFOR32',
        'format': 'BULK', # semicolon separated file
        'lang': 'en',
        'variables': [
        {'code' : 'DECILGEN', 'values' : ['*']},
        {'code' : 'KOMMUNEDK', 'values' : [TYPE]},
        {'code': 'Tid', 'values': ['2024']}, # '*' is everything
        ]
    }
    # b. download
    df = DstApi('IFOR32').get_data(params=params)
    # c. set types and rename
    df['INDHOLD'] = df['INDHOLD'].astype(float)
    df = df.rename(columns={'INDHOLD': varname, 'TID': 'year'})
    df = df.set_index(['year', 'DECILGEN']).sort_index()
    return df

inc_df = load_IFOR32('000','income')

inc_df = inc_df.sort_values('income', ascending=True)
inc_df.head(10)



Unnamed: 0_level_0,Unnamed: 1_level_0,KOMMUNEDK,income
year,DECILGEN,Unnamed: 2_level_1,Unnamed: 3_level_1
2024,First decil,All Denmark,101181.0
2024,Second decil,All Denmark,182331.0
2024,Third decil,All Denmark,213817.0
2024,Fourth decil,All Denmark,245750.0
2024,Fifth decil,All Denmark,278084.0
2024,Sixth decil,All Denmark,311118.0
2024,Seventh decil,All Denmark,348230.0
2024,Eigth decil,All Denmark,395277.0
2024,Ninth decil,All Denmark,466658.0
2024,Tenth decil,All Denmark,852980.0


I will now convert the income shares from deciles to quintiles to match the format of the expenditure data. This involves aggregating the income shares of the first two deciles to form the first quintile, the next two deciles for the second quintile, and so on.

In [37]:
# Convert deciles to quintiles by summing every two deciles
inc_df['income']= inc_df['income'].rolling(window=2).mean()
inc_df.head(10)


Unnamed: 0_level_0,Unnamed: 1_level_0,KOMMUNEDK,income
year,DECILGEN,Unnamed: 2_level_1,Unnamed: 3_level_1
2024,First decil,All Denmark,
2024,Second decil,All Denmark,141756.0
2024,Third decil,All Denmark,198074.0
2024,Fourth decil,All Denmark,229783.5
2024,Fifth decil,All Denmark,261917.0
2024,Sixth decil,All Denmark,294601.0
2024,Seventh decil,All Denmark,329674.0
2024,Eigth decil,All Denmark,371753.5
2024,Ninth decil,All Denmark,430967.5
2024,Tenth decil,All Denmark,659819.0


In [38]:
inc_df = inc_df.iloc[1::2].reset_index()  # take every second row starting from the second row
inc_df = inc_df.rename(columns={'DECILGEN': 'quintile'})
inc_df.head(10)

Unnamed: 0,year,quintile,KOMMUNEDK,income
0,2024,Second decil,All Denmark,141756.0
1,2024,Fourth decil,All Denmark,229783.5
2,2024,Sixth decil,All Denmark,294601.0
3,2024,Eigth decil,All Denmark,371753.5
4,2024,Tenth decil,All Denmark,659819.0


In [None]:
inc_df = inc_df.drop(columns=['year','KOMMUNEDK'])
# Map decil names to quintile names
quintile_map = {
    'Second decil': 'Q1',
    'Fourth decil': 'Q2',
    'Sixth decil': 'Q3',
    'Eigth decil': 'Q4',
    'Tenth decil': 'Q5'
}

inc_df['quintile'] = inc_df['quintile'].replace(quintile_map)
inc_df.head(10)

Unnamed: 0,quintile,income
0,Q1,141756.0
1,Q2,229783.5
2,Q3,294601.0
3,Q4,371753.5
4,Q5,659819.0


In [41]:
q5_income = inc_df.loc[inc_df['quintile'] == 'Q5', 'income'].values[0]

# Create a new column for relative income
inc_df['relative_income'] = inc_df['income'] / q5_income

inc_df.head(10)

Unnamed: 0,quintile,income,relative_income
0,Q1,141756.0,0.214841
1,Q2,229783.5,0.348252
2,Q3,294601.0,0.446488
3,Q4,371753.5,0.563417
4,Q5,659819.0,1.0


In [40]:
def disp_incom(xi,w,l):
    return xi * w * l

w=2.8677
xi1 = 1
xi2 = 0.56
xi3 = 0.45
xi4 = 0.35
xi5 = 0.2
phi = 2
varphi = 1
sigma = 2.0
pe= 1
omega = 0.93
mu= 1

c1 = 2.46963
c2 = 1.35839
c3 = 1.08065
c4 = 0.828251
c5 = 0.450054

p=(omega + pe*(1-omega)**(1-sigma))**(1/(1-sigma))

l1 = (w * xi1)/(mu * p*c1**varphi)
l2 = (w * xi2)/(mu * p*c2**varphi)
l3 = (w * xi3)/(mu * p*c3**varphi)
l4 = (w * xi4)/(mu * p*c4**varphi)
l5 = (w * xi5)/(mu * p*c5**varphi)

worker_1 = disp_incom(xi1,w, l1)  
worker_2 = disp_incom(xi2,w, l2)  
worker_3 = disp_incom(xi3,w, l3)  
worker_4 = disp_incom(xi4,w, l4)  
worker_5 = disp_incom(xi5,w, l5)

print(worker_1/worker_1)
print(worker_2/worker_1)
print(worker_3/worker_1)
print(worker_4/worker_1)
print(worker_5/worker_1)

1.0
0.5701425717209343
0.46277710174432046
0.36526327767790195
0.2194963271074138
