In [8]:
import logging
import os
import sys
from datetime import datetime, date
from hashlib import md5
from typing import List

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account

# **** CONFIGURATION ****
# relative path of the emissions CSV file that gets read into the dataframe
filename = './data/Emissions_by_Country_2002-2022.csv'

# bigquery destination: project and dataset used to build the full table name
PROJECT_NAME = "emissions-team-project"
DATASET_NAME = "emissions"
# lower-case alias holding the same project identifier as PROJECT_NAME
project_id = PROJECT_NAME


# **** TABLE SCHEMAS ****

# emission measure columns; each becomes a REQUIRED float64 field, in this order
_EMISSION_MEASURES = (
    'total_em', 'coal_em', 'oil_em', 'gas_em',
    'cement_em', 'flaring_em', 'other_em', 'per_capita_em',
)

# metadata (table name + bigquery schema) for every destination table, keyed by table
TABLE_METADATA = {
    'fct_emissions': dict(
        table_name='fct_emissions',
        schema=[
            # key field; per the original note, dataframe indexes are only
            # written to bigquery when they are named in the schema
            bigquery.SchemaField('country_code_year', 'string', mode='REQUIRED', max_length=128),
            bigquery.SchemaField('country_code', 'string', mode='NULLABLE'),
            bigquery.SchemaField('year', 'int64', mode='REQUIRED'),
            # one float64 field per emission measure
            *(
                bigquery.SchemaField(measure, 'float64', mode='REQUIRED')
                for measure in _EMISSION_MEASURES
            ),
            # audit columns are stored as plain strings
            bigquery.SchemaField('created_at', 'string', mode='REQUIRED'),
            bigquery.SchemaField('modified_at', 'string', mode='REQUIRED'),
        ],
    )
}

# **** SETUP LOGGING ****
# record layout: [level][timestamp][module:line] : message
LOG_FORMAT = '[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s'

# configure the root logger: INFO threshold, records are written to stdout
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)

# fetch the logger named 'root', expose it as `logger`,
# then programmatically lower its own threshold to DEBUG
logger: logging.Logger = logging.getLogger('root')
logger.setLevel(logging.DEBUG)


# **** BIGQUERY CLIENT ****
logger.debug("Creating bigquery client")

# Location of the service-account key file.
# The standard GOOGLE_APPLICATION_CREDENTIALS environment variable takes
# precedence when it is set to a non-empty value, so the notebook is not tied to
# one user's home directory; otherwise the original hard-coded path is used.
key_path = (
    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    or "/home/chloe_ycl/.creds/emission-team-project.json"
)

# build the credentials from the key file, requesting the cloud-platform scope
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# the client uses the project id carried by the credentials
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

logger.info("Setup Completed")

[DEBUG][2023-01-18 14:45:38,191][342428440:0053] : Creating bigquery client
[INFO ][2023-01-18 14:45:38,203][342428440:0062] : Setup Completed


## Loading the Emissions_by_Country CSV File

In [9]:
# *** always perform checks first ***
# check if the file exists
assert os.path.exists(filename), f"Data file does not exists: '{filename}'"
# check if the file contains any data. Header row alone is 78 bytes. size > 78
assert os.path.getsize(filename) > 78, f"Data file size incorrect; does not seem to contain data: '{filename}'"

# load into dataframe: the first line is the header, malformed lines raise a warning.
# NOTE: `infer_datetime_format=True` was removed. No `parse_dates` is requested,
# so it never had any effect, and the argument is deprecated in pandas 2.x.
df = pd.read_csv(
    filename,
    header=0,
    on_bad_lines='warn',
)
logger.info(f"loaded {len(df.index)} rows from {filename}")

# *** always perform check at the end ***
# check schema: contains all expected columns?
expected_columns = ['Country', 'ISO 3166-1 alpha-3', 'Year', 'Total', 'Coal', 'Oil', 'Gas', 'Cement', 'Flaring', 'Other', 'Per Capita']
for col in expected_columns:
    assert col in df.columns, f"Data file missing required column: {col}"

