In [6]:
import io
import os
import sqlite3

import pandas as pd
import requests

In [25]:
# Earliest period requested from every OECD dataflow below.
ts = '2000-01'
# qs = '2024-Q1'

# All data links share the same REST prefix and the same query-string suffix;
# only the dataflow identifier in the middle differs.
_sdmx_prefix = 'https://sdmx.oecd.org/public/rest/data/'
_sdmx_suffix = f'/all?startPeriod={ts}&dimensionAtObservation=AllDimensions&format=csvfilewithlabels'

wage_url = _sdmx_prefix + 'OECD.SDD.TPS,DSD_EAR@DF_HOU_EAR,1.0' + _sdmx_suffix
unemp_url = _sdmx_prefix + 'OECD.SDD.TPS,DSD_LFS@DF_IALFS_UNE_M,1.0' + _sdmx_suffix
finance_url = _sdmx_prefix + 'OECD.SDD.STES,DSD_STES@DF_FINMARK,4.0' + _sdmx_suffix
# Disabled quarterly link, kept for reference:
# prod_url = 'https://sdmx.oecd.org/public/rest/data/OECD.SDD.TPS,DSD_PDB@DF_PDB_ULC_Q,1.0/.Q.......?startPeriod=' + qs + '&dimensionAtObservation=AllDimensions&format=csvfilewithlabels'
cpi_url = _sdmx_prefix + 'OECD.SDD.TPS,DSD_PRICES@DF_PRICES_ALL,1.0' + _sdmx_suffix
cci_url = _sdmx_prefix + 'OECD.SDD.STES,DSD_STES@DF_CS,4.0' + _sdmx_suffix

In [12]:
# Location of the SQLite database. Set the MACRO_INDICATORS_DB environment variable to
# run the notebook on another machine; when it is unset the original path is used.
db_path = os.environ.get('MACRO_INDICATORS_DB', '/Users/Mark Rozenberg/Downloads/Macro-Indicators/macro_indicators_v2.db')
# Single connection shared by every cell below.
conn = sqlite3.connect(db_path)

### Wages

In [5]:
# Read the data from wage_url. Stop on an HTTP error instead of parsing an error
# page as if it were CSV.
wage_response = requests.get(wage_url)
wage_response.raise_for_status()
wage_raw = pd.read_csv(io.StringIO(wage_response.content.decode('utf-8')), header=0)

