# Iowa Liquor Sales -  Import Data

### Setup

In [28]:
import os, pandas as pd, psycopg2
from sqlalchemy import create_engine
import shutil
from IPython.display import display, Markdown

### Login to postgresql via psycopg2 and check tables

#### login to psql console as root (postgres), create database, grant privileges to default user

```bash
# login as postgres (root user) via bash
sudo -u postgres psql
```

```SQL
/* create user, database, schema via psql */
CREATE USER iowa;
CREATE DATABASE iowaalcohol;
\c iowaalcohol;
GRANT ALL ON DATABASE iowaalcohol TO iowa;
CREATE SCHEMA iowa_sch;
ALTER DATABASE iowaalcohol SET search_path TO iowa_sch,"$user",public;
GRANT USAGE ON SCHEMA iowa_sch TO iowa;
GRANT ALL ON SCHEMA iowa_sch TO iowa;
GRANT CREATE ON SCHEMA iowa_sch TO iowa;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA iowa_sch TO iowa;
SET search_path TO iowa_sch, "$user", public;
EXIT
```

```bash
# fix pg_hba.conf to allow logins with password instead of peer auth
cd /etc/postgresql/14/
sudo vim pg_hba.conf
# add the following row (without #):
# # TYPE        DATABASE      USER           ADDRESS         METHOD
#   local       iowaalcohol   iowa                           md5
sudo service postgresql restart
psql -U iowa
```

#### connect to psycopg2 and sqlalchemy

In [82]:
# Connect to your postgres DB
with open('../data/iowa_pass.txt') as f:
    iowa_pass = f.read().strip()
conn = psycopg2.connect(dbname='iowaalcohol', user='iowa', password=iowa_pass)
cur = conn.cursor()
engine = create_engine(f'postgresql://iowa:{iowa_pass}@localhost/iowaalcohol')

In [76]:
try:
    engine.connect()
    print("success")
except SQLAlchemyError as err:
    print("error", err.__cause__) 

success


In [83]:
# check connection: get list of tables in current database
cur.execute("""
    SELECT
        table_schema || '.' || table_name
    FROM
        information_schema.tables
    WHERE
        table_type = 'BASE TABLE'
    AND
        table_schema NOT IN ('pg_catalog', 'information_schema');
    """)
records = cur.fetchall()
records

[('iowa_sch.iowaalcohol',), ('iowa_sch.county_pop',)]

### Create and populate tables

#### connect

In [None]:
# connect to psycopg2
try:
    conn = psycopg2.connect("dbname=iowaalcohol user=bhrdwj")
    cur = conn.cursor()
except:
    print("I am unable to connect to the database") 

#### login to console as root (postgres), create database, grant privileges to default user

In [5]:
# Create empty table iowaalcohol based on provided metadata
# check if store_code is always integer  (select distinct store_code from iowaalcohol;)

sql = """
    /*mysql*/
    CREATE TABLE iowaalcohol (
    address VARCHAR(255) DEFAULT NULL,
    btl_vol_ml INT4 DEFAULT NULL,
    btls_sold INT4 DEFAULT NULL,
    category VARCHAR(20) DEFAULT NULL,
    category_name VARCHAR(100) DEFAULT NULL,
    city VARCHAR(255) DEFAULT NULL,
    county VARCHAR(255) DEFAULT NULL,
    county_99 VARCHAR(255) DEFAULT NULL,
    county_number BIGINT DEFAULT NULL,
    date DATE DEFAULT NULL,
    invoice_item_no VARCHAR(20) DEFAULT NULL,
    item_description VARCHAR(255) DEFAULT NULL,
    item_no INT4 DEFAULT NULL,
    pack_btl_ct INT4 DEFAULT NULL,
    state_btl_cost NUMERIC(7,2) DEFAULT NULL,
    state_btl_retail NUMERIC(7,2) DEFAULT NULL,
    store_code VARCHAR(12) DEFAULT NULL,
    store_location VARCHAR(255) DEFAULT NULL,
    store_name VARCHAR(255) DEFAULT NULL,
    sale_ttl_usd NUMERIC(12,2) DEFAULT NULL,
    vendor_name VARCHAR(255) DEFAULT NULL,
    vendor_no VARCHAR(20) DEFAULT NULL,
    vol_sold_gal NUMERIC(8,3) DEFAULT NULL,
    vol_sold_lt NUMERIC(8,3) DEFAULT NULL,
    zipcode VARCHAR(20) DEFAULT NULL
    );
    """

