# Faster ETL

The official PostgreSQL documentation has an entire section on [Populating a Database](https://www.postgresql.org/docs/current/populate.html#POPULATE-COPY-FROM). According to that documentation, the best way to load data into a database is the `COPY` command, which is much faster than issuing a series of `INSERT` statements. I therefore created this ETL to do exactly that.

In [1]:
import os
import io
import glob
import psycopg2
import pandas as pd
from typing import Iterator, Dict, Any, Optional
from sql_queries import *


In [2]:
# Open one psycopg2 connection to the local sparkifydb database and create the
# cursor (`cur`) used by the loading cells further down.
# NOTE(review): the credentials are hard-coded in the DSN string — consider
# reading them from the environment before sharing this notebook.
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=great")
cur = conn.cursor()
# Autocommit mode: each statement is committed as soon as it executes, so the
# cells below never call conn.commit().
conn.set_session(autocommit=True)


In [3]:
%load_ext sql
%sql postgresql+psycopg2://postgres:great@localhost/sparkifydb

'Connected: postgres@sparkifydb'

In [4]:
%%sql 
SELECT COUNT(*) FROM songplays

 * postgresql+psycopg2://postgres:***@localhost/sparkifydb
1 rows affected.


count
7752


In [4]:
# get all files matching extension from directory
def get_files(filepath: str) -> list:
    """Return the absolute paths of every ``*.json`` file under *filepath*.

    The directory tree rooted at *filepath* is walked recursively and each
    directory is searched for files matching ``*.json``.

    Args:
        filepath: Root directory to search.

    Returns:
        A list of absolute path strings. The list is empty when no file
        matches (``os.walk`` yields nothing for a missing directory).
    """
    all_files = []
    # walk() visits every directory of the tree; only the directory path is
    # needed here because glob does the file-name matching.
    for root, _dirs, _names in os.walk(filepath):
        # Escape the directory part so that names containing glob
        # metacharacters ('[', ']', '*', '?') are matched literally instead
        # of being interpreted as part of the pattern.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))
    return all_files

## The String Iterator

Thanks to the [beer iterator](https://hakibenita.com/fast-load-data-python-postgresql#copy-data-from-a-string-iterator) example.
The following class creates a file-like object that acts as a buffer between the data source and the `COPY` command. The buffer consumes JSON records via the iterator, cleans and transforms the data, and outputs clean CSV.

In [5]:
class StringIteratorIO(io.TextIOBase):
    """Read-only, file-like text stream fed by an iterator of strings.

    Strings are pulled from the iterator lazily: the next one is fetched only
    once the internal buffer has been fully consumed.
    """

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        # Unread remainder of the string most recently pulled from the iterator.
        self._buff = ''

    def readable(self) -> bool:
        """Report that the stream supports reading."""
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to *n* characters from the current buffer.

        With ``n`` of ``None`` the whole buffer is returned. An empty string
        means the underlying iterator is exhausted.
        """
        # Refill from the iterator, skipping over any empty items it yields.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        pending = self._buff
        taken = pending if n is None else pending[:n]
        self._buff = pending[len(taken):]
        return taken

    def read(self, n: Optional[int] = None) -> str:
        """Read and return at most *n* characters.

        When *n* is ``None`` or negative, everything that remains is returned.
        """
        if n is None or n < 0:
            # Keep draining buffers until _read1 reports exhaustion ('').
            return ''.join(iter(self._read1, ''))

        pieces = []
        remaining = n
        while remaining > 0:
            piece = self._read1(remaining)
            if not piece:
                break
            pieces.append(piece)
            remaining -= len(piece)
        return ''.join(pieces)

Internally, it fetches the next row from the iterator only when its internal line buffer is empty.

## Clean Values
Empty values are transformed to `\N`. It is the default string used by PostgreSQL to indicate NULL in `COPY` (this can be changed using the NULL option).

In [6]:
def clean_csv_value(value: Optional[Any], delimiter: str = '|') -> str:
    r"""Render *value* as one field of a PostgreSQL ``COPY`` text-format row.

    ``None`` and the string ``'NaN'`` become ``\N``, the default NULL marker
    of ``COPY``. Any other value is converted with ``str()`` and escaped so
    that it cannot be mistaken for a field or row separator.

    Args:
        value: The raw field value.
        delimiter: The field delimiter passed to ``COPY``; it defaults to
            ``'|'``, the delimiter used by the loaders in this notebook.

    Returns:
        The escaped text for the field.
    """
    if value is None:
        return r'\N'
    if value == 'NaN':
        return r'\N'
    text = str(value)
    # COPY text format: a backslash, newline, carriage return or the delimiter
    # inside a value must be backslash-escaped. The backslash is handled first
    # so the escapes added afterwards are not doubled.
    text = text.replace('\\', '\\\\')
    text = text.replace('\n', '\\n').replace('\r', '\\r')
    if delimiter:
        text = text.replace(delimiter, '\\' + delimiter)
    return text

## The json file generator
Create a generator that reads a list of data paths and loads each json file as a dictionary.
If a json file has multiple dictionaries inside, it yields them separately.

In [7]:
def json_gen(file_list: list) -> Iterator[Dict[str, Any]]:
    """Yield the JSON records stored one per line in each file of *file_list*.

    Files are read lazily, in the order given. Blank lines and records that
    parse to an empty/falsy value are skipped, so they neither raise nor cut
    the rest of the file short.

    Args:
        file_list: Paths of the line-delimited JSON files to read.

    Yields:
        The parsed value of each non-empty line.

    Raises:
        OSError: If a file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    import json
    for file in file_list:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(file, encoding='utf-8') as json_file:
            for line in json_file:
                if not line.strip():
                    # e.g. a trailing blank line at the end of the file
                    continue
                data = json.loads(line)
                if not data:
                    continue
                yield data
            

## json to PostgreSQL 

In [8]:
def process_song_data(datapath: str) -> None:
    """Reload the ``staging_songs`` table from the JSON files under *datapath*.

    Every record whose ``song_id`` is not the empty string is rendered as one
    pipe-delimited line. The existing rows of ``staging_songs`` are deleted
    and the lines are then streamed to PostgreSQL with ``COPY ... FROM STDIN``.
    """
    # COPY is issued without a column list, so the fields are sent
    # positionally in exactly this order.
    song_fields = (
        'song_id',
        'title',
        'artist_id',
        'year',
        'duration',
        'artist_name',
        'artist_location',
        'artist_latitude',
        'artist_longitude',
    )

    def to_line(record: Dict[str, Any]) -> str:
        # One cleaned, '|'-separated row terminated by a newline.
        cleaned = (clean_csv_value(record[key]) for key in song_fields)
        return '|'.join(cleaned) + '\n'

    records = json_gen(get_files(datapath))
    stream = StringIteratorIO(
        to_line(record) for record in records if record['song_id'] != ''
    )
    cur.execute("""DELETE FROM staging_songs;""")
    cur.copy_expert("COPY staging_songs FROM STDIN DELIMITER '|'", stream)
    

In [9]:
def process_event_data(datapath: str) -> None:
    """Reload the ``staging_events`` table from the JSON files under *datapath*.

    Every record whose ``userId`` is not the empty string is rendered as one
    pipe-delimited line whose first field is the ``ts`` value converted to a
    ``datetime``. The existing rows of ``staging_events`` are deleted and the
    lines are then streamed to PostgreSQL with ``COPY ... FROM STDIN``.
    """
    from datetime import datetime

    # Fields copied verbatim after the timestamp; COPY is issued without a
    # column list, so this order is the order the table receives them in.
    event_fields = (
        'userId',
        'level',
        'sessionId',
        'location',
        'userAgent',
        'firstName',
        'lastName',
        'gender',
        'song',
        'artist',
        'length',
    )

    def to_line(event: Dict[str, Any]) -> str:
        # `ts` is divided by 1000 before conversion (presumably epoch
        # milliseconds); fromtimestamp() yields a naive local-time datetime.
        values = [datetime.fromtimestamp(event['ts'] / 1000.0)]
        values.extend(event[key] for key in event_fields)
        return '|'.join(map(clean_csv_value, values)) + '\n'

    events = json_gen(get_files(datapath))
    stream = StringIteratorIO(
        to_line(event) for event in events if event['userId'] != ''
    )
    cur.execute("""DELETE FROM staging_events;""")
    cur.copy_expert("COPY staging_events FROM STDIN DELIMITER '|'", stream)
    

In [10]:
# Stage the event logs first, then the song metadata. Each data directory is
# resolved relative to the current working directory.
for _subdir, _loader in (
    ('/data/log_data', process_event_data),
    ('/data/song_data', process_song_data),
):
    datapath = os.path.normcase(os.getcwd()) + _subdir
    _loader(datapath)

In [11]:
# Fill the artists table from the staged song rows. Rows whose artist_id is
# already present are skipped (ON CONFLICT ... DO NOTHING).
cur.execute("""INSERT INTO artists (artist_id, name, location, latitude, longitude)
                SELECT artist_id, name, location, latitude, longitude FROM staging_songs
                ON CONFLICT (artist_id) DO NOTHING;""")
# Fill the songs table from the same staging table. Rows whose song_id is
# already present are skipped.
cur.execute("""INSERT INTO songs (song_id, title, artist_id, year, duration)
                SELECT song_id, title, artist_id, year, duration FROM staging_songs
                ON CONFLICT (song_id) DO NOTHING;""")

In [12]:
# Fill the time table: one row per staged start_time, with its hour, day,
# week, month, year and day-of-week parts extracted in SQL. Rows whose
# start_time is already present are skipped.
cur.execute("""INSERT INTO time (start_time, hour, day, week, month, year, weekday)
                SELECT 
                start_time, 
                EXTRACT(hour FROM start_time),
                EXTRACT(day FROM start_time),
                EXTRACT(week FROM start_time),
                EXTRACT(month FROM start_time),
                EXTRACT(year FROM start_time),
                EXTRACT(dow FROM start_time)
                FROM staging_events
                ON CONFLICT (start_time) DO NOTHING;""")
# Fill the users table from the staged events, ignoring rows with a NULL or
# zero user_id. DO NOTHING keeps the row already stored for a user_id, so
# later values (e.g. a different level) do not overwrite it.
cur.execute("""INSERT INTO users (user_id, first_name, last_name, gender, level)
                SELECT user_id, first_name, last_name, gender, level 
                FROM staging_events 
                WHERE user_id IS NOT NULL AND user_id <> 0
                ON CONFLICT (user_id) DO NOTHING;""")
# Fill the songplays table. Each staged event is LEFT JOINed to songs+artists
# on song title, duration and artist name, so events without a matching song
# are still inserted with NULL song_id / artist_id. This must run after the
# songs and artists inserts, whose rows the join reads.
cur.execute("""INSERT INTO songplays (start_time, user_id, level, 
                song_id, artist_id, session_id, location, user_agent) 
                SELECT g.start_time, g.user_id, g.level, h.song_id, 
                h.artist_id, g.session_id, g.location, g.user_agent 
                FROM staging_events g
                LEFT JOIN (
                    SELECT song_id, j.artist_id, k.name as artist, title, 
                    duration FROM songs j INNER JOIN artists k 
                    ON j.artist_id = k.artist_id) h ON g.song = h.title 
                    AND g.length = h.duration AND g.artist = h.artist
                ON CONFLICT (start_time, user_id) DO NOTHING;""")

# Close Connection to Sparkify Database

In [21]:
# Release the database connection; `cur` can no longer be used afterwards.
conn.close()