# normalise the column names: lower-case everything first, then map the
# lower-cased source names to the column names used by the table schema
df = (
    df
    .rename(columns=str.lower)
    .rename(columns={
        'iso 3166-1 alpha-3': 'country_code',
        'total': 'total_em',
        'coal': 'coal_em',
        'oil': 'oil_em',
        'gas': 'gas_em',
        'cement': 'cement_em',
        'flaring': 'flaring_em',
        'other': 'other_em',
        'per capita': 'per_capita_em',
    })
)

# log data column data types
logger.debug(df.dtypes)
display(df.head(5))

[INFO ][2023-01-18 14:45:41,735][2156124246:0014] : loaded 63104 rows from ./data/Emissions_by_Country_2002-2022.csv
[DEBUG][2023-01-18 14:45:41,751][2156124246:0027] : country           object
country_code      object
year               int64
total_em         float64
coal_em          float64
oil_em           float64
gas_em           float64
cement_em        float64
flaring_em       float64
other_em         float64
per_capita_em    float64
dtype: object


Unnamed: 0,country,country_code,year,total_em,coal_em,oil_em,gas_em,cement_em,flaring_em,other_em,per_capita_em
0,Afghanistan,AFG,1750,0.0,,,,,,,
1,Afghanistan,AFG,1751,0.0,,,,,,,
2,Afghanistan,AFG,1752,0.0,,,,,,,
3,Afghanistan,AFG,1753,0.0,,,,,,,
4,Afghanistan,AFG,1754,0.0,,,,,,,


In [10]:
# add the audit columns to every row:
#   created_at  -> today's date rendered as YYYY-MM-DD
#   modified_at -> the literal string 'None' (placeholder value)
creation_stamp = pd.Timestamp('today').strftime("%Y-%m-%d")
df['created_at'] = creation_stamp
df['modified_at'] = 'None'

# show the resulting frame followed by its column data types
display(df, df.dtypes)

Unnamed: 0,country,country_code,year,total_em,coal_em,oil_em,gas_em,cement_em,flaring_em,other_em,per_capita_em,created_at,modified_at
0,Afghanistan,AFG,1750,0.000000,,,,,,,,2023-01-18,
1,Afghanistan,AFG,1751,0.000000,,,,,,,,2023-01-18,
2,Afghanistan,AFG,1752,0.000000,,,,,,,,2023-01-18,
3,Afghanistan,AFG,1753,0.000000,,,,,,,,2023-01-18,
4,Afghanistan,AFG,1754,0.000000,,,,,,,,2023-01-18,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
63099,Global,WLD,2017,36096.739276,14506.973805,12242.627935,7144.928128,1507.923185,391.992176,302.294047,4.749682,2023-01-18,
63100,Global,WLD,2018,36826.506600,14746.830688,12266.016285,7529.846784,1569.218392,412.115746,302.478706,4.792753,2023-01-18,
63101,Global,WLD,2019,37082.558969,14725.978025,12345.653374,7647.528220,1617.506786,439.253991,306.638573,4.775633,2023-01-18,
63102,Global,WLD,2020,35264.085734,14174.564010,11191.808551,7556.290283,1637.537532,407.583673,296.301685,4.497423,2023-01-18,


country           object
country_code      object
year               int64
total_em         float64
coal_em          float64
oil_em           float64
gas_em           float64
cement_em        float64
flaring_em       float64
other_em         float64
per_capita_em    float64
created_at        object
modified_at       object
dtype: object

In [13]:
# names of the two columns combined into the key
# (this list is not referenced again in the visible cells)
cols = ['country_code', 'year']


def composite_key(row):
    """Build the composite key for a single row.

    Args:
        row: object exposing `country_code` and `year` attributes, e.g. the
            row passed in by `DataFrame.apply(..., axis=1)`.

    Returns:
        str: `country_code` immediately followed by `year`, e.g. 'AFG1750'.
    """
    return "{}{}".format(row.country_code, row.year)

# compute the country_code_year key of every row (row-wise apply of composite_key)
row_keys = df.apply(composite_key, axis=1)
# store the keys as the `country_code_year` column, then promote it to the index
df = df.assign(country_code_year=row_keys).set_index('country_code_year')

logger.info("country_code_year generated")
display(df)

[INFO ][2023-01-18 15:50:46,749][2602611836:0013] : country_code_year generated


