## Extract, Transform and Load (ETL) with Python 

This notebook walks you through an ETL pipeline in Python using data sources from a local folder, PostgreSQL and an AWS S3 bucket.

Python ETL tools are ETL tools written in Python that work alongside other Python libraries to extract, transform and load tabular data. They read from sources such as XML, CSV, text or JSON files and write to targets such as data warehouses and data lakes. Python is widely used for building data pipelines, and the resulting code is easy to maintain.

Please install the necessary packages before you continue. 
- ```!pip install pandas psycopg2 sqlalchemy boto3```
- The other packages are part of the Python standard library.

These are the steps in this notebook.
- Import packages
- Initiate config parser; you will need it later for the credentials
- Utility functions
- Extract
- Transform 
- Load

##### Import packages

Import the necessary packages.

In [1]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import configparser
from io import StringIO
from io import BytesIO
import boto3
from urllib import request
import json
from datetime import datetime

##### Initiate config parser; you will need it later for the credentials

This step is helpful for authentication.

In [2]:
config = configparser.ConfigParser()

# read the configuration file
config.read('../Dataverse/multi_config.ini')

# get all the connections
config.sections()

['postgresql', 'aws_s3', 'weather_api']

In [3]:
'''
Read the PostgreSQL, S3 and weather API credentials from the config file
'''
database = config.get('postgresql', 'database')
user = config.get('postgresql', 'user')
password = config.get('postgresql', 'password')
host = config.get('postgresql', 'host')
port = config.get('postgresql', 'port')

# AWS Credentials
service_name = config.get('aws_s3', 'service_name')
region_name = config.get('aws_s3', 'region_name')
aws_access_key_id = config.get('aws_s3', 'aws_access_key_id')
aws_secret_access_key = config.get('aws_s3', 'aws_secret_access_key')
s3_bucket = config.get('aws_s3', 's3_bucket')

# Weather API Credentials
api_key = config.get('weather_api', 'api_key')
print("Authentication successful \n")
print(f'The database is "{database}" and the service_name is "{service_name}"')

Authentication successful 

The database is "data_engineering" and the service_name is "s3"
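For reference, a config file with the layout that the cells above expect might look like the sketch below. The section and option names are the ones read by the `config.get()` calls above; every value is a placeholder assumption, and the text is parsed from a string only so that the example is self-contained.

```python
import configparser

# Placeholder values only; section and option names mirror the config.get() calls above.
SAMPLE_CONFIG = """
[postgresql]
database = data_engineering
user = my_user
password = my_password
host = localhost
port = 5432

[aws_s3]
service_name = s3
region_name = us-east-1
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
s3_bucket = my-bucket

[weather_api]
api_key = YOUR_OPENWEATHERMAP_KEY
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CONFIG)

print(sample.sections())
# getint() converts the port to an integer, which config.get() does not do
print(sample.getint("postgresql", "port"))
```

Because the real file holds secrets, it is usually kept out of version control.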


##### Utility functions

- contents: Imports data from the S3 bucket. Accepts the bucket name and key as input and returns the contents of the S3 object as bytes.
- extract_local: Imports data from the local folder. Accepts the file path as input and returns the data as a dataframe.
- load_postgres_table: Saves a dataframe as a new table in the PostgreSQL database. It appends the data if the table exists.
- extract_postgres_data: Imports a table into your workspace as a dataframe using an SQLAlchemy engine.
- extract_psycopg_data: Imports a table into your workspace as a dataframe using Psycopg2.

In [4]:
# function to extract data using the bucket name and key
def contents(bucket_name, key):
    """
    Import data from the S3 bucket.
    Accepts the bucket name and key as input and
    returns the contents of the S3 object as bytes.
    """
    buffer = BytesIO()  # BytesIO is imported directly from io above
    s3.Object(bucket_name, key).download_fileobj(buffer)
    return buffer.getvalue()


def extract_local(path):
    """
    Import data from the local folder. 
    Accept the file path as input and 
    returns the data as a dataframe.
    """
    local_data = pd.read_csv(path)
    return local_data


def load_postgres_table(df, table_name):
    """
    Save a dataframe as a new table in the PostgreSQL database.
    This version uses if_exists='fail', so it prints an error if the table
    already exists; the redefinition later in this cell appends instead.
    Input: df - dataframe, table_name - name of the table to be created
    Returns: None
    """
    alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600);
    postgreSQLConnection = alchemyEngine.connect();
    postgreSQLTable = table_name # "admission_postgres";

    # Use try/except to catch errors if exists
    try:
        frame = df.to_sql(postgreSQLTable, postgreSQLConnection, if_exists='fail'); #, index=False); #postgres_data
    except ValueError as vx:
        print(vx)
    except Exception as ex:  
        print(ex)
    else:
        print(f'PostgreSQL Table, "{postgreSQLTable}", has been created successfully.');
    finally:
        postgreSQLConnection.close();
        
        
def extract_postgres_data(table):
    """
    Import a PostgreSQL table into your workspace as a dataframe.
    Input: table - name of the table to read
    """
    dbConnection = None  # defined up front so the finally block can always test it
    try:
        # Create an engine instance
        alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600)

        # Connect to PostgreSQL server
        dbConnection = alchemyEngine.connect()

        # Read data from PostgreSQL database table and load into a DataFrame instance
        sql = f"select * from \"{table}\""
        dataFrame = pd.read_sql(sql, dbConnection)

        pd.set_option('display.expand_frame_repr', False)

        return dataFrame
    except Exception as error:  # psycopg2's Error is not imported in this scope
        print("Error while connecting to PostgreSQL", error)
    finally:
        # Close the database connection if it was opened
        if dbConnection is not None:
            dbConnection.close()
            print("PostgreSQL connection is closed")

    
def extract_psycopg_data(table):
    """
    Import a PostgreSQL table into your workspace as a dataframe using Psycopg2.
    Input: table - name of the table to read

    Note: pandas emits a UserWarning here because it only supports SQLAlchemy
    connectables (engine/connection), database string URIs and sqlite3 DBAPI2
    connections; other DBAPI2 objects are not tested. Please consider using
    SQLAlchemy. This function is just for practice.
    """
    import psycopg2
    from psycopg2 import Error
    connection = None  # defined up front so the finally block can always test it
    try:
        # Connect to an existing database
        connection = psycopg2.connect(user=user,
                                      password=password,
                                      host=host,
                                      port=port,
                                      database=database)

        # SQL query that selects every row of the table
        query = f"select * from \"{table}\""
        # Run the query and load the result into a dataframe
        df = pd.read_sql_query(query, connection)
        print("Table extracted from PostgreSQL")
        return df

    except (Exception, Error) as error:
        print("Error while connecting to PostgreSQL", error)
    finally:
        if connection is not None:
            connection.close()
            print("PostgreSQL connection is closed")


def load_postgres_table(df, table_name):
    """
    Load a dataframe into a PostgreSQL table.
    Appends the data if the table already exists.
    Input: df - dataframe, table_name - name of the target table
    Output: None
    """
    # Start the alchemy engine
    alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600);
    
    # Establish the connection
    postgreSQLConnection = alchemyEngine.connect();
    
    # Name the table to be created
    postgreSQLTable = table_name # "admission_postgres";

    # Use try and except to catch errors 
    try:
        # Load data to PostgreSQL; append if the table already exists
        frame = df.to_sql(postgreSQLTable, postgreSQLConnection, if_exists='append', index=False)
    except ValueError as vx:
        print(vx)
    except Exception as ex:  
        print(ex)
    else:
        print(f'PostgreSQL Table, "{postgreSQLTable}", has been created successfully.');
    finally:
        postgreSQLConnection.close();
        
        
def extract_postgres_data(table):

    # Create an engine instance
    alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600);

    # Connect to PostgreSQL server
    dbConnection = alchemyEngine.connect();

    # Read data from PostgreSQL database table and load into a DataFrame instance
    sql = f"select * from \"{table}\""
    dataFrame = pd.read_sql(sql, dbConnection);

    pd.set_option('display.expand_frame_repr', False);

    # Print the DataFrame
    #print(dataFrame); 

    # Close the database connection
    dbConnection.close()
    return dataFrame
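The utility functions above build the connection URL without the `port` read from the config file and insert the password verbatim, which can break the URL if the password contains characters such as `@` or `/`. A minimal sketch of one way to handle both, assuming a hypothetical helper named `build_postgres_url` (not part of the notebook) and made-up credentials:

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port, database):
    """Hypothetical helper: assemble a SQLAlchemy-style PostgreSQL URL."""
    # quote_plus escapes characters such as '@' or '/' that would otherwise
    # be misread as URL delimiters
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Made-up credentials, used only to show the escaping
url = build_postgres_url("etl_user", "p@ss/word", "localhost", 5432, "data_engineering")
print(url)
```

The resulting string could be passed to `create_engine` in place of the f-string used above.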

## Extract

This function extracts data from multiple sources in batches.

In [5]:
# Local file path to the data
path = "../Dataverse/admission_local.csv"

# "../Assets/[name-of-asset]"

# Start s3 session
s3 = boto3.resource(
    service_name=service_name,
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

def extract_data():
    """
    Ingest data from the local folder, PostgreSQL and AWS S3
    Input: None 
    Returns: 4 dataframes - local_data, s3_data, postgres_data, psycopg_df
    """
    
    # Extract data from the local folder
    local_data = extract_local(path)
    
    # Extract data from the AWS S3 bucket
    x = contents(s3_bucket, 'admission_csv_s3.csv')
    s3_data = pd.read_csv(BytesIO(x))  # BytesIO is imported directly from io
    
    # Extract data from PostgreSQL
    # SQLAlchemy
    postgres_data = extract_postgres_data('admission_postgres')
    
    # Psycopg2
    psycopg_df = extract_psycopg_data('admission_postgres')
    
    # Return the data as 4 dataframes
    return local_data, s3_data, postgres_data, psycopg_df
    

In [6]:
# local_data, s3_data, postgres_data,  psycopg_df = extract_data()

In [7]:
# s3_data.head(2)
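The S3 step works because `pd.read_csv` accepts any file-like object, so the raw bytes returned by `contents()` can be wrapped in a `BytesIO` buffer instead of being written to disk first. A small sketch with made-up bytes standing in for the S3 object; the `GRE_Score` column is an assumption for illustration, and only `Serial_Number` appears in the notebook:

```python
from io import BytesIO

import pandas as pd

# Stand-in for the bytes returned by contents(); the rows are made-up sample values
raw_bytes = b"Serial_Number,GRE_Score\n1,337\n2,324\n"

# Wrap the bytes in a file-like buffer and parse them as CSV
sample_df = pd.read_csv(BytesIO(raw_bytes))
print(sample_df.shape)
```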

## Transform

After extracting the data, we’ll go on to the “Transform” phase of the process. 

This function will merge the three dataframes we created as a result of the extraction process.

In [8]:
def transform_data():
    """
    Transform data by merging the three dataframes
    Input: None 
    Output: Merged data as a dataframe
    """
    admission_data = s3_data.\
                        merge(postgres_data,on='Serial_Number').\
                        merge(local_data,on='Serial_Number')
    return admission_data

In [9]:
# transform_data = transform_data()

In [10]:
# transform_data.shape
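`DataFrame.merge` defaults to an inner join, so a `Serial_Number` that is missing from any one source is dropped from the merged result. The sketch below illustrates this with made-up frames; the column names other than `Serial_Number` are assumptions, not taken from the admission data.

```python
import pandas as pd

# Made-up sample frames; only the Serial_Number key comes from the notebook
s3_sample = pd.DataFrame({"Serial_Number": [1, 2, 3], "GRE_Score": [337, 324, 316]})
pg_sample = pd.DataFrame({"Serial_Number": [1, 2, 4], "TOEFL_Score": [118, 107, 110]})
local_sample = pd.DataFrame({"Serial_Number": [1, 2, 3], "CGPA": [9.65, 8.87, 8.00]})

merged = (
    s3_sample
    .merge(pg_sample, on="Serial_Number")      # default how="inner"
    .merge(local_sample, on="Serial_Number")
)
print(merged)  # only Serial_Number 1 and 2 appear in all three frames

# validate="one_to_one" raises MergeError if a key is duplicated on either side
checked = s3_sample.merge(pg_sample, on="Serial_Number", validate="one_to_one")
```

Passing `how="outer"` instead would keep unmatched rows and fill the missing columns with NaN.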

## Load

It is time to load the data into the data warehouse, where it can be used to generate insight.

In this scenario, we save the merged pandas dataframe as a local CSV file, as a PostgreSQL table and as a CSV object in the S3 bucket.

Having gone through the extract and transform steps, we finish by loading the data from the various sources into these targets.

In [11]:
def load_data():
    """
    Load the data to 
    - local file path
    - postgresQL
    - AWS S3
    """
    
    # Save transformed data as a csv to file path 
    transform_data.to_csv('new_data.csv', index=False)
    print("Dataframe is saved as CSV in the local folder.")
    
    # Save transformed data as a table on PostgreSQL
    load_postgres_table(transform_data, "Merged_Data")
    
    # Save transformed data as an S3 Object
    csv_buffer = StringIO()
    transform_data.to_csv(csv_buffer, header=True, index=False)
    csv_buffer.seek(0)
    
    s3.Object(s3_bucket, 'new_data.csv').put(Body=csv_buffer.getvalue())
    print("Dataframe is saved as CSV in S3 bucket.")

In [12]:
# load_data()
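Because `load_postgres_table` writes with `if_exists='append'`, running the load step twice stores every row twice. The sketch below shows the difference between `'append'` and `'fail'`, using an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server; the table contents are made up.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for PostgreSQL so the sketch runs without a server
conn = sqlite3.connect(":memory:")
batch = pd.DataFrame({"Serial_Number": [1, 2], "CGPA": [9.65, 8.87]})

batch.to_sql("merged_data", conn, if_exists="append", index=False)  # creates the table
batch.to_sql("merged_data", conn, if_exists="append", index=False)  # adds the rows again

row_count = pd.read_sql("SELECT COUNT(*) AS n FROM merged_data", conn)["n"].iloc[0]
print(row_count)

try:
    # 'fail' refuses to write when the table already exists
    batch.to_sql("merged_data", conn, if_exists="fail", index=False)
except ValueError as err:
    failure = str(err)
    print(failure)

conn.close()
```

If re-runs should overwrite rather than accumulate, `if_exists='replace'` is the third option that `to_sql` accepts.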

### Logging

In [13]:
def log(message):
    timestamp_format = '%H:%M:%S-%b-%d-%Y' # Hour:Minute:Second-MonthName-Day-Year (%b is the portable form of %h)
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open("../Dataverse/logfile.txt","a") as f:
        f.write(timestamp + ',' + message + '\n')
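As an alternative to the hand-written `log` function, Python's standard `logging` module can produce similar timestamped lines and adds log levels. This is a sketch rather than a drop-in replacement: the logger name `etl_sketch` is an assumption, and it writes to a temporary file instead of `../Dataverse/logfile.txt`.

```python
import logging
import os
import tempfile

# Temporary file so the sketch does not touch ../Dataverse/logfile.txt
log_path = os.path.join(tempfile.mkdtemp(), "logfile.txt")

etl_logger = logging.getLogger("etl_sketch")  # hypothetical logger name
etl_logger.setLevel(logging.INFO)

handler = logging.FileHandler(log_path)
# datefmt drops the default milliseconds, which would add an extra comma per line
handler.setFormatter(
    logging.Formatter("%(asctime)s,%(levelname)s,%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
etl_logger.addHandler(handler)

etl_logger.info("ETL Job Started")
etl_logger.info("ETL Job Ended")

# Detach and close the handler so the file is flushed
etl_logger.removeHandler(handler)
handler.close()

with open(log_path) as f:
    lines = f.read().splitlines()
print(lines)
```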

### Running ETL Process

In [None]:
logfile    = "../Dataverse/logfile.txt"              

log("ETL Job Started")

log("Extract phase Started")

local_data, s3_data, postgres_data,  psycopg_df = extract_data()

log("Extract phase Ended")

log("Transform phase Started")

transform_data = transform_data()

log("Transform phase Ended")

log("Load phase Started")

load_data()

log("Load phase Ended")

log("ETL Job Ended")
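The paired "Started" and "Ended" calls above can be wrapped in a context manager so that a phase that raises an exception is also recorded. A minimal sketch, assuming a hypothetical helper named `phase` that takes the logging function as a parameter; here a list stands in for the log file so the example is self-contained.

```python
from contextlib import contextmanager

messages = []  # stand-in for the log file


@contextmanager
def phase(name, log_fn):
    """Hypothetical helper: log the start and end of a phase, including failures."""
    log_fn(f"{name} phase Started")
    try:
        yield
    except Exception as exc:
        log_fn(f"{name} phase Failed: {exc}")
        raise  # re-raise so the caller still sees the error
    else:
        log_fn(f"{name} phase Ended")


with phase("Extract", messages.append):
    pass  # extract_data() would go here

try:
    with phase("Transform", messages.append):
        raise ValueError("missing column")  # simulated failure
except ValueError:
    pass

print(messages)
```

In the notebook, the `log` function defined earlier could be passed as `log_fn`.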

## Extract, Transform and Load (ETL) with Python - API

The remaining part of this notebook will help you perform an ETL with Python using an API as a data source.

Please get the following ready before you continue:

- Install the necessary packages.
    - ```!pip install pandas psycopg2 sqlalchemy```
- Get your OpenWeatherMap API key here (https://openweathermap.org/appid).

These are the steps in this part of the notebook.
- Import packages
- Initiate config parser; you will need it later for the credentials
- Utility function for PostgreSQL
- Extract, transform and load

## Extract, transform and load

##### Download data through an API - OpenWeatherMap API

- Bring out your API key, you need it here!

In [18]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
import configparser
from io import StringIO
from io import BytesIO
import io
import boto3
from urllib import request
import json
import datetime
import csv
import random

"""
Retrieve the current weather forecast from OpenWeatherMap.
Example coordinates for Cape Canaveral, FL: {'lat': 28.4717, 'lon': -80.5378}
"""

# lat, lon = 57.1499, 2.0938
def get_weather_forecast(coords={'lat': 57.1499, 'lon': 2.0938}):  # default location is lat 57.1499, lon 2.0938 (not Cape Canaveral)
    try:  
        ### Extract
        
        # retrieve forecast for specified coordinates
        # read your own OpenWeatherMap API key from the [weather_api] section
        # of the config file instead of hard-coding it in the notebook
        api_key = config.get('weather_api', 'api_key')
        
        url = f'https://api.openweathermap.org/data/2.5/forecast?lat={coords["lat"]}&lon={coords["lon"]}&appid={api_key}&units=metric'
        
        data = json.load(request.urlopen(url))
        
        # Uncomment the next line to see what the data looks like. We need to select the data of interest.
        # print(data)
        
        ### Transform
        # Select city, country and periods in the data list
        forecast = {'city': data['city']['name'],  # city name
                    'country': data['city']['country'],  # country name
                    'periods': list()}  # list to hold forecast data for future periods
        
        
        for period in data['list'][0:23]:  # populate list with next 23 forecast periods
            forecast['periods'].append({'timestamp': datetime.datetime.fromtimestamp(period['dt']),
                                        'temp': round(period['main']['temp']),
                                        'description': period['weather'][0]['description'].title(),
                                        'icon': f'http://openweathermap.org/img/wn/{period["weather"][0]["icon"]}.png'})
        
        forecast = pd.DataFrame(forecast['periods'])
        #print(forecast['city'])
        #print(forecast['country'])
        ### Load
        
        load_postgres_table(forecast, "weatherdata")
                
        return forecast

    except Exception as e:
        print(e)

In [19]:
get_weather_forecast(coords={'lat': 57.1499, 'lon': 2.0938})

PostgreSQL Table, "weatherdata", has been created successfully.


| | timestamp | temp | description | icon |
|---|---|---|---|---|
| 0 | 2022-08-07 19:00:00 | 17 | Broken Clouds | http://openweathermap.org/img/wn/04d.png |
| 1 | 2022-08-07 22:00:00 | 16 | Broken Clouds | http://openweathermap.org/img/wn/04n.png |
| 2 | 2022-08-08 01:00:00 | 15 | Overcast Clouds | http://openweathermap.org/img/wn/04n.png |
| 3 | 2022-08-08 04:00:00 | 15 | Overcast Clouds | http://openweathermap.org/img/wn/04n.png |
| 4 | 2022-08-08 07:00:00 | 15 | Overcast Clouds | http://openweathermap.org/img/wn/04d.png |
| 5 | 2022-08-08 10:00:00 | 16 | Broken Clouds | http://openweathermap.org/img/wn/04d.png |
| 6 | 2022-08-08 13:00:00 | 16 | Scattered Clouds | http://openweathermap.org/img/wn/03d.png |
| 7 | 2022-08-08 16:00:00 | 17 | Few Clouds | http://openweathermap.org/img/wn/02d.png |
| 8 | 2022-08-08 19:00:00 | 17 | Few Clouds | http://openweathermap.org/img/wn/02d.png |
| 9 | 2022-08-08 22:00:00 | 16 | Overcast Clouds | http://openweathermap.org/img/wn/04n.png |
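The transform step can be tried without calling the API. The sketch below runs the same field selection on a hand-written sample shaped like the fields the function reads (it is not real API output) and, unlike the function above, keeps `city` and `country` on every row. The helper name `flatten_forecast` is hypothetical.

```python
from datetime import datetime, timezone

# Hand-written sample with the same fields the function reads; not real API output
sample_response = {
    "city": {"name": "Sampletown", "country": "GB"},
    "list": [
        {"dt": 1659898800, "main": {"temp": 16.6},
         "weather": [{"description": "broken clouds", "icon": "04d"}]},
        {"dt": 1659909600, "main": {"temp": 15.8},
         "weather": [{"description": "overcast clouds", "icon": "04n"}]},
    ],
}


def flatten_forecast(data, max_periods=23):
    """Hypothetical helper: one row per period, keeping city and country."""
    return [
        {
            "city": data["city"]["name"],
            "country": data["city"]["country"],
            # an explicit UTC timezone avoids depending on the machine's local time
            "timestamp": datetime.fromtimestamp(period["dt"], tz=timezone.utc),
            "temp": round(period["main"]["temp"]),
            "description": period["weather"][0]["description"].title(),
        }
        for period in data["list"][:max_periods]
    ]


rows = flatten_forecast(sample_response)
print(rows[0])
```

The list of dictionaries can be passed straight to `pd.DataFrame(rows)` before loading.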


##### Download data through an API - Get a summary of a random article on Wikipedia

In [20]:
"""
The Wikipedia API is publicly accessible. There is no need for a key.
"""

def get_wikipedia_article():
    """
    Retrieve the summary extract for a random Wikipedia article.
    The API returns a JSON object; the function keeps only the
    title, extract and URL and returns them as a dictionary.
    """
    try:  # retrieve random Wikipedia article
        # Extract
        data = json.load(request.urlopen('https://en.wikipedia.org/api/rest_v1/page/random/summary'))
        
        # The data extracted is transformed and 
        # Title, summary of random article and the url is loaded 
        
        return {'title': data['title'],
                'extract': data['extract'],
                'url': data['content_urls']['desktop']['page']}
    
    # Catch and print any error that occurs
    except Exception as e:
        print(e)

In [21]:
# test get_wikipedia_article()
print('\nTesting random Wikipedia article retrieval...')

article = get_wikipedia_article()
if article:
    print(f'\n{article["title"]}\n<{article["url"]}>\n{article["extract"]}')


Testing random Wikipedia article retrieval...

Herb Kohl
<https://en.wikipedia.org/wiki/Herb_Kohl>
Herbert H. Kohl is an American businessman and politician. Alongside his brother and father, the Kohl family created the Kohl's department stores chain, of which Kohl went on to be president and CEO. Kohl also served as a United States Senator from Wisconsin from 1989 to 2013 as a member of the Democratic Party. He chose not to seek re-election in 2012 and was succeeded by fellow Democrat Tammy Baldwin. Kohl is also the former owner of the Milwaukee Bucks of the National Basketball Association.
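Even without a key, it is good practice to identify your script when calling a public API; Wikimedia's guidelines ask clients to send a descriptive User-Agent header. The sketch below only builds the request object and does not send it. The helper name, the User-Agent string and the contact address are assumptions for illustration.

```python
from urllib import request

WIKI_URL = "https://en.wikipedia.org/api/rest_v1/page/random/summary"


def build_wikipedia_request(contact="you@example.com"):
    """Hypothetical helper: attach a descriptive User-Agent to the request."""
    return request.Request(
        WIKI_URL,
        headers={"User-Agent": f"etl-notebook-example/0.1 ({contact})"},
    )


req = build_wikipedia_request()
print(req.full_url)
print(req.get_header("User-agent"))  # urllib stores header names capitalised this way

# To send it (requires network access), pass a timeout so a stalled call cannot hang:
# data = json.load(request.urlopen(req, timeout=10))
```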


##### Download data through an API - Chicago Food Inspection Data

In [23]:
"""Download Chicago food inspection data.
This API is publicly accessible.
Practice by running this code to download the data."""

from urllib.request import urlopen
import bz2

# The url for the API
url = (
    'https://data.cityofchicago.org/api/views/4ijn-s7e5/'
    'rows.csv?accessType=DOWNLOAD'
)

# The data is downloaded as CSV. Only the first 3,002 lines (the header
# line plus the next 3,001 lines) are kept and saved, bz2-compressed,
# in the Dataverse folder

with bz2.open('../Dataverse/food.csv.bz2', 'w') as out, urlopen(url) as resp:
    for i, line in enumerate(resp):
        if i > 3001:
            break
        out.write(line)
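The compressed file can be read back without unpacking it first, because `bz2.open` in text mode decompresses on the fly. The sketch below writes a tiny stand-in file to a temporary folder and reads it with the `csv` module; the column names and rows are made up and are not taken from the Chicago dataset.

```python
import bz2
import csv
import os
import tempfile

# Small stand-in file; the two data rows are made up for illustration
sample_path = os.path.join(tempfile.mkdtemp(), "sample.csv.bz2")
with bz2.open(sample_path, "wt", newline="") as out:
    out.write("Inspection ID,Results\n1,Pass\n2,Fail\n")

# Text mode ("rt") decompresses on the fly and yields str lines for csv
with bz2.open(sample_path, "rt", newline="") as f:
    reader = csv.DictReader(f)
    records = list(reader)

print(len(records), records[0])
```

`pd.read_csv` can also read a `.bz2` path directly, since it infers the compression from the file extension.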