### Exploration

In this notebook I explore the data and processes of the first project of the Data Engineering Nanodegree program.

- 'Walk' the directory with `Path`
- Retrieve all JSON files
- Load them into Pandas
- Explore and clean
- Prepare for insert. NOTE: be careful when inserting into the database; something like an auto-incrementing id (e.g. `songplay_id`) should be enabled
- Create a SQL database (SQLite for training purposes?)
- Insert the transformed and cleaned data
- Bonus: logging (!). Keep it simple, but logging is essential for these tasks, especially in exception handlers.

Make sure to write clean, robust code and add sensible checks (`assert`). Eventually this will become your `etl.ipynb`.
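
A minimal sketch of the logging idea, using only the standard library. The logger name `etl` and the helper `read_json_lines` are hypothetical, not part of the project. `logger.exception` logs at ERROR level and appends the traceback, so it fits naturally inside `except` blocks.

```python
import json
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("etl")  # hypothetical logger name


def read_json_lines(file_path: Path) -> list:
    """Hypothetical helper: read one JSON object per line; log and skip the file on failure."""
    try:
        with open(file_path, encoding="utf-8") as fh:
            records = [json.loads(line) for line in fh if line.strip()]
    except (OSError, json.JSONDecodeError):
        # Logs at ERROR level and includes the traceback of the active exception
        logger.exception("Could not read %s; this file will be skipped", file_path)
        return []
    logger.info("Read %d records from %s", len(records), file_path)
    return records
```

Passing arguments (`%s`) instead of f-strings lets `logging` skip the string formatting for messages below the configured level.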

----------------------------

After the first exploration, there appears to be a recurring process:
- Collect the data for a particular table
- Assert that the data is correct
- Insert the data into the specified table
- Verify the results

There is probably a lot of functionality we can re-use for each table.
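
One way to capture that recurring process in a reusable function. This is a sketch, assuming an SQLite connection from Python's built-in `sqlite3` module (SQLite is mentioned above for training); `load_table` and its `validate` callback are hypothetical names. With psycopg2 the placeholders would be `%s` instead of `?`.

```python
import sqlite3
from typing import Callable, Iterable, Sequence


def load_table(conn: sqlite3.Connection, table: str, columns: Sequence[str],
               rows: Iterable[tuple], validate: Callable[[tuple], bool]) -> int:
    """Hypothetical helper: collect -> assert -> insert -> verify for one table."""
    rows = list(rows)                                          # collect
    bad = [r for r in rows if not validate(r)]
    assert not bad, f"{len(bad)} invalid rows for {table}: {bad[:3]}"  # assert

    # Table and column names are interpolated, so they must come from trusted code, not from the data
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    before = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    conn.executemany(sql, rows)                                # insert
    conn.commit()
    after = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

    assert after - before == len(rows), "row count mismatch after insert"  # verify
    return after - before
```

Like the notebook's own checks, this relies on `assert`, which Python skips entirely when run with `-O`.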

#### Find JSON files and return their paths

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import json
import numpy as np
import pandas as pd
from pathlib import Path

In [5]:
from settings import user, password

In [None]:
def create_path_list(file_path: Path, extension: str = '.json') -> list:
    """Returns a list of Paths of all files with the given extension in file_path, including all subdirectories.
    
    Example
        from pathlib import Path
        
        data_path = Path('.') / 'data'
        csv_path_list = create_path_list(data_path, '.csv')
    """
    file_path = Path(file_path)  # glob() needs a Path; this also accepts a plain string
    return_list = list(file_path.glob(f"**/*{extension}"))  # '**/' makes the search recursive
    print(f"{file_path} contains {len(return_list)} {extension} files.")
    
    return return_list
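
`Path.rglob(pattern)` is equivalent to `Path.glob('**/' + pattern)`, so the same recursive search can be written a little shorter. `find_files` is a hypothetical name; the sort is an addition, because `glob` makes no ordering guarantee and a stable order matters later for the log files.

```python
from pathlib import Path


def find_files(root: Path, extension: str = ".json") -> list:
    """Hypothetical helper: all files under root (recursively) with the given extension."""
    # rglob("*.json") matches the same files as glob("**/*.json");
    # sorting gives a reproducible processing order
    return sorted(Path(root).rglob(f"*{extension}"))
```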

In [None]:
data_path = Path('.') / 'data'

In [None]:
data_path_list = create_path_list(data_path)

### Process song data

From this data two tables are created:
- songs - songs in music database -> song_id, title, artist_id, year, duration
- artists - artists in music database -> artist_id, name, location, latitude, longitude

Create a list of tuples for a direct insert into the Postgres table: validate each file, concatenate the valid ones into a DataFrame, convert it to a list of tuples and return it.

https://realpython.com/python-exceptions/#the-assertionerror-exception
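
Why convert with `itertuples` rather than `to_records`: a small check on toy values (not taken from the data). `to_records` keeps NumPy scalars such as `np.int64`, which psycopg2 does not adapt by default (the familiar `can't adapt type 'numpy.int64'` error), while iterating with `itertuples` yields native Python scalars.

```python
import numpy as np
import pandas as pd

# Toy values for illustration only
toy_df = pd.DataFrame({"song_id": ["SOAAA", "SOBBB"], "year": [2001, 0], "duration": [218.9, 150.0]})

# to_records keeps NumPy scalar types (np.int64, np.float64) in each record
records = list(toy_df.to_records(index=False))

# itertuples iterates the columns as Series, which yield native Python scalars
rows = [tuple(row) for row in toy_df.itertuples(index=False)]
```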

Observations:
- Songs: the `year` column contains 0 values (presumably unknown years)
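
If a year of 0 means "unknown" (an assumption; the data does not say so), it could be stored as NULL instead. A minimal sketch with a hypothetical `zero_year_to_none` helper; building an explicit `object` Series keeps `None` from being turned back into `NaN`.

```python
import pandas as pd


def zero_year_to_none(df: pd.DataFrame, column: str = "year") -> pd.DataFrame:
    """Hypothetical helper: return a copy where year == 0 becomes None (sent to Postgres as NULL)."""
    out = df.copy()
    # An explicit object dtype keeps None as None instead of coercing the column to float/NaN
    out[column] = pd.Series([None if y == 0 else int(y) for y in df[column]],
                            index=df.index, dtype=object)
    return out
```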

#### Songs table

In [None]:
song_path_list = create_path_list(data_path / 'song_data')

In [None]:
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']  # same columns and order as the songs table

In [None]:
def df_assertions(df: pd.DataFrame, target_columns: list) -> None:
    """Assert statements to make sure the retrieved data is valid and clean before insertion into the Postgres table."""
    df_cols = df.columns.str.lower()
    found_cols = [x for x in df_cols if x in target_columns]
    
    assert sorted(found_cols) == sorted(target_columns), "The columns do not match."
    assert not df[target_columns].isnull().values.any(), "Missing values in not nullable target columns."
    # TODO: assert the data type of each column
    # TODO: assert any constraint important to the Postgres table
    
    return None
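
A sketch of the first TODO, checking column dtypes with the predicates in `pandas.api.types`. The `DTYPE_CHECKS` mapping and the `assert_dtypes` helper are hypothetical; which dtype each table column should have is an assumption to adapt to the actual table definitions.

```python
import pandas as pd
from pandas.api import types as ptypes

# Hypothetical mapping from a short type name to a pandas dtype predicate
DTYPE_CHECKS = {
    "int": ptypes.is_integer_dtype,
    "float": ptypes.is_float_dtype,
    "str": ptypes.is_string_dtype,
}


def assert_dtypes(df: pd.DataFrame, expected: dict) -> None:
    """Hypothetical helper: raise AssertionError listing every column whose dtype does not match."""
    wrong = {col: str(df[col].dtype) for col, kind in expected.items()
             if not DTYPE_CHECKS[kind](df[col])}
    assert not wrong, f"Unexpected dtypes: {wrong}"
```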

In [None]:
song_frames = []

for idx, file in enumerate(song_path_list):
    temp_df = pd.read_json(file, lines=True)
    try:
        df_assertions(temp_df, song_columns)
    except AssertionError as error:
        print(f"Error @ file {idx} {file}: {error} NOTE: this file will not be inserted.")
    else:
        song_frames.append(temp_df[song_columns])

# DataFrame.append was removed in pandas 2.0; concatenate once at the end instead
song_df = pd.concat(song_frames, ignore_index=True) if song_frames else pd.DataFrame(columns=song_columns)

In [None]:
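# itertuples yields native Python scalars (to_records keeps NumPy types, which psycopg2 cannot adapt)
song_table_data = [tuple(row) for row in song_df.itertuples(index=False)]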
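
Before the real Postgres insert, the insert-and-verify steps can be tried on an in-memory SQLite database. A sketch with toy rows (not from the data): `ON CONFLICT (song_id) DO NOTHING` skips duplicate primary keys; the same clause exists in Postgres (9.5+), and SQLite accepts it from version 3.24. psycopg2 would use `%s` placeholders instead of `?`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE songs (
    song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT, year INTEGER, duration REAL)""")

# Toy rows for illustration only
toy_rows = [
    ("SO1", "Song A", "AR1", 2001, 218.9),
    ("SO2", "Song B", "AR2", None, 150.0),
    ("SO1", "Song A", "AR1", 2001, 218.9),  # duplicate primary key
]

# Rows whose song_id already exists are skipped instead of raising an IntegrityError
conn.executemany(
    "INSERT INTO songs (song_id, title, artist_id, year, duration) VALUES (?, ?, ?, ?, ?) "
    "ON CONFLICT (song_id) DO NOTHING",
    toy_rows,
)
conn.commit()

# Verify: one row per distinct song_id
inserted = conn.execute("SELECT COUNT(*) FROM songs").fetchone()[0]
```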

#### Artists table

In [None]:
# inspect the artist columns of the first song file
pd.read_json(song_path_list[0], lines=True)[['artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]

In [None]:
artist_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

In [None]:
artist_frames = []

for file in song_path_list:
    temp_df = pd.read_json(file, lines=True)
    artist_frames.append(temp_df[artist_columns])

artist_df = pd.concat(artist_frames, ignore_index=True)

In [None]:
artist_df.info()

In [None]:
artist_df

In [None]:
# replace() returns a new DataFrame; assign it to keep the result
artist_df = artist_df.replace({'': None, np.nan: None})
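
The artist rows also need deduplicating, since the same artist can appear in several song files. A minimal sketch with a hypothetical `artist_rows` helper; keeping the first occurrence per `artist_id` is an assumption, and converting `''`/`NaN` value by value avoids relying on `DataFrame.replace` with `None` as the replacement, whose behavior has varied between pandas versions.

```python
import pandas as pd


def artist_rows(artist_df: pd.DataFrame) -> list:
    """Hypothetical helper: dedupe on artist_id and turn '' / NaN into None so psycopg2 sends NULL."""
    # keep="first" is an assumption; the files give no rule for conflicting duplicates
    deduped = artist_df.drop_duplicates(subset="artist_id", keep="first")
    return [
        tuple(None if (isinstance(v, str) and v == "") or pd.isna(v) else v for v in row)
        for row in deduped.itertuples(index=False)
    ]
```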

### Logging data

There are two tables we want to extract from the log data:
- users
- time

Challenge:
- users needs to be filtered on `auth == 'Logged In'`
- time needs to be filtered on `page == 'NextSong'`
- Ideally, we do not want to load the data twice, since that would cause a lot of overhead.

Create a function that takes `temp_df` as input and returns a `temp_users` and a `temp_time` DataFrame.

In [None]:
log_path_list = create_path_list(data_path / 'log_data')

In [None]:
log_df = pd.read_json(log_path_list[0], lines=True)

In [None]:
def expands_dfs(temp_df: pd.DataFrame) -> tuple:
    """Returns 2 dataframes which can be used for the users and time tables in Postgres."""
    users_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
    time_columns = ['ts']
    
    users_df = temp_df[temp_df['auth'] == 'Logged In']
    time_df = temp_df[temp_df['page'] == 'NextSong']
    
    return (users_df[users_columns], time_df[time_columns])

In [None]:
expands_dfs(log_df)

In [None]:
users_frames = []
time_frames = []

for file in log_path_list:
    temp_df = pd.read_json(file, lines=True)
    
    temp_users, temp_time = expands_dfs(temp_df)
    
    users_frames.append(temp_users)
    time_frames.append(temp_time)

# a separate name avoids overwriting song_df from the songs section
users_df = pd.concat(users_frames, ignore_index=True)
time_df = pd.concat(time_frames, ignore_index=True)

In [None]:
time_df

In [None]:
time_df['ts']

In [None]:
def expand_ms(ms_series: pd.Series) -> pd.DataFrame:
    """Expands a Pandas Series of millisecond timestamps into several datetime attributes."""
    df = pd.DataFrame({'start_time': pd.to_datetime(ms_series, unit='ms')})
    
    df['hour'] = df['start_time'].dt.hour
    df['day'] = df['start_time'].dt.day
    df['week'] = df['start_time'].dt.isocalendar().week  # was assigned to 'day', overwriting it
    df['month'] = df['start_time'].dt.month
    df['year'] = df['start_time'].dt.year
    df['weekday'] = df['start_time'].dt.weekday

    return df

In [None]:
# the function has to be defined before this cell runs
expand_ms(time_df['ts']).drop_duplicates(subset='start_time')
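
A quick sanity check on timestamps whose calendar values can be verified by hand: 0 ms is 1970-01-01 00:00 (a Thursday, in ISO week 1), and four days and 13 hours later is Monday 1970-01-05 13:00, the start of ISO week 2. The function is repeated with the `week` column fixed so the block runs on its own; the `astype(int)` cast is an addition, because `isocalendar()` returns a nullable `UInt32` column.

```python
import pandas as pd


def expand_ms(ms_series: pd.Series) -> pd.DataFrame:
    """Expands a Pandas Series of millisecond timestamps into several datetime attributes."""
    df = pd.DataFrame({'start_time': pd.to_datetime(ms_series, unit='ms')})
    df['hour'] = df['start_time'].dt.hour
    df['day'] = df['start_time'].dt.day
    df['week'] = df['start_time'].dt.isocalendar().week.astype(int)  # UInt32 -> plain int
    df['month'] = df['start_time'].dt.month
    df['year'] = df['start_time'].dt.year
    df['weekday'] = df['start_time'].dt.weekday  # Monday=0 ... Sunday=6
    return df


HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS
check = expand_ms(pd.Series([0, 4 * DAY_MS + 13 * HOUR_MS]))
```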

In [None]:
# Iterating over a DataFrame yields its column labels, not its rows; convert the whole column at once instead
pd.DataFrame({'start_time': pd.to_datetime(time_df['ts'], unit='ms')})

In [None]:
type(time_df)

In [None]:
log_path_list = create_path_list(data_path / 'log_data')

In [None]:
log_df = pd.read_json(log_path_list[0], lines=True)

In [None]:
log_df.head()

In [None]:
log_df.info()

In [None]:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [None]:
log_df['page'].value_counts()

In [None]:
next_songs = log_df[log_df['page']=='NextSong']

#### Users -> [user_id, first_name, last_name, gender, level]
`level` looks like something we should update over time, hence the hints on the cheat sheet. Take time into account: the files are in chronological order.

- Per file, keep only one row per `userId`
- If the `userId` already exists, check whether `gender` and/or `level` need updating; otherwise skip
- Make sure to read the files in the correct order
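
The "update if the user already exists" rule maps onto an upsert. A sketch on an in-memory SQLite database with toy rows (not from the data): `INSERT ... ON CONFLICT (user_id) DO UPDATE SET ... = excluded....` works the same way in Postgres 9.5+ (where `excluded` is usually written `EXCLUDED`) and in SQLite 3.24+. The table layout follows the users columns above; the snake_case names are an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE users (
    user_id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, gender TEXT, level TEXT)""")

# On a duplicate user_id, overwrite gender and level with the incoming values
upsert = """
INSERT INTO users (user_id, first_name, last_name, gender, level) VALUES (?, ?, ?, ?, ?)
ON CONFLICT (user_id) DO UPDATE SET gender = excluded.gender, level = excluded.level
"""

# Toy rows; they must arrive in chronological order so the last write is the most recent level
for row in [(1, "Ann", "Lee", "F", "free"), (1, "Ann", "Lee", "F", "paid")]:
    conn.execute(upsert, row)
conn.commit()

current = conn.execute("SELECT user_id, level FROM users").fetchall()
```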

In [None]:
users_dtype_dict = {'userId': int,
                    'firstName': str, 
                    'lastName': str,
                    'gender': str,
                    'level': str} 

In [None]:
users_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']

In [None]:
log_path_list = create_path_list(data_path / 'log_data')

#### Make sure the data files are read, and the data appended, in chronological order, so we can track who changes their level

In [None]:
log_path_list = sorted(log_path_list)  # sorted() returns a new list; assign it so the order is kept
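
Sorting the paths only helps if the rows keep that order afterwards. A minimal sketch of a "latest level wins" rule that sorts on `ts` explicitly, so the result does not depend on file order; `latest_user_rows` is a hypothetical helper. Note it keeps a single row per user, unlike the `['userId', 'gender', 'level']` key passed to `create_log_insert_list`, which keeps one row per level change.

```python
import pandas as pd


def latest_user_rows(log_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: one row per userId, holding its most recent state according to `ts`."""
    return (log_df.sort_values("ts", kind="stable")              # chronological order
                  .drop_duplicates(subset="userId", keep="last")   # last row = latest state
                  .reset_index(drop=True))
```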

In [None]:
def df_log_assertions(df: pd.DataFrame, target_columns: list, not_nullable_columns: list = None) -> None:
    """Assert statements to make sure the retrieved data is valid and clean before insertion into the Postgres table."""
    low_df_columns = [x.lower() for x in df.columns]
    low_target_columns = [x.lower() for x in target_columns]
    
    found_cols = [x for x in low_df_columns if x in low_target_columns]
    assert sorted(found_cols) == sorted(low_target_columns), "The columns do not match."
    
    if not_nullable_columns:
        assert not df[not_nullable_columns].isnull().values.any(), "Missing values in not nullable target columns."
    else:
        assert not df[target_columns].isnull().values.any(), "Missing values in the target columns, if allowed please specify these columns."
    
    return None

In [None]:
def create_log_insert_list(file_path_list: list, insert_columns: list, primary_keys: list,
                           dtype_dict: dict, not_nullable_columns: list = None) -> tuple:
    """Takes a raw file_path list as input, performs several validation checks, and returns the insert DataFrame
    together with a list of tuples ready for insertion in Postgres."""
    frames = []

    for idx, file in enumerate(file_path_list):
        temp_df = pd.read_json(file, lines=True)

        try:
            df_log_assertions(temp_df, insert_columns, not_nullable_columns)
        except AssertionError as error:
            print(f"AssertionError @ file {idx} {file}: {error} NOTE: this file will not be inserted.")
        else:
            try:
                # we do not want to store non logged in users; selecting with .loc and casting
                # returns a new frame, which avoids chained assignment on a filtered slice
                temp_df = temp_df.loc[temp_df['auth'] == 'Logged In', insert_columns].astype(dtype_dict)
            except ValueError as error:
                print(f"ValueError @ file {idx} {file}: {error} NOTE: this file will not be inserted")
            else:
                frames.append(temp_df)

    # DataFrame.append was removed in pandas 2.0; concatenate once at the end instead
    target_df = pd.concat(frames, ignore_index=True)
    insert_df = target_df.drop_duplicates(subset=primary_keys)
    print(f"There were {target_df.shape[0]-insert_df.shape[0]} duplicate primary keys removed from the insert dataframe")

    if not_nullable_columns:
        insert_df = insert_df.replace({'': None, np.nan: None})  # Postgres does not recognize '' or np.nan as NULL

    # itertuples yields native Python scalars instead of numpy dtypes, which psycopg2 needs
    return (insert_df, [tuple(row) for row in insert_df.itertuples(index=False)])

In [None]:
log_df, log_tuple_list = create_log_insert_list(file_path_list=log_path_list,
                                                insert_columns=users_columns,
                                                primary_keys=['userId', 'gender', 'level'],
                                                dtype_dict=users_dtype_dict,
                                                not_nullable_columns=['userId', 'level'])

In [None]:
log_df

In [None]:
def _df_assertions(df: pd.DataFrame, target_columns: list, not_nullable_columns: list = None) -> None:
    """Assert statements to make sure the retrieved data is valid and clean before insertion into the Postgres table."""
    low_df_columns = [x.lower() for x in df.columns]
    low_target_columns = [x.lower() for x in target_columns]
    
    found_cols = [x for x in low_df_columns if x in low_target_columns]
    assert sorted(found_cols) == sorted(low_target_columns), "The columns do not match."
    
    if not_nullable_columns:
        assert not df[not_nullable_columns].isnull().values.any(), "Missing values in not nullable target columns."
    else:
        assert not df[target_columns].isnull().values.any(), "Missing values in the target columns, if allowed please specify these columns."
    
    return None

In [None]:
def _expand_ms(ms_series: pd.Series) -> pd.DataFrame:
    """Expands a Pandas Series of millisecond timestamps into several datetime attributes."""
    df = pd.DataFrame({'start_time': pd.to_datetime(ms_series, unit='ms')})
    
    df['hour'] = df['start_time'].dt.hour
    df['day'] = df['start_time'].dt.day
    df['week'] = df['start_time'].dt.isocalendar().week  # was assigned to 'day', overwriting it
    df['month'] = df['start_time'].dt.month
    df['year'] = df['start_time'].dt.year
    df['weekday'] = df['start_time'].dt.weekday

    return df

In [None]:
def create_log_insert_lists(file_path_list: list, insert_columns: list, primary_keys: list,
                            dtype_dict: dict, not_nullable_columns: list = None) -> tuple:
    """Takes a raw file_path list as input, performs several validation checks, and returns two lists of tuples
    (users and time) ready for insertion in Postgres. Assumes insert_columns contains 'ts'."""
    users_frames, time_frames = [], []

    for idx, file in enumerate(file_path_list):
        temp_df = pd.read_json(file, lines=True)

        try:
            _df_assertions(temp_df, insert_columns, not_nullable_columns)
        except AssertionError as error:
            print(f"AssertionError @ file {idx} {file}: {error} NOTE: this file will not be inserted.")
        else:
            try:
                # users: we do not want to store non logged in users
                users_part = temp_df.loc[temp_df['auth'] == 'Logged In', insert_columns].astype(dtype_dict)
                # time: only song plays (page == 'NextSong') belong in the time table
                time_part = temp_df.loc[temp_df['page'] == 'NextSong', 'ts'].astype(int)
            except ValueError as error:
                print(f"ValueError @ file {idx} {file}: {error} NOTE: this file will not be inserted")
            else:
                users_frames.append(users_part)
                time_frames.append(time_part)

    target_df = pd.concat(users_frames, ignore_index=True)
    # 'ts' is only needed for the time table, so drop it from the users rows
    insert_users_df = target_df.drop_duplicates(subset=primary_keys).drop(columns='ts')
    print(f"There were {target_df.shape[0]-insert_users_df.shape[0]} duplicate primary keys removed from the insert dataframe")

    insert_time_df = _expand_ms(pd.concat(time_frames, ignore_index=True))

    if not_nullable_columns:
        insert_users_df = insert_users_df.replace({'': None, np.nan: None})  # Postgres does not recognize '' or np.nan as NULL

    # itertuples yields native Python scalars instead of numpy dtypes, which psycopg2 needs
    return ([tuple(row) for row in insert_users_df.itertuples(index=False)],
            [tuple(row) for row in insert_time_df.itertuples(index=False)])

In [None]:
log_dtype_dict = {'userId': int,
                  'firstName': str, 
                  'lastName': str,
                  'gender': str,
                  'level': str,
                  'ts': int} 

In [None]:
log_columns = ['userId', 'firstName', 'lastName', 'gender', 'level', 'ts']

In [None]:
users_table_data, time_table_data = create_log_insert_lists(file_path_list=log_path_list,
                                                            insert_columns=log_columns,
                                                            primary_keys=['userId', 'gender', 'level'],
                                                            dtype_dict=log_dtype_dict,
                                                            not_nullable_columns=['userId', 'level'])

In [None]:
len(time_table_data)

### Time table
- start_time, hour, day, week, month, year, weekday
- ts (timestamp) of records in the log data with page == NextSong
- Convert back to milliseconds when joining with the other tables, if necessary
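
If a join needs milliseconds again, subtracting the Unix epoch and floor-dividing by one millisecond reverses `pd.to_datetime(..., unit='ms')` for naive (timezone-free) timestamps. `datetime_to_ms` is a hypothetical helper.

```python
import pandas as pd


def datetime_to_ms(start_time: pd.Series) -> pd.Series:
    """Hypothetical helper: milliseconds since the Unix epoch for a naive datetime Series."""
    # datetime - epoch gives timedeltas; // one millisecond turns them into integers
    return (start_time - pd.Timestamp("1970-01-01")) // pd.Timedelta(milliseconds=1)
```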

#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set them to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting it into a dataframe
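
The steps above can be sketched in one function. `build_time_df` is a hypothetical name, and the column labels follow the time table columns listed earlier; `isocalendar().week` needs pandas 1.1 or newer.

```python
import pandas as pd


def build_time_df(log_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: filter NextSong rows, convert ts (ms) to datetime, split into parts."""
    t = pd.to_datetime(log_df.loc[log_df["page"] == "NextSong", "ts"], unit="ms")
    time_data = (t, t.dt.hour, t.dt.day, t.dt.isocalendar().week,
                 t.dt.month, t.dt.year, t.dt.weekday)
    column_labels = ("start_time", "hour", "day", "week", "month", "year", "weekday")
    # zip labels and values into a dict, then build the DataFrame from it
    return pd.DataFrame(dict(zip(column_labels, time_data)))
```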

In [None]:
log_path_list = create_path_list(data_path / 'log_data')

In [None]:
log_df = pd.read_json(log_path_list[0], lines=True)

In [None]:
log_df

In [None]:
log_df['ts'].head()

In [None]:
def expand_ms(ms_series: pd.Series) -> pd.DataFrame:
    """Expands a Pandas Series of millisecond timestamps into several datetime attributes."""
    df = pd.DataFrame({'start_time': pd.to_datetime(ms_series, unit='ms')})
    
    df['hour'] = df['start_time'].dt.hour
    df['day'] = df['start_time'].dt.day
    df['week'] = df['start_time'].dt.isocalendar().week  # was assigned to 'day', overwriting it
    df['month'] = df['start_time'].dt.month
    df['year'] = df['start_time'].dt.year
    df['weekday'] = df['start_time'].dt.weekday

    return df

In [None]:
expand_ms(log_df['ts'])

In [None]:
expand_ms(log_df['ts']).info()

In [None]:
list(expand_ms(log_df['ts']).columns)

### Songplays

#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`
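
A sketch of what such a `song_select` query could look like, tried on an in-memory SQLite database with toy rows (not from the data). The name `song_select_sketch` is hypothetical, and the simplified table layouts are an assumption; psycopg2 would use `%s` placeholders instead of `?`. Matching `duration` with `=` only works if the logged length and the stored duration are exactly the same float.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT);
CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT, duration REAL);
INSERT INTO artists VALUES ('AR1', 'Some Artist');
INSERT INTO songs VALUES ('SO1', 'Some Song', 'AR1', 218.93);
""")

# Find song_id and artist_id from title, artist name and duration
song_select_sketch = """
SELECT s.song_id, s.artist_id
FROM songs s
JOIN artists a ON s.artist_id = a.artist_id
WHERE s.title = ? AND a.name = ? AND s.duration = ?
"""

match = conn.execute(song_select_sketch, ("Some Song", "Some Artist", 218.93)).fetchone()
no_match = conn.execute(song_select_sketch, ("Other Song", "Some Artist", 218.93)).fetchone()
```

When there is no match, `fetchone()` returns `None`, so the song and artist IDs in `songplay_data` would be stored as NULL.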

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.
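
The NOTE at the top about auto-incrementing ids applies here: `songplay_id` is not in the log data, so the database should generate it. A sketch on an in-memory SQLite database with toy rows, where an `INTEGER PRIMARY KEY` column omitted from the INSERT gets the next rowid; in Postgres the usual equivalent is `SERIAL` (or an identity column). The name `songplay_insert_sketch` is hypothetical; the columns follow the songplays list in the comment cell above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# songplay_id is filled in automatically when it is left out of the INSERT
conn.execute("""CREATE TABLE songplays (
    songplay_id INTEGER PRIMARY KEY, start_time TEXT, user_id INTEGER, level TEXT,
    song_id TEXT, artist_id TEXT, session_id INTEGER, location TEXT, user_agent TEXT)""")

songplay_insert_sketch = """
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""

# Toy rows for illustration only; None becomes NULL when no song/artist match was found
toy_songplays = [
    ("1970-01-01 00:00:00", 1, "free", None, None, 10, "Somewhere", "SomeAgent"),
    ("1970-01-01 00:04:00", 1, "free", "SO1", "AR1", 10, "Somewhere", "SomeAgent"),
]
conn.executemany(songplay_insert_sketch, toy_songplays)
conn.commit()

ids = [r[0] for r in conn.execute("SELECT songplay_id FROM songplays ORDER BY songplay_id")]
```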