<font size="5">Ingest WDI - "GDP" and "GDP, PPP" data into Trino pipeline</font>

In [1]:
from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib

Load Environment Variables

In [2]:
# Directory that holds credentials.env: CREDENTIAL_DOTENV_DIR takes precedence,
# then PWD, then a hard-coded default path.
fallback_dir = os.environ.get('PWD', '/opt/app-root/src')
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', fallback_dir)
dotenv_path = pathlib.Path(dotenv_dir).joinpath('credentials.env')

# Load the file only when it is present; override=True lets values from the
# file replace variables that are already set in the environment.
credentials_present = os.path.exists(dotenv_path)
if credentials_present:
    load_dotenv(dotenv_path=dotenv_path, override=True)

In [3]:
# Target of the ingestion, addressed as <catalog>.<schema>.<table>.
# The catalog has to be one that is configured for iceberg.
ingest_catalog, ingest_schema, ingest_table = (
    'osc_datacommons_dev',        # catalog
    'pcaf_sovereign_footprint',   # schema
    'sf_wdi_gdp',                 # table
)

In [4]:
import trino
from sqlalchemy.engine import create_engine

# Connection settings are read from the environment variables
# <prefix>_USER, <prefix>_HOST, <prefix>_PORT and <prefix>_PASSWD.
env_var_prefix = 'TRINO'

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user = os.environ[f'{env_var_prefix}_USER'],
    host = os.environ[f'{env_var_prefix}_HOST'],
    port = os.environ[f'{env_var_prefix}_PORT']
)
sqlargs = {
    # The <prefix>_PASSWD value is passed to trino's JWT authentication.
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    # Session catalog. df.to_sql() further down only passes a schema, so the
    # session catalog must be the ingest catalog; reuse the single definition
    # instead of repeating the literal.
    'catalog': ingest_catalog
}
engine = create_engine(sqlstring, connect_args = sqlargs)
connection = engine.connect()

# Attach the S3 bucket described by the S3_DEV settings via osc_ingest_trino.
trino_bucket = osc.attach_s3_bucket("S3_DEV")

In [5]:
import boto3

# Endpoint and credentials of the landing bucket all come from the
# S3_LANDING_* environment variables.
landing_settings = dict(
    endpoint_url=os.environ['S3_LANDING_ENDPOINT'],
    aws_access_key_id=os.environ['S3_LANDING_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_LANDING_SECRET_KEY'],
)
s3_source = boto3.resource(service_name="s3", **landing_settings)

# Handle on the bucket that holds the raw source files.
landing_bucket_name = os.environ['S3_LANDING_BUCKET']
source_bucket = s3_source.Bucket(landing_bucket_name)

Verify the Trino connection opened above (JWT authentication)

In [6]:
# Sanity check of the Trino connection: list the schemas of the ingest catalog
# and print one result row per line.
schema_read = engine.execute(f'show schemas in {ingest_catalog}')
schema_rows = schema_read.fetchall()
for schema_row in schema_rows:
    print(schema_row)

('aicoe_osc_demo_results',)
('default',)
('demo_dv',)
('dera',)
('essd',)
('iceberg_demo',)
('information_schema',)
('ingest',)
('mdt_sandbox',)
('pcaf_sovereign_footprint',)
('sandbox',)
('wri_gppd',)


