# Data migration to PostgreSQL

Connection to the database

In [67]:
from psycopg2 import connect, sql
# Connection settings come from the standard libpq environment variables so
# that no secret is stored in the notebook. Non-secret settings fall back to
# the test instance; the password has no fallback and is prompted for instead.
import os
from getpass import getpass

credentials = dict(host=os.environ.get('PGHOST', 'rds-test-av.cnf9kpbogrso.us-east-1.rds.amazonaws.com'),
                   port=os.environ.get('PGPORT', '5432'),
                   dbname=os.environ.get('PGDATABASE', 'av_test'),
                   user=os.environ.get('PGUSER', 'avian'),
                   # Taken from PGPASSWORD when set, otherwise asked interactively.
                   passwd=os.environ.get('PGPASSWORD') or getpass('PostgreSQL password: ')
                  )
# Keyword/value DSN template; the placeholders match the keys of `credentials`.
conn_string = "host={host} port={port} dbname={dbname} user={user} password={passwd}"
conn = connect(conn_string.format(**credentials))

Create schema for the database

In [23]:
# Helper for running a batch of queries
def execute(conn, queries):
    """Run every statement in `queries`, in order, on a cursor of `conn`.

    Parameters
    ----------
    conn
        Connection object; used as a context manager (`with conn:`) and as
        the source of the cursor (`conn.cursor()`).
    queries
        Iterable of statements, each passed unchanged to `cursor.execute`.

    Any exception raised while executing propagates to the caller. The
    confirmation message is printed only when all statements ran without error.
    """
    with conn, conn.cursor() as cur:
        for statement in queries:
            cur.execute(statement)
    print("{} : {}".format(execute.__name__, "done!"))


# Template for dropping a table; `{table_name}` is a placeholder to be filled in by the caller.
drop_table = "DROP TABLE IF EXISTS {table_name}"

