# Data Wrangling 2.1

In [2]:
import math
import numpy as np
import pandas as pd

import psycopg2

import json

import csv

from datetime import datetime as dt

from IPython.display import display, HTML


In [3]:
# open the connection to the lab postgres server; the cells and functions
# below all share this one connection
# NOTE(review): credentials are hard-coded in the notebook; consider reading
# them from environment variables instead of committing them
connection = psycopg2.connect(
    host="postgres",
    port="5432",
    database="postgres",
    user="postgres",
    password="ucb",
)

In [4]:
# one shared cursor, used by the query cells and functions below
cursor = connection.cursor()

In [5]:
def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return its rows in a pandas DataFrame.

    pandas puts all numeric values from postgres into float64 columns. Any
    float64 column whose non-null values are all finite whole numbers is
    converted to the nullable integer dtype "Int64", so NULLs are kept as <NA>.

    Parameters:
        query: SQL select statement, run on the global ``connection``.
        rollback_before_flag: if true, roll back ``connection`` before the query.
        rollback_after_flag: if true, roll back ``connection`` after the query.

    Returns:
        pandas DataFrame holding the query result.
    """

    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    if rollback_after_flag:
        connection.rollback()

    # fix the float columns that really should be integers
    for column in df:

        if df[column].dtype != "float64":
            continue

        values = df[column].dropna().to_numpy()

        # one vectorized check per column instead of a Python loop per value;
        # inf/-inf fail isfinite() and stay float (Int64 cannot hold them, and
        # math.floor() raises OverflowError on them)
        if np.isfinite(values).all() and (values == np.floor(values)).all():
            df[column] = df[column].astype("Int64")

    return df
    

# Lab: Data Extraction

## So far, we have done several data extractions:

* extracting database tables to csv format
* extracting database tables to flat json format
* extracting flat json files to csv format
* extracting database tables to nested json format
* extracting nested json files to csv format
* extracting database tables to Excel workbooks with several work sheets
* extracting Excel workbooks with several work sheets to csv format

## Last week, we extracted our temp database tables into the stores nested json file; we will now extract our temp database tables starting with sales at the top and perform some more complicated transformations as part of the extract

In [6]:
def _fetch_all_json(query, params=None):
    "run a row_to_json query on the shared cursor and return the json value of every row"

    cursor.execute(query, params)

    # end the read-only transaction right away, like the rest of this notebook;
    # psycopg2's default (client-side) cursor has already buffered the result,
    # so the rows can still be fetched after the rollback
    connection.rollback()

    return [row[0] for row in cursor.fetchall()]


def _fetch_one_json(query, params):
    "run a row_to_json query and return the json value of its first row, or None if no row matched"

    cursor.execute(query, params)

    connection.rollback()

    row = cursor.fetchone()

    return None if row is None else row[0]


def my_extract_sales_nested_json(file_name):
    """Extract the temp tables to a nested json file with sales at the top level.

    Each sale is written with its store, its customer, and its line items
    nested inside it. Derived columns added along the way:
      * sale: receipt_number (zero-padded store_id-sale_id), sub_total
        (= total_amount), and tax (always 0)
      * customer: first_last_name, last_first_name, plus population, area,
        and density joined in from zip_codes
      * line item: price (always 12) and line_total (12 * quantity)

    A sale whose store row is missing, or whose customer row is missing, is
    written with null for that object instead of raising. The customer query
    inner-joins zip_codes, so an unknown zip also gives null.

    Parameters:
        file_name: path of the json file to write (overwritten); it is also
            recorded in the file's "file_name" header field.

    Returns:
        None; the output is written to file_name.
    """

    sales_query = """

    select row_to_json(a)
    from (select store_id,
                 sale_id,
                 customer_id,
                 (lpad(store_id::text, 3, '0') || '-' || lpad(sale_id::text, 9, '0')) as receipt_number,
                 sale_date,
                 total_amount as sub_total,
                 0 as tax,
                 total_amount
          from temp_sales
          order by store_id, sale_id
          ) as a

    """

    store_query = """

    select row_to_json(a)
    from (select *
          from temp_stores
          where store_id = %s
          ) as a

    """

    customer_query = """

    select row_to_json(a)
    from (select customer_id,
                 cu.first_name,
                 cu.last_name,
                 (cu.first_name || ' ' || cu.last_name) as first_last_name,
                 (cu.last_name || ', ' || cu.first_name) as last_first_name,
                 cu.street,
                 cu.city,
                 cu.state,
                 cu.zip,
                 z.population,
                 z.area,
                 z.density,
                 cu.closest_store_id,
                 cu.distance
          from temp_customers as cu
               join zip_codes as z
                   on z.zip = cu.zip
          where customer_id = %s
          ) as a

    """

    line_items_query = """

    select row_to_json(a)
    from (select l.product_id,
                 p.description,
                 12 as price,
                 l.quantity,
                 12 * l.quantity as line_total
          from temp_line_items as l
               join products as p
                   on l.product_id = p.product_id
          where store_id = %s and sale_id = %s
          order by store_id, sale_id, line_item_id
          ) as a

    """

    connection.rollback()

    file_json = {"creator": "Acme Gourmet Meals",
                 # year-month-day order (the format used to swap day and month)
                 "timestamp": dt.now().strftime("%Y-%m-%d %H:%M:%S"),
                 "file_name": file_name,
                 "version": "12.4.7",
                 "legal": "Unauthorized use, duplication, or possession, blah, blah",
                 "sales": []
                 }

    # the sales rows are fully materialized into a list before the loop, so
    # reusing the shared cursor for the nested queries below is safe
    for sale_json in _fetch_all_json(sales_query):

        # the ids are only needed to look up the nested objects; they are
        # removed from the sale itself (the store and customer objects carry them)
        store_id = sale_json.pop('store_id')
        sale_id = sale_json.pop('sale_id')
        customer_id = sale_json.pop('customer_id')

        sale_json['store'] = _fetch_one_json(store_query, (store_id,))
        sale_json['customer'] = _fetch_one_json(customer_query, (customer_id,))
        sale_json['line_items'] = _fetch_all_json(line_items_query, (store_id, sale_id))

        file_json['sales'].append(sale_json)

    # the with block closes the file even if json.dump raises
    with open(file_name, "w") as f:
        json.dump(file_json, f, indent=2)

In [7]:
# write the nested json file with sales at the top; the temp_* tables it
# reads must exist in the database first
my_extract_sales_nested_json("temp_sales_nested.json")

UndefinedTable: relation "temp_sales" does not exist
LINE 12:           from temp_sales
                        ^


## Use recursion to print nested json to show the structure

In [None]:
def my_recursive_print_json(j, level = -1):
    """Given a parsed json object, print its structure as an indented outline.

    Dict keys print on their own line, list elements print as "[i]", and
    every leaf prints as "value: <leaf>". Each nesting level indents by four
    spaces.

    Parameters:
        j: parsed json value (dict, list, or a leaf such as str/int/float/bool/None).
        level: nesting depth of the parent; callers normally leave the default
            so the top level prints with no indent.
    """

    level += 1

    indent = "    " * level

    # isinstance (rather than an exact type check) also walks dict/list
    # subclasses such as OrderedDict instead of printing them as leaves
    if isinstance(j, dict):
        for key, value in j.items():
            # str() so non-string keys (e.g. ints in a hand-built dict) print too
            print(indent + str(key))
            my_recursive_print_json(value, level)

    elif isinstance(j, list):
        for i, item in enumerate(j):
            print(indent + "[" + str(i) + "]")
            my_recursive_print_json(item, level)

    else:
        print(indent + "value:", str(j))
                  


In [None]:
def my_read_nested_json(file_name):
    """Given a file of json, read it and print its nested structure.

    Parameters:
        file_name: path of the json file to read.
    """

    # the with block closes the file even if json.load raises; json text is
    # read as UTF-8 rather than the platform's default encoding
    with open(file_name, "r", encoding="utf-8") as f:
        j = json.load(f)

    my_recursive_print_json(j)

In [None]:
# print the structure of the file written by my_extract_sales_nested_json()
my_read_nested_json("temp_sales_nested.json")

## You try it - extract our temp database tables into the customer nested json file, temp_customers_nested.json, which we looked at last week; some hints:

* customer:
 * create a derived column first_last_name 
 * create a derived column last_first_name
 * join the temp_customers table to the zip_codes table to pull the population, area, and density
* sale:
 * receipt_number is a derived column made up of store_id and sale_id
 * since we don't have sales tax on food, sub_total is the total_amount and tax is 0
* line items:
 * 12 is the price since all meals cost $12 tax included
 * 12 * quantity is the line_total

# Lab: Creating Staging Tables, Loading Raw Data into Staging Tables

In [None]:
# drop the stage 1 tables (if present) so the create cell below can be rerun
# discard any transaction left open, or aborted by an error, in an earlier cell
connection.rollback()

query = """

