In [1]:
import io
import json
import re
import time
import urllib.request
from datetime import datetime,date
from pathlib import Path

import openpyxl
import pandas as pd
import requests

# if you want to make notebooks have really long outputs
# pd.options.display.max_rows=4000



In [2]:
# Record the date of this run in timestamp.txt, formatted like 2025-Mar-07
run_stamp = datetime.now().strftime("%Y-%b-%d")
with open('timestamp.txt', 'w') as stamp_file:
    stamp_file.write(run_stamp)

In [None]:
# If you need to install something from inside a Jupyter notebook, use these commands
# import sys
# !{sys.executable} -m pip install pandas

In [None]:
# Only needed when the starting file changes: derive the three input files used
# by the rest of the notebook (2025_all_items_metadata.csv, 2025_metadata.csv
# and 2025_unchained.csv).
#
# NOTE(review): the metadata half reads the .xlsx starting file while the
# unchained half reads a .csv of the same name - confirm both are kept in sync.

# --- metadata -----------------------------------------------------------------
df_meta = pd.read_excel('2025_starting_file_test_data.xlsx')

# Keep the columns up to and including AVERAGE_PRICE; everything after it is dropped
if 'AVERAGE_PRICE' in df_meta.columns:
    last_col_idx = df_meta.columns.get_loc('AVERAGE_PRICE')
    df_meta = df_meta.iloc[:, :last_col_idx + 1]

# Item-level metadata (ID_NAME still present) is what the data download uses
df_meta.to_csv('2025_all_items_metadata.csv', index=False)

# Segment-level metadata: no ID_NAME, segment code as the first column,
# one row per CONSUMPTION_SEGMENT_CODE (the first occurrence wins)
df_meta = df_meta.drop(columns=['ID_NAME'])
if 'CONSUMPTION_SEGMENT_CODE' in df_meta.columns:
    cols = [
        'CONSUMPTION_SEGMENT_CODE',
        *(col for col in df_meta.columns if col != 'CONSUMPTION_SEGMENT_CODE'),
    ]
    df_meta = df_meta[cols]
df_meta = df_meta.drop_duplicates(subset=['CONSUMPTION_SEGMENT_CODE'])
df_meta.to_csv('2025_metadata.csv', index=False)


# --- unchained ----------------------------------------------------------------
# Keep just the segment code and the monthly (YYYYMM) columns from 202101 onwards
df_start = pd.read_csv('2025_starting_file_test_data.csv')

cols_to_keep = ['CONSUMPTION_SEGMENT_CODE']
for col in df_start.columns:
    is_2021_label = re.match(r'^2021\d{2}$', col)
    is_month_from_2021 = re.match(r'^20\d{4}$', col) and col >= '202101'
    if is_2021_label or is_month_from_2021:
        cols_to_keep.append(col)

# One row per segment code; with regex=True every '-' character inside a string
# cell is removed.
# NOTE(review): that also strips the sign from any negative number stored as text.
unchained2025 = (
    df_start[cols_to_keep]
    .drop_duplicates(subset=['CONSUMPTION_SEGMENT_CODE'])
    .replace('-', '', regex=True)
)

unchained2025.to_csv('2025_unchained.csv', index=False)

In [3]:
# Prices are only tracked for five years, so the chained series needs a fixed
# starting point; average prices are additionally anchored on the most recent
# January.

# month in which AVERAGE_PRICE applies
avgpriceRefMonth = pd.Timestamp(year=2025, month=1, day=1)

# first month of the chained series
startref = pd.Timestamp(year=2020, month=1, day=1)

In [4]:
# Load the segment-level metadata: ID_START is parsed from YYYYMM, and the
# segment code (as text) becomes the row index
meta = (
    pd.read_csv('./2025_metadata.csv', parse_dates=['ID_START'], date_format="%Y%m")
    .astype({"CONSUMPTION_SEGMENT_CODE": str})
    .set_index("CONSUMPTION_SEGMENT_CODE")
)