try:
    cur.execute(sql)
    conn.commit()
except Exception as e:
    print("If you want to overwrite this database, make sure it's on-purpose!")
    print(e)

In [6]:
# Create indices for empty database iowaalcohol
sql = """
    CREATE INDEX store_code ON iowaalcohol ( store_code ); 
    CREATE INDEX date ON iowaalcohol ( date ); 
    CREATE INDEX category_name ON iowaalcohol ( category_name ); 
    CREATE INDEX category ON iowaalcohol ( category ); 
    """

try:
    cur.execute(sql)
    conn.commit() # <--- makes sure the change is shown in the database
except Exception as e:
    print(e)

#### create and populate table county_pop

In [78]:
# Get county names and population from census data 
census = pd.read_csv('../data/co-est2020.csv', encoding='latin')
census = census.loc[census.STNAME == 'Iowa'].loc[census.CTYNAME != 'Iowa']
census = census[['CTYNAME', 'POPESTIMATE2012']]
census['CTYNAME'] = census.CTYNAME.str.replace(' County', '')
census['CTYNAME'] = census.CTYNAME.str.lower().str.replace("'", '')
county_pop = census.copy().rename(columns={'CTYNAME':'name', 'POPESTIMATE2012':'population'})

In [81]:
# create empty table
sql = "DROP TABLE IF EXISTS iowa_sch.county_pop;"

try:
    cur.execute(sql)
    conn.commit() # <--- makes sure the change is shown in the database
except Exception as e:
    print(e)

current transaction is aborted, commands ignored until end of transaction block



In [79]:
# create empty table
sql = """
CREATE TABLE county_pop (
    name VARCHAR(255) NOT NULL,
    population INT4 DEFAULT NULL
    );
"""

try:
    cur.execute(sql)
    conn.commit() # <--- makes sure the change is shown in the database
except Exception as e:
    print(e)

relation "county_pop" already exists



In [27]:
# Create indices for empty database county_pop
sql = """
    CREATE INDEX name ON county_pop ( name ); 
    CREATE INDEX population ON county_pop ( population ); 
    """

try:
    cur.execute(sql)
    conn.commit() # <--- makes sure the change is shown in the database
except Exception as e:
    print(e)

```bash
# login as postgres (root user) via bash
sudo -u postgres psql
```

```SQL
/* create database via psql */
ALTER TABLE iowaalcohol SET SCHEMA iowa_sch;
ALTER TABLE county_pop SET SCHEMA iowa_sch;
ALTER TABLE iowa_sch.iowaalcohol OWNER TO iowa;
ALTER TABLE iowa_sch.iowaalcohol OWNER TO iowa;

EXIT
```

In [74]:
# Populate county_pop with fields county and population
county_pop.to_sql(name='county_pop', con=engine, schema='iowa_sch')

### preprocess and import the data to postgres from the original csv

In [7]:
# from the original csv import into postgresql

# PRE-IMPORT CLEANING: reformat dates, output to path1
!sed -E "s#([0-9]{2})/([0-9]{2})/([0-9]{4})#\3-\1-\2#" < ../data/Iowa_Liquor_Sales.csv | tr -d '$' > ../data/iowa-liquor-datefixed.csv
path1 = '../data/iowa-liquor-datefixed.csv'

# PRE-IMPORT CLEANING: define fieldnames crosswalk
fieldname_crosswalk = {
    'State Bottle Retail': 'state_btl_retail',
    'Volume Sold (Gallons)': 'vol_sold_gal',
    'Volume Sold (Liters)': 'vol_sold_lt',
    'Store Number': 'store_code',
    'Bottle Volume (ml)': 'btl_vol_ml',
    'Invoice/Item Number': 'invoice_item_no',
    'Item Number': 'item_no',
    'Store Name': 'store_name',
    'Bottles Sold': 'btls_sold', 
    'Sale (Dollars)': 'sale_ttl_usd',
    'Address': 'address',
    'Category': 'category', 
    'Category Name': 'category_name',
    'City': 'city',
    'County': 'county',
    'County Number': 'county_number',
    'Date': 'date',
    'Item Description': 'item_description',
    'Pack': 'pack_btl_ct',
    'State Bottle Cost': 'state_btl_cost',
    'Store Location': 'store_location',
    'Vendor Name': 'vendor_name',
    'Vendor Number': 'vendor_no',
    'Zip Code': 'zipcode'
    }