drop table if exists stage_1_customers;
drop table if exists stage_1_sales;
drop table if exists stage_1_line_items;



"""

cursor.execute(query)

# DDL runs inside the transaction, so it must be committed to take effect
connection.commit()



## We use varchar(100) for every column so that badly formatted data still loads and we can clean it afterward

In [None]:
#
# create staging tables with all varchar(100)
# every raw column is text so out-of-format values still load; stage_id is a
# serial key the load does not supply (the COPY column list omits it)
#

connection.rollback()

query = """


create table stage_1_customers (
  stage_id serial,
  customer_id varchar(100),
  first_name varchar(100),
  last_name varchar(100),
  street varchar(100),
  city varchar(100),
  state varchar(100),
  zip varchar(100),
  closest_store_id varchar(100),
  distance varchar(100)
);

create table stage_1_sales (
  stage_id serial,
  store_id varchar(100),
  sale_id varchar(100),
  customer_id varchar(100),
  sale_date varchar(100),
  total_amount varchar(100)
);

create table stage_1_line_items (
  stage_id serial,
  store_id varchar(100),
  sale_id varchar(100),
  line_item_id varchar(100),
  product_id varchar(100),
  quantity varchar(100)
);

"""

cursor.execute(query)

connection.commit()



In [1]:
# load the raw customers csv into stage_1_customers (stage_id is filled by its serial default)
# NOTE(review): a server-side COPY ... FROM '<path>' reads the file on the
# database host, not in this notebook's environment; confirm the path exists
# there and that the database user is allowed to read server files
connection.rollback()

query = """