In [5]:
# Helper that cuts a string in two at a given occurrence of a separator
# https://stackoverflow.com/questions/36300158/split-text-after-the-second-occurrence-of-character
def split(strng, sep, pos):
    """Cut ``strng`` in two around the ``pos``-th occurrence of ``sep``.

    The string is broken on every ``sep``; the first ``pos`` pieces are glued
    back together as the head and the remaining pieces as the tail.

    Returns:
        tuple[str, str]: ``(head, tail)``. The separator sitting between head
        and tail is not part of either. When ``sep`` occurs fewer than ``pos``
        times the head is the whole string and the tail is ``''``.
    """
    pieces = strng.split(sep)
    head = sep.join(pieces[:pos])
    tail = sep.join(pieces[pos:])
    return head, tail

In [6]:
# Load the unchained indices with the segment code (as text) as the row index
unchained = (
    pd.read_csv('2025_unchained.csv')
    .astype({"CONSUMPTION_SEGMENT_CODE": str})
    .set_index("CONSUMPTION_SEGMENT_CODE")
)

# The last column header is the most recent month in the file; headers are
# YYYYMM strings here (other exports have used "%Y-%m-%d" or
# "%Y-%m-%d %H:%M:%S" - adjust the format if the file changes)
latestmonth = datetime.strptime(unchained.columns[-1], "%Y%m")

# show it
latestmonth

datetime.datetime(2025, 2, 1, 0, 0)

In [None]:
# Excel changes date formats into something funky. Use this if you need to convert it to python date format
# un = pd.read_csv('unchained.csv', index_col=0)
# columns = {}
# for col in un.columns:
#     try:
#         columns[col] = datetime.strptime(str(col), "%d/%m/%Y").date()
#         # columns[col] = datetime.strptime(str(col), "%d/%m/%Y %H:%M").date()
#         # columns[col] = datetime.strptime(str(col), "%Y-%m-%d %H:%M:%S").date()
#         # columns[col] = datetime.strptime(str(col), "%Y%m").date()
#     except ValueError:
#         pass
# un.rename(columns=columns, inplace=True)
# un.to_csv('unchained.csv',date_format='%Y-%m-%d')

In [None]:
# Merge the next month's test data onto the unchained indices, producing `un`
# (one row per CONSUMPTION_SEGMENT_CODE, one column per month).

# get a test file
df = pd.read_csv('march_2025_example_test_data.csv')
df['CS_ID'] = df['CS_ID'].astype(str)

# The first cell of the file is read as the index month (YYYYMM)
index_date = df.iloc[0, 0]
new_month = datetime.strptime(str(index_date), "%Y%m")

# Parse the YYYYMM column headers of unchained as dates; labels that do not
# match the format (including ones that are already datetimes) are left alone
# https://stackoverflow.com/questions/42472418/parse-file-headers-as-date-objects-in-python-pandas
columns = {}
for col in unchained.columns:
    try:
        columns[col] = datetime.strptime(str(col), "%Y%m")
    except ValueError:
        pass
unchained = unchained.rename(columns=columns)

# Give the new month the same key name as unchained so the merge keeps a single
# key column and every remaining column of the result is a month.
next_month = df[['CS_ID', 'CPI_INDEX']].rename(
    columns={"CS_ID": "CONSUMPTION_SEGMENT_CODE", "CPI_INDEX": new_month}
)
un = (
    unchained.reset_index()
    .merge(next_month, on="CONSUMPTION_SEGMENT_CODE", how='inner')
    .set_index("CONSUMPTION_SEGMENT_CODE")
)

# If the new month is a January, chain it to the preceding December.
# The December column is looked up by date: previously it was taken as
# un.columns[-2], which at that point was the merge key column rather than a
# month, so the chaining multiplied by the segment codes.
if new_month.month == 1:
    print('chaining jan')
    jancol = new_month
    prevdec = datetime(new_month.year - 1, 12, 1)
    un[jancol] = un[prevdec] * un[jancol] / 100

In [None]:
# Fetch the latest item indices from the ONS website and, when they are newer
# than the last month already in unchained, join them on as a new column of `un`.
#
# NOTE(review): this cell keys on ITEM_ID / ALL_GM_INDEX, whereas the cells above
# index unchained by CONSUMPTION_SEGMENT_CODE. Confirm which key applies before
# relying on this cell; as written the ITEM_ID lookups fail on that frame.

ons_root = "https://www.ons.gov.uk"
ons_headers = {'User-Agent': 'Mozilla/5.0'}
ons_timeout = 60  # seconds; without a timeout a stalled request blocks the kernel


