# **Building a Custom ETL Tool with Python**

An ETL tool for data integration can either be custom-coded or licensed. Python is one option for building a custom ETL tool. In this exercise we will build an ETL pipeline that integrates data from multiple sources into a single data warehouse, which then serves as a single source of truth. We will extract from a range of sources, including:

* Databases
* CSV files
* JSON files
* Excel files

The first step is extraction, where data is pulled from these sources. The extracted data is then transformed to match the target.
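As a rough orientation, the three stages can be sketched as three small functions. This is only an illustrative skeleton: the inline CSV and JSON text, the column names, and the function names are made up for the example and are not the data used in the rest of the exercise.

```python
import io
import pandas as pd

# Hypothetical stand-ins for two sources; the real exercise reads from
# a database and from files on disk.
CSV_SOURCE = io.StringIO("item,price\n1,$2.50\n2,$10.00\n")
JSON_SOURCE = io.StringIO('[{"item": 1, "name": "Vodka"}, {"item": 2, "name": "Rum"}]')


def extract():
    """Read each source into its own DataFrame."""
    prices = pd.read_csv(CSV_SOURCE)
    names = pd.read_json(JSON_SOURCE)
    return prices, names


def transform(prices, names):
    """Clean the data so it matches the target: numeric prices, no repeated rows."""
    prices = prices.copy()
    prices["price"] = prices["price"].str.replace("$", "", regex=False).astype(float)
    return prices.drop_duplicates(), names.drop_duplicates()


def load(prices, names):
    """Integrate the cleaned tables into a single DataFrame."""
    return prices.merge(names, on="item", how="left")


warehouse = load(*transform(*extract()))
print(warehouse)
```

Keeping each stage in its own function makes it easier to rerun or test one stage without repeating the others.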

**NOTE**: Before we begin, please install `psycopg2` by running the cell below.



In [None]:
!pip install --upgrade psycopg2

# Section 1: **Extraction**

In [1]:
import pandas as pd
import numpy as np
import psycopg2 as ps

### **Extraction of sales table from database**

Remember the Iowa Liquor Database from the first bootcamp? It is useful to keep sales in a PostgreSQL database hosted on a server because it gets updated regularly. To use it, we need to create a connection to the database and extract the information. There are several Python options for working with SQL databases, including `SQLAlchemy`, `sqlite3` (for SQLite databases only), and `psycopg2` (a PostgreSQL adapter for Python). Because we know our database is managed by Postgres, we can connect using `psycopg2`.

In [2]:
# Create a connection to the database

connection = ps.connect(#dbname="DA Bootcamp",
                        database="iowa_liquor_sales_database",
                        user="analytics_student",
                        password="analyticsga",
                        host="analyticsga-psql.generalassemb.ly",
                        port=5432
                        )

In [3]:
# Create a cursor for selection in the database
cur = connection.cursor()

We can run SQL commands from Python using the cursor's `execute` method in `psycopg2`. Try the cells below to obtain the table names.

In [4]:
query_name='''SELECT table_name
              FROM information_schema.tables
              WHERE table_schema='public'
              ORDER BY table_name'''

In [5]:
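# Fetch the names of tables in the database.
try:
    cur.execute(query_name)
except ps.Error:
    # A failed earlier statement leaves the transaction aborted; roll back and retry.
    connection.rollback()
    cur.execute(query_name)
# Each row is a one-element tuple holding the table name.
for table_name in cur.fetchall():
    name = table_name[0]
    print(name)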

counties
products
sales
stores


In [7]:
query="""SELECT *
        FROM sales"""

In [None]:
# Roll back any aborted transaction left over from an earlier failed query.
connection.rollback()
# Run the SQL query with pandas over the established connection.
sales = pd.read_sql_query(query, connection)
sales.head()
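If the table is too large to pull comfortably in one go, `pd.read_sql_query` also accepts a `chunksize` argument and then returns an iterator of DataFrames. The sketch below is an assumption-laden illustration: it uses an in-memory SQLite database with a made-up `sales` table as a stand-in for the Postgres server, so it can run without credentials.

```python
import sqlite3
import pandas as pd

# Stand-in database: an in-memory SQLite table instead of the Postgres server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (item INTEGER, total REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [(i, i * 1.5) for i in range(10)])
conn.commit()

# With chunksize set, read_sql_query yields DataFrames of at most 4 rows each.
chunks = pd.read_sql_query("SELECT * FROM sales", conn, chunksize=4)

# Consume the iterator before closing the connection, then stitch the pieces together.
sales_demo = pd.concat(chunks, ignore_index=True)
conn.close()
print(sales_demo.shape)
```

The same call pattern should apply to the `psycopg2` connection, with the chunk size chosen to suit the available memory.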

### Explore the sales data using pandas

In [None]:
sales.head()

In [None]:
sales.shape

In [None]:
sales.describe()

In [None]:
sales.dtypes

We are now going to extract the remaining Iowa Liquor Sales tables into this workbook.

### **Extraction of PRODUCTS table from a CSV file**


Use `pd.read_csv`

In [None]:
products = pd.read_csv('products.csv')

**Explore the products data**

### **Extraction of STORES table from a JSON file**

Use `pd.read_json`

In [None]:
stores = pd.read_json('stores_as_json.json')

**Explore the stores data**

### **Extraction of COUNTIES table from an Excel file** 

Use `pd.read_excel`

In [None]:
counties = pd.read_excel('counties_2.xlsx')

**Explore the counties data**
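One way to explore each extracted table in the same manner is a small profiling helper. The sketch below is only a suggestion: `profile` is a hypothetical helper name, and the tiny DataFrame uses made-up values rather than the real counties data.

```python
import pandas as pd


def profile(df):
    """Return one row per column: dtype, null count, and number of distinct values."""
    return pd.DataFrame({
        "dtype": df.dtypes.astype(str),
        "nulls": df.isnull().sum(),
        "unique": df.nunique(),
    })


# Made-up values purely for illustration.
counties_demo = pd.DataFrame({
    "county": ["Polk", "Linn", None],
    "population": [100, 200, 200],
})
summary = profile(counties_demo)
print(summary)
print("duplicate rows:", counties_demo.duplicated().sum())
```

Calling `profile(products)`, `profile(stores)`, and `profile(counties)` would give a comparable overview of each table before the transformation stage.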

### Extension

Experiment with extracting information using different SQL queries with `psycopg2`, e.g. use filters and `GROUP BY` clauses.
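As a starting point, here is a hedged sketch of a filtered, grouped query. It runs against an in-memory SQLite table with made-up rows so that it works without the server; the column names are assumptions. Note that SQLite uses `?` as its parameter placeholder, whereas `psycopg2` uses `%s`.

```python
import sqlite3
import pandas as pd

# Stand-in table with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (county TEXT, total REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("Polk", 10.0), ("Polk", 30.0), ("Linn", 5.0), ("Linn", 1.0)],
)
conn.commit()

# Filter rows first (WHERE), then aggregate per county (GROUP BY).
query_grouped = """
    SELECT county, COUNT(*) AS n_sales, SUM(total) AS revenue
    FROM sales
    WHERE total >= ?
    GROUP BY county
    ORDER BY revenue DESC
"""
# Passing the threshold through params keeps values out of the SQL string.
by_county = pd.read_sql_query(query_grouped, conn, params=(5.0,))
conn.close()
print(by_county)
```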

# Section 2: **Transformation**

Now that we have extracted the information, we need to transform it before we can load it into one file. We will have to explore our data to find out what needs to be transformed.



## **Transformation of SALES table**

* Rename columns if needed
* Deduplicate table
* Validate data type and convert where necessary
* Check for null values and treat accordingly.

In [None]:
# Use .info() to examine the data. Do any fields show data types we do not expect?

sales.info()

It looks like `btl_price` and `state_btl_cost` are objects, when we need them to be floats.

### Exercise

From the `btl_price` and `state_btl_cost` fields:
* Remove the `$`
* Convert to float

If stuck, highlight the spaces below

<span style ='color:white'> sales['btl_price'] = sales.btl_price.apply(lambda x: float(x.replace('$','').replace(',', ''))) </span>


<span style ='color:white'>sales['state_btl_cost'] = sales.state_btl_cost.apply(lambda x: float(x.replace('$','').replace(',', ''))) </span>

In [None]:
sales.info()
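An alternative to a row-by-row `apply` is the vectorised `.str` accessor combined with `pd.to_numeric`. The sketch below uses a made-up Series rather than the real columns, and it assumes that unparseable entries should become `NaN` instead of raising an error.

```python
import pandas as pd

# Made-up currency strings, including a missing value and a junk entry.
raw = pd.Series(["$1,250.00", "$9.99", None, "n/a"])

cleaned = pd.to_numeric(
    # regex=False treats "$" as a literal character, not a regex anchor.
    raw.str.replace("$", "", regex=False).str.replace(",", "", regex=False),
    errors="coerce",  # anything that is not a number becomes NaN
)
print(cleaned)
```

Unlike `float(x.replace(...))` inside a lambda, this version does not fail on missing values, which can matter for columns that contain nulls.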

### Exercise:

* Check for null values and deal with them accordingly using .isnull().sum()
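How nulls are treated depends on the field. As a minimal sketch on made-up data, the two most common options are dropping rows that miss an essential value and filling a placeholder where the row is still useful. The column names and the `"UNKNOWN"` label are assumptions for illustration.

```python
import pandas as pd

nulls_demo = pd.DataFrame({
    "item": [1, 2, 3, 4],
    "category_name": ["VODKA", None, "RUM", None],
    "total": [10.0, None, 7.5, 3.0],
})

# Count missing values per column.
null_counts = nulls_demo.isnull().sum()
print(null_counts)

# Option 1: drop rows missing a value we cannot do without.
no_missing_total = nulls_demo.dropna(subset=["total"])

# Option 2: keep the rows and fill a label for missing categories only.
filled = nulls_demo.fillna({"category_name": "UNKNOWN"})
```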

### Exercise

* How many duplicate values are in the sales table?
* Optional: How many duplicates are in each field?

Is this expected? 
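The difference between duplicate rows and repeated values within a single field can be seen on a small made-up table. This is only a sketch; the column names are assumptions.

```python
import pandas as pd

dupes_demo = pd.DataFrame({
    "store": [1, 1, 2, 2],
    "item": [10, 10, 10, 11],
    "total": [5.0, 5.0, 5.0, 8.0],
})

# Rows that are identical, in every column, to an earlier row.
full_row_duplicates = dupes_demo.duplicated().sum()

# Repeated values counted separately within each column.
per_column_duplicates = dupes_demo.apply(lambda col: col.duplicated().sum())

# Keep the first occurrence of each fully duplicated row.
deduplicated = dupes_demo.drop_duplicates()

print(full_row_duplicates)
print(per_column_duplicates)
```

Repeats within one field are usually expected in a transactions table, since the same store or item appears in many sales, whereas fully identical rows deserve a closer look.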



## **Transformation of `products` table**

* Rename columns if needed
* Deduplicate table
* Validate data type and convert where necessary
* Check for null values and treat accordingly.

In [None]:
products.info()

The profiling task from earlier may have given you an idea of which fields need to be transformed in the products table.

### Exercise

* Drop the `age` field
* Convert `list_date` to a `datetime` object
* Convert `bottle_price` to a float (watch out for nulls)
* Check for duplicates

Double click on the spaces below if stuck

<span style ='color:white'> products.dropna(subset=['bottle_price'],inplace=True)</span>

<span style ='color:white'> products['bottle_price'] = products.bottle_price.apply(lambda x: float(x.replace('$','').replace(',', '')))</span>

In [None]:
products.info()
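For the column drop and the date conversion, a minimal sketch on made-up data might look like the following. The values are invented, and `errors="coerce"` is an assumption about how unparseable dates should be handled (they become `NaT`).

```python
import pandas as pd

products_demo = pd.DataFrame({
    "item_no": [1, 2, 3],
    "age": [None, None, None],
    "list_date": ["2015-01-31", "not a date", None],
})

# Remove a column that carries no useful information.
products_demo = products_demo.drop(columns=["age"])

# Parse strings into datetimes; invalid or missing entries become NaT.
products_demo["list_date"] = pd.to_datetime(products_demo["list_date"], errors="coerce")

print(products_demo.dtypes)
```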

### Exercise

* Are there any fields in products which are NOT a foreign key in sales? 
* Drop these fields from the products table

## Transformation of COUNTIES and STORES tables

### Exercise

* Examine the Counties and Stores tables

* Check for and deal with null values
* Check for duplicates
* Validate data type and convert where necessary

# Section 3: Loading

Now that the transformation stage is complete, the next step is to load the data into the data warehouse (or integrate it into one single file).

Loading could involve `joining`, `concatenating` or just loading directly into a database. For our integration exercise, we will join the data into one single integrated file using pandas join functions. 

We need to identify joining fields on the extracted data.

### Exercise

SALES is the central table we will join the others to.

* Which fields in the STORES, COUNTIES and PRODUCTS tables are foreign keys in sales?
* Using join functions, create one large DataFrame which contains all the information from the 4 tables
* Explore the large dataset
* Use `.to_csv()` to save the large dataset as a CSV file


In [None]:
# Left-join sales to products: sales.item matches products.item_no
data = sales.join(products.set_index('item_no'), on='item', how='left')

In [None]:
data.shape
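An alternative to `join` is `merge`, which can also check the join as it runs. The sketch below chains two merges on made-up tables: `validate="many_to_one"` raises an error if a key is repeated in the lookup table, and `indicator` adds a column showing which sales rows found no match. All table contents, and the `store` key on the stores table, are assumptions for illustration.

```python
import pandas as pd

sales_demo = pd.DataFrame({"item": [1, 2, 3], "store": [100, 100, 200], "total": [5.0, 8.0, 2.0]})
products_demo = pd.DataFrame({"item_no": [1, 2], "item_description": ["Vodka", "Rum"]})
stores_demo = pd.DataFrame({"store": [100, 200], "name": ["Store A", "Store B"]})

master = (
    sales_demo
    # Many sales rows may point at one product, never the other way round.
    .merge(products_demo, left_on="item", right_on="item_no", how="left",
           validate="many_to_one", indicator="product_match")
    .merge(stores_demo, on="store", how="left", validate="many_to_one")
)

# Sales rows whose item has no entry in the products table.
unmatched = master[master["product_match"] == "left_only"]
print(master)
```

A left join should leave the row count equal to that of the sales table, so comparing `len(master)` with `len(sales_demo)` is a quick sanity check. Saving with `master.to_csv("master_data.csv", index=False)` would produce the file name that the optional BigQuery cell opens.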

# Loading data into a BigQuery Database (Optional)

We can load our data into the database of our choice, for example a BigQuery database. To run the code below, you will need to sign up for Google BigQuery, create a project, and generate BigQuery credentials. There is a free version that you can use. Once you have created a project, you will need to generate the credentials to use for the code below. Details on how to create a Google Cloud project are [here](https://cloud.google.com/resource-manager/docs/creating-managing-projects) and how to generate credentials [here](https://cloud.google.com/docs/authentication/getting-started).

You also need to install the Google Cloud BigQuery library.


In [None]:
!pip install google-cloud-bigquery

In [22]:
from google.cloud import bigquery

***Do not run the cells below until you have your credentials generated***

In [31]:
import os
# Placeholder: replace with the path to your BigQuery credentials file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/your/credentials.json"

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set dataset_id to the ID of the dataset to create.
dataset_id = "{}.warehouse_data".format(client.project)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset)  # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

In [None]:


# TODO(developer): Set table_id to the ID of the table to create.
table_id = f"{client.project}.warehouse_data.master_table"

schema = [
    bigquery.SchemaField("date", "DATE", mode="NULLABLE"),
    bigquery.SchemaField("convenience_store", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("store", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("county_number", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("county", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("category", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("category_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("vendor_no", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("vendor", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("item", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("pack", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("liter_size", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("state_btl_cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("btl_price", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("bottle_qty", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("item_description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("vendor_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("bottle_size", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("inner_pack", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("proof", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("list_date", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("upc", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("scc", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("bottle_price", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("shelf_price", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("case_cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("store_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("store_address", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("population", "INTEGER", mode="NULLABLE"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
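Before loading a CSV into a table with a fixed schema, it may help to confirm that the file's columns line up with the schema. When the header row is skipped, values are presumably matched to table columns by position rather than by name; treat that as an assumption to verify against the BigQuery documentation. The sketch below uses a hypothetical three-column subset of the schema and a made-up DataFrame.

```python
import pandas as pd

# Hypothetical subset of the schema's column names, in table order.
expected_columns = ["date", "store", "total"]

# Made-up data whose columns are in a different order from the schema.
csv_demo = pd.DataFrame({"date": ["2015-01-31"], "total": [5.0], "store": [100]})

missing = [c for c in expected_columns if c not in csv_demo.columns]
extra = [c for c in csv_demo.columns if c not in expected_columns]
same_order = list(csv_demo.columns) == expected_columns
print("missing:", missing, "extra:", extra, "same order:", same_order)

# Reorder to the table's column order before writing the CSV.
aligned = csv_demo[expected_columns]
```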

In [None]:
client = bigquery.Client()
filename = "master_data.csv"
dataset_id = table.dataset_id
table_id = table.table_id


# Reuse the reference of the table created above (client.dataset() is deprecated).
table_ref = table.reference
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
job_config.autodetect = True

with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

job.result()  # Waits for table load to complete.

print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))

**Now you can log on to BigQuery to view your data.**


**You can also connect to your BigQuery data warehouse right from Tableau and create a dashboard to help make business decisions.**