In [1]:
# import PostgreSQL python library
import psycopg2 as pg2
import os

### Connect to database and get cursor

In [2]:
# connect to database
# The password is read from the PGPASSWORD environment variable instead of
# being stored in the notebook, so the credential is not leaked when the
# notebook is shared or committed. When the variable is unset, None is passed
# and the driver falls back to its own lookup (e.g. a ~/.pgpass file).
conn = pg2.connect(dbname='kkbox', user='postgres', password=os.environ.get('PGPASSWORD'))
# create a cursor for queries
cur = conn.cursor()

### Create Activity Table

In [3]:
# DDL for the Activity table.
# log_id is a serial (auto-numbered) primary key; msno is the only other
# column declared NOT NULL. The remaining columns are listed in the same order
# as the column list of the COPY statement used to load the csv file.
query1 = """
        CREATE TABLE Activity (
            log_id serial PRIMARY KEY,
            msno VARCHAR(50) NOT NULL,
            date DATE,
            num_25 integer,
            num_50 integer,
            num_75 integer,
            num_985 integer,
            num_100 integer,
            num_unq integer,
            total_secs real
        );
        """

In [4]:
# run/execute query: send the CREATE TABLE statement stored in query1 through
# the cursor opened above
cur.execute(query1)

### Insert data from csv file
This operation took less than 30 minutes. By comparison, aggregating the log with Pandas chunk by chunk takes 35-40 minutes to go through the whole file.

In [5]:
# Absolute path of the raw log export, resolved from the current working
# directory as <parent dir>/data/raw/user_logs.csv.
# (Despite its name, this variable holds the path of the csv file itself.)
user_log_dir = os.path.abspath(
    os.path.join(os.pardir, 'data', 'raw', 'user_logs.csv')
)

# Bulk-load statement for the Activity table. log_id is left out of the column
# list; the source file is supplied at execution time through the named
# placeholder file_path, and the first line of the csv is declared a header.
query2 = """
        COPY Activity (msno, date, num_25, num_50, num_75, num_985, num_100, num_unq, total_secs)
        FROM %(file_path)s
        (FORMAT CSV,
        HEADER TRUE)
        ;
        """

In [6]:
# run/execute query with file path as argument: user_log_dir is bound to the
# %(file_path)s placeholder of query2
# NOTE(review): COPY ... FROM <file> is presumably read by the PostgreSQL
# server process, so the path has to be reachable from the server host — confirm
cur.execute(query2, {'file_path': user_log_dir})

### Commit changes, close cursor and connection

In [7]:
# Don't forget to commit your changes: the CREATE TABLE and COPY statements
# above were executed on this connection and have not been committed yet
conn.commit()

In [8]:
# close cursor and connection (cursor first, then the connection it belongs
# to); the cleansing step further down opens its own connection
cur.close()
conn.close()

### Data cleansing on total listening time
The activity log is a daily log. We learned from the data exploration that the total seconds can take unrealistically high values (~1e9) or even negative values. Here we set them to NULL: any value below 0 or above 86400 (the number of seconds in a day) is discarded. An alternative would be to use the mean over a month for each user. However, we are planning to use the average engagement per month, so even if we replaced an extreme value by the monthly mean, it would not change our monthly average! It saves us time.

In [12]:
# connect to database
# The password is read from the PGPASSWORD environment variable instead of
# being stored in the notebook. When the variable is unset, None is passed and
# the driver falls back to its own lookup (e.g. a ~/.pgpass file).
conn = pg2.connect(dbname='kkbox', user='postgres', password=os.environ.get('PGPASSWORD'))
try:
    # compose query: discard listening times that cannot belong to a single
    # day, i.e. negative values or more than 86400 seconds (24 h)
    query_totsec = '''
            UPDATE Activity
            SET total_secs = NULL
            WHERE total_secs < 0 OR total_secs > 86400;
            '''
    # `with conn` only delimits the transaction: it commits when the block
    # succeeds and rolls back when it raises. It does NOT close the
    # connection, which is why the connection is closed in `finally` below.
    with conn:
        # the cursor context manager closes the cursor even if execute() fails
        with conn.cursor() as cur:
            cur.execute(query_totsec)
finally:
    # always release the connection, on success and on error
    conn.close()

### Draft (Query data Example and others)

In [10]:
# # create query
# query1 = '''
#         SELECT starttime, firstname || ' ' || surname AS fullname
#         FROM cd.bookings AS book
#         JOIN cd.members AS me ON me.memid = book.memid
#         WHERE firstname || ' ' || surname = 'David Farrell'
#         '''

# # exe query
# cur.execute(query1)
# # recover result of query
# line = cur.fetchone()
# lines = cur.fetchmany(5)
# alllines = cur.fetchall()

In [11]:
# query1 = '''
#         CREATE TABLE Activity_temp (
#             log_id serial PRIMARY KEY,
#             msno VARCHAR(50) NOT NULL,
#             date DATE,
#             num_25 integer CONSTRAINT positive_num25 CHECK(num_25 >= 0),
#             num_50 integer CONSTRAINT positive_num50 CHECK(num_50 >= 0),
#             num_75 integer CONSTRAINT positive_num75 CHECK(num_75 >= 0),
#             num_985 integer CONSTRAINT positive_num985 CHECK(num_985 >= 0),
#             num_100 integer CONSTRAINT positive_num100 CHECK(num_100 >= 0),
#             num_unq integer CONSTRAINT positive_numunq CHECK(num_unq >= 0),
#             total_secs real CONSTRAINT realistic_sec CHECK(total_secs > 0 AND total_secs < 86400)
#         );
#         '''

### Cast total listening time to integer
We could cast the total time to an integer, which has the effect of rounding each value to a whole number of seconds. It is better to do so after aggregation, because the difference between the rounded values and the actual values accumulates and can become non-negligible.

In [None]:
# ALTER TABLE Activity ALTER COLUMN total_secs TYPE integer;