# Project 2, Part 4, Validate data in the staging tables using SQL

University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering


# Included Modules and Packages

Code cell containing your includes for modules and packages

In [1]:
import csv

import json

import math
import numpy as np
import pandas as pd

import psycopg2

# Supporting code

Code cells containing any supporting code, such as connecting to the database, any functions, etc.  

Remember you can freely use any code from the labs. You do not need to cite code from the labs.

In [2]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas converts numeric values from postgres to float
# if every value in a float column is a whole number, change the column to integer
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return the rows in a pandas dataframe."""

    # uses the module-level connection object created in a later cell
    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    if rollback_after_flag:
        connection.rollback()

    # fix the float columns that really should be integers

    for column in df:

        if df[column].dtype == "float64":

            fraction_flag = False

            for value in df[column].values:

                # NaN marks a missing value and does not count as a fraction
                if not np.isnan(value) and value - math.floor(value) != 0:
                    fraction_flag = True
                    break

            if not fraction_flag:
                # Int64 is the nullable integer dtype, so NaN values survive
                df[column] = df[column].astype('Int64')

    return df
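
The whole-number test inside the function can be tried on its own, without a database. The sketch below restates that check on a plain list of floats. The helper name `is_whole_number_column` is made up for illustration and is not part of the notebook or the labs.

```python
import math

def is_whole_number_column(values):
    """Return True if every non-NaN float in values has no fractional part.

    Hypothetical helper that mirrors the check in my_select_query_pandas.
    """
    for value in values:
        if math.isnan(value):
            continue  # missing values do not block the integer conversion
        if value != math.floor(value):
            return False  # stop at the first fractional value
    return True

# A column of whole numbers with a missing value still qualifies.
whole = is_whole_number_column([1.0, 2.0, float("nan")])

# A single fractional value keeps the column as float.
fractional = is_whole_number_column([1.0, 2.5])
```

A column that passes this test is the kind the function converts with `astype('Int64')`.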
    

In [3]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)

In [4]:
cursor = connection.cursor()

# 2.4.1 Validate the data types in the staging table stage_1_peak_sales

Generally, we do not expect any issues with data types.  Write a simple query that validates the numeric and date columns.

* sale_id - validate that it is numeric
* sale_date - validate that it is a date
* sub_total - validate that it is numeric
* tax - validate that it is numeric
* total_amount - validate that it is numeric

Hint: make use of the operators: 
* xxxx::numeric
* xxxx::date

Sort by stage_id

The query should return 97 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||stage_id|sale_id|sale_date|sub_total|tax|total_amount|
|---|---|---|---|---|---|---|
|0|1|5763728874|2020-10-03|12|0|12|
|...|...|...|...|...|...|...|
|96|97|5763728673|2020-10-03|48|0|48|
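
A cast such as `xxxx::numeric` raises an error at the first value that cannot be converted, so a successful query confirms the column is clean but a failed one does not list every offending row. As a rough client-side complement, the sketch below parses string values in Python and collects each failure. It only approximates the Postgres casts (for example, `date.fromisoformat` accepts fewer date formats than `::date`), and the function name and sample rows are invented for illustration.

```python
from datetime import date
from decimal import Decimal, InvalidOperation

def find_bad_values(rows, numeric_columns, date_columns):
    """Return (row_index, column, value) for every value that fails to parse."""
    problems = []
    for index, row in enumerate(rows):
        for column in numeric_columns:
            try:
                Decimal(row[column])
            except (InvalidOperation, TypeError):
                problems.append((index, column, row[column]))
        for column in date_columns:
            try:
                date.fromisoformat(row[column])
            except (ValueError, TypeError):
                problems.append((index, column, row[column]))
    return problems

# Illustrative rows only; the second one is deliberately broken.
sample_rows = [
    {"sale_id": "5763728874", "sale_date": "2020-10-03", "total_amount": "12"},
    {"sale_id": "57637x", "sale_date": "2020-13-03", "total_amount": "12"},
]

problems = find_bad_values(sample_rows, ["sale_id", "total_amount"], ["sale_date"])
```

Here `problems` holds one entry for the malformed `sale_id` and one for the impossible month in `sale_date`, both from the second row.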

In [5]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric,
       sale_id::numeric,
       sale_date::date,
       sub_total::numeric,
       tax::numeric,
       total_amount::numeric
from stage_1_peak_sales
order by stage_id

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|sale_id|sale_date|sub_total|tax|total_amount|
|---|---|---|---|---|---|---|
|0|1|5763728874|2020-10-03|12|0|12|
|1|2|5763729036|2020-10-03|72|0|72|
|2|3|5763728904|2020-10-03|24|0|24|
|3|4|5763728973|2020-10-03|96|0|96|
|4|5|5763728757|2020-10-03|108|0|108|
|...|...|...|...|...|...|...|
|92|93|5763728927|2020-10-03|72|0|72|
|93|94|5763729096|2020-10-03|48|0|48|
|94|95|5763729268|2020-10-03|84|0|84|
|95|96|5763729237|2020-10-03|60|0|60|


# 2.4.2 Validate the data types in the staging table stage_1_peak_stores

Generally, we do not expect any issues with data types.  Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* location_id - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

The query should return 97 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||stage_id|sale_id|location_id|
|---|---|---|---|
|0|1|5763728874|12573|
|...|...|...|...|
|96|97|5763728673|12573|


In [6]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric,
       sale_id::numeric,
       location_id::numeric
from stage_1_peak_stores
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|sale_id|location_id|
|---|---|---|---|
|0|1|5763728874|12573|
|1|2|5763729036|12573|
|2|3|5763728904|12573|
|3|4|5763728973|12573|
|4|5|5763728757|12573|
|...|...|...|...|
|92|93|5763728927|12573|
|93|94|5763729096|12573|
|94|95|5763729268|12573|
|95|96|5763729237|12573|


# 2.4.3 Validate the data types in the staging table stage_1_peak_customers

Generally, we do not expect any issues with data types.  Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* customer_id - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

The query should return 97 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||stage_id|sale_id|customer_id|
|---|---|---|---|
|0|1|5763728874|3728404|
|...|...|...|...|
|96|97|5763728673|3728691|

In [7]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric,
       sale_id::numeric,
       customer_id::numeric
from stage_1_peak_customers
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|sale_id|customer_id|
|---|---|---|---|
|0|1|5763728874|3728404|
|1|2|5763729036|3729309|
|2|3|5763728904|3728508|
|3|4|5763728973|3728534|
|4|5|5763728757|3729188|
|...|...|...|...|
|92|93|5763728927|3728568|
|93|94|5763729096|3728990|
|94|95|5763729268|3728901|
|95|96|5763729237|3729019|


# 2.4.4 Validate the data types in the staging table stage_1_peak_line_items

Generally, we do not expect any issues with data types.  Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* line_item_id - validate that it is numeric
* product_id - validate that it is numeric
* price - validate that it is numeric
* quantity - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

The query should return 352 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||stage_id|sale_id|line_item_id|product_id|price|quantity|
|---|---|---|---|---|---|---|
|0|1|5763728874|1|42314680|12|1|
|...|...|...|...|...|...|...|
|351|352|5763728673|3|42314682|12|1|

In [8]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric,
       sale_id::numeric,
       line_item_id::numeric,
       product_id::numeric,
       price::numeric,
       quantity::numeric
from stage_1_peak_line_items
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|sale_id|line_item_id|product_id|price|quantity|
|---|---|---|---|---|---|---|
|0|1|5763728874|1|42314680|12|1|
|1|2|5763729036|0|42314677|12|1|
|2|3|5763729036|1|42314682|12|3|
|3|4|5763729036|2|42314684|12|2|
|4|5|5763728904|0|42314680|12|1|
|...|...|...|...|...|...|...|
|347|348|5763729237|1|42314678|12|2|
|348|349|5763729237|2|42314682|12|2|
|349|350|5763728673|0|42314677|12|2|
|350|351|5763728673|1|42314678|12|1|


# 2.4.5 Validate the math on sub_total, tax, and total_amount in stage_1_peak_sales

Generally, we do not expect any issues with the math.  Write a simple query that validates the math:

total_amount = sub_total + tax

It's usually best to write a query that will return the stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id
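
An equality check like `total_amount = sub_total + tax` depends on exact arithmetic, which is what the Postgres `numeric` type provides. The sketch below uses Python's `Decimal` as an analogous exact type to show why binary floats are a poor fit for this kind of validation. The helper name and the sample rows are invented for illustration and do not come from the staging data.

```python
from decimal import Decimal

# Binary floating point cannot represent 0.1 or 0.2 exactly, so an equality
# test on their sum fails even though the arithmetic is right on paper.
float_matches = (0.1 + 0.2 == 0.3)

# Exact decimal arithmetic, analogous to numeric in Postgres, compares cleanly.
decimal_matches = (Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))

def rows_with_bad_totals(rows):
    """Return the stage_id of each row where total_amount != sub_total + tax."""
    return [
        row["stage_id"]
        for row in rows
        if Decimal(row["total_amount"])
        != Decimal(row["sub_total"]) + Decimal(row["tax"])
    ]

# Illustrative rows only; values arrive as strings, as in a staging table.
sample_rows = [
    {"stage_id": 1, "sub_total": "12", "tax": "0", "total_amount": "12"},
    {"stage_id": 2, "sub_total": "10.10", "tax": "0.20", "total_amount": "10.30"},
    {"stage_id": 3, "sub_total": "24", "tax": "0", "total_amount": "25"},
]

bad = rows_with_bad_totals(sample_rows)
```

Only the third sample row is reported, because its total is off by one.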


In [9]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_sales
where total_amount::numeric <> (sub_total::numeric + tax::numeric)
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|


# 2.4.6 Validate the math between stage_1_peak_sales and stage_1_peak_line_items

Generally, we do not expect any issues with the math.  Write a simple query that validates the math:

total_amount in stage_1_peak_sales matches the sum of (price x quantity) in stage_1_peak_line_items

It's usually best to write a query that will return stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id
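
One detail worth knowing when comparing a total against an aggregate: `sum()` over zero rows yields NULL, and `<>` against NULL is never true, so a sale with no line items at all would not be reported. Whether that case can occur in this data is not stated in the assignment. The sketch below demonstrates the behavior with an in-memory SQLite database as a stand-in for Postgres, so it uses `cast(x as numeric)` instead of `x::numeric`. The table names, rows, and helper function are made up for illustration.

```python
import sqlite3

# In-memory SQLite database as a stand-in for Postgres (assumption: only the
# NULL handling matters here, and it behaves the same way in both engines).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table sales (stage_id text, sale_id text, total_amount text);
    create table line_items (sale_id text, price text, quantity text);

    -- made-up rows: sale 102 has no line items, sale 103 has a wrong total
    insert into sales values ('1', '100', '24'), ('2', '101', '12'),
                             ('3', '102', '36'), ('4', '103', '30');
    insert into line_items values ('100', '12', '2'), ('101', '12', '1'),
                                  ('103', '12', '2');
""")

# Correlated subquery: sum of price * quantity for the current sale.
line_item_sum = """
    select sum(cast(l.price as numeric) * cast(l.quantity as numeric))
    from line_items as l
    where l.sale_id = sa.sale_id
"""

def mismatched_stage_ids(expected_sum_sql):
    """Return stage_ids whose total_amount differs from expected_sum_sql."""
    query = f"""
        select cast(sa.stage_id as integer)
        from sales as sa
        where cast(sa.total_amount as numeric) <> {expected_sum_sql}
        order by 1
    """
    return [row[0] for row in conn.execute(query)]

# Plain comparison: the sale without line items compares against NULL.
plain = mismatched_stage_ids(f"({line_item_sum})")

# Guarded comparison: a missing sum is treated as zero.
guarded = mismatched_stage_ids(f"coalesce(({line_item_sum}), 0)")
```

In this sample the plain comparison reports only the sale with the wrong total, while the `coalesce` version also reports the sale that has no line items.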


In [10]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_sales as sa
where total_amount::numeric <> (select sum(price::numeric * quantity::numeric)
                                from stage_1_peak_line_items as l
                                where sa.sale_id = l.sale_id) 
order by stage_id

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|


# 2.4.7 Validate the tax is always zero in stage_1_peak_sales

It's usually best to write a query that will return stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id


In [11]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_sales
where tax::numeric <> 0
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|


# 2.4.8 Validate the price is always 12 in stage_1_peak_line_items

It's usually best to write a query that will return the stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id


In [12]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_line_items
where price::numeric <> 12
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|


# 2.4.9 Validate taxable is always N in stage_1_peak_line_items

It's usually best to write a query that will return the stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs. 

In [13]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_line_items
where taxable::char <> 'N'
order by stage_id
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|


# 2.4.10 Validate the store is the same for all in stage_1_peak_stores

It's usually best to write a query that will return rows with errors.  In our case, the query should return no rows, just the column headers.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Consider grouping by location_id, name, street, city, state, and zip to find the number_of_duplicates.  Consider using a having clause to verify that the number_of_duplicates is not equal to the number of records in the staging table.

This will not return the stage_id header as the previous queries have.  It would return location_id, name, street, city, state, zip, and number_of_duplicates with no rows.
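
The group-and-compare idea described above can be tried on a tiny table. The sketch below uses an in-memory SQLite database as a stand-in for Postgres, with a reduced set of columns. The table name `stores`, the store name, and the cities are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table stores (stage_id text, location_id text, name text, city text)"
)

# If every row describes the same store, there is exactly one group and its
# count equals the table's row count, so the having clause filters it out.
query = """
    select location_id, name, city, count(*) as number_of_duplicates
    from stores
    group by location_id, name, city
    having count(*) <> (select count(*) from stores)
    order by location_id, name, city
"""

# Made-up rows: three sales that all point at the same store.
conn.executemany(
    "insert into stores values (?, ?, ?, ?)",
    [
        ("1", "12573", "Example Store", "Example City"),
        ("2", "12573", "Example Store", "Example City"),
        ("3", "12573", "Example Store", "Example City"),
    ],
)
consistent = conn.execute(query).fetchall()

# One row that disagrees on city splits the data into two groups, and
# neither group's count matches the total any more.
conn.execute(
    "insert into stores values ('4', '12573', 'Example Store', 'Other City')"
)
inconsistent = conn.execute(query).fetchall()
```

With consistent data the result is empty. After the conflicting row is added, both groups come back, which shows what the competing values are and how many rows carry each. An empty table also returns no rows, so a row-count check is a useful companion.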


In [26]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select location_id,
       name,
       street,
       city,
       state,
       zip,
       count(*) number_of_duplicates
from stage_1_peak_stores
group by location_id,
         name,
         street,
         city,
         state,
         zip
having count(*) <> (select count(*) from stage_1_peak_stores)
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||location_id|name|street|city|state|zip|number_of_duplicates|
|---|---|---|---|---|---|---|---|


# 2.4.11 Validate the product_id in stage_1_peak_line_items against peak_product_mapping

It's usually best to write a query that will return the stage_id of rows with errors.  In our case, the query should return no rows, just the stage_id column name.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id
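
A lookup check of this kind can be written with `not in` or with `not exists`. The two differ when the lookup column contains a NULL: `not in` then evaluates to NULL for every unmatched value and reports nothing. Whether peak_product_mapping can contain NULLs is not stated here, so the sketch below is only a demonstration of the difference. It uses an in-memory SQLite database as a stand-in for Postgres, and the table names and the unmatched product id are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table line_items (stage_id text, product_id text);
    create table product_mapping (peak_product_id integer);

    -- made-up rows: product 99999999 has no mapping, and the mapping
    -- table contains one NULL entry
    insert into line_items values ('1', '42314680'), ('2', '99999999');
    insert into product_mapping values (42314680), (null);
""")

not_in_query = """
    select cast(stage_id as integer)
    from line_items
    where cast(product_id as integer) not in
          (select peak_product_id from product_mapping)
    order by 1
"""

not_exists_query = """
    select cast(li.stage_id as integer)
    from line_items as li
    where not exists (select 1
                      from product_mapping as m
                      where m.peak_product_id = cast(li.product_id as integer))
    order by 1
"""

# not in: the NULL in the mapping table hides the unmatched product.
with_not_in = [row[0] for row in conn.execute(not_in_query)]

# not exists: the unmatched product is reported regardless of the NULL.
with_not_exists = [row[0] for row in conn.execute(not_exists_query)]
```

In this sample `not in` returns nothing, while `not exists` reports the line item with the unmapped product.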


In [45]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select stage_id::numeric
from stage_1_peak_line_items
where product_id::numeric not in (select peak_product_id from peak_product_mapping)
order by stage_id

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

||stage_id|
|---|---|
