# Testing copy_from

In [1]:
import psycopg2
import os, glob
import pandas as pd
import numpy as np
import warnings

In [2]:
warnings.filterwarnings('ignore')

In [3]:
!conda info --envs

# conda environments:
#
                         /Users/manuel/.julia/conda/3
base                     /Users/manuel/opt/anaconda3
courseragcp              /Users/manuel/opt/anaconda3/envs/courseragcp
iapucp                   /Users/manuel/opt/anaconda3/envs/iapucp
mitxpro                  /Users/manuel/opt/anaconda3/envs/mitxpro
udacity                  /Users/manuel/opt/anaconda3/envs/udacity
udacity-de            *  /Users/manuel/opt/anaconda3/envs/udacity-de



In [4]:
!python create_tables.py

In [5]:
connection = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")

In [6]:
def get_files(filepath):
    """
    return a list with all json files on the filepath folder
    
    params
    filepath: path of parent folder
    """
    
    all_files = []
    
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files


def insert_df_to_table(df, temp_file, connection, table_name):
    """
    insert a dataframe into a postgres table
    
    params
    df: dataframe with data
    temp_file: local path to checkpoint the dataframe
    connection: connection object to the database
    table_name: table to insert the data
        
    based on the implementation of Naysan Saran
    https://naysan.ca/2020/06/21/pandas-to-postgresql-using-psycopg2-copy_from/
    """
    
    #save values to temp file
    df = df.fillna('NULL')
    df.to_csv(temp_file, header=False, index=False, sep = '\t')
    
    #load temp file
    file = open(temp_file, 'r')
    
    cursor = connection.cursor()
    
    try:
        
        #copy data from file to postgres table
        cursor.copy_from(file, table_name, sep = '\t', null = 'NULL')
    
    except (Exception, psycopg2.DatabaseError) as error:

        #print error if completed
        print(f"Error({table_name}): %s" % error)
        
        #remove temp file, rollback and close cursor
        os.remove(temp_file)
        connection.rollback()
        cursor.close()
        
        return None
    
    #print message if completed
    print(f"copy_from_file({table_name}) done")
    
    #remove temp file and close cursor
    cursor.close()
    os.remove(temp_file)
    
    return 1

## Song files

In [7]:
song_files = get_files('data/song_data')
df = pd.concat([pd.read_json(file, lines = True) for file in song_files])
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982
0,1,AR8ZCNI1187B9A069B,,,,Planet P Project,SOIAZJW12AB01853F1,Pink World,269.81832,1984
0,1,ARXR32B1187FB57099,,,,Gob,SOFSOCN12A8C143F5D,Face the Ashes,209.60608,2007
0,1,AR10USD1187B99F3F1,,,"Burlington, Ontario, Canada",Tweeterfriendly Music,SOHKNRJ12A6701D1F8,Drop of Rain,189.57016,0
0,1,ARGSJW91187B9B1D6B,35.21962,-80.01955,North Carolina,JennyAnyKind,SOQHXMF12AB0182363,Young Boy Blues,218.77506,0


Songs table

In [8]:
song_data = df.get(['song_id', 'title', 'artist_id', 'year', 'duration'])
song_data.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
0,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
0,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608
0,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
0,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506


In [9]:
insert_df_to_table(song_data, 'tables/songs.csv', connection, 'songs')

copy_from_file(songs) done


1

In [10]:
pd.read_sql('select * from songs', connection)

Unnamed: 0,song_id,title,artist_id,year,duration
0,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
1,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
2,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608
3,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
4,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506
...,...,...,...,...,...
66,SOLEYHO12AB0188A85,Got My Mojo Workin,ARAGB2O1187FB3A161,0,338.23302
67,SOGVQGJ12AB017F169,Ten Tonne,AR62SOJ1187FB47BB5,2005,337.68444
68,SOFFKZS12AB017F194,A Higher Place (Album Version),ARBEBBY1187B9B43DB,1994,236.17261
69,SOFNOQK12AB01840FC,Kutt Free (DJ Volume Remix),ARNNKDK1187B98BBD5,0,407.37914


Artist table

