# INGEST CSV FILES INTO POSTGRES TABLES



In [1]:
import csv

import json

import math
import numpy as np
import pandas as pd

import psycopg2


In [2]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas returns numeric values from postgres as floats;
# when every non-null value of a float column is a whole number,
# the column is changed to an integer column
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return the rows in a pandas dataframe.

    The query is executed on the notebook-level ``connection`` global.

    Args:
        query: SQL text handed unchanged to ``pd.read_sql_query``.
        rollback_before_flag: when truthy, ``connection.rollback()`` is
            called before the query runs.
        rollback_after_flag: when truthy, ``connection.rollback()`` is
            called after the query has run.

    Returns:
        The query result as a dataframe.  Every ``float64`` column whose
        non-NaN values are all whole numbers that fit in a signed 64-bit
        integer is converted to the nullable ``Int64`` dtype (NaN becomes
        ``<NA>``); all other columns are returned as read.
    """

    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    if rollback_after_flag:
        connection.rollback()

    # fix the float columns that really should be integers

    for column in df:

        if df[column].dtype != "float64":
            continue

        values = df[column].to_numpy()
        present = values[~np.isnan(values)]

        # +/-inf and magnitudes of 2**63 or more have no Int64 form, so such
        # columns stay float instead of raising during the check or the cast
        fits_in_int64 = (np.abs(present) < 2.0 ** 63).all()

        if fits_in_int64 and (present == np.floor(present)).all():
            df[column] = df[column].astype("Int64")

    return df
    

In [3]:
# open the postgres connection shared by the query helper and all cells below
connection = psycopg2.connect(
    user="postgres",
    password="ucb",
    host="postgres",
    port="5432",
    database="postgres",
)

In [4]:
# one cursor on the shared connection; the drop / create / copy cells below all execute through it
cursor = connection.cursor()

In [5]:
def my_read_csv_file(file_name, limit):
    """Read the csv file and print only the first limit rows.

    Args:
        file_name: path of the csv file to read.
        limit: maximum number of parsed rows to print.

    Returns:
        None.  Each printed row is the list of strings produced by
        ``csv.reader``; a final summary line reports how many rows were
        printed and how many rows the file contains in total.
    """

    total = 0

    # newline="" lets the csv module handle line endings itself, so line
    # breaks inside quoted fields are preserved; the with-block closes the
    # file even if parsing fails part way through
    with open(file_name, "r", newline="") as csv_file:
        for total, row in enumerate(csv.reader(csv_file), start=1):
            if total <= limit:
                print(row)

    print("\nPrinted ", min(limit, total), "lines of ", total, "total lines.")

# AGM file ingestion

In [6]:
# drop all the AGM tables so the notebook can be re-run from scratch
# the order is line items, sales, customers first and stores / products later;
# presumably child tables go before the tables their foreign keys point at - confirm against the create cell
# "if exists" keeps the statement from failing when a table is not there yet
connection.rollback()

query = """
drop table if exists p3_line_items;
drop table if exists p3_sales;
drop table if exists p3_customers;
drop table if exists p3_stores;
drop table if exists p3_holidays;
drop table if exists p3_zip_codes;
drop table if exists p3_cities;
drop table if exists p3_states;
drop table if exists p3_products;
"""
# all the drops are sent in one execute call and committed together
cursor.execute(query)
connection.commit()

In [7]:
# create all the AGM tables in the foreign key order:
# a referenced table (p3_stores, p3_customers, p3_products) is created
# before the table whose foreign key points at it
#
# the foreign keys reference the p3_ tables created in this same cell;
# referencing the un-prefixed names (stores, customers, products) would
# tie these tables to whatever other tables happen to exist in the database,
# or fail when they do not exist
connection.rollback()
query = """
create table p3_zip_codes (
  zip varchar(5),
  latitude numeric(7,4),
  longitude numeric(7,4),
  city varchar(32),
  state varchar(2),
  population numeric(7),
  area numeric(9,4),
  density numeric(10,2),
  time_zone varchar(32),
  primary key (zip)
);
create table p3_cities (
    city varchar(32),
    state varchar(2),
    latitude numeric(7,4),
    longitude numeric(7,4),
    population numeric(9),
    area numeric(10,4),
    density numeric(10,2),
    time_zone varchar(32),
    primary key (city,state)
);
create table p3_states (
    state varchar(2),
    state_name varchar(32),
    latitude numeric(7,4),
    longitude numeric(7,4),
    population numeric(10),
    area numeric(10,4),
    density numeric(10,2),
    primary key (state)
);
create table p3_holidays (
  holiday_date date,
  description varchar(32),
  closed_flag boolean,
  primary key (holiday_date)
);