# PRE-IMPORT CLEANING: Get fields from csv, store as list: csv_fields
path1 = '../data/iowa-liquor-datefixed.csv'
with open(path1) as csv_f:
    csv_fields = csv_f.readline().split(',')
    csv_fields = [i.strip() for i in csv_fields]

# PRE-IMPORT CLEANING: Rename the fields of csv as new_colnames using fieldnames_crosswalk, output to path2
def translate_list(l:list, d:dict, strip=True):
    "translate each element of list through dictionary"
    def translate_str(s:str, d:dict, strip=True):
        """
        - Strip whitespace at beginning and end of string.
        - If string is a key in dict d, return the corresponding value.
        - Else return the original string.
        """
        if strip:
            s = s.strip()
        if s in d:
            return d[s]
        else:
            return s
    return [translate_str(i, d) for i in l]
new_colnames = ','.join(translate_list(csv_fields, fieldname_crosswalk))
path2 = '../data/iowa-liquor-datefixed-fieldsfixed.csv'
with open(path1) as f1:
    f1.readline() # and discard
    with open(path2, 'w') as f2:
        f2.close()
    with open(path2, 'a') as f2:
        f2.write(new_colnames + '\n')
        shutil.copyfileobj(f1, f2)
os.remove(path1)

# Generate query to populate database iowaalcohol (import to PostgreSQL)
# RUN THE QUERY AS ROOT USER (POSTGRES) FROM PSQL
sql_md = (
    f"```SQL\n"
    f"/*mysql*/\n"
    f"COPY iowaalcohol({new_colnames})\n"
    f"FROM '{os.path.abspath(path2)}'\n"
    f"WITH (DELIMITER ',', FORMAT CSV, HEADER)\n"
    f";\n"
    f"```"
    )

print(sql_md)

```SQL
/*mysql*/
COPY iowaalcohol(invoice_item_no,date,store_code,store_name,address,city,zipcode,store_location,county_number,county,category,category_name,vendor_no,vendor_name,item_no,item_description,pack_btl_ct,btl_vol_ml,state_btl_cost,state_btl_retail,btls_sold,sale_ttl_usd,vol_sold_lt,vol_sold_gal)
FROM '/home/bhrdwj/git/iowa_liquor_sales/data/iowa-liquor-datefixed-fieldsfixed.csv'
WITH (DELIMITER ',', FORMAT CSV, HEADER)
;
```


### post-import cleaning

In [11]:
# Clean county names by populating field county_99
# There are 99 counties in Iowa plus one 'UNKNOWN'
sql = """
    UPDATE iowaalcohol
    SET county_99 = CASE
        WHEN (LOWER(county) = 'buena vist') THEN 'buena vista'
        WHEN (LOWER(county) = 'cerro gord') THEN 'cerro gordo'
        WHEN (LOWER(county) = 'pottawatta') THEN 'pottawattamie'
        WHEN (LOWER(county) = 'o''brien') THEN 'obrien'
        WHEN county IS NULL THEN 'UNKNOWN'
        ELSE LOWER(county)
        END
    ;
    """

try:
    cur.execute(sql)
    conn.commit()
except Exception as e:
    print(e)

## References

County data:
https://www2.census.gov/programs-surveys/popest/datasets/2010-2020/counties/totals/co-est2020.csv

##### postgresql code to stop everything

```SQL
SELECT pid, pg_terminate_backend(pid) 
FROM pg_stat_activity 
WHERE datname = current_database() AND pid <> pg_backend_pid();
```

##### source for snippet for reformatting dates

The following bash snippet converts the dates from mm/dd/yyyy to yyyy-mm-dd.
- I apply it to the csv data before importing to postgresql.
- I got it from [this gist](https://gist.github.com/dannguyen/18ed71d3451d147af414)
- It uses [backreferences](https://www.gnu.org/software/sed/manual/html_node/Back_002dreferences-and-Subexpressions.html)
- It also uses the hash `#` as an [alternative delimiter](https://backreference.org/2010/02/20/using-different-delimiters-in-sed/index.html) for `/` in the substitution command syntax.

```bash
# via bash
sed -E "s#([0-9]{2})/([0-9]{2})/([0-9]{4})#\3-\1-\2#" < Iowa_Liquor_Sales.csv | 
  tr -d '$' > iowa-liquor-datefixed.csv
```