Load GDP files (updated sporadically from https://data.worldbank.org/indicator/NY.GDP.MKTP.CD and https://data.worldbank.org/indicator/NY.GDP.MKTP.PP.CD)

In [7]:
import pandas as pd
import ParseXLS as parser

# Key prefix under which the WDI extracts are stored in the landing bucket.
WDI_LANDING_PREFIX = 'PCAF-sovereign-footprint/WDI'


def _download_and_parse(csv_name, ini_file, output_csv):
    """Download one WDI extract from the landing bucket and parse it.

    The S3 object <WDI_LANDING_PREFIX>/<csv_name> is saved as /tmp/<csv_name>,
    then ``parser.process`` is called with the given file names.
    NOTE(review): presumably the .ini file points at the /tmp copy - confirm.

    Parameters:
        csv_name: file name of the extract, used both in S3 and under /tmp.
        ini_file: configuration file name handed to ``parser.process``.
        output_csv: output file name handed to ``parser.process``.

    Returns:
        Whatever ``parser.process`` returns (used below as a DataFrame).
    """
    extract = s3_source.Object(os.environ['S3_LANDING_BUCKET'], f'{WDI_LANDING_PREFIX}/{csv_name}')
    extract.download_file(f'/tmp/{csv_name}')
    return parser.process(ini_file, output_csv)


## GDP
gdp_usd = _download_and_parse('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI_GDP.ini', 'WDI_GDP.csv')

## GDP PPP
gdp_ppp = _download_and_parse('API_NY.GDP.MKTP.PP.CD_DS2_en_csv_v2.csv', 'WDI_GDP_PPP.ini', 'WDI_GDP_PPP.csv')

# combine both dataframes
df = pd.concat([gdp_usd, gdp_ppp])

# validity_date is cast to int32 first, then pandas picks the best dtype for
# every column.
df = df.astype({'validity_date': 'int32'})
df = df.convert_dtypes()
df.info(verbose=True)

# Keep only the columns that are written to the table and drop the rows that
# carry no value.
df = df[['rec_source','data_provider','country_iso_code','country_name','validity_date','attribute','value','value_units']].dropna(subset=['value'])


WDI_GDP.ini
file_list:
['/tmp/API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv']
/tmp/API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv
2
csv
/tmp/API_NY.GDP.MKTP.CD_DS2_en_csv_v
<configparser.ConfigParser object at 0x7fdea9f37f40>
                    Country Name Country Code     Indicator Name  \
0                          Aruba          ABW  GDP (current US$)   
1    Africa Eastern and Southern          AFE  GDP (current US$)   
2                    Afghanistan          AFG  GDP (current US$)   
3     Africa Western and Central          AFW  GDP (current US$)   
4                         Angola          AGO  GDP (current US$)   
..                           ...          ...                ...   
261                       Kosovo          XKX  GDP (current US$)   
262                  Yemen, Rep.          YEM  GDP (current US$)   
263                 South Africa          ZAF  GDP (current US$)   
264                       Zambia          ZMB  GDP (current US$)   
265                     Zimbabwe          ZWE

In [8]:
# NOTE(review): this cell only contains a commented-out `%run` of TransposeXLS.py, so
# nothing executes here. Presumably superseded by the ParseXLS call above - confirm and remove.
#%run -i TransposeXLS.py --config WDI.ini --output=WDI.csv 

In [9]:
import osc_ingest_trino as osc

# Column definitions derived from the dataframe; they are interpolated into the
# create-table statement below.
columnschema = osc.create_table_schema_pairs(df)


In [10]:
# DDL for the target table: created only when missing, stored as ORC and
# partitioned by validity_date.
qualified_table = f"{ingest_catalog}.{ingest_schema}.{ingest_table}"
tabledef = f"""
create table if not exists {qualified_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['validity_date']
)
"""
# Show the statement before running it.
print(tabledef)
qres = engine.execute(tabledef)


create table if not exists osc_datacommons_dev.pcaf_sovereign_footprint.sf_wdi_gdp(
    rec_source varchar,
    data_provider varchar,
    country_iso_code varchar,
    country_name varchar,
    validity_date integer,
    attribute varchar,
    value double,
    value_units varchar
) with (
    format = 'ORC',
    partitioning = array['validity_date']
)



In [11]:
# Empty the target table: the statement has no WHERE clause, so every existing
# row is removed before the fresh data is appended.
sql = f"\ndelete from {ingest_catalog}.{ingest_schema}.{ingest_table} \n"
qres = engine.execute(sql)
delete_result = qres.fetchall()
print(delete_result)

[(20064,)]


In [12]:
# Read the whole target table back into a dataframe and display it.
sql = f"\nselect * from {ingest_catalog}.{ingest_schema}.{ingest_table}\n"
pd.read_sql(sql, engine)


Unnamed: 0,rec_source,data_provider,country_iso_code,country_name,validity_date,attribute,value,value_units


In [13]:
print(ingest_catalog)

# Append the dataframe to the target table through the osc batch-insert helper,
# 1000 rows per batch, with verbose logging of each batch.
batch_insert = osc.TrinoBatchInsert(batch_size = 1000, verbose = True)
df.to_sql(
    ingest_table,
    con=engine,
    schema=ingest_schema,
    if_exists='append',
    index=False,
    method=batch_insert,
)

osc_datacommons_dev
constructed fully qualified table name as: "pcaf_sovereign_footprint.sf_wdi_gdp"
inserting 1000 records
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'AFE', 'Africa Eastern and Southern', 1960, 'GDP (current US$)', 21290586002.8823, 'USD')
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'AFG', 'Afghanistan', 1960, 'GDP (current US$)', 537777811.111111, 'USD')
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'AFW', 'Africa Western and Central', 1960, 'GDP (current US$)', 10404135069.15, 'USD')
  ...
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'CHL', 'Chile', 1967, 'GDP (current US$)', 7013196078.43137, 'USD')
batch insert result: [(1000,)]
inserting 1000 records
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'CHN', 'China', 1967, 'GDP (current US$)', 72881631326.6715, 'USD')
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'CIV', 'Cote d''Ivoire', 1967, 'GDP (current US$)', 1082922892.15202, 'USD')
  ('API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv', 'WDI', 'C

In [14]:
import pandas as pd

# Spot check of the ingested data: all rows for one country (ISO code BHS),
# newest validity_date first. The statement is written as plain SQL so that no
# stray quote characters end up inside the query text.
sql = f"""
select * from {ingest_catalog}.{ingest_schema}.{ingest_table}
where country_iso_code = 'BHS'
order by validity_date desc
"""
pd.read_sql(sql, engine)


Unnamed: 0,rec_source,data_provider,country_iso_code,country_name,validity_date,attribute,value,value_units
0,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",2021,GDP (current US$),1.120860e+10,USD
1,API_NY.GDP.MKTP.PP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",2021,"GDP, PPP (current international $)",1.353788e+10,USD
2,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",2020,GDP (current US$),9.699500e+09,USD
3,API_NY.GDP.MKTP.PP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",2020,"GDP, PPP (current international $)",1.142966e+10,USD
4,API_NY.GDP.MKTP.PP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",2019,"GDP, PPP (current international $)",1.482532e+10,USD
...,...,...,...,...,...,...,...,...
89,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",1964,GDP (current US$),2.666667e+08,USD
90,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",1963,GDP (current US$),2.377451e+08,USD
91,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",1962,GDP (current US$),2.122549e+08,USD
92,API_NY.GDP.MKTP.CD_DS2_en_csv_v2.csv,WDI,BHS,"Bahamas, The",1961,GDP (current US$),1.900980e+08,USD
