In [1]:
import os,glob
import numpy as np
import pandas as pd
import json
import datetime,time

## Reading Data from json files

In [2]:
data_path = 'E:/Web/Dataset/Million_Song/log_data/2018/11/*'

log_dataset = pd.DataFrame()
for file in glob.glob(data_path):
    print(file)
    data_file = pd.read_json(file,lines=True)
    log_dataset = pd.concat([log_dataset,data_file])

E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-01-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-02-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-03-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-04-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-05-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-06-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-07-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-08-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-09-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-10-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-11-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-12-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-13-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\2018-11-14-events.json
E:/Web/Dataset/Million_Song/log_data/2018/11\201

In [3]:
log_dataset.sample(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
33,Alliance Ethnik,Logged In,Lily,F,68,Koch,195.94404,paid,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,NextSong,1541048000000.0,818,SinceritÃÂ© Et Jalousie,200,1542852842796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",15
114,,Logged In,Kinsley,F,0,Young,,paid,"Red Bluff, CA",GET,Home,1540465000000.0,436,,200,1542373125796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_...",85


In [4]:
data_path = 'E:/Web/Dataset/Million_Song/song_data/A/*'
song_dataset = pd.DataFrame()
for folder in glob.glob(data_path):
    if os.path.isdir(folder):
        for sub_folder in glob.glob(folder+'/*'):
            print(sub_folder)
            for dir in glob.glob(sub_folder+'/*'):
                song_file = pd.read_json(dir,lines=True)
                song_dataset = pd.concat([song_dataset,song_file])

E:/Web/Dataset/Million_Song/song_data/A\A\A
E:/Web/Dataset/Million_Song/song_data/A\A\B
E:/Web/Dataset/Million_Song/song_data/A\A\C
E:/Web/Dataset/Million_Song/song_data/A\B\A
E:/Web/Dataset/Million_Song/song_data/A\B\B
E:/Web/Dataset/Million_Song/song_data/A\B\C


In [5]:
song_dataset.sample(2)

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARGCY1Y1187B9A4FA5,36.16778,-86.77836,"Nashville, TN.",Gloriana,SOQOTLQ12AB01868D0,Clementina Santafè,153.33832,0
0,1,ARMBR4Y1187B9990EB,37.77916,-122.42005,California - SF,David Martin,SOTTDKS12AB018D69B,It Wont Be Christmas,241.47546,0


## Database --PostgreSQL

In [9]:
import psycopg2

In [10]:
conn = psycopg2.connect("dbname=excalibur user=postgres password=123")
cur = conn.cursor()
conn.set_session(autocommit=True)

### Creating fact and dim table

 * fact table "songplays" - records in log data associated with song plays i.e. records with page NextSong
 
       songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
 * dim tables :
         * users - users in the app
              user_id, first_name, last_name, gender, level
        * songs - songs in music database
              song_id, title, artist_id, year, duration
        * artists - artists in music database
              artist_id, name, location, latitude, longitude
        * time - timestamps of records in songplays broken down into specific units
              start_time, hour, day, week, month, year, weekday

### Create tables

In [294]:
query_create_songplay = """     create table if not exists songplays (
                                songplay_id serial primary key,
                                start_time timestamp references time (start_time),
                                user_id int references users (user_id),
                                level varchar(10),
                                song_id varchar references songs (song_id),
                                artist_id varchar references artists (artist_id),
                                session_id int,
                                location text,
                                user_agent text
                            ); 
                        """
query_create_users = """
                        create table if not exists users(
                            user_id int primary key,
                            first_name varchar(15),
                            last_name varchar(15),
                            gender char(1),
                            level varchar(10)
                        );
                    """

query_create_songs = """
                        create table if not exists songs(
                            song_id varchar primary key,
                            title varchar,
                            artist_id varchar references artists (artist_id),
                            year int,
                            duration float
                        );
                    """
query_create_artists = """
                            create table if not exists artists(
                                artist_id varchar primary key,
                                name varchar,
                                location text,
                                latitude float,
                                longitude float
                            );
                        """

query_create_time = """
                        create table if not exists time(
                        start_time timestamp primary key,
                        hour int,
                        day int,
                        week int,
                        month int,
                        year int,
                        weekday varchar
                        
                        );
                    """



create_table_queries = [query_create_artists,query_create_songs,query_create_users,query_create_time,query_create_songplay]

In [295]:
for query in create_table_queries:
    cur.execute(query)

In [296]:
cur.execute("select table_name from information_schema.tables where table_schema='public'")
rows = cur.fetchall()
for row in rows:
    print(row)

('artists',)
('songs',)
('time',)
('songplays',)
('users',)


## insert into tables

#### insert into artists table

In [297]:
artist_rows = song_dataset[['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']].values.tolist()

query = "insert into artists values (%s,%s,%s,%s,%s) on conflict (artist_id) do nothing"
for row in artist_rows:
    cur.execute(query,row)

#### insert into songs table

In [298]:
song_rows = song_dataset[['song_id','title','artist_id','year','duration']].values.tolist()

query = "insert into songs values (%s,%s,%s,%s,%s) on conflict(song_id) do nothing"

for row in song_rows:
    cur.execute(query,row)

#### insert into users table

In [299]:
user_rows = log_dataset.loc[log_dataset.userId!='',['userId','firstName','lastName','gender','level']].values.tolist()

query = "insert into users values(%s,%s,%s,%s,%s) on conflict(user_id) do update set level=excluded.level"

for row in user_rows:
    cur.execute(query,row)

#### insert into time table

In [300]:
time_rows = log_dataset.loc[log_dataset.page == 'NextSong']
time_rows.index = np.arange(1,len(time_rows)+1)
t = pd.to_datetime(time_rows.ts,unit='ms')
time_rows.loc['ts'] = t

time_rows.dropna(inplace=True)

time_data = [t,t.dt.hour,t.dt.day,t.dt.week,t.dt.month,t.dt.year,t.dt.weekday]
column_names = ('start_time','hour','day','week','month','year','weekday')

time_df = pd.DataFrame.from_dict(dict(zip(column_names,time_data)))
time_df = time_df.values.tolist()

query = "insert into time values(%s,%s,%s,%s,%s,%s,%s) on conflict(start_time) do nothing"

for row in time_df:
    cur.execute(query,row)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  iloc._setitem_with_indexer(indexer, value, self.name)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return func(*args, **kwargs)
  


#### insert into songplays table

In [301]:
select_query = """
            select s.song_id,a.artist_id
            from songs s join artists a
            on s.artist_id = a.artist_id
            where s.title =%s and a.name=%s and s.duration=%s
        """

songplay_table_insert = ("""
    INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""")

In [302]:
for index,row in time_rows.iterrows():
    cur.execute(select_query,(row.song,row.artist,row.length))
    result = cur.fetchone()
    if result:
        song_id,artist_id = result
    else:
        song_id,artist_id = None,None
    
    
    songplay_data = [t[index],row.userId,row.level,song_id,artist_id,row.sessionId,row.location,row.userAgent]
    cur.execute(songplay_table_insert,songplay_data)

### --------------------------------------------

In [306]:
query = "select * from songplays order by song_id"
cur.execute(query)
rows = cur.fetchmany(2)
for row in rows:
    print(row)


(4627, datetime.datetime(2018, 11, 21, 21, 56, 47, 796000), 15, 'paid', 'SOZCTXZ12AB0182364', 'AR5KOSW1187FB35FF4', 818, 'Chicago-Naperville-Elgin, IL-IN-WI', '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"')
(2, datetime.datetime(2018, 11, 1, 21, 5, 52, 796000), 8, 'free', None, None, 139, 'Phoenix-Mesa-Scottsdale, AZ', '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"')


## Drop all tables

In [293]:
cur.execute("drop table songplays")
cur.execute("drop table songs")
cur.execute("drop table users")
cur.execute("drop table artists")
cur.execute("drop table time")