In [11]:
artist_data = df.get(['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])
artist_data = artist_data.drop_duplicates()
artist_data.head()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR7G5I41187FB4CE6C,Adam Ant,"London, England",,
0,AR8ZCNI1187B9A069B,Planet P Project,,,
0,ARXR32B1187FB57099,Gob,,,
0,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
0,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955


In [12]:
insert_df_to_table(artist_data, 'tables/artists.csv', connection, 'artists')

copy_from_file(artists) done


1

In [13]:
pd.read_sql('select * from artists', connection)

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR7G5I41187FB4CE6C,Adam Ant,"London, England",,
1,AR8ZCNI1187B9A069B,Planet P Project,,,
2,ARXR32B1187FB57099,Gob,,,
3,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
4,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
...,...,...,...,...,...
64,ARAGB2O1187FB3A161,Pucho & His Latin Soul Brothers,,,
65,AR62SOJ1187FB47BB5,Chase & Status,,,
66,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,
67,ARNNKDK1187B98BBD5,Jinx,Zagreb Croatia,45.80726,15.96760


## Log files

In [14]:
all_files = get_files('data/log_data')

#concat all files into one dataframe
df = pd.concat([pd.read_json(file, lines=True) for file in all_files])
df = df.query("page == 'NextSong'")
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,1541904034796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,1541910973796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
5,Matmos,Logged In,Joseph,M,0,Gutierrez,1449.11628,free,"Columbia, SC",PUT,NextSong,1540809000000.0,284,Supreme Balloon,200,1541911006796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",75


Time table

In [15]:
t = pd.to_datetime(df['ts'])
time_data = (df['ts'], t.dt.hour, t.dt.day, t.dt.isocalendar()['week'], t.dt.month, t.dt.year, t.dt.weekday)
column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
time_df = pd.DataFrame({col: values for col, values in zip(column_labels, time_data)})
time_df.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1541903636796,0,1,1,1,1970,3
1,1541903770796,0,1,1,1,1970,3
2,1541904034796,0,1,1,1,1970,3
4,1541910973796,0,1,1,1,1970,3
5,1541911006796,0,1,1,1,1970,3


In [16]:
insert_df_to_table(time_df, 'tables/time.csv', connection, 'time')

copy_from_file(time) done


1

In [17]:
pd.read_sql('select * from time', connection)

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1541903636796,0,1,1,1,1970,3
1,1541903770796,0,1,1,1,1970,3
2,1541904034796,0,1,1,1,1970,3
3,1541910973796,0,1,1,1,1970,3
4,1541911006796,0,1,1,1,1970,3
...,...,...,...,...,...,...,...
6815,1543092353796,0,1,1,1,1970,3
6816,1543092558796,0,1,1,1,1970,3
6817,1543097750796,0,1,1,1,1970,3
6818,1543097978796,0,1,1,1,1970,3


Users table

In [18]:
user_df = df.get(['userId', 'firstName', 'lastName', 'gender', 'level'])
user_df['userId'] = user_df['userId'].astype(int)
user_df = user_df.drop_duplicates(['userId', 'level'])
user_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,69,Anabelle,Simpson,F,free
4,32,Lily,Burns,F,free
5,75,Joseph,Gutierrez,M,free
6,92,Ryann,Smith,F,free
12,49,Chloe,Cuevas,F,free


In [19]:
insert_df_to_table(user_df, 'tables/users.csv', connection, 'users')

copy_from_file(users) done


1

In [20]:
pd.read_sql('select * from users', connection)

Unnamed: 0,user_id,first_name,last_name,gender,level
0,69,Anabelle,Simpson,F,free
1,32,Lily,Burns,F,free
2,75,Joseph,Gutierrez,M,free
3,92,Ryann,Smith,F,free
4,49,Chloe,Cuevas,F,free
...,...,...,...,...,...
99,45,Dominick,Norris,M,free
100,65,Amiya,Davidson,F,paid
101,15,Lily,Koch,F,free
102,19,Zachary,Thomas,M,free


Songplays table generation

In [21]:
artists = pd.read_sql('select * from artists', connection)
songs = pd.read_sql('select * from songs', connection)

In [22]:
df.head(1)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69


In [23]:
full_songs = pd.merge(songs, artists, on ='artist_id').get(['song_id', 'artist_id', 'title', 'name', 'duration'])
full_songs.head(1)

Unnamed: 0,song_id,artist_id,title,name,duration
0,SONHOTT12A8C13493C,AR7G5I41187FB4CE6C,Something Girls,Adam Ant,233.40363


In [24]:
songplays = pd.merge(df, full_songs, left_on=['song', 'artist', 'length'], right_on=['title', 'name', 'duration'])
songplays = songplays.reset_index()
songplays.head()

Unnamed: 0,index,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,...,song,status,ts,userAgent,userId,song_id,artist_id,title,name,duration
0,0,Elena,Logged In,Lily,F,5,Koch,269.58322,paid,"Chicago-Naperville-Elgin, IL-IN-WI",...,Setanta matins,200,1542837407796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",15,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,Setanta matins,Elena,269.58322


In [25]:
songplays.get(['index','ts', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent'])

Unnamed: 0,index,ts,userId,level,song_id,artist_id,sessionId,location,userAgent
0,0,1542837407796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