def _parse_date_columns(frame):
    """Return ``frame`` with date-like column labels converted to datetimes.

    Labels are tried as "%Y-%m-%d" and then as "%Y%m" (the header format of
    2025_unchained.csv). Labels matching neither format are left unchanged.
    https://stackoverflow.com/questions/42472418/parse-file-headers-as-date-objects-in-python-pandas
    """
    renamed = {}
    for col in frame.columns:
        for fmt in ("%Y-%m-%d", "%Y%m"):
            try:
                renamed[col] = datetime.strptime(str(col), fmt)
                break
            except ValueError:
                continue
    return frame.rename(columns=renamed)


with requests.Session() as s:
    # first get the /data.json from the cpi items and prices page from the ONS website
    r = s.get(
        ons_root + "/economy/inflationandpriceindices/datasets/consumerpriceindicescpiandretailpricesindexrpiitemindicesandpricequotes/data",
        headers=ons_headers,
        timeout=ons_timeout,
    )
    r.raise_for_status()
    data = r.json()
    datasets = data['datasets']

    # Index of the first dataset whose uri mentions neither framework, glossary
    # nor /pricequotes. The page url itself contains "pricequotes", so the
    # leading slash matters.
    match = next(
        (
            i
            for i, dataset in enumerate(datasets)
            if 'framework' not in dataset['uri']
            and 'glossary' not in dataset['uri']
            and '/pricequotes' not in dataset['uri']
        ),
        None,
    )
    if match is None:
        # previously this fell through to the last dataset in the list
        raise ValueError('no item indices dataset found on the ONS page')

    # get the uri of the items dataset we want
    items = datasets[match]['uri']
    print('dataset='+items)

    # get the month and year from the uri (named url_date so the imported
    # datetime.date is not shadowed)
    url_date = split(items, 'itemindices', 2)[1]
    print('the date from url:'+url_date)

    # parse it as a date
    itemmonth = datetime.strptime(url_date, "%B%Y")

    # check date to see if you need to download a file
    needs_update = itemmonth != latestmonth
    if needs_update:
        print('month from indices is different to latest month in unchained csv')

        r = s.get(ons_root + items + "/data", headers=ons_headers, timeout=ons_timeout)
        r.raise_for_status()
        itemspage = r.json()
        csv_name = itemspage['downloads'][0]['file']

        # get the csv of the latest indices
        download = s.get(
            ons_root + "/file?uri=" + items + "/" + csv_name,
            headers=ons_headers,
            timeout=ons_timeout,
        )
        download.raise_for_status()
        df = pd.read_csv(io.StringIO(download.content.decode('utf-8')))

# parse columns as dates in unchained (needed whether or not there is an update)
unchained = _parse_date_columns(unchained)

if needs_update:
    # get the index date which is the first cell
    index_date = df.iloc[0, 0]

    # join it onto existing csv
    un = unchained.merge(
        df[['ITEM_ID', 'ALL_GM_INDEX']].rename(
            columns={"ALL_GM_INDEX": datetime.strptime(str(index_date), "%Y%m")}
        ),
        on='ITEM_ID',
        how='left',
    )

    # if last date is Jan, then chain it to december
    if un.columns[-1].month == 1:
        print('chaining jan')
        jancol = un.columns[-1]
        prevdec = un.columns[-2]
        un[jancol] = un[prevdec] * un[jancol] / 100

    un = un.set_index("ITEM_ID")

else:
    print('Nothing to update')
    # set_index returns a new frame, so unchained itself is no longer modified
    un = unchained.set_index("ITEM_ID")

In [9]:
# List every row label of un
print(un.index.tolist())

