# 2. Building ETL Pipelines

Dive into leveraging pandas to extract, transform, and load data as you build your first data pipelines. Learn how to make your ETL logic reusable, and apply logging and exception handling to your pipelines.

## Libraries

In [38]:
# Common
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Parquet Files
import parquet as pq
import fastparquet

# SQL
import sqlalchemy
import psycopg2

# Pandas SQL
import pandasql as ps

# Logging
import logging

## User Variables

In [30]:
sales_data_csv_path = "../Datasets/sales_data.csv"
sales_data = pd.read_csv(sales_data_csv_path)

clean_sales_data_csv_path = "../Datasets/clean_sales_data.csv"
clean_sales_data = pd.read_csv(clean_sales_data_csv_path)

sales_data_pq_path = "../Datasets/sales_data.parquet"
sales_data.to_parquet(sales_data_pq_path)

# Exercises

## 1. Extracting data from parquet files

### Description

One of the most common ways to ingest data from a source system is by reading data from a file, such as a CSV file. As data has gotten bigger, the need for better file formats has brought about new column-oriented file types, such as parquet files.

In this exercise, you'll practice extracting data from a parquet file.

### Instructions

* Read the parquet file at the path ``"sales_data.parquet"`` into a ``pandas`` DataFrame.
* Print the data types of the DataFrame's columns.
* Output the shape of the DataFrame, as well as its head.

In [5]:
import pandas as pd

# Read the sales data into a DataFrame
sales_data = pd.read_parquet(sales_data_pq_path, engine="fastparquet")

# Check the data types of the DataFrame's columns
print(sales_data.dtypes)

# Print the shape of the DataFrame, as well as the head
print(sales_data.shape)
print(sales_data.head())

Order ID              int64
Product              object
Quantity Ordered      int64
Price Each          float64
Order Date           object
Purchase Address     object
dtype: object
(282, 6)
   Order ID                 Product  Quantity Ordered  Price Each  \
0    259358  34in Ultrawide Monitor                 1      379.99   
1    259359  27in 4K Gaming Monitor                 1      389.99   
2    259360  AAA Batteries (4-pack)                 2        2.99   
3    259361        27in FHD Monitor                 1      149.99   
4    259362        Wired Headphones                 1       11.99   

       Order Date                           Purchase Address  
0  10/28/19 10:56            609 Cherry St, Dallas, TX 75001  
1  10/28/19 17:26          225 5th St, Los Angeles, CA 90001  
2  10/24/19 17:20       967 12th St, New York City, NY 10001  
3  10/14/19 22:26  628 Jefferson St, New York City, NY 10001  
4   10/7/19 16:10         534 14th St, Los Angeles, CA 90001  


Great job! Not only have you imported a parquet file, but you've investigated the DataFrame's metadata. This will help when transforming data.

## 2. Pulling data from SQL databases

### Description

SQL databases are one of the most used data storage tools in the world. Many companies have teams of several individuals responsible for creating and maintaining these databases, which typically store data crucial for day-to-day operations. These SQL databases are commonly used as source systems for a wide range of data pipelines.

For this exercise, ``pandas`` has been imported as ``pd``. Best of luck!

### Instructions

* Update the connection URI to create a connection engine for the ``sales`` database, using ``sqlalchemy``.
* Query all rows and columns of the ``sales`` table and output the results.

In [6]:
import sqlalchemy

# Create a connection to the sales database

# db_engine = sqlalchemy.create_engine("postgresql+psycopg2://repl:password@localhost:5432/sales") # Requires PostgreSQL Server running

# Query all rows and columns of the sales table
# raw_sales_data = pd.read_sql("SELECT * FROM sales", db_engine)
# print(raw_sales_data)

In [8]:
# Alternative: PandaSQL

query = "SELECT * FROM sales"
raw_sales_data = ps.sqldf(query, {"sales": sales_data})
print(raw_sales_data)

     Order ID                   Product  Quantity Ordered  Price Each  \
0      259358    34in Ultrawide Monitor                 1      379.99   
1      259359    27in 4K Gaming Monitor                 1      389.99   
2      259360    AAA Batteries (4-pack)                 2        2.99   
3      259361          27in FHD Monitor                 1      149.99   
4      259362          Wired Headphones                 1       11.99   
..        ...                       ...               ...         ...   
277    259623  Apple Airpods Headphones                 1      150.00   
278    259624    34in Ultrawide Monitor                 1      379.99   
279    259625    AAA Batteries (4-pack)                 1        2.99   
280    259626    AAA Batteries (4-pack)                 2        2.99   
281    259627    AAA Batteries (4-pack)                 1        2.99   

         Order Date                           Purchase Address  
0    10/28/19 10:56            609 Cherry St, Dallas, TX 7

Pretty cool, huh? Being able to leverage Python to pull directly from a SQL database will turbo-charge your data pipelines.
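
When no PostgreSQL server is running, the same ``pd.read_sql()`` call can be tried against an in-memory SQLite database. The sketch below is only an illustration: the SQLite connection, the three sample rows, and the lowercase column names are assumptions made for this example and are not part of the exercise's ``sales`` database.

```python
import sqlite3

import pandas as pd

# Assumption: an in-memory SQLite database stands in for the PostgreSQL "sales" database
connection = sqlite3.connect(":memory:")

# Hypothetical sample rows, modeled on the output shown above
sample_sales = pd.DataFrame(
    {
        "order_id": [259358, 259359, 259360],
        "product": ["34in Ultrawide Monitor", "27in 4K Gaming Monitor", "AAA Batteries (4-pack)"],
        "quantity_ordered": [1, 1, 2],
        "price_each": [379.99, 389.99, 2.99],
    }
)

# Create and populate the sales table
sample_sales.to_sql("sales", connection, index=False)

# Query all rows and columns of the sales table
raw_sales_data = pd.read_sql("SELECT * FROM sales", connection)
print(raw_sales_data)

connection.close()
```

Because ``pd.read_sql()`` accepts either a SQLAlchemy engine or a ``sqlite3`` connection, the query line stays the same when the real ``db_engine`` is swapped back in.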

## 3. Building functions to extract data

### Description

It's important to modularize code when building a data pipeline. This helps to make pipelines more readable and reusable, and can help to expedite troubleshooting efforts. Creating and using functions for distinct operations in a pipeline can even help when getting started on a new project by providing a framework to begin development.

``pandas`` has been imported as ``pd``, and ``sqlalchemy`` is ready to be used.

### Instructions

* Complete the connection URI with port ``5432`` and database ``"sales"``.
* Pass the URI to the appropriate ``sqlalchemy`` function to create a connection engine.
* Use ``pandas`` to query the ``sales`` table for all columns and records with ``"quantity_ordered"`` equal to 1.
* Print the head of the DataFrame, and return the extracted data. Then, execute the ``extract()`` function.

In [11]:
def extract():
    # Create a connection URI and connection engine
    
    # connection_uri = "postgresql+psycopg2://repl:password@localhost:5432/sales"
    # db_engine = sqlalchemy.create_engine(connection_uri)

    # Query the DataFrame to return all records with quantity_ordered equal to 1
    # raw_data = pd.read_sql("SELECT * FROM sales WHERE quantity_ordered = 1", db_engine)

    # Alternative: PandaSQL
    query = "SELECT * FROM sales"
    raw_data = ps.sqldf(query, {"sales": sales_data})

    # Print the head of the DataFrame
    print(raw_data.head())
    
    # Return the extracted DataFrame
    return raw_data
    
# Call the extract() function
raw_sales_data = extract()


   Order ID                 Product  Quantity Ordered  Price Each  \
0    259358  34in Ultrawide Monitor                 1      379.99   
1    259359  27in 4K Gaming Monitor                 1      389.99   
2    259360  AAA Batteries (4-pack)                 2        2.99   
3    259361        27in FHD Monitor                 1      149.99   
4    259362        Wired Headphones                 1       11.99   

       Order Date                           Purchase Address  
0  10/28/19 10:56            609 Cherry St, Dallas, TX 75001  
1  10/28/19 17:26          225 5th St, Los Angeles, CA 90001  
2  10/24/19 17:20       967 12th St, New York City, NY 10001  
3  10/14/19 22:26  628 Jefferson St, New York City, NY 10001  
4   10/7/19 16:10         534 14th St, Los Angeles, CA 90001  


Awesome work! Modularizing logic in a data pipeline helps to make your code reusable and easier to troubleshoot.
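
The PandaSQL alternative above selects every row, so the ``quantity_ordered`` filter from the instructions is not applied. One way to keep the filter and make ``extract()`` reusable is to pass the connection and the quantity as arguments. This is a minimal sketch, assuming an in-memory SQLite database with hypothetical rows in place of the real ``sales`` database; the ``connection`` and ``quantity`` parameters are not part of the original exercise.

```python
import sqlite3

import pandas as pd


def extract(connection, quantity):
    """Return all sales records whose quantity_ordered equals `quantity`."""
    # A bound parameter (?) keeps the value out of the SQL string
    query = "SELECT * FROM sales WHERE quantity_ordered = ?"
    return pd.read_sql(query, connection, params=(quantity,))


# Assumption: in-memory SQLite with hypothetical rows replaces the real database
connection = sqlite3.connect(":memory:")
pd.DataFrame(
    {
        "order_id": [259358, 259359, 259360],
        "quantity_ordered": [1, 1, 2],
    }
).to_sql("sales", connection, index=False)

# Call the extract() function and inspect the result
single_item_orders = extract(connection, quantity=1)
print(single_item_orders.head())

connection.close()
```

Note that the ``?`` placeholder is SQLite's parameter style; other database drivers may expect a different placeholder syntax.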

## 3. Filtering pandas DataFrames

### Description

Once data has been extracted from a source system, it's time to transform it! Often, source data may have more information than what is needed for downstream use cases. If this is the case, dimensionality should be reduced during the "transform" phase of the data pipeline.

``pandas`` has been imported as ``pd``, and the ``extract()`` function is available to load a DataFrame from the path that is passed.

### Instructions

* Use the ``extract()`` function to load the DataFrame stored in the ``"sales_data.parquet"`` path.
* Update the ``transform()`` function to return all rows and columns with ``"Quantity Ordered"`` greater than 1.
* Further filter the ``clean_data`` DataFrame to only include columns ``"Order Date"``, ``"Quantity Ordered"`` and ``"Purchase Address"``.
* Return the filtered DataFrame.

In [13]:
def extract(file_path):
    return pd.read_parquet(file_path)

In [14]:
# Extract data from the sales_data.parquet path
raw_sales_data = extract(sales_data_pq_path)

def transform(raw_data):
    # Only keep rows with `Quantity Ordered` greater than 1
    clean_data = raw_data.loc[raw_data["Quantity Ordered"] > 1, :]

    # Only keep columns "Order Date", "Quantity Ordered", and "Purchase Address"
    clean_data = clean_data.loc[:, ["Order Date", "Quantity Ordered", "Purchase Address"]]
	
    # Return the filtered DataFrame
    return clean_data
    
transform(raw_sales_data)


Unnamed: 0,Order Date,Quantity Ordered,Purchase Address
2,10/24/19 17:20,2,"967 12th St, New York City, NY 10001"
41,10/31/19 19:06,2,"263 Willow St, San Francisco, CA 94016"
44,10/9/19 20:27,3,"11 Lakeview St, Seattle, WA 98101"
52,10/1/19 21:32,2,"110 Hill St, San Francisco, CA 94016"
57,10/16/19 19:13,2,"280 8th St, Portland, OR 97035"
63,10/18/19 8:54,2,"791 Forest St, Los Angeles, CA 90001"
68,10/31/19 22:54,2,"64 Lake St, Atlanta, GA 30301"
80,10/26/19 0:30,2,"23 Walnut St, San Francisco, CA 94016"
93,10/18/19 21:47,2,"145 River St, Los Angeles, CA 90001"
113,10/6/19 12:22,3,"512 1st St, Austin, TX 73301"


Perfect transformation! Removing rows from a DataFrame is one of the most common operations you'll perform when working with DataFrames. Keep up the great work!
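
The two ``.loc`` calls in ``transform()`` can also be combined, since ``.loc`` accepts a row selector and a column selector in the same call. The sketch below uses a few hypothetical rows with the same column names as the sales data.

```python
import pandas as pd

# Hypothetical rows with the same column names as the sales data
raw_data = pd.DataFrame(
    {
        "Order ID": [259358, 259360, 259402],
        "Quantity Ordered": [1, 2, 3],
        "Order Date": ["10/28/19 10:56", "10/24/19 17:20", "10/9/19 20:27"],
        "Purchase Address": [
            "609 Cherry St, Dallas, TX 75001",
            "967 12th St, New York City, NY 10001",
            "11 Lakeview St, Seattle, WA 98101",
        ],
    }
)

# The boolean mask selects the rows; the list selects the columns
rows_to_keep = raw_data["Quantity Ordered"] > 1
columns_to_keep = ["Order Date", "Quantity Ordered", "Purchase Address"]
clean_data = raw_data.loc[rows_to_keep, columns_to_keep]

print(clean_data)
```

The original row labels are preserved after filtering, which is why the output above shows index values such as 2, 41, and 44 rather than a fresh 0-based index.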

## 4. Transforming sales data with pandas

### Description

Before insights can be extracted from a dataset, column types may need to be altered to properly leverage the data. This is especially common with temporal data types, which can be stored in several different ways.

For this example, ``pandas`` has been imported as ``pd`` and is ready for you to use.

### Instructions

* Update the ``transform()`` function to convert data in the ``"Order Date"`` column to type datetime.
* Filter the DataFrame to only contain rows with ``"Price Each"`` less than ten dollars.
* Print the data types of each column in the DataFrame.

In [17]:
def extract(file_path):
    # Ingest the data to a DataFrame
    
    raw_data = pd.read_csv(file_path)
    
    # Return the DataFrame
    return raw_data

In [18]:
raw_sales_data = extract(sales_data_csv_path)

def transform(raw_data):
    # Convert the "Order Date" column to type datetime
    raw_data["Order Date"] = pd.to_datetime(raw_data["Order Date"], format="%m/%d/%y %H:%M")
    
    # Only keep items under ten dollars
    clean_data = raw_data.loc[raw_data["Price Each"] < 10, :]
    return clean_data

clean_sales_data = transform(raw_sales_data)

# Check the data types of each column
print(clean_sales_data.dtypes)


Order ID                     int64
Product                     object
Quantity Ordered             int64
Price Each                 float64
Order Date          datetime64[ns]
Purchase Address            object
dtype: object


Very well done! Datestamps and timestamps are everywhere in data, and they can get messy. Being able to transform them is an invaluable skill!
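
Real timestamp columns often contain a few values that do not match the expected format, and by default ``pd.to_datetime()`` raises an error on the first one. The sketch below shows one possible way to handle this with ``errors="coerce"``; the sample values, including the deliberately malformed one, are hypothetical.

```python
import pandas as pd

# Hypothetical values; the last one is deliberately malformed
order_dates = pd.Series(["10/28/19 10:56", "10/7/19 16:10", "not a date"])

# errors="coerce" turns values that do not match the format into NaT
parsed = pd.to_datetime(order_dates, format="%m/%d/%y %H:%M", errors="coerce")

print(parsed)
print(f"Unparseable values: {parsed.isna().sum()}")
```

Counting the ``NaT`` values after conversion gives a quick signal of how many rows need attention before the data moves further down the pipeline.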

## 5. Validating data transformations

### Description

Great work so far! Manually spot-checking transformations is a great first step to ensuring that you're maintaining data quality throughout a pipeline. ``pandas`` offers several built-in functions to help you with just that!

To help get you started with this exercise, ``pandas`` has been imported as ``pd``.

### Instructions

* Update the ``extract()`` function to read the parquet file stored in the ``file_path`` parameter into a DataFrame.
* Update the ``transform()`` function to return the ``"Order ID"``, ``"Price Each"`` and ``"Quantity Ordered"`` columns for all items with a ``"Quantity Ordered"`` equal to 1.
* Call the ``transform()`` function on the ``raw_sales_data`` DataFrame.

In [21]:
def extract(file_path):
    # Ingest the data to a DataFrame
    raw_data = pd.read_parquet(file_path)

    # Return the DataFrame
    return raw_data

raw_sales_data = extract(sales_data_pq_path)

def transform(raw_data):
    # Filter rows and columns
    clean_data = raw_data.loc[raw_data["Quantity Ordered"] == 1, ["Order ID", "Price Each", "Quantity Ordered"]]
    return clean_data

# Transform the raw_sales_data
clean_sales_data = transform(raw_sales_data)

In [26]:
sorted(clean_sales_data["Price Each"], reverse=True)[:2]

[1700.0, 1700.0]

### Question

What is the value of the ``"Price Each"`` column for the two most expensive items in the transformed DataFrame? The ``clean_sales_data`` DataFrame has been loaded for you, and you can use the console to explore it further.

### Answer

`1700` as shown above. 

Congrats! You've not only built the 'extract' and 'transform' components of a pipeline, but you've taken steps to validate the logic that you've implemented. Great work!
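
Manual spot-checks like the one above can also be written as assertions, so that a broken transformation fails loudly instead of passing unnoticed. This is a minimal sketch with hypothetical rows; the specific checks are examples, not an exhaustive validation suite.

```python
import pandas as pd

# Hypothetical transformed data: every row should have a quantity of 1
clean_sales_data = pd.DataFrame(
    {
        "Order ID": [259358, 259359, 259361, 259362],
        "Price Each": [379.99, 389.99, 149.99, 11.99],
        "Quantity Ordered": [1, 1, 1, 1],
    }
)

# The two highest prices, equivalent to sorting in descending order and slicing
top_two_prices = clean_sales_data["Price Each"].nlargest(2).tolist()
print(top_two_prices)

# Checks that raise an AssertionError if the transformation misbehaved
assert (clean_sales_data["Quantity Ordered"] == 1).all()
assert not clean_sales_data.isna().any().any()
assert list(clean_sales_data.columns) == ["Order ID", "Price Each", "Quantity Ordered"]
```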

## 6. Loading sales data to a CSV file

### Description

Loading data is an essential component of any data pipeline. It ensures that any data consumers and processes have reliable access to data that you've extracted and transformed earlier in a pipeline. In this exercise, you'll practice loading transformed sales data to a CSV file using ``pandas``, which has been imported as ``pd``. In addition to this, the raw data has been extracted and is available in the DataFrame ``raw_sales_data``.

### Instructions

* Filter the ``raw_sales_data`` DataFrame to keep only items with a price less than 25 dollars.
* Update the ``load()`` function to write the transformed sales data to a file named ``"transformed_sales_data.csv"``, making sure not to include the ``index`` column.
* Call the ``load()`` function on the cleaned DataFrame.

In [27]:
def transform(raw_data):
    # Keep only items priced under 25 dollars
    return raw_data.loc[raw_data["Price Each"] < 25, ["Order ID", "Product", "Price Each", "Order Date"]]

def load(clean_data):
    # Write the data to a CSV file without the index column
    clean_data.to_csv("transformed_sales_data.csv", index=False)


clean_sales_data = transform(raw_sales_data)

# Call the load function on the cleaned DataFrame
load(clean_sales_data)

Fantastic work! You've loaded data that had been extracted and transformed to a CSV file, where it can be used by data consumers.

## 7. Customizing a CSV file

### Description

Sometimes, data needs to be stored in a CSV file in a customized manner. This may include using different header values, including or excluding the index column of a DataFrame, or altering the character used to separate columns. In this example, you'll get to practice this, as well as ensuring that the file is stored in the desired file path.

The ``pandas`` library has been imported as ``pd``, and the data has already been transformed to include only rows with a "Quantity Ordered" greater than one. The cleaned DataFrame is stored in a variable named ``clean_sales_data``.

### Instructions

* Import the ``os`` library.
* Write the cleaned DataFrame to a CSV stored at ``path_to_write``, without a header.
* Ensure the file was written to the desired path.

In [35]:
# Import the os library
import os

# Load the data to a CSV file with the index, no header, and pipe-separated
def load(clean_data, path_to_write):
    clean_data.to_csv(path_to_write, header=False, sep="|")

load(clean_sales_data, "../Datasets/clean_sales_data.csv")

# Check that the file is present.
file_exists = os.path.exists("../Datasets/clean_sales_data.csv")
print(file_exists)

True


Perfect! You've successfully written a DataFrame to a CSV file with custom options, and validated that the file was written to the desired path.
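
A file written with ``header=False`` and ``sep="|"`` has to be read back with matching options; a default ``pd.read_csv()`` call would treat the first data row as the header and would not split on the pipe character. The sketch below illustrates a round trip under those settings. The sample rows, the temporary directory, and the column names passed to ``names`` are assumptions for illustration.

```python
import os
import tempfile

import pandas as pd

# Hypothetical cleaned data with non-default index labels
clean_sales_data = pd.DataFrame(
    {"Order ID": [259360, 259402], "Quantity Ordered": [2, 3]},
    index=[2, 44],
)

with tempfile.TemporaryDirectory() as temp_dir:
    path_to_write = os.path.join(temp_dir, "clean_sales_data.csv")

    # Same options as load(): index kept, no header, pipe-separated
    clean_sales_data.to_csv(path_to_write, header=False, sep="|")

    # Inspect the raw file contents
    with open(path_to_write) as csv_file:
        file_text = csv_file.read()
    print(file_text)

    # Read back with matching options; names are supplied because the file has no header
    round_trip = pd.read_csv(
        path_to_write,
        sep="|",
        header=None,
        names=["index", "Order ID", "Quantity Ordered"],
        index_col="index",
    )

print(round_trip)
```

Since the header is dropped on write, the column names have to travel by some other route, such as documentation or a shared schema, for consumers to interpret the file correctly.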

## 8. Persisting data to files

### Description

Loading data to a final destination is one of the most important steps of a data pipeline. In this exercise, you'll use the ``transform()`` function shown below to transform product ``sales`` data before loading it to a ``.csv`` file. This will give downstream data consumers a better view into total sales across a range of products.

For this exercise, the ``sales`` data has been loaded and transformed, and is stored in the ``clean_sales_data`` DataFrame. The ``pandas`` package has been imported as ``pd``, and the ``os`` library is also ready to use!

### Instructions

* Update the ``load()`` function to write data to the provided path, without headers or an index column.
* Check to make sure the file was loaded to the desired file path.
* Call the function to load the transformed data to persistent storage.

In [37]:
def load(clean_data, file_path):
    # Write the data to a file
    clean_data.to_csv(file_path, header=False, index=False)

    # Check to make sure the file exists
    file_exists = os.path.exists(file_path)
    if not file_exists:
        raise Exception(f"File does NOT exist at path {file_path}")

# Load the transformed data to the provided file path
load(clean_sales_data, "../Datasets/transformed_sales_data.csv")

Great job! You've successfully loaded a DataFrame to a .csv file and ensured the file existed after writing it. This is an essential component of building reliable data pipelines.

## 9. Logging within a data pipeline

### Description

In this exercise, we'll take a look back at the function you wrote in a previous video and practice adding logging to the function. This will help when troubleshooting errors or making changes to the logic!

``pandas`` has been imported as ``pd``. In addition to this, the ``logging`` module has been imported, and the default log-level has been set to ``"debug"``.

### Instructions

* Create an info-level log after the transformation, passing the string: ``"Transformed 'Order Date' column to type 'datetime'."``
* Log the ``.shape`` of the DataFrame at the debug-level before and after filtering. 

In [39]:
def transform(raw_data):
    raw_data["Order Date"] = pd.to_datetime(raw_data["Order Date"], format="%m/%d/%y %H:%M")
    clean_data = raw_data.loc[raw_data["Price Each"] < 10, :]
    
    # Create an info log regarding transformation
    logging.info("Transformed 'Order Date' column to type 'datetime'.")
    
    # Create debug-level logs for the DataFrame before and after filtering
    logging.debug(f"Shape of the DataFrame before filtering: {raw_data.shape}")
    logging.debug(f"Shape of the DataFrame after filtering: {clean_data.shape}")
    
    return clean_data
  
clean_sales_data = transform(raw_sales_data)

Nicely done! Creating logs to provide meaningful output can save a lot of time when a pipeline fails or when a change needs to be made.
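
The exercise environment sets the log level to debug ahead of time. In a fresh Python session the root logger defaults to the warning level, so the info-level and debug-level messages in ``transform()`` would not be displayed. The sketch below shows one way to configure this explicitly; the format string and the sample rows are assumptions.

```python
import logging

import pandas as pd

# force=True replaces any handlers already attached to the root logger (Python 3.8+)
logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)s: %(message)s",
    force=True,
)

# Hypothetical rows with the same column name as the sales data
raw_data = pd.DataFrame({"Price Each": [379.99, 2.99, 11.99]})

# Log the shape before and after filtering at the debug level
logging.debug(f"Shape of the DataFrame before filtering: {raw_data.shape}")
clean_data = raw_data.loc[raw_data["Price Each"] < 10, :]
logging.debug(f"Shape of the DataFrame after filtering: {clean_data.shape}")
```

Raising the level to ``logging.INFO`` or ``logging.WARNING`` later silences the debug messages without touching the pipeline code.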

## 10. Handling exceptions when loading data

### Description

Sometimes, your data pipelines might throw an exception. These exceptions are a form of alerting, and they let a Data Engineer know when something unexpected happened. It's important to properly handle these exceptions. In this exercise, we'll practice just that!

To help get you started, ``pandas`` has been imported as ``pd``, and the ``logging`` module has been imported as well. The default log-level has been set to ``"debug"``.

### Instructions

* Update the pipeline to include a ``try`` block, and attempt to read the data from the path ``"sales_data.parquet"``.
* Catch a ``FileNotFoundError`` if the file is not able to be read into a ``pandas`` DataFrame.
* Create an error-level log to document the failure.

In [40]:
def extract(file_path):
    return pd.read_parquet(file_path)

# Update the pipeline to include a try block
try:
    # Attempt to read in the file
    raw_sales_data = extract(sales_data_pq_path)

# Catch the FileNotFoundError
except FileNotFoundError as file_not_found:
    # Write an error-level log
    logging.error(file_not_found)

Wonderful! Incorporating try-except logic in your pipelines is the foundation for more advanced monitoring and alerting solutions.
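
When the exception is caught and only logged, ``raw_sales_data`` is never assigned, so a later step that uses it would fail with a less helpful error. One possible pattern is to handle the error inside ``extract()`` and return ``None`` so the caller can decide what to do. This sketch uses ``pd.read_csv()`` instead of ``pd.read_parquet()`` so that it runs without a parquet engine installed; the missing path is hypothetical.

```python
import logging

import pandas as pd

logging.basicConfig(level=logging.DEBUG, force=True)


def extract(file_path):
    """Return the file's contents as a DataFrame, or None if the file is missing."""
    try:
        return pd.read_csv(file_path)
    except FileNotFoundError as file_not_found:
        # Log the failure and signal it to the caller
        logging.error(f"Could not read {file_path}: {file_not_found}")
        return None


# Hypothetical path that does not exist
raw_sales_data = extract("missing_directory/sales_data.csv")

if raw_sales_data is None:
    print("Extraction failed; skipping the transform and load steps.")
```

Returning ``None`` is a design choice rather than a rule: re-raising the exception after logging it is equally reasonable when the pipeline should stop immediately.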

## 11. Monitoring and alerting within a data pipeline

### Description

It's time to put it all together! You might have guessed it, but handling errors with try-except and logging go hand-in-hand. These two practices are essential for a pipeline to be resilient and transparent, and are the building blocks for more advanced monitoring and alerting solutions.

``pandas`` has been imported as ``pd``, and the ``logging`` module has been loaded and configured for you. The ``raw_sales_data`` DataFrame has been extracted, and is ready to be transformed.

### Instructions

* Create an info-level logging message to document success, and a warning-level logging message if the transformation fails.
* Update the ``try-except`` clause to catch a ``KeyError``, and alias as ``ke``.
* Change the warning-level log to include the error being thrown.
* If a key error is thrown, create a column ``"Total Price"`` by multiplying the ``"Price Each"`` and ``"Quantity Ordered"`` columns.

In [54]:
sales_data_csv_path = "../Datasets/sales_data_backup.csv"
raw_sales_data = pd.read_csv(sales_data_csv_path)
raw_sales_data.head()

Unnamed: 0,Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
0,259358,34in Ultrawide Monitor,1,379.99,10/28/19 10:56,"609 Cherry St, Dallas, TX 75001"
1,259359,27in 4K Gaming Monitor,1,389.99,10/28/19 17:26,"225 5th St, Los Angeles, CA 90001"
2,259360,AAA Batteries (4-pack),2,2.99,10/24/19 17:20,"967 12th St, New York City, NY 10001"
3,259361,27in FHD Monitor,1,149.99,10/14/19 22:26,"628 Jefferson St, New York City, NY 10001"
4,259362,Wired Headphones,1,11.99,10/7/19 16:10,"534 14th St, Los Angeles, CA 90001"


In [55]:
def transform(raw_data):
    return raw_data.loc[raw_data["Total Price"] > 1000, :]

try:
    # Attempt to transform DataFrame, log an info-level message
    clean_sales_data = transform(raw_sales_data)
    logging.info("Successfully filtered DataFrame by 'Total Price'")
# Update the exception to be a KeyError, alias as ke
except KeyError as ke:
    logging.warning(f"{ke}: Cannot filter DataFrame by 'Total Price'")

    # Create the "Total Price" column, transform the updated DataFrame
    raw_sales_data["Total Price"] = raw_sales_data["Price Each"] * raw_sales_data["Quantity Ordered"]
    clean_sales_data = transform(raw_sales_data)

# Earlier version of the except clause, before it was narrowed to KeyError:
# except Exception:
#     # Log a warning-level message
#     logging.warning("Cannot filter DataFrame by 'Total Price'")





Great job! Adding monitoring and alerting to your data pipelines will help to build trust in your solutions. Keep up the great work!
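
The pieces from this chapter can be combined into a single extract, transform, and load run. The sketch below is one possible arrangement, not a definitive implementation: the in-memory sample rows, the temporary output directory, and the use of ``assign()`` to avoid modifying the caller's DataFrame are all assumptions made for illustration.

```python
import logging
import os
import tempfile

import pandas as pd

logging.basicConfig(level=logging.DEBUG, force=True)


def extract():
    # Assumption: hypothetical in-memory rows stand in for a real source file
    return pd.DataFrame(
        {
            "Order ID": [259358, 259360, 259500],
            "Quantity Ordered": [1, 2, 1],
            "Price Each": [379.99, 2.99, 1700.00],
        }
    )


def transform(raw_data):
    try:
        clean_data = raw_data.loc[raw_data["Total Price"] > 1000, :]
        logging.info("Successfully filtered DataFrame by 'Total Price'")
    except KeyError as ke:
        logging.warning(f"{ke}: Cannot filter DataFrame by 'Total Price'")
        # assign() returns a new DataFrame, leaving the caller's data unchanged
        raw_data = raw_data.assign(
            **{"Total Price": raw_data["Price Each"] * raw_data["Quantity Ordered"]}
        )
        clean_data = raw_data.loc[raw_data["Total Price"] > 1000, :]
    return clean_data


def load(clean_data, file_path):
    clean_data.to_csv(file_path, index=False)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File does not exist at path {file_path}")
    logging.info(f"Wrote {len(clean_data)} rows to {file_path}")


with tempfile.TemporaryDirectory() as temp_dir:
    output_path = os.path.join(temp_dir, "transformed_sales_data.csv")
    clean_sales_data = transform(extract())
    load(clean_sales_data, output_path)
    file_was_written = os.path.exists(output_path)

print(clean_sales_data)
```

Keeping each stage in its own function means the logging and exception handling live next to the logic they describe, which makes a failed run easier to trace back to a specific step.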