# Data Ingestion

This notebook is the first in a series of notebooks created for the Nimbus demo. Here, we show how to read raw data files from S3 storage and ingest them as tables into Trino. Such tables can then be used for further analysis or for creating visualizations in Apache Superset.

The notebook also prepares two tables on Trino that can be joined to create a new table.
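As a hedged sketch of such a join, the hypothetical helper `build_join_ctas` below assembles a Trino `CREATE TABLE ... AS SELECT` statement. It assumes both tables share a key column (the ISIN appears in the sample rows of both tables further down) and that the target catalog allows `CREATE TABLE AS`. It is an inner join, so companies missing from either table are dropped. Column lists are passed explicitly because the two tables probably share other column names, which would otherwise collide in the new table.

```python
from typing import Sequence


def build_join_ctas(
    target: str,
    left: str,
    right: str,
    key: str,
    left_cols: Sequence[str],
    right_cols: Sequence[str],
) -> str:
    """CREATE TABLE AS statement inner-joining `left` and `right` on `key`.

    The key is taken once from the left table; every other output column
    must have a distinct name, so duplicates are rejected up front.
    """
    out_cols = [key, *left_cols, *right_cols]
    dupes = sorted({c for c in out_cols if out_cols.count(c) > 1})
    if dupes:
        raise ValueError(f"duplicate output columns: {dupes}")
    select_list = (
        [f"l.{key}"] + [f"l.{c}" for c in left_cols] + [f"r.{c}" for c in right_cols]
    )
    return (
        f"create table if not exists {target} as\n"
        f"select {', '.join(select_list)}\n"
        f"from {left} as l\n"
        f"join {right} as r on l.{key} = r.{key}"
    )
```

The resulting string could be passed to `cur.execute(...)` like the other statements in this notebook; the target table name is up to the caller.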

In [1]:
import os
import time
import re
import pathlib
from dotenv import load_dotenv
import boto3
import trino
import json
import pandas as pd

## Injecting Credentials

In order to run this notebook, we need credentials for the S3 storage (to retrieve the data) and for the Trino server (to create tables).

In an automated environment, the credentials can be specified in a pipeline's environment variables or through OpenShift secrets.

When running the notebook as a step in an Elyra pipeline, the environment variables can be updated in the notebook's "Properties" panel in the pipeline UI, or under `"env_vars"` in the `demo1.pipeline` file.

When running the notebook in a local environment, we define them as environment variables in a `credentials.env` file at the root of the project repository and load them with `python-dotenv`. An example of what the contents of `credentials.env` could look like is shown below:

```
# s3 credentials
S3_ENDPOINT=https://s3.us-east-1.amazonaws.com
S3_BUCKET=ocp-odh-os-demo-s3
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

# trino credentials
TRINO_USER=xxx
TRINO_PASSWD=xxx
TRINO_HOST=trino-secure-odh-trino.apps.odh-cl1.apps.os-climate.org
TRINO_PORT=443
```
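To make the loading step concrete, the sketch below mimics, in simplified form, what `load_dotenv` does with a file like this: it reads `KEY=VALUE` lines, skips blank lines and `#` comments, and copies the values into the environment. The `override` flag mirrors the one passed to `load_dotenv` in the credential-loading cell: when it is false, variables already set in the environment win. The helper names are hypothetical, and the real `python-dotenv` parser also handles quoting, `export` prefixes, multi-line values and `${VAR}` expansion.

```python
from typing import Dict, MutableMapping


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines; blank lines and '#' comments are skipped."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def apply_env(
    values: Dict[str, str], environ: MutableMapping[str, str], override: bool = False
) -> None:
    """Copy parsed values into `environ`; existing keys are kept unless override is True."""
    for key, value in values.items():
        if override or key not in environ:
            environ[key] = value
```

With `override=True`, values from the file replace anything already set, which is the behaviour this notebook asks `load_dotenv` for.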

In [2]:
# file where runtime Kubeflow Pipelines metrics are stored
metrics_file_path = './mlpipeline-metrics.json'

In [3]:
# Load credentials
dotenv_dir = "/opt/app-root/src/aicoe-osc-demo"
dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)
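Whichever way the variables are supplied, a missing one otherwise only surfaces later as a `KeyError` inside some cell. A small check right after loading can report all of them at once. This is a sketch: `REQUIRED_VARS` simply lists the names read by the cells of this notebook, and `missing_env_vars` and `require_env` are hypothetical helpers.

```python
import os
from typing import Iterable, List, Mapping

# Variable names read by the cells of this notebook
REQUIRED_VARS = (
    "S3_ENDPOINT", "S3_BUCKET", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
    "TRINO_USER", "TRINO_PASSWD", "TRINO_HOST", "TRINO_PORT",
)


def missing_env_vars(
    names: Iterable[str], environ: Mapping[str, str] = os.environ
) -> List[str]:
    """Return the names that are unset or set to an empty string."""
    return [name for name in names if not environ.get(name)]


def require_env(names: Iterable[str], environ: Mapping[str, str] = os.environ) -> None:
    """Raise a single error listing every missing variable."""
    missing = missing_env_vars(names, environ)
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
```

Calling `require_env(REQUIRED_VARS)` at this point would fail fast with one readable message instead of several scattered `KeyError`s.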

## Read Raw Data from S3

First, we read two sample data files from S3. We convert the column data types so that Trino can understand them, and rename the columns so that they follow SQL naming conventions.
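One way to make "compatible with SQL naming conventions" checkable is a small predicate that the renamed columns can be tested against. The rule encoded here is an assumption, a conservative subset rather than Trino's full identifier grammar: lowercase ASCII letters, digits and underscores, not starting with a digit, and no longer than `maxlen` (63 by default, matching the default of the `snakify_columns` helper defined below). `is_sql_friendly` is a hypothetical name.

```python
import re

# Conservative subset for unquoted column names: lowercase letters, digits, underscores
_SQL_FRIENDLY = re.compile(r"[a-z_][a-z0-9_]*")


def is_sql_friendly(name: str, maxlen: int = 63) -> bool:
    """True if `name` satisfies the conservative naming rule described above."""
    return len(name) <= maxlen and _SQL_FRIENDLY.fullmatch(name) is not None
```

After renaming, `[c for c in df_emissions.columns if not is_sql_friendly(c)]` is expected to be empty (with `maxlen=100` for the second dataframe); any name it lists contains characters that the renaming step did not remove.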

In [4]:
# Create an S3 client
s3 = boto3.client(
    service_name="s3",
    endpoint_url=os.environ["S3_ENDPOINT"],
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

In [5]:
# Download a sample dataset file from s3
obj = s3.get_object(
    Bucket=os.environ["S3_BUCKET"],
    Key="urgentem/UrgentemDataSampleEmissionsTargetsDec2020.csv",
)

# load the raw file as a dataframe and convert its columns to the
# best-fitting nullable pandas dtypes (string, Int64, Float64, ...)
df_emissions = pd.read_csv(obj["Body"]).convert_dtypes()
len_emssions_data_1 = len(df_emissions)

In [6]:
# Download another sample dataset file from s3
obj_2 = s3.get_object(
    Bucket=os.environ["S3_BUCKET"], Key="urgentem/UrgentemDataSampleDec2020.csv"
)

# load the raw file as a dataframe and convert its columns to the
# best-fitting nullable pandas dtypes (string, Int64, Float64, ...)
df_emissions_2 = pd.read_csv(obj_2["Body"]).convert_dtypes()
len_emssions_data_2 = len(df_emissions_2)

In [7]:
## Helpers to clean column names and map pandas dtypes to SQL types
# Author: Erik Erlandson <eje@redhat.com>

_wsdedup = re.compile(r"\s+")
_usdedup = re.compile(r"__+")
_rmpunc = re.compile(r"[,.()&$/+-]+")
_p2smap = {"string": "varchar", "Float64": "double", "Int64": "bigint"}


def snakify(name, maxlen):
    w = name.casefold().strip()
    w = w.replace("-", "_")
    w = _rmpunc.sub("", w)
    w = _wsdedup.sub("_", w)
    w = _usdedup.sub("_", w)
    w = w.replace("average", "avg")
    w = w.replace("maximum", "max")
    w = w.replace("minimum", "min")
    w = w.replace("absolute", "abs")
    w = w.replace("source", "src")
    w = w.replace("distribution", "dist")
    # these words are common in the sample names, but there is no agreed abbreviation for them yet
    # w = w.replace("inference", "inf")
    # w = w.replace("emissions", "emis")
    # w = w.replace("intensity", "int")
    # w = w.replace("reported", "rep")
    # w = w.replace("revenue", "rev")
    w = w[:maxlen]
    return w


def snakify_columns(df, inplace=False, maxlen=63):
    icols = df.columns.to_list()
    ocols = [snakify(e, maxlen=maxlen) for e in icols]
    if len(set(ocols)) < len(ocols):
        raise ValueError("remapped column names were not unique!")
    rename_map = dict(zip(icols, ocols))
    return df.rename(columns=rename_map, inplace=inplace)


def pandas_type_to_sql(pt):
    st = _p2smap.get(pt)
    if st is not None:
        return st
    raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))


# add ability to specify optional dict for specific fields?
# if column name is present, use specified value?
def generate_table_schema_pairs(df):
    ptypes = [str(e) for e in df.dtypes.to_list()]
    stypes = [pandas_type_to_sql(e) for e in ptypes]
    pz = list(zip(df.columns.to_list(), stypes))
    return ",\n".join(["    {n} {t}".format(n=e[0], t=e[1]) for e in pz])


# Convert GHG values stored as strings with spaces or thousands separators
# (e.g. "1 234" or "1,234") to nullable floats
def str_w_spaces_to_numeric(df, col):
    df[col] = df[col].str.replace(" ", "", regex=False).str.replace(",", "", regex=False)
    df[col] = df[col].astype("float").astype("Float64")
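The string-to-number cleanup in `str_w_spaces_to_numeric` is easiest to see on individual values. The stdlib sketch below applies the same rule to a single string (`parse_ghg_value` is a hypothetical name). It shares one assumption with the pandas version: commas are treated as thousands separators, so a value written with a decimal comma such as `1,5` would be read as 15. Unlike the pandas conversion, which would fail on empty strings, this sketch maps them to `None`.

```python
from typing import Optional


def parse_ghg_value(text: Optional[str]) -> Optional[float]:
    """Convert strings like '1 234' or '1,234' to float; None or empty -> None."""
    if text is None:
        return None
    # same cleanup as str_w_spaces_to_numeric: drop spaces and commas
    cleaned = text.replace(" ", "").replace(",", "")
    return float(cleaned) if cleaned else None
```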

In [8]:
# map column names to a form that works for SQL
snakify_columns(df_emissions, inplace=True)

# map column names to a form that works for SQL
# The default maxlen of 63 truncated some long names into duplicates, so raise it to 100
snakify_columns(df_emissions_2, inplace=True, maxlen=100)
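Why the longer limit is needed: `snakify` truncates each name to `maxlen` characters as its last step, so two long names that differ only after that point become identical, and `snakify_columns` then raises its uniqueness error. The hypothetical helpers below find such collisions in a list of names and search for the smallest limit that avoids them. They assume the input names are already snakified but not yet truncated (for example, produced with a very large `maxlen`).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Sequence


def truncation_collisions(names: Iterable[str], maxlen: int) -> Dict[str, List[str]]:
    """Map each truncated name shared by several inputs to those inputs."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        groups[name[:maxlen]].append(name)
    return {short: full for short, full in groups.items() if len(full) > 1}


def min_safe_maxlen(names: Sequence[str], start: int = 63) -> int:
    """Smallest maxlen >= start for which the truncated names stay unique."""
    if len(set(names)) < len(names):
        raise ValueError("names are duplicated even without truncation")
    maxlen = start
    # terminates: once maxlen reaches the longest name, nothing is truncated
    while truncation_collisions(names, maxlen):
        maxlen += 1
    return maxlen
```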

In [9]:
# Modify GHG emissions columns to be numeric so plotting charts becomes easier
str_w_spaces_to_numeric(df_emissions, 'base_year_ghg_emissions_s1_tco2e')
str_w_spaces_to_numeric(df_emissions, 'base_year_ghg_emissions_s1s2_tco2e')
str_w_spaces_to_numeric(df_emissions, 'base_year_ghg_emissions_s3_tco2e')

In [10]:
# a way to examine the structure of a pandas data frame
df_emissions.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19 entries, 0 to 18
Data columns (total 15 columns):
 #   Column                              Non-Null Count  Dtype  
---  ------                              --------------  -----  
 0   company_name                        19 non-null     string 
 1   isin                                19 non-null     string 
 2   target_type                         19 non-null     string 
 3   scope                               19 non-null     string 
 4   coverage_s1                         16 non-null     Float64
 5   coverage_s2                         15 non-null     Float64
 6   coverage_s3                         4 non-null      Int64  
 7   reduction_ambition                  19 non-null     Float64
 8   base_year                           19 non-null     Int64  
 9   end_year                            19 non-null     Int64  
 10  start_year                          19 non-null     Int64  
 11  base_year_ghg_emissions_s1_tco2e    1 non-null 

## Save Processed Data to S3

Now that our data is in a form that Trino can ingest, we upload it back into our S3 bucket as Parquet files. These files will be the data source for our Trino tables.
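Each table gets its own S3 prefix because, as far as we understand Trino's Hive connector, an external table reads all data files under its `external_location` directory; a stray file in that prefix would become part of the table. Deriving both the object key used for the upload and the location used in the table definition from one prefix string keeps them from drifting apart. The helpers below are a hypothetical sketch of that idea.

```python
def parquet_object_key(table_prefix: str, filename: str) -> str:
    """S3 object key for a data file stored under a table's prefix."""
    return f"{table_prefix.strip('/')}/{filename}"


def external_location(bucket: str, table_prefix: str) -> str:
    """s3a:// directory URL (with trailing slash) for a table's prefix."""
    return f"s3a://{bucket}/{table_prefix.strip('/')}/"
```

For example, with `prefix = "urgentem/trino/itr_emissions_join_1"`, the upload would use `Key=parquet_object_key(prefix, "emissions.parquet")` and the table definition `external_location(os.environ["S3_BUCKET"], prefix)`.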

In [11]:
# Parquet data can later be appended to or updated in several ways,
# e.g. by adding new files or sharding it into directory trees
df_emissions.to_parquet("/tmp/emissions_table1.parquet", index=False)
t = time.time()
s3.upload_file(
    Bucket=os.environ["S3_BUCKET"],
    Key="urgentem/trino/itr_emissions_join_1/emissions.parquet",
    Filename="/tmp/emissions_table1.parquet",
)
upload_df1_time = time.time() - t

In [12]:
# Parquet data can later be appended to or updated in several ways,
# e.g. by adding new files or sharding it into directory trees
df_emissions_2.to_parquet("/tmp/emissions_table2.parquet", index=False)
t = time.time()
s3.upload_file(
    Bucket=os.environ["S3_BUCKET"],
    Key="urgentem/trino/itr_emissions_join_2/emissions.parquet",
    Filename="/tmp/emissions_table2.parquet",
)
upload_df2_time = time.time() - t
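The upload and table-creation cells all time themselves with the same `t = time.time()` ... `time.time() - t` pattern. A small context manager is one way to factor that out. The sketch below uses `time.perf_counter()`, which is intended for measuring durations, rather than the wall clock behind `time.time()`. `Stopwatch` is a hypothetical name.

```python
import time


class Stopwatch:
    """Context manager that records elapsed seconds in `self.elapsed`."""

    def __enter__(self) -> "Stopwatch":
        self.elapsed = 0.0
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.elapsed = time.perf_counter() - self._start
        return False  # never swallow exceptions raised inside the block
```

Used here, `with Stopwatch() as sw:` would wrap the `s3.upload_file(...)` call, followed by `upload_df2_time = sw.elapsed`.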

## Create a Table on Trino

Finally, we create tables in our Trino database that use the Parquet files uploaded in the previous section as their data source.

In [13]:
# the TRINO_PASSWD env var holds a JWT token rather than a password
JWT_TOKEN = os.environ['TRINO_PASSWD']
conn = trino.dbapi.connect(
    host=os.environ['TRINO_HOST'],
    port=int(os.environ['TRINO_PORT']),  # env vars are strings; pass the port as an int
    user=os.environ['TRINO_USER'],
    http_scheme='https',
    auth=trino.auth.JWTAuthentication(JWT_TOKEN),
)
cur = conn.cursor()

In [14]:
# generate a sql schema that will correspond to the data types
# of columns in the pandas DF
# to-do: add some mechanisms for overriding types, either here
# or on the pandas data-frame itself before we write it out
schema = generate_table_schema_pairs(df_emissions)

tabledef = """create table if not exists osc_datacommons_dev.urgentem.itr_emissions_1_v2(
{schema}
) with (
    format = 'parquet',
    external_location = 's3a://{s3_bucket}/urgentem/trino/itr_emissions_join_1/'
)""".format(
    schema=schema,
    s3_bucket=os.environ["S3_BUCKET"],
)

t = time.time()
# tables created externally may not show up immediately in CloudBeaver
cur.execute(tabledef)
time_to_create_table_1 = time.time() - t
cur.fetchall()

[[True]]

In [15]:
# generate a sql schema that will correspond to the data types
# of columns in the pandas DF
# to-do: add some mechanisms for overriding types, either here
# or on the pandas data-frame itself before we write it out
schema = generate_table_schema_pairs(df_emissions_2)

tabledef = """create table if not exists osc_datacommons_dev.urgentem.itr_emissions_2(
{schema}
) with (
    format = 'parquet',
    external_location = 's3a://{s3_bucket}/urgentem/trino/itr_emissions_join_2/'
)""".format(
    schema=schema,
    s3_bucket=os.environ["S3_BUCKET"],
)

t = time.time()
# tables created externally may not show up immediately in CloudBeaver
cur.execute(tabledef)
time_to_create_table_2 = time.time() - t
cur.fetchall()

[[True]]
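The two table-creation cells above differ only in the table name, the dataframe and the S3 prefix. A hypothetical helper such as `build_external_table_ddl` could generate both statements from `(column, sql_type)` pairs. The pairs could come from the notebook's own `pandas_type_to_sql`, e.g. `zip(df.columns, (pandas_type_to_sql(str(t)) for t in df.dtypes))`. The template mirrors the one used in those cells.

```python
from typing import Iterable, Tuple


def build_external_table_ddl(
    table: str, columns: Iterable[Tuple[str, str]], location: str
) -> str:
    """CREATE TABLE statement for a Parquet-backed external table."""
    column_defs = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)
    return (
        f"create table if not exists {table}(\n"
        f"{column_defs}\n"
        f") with (\n"
        f"    format = 'parquet',\n"
        f"    external_location = '{location}'\n"
        f")"
    )
```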

In [16]:
## Check if table 1 is there
cur.execute("select * from osc_datacommons_dev.urgentem.itr_emissions_1_v2 LIMIT 5")
cur.fetchall()[1]

['ADIDAS AG',
 'DE000A1EWWW0',
 'Absolute',
 'S1+S2',
 0.9,
 0.9,
 None,
 0.15,
 2015,
 2020,
 2015,
 None,
 59132.0,
 295660.0,
 1.0]

In [17]:
## Check if table 2 is there
cur.execute("select * from osc_datacommons_dev.urgentem.itr_emissions_2 LIMIT 5")
cur.fetchall()[1][:15]

['ADIDAS AG',
 'DE000A1EWWW0',
 '4031976',
 'ADS GR',
 3,
 3,
 'Germany',
 'Europe',
 301.0,
 17.5,
 'Sum of Location and Scope One',
 283.5,
 'Sum of Average Category Intensities',
 1.8,
 'Inferred - Average - Industry winsor']
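Fetching a few rows shows that the tables exist, but not that they expose every uploaded row. A stricter, hedged check compares `count(*)` against the dataframe lengths recorded earlier (`len_emssions_data_1` and `len_emssions_data_2`). The sketch only uses standard DB-API cursor methods (`execute`, `fetchone`), so it should also work with the Trino cursor `cur`. The helper names are hypothetical, and the table name is interpolated into the SQL, so it must come from trusted code.

```python
def table_row_count(cursor, table: str) -> int:
    """Return count(*) for `table` using any DB-API 2.0 cursor."""
    cursor.execute(f"select count(*) from {table}")
    (count,) = cursor.fetchone()
    return int(count)


def check_row_count(cursor, table: str, expected: int) -> None:
    """Raise if the table's row count differs from `expected`."""
    actual = table_row_count(cursor, table)
    if actual != expected:
        raise ValueError(f"{table}: expected {expected} rows, found {actual}")
```

For example: `check_row_count(cur, "osc_datacommons_dev.urgentem.itr_emissions_1_v2", len_emssions_data_1)`.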

In [18]:
metrics = {
    "metrics":[
        {
            "name":"no_rows_emissions_table_1",
            "numberValue":len_emssions_data_1,
            "format":"RAW"
        },
        {
            "name":"no_rows_emissions_table_2",
            "numberValue":len_emssions_data_2,
            "format":"RAW"
        },
        {
            "name":"time_to_upload_df_1",
            "numberValue":upload_df1_time,
            "format":"RAW"
        },
        {
            "name":"time_to_upload_df_2",
            "numberValue":upload_df2_time,
            "format":"RAW"
        },
        {
            "name":"time_to_create_table_1",
            "numberValue":time_to_create_table_1,
            "format":"RAW"
        },
        {
            "name":"time_to_create_table_2",
            "numberValue":time_to_create_table_2,
            "format":"RAW"
        }
    ]
}
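The metrics dictionary repeats the same three keys for every entry. A hypothetical helper can build the same structure from a plain name-to-value mapping, which keeps each metric name next to its value. It produces the same `{"metrics": [{"name", "numberValue", "format"}]}` layout used in the cell above.

```python
from typing import Any, Dict, List, Mapping, Union


def build_pipeline_metrics(
    values: Mapping[str, Union[int, float]], fmt: str = "RAW"
) -> Dict[str, List[Dict[str, Any]]]:
    """Wrap name -> number pairs in the layout written to mlpipeline-metrics.json."""
    return {
        "metrics": [
            {"name": name, "numberValue": value, "format": fmt}
            for name, value in values.items()
        ]
    }
```

For example, `build_pipeline_metrics({"no_rows_emissions_table_1": len_emssions_data_1, "time_to_upload_df_1": upload_df1_time})` yields the first and third entries of the dictionary above.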

In [19]:
pathlib.Path(metrics_file_path).parent.mkdir(parents=True, exist_ok=True)
pathlib.Path(metrics_file_path).write_text(json.dumps(metrics))

515

# Conclusion

In this notebook, we showed how to take raw CSV files on S3, process them into a format usable by Trino, and save them as Parquet files on S3. We then created Trino tables backed by these Parquet files. The tables can now be used in a Superset dashboard for visualization.