# Data Ingestion

This notebook is the first in the series of notebooks created for the _Nimbus_ demo. Here, we will demonstrate how you can programatically ingest raw data into a database for your data science projects. We will read a set of dataset files from an object store (Amazon S3), convert it into a format compatible with Trino, create a table for this schema, and finally upload the data to this table.

In [2]:
import os
import re
import boto3
import pathlib

import trino

import pandas as pd

from dotenv import dotenv_values, load_dotenv

In [None]:
# Author: Erik Erlandson <eje@redhat.com>
_wsdedup = re.compile(r"\s+")
_usdedup = re.compile(r"__+")
_rmpunc = re.compile(r"[,.()&$/+-]+")
# 63 seems to be a common max column name length
def snakify(name, maxlen=63):
    if isinstance(name, list):
        return [snakify(e) for e in name]
    w = name.casefold().rstrip().lstrip()
    w = w.replace("-", "_")
    w = _rmpunc.sub("", w)
    w = _wsdedup.sub("_", w)
    w = _usdedup.sub("_", w)
    w = w.replace("average", "avg")
    w = w.replace("maximum", "max")
    w = w.replace("minimum", "min")
    w = w.replace("absolute", "abs")
    w = w.replace("source", "src")
    w = w.replace("distribution", "dist")
    # these are common in the sample names but unsure of standard abbv
    #w = w.replace("inference", "inf")
    #w = w.replace("emissions", "emis")
    #w = w.replace("intensity", "int")
    #w = w.replace("reported", "rep")
    #w = w.replace("revenue", "rev")
    w = w[:maxlen] 
    return w

def snakify_columns(df, inplace=False, maxlen=63):
    icols = df.columns.to_list()
    ocols = snakify(icols, maxlen=maxlen)
    scols = set(ocols)
    if (len(set(ocols)) < len(ocols)):
        raise ValueError("remapped column names were not unique!")
    rename_map = dict(list(zip(icols,snakify(icols))))
    return df.rename(columns=rename_map, inplace=inplace)

_p2smap = {
    'string': 'varchar',
    'Float64': 'double',
    'Int64': 'bigint'
}

def pandas_type_to_sql(pt):
    st = _p2smap.get(pt)
    if st is not None:
        return st
    raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))

# add ability to specify optional dict for specific fields?
# if column name is present, use specified value?
def generate_table_schema_pairs(df):
    ptypes = [str(e) for e in df.dtypes.to_list()]
    stypes = [pandas_type_to_sql(e) for e in ptypes]
    pz = list(zip(df.columns.to_list(), stypes))
    return ",\n".join(["    {n} {t}".format(n=e[0],t=e[1]) for e in pz])