create table p3_products (
  product_id numeric(3),
  description varchar(32),
  primary key (product_id)
);
create table p3_stores (
  store_id numeric(6),
  street varchar(32),
  city varchar(32),
  state varchar(2),
  zip varchar(5),
  latitude numeric(7,4),
  longitude numeric(7,4),
  primary key (store_id)
);
create table p3_customers (
  customer_id numeric(6),
  first_name varchar(32),
  last_name varchar(32),
  street varchar(32),
  city varchar(32),
  state varchar(2),
  zip varchar(5),
  closest_store_id numeric(6),
  distance numeric(3),
  primary key (customer_id),
  foreign key (closest_store_id) references p3_stores(store_id)
);
create table p3_sales (
  store_id numeric(6),
  sale_id numeric(8),
  customer_id numeric(6),
  sale_date date,
  total_amount numeric(5),
  primary key (store_id, sale_id),
  foreign key (customer_id) references p3_customers (customer_id)
);
create table p3_line_items (
  store_id numeric(6),
  sale_id numeric(8),
  line_item_id numeric(3),
  product_id numeric(3),
  quantity numeric(3),
  primary key (store_id, sale_id, line_item_id),
  foreign key (product_id) references p3_products (product_id)
);

"""

# all the create statements are sent in one execute call and committed together
cursor.execute(query)

connection.commit()

In [8]:
# load the csv files into the AGM database tables in foreign key order:
# products, stores and customers are loaded before sales and line items
# each copy reads a file under /data/ with a comma delimiter, a header line,
# and the empty string standing for NULL
connection.rollback()

query = """

copy p3_zip_codes
from '/data/zip_codes.csv' delimiter ',' NULL '' csv header;

copy p3_cities
from '/data/cities.csv' delimiter ',' NULL '' csv header;

copy p3_states
from '/data/states.csv' delimiter ',' NULL '' csv header;

copy p3_holidays
from '/data/holidays.csv' delimiter ',' NULL '' csv header;

copy p3_products
from '/data/products.csv' delimiter ',' NULL '' csv header;

copy p3_stores
from '/data/stores.csv' delimiter ',' NULL '' csv header;

copy p3_customers
from '/data/customers.csv' delimiter ',' NULL '' csv header;

copy p3_sales
from '/data/sales.csv' delimiter ',' NULL '' csv header;

copy p3_line_items
from '/data/line_items.csv' delimiter ',' NULL '' csv header;

"""

# all nine copies are sent in one execute call and committed together
cursor.execute(query)

connection.commit()

# BART data ingestion

In [9]:
# drop the three BART tables so the cells below can be re-run from scratch
# "if exists" keeps the statement from failing when a table is not there yet
connection.rollback()
query = """
drop table if exists p3_travel_times;
drop table if exists p3_lines;
drop table if exists p3_stations;
"""
cursor.execute(query)
connection.commit()

In [10]:
# create the three BART tables
# none of them declares a foreign key, so the creation order does not matter
connection.rollback()

query = """

create table p3_travel_times (
    station_1 varchar(32),
    station_2 varchar(32),
    travel_time numeric(3),
    primary key (station_1, station_2)
);

create table p3_lines (
    line varchar(6),
    sequence numeric(2),
    station varchar(32),
    primary key (line, sequence)
);

create table p3_stations (
    station varchar(32),
    latitude numeric(9,6),
    longitude numeric(9,6),
    transfer_time numeric(3),
    primary key (station)
);

"""

# all the create statements are sent in one execute call and committed together
cursor.execute(query)

connection.commit()

In [11]:
# load the BART csv files into the three BART tables
# each copy uses a comma delimiter, a header line, and the empty string standing for NULL
# NOTE(review): these paths are relative ('data/...') while the AGM load cell
# uses absolute paths ('/data/...') - confirm which directory the postgres
# server resolves these file names against
connection.rollback()

query = """

copy p3_travel_times
from 'data/travel_times.csv' delimiter ',' NULL '' csv header;

copy p3_lines
from 'data/lines.csv' delimiter ',' NULL '' csv header;

copy p3_stations
from 'data/stations.csv' delimiter ',' NULL '' csv header;

"""
# all three copies are sent in one execute call and committed together
cursor.execute(query)

connection.commit()

# Postgres Table Validation

In [12]:
# list every table whose name contains "p3", then print the row count of each one
# both rollback flags are on, so each query is wrapped in a rollback before and after
rollback_before_flag = True
rollback_after_flag = True

query = """

SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_name LIKE '%p3%';

"""

tables = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
p3_tables = tables['table_name'].tolist()

for table in p3_tables:
    query = f"SELECT count(*) FROM {table}"
    counts = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
    # the count query returns a single row with a single column
    cnt = counts.iloc[0, 0]
    print(f'Table: {table}\nRecord Count: {cnt}\n')

Table: p3_stations
Record Count: 50

Table: p3_lines
Record Count: 114

Table: p3_travel_times
Record Count: 51

Table: p3_line_items
Record Count: 5417974

Table: p3_sales
Record Count: 1537617

Table: p3_customers
Record Count: 31082

Table: p3_stores
Record Count: 5

Table: p3_products
Record Count: 8

Table: p3_holidays
Record Count: 12

Table: p3_states
Record Count: 52

Table: p3_cities
Record Count: 27420

Table: p3_zip_codes
Record Count: 32723

Table: p3_products_temp
Record Count: 8

