In [None]:
# default_exp data.loader

In [None]:
#hide
%load_ext autoreload
%autoreload 2

In [None]:
#export
import logging
import os
from enum import Enum, auto
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, inspect

# Root logger configuration for the notebook/module: INFO and above, timestamped.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s(): %(message)s",
)

In [None]:
#hide
from nbdev.showdoc import *

# Data Loader

> Generic routines for ingesting data from files (CSV, Excel) into databases.

## Decorator

In [None]:
#export
def auto_str(cls):
    """Class decorator that generates `__str__` from the instance attributes.

    Produces ``ClassName(attr=value, ...)`` in `vars(self)` order. The value of
    any attribute whose name contains "password" is rendered as ``***`` so
    printing a database target does not leak its credentials.
    """

    def __str__(self):
        fields = ", ".join(
            "%s=%s" % (name, "***" if "password" in name.lower() else value)
            for name, value in vars(self).items()
        )
        return "%s(%s)" % (type(self).__name__, fields)

    cls.__str__ = __str__
    return cls

## Attribute Delegation

In [None]:
#export
class GetAttr:
    "Inherit from this to have all attr accesses in `self._xtra` passed down to `self.default`"
    # Name of the instance attribute holding the delegate object
    _default='default'
    def _component_attr_filter(self,k):
        "True if attribute `k` may be forwarded to the delegate."
        if k.startswith('__') or k in ('_xtra',self._default): return False
        xtra = getattr(self,'_xtra',None)
        # No `_xtra` whitelist means every non-dunder name is forwarded
        return xtra is None or k in xtra
    def _dir(self):
        "Names of the delegate's attributes that pass `_component_attr_filter`."
        return [k for k in dir(getattr(self,self._default)) if self._component_attr_filter(k)]
    def __getattr__(self,k):
        # Only reached when normal lookup fails; forward eligible names to the delegate
        if self._component_attr_filter(k):
            attr = getattr(self,self._default,None)
            if attr is not None: return getattr(attr,k)
        raise AttributeError(k)
    def __dir__(self):
        # Class attributes + instance attributes + delegated names, computed inline
        # so no external helper is required.
        return dir(type(self)) + list(self.__dict__.keys()) + self._dir()
    def __setstate__(self,data): self.__dict__.update(data)

`ObjectFactory` is the generic factory base class; `DbTargetProvider` and `FileSourceProvider` are its concrete subclasses for database targets and file sources.

In [None]:
#export
class ObjectFactory():
    "Generic object factory: maps keys to builder callables and invokes them on demand."
    def __init__(self):
        # key -> callable that takes keyword arguments and returns the built object
        self._builders = {}

    def register_builder(self, key, builder):
        """Register `builder` under `key`, replacing any builder already registered there."""
        self._builders[key] = builder

    def create(self, key, **kwargs):
        """Build the object registered under `key`, forwarding `kwargs` to its builder.

        Raises:
            ValueError: if no builder is registered for `key`.
        """
        # Test membership rather than truthiness so a registered builder that
        # happens to be falsy is still used.
        if key not in self._builders:
            raise ValueError(key)
        return self._builders[key](**kwargs)
    

In [None]:
#export
class DbTargetProvider(ObjectFactory):
    """Factory for database targets, keyed by `DatabaseTarget` members."""

    def get(self, id, **kwargs):
        """Build and return the database target registered under `id`.

        Keyword arguments are handed unchanged to the registered builder.
        """
        target = self.create(id, **kwargs)
        return target
   

In [None]:
#export 
class FileSourceProvider(ObjectFactory):
    """Factory for file sources, keyed by `FileSource` members."""

    def get(self, id, **kwargs):
        """Build and return the file source registered under `id`.

        Keyword arguments are handed unchanged to the registered builder.
        """
        source = self.create(id, **kwargs)
        return source

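`DatabaseTarget` and `FileSource` enumerate the supported target databases and file types; their members are the keys under which builders are registered.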

In [None]:
#export
class DatabaseTarget(Enum):
    "Supported target databases; members are the keys database builders are registered under."
    PostgreSQL = 1
    MySQL = 2

In [None]:
#export
class FileSource(Enum):
    "Supported input file types; members are the keys file source builders are registered under."
    CSV = 1
    Excel = 2

## Target Databases

### PostgreSQL

In [None]:
#export
class PgSqlDbBuilder:
    """PostgreSQL database builder.

    The built `PgSqlDb` is cached and reused while the connection parameters
    stay the same; calling with different parameters builds a new instance
    instead of silently returning one bound to the old parameters.
    """

    def __init__(self):
        self._instance = None
        # Connection parameters the cached instance was built with
        self._params = None

    def __call__(self, host, port, db, user, password, **_ignored):
        """Return a `PgSqlDb` for the given connection parameters.

        Extra keyword arguments are accepted and ignored.
        """
        params = (host, port, db, user, password)
        if self._instance is None or self._params != params:
            self._instance = PgSqlDb(*params)
            self._params = params
        return self._instance

@auto_str
class PgSqlDb:
    """PostgreSQL database destination (SQLAlchemy + psycopg2)."""

    def __init__(self, host, port, db, user, password):
        self._host = host
        self._port = port
        self._db = db
        self._user = user
        self._password = password

    def get_engine(self):
        """Create and return a SQLAlchemy engine for this database."""
        return create_engine(self.get_conn_str())

    def get_conn_str(self):
        """Return the SQLAlchemy connection URL.

        User name and password are percent-encoded so characters such as
        ``@``, ``:`` or ``/`` in them cannot corrupt the URL.
        """
        user = quote(str(self._user), safe="")
        password = quote(str(self._password), safe="")
        return f"postgresql+psycopg2://{user}:{password}@{self._host}:{self._port}/{self._db}"

In [None]:
#export
class ImpalaDbBuilder():
    """Impala database builder.

    The built `ImpalaDb` is cached and reused while the connection parameters
    stay the same; different parameters produce a new instance.
    """

    def __init__(self):
        self._instance = None
        # Connection parameters the cached instance was built with
        self._params = None

    def __call__(self, host, port, db, **_ignored):
        """Return an `ImpalaDb` for the given connection parameters.

        Extra keyword arguments are accepted and ignored.
        """
        params = (host, port, db)
        if self._instance is None or self._params != params:
            self._instance = ImpalaDb(*params)
            self._params = params
        return self._instance


@auto_str
class ImpalaDb:
    """Impala database destination, connecting over SSL with GSSAPI (Kerberos) auth."""

    def __init__(self, host, port, db):
        self._host = host
        self._port = port
        self._db = db

    def get_engine(self):
        """Create and return sqlalchemy engine.

        Connections are opened by `get_conn`, passed to SQLAlchemy as `creator`.
        """
        return create_engine('impala://', creator=self.get_conn)

    def get_conn(self):
        """Open and return a DB-API connection to the configured Impala database."""
        # NOTE(review): `connect` is not imported anywhere in this module
        # (presumably `impala.dbapi.connect` from impyla) — TODO add the import,
        # otherwise this raises NameError at call time.
        return connect(host=self._host,
                       port=int(self._port),
                       use_ssl=True,
                       auth_mechanism='GSSAPI',
                       kerberos_service_name='impala',
                       database=self._db,
                       )

### MySQL

In [None]:
#export
class MySqlDbBuilder:
    """MySQL database builder.

    The built `MySqlDb` is cached and reused while the connection parameters
    stay the same; calling with different parameters builds a new instance
    instead of silently returning one bound to the old parameters.
    """

    def __init__(self):
        self._instance = None
        # Connection parameters the cached instance was built with
        self._params = None

    def __call__(self, host, port, db, user, password, **_ignored):
        """Return a `MySqlDb` for the given connection parameters.

        Extra keyword arguments are accepted and ignored.
        """
        params = (host, port, db, user, password)
        if self._instance is None or self._params != params:
            self._instance = MySqlDb(*params)
            self._params = params
        return self._instance

@auto_str
class MySqlDb:
    """MySQL database destination (SQLAlchemy + PyMySQL)."""

    def __init__(self, host, port, db, user, password):
        self._host = host
        self._port = port
        self._db = db
        self._user = user
        self._password = password

    def get_engine(self):
        """Create and return a SQLAlchemy engine for this database."""
        return create_engine(self.get_conn_str())

    def get_conn_str(self):
        """Return the SQLAlchemy connection URL (utf8mb4 charset).

        User name and password are percent-encoded so characters such as
        ``@``, ``:`` or ``/`` in them cannot corrupt the URL.
        """
        user = quote(str(self._user), safe="")
        password = quote(str(self._password), safe="")
        return f"mysql+pymysql://{user}:{password}@{self._host}:{self._port}/{self._db}?charset=utf8mb4"

## Supported Sources

In [None]:
#export

def create_excel_file_source(file_path, **args):
    """Builder for `FileSource.Excel`: wrap `file_path` in an `ExcelSource`.

    Extra keyword arguments are passed through to the source unchanged.
    """
    source = ExcelSource(file_path, **args)
    return source
    
def create_csv_file_source(file_path, **args):
    """Builder for `FileSource.CSV`: wrap `file_path` in a `CSVSource`.

    Extra keyword arguments are passed through to the source unchanged.
    """
    source = CSVSource(file_path, **args)
    return source
    
class ExcelSource:
    """Excel file source read with `pd.read_excel`."""

    def __init__(self, file_path, **args):
        self._file_path = file_path
        # Extra keyword arguments forwarded to `pd.read_excel`
        self._args = args

    def filepath(self):
        """Return the `file_path` this source was created with."""
        return self._file_path

    def get_data(self):
        """Read the file and return a `DataFrame`.

        `engine` defaults to ``'openpyxl'`` but can be overridden through the
        constructor's keyword arguments (passing it no longer raises TypeError).
        """
        read_args = {"engine": "openpyxl", **self._args}
        return pd.read_excel(self._file_path, **read_args)
    
class CSVSource:
    """CSV file source read with `pd.read_csv`."""

    def __init__(self, file_path, **args):
        self._file_path = file_path
        # Extra keyword arguments forwarded to `pd.read_csv`
        self._args = args

    def filepath(self):
        """Return the `file_path` this source was created with."""
        return self._file_path

    def get_data(self):
        """Read the file and return a `DataFrame`.

        `engine` defaults to ``None`` (pandas' own default) but can be
        overridden through the constructor's keyword arguments (passing it no
        longer raises TypeError).
        """
        read_args = {"engine": None, **self._args}
        return pd.read_csv(self._file_path, **read_args)

## Ingestion

In [None]:
#export

# Register supported database providers: `DatabaseTarget` member -> builder
db_targets = DbTargetProvider()
for _target, _builder in (
    (DatabaseTarget.PostgreSQL, PgSqlDbBuilder()),
    (DatabaseTarget.MySQL, MySqlDbBuilder()),
):
    db_targets.register_builder(_target, _builder)
del _target, _builder

In [None]:
#export

# Register supported file types: `FileSource` member -> builder function
file_sources = FileSourceProvider()
for _kind, _builder in (
    (FileSource.Excel, create_excel_file_source),
    (FileSource.CSV, create_csv_file_source),
):
    file_sources.register_builder(_kind, _builder)
del _kind, _builder

In [None]:
#export
def ingest(file_source, target_db, table_name, *, if_exists='append', method='multi', schema=None, chunksize=500):
    """Ingest the data of `file_source` into the database table `table_name`.

    If the table already exists, its column types are reflected and passed to
    `DataFrame.to_sql` so the written columns keep the table's types. If it
    does not exist yet, no types are passed and pandas creates the table.

    Args:
        file_source: object with `get_data()` (returns a `DataFrame`) and `filepath()`.
        target_db: object with `get_engine()` (returns a SQLAlchemy engine).
        table_name: name of the destination table.
        if_exists: forwarded to `DataFrame.to_sql` ('append' by default).
        method: forwarded to `DataFrame.to_sql` ('multi' by default).
        schema: schema of the table, used both for reflection and for writing.
        chunksize: rows per insert batch, forwarded to `DataFrame.to_sql`.
    """
    engine = target_db.get_engine()
    try:
        # Reflect the target table's column types. Guarded by an existence check
        # because reflecting a missing table raises in SQLAlchemy.
        inspector = inspect(engine)
        dtypes = {}
        if table_name in inspector.get_table_names(schema=schema):
            for column in inspector.get_columns(table_name, schema=schema):
                dtypes[column["name"]] = column["type"]
        logging.debug(dtypes)

        # Load the source data into the database (same schema that was inspected)
        df = file_source.get_data()
        df.to_sql(
            table_name,
            engine,
            schema=schema,
            if_exists=if_exists,
            method=method,
            chunksize=chunksize,
            index=False,
            dtype=dtypes or None,
        )
    finally:
        # Release the connection pool of the engine created above
        engine.dispose()

    # Summary: row count, plus per-column distinct counts only when DEBUG is on
    logging.info("\nTotal ingested records from %s - %d", file_source.filepath(), len(df))
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        for c in df.columns:
            logging.debug("%s - %s", c, df[c].nunique())


In [None]:
# Create an Excel file source and preview the pandas DataFrame it reads
excel_source = file_sources.get(FileSource.Excel, file_path="data/accounts.xlsx")
excel_source.get_data().head()

Unnamed: 0,user_id,username,password,email,created_on,last_login
0,1,user1,user11,user1@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
1,2,user2,user22,user2@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
2,3,user3,user33,user3@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
3,4,user4,user44,user4@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
4,5,user5,user55,user5@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
...,...,...,...,...,...,...
95,96,user96,user9696,user96@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
96,97,user97,user9797,user97@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
97,98,user98,user9898,user98@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003
98,99,user99,user9999,user99@abc.com,2020-12-06 04:00:00.000003,2020-12-07 13:00:00.000003


In [None]:
# Create a CSV file source and preview the pandas DataFrame it reads
csv_source = file_sources.get(FileSource.CSV, file_path="data/accounts.csv")
csv_source.get_data().head()

Unnamed: 0,user_id,username,password,email,created_on,last_login
0,1,user1,user11,user1@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
1,2,user2,user22,user2@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
2,3,user3,user33,user3@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
3,4,user4,user44,user4@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
4,5,user5,user55,user5@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
...,...,...,...,...,...,...
95,96,user96,user9696,user96@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
96,97,user97,user9797,user97@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
97,98,user98,user9898,user98@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00
98,99,user99,user9999,user99@abc.com,2020-12-06 04:00:00,2020-12-07 13:00:00


In [None]:
# Create PostgreSQL target.
# Settings are read from the libpq-style PG* environment variables when set;
# the literal fallbacks are local demo values only — never commit real credentials.
# NOTE: the displayed connection string below includes the password.
config = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', 5432)),
    'db': os.environ.get('PGDATABASE', 'testdb'),
    'user': os.environ.get('PGUSER', 'user1'),
    'password': os.environ.get('PGPASSWORD', 'userpwd'),
}
pgsql_target = db_targets.get(DatabaseTarget.PostgreSQL, **config)
pgsql_target.get_conn_str()

'postgresql+psycopg2://user1:userpwd@localhost:5432/testdb'

In [None]:
# Ingest the Excel source into the PostgreSQL `accounts` table (appends by default)
ingest(excel_source, pgsql_target, table_name='accounts')

2020-12-12 21:33:38,062 INFO(): {'user_id': INTEGER(), 'username': VARCHAR(length=50), 'password': VARCHAR(length=50), 'email': VARCHAR(length=255), 'created_on': TIMESTAMP(), 'last_login': TIMESTAMP()}



Total records in data/accounts.xlsx - 100
user_id - 100
username - 100
password - 100
email - 100
created_on - 1
last_login - 1


In [None]:
# Ingest into PostgreSQL, replacing the existing `accounts` table
ingest(file_source=excel_source, target_db=pgsql_target, table_name='accounts', if_exists='replace')

2020-12-12 21:33:38,214 INFO(): {'user_id': INTEGER(), 'username': VARCHAR(length=50), 'password': VARCHAR(length=50), 'email': VARCHAR(length=255), 'created_on': TIMESTAMP(), 'last_login': TIMESTAMP()}



Total records in data/accounts.xlsx - 100
user_id - 100
username - 100
password - 100
email - 100
created_on - 1
last_login - 1


In [None]:
# Create MySQL target.
# Settings can be overridden with MYSQL_HOST / MYSQL_PORT / MYSQL_DATABASE /
# MYSQL_USER / MYSQL_PASSWORD; the literal fallbacks are local demo values only.
# NOTE: the displayed connection string below includes the password.
config = {
    'host': os.environ.get('MYSQL_HOST', 'localhost'),
    'port': int(os.environ.get('MYSQL_PORT', 3306)),
    'db': os.environ.get('MYSQL_DATABASE', 'testdb'),
    'user': os.environ.get('MYSQL_USER', 'user1'),
    'password': os.environ.get('MYSQL_PASSWORD', 'userpwd'),
}
mysql_target = db_targets.get(DatabaseTarget.MySQL, **config)
mysql_target.get_conn_str()

'mysql+pymysql://user1:userpwd@localhost:3306/testdb?charset=utf8mb4'

In [None]:
# Ingest the Excel source into the MySQL `accounts` table (appends by default)
ingest(excel_source, mysql_target, table_name='accounts')

2020-12-12 21:33:38,710 INFO(): {'user_id': INTEGER(), 'username': VARCHAR(length=50), 'password': VARCHAR(length=50), 'email': VARCHAR(length=255), 'created_on': TIMESTAMP(), 'last_login': TIMESTAMP()}



Total records in data/accounts.xlsx - 100
user_id - 100
username - 100
password - 100
email - 100
created_on - 1
last_login - 1


In [None]:
# Ingest into MySQL, replacing the existing `accounts` table
ingest(file_source=excel_source, target_db=mysql_target, table_name='accounts', if_exists='replace')

2020-12-12 21:33:39,131 INFO(): {'user_id': INTEGER(), 'username': VARCHAR(length=50), 'password': VARCHAR(length=50), 'email': VARCHAR(length=255), 'created_on': TIMESTAMP(), 'last_login': TIMESTAMP()}



Total records in data/accounts.xlsx - 100
user_id - 100
username - 100
password - 100
email - 100
created_on - 1
last_login - 1


## Export -

In [None]:
#hide
# Convert the notebooks' `#export` cells into the library's Python modules (nbdev).
from nbdev.export import notebook2script
notebook2script()

Converted 01_data.loader.ipynb.
Converted index.ipynb.
