# Data Wrangling 1.2

In [None]:
import csv

import math
import numpy as np
import pandas as pd

import psycopg2


In [None]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas puts all numeric values from postgres to float
# if it will fit in an integer, change it to integer
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    "function to run a select query and return rows in a pandas dataframe"
    
    if rollback_before_flag:
        connection.rollback()
    
    df = pd.read_sql_query(query, connection)
    
    if rollback_after_flag:
        connection.rollback()
    
    # fix the float columns that really should be integers
    
    for column in df:
    
        if df[column].dtype == "float64":

            fraction_flag = False

            for value in df[column].values:
                
                if not np.isnan(value):
                    if value - math.floor(value) != 0:
                        fraction_flag = True

            if not fraction_flag:
                df[column] = df[column].astype('Int64')
    
    return(df)
    

In [None]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)

In [None]:
cursor = connection.cursor()

# Lab: Reading CSV Files

In [None]:
def my_read_csv_file(file_name, limit):
    "read the csv file and print only the first limit rows"
    
    csv_file = open(file_name, "r")
    
    csv_data = csv.reader(csv_file)
    
    i = 0
    
    for row in csv_data:
        i += 1
        if i <= limit:
            print(row)
            
    print("\nPrinted ", min(limit, i), "lines of ", i, "total lines.")

In [None]:
my_read_csv_file("temp_stores.csv", limit=10)

In [None]:
my_read_csv_file("temp_sales.csv", limit=10)

In [None]:
my_read_csv_file("temp_random_sales.csv", limit=100)

## You try it - read and print out some lines from the following csv files: temp_line_items.csv, temp_customers.csv, temp_products, temp_holidays

# Lab: Loading CSV Data into Database Tables

In [None]:
#
# drop all the temp tables in the foreign key order
#

connection.rollback()

query = """

drop table if exists temp_line_items;
drop table if exists temp_sales;
drop table if exists temp_products;
drop table if exists temp_customers;
drop table if exists temp_stores;
drop table if exists temp_holidays;
drop table if exists temp_random_sales;


"""

cursor.execute(query)

connection.commit()



In [None]:
#
# create all the temp tables in the foreign key order
#

connection.rollback()

query = """

create table temp_holidays (
  holiday_date date,
  description varchar(32),
  closed_flag boolean,
  primary key (holiday_date)
);

create table temp_products (
  product_id numeric(3),
  description varchar(32),
  primary key (product_id)
);

create table temp_stores (
  store_id numeric(6),
  street varchar(32),
  city varchar(32),
  state varchar(2),
  zip varchar(5),
  latitude numeric(7,4),
  longitude numeric(7,4),
  primary key (store_id)
);

create table temp_customers (
  customer_id numeric(6),
  first_name varchar(32),
  last_name varchar(32),
  street varchar(32),
  city varchar(32),
  state varchar(2),
  zip varchar(5),
  closest_store_id numeric(6),
  distance numeric(3),
  primary key (customer_id),
  foreign key (closest_store_id) references temp_stores(store_id)
);

create table temp_sales (
  store_id numeric(6),
  sale_id numeric(8),
  customer_id numeric(6),
  sale_date date,
  total_amount numeric(5),
  primary key (store_id, sale_id),
  foreign key (customer_id) references temp_customers (customer_id)
);

create table temp_line_items (
  store_id numeric(6),
  sale_id numeric(8),
  line_item_id numeric(3),
  product_id numeric(3),
  quantity numeric(3),
  primary key (store_id, sale_id, line_item_id),
  foreign key (product_id) references temp_products (product_id)
);

create table temp_random_sales (
  store_id numeric(6),
  sale_id numeric(8)
)

"""

cursor.execute(query)

connection.commit()

In [None]:
#
# load the csv files into the database tables in foreign key order
#

connection.rollback()

query = """

copy temp_stores
from '/user/labs/week_06/temp_stores.csv' delimiter ',' NULL '' csv header;

copy temp_customers
from '/user/labs/week_06/temp_customers.csv' delimiter ',' NULL '' csv header;

copy temp_sales
from '/user/labs/week_06/temp_sales.csv' delimiter ',' NULL '' csv header;

copy temp_random_sales
from '/user/labs/week_06/temp_random_sales.csv' delimiter ',' NULL '' csv header;


"""

cursor.execute(query)

connection.commit()

In [None]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from temp_stores;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

In [None]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from temp_customers;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

In [None]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from temp_sales;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

In [None]:
rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from temp_random_sales;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

## You try it - 
* load the file temp_holidays.csv into the table temp_holidays 
* temp_products.csv into table temp_products
* temp_line_items into table temp_line_items 
* verify the loads with a query

# Lab: Extracting CSV Files

In [None]:
connection.rollback()
    
query = """
    
copy (select * 
      from temp_random_sales
      order by store_id, sale_id)
to '/user/labs/week_06/temp_random_sales_2.csv' delimiter ',' NULL '' csv header;

copy (select * 
      from temp_stores 
      order by store_id)
to '/user/labs/week_06/temp_stores_2.csv' delimiter ',' NULL '' csv header;


"""

cursor.execute(query)
    
connection.commit()


In [None]:
my_read_csv_file("temp_random_sales_2.csv", limit=10)

In [None]:
my_read_csv_file("temp_stores_2.csv", limit=10)

## You try it - 
* extract the table temp_sales table to temp_sales_2.csv
* table temp_line_items to temp_line_items_2.csv
* table temp_customers to temp_customers_2.csv 
* table temp_products to temp_products_2.csv
* table temp_holidays to temp_holidays_2.csv
* verify by reading the csv files