copy stage_1_customers (customer_id, first_name, last_name, street, city, state, zip, closest_store_id, distance)
from '/user/labs/week_07/clean_data/clean_customers.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

NameError: name 'connection' is not defined

In [None]:
# verify the load: show every stage_1_customers row in load order
# (the DataFrame is displayed because it is the cell's last expression)
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_1_customers
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

## You try it - load the stage_1_sales table from the csv file clean_sales.csv;  load the stage_1_line_items table from the csv file clean_line_items.csv; verify each with a query

# Lab: Raw Data Exploration Using Staging Tables

## Given a table, query its column names; for each column, find the max length, min length, max value, min value, how distinct the column is, etc.

In [None]:
def my_explore_staging_table(table_name):
    """Given a table name, explore each of its columns except stage_id.

    For every column this prints a header and displays two tables:
      * min_length, max_length, total_rows, and total_distinct_values
      * the 10 most frequent values with their row counts (ties broken by value)

    Parameters:
        table_name: table to explore. It is pasted into the SQL text as-is,
            so it must be a trusted table reference (it may be schema-qualified).
    """

    print("\n---------------------------------------------------")
    print("Exploring Columns for Table:", table_name)
    print("---------------------------------------------------\n")

    rollback_before_flag = True
    rollback_after_flag = True

    connection.rollback()

    # a query that matches no rows still fills cursor.description, which
    # gives the column names without reading any data
    query = "select * from " + table_name + " where 0 = 1;"

    cursor.execute(query)

    connection.rollback()

    column_list = [d[0] for d in cursor.description]

    for column_name in column_list:

        # stage_id is the serial key the staging tables add, not loaded data
        if column_name == "stage_id":
            continue

        print("---------------------------------------------------")
        print("Column:", column_name)
        print("---------------------------------------------------")

        # double-quote the identifier so mixed-case or reserved-word column
        # names work; an embedded double quote is escaped by doubling it
        column = '"' + column_name.replace('"', '""') + '"'

        # ::text lets length() work on non-text columns too (e.g. the numeric
        # and date columns of the stage_2 tables); text lengths are unchanged
        query = "select min(length(" + column + "::text)) as min_length, "
        query += " max(length(" + column + "::text)) as max_length, "
        query += " count(*) as total_rows, "
        query += " count(distinct " + column + ") as total_distinct_values"
        query += " from " + table_name + ";"

        df = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

        display(HTML(df.to_html()))

        # most frequent values first; the secondary sort on the value makes
        # the top 10 deterministic when counts tie
        query = "select " + column + ", count(*) from " + table_name
        query += " group by " + column + " order by 2 desc, 1 limit 10;"

        df = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

        display(HTML(df.to_html()))
    