Create a schema for \`devices\` table 

In [29]:
# DDL for the `devices` table: one row per (device_id, time, metric_type),
# enforced by the `u_devices_columns` unique constraint.
SQL_devices_table = """
CREATE TABLE devices
( 
  id SERIAL PRIMARY KEY,
  device_id TEXT,
  type TEXT,
  time timestamp with time zone,
  metric_type TEXT,
  metric_value double precision,
  CONSTRAINT u_devices_columns UNIQUE (device_id, time, metric_type)
)
;
"""

# Recreate the table from scratch: drop it if present, then create it.
queries = [sql.SQL(drop_table).format(table_name=sql.Identifier("devices"))]
queries.append(SQL_devices_table)
execute(conn, queries)

execute : done!


In [4]:
# Helper function for role creation
def add_role(conn, role, revoke, drop_role, create, grant, override = False):
    """Create a role and grant its permissions, optionally recreating it.

    Parameters
    ----------
    conn
        Open database connection (used as a context manager and cursor factory).
    role : str
        Role name looked up in `pg_roles`.
    revoke, drop_role, create, grant
        Statements forwarded to `execute()`. `revoke` and `drop_role` are only
        run when the role already exists and `override` is true.
    override : bool, default False
        If the role exists: True recreates it (revoke, drop, create, grant),
        False leaves it untouched and prints a notice.

    Any database error propagates to the caller; the final confirmation is
    printed only when no exception was raised.
    """
    # Finish the lookup and leave this `with conn:` block before calling
    # execute(): execute() opens its own `with conn:` block, and psycopg2 2.9+
    # refuses to re-enter a connection context recursively.
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT 1 FROM pg_roles WHERE rolname=%s", (role,))
            role_exists = bool(cur.fetchone())

    if not role_exists:
        execute(conn, [create, grant])
    elif override:
        execute(conn, [revoke, drop_role, create, grant])
    else:
        print("Role {} already exists".format(role))

    print(add_role.__name__ + ' : ' + "done!")

Create \`ds_team_member\` role and grant all permissions to it

In [5]:
# Name of the role that the statements below create and configure.
role = 'ds_team_member'

# Strips the role's privileges on the database, on schema `public`, on its
# tables and sequences, and removes its default table privileges.
# `{db}` and `{role}` are placeholders to be substituted before execution.
revoke_permissions = """
REVOKE CONNECT ON DATABASE {db} FROM {role};
REVOKE ALL ON ALL TABLES IN SCHEMA public FROM {role};
REVOKE ALL ON SCHEMA public FROM {role};
REVOKE ALL ON ALL SEQUENCES IN SCHEMA public FROM {role};
ALTER DEFAULT PRIVILEGES IN SCHEMA public REVOKE ALL ON TABLES FROM {role};""" 

# Removes the role if it exists.
drop_role = "DROP ROLE IF EXISTS {role}" 

# Creates the role with the CREATEDB attribute.
create_role = """CREATE ROLE {role} WITH CREATEDB """

# Takes each privilege away from PUBLIC and grants it to the role instead:
# database CONNECT, schema `public`, all of its tables and sequences, plus
# default privileges on tables created in `public`.
grant_permissions = """
REVOKE CONNECT ON DATABASE {db} FROM PUBLIC;
GRANT CONNECT ON DATABASE {db} TO {role};

REVOKE ALL ON SCHEMA public FROM PUBLIC;
GRANT ALL ON SCHEMA public TO {role};

REVOKE ALL ON ALL TABLES IN SCHEMA public FROM PUBLIC;
GRANT ALL ON ALL TABLES IN SCHEMA public TO {role};

REVOKE ALL ON ALL SEQUENCES IN SCHEMA public FROM PUBLIC;
GRANT ALL ON ALL SEQUENCES IN SCHEMA public to {role};

ALTER DEFAULT PRIVILEGES 
IN SCHEMA public
GRANT ALL ON TABLES TO {role};
"""                                                                        

In [13]:
# Quote the database and role names once as SQL identifiers and reuse them
# in every statement template.
db_ident = sql.Identifier(credentials['dbname'])
role_ident = sql.Identifier(role)

add_role(
    conn,
    role,
    revoke=sql.SQL(revoke_permissions).format(db=db_ident, role=role_ident),
    drop_role=sql.SQL(drop_role).format(role=role_ident),
    create=sql.SQL(create_role).format(role=role_ident),
    grant=sql.SQL(grant_permissions).format(db=db_ident, role=role_ident),
)

execute : done!
add_role : done!


Create user and assign it to the \`ds_team_member\` role

In [10]:
# The new user's password is not stored in the notebook: it is read from the
# DS_USER_PASSWORD environment variable, or prompted for when that is unset.
import os
from getpass import getpass

new_user = 'ds_1'
new_password = os.environ.get('DS_USER_PASSWORD') or getpass('Password for {}: '.format(new_user))

# Statement templates; identifiers and the password literal are substituted
# through psycopg2's `sql` module so they are quoted correctly.
drop_user = 'DROP USER IF EXISTS {user}'
create_user = "CREATE USER {user} WITH CREATEDB PASSWORD {passwd}"
grant_membership = "GRANT {role} to {user}"

# Recreate the user, then make it a member of `role`.
execute(conn,
        [sql.SQL(drop_user).format(user=sql.Identifier(new_user)),
         sql.SQL(create_user).format(user=sql.Identifier(new_user), passwd=sql.Literal(new_password)),
         sql.SQL(grant_membership).format(role=sql.Identifier(role), user=sql.Identifier(new_user))
        ])

execute : done!


Test connection for the newly created user

In [28]:
# Build a separate credentials dict for the new user.
# - It must be a copy: assigning `credentials` directly would alias it, and the
#   admin user/password would be overwritten for every later cell.
# - The password key must be `passwd`, because that is the placeholder that
#   `conn_string` formats; a `password` key would be silently ignored.
credentials_test = dict(credentials, user=new_user, passwd=new_password)
conn_test = connect(conn_string.format(**credentials_test))

try:
    # The cursor context manager closes the cursor on exit, also on error.
    with conn_test.cursor() as cur_test:
        # List every user table visible to the new user.
        cur_test.execute("""
SELECT 
    table_schema || '.' || table_name
FROM
    information_schema.tables
WHERE
    table_type = 'BASE TABLE'
AND
    table_schema NOT IN ('pg_catalog', 'information_schema');
    """)
        print(cur_test.fetchall())

        # Check that the new user can actually read the `devices` table.
        cur_test.execute("SELECT* FROM devices")
        print(cur_test.fetchall())
finally:
    ## Close test connection
    conn_test.close()

[('public.devices',)]
[]


#### Push data to PostgreSQL

In [12]:
import uuid
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import time
from tabulate import tabulate

Generate synthetic data and save it to a CSV file

In [24]:
# --- Synthetic dataset: one reading per device for every 5-minute timestamp ---
n = 100                 # number of simulated devices
days = 5                # length of the simulated period, in days
start = '2017-04-27'
ids = [str(uuid.uuid4()) for _ in range(n)]
# 12 five-minute slots per hour * 24 hours * number of days
t = pd.date_range(start=start, periods=12 * 24 * days, freq='5min', tz='US/Pacific')

# Cross join of timestamps and devices through a constant helper column `key`.
df_t = pd.DataFrame({'time': t})
df_t['key'] = 1
df_id = pd.DataFrame({'device_id': ids})
df_id['key'] = 1
rez = pd.merge(df_t, df_id, on='key').drop('key', axis=1)

# Normally distributed readings with mean 25 and standard deviation 2.
x = 25 + 2 * np.random.randn(len(rez))
rez['metric_value'] = x
rez['metric_type'] = 'temperature'
rez['type'] = 'SHU001'

path = '/home/karimlulu/DataMigration/data.csv'
# Round the readings to whole numbers and order the rows by value before export.
rez = rez.round(0)
rez = rez.sort_values(by='metric_value').reset_index(drop=True)
rez.to_csv(path, index=False, sep=',', header=False)
# The CSV has no header row, so keep the column order for later use.
columns = list(rez.columns)
print(tabulate(rez.head(), headers='keys', tablefmt='psql'))

+----+---------------------------+--------------------------------------+----------------+---------------+--------+
|    | time                      | device_id                            |   metric_value | metric_type   | type   |
|----+---------------------------+--------------------------------------+----------------+---------------+--------|
|  0 | 2017-04-27 00:55:00-07:00 | f59292b1-149d-48b3-aa88-a90a49434168 |             18 | temperature   | SHU001 |
|  1 | 2017-04-27 16:20:00-07:00 | 7df18f86-e179-4cb2-8892-1362ff2ca790 |             18 | temperature   | SHU001 |
|  2 | 2017-04-27 12:10:00-07:00 | 1c047ccf-9758-4d0d-a46e-182ff92b2a47 |             19 | temperature   | SHU001 |
|  3 | 2017-04-27 20:25:00-07:00 | a84921c8-f8e5-45d1-b4ff-a7f583b56153 |             19 | temperature   | SHU001 |
|  4 | 2017-04-27 06:40:00-07:00 | 7df18f86-e179-4cb2-8892-1362ff2ca790 |             19 | temperature   | SHU001 |
+----+---------------------------+--------------------------------------

In [25]:
#try:
#    with conn:
#        with conn.cursor() as cur:
#            cur.execute('TRUNCATE devices')
#except: raise

In [27]:
# Read-only text handle on the CSV written to `path`.
file = open(path, mode='r')

In [28]:
%%time
# Bulk-load the CSV into `devices` with COPY.
# The file is opened in this cell so that the load always starts at the
# beginning of the file (a handle left at EOF by an earlier run would copy
# nothing without any error) and the handle is closed when the cell finishes.
# Errors propagate unchanged; `with conn:` ends the transaction either way.
with open(path, 'r') as csv_file:
    with conn:
        with conn.cursor() as cur:
            cur.copy_from(csv_file, 'devices', sep=',', columns = columns)

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 1.88 s


In [100]:
# SQLAlchemy engine built from the same settings dict as the psycopg2 connection.
engine_url = 'postgresql+psycopg2://{user}:{passwd}@{host}:{port}/{dbname}'.format(**credentials)
engine = create_engine(engine_url, echo=False)

In [110]:
%%time
# Load the CSV through pandas/SQLAlchemy chunk by chunk, reporting the
# cumulative time after each chunk; only the first chunks are loaded.
chunksize = 1000    # rows read from the CSV per chunk
max_chunks = 10     # stop after this many chunks
i = 0
start = time.time()
reader = pd.read_csv(path, names=columns, chunksize=chunksize, iterator=True)
for df in reader:
    df.to_sql(name='devices', con=engine, index=False, if_exists='append', chunksize=1000)
    i += 1
    elapsed = time.time() - start
    print('Finished: {0}, Time spent: {1:0.2f} sec'.format(i * chunksize, elapsed))
    if i >= max_chunks:
        break

Finished: 1000, Time spent: 140.17 sec
Finished: 2000, Time spent: 280.85 sec
Finished: 3000, Time spent: 422.42 sec
Finished: 4000, Time spent: 562.85 sec
Finished: 5000, Time spent: 703.84 sec
Finished: 6000, Time spent: 845.41 sec
Finished: 7000, Time spent: 987.09 sec
Finished: 8000, Time spent: 1129.38 sec
Finished: 9000, Time spent: 1270.75 sec
Finished: 10000, Time spent: 1412.61 sec
CPU times: user 1.19 s, sys: 208 ms, total: 1.4 s
Wall time: 23min 32s


#### Temporary tables

In [34]:
# Staging table with the same columns as `devices` but without the unique
# constraint. It is declared TEMPORARY with ON COMMIT DROP, so it only exists
# until the transaction that created it commits.
SQL_temp_devices_table = """
CREATE TEMPORARY TABLE IF NOT EXISTS tmp_devices
( 
  id SERIAL PRIMARY KEY,
  device_id TEXT,
  type TEXT,
  time timestamp with time zone,
  metric_type TEXT,
  metric_value double precision
)
ON COMMIT DROP
;
"""
# Copies the staged rows into `devices`; rows that would violate the
# `u_devices_columns` unique constraint are skipped (ON CONFLICT ... DO NOTHING).
insert_statement = """
INSERT INTO devices(device_id, type, time, metric_type, metric_value) 
SELECT device_id, type, time, metric_type, metric_value 
FROM tmp_devices
ON CONFLICT ON CONSTRAINT u_devices_columns DO NOTHING;
"""

In [37]:
# Fresh read-only text handle on the CSV at `path`, positioned at the start of the file.
file = open(path, mode='r')

In [38]:
%%time
# Load the CSV through a temporary staging table, all inside one `with conn:`
# block: create the staging table, COPY the file into it, then insert into
# `devices` with the conflict handling defined by `insert_statement`.
# The file is opened here so the load always starts at the beginning of the
# file (a handle left at EOF by an earlier run would stage nothing without any
# error) and the handle is closed when the cell finishes.
# Errors propagate unchanged.
with open(path, 'r') as csv_file:
    with conn:
        with conn.cursor() as cur:
            cur.execute(SQL_temp_devices_table)
            cur.copy_from(csv_file, 'tmp_devices', sep=',', columns = columns)
            cur.execute(insert_statement)

CPU times: user 68 ms, sys: 24 ms, total: 92 ms
Wall time: 5.16 s


In [39]:
## VACUUM
try:
    with conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("VACUUM FULL")
except: raise
finally: conn.autocommit = False

In [68]:
## Experiments with window functions
conn.rollback()
cur = conn.cursor()
cur.execute("""select device_id, metric_value,
                sum(metric_value) OVER(ORDER BY metric_value DESC ROWS UNBOUNDED PRECEDING)
                from devices""")
tuples = cur.fetchall()
cols = [el[0] for el in cur.description]
df_window = pd.DataFrame(tuples, columns = cols)
df_window.head(20)

Unnamed: 0,device_id,metric_value,sum
0,9ac48182-9423-4cb2-90f7-2a3ab99d16e7,31.0,31.0
1,1c047ccf-9758-4d0d-a46e-182ff92b2a47,31.0,62.0
2,bcd76980-67c3-46ee-97e6-a4e6347c0a44,31.0,93.0
3,f6b15d24-311e-4da5-938a-79dfb88c814a,31.0,124.0
4,9ac48182-9423-4cb2-90f7-2a3ab99d16e7,31.0,155.0
5,a84921c8-f8e5-45d1-b4ff-a7f583b56153,31.0,186.0
6,a84921c8-f8e5-45d1-b4ff-a7f583b56153,31.0,217.0
7,dc0ec733-4759-46ab-986b-4afb8bbf9be7,31.0,248.0
8,a84921c8-f8e5-45d1-b4ff-a7f583b56153,30.0,278.0
9,bcd76980-67c3-46ee-97e6-a4e6347c0a44,30.0,308.0


In [85]:
# Close the main database connection used throughout the notebook.
conn.close()