# Building an ETL Pipeline

## Importing Libraries

In [8]:
import pandas as pd # Data Transformation
import requests     # Establishing connection with the web (API)
import json         # Exposes an API familiar to users of the standard library marshal and pickle modules
import pytest       # Makes it easy to write small, readable tests
import os
from dotenv import load_dotenv
import sqlalchemy
from sqlalchemy.engine import Engine, create_engine

## [E] Extraction

### API
- Data Source: https://docs.coincap.io/#89deffa0-ab03-4e0a-8d92-637a857d2c91
- Data comes in a JSON format (semi-structured data) from a 3rd Party

### Extraction Function

In [9]:
def extract(api_get_request_url):
    """
    Create and return a raw JSON extracted from an external API via GET Request.

    Args:
        api_get_request_url: The URL for API's GET Request.

    Returns:
        raw_data_json: a raw JSON with the API data.
    """
    
    # GET Request. Package the request, send the request and catch the response r
    r = requests.get(api_get_request_url)
    
    # Check if the request was successful
    if r.status_code == 200:
        # Extract JSON data from response: Decode the JSON data into a dictionary
        raw_data_json = r.json()
    else:
        print(f"Error. Non-success status code: {r.status_code}")
    
    # Return the json
    return raw_data_json

# Call the extract() function
raw_data_json = extract("https://api.coincap.io/v2/assets")

In [10]:
# Quick visualization
print("Printing JSON names:")
for key, value in raw_data_json.items():
    print(key, ":", raw_data_json[key])