# Keep rows with FREQ == 'M' and ADJUSTMENT == 'N', reduced to the three stored columns.
wage_data = wage_raw.loc[
    (wage_raw['FREQ'] == 'M') & (wage_raw['ADJUSTMENT'] == 'N'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist yet.
conn.execute('''
    CREATE TABLE IF NOT EXISTS wage (
        country TEXT,
        time_period TEXT,
        wage REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch. Rows are applied in frame order, so if
# several rows share a key the last one wins, as with a row-by-row upsert.
conn.executemany('''
    INSERT INTO wage (country, time_period, wage)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    wage=excluded.wage
    ''', wage_data.itertuples(index=False, name=None))
conn.commit()


### Finance: IRLT + Shares

In [9]:
# Download the financial-market dataflow once; later cells filter this frame.
# Stop on an HTTP error instead of parsing an error page as if it were CSV.
finance_response = requests.get(finance_url)
finance_response.raise_for_status()
finance_data = pd.read_csv(io.StringIO(finance_response.content.decode('utf-8')), header=0)

In [7]:
# Keep rows with FREQ == 'M' and MEASURE == 'IRLT', reduced to the three stored columns.
irlt_data = finance_data.loc[
    (finance_data['FREQ'] == 'M') & (finance_data['MEASURE'] == 'IRLT'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist yet.
conn.execute('''
    CREATE TABLE IF NOT EXISTS irlt (
        country TEXT,
        time_period TEXT,
        irlt REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch (rows applied in frame order).
conn.executemany('''
    INSERT INTO irlt (country, time_period, irlt)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    irlt=excluded.irlt
    ''', irlt_data.itertuples(index=False, name=None))
conn.commit()

In [13]:
# Reset the unemployment table: drop any existing copy, then recreate it empty so
# that the upsert in the Unemployment section has a table, with a unique
# (country, time_period) key, to write into.
conn.execute(''' drop table if exists unemp ''')
conn.execute('''
    create table if not exists unemp (
        country TEXT,
        time_period TEXT,
        unemp REAL,
        PRIMARY KEY (country, time_period)
    )
''')
conn.commit()

In [12]:
# Keep rows with FREQ == 'M' and MEASURE == 'SHARE', reduced to the three stored columns.
share_data = finance_data.loc[
    (finance_data['FREQ'] == 'M') & (finance_data['MEASURE'] == 'SHARE'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist yet.
conn.execute('''
    CREATE TABLE IF NOT EXISTS share (
        country TEXT,
        time_period TEXT,
        share REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch (rows applied in frame order).
conn.executemany('''
    INSERT INTO share (country, time_period, share)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    share=excluded.share
    ''', share_data.itertuples(index=False, name=None))
conn.commit()

### Unemployment

In [14]:
# Download the unemployment dataflow. Stop on an HTTP error instead of parsing an
# error page as if it were CSV.
unemp_response = requests.get(unemp_url)
unemp_response.raise_for_status()
unemp_raw = pd.read_csv(io.StringIO(unemp_response.content.decode('utf-8')), header=0)

# Keep rows with FREQ == 'M', ADJUSTMENT == 'N', SEX == '_T' and AGE == 'Y_GE25',
# reduced to the three stored columns.
unemp_data = unemp_raw.loc[
    (unemp_raw['FREQ'] == 'M')
    & (unemp_raw['ADJUSTMENT'] == 'N')
    & (unemp_raw['SEX'] == '_T')
    & (unemp_raw['AGE'] == 'Y_GE25'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist (for example after it was dropped).
conn.execute('''
    CREATE TABLE IF NOT EXISTS unemp (
        country TEXT,
        time_period TEXT,
        unemp REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch. Rows are applied in frame order, so if
# several rows share a key the last one wins, as with a row-by-row upsert.
conn.executemany('''
    INSERT INTO unemp (country, time_period, unemp)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    unemp=excluded.unemp
    ''', unemp_data.itertuples(index=False, name=None))
conn.commit()

### Prices

In [26]:
# Download the prices dataflow. Stop on an HTTP error instead of parsing an error
# page as if it were CSV.
cpi_response = requests.get(cpi_url)
cpi_response.raise_for_status()
cpi_raw = pd.read_csv(io.StringIO(cpi_response.content.decode('utf-8')), header=0)

# Keep rows with FREQ == 'M', ADJUSTMENT == 'N', METHODOLOGY == 'N',
# EXPENDITURE == '_T' and UNIT_MEASURE == 'PA', reduced to the three stored columns.
cpi_data = cpi_raw.loc[
    (cpi_raw['FREQ'] == 'M')
    & (cpi_raw['ADJUSTMENT'] == 'N')
    & (cpi_raw['METHODOLOGY'] == 'N')
    & (cpi_raw['EXPENDITURE'] == '_T')
    & (cpi_raw['UNIT_MEASURE'] == 'PA'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist yet.
conn.execute('''
    CREATE TABLE IF NOT EXISTS cpi (
        country TEXT,
        time_period TEXT,
        cpi REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch. Rows are applied in frame order, so if
# several rows share a key the last one wins, as with a row-by-row upsert.
conn.executemany('''
    INSERT INTO cpi (country, time_period, cpi)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    cpi=excluded.cpi
    ''', cpi_data.itertuples(index=False, name=None))
conn.commit()

### Consumer confidence

In [11]:
# Download the consumer-confidence dataflow. Stop on an HTTP error instead of
# parsing an error page as if it were CSV.
cci_response = requests.get(cci_url)
cci_response.raise_for_status()
cci_raw = pd.read_csv(io.StringIO(cci_response.content.decode('utf-8')), header=0)

# Keep rows with FREQ == 'M' and MEASURE == 'CCICP', reduced to the three stored columns.
cci_data = cci_raw.loc[
    (cci_raw['FREQ'] == 'M') & (cci_raw['MEASURE'] == 'CCICP'),
    ['REF_AREA', 'TIME_PERIOD', 'OBS_VALUE'],
]

# The upsert below needs a uniqueness constraint on (country, time_period);
# create the table with that key if it does not exist yet.
conn.execute('''
    CREATE TABLE IF NOT EXISTS cci (
        country TEXT,
        time_period TEXT,
        cci REAL,
        PRIMARY KEY (country, time_period)
    )
''')
# Save the data to the table in one batch. Rows are applied in frame order, so if
# several rows share a key the last one wins, as with a row-by-row upsert.
conn.executemany('''
    INSERT INTO cci (country, time_period, cci)
    VALUES (?, ?, ?)
    ON CONFLICT(country, time_period) DO UPDATE SET
    cci=excluded.cci
    ''', cci_data.itertuples(index=False, name=None))
conn.commit()

### Checks and tests

In [12]:
# Show the name of every table recorded in the database's sqlite_master catalogue.
tables_query = '''SELECT name FROM sqlite_master WHERE type='table';'''
pd.read_sql_query(tables_query, conn)

Unnamed: 0,name
0,wage
1,irlt
2,share
3,unemp
4,cpi
5,cci
6,main_table


### Test: number of entries per period for each variable

In [15]:
# Per-period row counts for each source table.
# The report is driven by the union of all periods found in any table, so a period
# that is missing from one source (e.g. wage) still appears, with NULL in that
# source's column. LEFT JOINs are used so the query does not depend on FULL JOIN
# support in the installed SQLite.
pd.read_sql_query('''
with
periods as (
    select time_period from wage
    union select time_period from irlt
    union select time_period from share
    union select time_period from unemp
    union select time_period from cpi
    union select time_period from cci
),
part1 as (select time_period, count(*) as wage from wage group by time_period),
part2 as (select time_period, count(*) as irlt from irlt group by time_period),
part3 as (select time_period, count(*) as share from share group by time_period),
part4 as (select time_period, count(*) as unemp from unemp group by time_period),
part5 as (select time_period, count(*) as cpi from cpi group by time_period),
part6 as (select time_period, count(*) as cci from cci group by time_period)
select
    p.time_period as time_period,
    part1.wage as wage,
    part2.irlt as irlt,
    part3.share as share,
    part4.unemp as unemp,
    part5.cpi as cpi,
    part6.cci as cci
from periods p
left join part1 on part1.time_period = p.time_period
left join part2 on part2.time_period = p.time_period
left join part3 on part3.time_period = p.time_period
left join part4 on part4.time_period = p.time_period
left join part5 on part5.time_period = p.time_period
left join part6 on part6.time_period = p.time_period
where p.time_period is not null
order by p.time_period desc
''', conn)

Unnamed: 0,time_period,wage,irlt,share,unemp,cpi,cci
0,2024-10,1,45,48,37,47,37
1,2024-09,6,45,48,41,47,39
2,2024-08,8,45,48,41,47,39
3,2024-07,13,45,48,41,48,39
4,2024-06,14,44,48,41,48,39
...,...,...,...,...,...,...,...
293,2000-05,15,30,47,33,51,27
294,2000-04,15,30,47,33,51,27
295,2000-03,15,29,47,33,51,28
296,2000-02,15,29,47,33,51,28


### combine the sources into main table

In [27]:
# Rebuild main_table from scratch: one row per (country, time_period) that occurs
# in ANY source table, with that key's value from each source (NULL when absent).
#
# The key set is the union of all sources and every source is LEFT JOINed to it.
# Chaining joins on the first table's columns instead would leave keys missing
# from that table unmatched by the later joins, producing several partial rows
# for the same (country, time_period).
conn.execute(''' drop table if exists main_table ''')
conn.execute('''
create table if not exists main_table as
with all_keys as (
    select country, time_period from irlt
    union select country, time_period from share
    union select country, time_period from unemp
    union select country, time_period from wage
    union select country, time_period from cpi
    union select country, time_period from cci
)
select
    k.time_period as time_period,
    k.country as country,
    t1.irlt as irlt,
    t2.share as share,
    t3.unemp as unemp,
    t4.wage as wage,
    t5.cpi as cpi,
    t6.cci as cci
from all_keys k
left join irlt t1 on t1.time_period = k.time_period and t1.country = k.country
left join share t2 on t2.time_period = k.time_period and t2.country = k.country
left join unemp t3 on t3.time_period = k.time_period and t3.country = k.country
left join wage t4 on t4.time_period = k.time_period and t4.country = k.country
left join cpi t5 on t5.time_period = k.time_period and t5.country = k.country
left join cci t6 on t6.time_period = k.time_period and t6.country = k.country
''')
conn.commit()

### export the data to GCP

In [10]:
from google.cloud import storage

def list_buckets():
    """Print the header "Buckets:" followed by the name of each bucket returned by
    `storage.Client().list_buckets()`, one name per line."""
    client = storage.Client()
    # Fetch the full listing before printing anything, so a failed request
    # produces no partial output.
    found = list(client.list_buckets())
    bucket_names = [bucket.name for bucket in found]
    print("Buckets:", *bucket_names, sep="\n")

list_buckets()

Buckets:
footbal_climate_data
ltint_forecast


In [25]:
import pandas as pd
from google.cloud import storage
import sqlite3

def export_view_to_csv_and_upload(view_name):
    """Export a table or view to CSV and upload it to the "ltint_forecast" bucket.

    The rows are read through the notebook-level `conn` connection. A `date`
    column is computed as `date(time_period || '-01')` and the original
    `time_period` column is dropped before writing.

    Parameters
    ----------
    view_name : str
        Name of the table or view to export. It must expose a `time_period`
        column. The name is interpolated directly into the SQL text (an
        identifier cannot be bound as a parameter), so pass trusted names only.

    Side effects
    ------------
    Writes `<view_name>.csv` in the current working directory and uploads it
    under the same name, replacing any object already stored there.
    """
    # Load the data into a DataFrame
    df = pd.read_sql(f"SELECT *, date(time_period || '-01') as date FROM {view_name}", conn)
    df = df.drop(columns=['time_period'])

    csv_file_path = view_name + '.csv'
    print(csv_file_path)
    # Save the DataFrame to a CSV file
    df.to_csv(csv_file_path, index=False)

    # Upload the CSV file to GCP
    storage_client = storage.Client()
    bucket = storage_client.bucket("ltint_forecast")
    blob = bucket.blob(csv_file_path)
    # Delete the existing file with the same name, if there is one. Deleting an
    # object that does not exist raises, which would make the first export fail.
    if blob.exists():
        blob.delete()
    blob.upload_from_filename(csv_file_path)

    print(f"File {csv_file_path} uploaded to {csv_file_path}.")

# Example usage
export_view_to_csv_and_upload('main_table')

main_table.csv
File main_table.csv uploaded to main_table.csv.