Unnamed: 0_level_0,country,country_code,year,total_em,coal_em,oil_em,gas_em,cement_em,flaring_em,other_em,per_capita_em,created_at,modified_at
country_code_year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
AFG1750,Afghanistan,AFG,1750,0.000000,,,,,,,,2023-01-18,
AFG1751,Afghanistan,AFG,1751,0.000000,,,,,,,,2023-01-18,
AFG1752,Afghanistan,AFG,1752,0.000000,,,,,,,,2023-01-18,
AFG1753,Afghanistan,AFG,1753,0.000000,,,,,,,,2023-01-18,
AFG1754,Afghanistan,AFG,1754,0.000000,,,,,,,,2023-01-18,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
WLD2017,Global,WLD,2017,36096.739276,14506.973805,12242.627935,7144.928128,1507.923185,391.992176,302.294047,4.749682,2023-01-18,
WLD2018,Global,WLD,2018,36826.506600,14746.830688,12266.016285,7529.846784,1569.218392,412.115746,302.478706,4.792753,2023-01-18,
WLD2019,Global,WLD,2019,37082.558969,14725.978025,12345.653374,7647.528220,1617.506786,439.253991,306.638573,4.775633,2023-01-18,
WLD2020,Global,WLD,2020,35264.085734,14174.564010,11191.808551,7556.290283,1637.537532,407.583673,296.301685,4.497423,2023-01-18,


In [14]:
# *** generic load function ***
def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField],
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """Load a dataframe into a bigquery table and log the resulting row count.

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id,
            i.e. three non-empty parts separated by dots
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.

    Raises:
        AssertionError: if `table_name` is not made of exactly three non-empty,
            dot-separated parts.
    """
    # *** run some checks ***
    # the table name must be a full table name including project and dataset
    # name: it must contain two dots, and no part may be empty (e.g. 'a..b')
    name_parts = table_name.split('.')
    assert len(name_parts) == 3 and all(name_parts), f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create/write dispositions as requested by the caller, plus the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # fetch the resulting table to report how many rows it now holds
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")


# the prepared dataframe is what gets loaded as the fct_emissions table
fct_emissions = df
row_total = len(fct_emissions.index)
logger.info(f"Preparing {row_total} to load to bigquery")
display(fct_emissions.head(10))

# load to bigquery: build the full table name (project.dataset.table) and
# pick the matching schema from the table metadata
emissions_meta = TABLE_METADATA['fct_emissions']
table_name = ".".join([PROJECT_NAME, DATASET_NAME, emissions_meta['table_name']])
schema = emissions_meta['schema']
load_table(df=fct_emissions, client=client, table_name=table_name, schema=schema)

[INFO ][2023-01-18 15:50:54,456][2708331725:0039] : Preparing 63104 to load to bigquery


Unnamed: 0_level_0,country,country_code,year,total_em,coal_em,oil_em,gas_em,cement_em,flaring_em,other_em,per_capita_em,created_at,modified_at
country_code_year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
AFG1750,Afghanistan,AFG,1750,0.0,,,,,,,,2023-01-18,
AFG1751,Afghanistan,AFG,1751,0.0,,,,,,,,2023-01-18,
AFG1752,Afghanistan,AFG,1752,0.0,,,,,,,,2023-01-18,
AFG1753,Afghanistan,AFG,1753,0.0,,,,,,,,2023-01-18,
AFG1754,Afghanistan,AFG,1754,0.0,,,,,,,,2023-01-18,
AFG1755,Afghanistan,AFG,1755,0.0,,,,,,,,2023-01-18,
AFG1756,Afghanistan,AFG,1756,0.0,,,,,,,,2023-01-18,
AFG1757,Afghanistan,AFG,1757,0.0,,,,,,,,2023-01-18,
AFG1758,Afghanistan,AFG,1758,0.0,,,,,,,,2023-01-18,
AFG1759,Afghanistan,AFG,1759,0.0,,,,,,,,2023-01-18,


[INFO ][2023-01-18 15:50:54,511][2708331725:0030] : loading table: 'emissions-team-project.emissions.fct_emissions'
[INFO ][2023-01-18 15:51:00,432][2708331725:0035] : loaded 63104 rows into emissions-team-project:emissions.fct_emissions