Printing JSON names:
data : [{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '19707593.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1373016268522.9596935142235751', 'volumeUsd24Hr': '7541407038.6911341429067854', 'priceUsd': '69669.4045042923148207', 'changePercent24Hr': '-0.3423378138310054', 'vwap24Hr': '69248.9917619300781134', 'explorer': 'https://blockchain.info/'}, {'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '120146771.9777860300000000', 'maxSupply': None, 'marketCapUsd': '454570794718.2490231060028494', 'volumeUsd24Hr': '4546338734.8446519367671548', 'priceUsd': '3783.4624038196776809', 'changePercent24Hr': '-1.3256030249437774', 'vwap24Hr': '3779.9103854636110771', 'explorer': 'https://etherscan.io/'}, {'id': 'tether', 'rank': '3', 'symbol': 'USDT', 'name': 'Tether', 'supply': '112286364258.1122000000000000', 'maxSupply': None, 'marketCapUsd': '112307378362.8632859803908257', 'volum

## [T] Transformation
* Data Normalization
    * Transforming the JSON into a table (as a Dataframe)
* Data Exploration
    * Check for: Missing Data, Data Types.
* Data Cleaning

### Initial Exploration
* Exploring the JSON name-value pairs


In [11]:
# Exploring the JSON structure
print("This is the Json's type from the API request:", type(raw_data_json))
print("It's a dict, so let's check how many keys it has:", len(raw_data_json))

# Saving Keys and Values as Lists
raw_data_json_keys_ls = list(raw_data_json.keys())
raw_data_json_values_ls = list(raw_data_json.values())

print("Let's print both keys:", raw_data_json_keys_ls)
for key, value in raw_data_json.items():
    print("->", key, ":", raw_data_json[key])

print("It is a nested JSON, with two key-value pais: data and timestamp.")
print("The actual information comes in the 'data' key along with its associated 'timestamp' as another key.")
print("How many key-value pairs in the 'data' key:", len(raw_data_json_values_ls[0]))
print("Let's explore the first element of 'data':", raw_data_json_values_ls[0][0])
potential_columns = len(raw_data_json_values_ls[0][0])
potential_rows = len(raw_data_json_values_ls[0])
print(f"-> There are {potential_columns} features in a single record of 'data'.")
print(f"-> In total, there are {potential_rows} records stored in 'data' and each record has {potential_columns} features.")
print("We should see the same when storing in Dataframe.")


This is the Json's type from the API request: <class 'dict'>
It's a dict, so let's check how many keys it has: 2
Let's print both keys: ['data', 'timestamp']
-> data : [{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '19707593.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1373016268522.9596935142235751', 'volumeUsd24Hr': '7541407038.6911341429067854', 'priceUsd': '69669.4045042923148207', 'changePercent24Hr': '-0.3423378138310054', 'vwap24Hr': '69248.9917619300781134', 'explorer': 'https://blockchain.info/'}, {'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '120146771.9777860300000000', 'maxSupply': None, 'marketCapUsd': '454570794718.2490231060028494', 'volumeUsd24Hr': '4546338734.8446519367671548', 'priceUsd': '3783.4624038196776809', 'changePercent24Hr': '-1.3256030249437774', 'vwap24Hr': '3779.9103854636110771', 'explorer': 'https://etherscan.io/'}, {'id': 'tether', 'rank': '3', 'symbol': 'US

### Transformation Function
* Note: the list of records are stored in the 'data' key of the JSON file, as seen above.

In [12]:
def transform(raw_data_json):
    """
    Create and return a raw dataset in the DataFrame format based on a JSON.

    Args:
        raw_data_json: The raw json returned by the extraction function.

    Returns:
        raw_normalized_data_df: a raw, normalized dataset in the DataFrame format.
    """    
    
    # Data Normalization
    # record_path = "data": path to list of records.
    # sep = ".": Nested records will generate names separated by 'sep'.
    raw_normalized_data_df = pd.json_normalize(raw_data_json, record_path = "data", sep = '.') 

    # Assert that the transform function returns a pd.DataFrame
    assert isinstance(raw_normalized_data_df, pd.DataFrame)
    
    return raw_normalized_data_df

# Try-except block for the extract function
try:
    # Transform the raw_testing_scores DataFrame
    raw_normalized_data_df = transform(raw_data_json)
except:
    print("There is an error with the transform function.")

In [13]:
# Checking the Raw Dataframe
print("The JSON was transformed into a Dataframe:", type(raw_normalized_data_df))
print(f"It has {raw_normalized_data_df.shape[1]} columns and {raw_normalized_data_df.shape[0]} records.") 
print("-> the JSON and the Dataframes have the same number of columns and rows. Great!")
print(raw_normalized_data_df.head())

The JSON was transformed into a Dataframe: <class 'pandas.core.frame.DataFrame'>
It has 12 columns and 100 records.
-> the JSON and the Dataframes have the same number of columns and rows. Great!
             id rank symbol      name                         supply  \
0       bitcoin    1    BTC   Bitcoin      19707593.0000000000000000   
1      ethereum    2    ETH  Ethereum     120146771.9777860300000000   
2        tether    3   USDT    Tether  112286364258.1122000000000000   
3  binance-coin    4    BNB       BNB     166801148.0000000000000000   
4        solana    5    SOL    Solana     459807524.0144820000000000   

                    maxSupply                    marketCapUsd  \
0   21000000.0000000000000000  1373016268522.9596935142235751   
1                        None   454570794718.2490231060028494   
2                        None   112307378362.8632859803908257   
3  166801148.0000000000000000   108218988621.3032104721853980   
4                        None    76433017160.4

## [L] Load
* Loading data to Postgres.
    * Open a SQL connection with SQLAlchemy
    * .to_sql()
* Data Quality checks:
    * Validate that data was correctly persisted in postgres
        * Ensure it can be queried
            * pd.read_sql()
        * Make sure counts match
        * Validate each row is present

In [20]:
# Create SQL Engine based on SQLAlchemy
def create_db_engine(connection_uri: str) -> Engine: # Arrow indicates that the function returns an object of type Engine
    """
    Create and return a SQLAlchemy engine based on the provided connection URI.

    Args:
        connection_uri (str): The connection URI for the database.

    Returns:
        Engine: A SQLAlchemy engine connected to the specified database.
    """
    db_engine = create_engine(connection_uri)
    return db_engine

# Load environment variables from .env file
load_dotenv()

# Retrieve individual components from environment variables
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
host = os.getenv('POSTGRES_HOST')
db_name = os.getenv('POSTGRES_DB')

# Ensure the connection URI is retrieved successfully
if not all([user, password, host, db_name]):
    raise ValueError("One or more environment variables for the database connection are not set")

# Construct the connection URI
connection_uri = f"postgresql://{user}:{password}@{host}/{db_name}"

# Ensure the connection URI is retrieved successfully
if connection_uri is None:
    raise ValueError("DATABASE_URL environment variable is not set")

# Create the database engine
db_engine = create_db_engine(connection_uri)

In [26]:
# Load/Persist data in Postgres
def load(raw_normalized_data_df, con_engine):
    """
    Load/persist data in PostgreSQL database.

    Args:
        raw_normalized_data_df (DataFrame): The DataFrame containing the data to be loaded.
        con_engine (Engine): SQLAlchemy engine for database connection.

    Returns:
        None
    """
    # to_sql: Write records stored in a DataFrame to a SQL database.
    raw_normalized_data_df.to_sql(name="crypto_mkt", con=con_engine, if_exists="replace", index=False)

# Call the load function to load the transformed data to persistent storage
load(raw_normalized_data_df, db_engine)

# Query the data in the crypto_mkt table, check the head of the DataFrame
to_validate = pd.read_sql("SELECT * FROM crypto_mkt", con=db_engine)
print(to_validate.head())


             id rank symbol      name                         supply  \
0       bitcoin    1    BTC   Bitcoin      19707593.0000000000000000   
1      ethereum    2    ETH  Ethereum     120146771.9777860300000000   
2        tether    3   USDT    Tether  112286364258.1122000000000000   
3  binance-coin    4    BNB       BNB     166801148.0000000000000000   
4        solana    5    SOL    Solana     459807524.0144820000000000   

                    maxSupply                    marketCapUsd  \
0   21000000.0000000000000000  1373016268522.9596935142235751   
1                        None   454570794718.2490231060028494   
2                        None   112307378362.8632859803908257   
3  166801148.0000000000000000   108218988621.3032104721853980   
4                        None    76433017160.4406353473127080   

                  volumeUsd24Hr                priceUsd    changePercent24Hr  \
0   7541407038.6911341429067854  69669.4045042923148207  -0.3423378138310054   
1   4546338734.8

### Data Exploration

In [24]:
# Data Exploration
raw_normalized_data_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 12 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   id                 100 non-null    object
 1   rank               100 non-null    object
 2   symbol             100 non-null    object
 3   name               100 non-null    object
 4   supply             100 non-null    object
 5   maxSupply          56 non-null     object
 6   marketCapUsd       100 non-null    object
 7   volumeUsd24Hr      100 non-null    object
 8   priceUsd           100 non-null    object
 9   changePercent24Hr  100 non-null    object
 10  vwap24Hr           100 non-null    object
 11  explorer           99 non-null     object
dtypes: object(12)
memory usage: 9.5+ KB


#### Handling Missing Data

In [14]:
# # GET Request. Package the request, send the request and catch the response r
# api_url = "https://api.coincap.io/v2/assets"
# r = requests.get(api_url)

# # Check if the request was successful
# if r.status_code == 200:
#     # Extract JSON data from response: Decode the JSON data into a dictionary
#     crypto_data_json = r.json()
# else:
#     print(f"Error. Non-success status code: {r.status_code}")