['520130', '520131', '520137', '520140', '510439', '510434', '520109', '510429', '510432', '510324', '510328', '510330', '510336', '510340', '510344', '510347', '510348', '510349', '510350', '510351', '510352', '510436', '510437', '510438', '510353', '510528', '510104', '510106', '510108', '510109', '510113', '510117', '510118', '510120', '510128', '510130', '510131', '510132', '510133', '510134', '510413', '510419', '510433', '510501', '510515', '510516', '510535', '510534', '510206', '510208', '510212', '510215', '510219', '510223', '510233', '510235', '510236', '510237', '510245', '510248', '510250', '510257', '510258', '510259', '510405', '510406', '510407', '510506', '510529', '510530', nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, '520138', '520139', '520235', '520242', '520251', '520213', '520216', '520223', '520226', '520236', '520237', '520241', '520243', '520

In [None]:
# Build the chained indices from a copy of the unchained ones.
# Columns are visited left to right and each cell is overwritten in place, so
# the January values read back from `chained` below have already been chained.
chained = un.copy()

first_year_end = startref + pd.tseries.offsets.DateOffset(years=1)
one_month = pd.tseries.offsets.DateOffset(months=1)

for col in chained:
    for i, row_value in chained[col].items():
        if col >= meta.loc[i, 'ID_START']:
            if col == startref:
                # the starting reference month is the base of the series
                chained.at[i, col] = 100
            elif col <= first_year_end:
                # up to one year after the start the value is kept as it is
                chained.at[i, col] = row_value
            else:
                # a January links to the previous year's January, every other
                # month to the January of its own year
                base_year = col.year - 1 if col.month == 1 else col.year
                chained.at[i, col] = float(row_value) * float(chained.loc[i][datetime(base_year, 1, 1)]) / 100

        elif col == meta.loc[i, 'ID_START'] - one_month:
            # the month just before the row's ID_START is set to 100
            chained.at[i, col] = 100

        else:
            # anything earlier than that is blanked out
            chained.at[i, col] = None

In [None]:
# Then calculate average prices
# avgprice=chained.copy()

# for col in avgprice:
#     for i, row_value in avgprice[col].items():
#         if(row_value==None):
#             avgprice.at[i,col]=None
#         else:
#             avgprice.at[i,col]=float(row_value)/ \
#             float(chained.loc[i,avgpriceRefMonth])* \
#             float(meta.loc[i,'AVERAGE_PRICE'])
            
# #rename columns to dates without time formats
# columns = {}
# for col in avgprice.columns:
#     try:
#         columns[col] = col.date()
#     except ValueError:
#         pass
# avgprice.rename(columns=columns, inplace=True)

# avgprice.astype(float).round(2).to_csv('2025_avgprice_cs.csv',date_format='%Y-%m-%d',na_rep='')

In [None]:
# calculate annual growth for CS
# annualgrowth=chained.copy()

# for col in annualgrowth:
#     for i, row_value in annualgrowth[col].items():
#         prev_col = col - pd.tseries.offsets.DateOffset(years=1)
#         if prev_col in annualgrowth.columns:
#             # safe to access previous year
#             annualgrowth.at[i, col] = (
#                 float(row_value) - float(chained.loc[i, prev_col])
#             ) * 100 / float(chained.loc[i, prev_col])
#         else:
#             annualgrowth.at[i, col] = None
                
                
# #rename columns to dates without time formats
# columns = {}
# for col in annualgrowth.columns:
#     try:
#         columns[col] = col.date()
#     except ValueError:
#         pass
# annualgrowth.rename(columns=columns, inplace=True)
                
# annualgrowth.astype(float).round(0).astype(int,errors='ignore').to_csv('2025_annualgrowth_cs.csv',date_format='%Y-%m-%d',na_rep='',float_format="%.0f")

In [None]:
# calculate monthly growth for cs
# monthlygrowth=chained.copy()

# for col in monthlygrowth:
#     for i, row_value in monthlygrowth[col].items():
#         prev_col = col - pd.tseries.offsets.DateOffset(months=1)
#         if (col < meta.loc[i, 'ID_START']) or (prev_col not in chained.columns):
#             monthlygrowth.at[i, col] = None
#         else:
#             monthlygrowth.at[i, col] = (
#                 (float(row_value) - float(chained.loc[i, prev_col])) * 100
#                 / float(chained.loc[i, prev_col])
#             )

# #rename columns to dates without time formats
# columns = {}
# for col in monthlygrowth.columns:
#     try:
#         columns[col] = col.date()
#     except ValueError:
#         pass
# monthlygrowth.rename(columns=columns, inplace=True)
                
# monthlygrowth.astype(float).round(0).astype(int,errors='ignore').to_csv('2025_monthlygrowth_cs.csv',date_format='%Y-%m-%d',na_rep='',float_format="%.0f")

In [None]:
# Finally save the unchained and chained numbers to csv


def _date_only_columns(frame):
    """Return ``frame`` with datetime column labels replaced by plain dates.

    Only labels that are ``datetime`` instances (``pd.Timestamp`` included) are
    converted. Labels that are already dates, or are not dates at all, are kept
    unchanged, so running this cell a second time is harmless. Previously the
    second run raised AttributeError, which the ``except ValueError`` did not
    catch.
    """
    return frame.rename(
        columns={col: col.date() for col in frame.columns if isinstance(col, datetime)}
    )


# rename columns to dates without time formats and save
# NOTE(review): this writes unchained.csv, while the notebook reads
# 2025_unchained.csv at the top - confirm that is intended.
un = _date_only_columns(un)
un.to_csv('unchained.csv')

# same for the chained indices; later cells rely on these date-only labels when
# they build column names with str(col)
chained = _date_only_columns(chained)
chained.astype(float).round(3).to_csv('chained.csv', date_format='%Y-%m-%d', na_rep='')

In [None]:
#turn it into a excel datadownload file
# with pd.ExcelWriter("datadownload.xlsx", mode="a", if_sheet_exists="replace", date_format="YYYY-MM-DD", datetime_format="YYYY-MM-DD") as writer:
#     meta.drop(columns=['AVERAGE_PRICE']).to_excel(writer, sheet_name="metadata")  
#     # un.to_excel(writer, sheet_name="unchained")
#     chained.astype(float).round(3).transpose().to_excel(writer, sheet_name="chained")
#     avgprice.astype(float).round(2).fillna('').transpose().to_excel(writer, sheet_name="averageprice")
#     monthlygrowth.astype(float).round(0).fillna('').transpose().to_excel(writer, sheet_name="monthlygrowth")
#     annualgrowth.astype(float).round(0).fillna('').transpose().to_excel(writer,sheet_name="annualgrowth")
    

In [None]:
# Average price per item and month, derived from the chained indices:
#     price(month) = chained(month) / chained(avgpriceRefMonth) * AVERAGE_PRICE
# Output has ID_NAME and CONSUMPTION_SEGMENT_CODE from allitems followed by one
# column per month.

allitems = pd.read_csv('2025_all_items_metadata.csv')

# Ensure chained index is CONSUMPTION_SEGMENT_CODE
if chained.index.name != 'CONSUMPTION_SEGMENT_CODE':
    chained = chained.reset_index().set_index('CONSUMPTION_SEGMENT_CODE')


def _is_month(label, month):
    """True when the column label ``label`` denotes the same instant as ``month``."""
    try:
        return pd.Timestamp(label) == month
    except (TypeError, ValueError):
        return False


# Rebase on the average price reference month. Previously the first column of
# chained was used as the base, which leaves avgpriceRefMonth unused.
# NOTE(review): assumes AVERAGE_PRICE is the price in avgpriceRefMonth - confirm.
ref_col = next((col for col in chained.columns if _is_month(col, avgpriceRefMonth)), None)
if ref_col is None:
    raise KeyError(
        'average price reference month ' + avgpriceRefMonth.strftime('%Y-%m')
        + ' is not a column of chained'
    )

# chained is indexed by segment codes cast to text upstream, while read_csv may
# infer integers for the same codes, so lookups are done on a text key
seg_keys = allitems['CONSUMPTION_SEGMENT_CODE'].astype(str)

# AVERAGE_PRICE of the first row listed for each segment code.
# NOTE(review): every item of a segment therefore shares that first price, as
# before - confirm this is wanted rather than each item's own AVERAGE_PRICE.
price_by_segment = (
    allitems.assign(_seg=seg_keys)
    .drop_duplicates(subset='_seg')
    .set_index('_seg')['AVERAGE_PRICE']
)
item_price = seg_keys.map(price_by_segment)

index_values = chained.astype(float)
index_values.index = index_values.index.astype(str)
index_values = index_values[~index_values.index.duplicated(keep='first')]

# index relative to the reference month, then one row per item
# (items whose segment is not in chained come out as NaN)
ratio = index_values.div(index_values[ref_col], axis=0)
ratio_per_item = ratio.reindex(seg_keys.to_numpy())

prices = pd.DataFrame(
    ratio_per_item.to_numpy() * item_price.to_numpy(dtype=float)[:, None],
    index=allitems.index,
    columns=[str(col) for col in chained.columns],
).round(2)

avgprice_merged = pd.concat(
    [allitems[['ID_NAME', 'CONSUMPTION_SEGMENT_CODE']], prices], axis=1
)

# Save to CSV
avgprice_merged.to_csv('avgprice_merged.csv', index=False)

In [None]:
# Sanity check: is segment code 510439 present in the starting file?
df_meta = pd.read_excel("2025_starting_file_test_data.xlsx")
(df_meta["CONSUMPTION_SEGMENT_CODE"].astype(str) == "510439").any()

In [None]:
# Monthly growth of the chained indices for each item, in whole percent.
# Uses ID_NAME and CONSUMPTION_SEGMENT_CODE from allitems.

import math  # not used by this cell; left in place in case other cells rely on it

# Ensure allitems and chained are loaded
allitems = pd.read_csv('2025_all_items_metadata.csv')
if chained.index.name != 'CONSUMPTION_SEGMENT_CODE':
    chained = chained.reset_index().set_index('CONSUMPTION_SEGMENT_CODE')

# Prepare DataFrame for monthly growth
monthly_growth = allitems[['ID_NAME', 'CONSUMPTION_SEGMENT_CODE']].copy()


def _monthly_growth_pct(seg, col, prev_col):
    """Percentage change of a segment's chained index from ``prev_col`` to ``col``.

    ``seg`` is looked up as text: chained is indexed by segment codes cast to
    text upstream, while read_csv may infer integers for the same codes.
    Previously that mismatch raised KeyError, and the blanket ``except``
    turned every value into None.

    Returns:
        int | None: the change rounded to a whole percent, or None when the
        segment is not in chained, either value is missing or not a single
        number, or the previous value is 0.
    """
    key = str(seg)
    if key not in chained.index:
        return None
    try:
        prev = float(chained.loc[key, prev_col])
        curr = float(chained.loc[key, col])
        if pd.isna(prev) or pd.isna(curr) or prev == 0:
            return None
        return int(round((curr - prev) * 100 / prev))
    except (TypeError, ValueError, OverflowError):
        # non-numeric cell, several rows for one code, or a non-finite result
        return None


# Each column is compared with the column immediately to its left.
# NOTE(review): that is the previous month only if no month is missing from chained.
for prev_col, col in zip(chained.columns[:-1], chained.columns[1:]):
    monthly_growth[str(col)] = monthly_growth['CONSUMPTION_SEGMENT_CODE'].map(
        lambda seg, col=col, prev_col=prev_col: _monthly_growth_pct(seg, col, prev_col)
    )

# Save to CSV
monthly_growth.to_csv('monthly_growth_merged.csv', index=False)

In [None]:
# Annual growth of the chained indices for each item, in whole percent
# (0 decimal places). Uses ID_NAME and CONSUMPTION_SEGMENT_CODE from allitems.

# Ensure allitems and chained are loaded
allitems = pd.read_csv('2025_all_items_metadata.csv')
if chained.index.name != 'CONSUMPTION_SEGMENT_CODE':
    chained = chained.reset_index().set_index('CONSUMPTION_SEGMENT_CODE')

# Prepare DataFrame for annual growth
annual_growth = allitems[['ID_NAME', 'CONSUMPTION_SEGMENT_CODE']].copy()


def _annual_growth_pct(seg, col, prev_col):
    """Percentage change of a segment's chained index from ``prev_col`` to ``col``.

    ``seg`` is looked up as text: chained is indexed by segment codes cast to
    text upstream, while read_csv may infer integers for the same codes.
    Previously that mismatch raised KeyError, and the blanket ``except``
    turned every value into None.

    Returns:
        int | None: the change rounded to a whole percent, or None when the
        segment is not in chained, either value is missing or not a single
        number, or the previous value is 0.
    """
    key = str(seg)
    if key not in chained.index:
        return None
    try:
        prev = float(chained.loc[key, prev_col])
        curr = float(chained.loc[key, col])
        if pd.isna(prev) or pd.isna(curr) or prev == 0:
            return None
        return int(round((curr - prev) * 100 / prev))
    except (TypeError, ValueError, OverflowError):
        # non-numeric cell, several rows for one code, or a non-finite result
        return None


# From the 13th column onwards, each column is compared with the one 12 places
# to its left.
# NOTE(review): that is the same month a year earlier only if no month is
# missing from chained.
for prev_col, col in zip(chained.columns[:-12], chained.columns[12:]):
    annual_growth[str(col)] = annual_growth['CONSUMPTION_SEGMENT_CODE'].map(
        lambda seg, col=col, prev_col=prev_col: _annual_growth_pct(seg, col, prev_col)
    )

# Save to CSV
annual_growth.to_csv('annual_growth_merged.csv', index=False)

In [None]:
# Turn the results into the Excel data download file (one tidy sheet per measure)

meta_for_datadownload = pd.read_csv("2025_all_items_metadata.csv")

# Descriptive columns joined onto every sheet. The backslash in the column name
# is written escaped; the unescaped form is an invalid escape sequence.
item_info = meta_for_datadownload[
    ["ID_NAME", 'CONSUMPTION_SEGMENT_NAME', 'CONSUMPTION_SEGMENT_CODE', 'Category1', 'Category2', 'WEIGHT\\SIZE']
]


def _tidy_item_sheet(wide, value_name, decimals):
    """Reshape a wide per-item table (one column per month) into a long sheet.

    Values are rounded to ``decimals`` places, rows without a value are
    dropped, the descriptive columns are joined on via ID_NAME and
    CONSUMPTION_SEGMENT_CODE, and the columns are put in the order used by
    the data download.
    """
    tidy = (
        wide.round(decimals)
        .melt(id_vars=["ID_NAME", 'CONSUMPTION_SEGMENT_CODE'], var_name='Date', value_name=value_name)
        .dropna(subset=[value_name])
        .merge(item_info, on=["ID_NAME", 'CONSUMPTION_SEGMENT_CODE'])
    )
    return tidy[
        ['Date', "ID_NAME", 'CONSUMPTION_SEGMENT_NAME', 'CONSUMPTION_SEGMENT_CODE',
         'Category1', 'Category2', 'WEIGHT\\SIZE', value_name]
    ]


# chained is keyed by segment codes cast to text upstream, so the metadata key
# is cast to text for this join (merging a text key with an integer key fails).
# NOTE(review): unlike the other sheets this one leaves out WEIGHT\SIZE, as
# before - confirm that is intended.
chained_sheet = (
    chained.astype(float).round(3)
    .reset_index()
    .melt(id_vars=['CONSUMPTION_SEGMENT_CODE'], var_name='Date', value_name='Value')
    .dropna()
    .merge(item_info.astype({'CONSUMPTION_SEGMENT_CODE': str}), on='CONSUMPTION_SEGMENT_CODE')
)[['Date', "ID_NAME", 'CONSUMPTION_SEGMENT_NAME', 'CONSUMPTION_SEGMENT_CODE', 'Category1', 'Category2', 'Value']]

# Build every sheet before the workbook is opened, so a failure in the
# reshaping cannot leave the file half written.
sheets = {
    "Metadata": meta_for_datadownload.drop(columns=["AVERAGE_PRICE"]),
    "Chained": chained_sheet,
    "Average price": _tidy_item_sheet(avgprice_merged, 'Price', 2),
    "Monthly growth": _tidy_item_sheet(monthly_growth, 'Percentage', 0),
    "Annual growth": _tidy_item_sheet(annual_growth, 'Percentage', 0),
}

# Append to an existing workbook (replacing these sheets and keeping any others);
# create the workbook when it is not there yet. if_sheet_exists is only accepted
# in append mode, and append mode needs the openpyxl engine.
download_path = Path("datadownload.xlsx")
if download_path.exists():
    writer_mode = {"mode": "a", "if_sheet_exists": "replace"}
else:
    writer_mode = {"mode": "w"}

with pd.ExcelWriter(
    download_path,
    engine="openpyxl",
    date_format="YYYY-MM-DD",
    datetime_format="YYYY-MM-DD",
    **writer_mode,
) as writer:
    for sheet_name, sheet in sheets.items():
        sheet.to_excel(writer, index=False, sheet_name=sheet_name)
    