In [None]:
# explore every loaded column of the raw customers staging table
my_explore_staging_table("stage_1_customers")

## In the coming labs for this week, we will do further exploration to clean the data as we go

## You try it - using the above function my_explore_staging_table(), explore the tables stage_1_sales and stage_1_line_items

# Lab: Transforming Data by Parsing, Joining, Augmenting, Consolidating, and Filtering

## In our extractions so far, we have seen examples of Parsing, Joining, Augmenting, Consolidating, and Filtering:

## Parsing

* creating a receipt_number from store_id and sale_id
* extracting the date to a dow number
* extracting the date to a day of week string
* extracting the date to a month string
* creating first and last name in the same column, also with last name first

## Joining

* joining the stores table to the sales table
* joining the sales table to the customers table
* joining the sales table to the line items table
* joining the line items table to the products table

## Augmenting (joining to a table in a secondary dataset)

* joining the zip in customer table to the secondary dataset table zip_codes to pull the population, area, and density

## Consolidating (Aggregation)

* number of rows in a table
* number of distinct rows in a table
* max value of a column
* min value of a column
* average value of a column

## Filtering

* where clauses (pre-aggregation)
* having clauses (post-aggregation)


## Once the data is clean, we can create another set of staging tables with the actual data types and copy the data into them

In [None]:
# drop the stage 2 (typed) tables, if present, so the create cell below can be rerun
connection.rollback()

query = """

drop table if exists stage_2_customers;
drop table if exists stage_2_sales;
drop table if exists stage_2_line_items;

"""

cursor.execute(query)

connection.commit()



In [None]:
# create the stage 2 tables: same columns as stage 1, but with real data types,
# so bad values now fail on insert instead of loading silently
# NOTE(review): numeric(p) with no scale means scale 0, so distance numeric(3)
# and total_amount numeric(5) round any fractional value to a whole number
# (and cap distance at 999); confirm that is intended
connection.rollback()

query = """


create table stage_2_customers (
  stage_id serial,
  customer_id numeric(6),
  first_name varchar(32),
  last_name varchar(32),
  street varchar(32),
  city varchar(32),
  state varchar(2),
  zip varchar(5),
  closest_store_id numeric(6),
  distance numeric(3)
);

create table stage_2_sales (
  stage_id serial,
  store_id numeric(6),
  sale_id numeric(8),
  customer_id numeric(6),
  sale_date date,
  total_amount numeric(5)
);

create table stage_2_line_items (
  stage_id serial,
  store_id numeric(6),
  sale_id numeric(8),
  line_item_id numeric(3),
  product_id numeric(3),
  quantity numeric(3)
);

"""

cursor.execute(query)

connection.commit()



In [None]:
# copy stage_1_customers into stage_2_customers, casting the numeric columns;
# the text columns are implicitly checked against their new varchar lengths
# NOTE(review): stage_id is inserted explicitly, so stage_2_customers' serial
# sequence is not advanced; a later insert that relies on the default could
# reuse an existing stage_id
connection.rollback()

query = """

insert into stage_2_customers
(stage_id, customer_id, first_name, last_name, street, city, state, zip, closest_store_id, distance)
select stage_id,
       customer_id::numeric,
       first_name,
       last_name, 
       street,
       city,
       state,
       zip,
       closest_store_id::numeric,
       distance::numeric
from stage_1_customers
order by stage_id;

"""

cursor.execute(query)

connection.commit()



In [None]:
# verify the copy: show every stage_2_customers row in stage_id order
# (the DataFrame is displayed because it is the cell's last expression)
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_2_customers
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

## You try it - copy data from stage_1_sales to stage_2_sales and from stage_1_line_items to stage_2_line_items; query to verify