<a href="https://colab.research.google.com/github/GerardRagbir/Python-Notebooks/blob/main/etlPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Pipeline

This notebook walks through an arbitrary ETL workflow and is intended as a template for my pipeline process. It isn't directly executable, since it requires some setup first, which I'll outline now:


*   **Database Connection**: in my code, the source is a SQL Server database accessed through PyODBC, and the target is a PostgreSQL database accessed through SQLAlchemy. Either side can be set up for any other supported database, or swapped out for another package.
*   **Environment Configuration**: it's always good practice to keep sensitive credentials out of your code. If a breach occurs or the source code becomes publicly visible, hardcoded credentials can compromise your entire database! Check out [this fantastic article from GitHub](https://github.blog/2022-01-27-beyond-sql-injection-owasp-tips-secure-database-access/).


## Environment

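The code cell below isn't necessary for an already configured environment; it just ensures Python 3.11 is available, since that's the version we intend to use (newer versions should also work, provided they don't introduce breaking changes). Note that installing the interpreter with `apt-get` doesn't by itself switch the notebook's kernel to that version.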

Our environment will require the following:

*   Python 3.11
*   Packages: [SQLAlchemy](https://docs.sqlalchemy.org/en/14/), [PyODBC](https://github.com/mkleehammer/pyodbc/wiki), [Pandas](https://pandas.pydata.org/docs/), plus a PostgreSQL driver for SQLAlchemy (e.g. psycopg2)
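Before installing anything, a quick check like the one below can report the running Python version and which of the listed packages are already importable. It's a minimal sketch: `missing_packages` and `REQUIRED_MODULES` are hypothetical names, and the module names assume the packages' usual import names.

```python
import importlib.util
import sys

# Import names of the packages listed above (adjust if you swap packages)
REQUIRED_MODULES = ["sqlalchemy", "pyodbc", "pandas"]

def missing_packages(modules):
    """Return the subset of module names that cannot be found for import."""
    return [name for name in modules if importlib.util.find_spec(name) is None]

if __name__ == "__main__":
    print("Python", sys.version.split()[0], "| 3.11+:", sys.version_info >= (3, 11))
    print("Missing:", missing_packages(REQUIRED_MODULES) or "none")
```

`find_spec` only locates the module without importing it, so the check is cheap and has no side effects.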

In [None]:
# Install Python 3.11
!sudo apt-get update -y  # refresh the system package index
!sudo apt-get install -y python3.11  # install Python 3.11 (latest as of 2022-Oct)

# Install required packages
!pip install sqlalchemy pyodbc pandas psycopg2-binary  # psycopg2 is SQLAlchemy's default PostgreSQL driver; modify as needed

Typically you'd set environment variables with OS-specific methods (e.g. `export` in Bash). Since we're only concerned with Python 3.X here, we can set them from Python instead. Note that these only apply to the current process and any processes it launches:

In [None]:
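import os

# Set ENV variables for the current process from Python.
# These values are placeholders for demonstration only; never commit real credentials.
os.environ['PGPASS'] = 'password'
os.environ['PGUID'] = 'etluser'


# Alternative for quickly setting env variables (IPython magics):

# %env PGPASS=password
# %env PGUID=etluser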

## Required Modules

**Resources**

*   SQLAlchemy: https://towardsdatascience.com/sqlalchemy-python-tutorial-79a577141a91
*   PyODBC: https://learn.microsoft.com/en-us/sql/connect/python/pyodbc/python-sql-driver-pyodbc?view=sql-server-ver16
*   Pandas: https://www.w3schools.com/python/pandas/default.asp

In [None]:
from sqlalchemy import create_engine
import pyodbc as odbc
import pandas as pd
import os

## Get variables from OS Environment

REMINDER: Never store credentials in source code!

The code cell below assumes you've stored your database credentials (username and password) as environment variables. Some databases support alternative authentication methods, so consult the documentation and research best practices for your solution!

In [None]:
'''
These linked variables can be refactored directly into the lookups;
I am just separating them for explanation during live sessions!

e.g. pwd = os.environ['PGPASS'] would give the same result.
'''

ETLPWD = 'PGPASS'  # name of the env variable holding the password
ETLUID = 'PGUID'   # name of the env variable holding the username

pwd = os.environ[ETLPWD]  # raises KeyError if the variable isn't set
uid = os.environ[ETLUID]
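A bare `os.environ[...]` lookup fails with a terse `KeyError` when a variable is missing. A small helper can fail with a clearer message instead; this is a sketch, and `get_required_env` is a hypothetical name, not part of any library.

```python
import os

def get_required_env(name):
    """Return an environment variable's value, failing clearly if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set; set it before running the ETL.")
    return value

# Usage with the variable names used in this notebook:
# pwd = get_required_env('PGPASS')
# uid = get_required_env('PGUID')
```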

In [None]:
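'''SQL Connection'''
# Source: SQL Server, accessed via PyODBC
driver = "{ODBC Driver 17 for SQL Server}"  # must match an ODBC driver installed on the host
server = "localhost"  # use localhost if using a local machine
database = "#Name_of_DB_HERE"
# Target: PostgreSQL on the same host, accessed via SQLAlchemy in load()
port = 5432  # default PostgreSQL port

table = "#tableName"


# Raw f-string so the backslash in SERVER=...\SQLEXPRESS is kept literally
CONNECTION_PATH = rf'DRIVER={driver};SERVER={server}\SQLEXPRESS;DATABASE={database};UID={uid};PWD={pwd}'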

Refer to: https://www.connectionstrings.com/formating-rules-for-connection-strings/
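One practical consequence of those formatting rules: a password containing `;` would split the connection string in the wrong place. A value can be wrapped in braces, with any literal `}` inside doubled as `}}`. The sketch below applies that quoting only where needed; the helper names are hypothetical, the helper expects the driver name *without* braces, and `MyDatabase` in the usage line is a placeholder.

```python
def quote_odbc_value(value):
    """Brace-quote an ODBC attribute value if it contains special characters or whitespace.

    Inside braces, a literal '}' is escaped by doubling it.
    """
    text = str(value)
    if any(ch in ";{}=" or ch.isspace() for ch in text):
        return "{" + text.replace("}", "}}") + "}"
    return text

def build_odbc_connection_string(attrs):
    """Join an ordered dict of KEY -> value into 'KEY=value;KEY=value' form."""
    return ";".join(f"{key}={quote_odbc_value(val)}" for key, val in attrs.items())

# Usage (placeholder values):
# build_odbc_connection_string({"DRIVER": "ODBC Driver 17 for SQL Server",
#                               "SERVER": r"localhost\SQLEXPRESS", "DATABASE": "MyDatabase",
#                               "UID": uid, "PWD": pwd})
```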

In [None]:
QUERY = """
        SELECT t.name AS table_name
        FROM sys.tables t WHERE t.name IN ('DimProduct', 'DimProductSubcategory', 'DimProductCategory', 'DimSalesTerritory', 'FactInternetSales');
        """

In case you're unfamiliar with ETL, it stands for Extract, Transform, Load: a pipeline, or methodology, for automating how a DWH (Data Warehouse) gets populated.

It begins by EXTRACTING data from existing APIs or directly from the source database(s). This source system is referred to as OLTP (Online Transaction Processing), since it is built for transactional workloads, mainly serving your backend and main client applications. That data is meant to feed your OLAP (Online Analytical Processing) system, ideally a read-optimized (often columnar) database known as a Data Warehouse.
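To make the three stages concrete, here's a toy, self-contained sketch that uses two in-memory SQLite databases as stand-ins for the OLTP source and the warehouse. The table names, rows and the per-region aggregation are made up for illustration; the rest of this notebook uses SQL Server as the source and PostgreSQL as the target.

```python
import sqlite3

def run_mini_etl():
    """Extract rows from a toy 'OLTP' DB, transform them, and load them into a toy 'warehouse'."""
    oltp = sqlite3.connect(":memory:")       # stand-in for the transactional source
    warehouse = sqlite3.connect(":memory:")  # stand-in for the data warehouse
    try:
        oltp.execute("CREATE TABLE sales (id INTEGER, region TEXT, amount REAL)")
        oltp.executemany(
            "INSERT INTO sales VALUES (?, ?, ?)",
            [(1, " north ", 10.0), (2, "South", 5.5), (3, "north", 4.5)],
        )

        # Extract: pull the raw rows out of the source
        rows = oltp.execute("SELECT region, amount FROM sales").fetchall()

        # Transform: normalise region labels and total the amounts per region
        totals = {}
        for region, amount in rows:
            key = region.strip().lower()
            totals[key] = totals.get(key, 0.0) + amount

        # Load: write the aggregated rows into a warehouse fact table
        warehouse.execute("CREATE TABLE fact_sales_by_region (region TEXT PRIMARY KEY, total REAL)")
        warehouse.executemany("INSERT INTO fact_sales_by_region VALUES (?, ?)", sorted(totals.items()))
        warehouse.commit()
        return warehouse.execute(
            "SELECT region, total FROM fact_sales_by_region ORDER BY region"
        ).fetchall()
    finally:
        oltp.close()
        warehouse.close()

print(run_mini_etl())  # [('north', 14.5), ('south', 5.5)]
```

The warehouse ends up holding a pre-aggregated, analysis-friendly table rather than the raw transactional rows, which is the point of the OLTP to OLAP hand-off.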

NOTE: we could just as easily use dedicated ingestion tools such as Apache Kafka, Apache NiFi, AWS Kinesis or Hevo. These are usually preferred over the basic Python code below for massive datasets.

If we were running the extract script below on a memory-limited host, we might need some form of batch processing. Luckily for us, Pandas [provides us with some ideas](https://pandas.pydata.org/docs/user_guide/scale.html), one of which is [chunking](https://towardsai.net/p/data-science/efficient-pandas-using-chunksize-for-large-data-sets-c66bf3037f93).
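A minimal sketch of chunking: passing `chunksize` to `pd.read_sql_query` returns an iterator of DataFrames, so only one chunk is held in memory at a time, and each chunk can be written out with `to_sql` before the next is read. `copy_in_chunks` is a hypothetical helper, and SQLite connections are used here only so the example is self-contained; in this notebook the source and target would be the SQL Server and PostgreSQL connections.

```python
import sqlite3

import pandas as pd

def copy_in_chunks(src_con, dst_con, source_table, target_table, chunksize=1000):
    """Copy a table chunk by chunk, holding at most `chunksize` rows in memory at once."""
    total = 0
    first = True
    query = f"SELECT * FROM {source_table}"  # table name assumed trusted, not user input
    for chunk in pd.read_sql_query(query, src_con, chunksize=chunksize):
        # Replace the target table on the first chunk, then append the rest
        chunk.to_sql(target_table, dst_con, if_exists="replace" if first else "append", index=False)
        first = False
        total += len(chunk)
    return total
```

Usage: `copy_in_chunks(src, dst, "t", "stg_t", chunksize=10)` on a 25-row table reads three chunks (10, 10 and 5 rows) and returns 25.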

In [None]:
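def extract():
  src_connect = None  # defined up front so the finally block is safe if connect() fails
  try:
    src_connect = odbc.connect(CONNECTION_PATH)
    cursor = src_connect.cursor()

    # execute query: returns one row per matching table name
    cursor.execute(QUERY)
    rows = cursor.fetchall()

    for row in rows:
      tbl = row[0]  # the table_name column returned by QUERY
      # pandas may emit a UserWarning for a raw PyODBC connection; the query still runs
      df = pd.read_sql_query(f'SELECT * FROM {tbl}', src_connect)
      load(df, tbl)

  except Exception as e:
    print(f"Extraction Error: {e}")

  finally:
    if src_connect is not None:
      src_connect.close()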
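As written, the pipeline goes straight from extract to load with no transform step. A transform would sit between the two, for example as `load(transform(df), tbl)` inside the loop. The sketch below is a hypothetical transform, not the author's: it converts column names to snake_case (PostgreSQL folds unquoted identifiers to lowercase, so mixed-case names written by `to_sql` would otherwise need double quotes in every query) and drops exact duplicate rows.

```python
import re

import pandas as pd

def to_snake_case(name):
    """Convert e.g. 'ProductKey' -> 'product_key' and 'Sales Amount' -> 'sales_amount'."""
    name = re.sub(r"[^0-9a-zA-Z]+", "_", str(name))      # runs of non-alphanumerics -> '_'
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase / PascalCase boundaries
    return name.strip("_").lower()

def transform(df):
    """Hypothetical transform step: normalise column names and drop exact duplicate rows."""
    out = df.copy()  # leave the extracted frame untouched
    out.columns = [to_snake_case(c) for c in out.columns]
    return out.drop_duplicates().reset_index(drop=True)
```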

In [None]:
def load(df, tbl):
  try:
    rows_imported = 0
    # Assumes the target PostgreSQL database has the same name as the source database;
    # change {database} to your target DB name as needed.
    engine = create_engine(f'postgresql://{uid}:{pwd}@{server}:{port}/{database}')
    print(f'Importing rows: {rows_imported} to {rows_imported+len(df)} ... for table {tbl}')

    # commit dataframe to database (PostgreSQL here) as a staging table
    df.to_sql(f'stg_{tbl}', engine, if_exists='replace', index=False)
    rows_imported += len(df)
    print("Data imported successfully!")
  except Exception as e:
    print(f"Load Error: {e}")


## Run ETL

In [None]:
try:
  extract()
except Exception as e:
  print(f"Error during ETL: {e}")