## Sparkify Project

Este projeto foi proposto pelo Nanodegree Data Engineer da Udacity.

### Introdução

**Modelagem de Dados com Postgres e ETL pipeline**

Uma startup chamada Sparkify quer analisar os dados que eles tem coletados das músicas e atividades dos usuários no seu novo aplicativo de streaming de música. O time de análises está interessando em entender quais músicas os usuários estão ouvindo. Atualmente, eles não tem uma forma fácil de consultar esses dados, que estão num diretório em formato JSON, um com os dados dos logs das atividades dos usuarios no aplicativo e outro com os metadados das musicas na biblioteca do aplicativo.


Eles gostariam que um engenheiro de dados criasse um banco de dados Postgres com as tabelas desenvolvidas para otimizar as consultas das análises das músicas reproduzidas. Seu papel é criar um database schema e um ETL pipeline para estas análises.

### Descrição do Projeto

Neste projeto, iremos aplicar o que aprendemos em modelagem de dados com Postgres e construir um pipeline para extrair, transformar e carregar os dados no banco de dados usando Python. Para completar o projeto, precisaremos definir as tabelas fatos e dimensão, usando um star schema para facilitar as consultas do time de análise de dados, além de escrever um pipeline de dados para transferir os dados dos arquivos locais em formato JSON para estas tabelas criadas no Postgres, usando Python e SQL

Consiste em extrair os dados de duas fontes:

- song_data: Exemplo de cada registro

`{
"num_songs": 1, 
"artist_id": "ARMJAGH1187FB546F3", 
"artist_latitude": 35.14968, 
"artist_longitude": -90.04892, 
"artist_location": "Memphis, TN", 
"artist_name": "The Box Tops", 
"song_id": "SOCIWDW12A8C13D406", 
"title": "Soul Deep", 
"duration": 148.03546, 
"year": 1969
}`

- log_data: Exemplo de cada registro

`{
"artist":"Des'ree",
"auth":"Logged In",
"firstName":"Kaylee",
"gender":"F",
"itemInSession":1,
"lastName":"Summers",
"length":246.30812,
"level":"free",
"location":"Phoenix-Mesa-Scottsdale, AZ",
"method":"PUT",
"page":"NextSong",
"registration":1540344794796.0,
"sessionId":139,
"song":"You Gotta Be",
"status":200,
"ts":1541106106796,
"userAgent":"\"Mozilla\/5.0 (Windows NT 6.1; WOW64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/35.0.1916.153 Safari\/537.36\"",
"userId":"8"
}`

### Print da tabela dos logs de eventos dos usuários:

![Screenshot of log_data](images/log-data.png)

## Definindo Schema e tabelas de relacionamentos

Como o intuito é facilitar as consultas para a área de análise de dados executar e obter dados para futuras análises, vamos modelar seguindo o modelo Star Schema, um processamento OLAP (On-line Analytical Processing), que é desenhado para performar melhor e tem as seguintes características:

- Aplicação: No nível estratégico, auxilia na análise empresarial e tomada de decisões;
- Funcionalidade: Gera análises e relatórios gerenciais com leitura otimizada;
- Estrutura de dados: Poucos detalhes, pois tem alto nível de sumarização;
- Armazenamento dos dados: Utiliza-se da Data Warehouse para otimizar o desempenho da grande quantidade de dados;
- Usuários: Destinados aos gestores e time analítico;
- Frequência de utilização: Baixa, conforme programação da empresa;
- Volatilidade: Dados não sofrem alterações, pois os usuários apenas realizarão sua leitura.

O OLAP (On-line Analytical Processing) é voltado para a tomada de decisões, proporciona uma visão dos dados orientado à análise, além de uma navegação rápida e flexível. O OLAP recebe dados do OLTP (On-line Transactional Processing) para que possa realizar as análises. Essa carga de dados acontece conforme a necessidade da empresa. Sendo um sistema para tomada de decisões, não realiza transações (INSERT, UPDATE, DELETE) pois sua finalidade são consultas. Possui dados atuais e históricos e não há necessidade de backups regularmente, sendo que ele possui informações do OLTP. Caso algo aconteça com a base OLAP basta fazer uma carga novamente.


### Tabelas

Vamos agora começar a definir nossas tabelas fato e tabelas dimensão, lembrando que uma tabela fato é um evento, uma venda, uma transação ocorrida, um fato transacional que ocorreu no nosso sistema, no caso, quando um usuário acessa o aplicativo sparkify e clica numa música para ouvir, esses dados da musica selecionada, do usuário, localização, página em que ele estava e etc, fazem parte do evento "tocar música", esse é um bom candidato para nossa tabela fato. As tabelas fatos geralmente são de dados numéricos e não categóricos. Resumindo:

A tabela fato é a principal tabela do Data Warehouse, ela vai conectar nas dimensões. Nessa tabela são armazenadas duas coisas: as métricas, que são os fatos propriamente ditos, e as foreign keys, chaves que servem para ligar os dados das dimensões com a fato. Ou seja, a tabela fato é composta pelas métricas, que são tudo aquilo que a empresa quer medir, junto com as foreign keys, chaves que ligam às dimensões que descrevem essas métricas. As métricas são utilizadas para medir, quantificar algo, são sempre números provenientes de transações da empresa. Tudo que a empresa quer mensurar é métrica, geralmente sendo o que o usuário quer medir.

Mas o que é uma Foreign Key? É uma chave estrangeira que serve para relacionar os dados entre as tabelas fato e dimensão.

Já as tabelas Dimensão, são as categorias de cada entidade envolvida na tabela fato, que podem ser usadas para trazer mais informações sobre os dados e facilitar as análises. Os dados do usuário que reproduziu uma música é um exemplo para uma tabela dimensão, o nome, sobrenome, gênero, nível na aplicação são os dados dessa dimensão. Os dados da música selecionadas já fazem parte de outra dimensão, como o nome da música, o ano de lançamento, tempo de duração e etc.

A Dimensão possui característica descritiva dentro do DW. Ela qualifica as informações provenientes da tabela Fato. Através dela é possível analisar os dados sob múltiplas perspectivas.

### Fact Table

- songplays: registros nos dados log de eventos associados com músicas reproduzidas
    - songplay_id, 
    - start_time, 
    - user_id, 
    - level, 
    - song_id, 
    - artist_id, 
    - session_id, 
    - location, 
    - user_agent

### Dimension Tables

- users: usuários do aplicativo
    - user_id, 
    - first_name, 
    - last_name, 
    - gender, 
    - level
    
    
- songs: músicas no banco de dados
    - song_id, 
    - title, 
    - artist_id, 
    - year, 
    - duration
    
    
- artists: Artistas das músicas no banco de dados
    - artist_id, 
    - name, 
    - location, 
    - latitude, 
    - longitude


- time: Timestamps dos registros das músicas reproduzidas quebrados em unidades específicas para facilitar futuras análises
    - start_time, 
    - hour, 
    - day, 
    - week, 
    - month, 
    - year, 
    - weekday

## Passo a passo

Primeiro vamos escrever as queries para dropar e criar as tabelas e selecionar as músicas, depois vamos criar o banco de dados e as tabelas, vou explicando linha a linha para fácil entendimento

In [65]:
# DROP TABELAS

drop_table_songplays = "DROP TABLE IF EXISTS songplays"
drop_table_users = "DROP TABLE IF EXISTS users"
drop_table_songs = "DROP TABLE IF EXISTS songs"
drop_table_artists = "DROP TABLE IF EXISTS artists"
drop_table_time = "DROP TABLE IF EXISTS time"

# CREATE TABELAS

create_table_songplays = ("""
    CREATE TABLE IF NOT EXISTS songplays
    (songplay_id int PRIMARY KEY, 
    start_time date, 
    user_id text NOT NULL,
    level text, 
    song_id text,
    artist_id text,
    session_id int,
    location text,
    user_agent text)
""")

create_table_users = ("""
  CREATE TABLE IF NOT EXISTS users
  (user_id text PRIMARY KEY,
  first_name text NOT NULL,
  last_name text NOT NULL,
  gender text,
  level text)
""")

create_table_songs = ("""
  CREATE TABLE IF NOT EXISTS songs
  (song_id text PRIMARY KEY,
  title text NOT NULL,
  artist_id text NOT NULL,
  year int,
  duration float NOT NULL)
""")

create_table_time = ("""
  CREATE TABLE IF NOT EXISTS time
    (start_time date PRIMARY KEY,
     hour int, 
     day int, 
     week int, 
     month int, 
     year int, 
     weekday text)
""")

create_table_artists = ("""
  CREATE TABLE IF NOT EXISTS artists
  (artist_id text PRIMARY KEY,
  name text NOT NULL,
  location text,
  latitude float,
  longitude float)
""")

In [66]:
# INSERT REGISTROS

insert_song = ("""
  INSERT INTO songs
  (song_id, title, artist_id, year, duration)
  VALUES (%s, %s, %s, %s, %s)
  ON CONFLICT (song_id) DO NOTHING
""")

insert_artist = ("""
  INSERT INTO artists
  (artist_id, name, location, latitude, longitude)
  VALUES (%s, %s, %s, %s, %s)
  ON CONFLICT (artist_id) DO NOTHING
""")

insert_time = ("""
  INSERT INTO time
  (start_time, hour, day, week, month, year, weekday)
  VALUES (%s, %s, %s, %s, %s, %s, %s)
  ON CONFLICT (start_time) DO NOTHING
""")

insert_user = ("""
  INSERT INTO users
  (user_id, first_name, last_name, gender, level)
  VALUES (%s, %s, %s, %s, %s)
  ON CONFLICT (user_id) DO NOTHING
""")

insert_songplays = ("""
  INSERT INTO songplays
  (songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
  VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  ON CONFLICT (songplay_id) DO NOTHING
""")

# SONG SELECT

song_select = ("""
  SELECT song_id, artists.artist_id, duration
  FROM songs 
  JOIN artists 
  ON songs.artist_id = artists.artist_id
  WHERE songs.title = %s
  AND artists.name = %s
  AND songs.duration = %s
""")

In [67]:
create_table_queries = [create_table_songplays, create_table_users, create_table_songs, create_table_time, create_table_artists]

drop_table_queries = [drop_table_songplays, drop_table_users, drop_table_songs, drop_table_artists, drop_table_time]

In [68]:
# Agora vamos começar a criar o database e as tabelas

import psycopg2

def main():
    """Inicia conexão e cursor para criar database, drop e criar as tabelas"""
    
    cur, conn = create_database()
    
    drop_tables(cur, conn)
    create_tables(cur, conn)
    
    cur.close()
    conn.close()

In [69]:
def create_database():
    """Cria o banco de dados, mas antes faz o drop do banco de dados 
    se ele já existe, e em seguida, cria-o novamente se não existe"""
    
    conn = psycopg2.connect("host=localhost dbname=postgres user=ebraim password=ebraim")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    cur.execute("DROP DATABASE sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf-8'")
    cur.close()
    conn.close()
    
    conn = psycopg2.connect("host=localhost dbname=sparkifydb user=ebraim password=ebraim")
    cur = conn.cursor()
    
    return cur, conn


In [70]:
def drop_tables(cur, conn):
    """Drops all tables created on the database"""
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

def create_tables(cur, conn):
    """Create all tables on the database"""
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

In [71]:
main()

In [74]:
# ETL PIPELINE

import os
import glob
import psycopg2
import pandas as pd

def main_etl():
    """ Function used to extract, transform all data from song_data and log_data and load it into a PostgreSQL DB
    """

    conn = psycopg2.connect("host=localhost dbname=sparkifydb user=ebraim password=ebraim")
    cur = conn.cursor()

    process_data(cur, conn, filepath="data/song_data", func=process_song_data)
    process_data(cur, conn, filepath="data/log_data", func=process_log_data)

    cur.close()
    conn.close()

def process_data(cur, conn, filepath, func):
    """Walks through all files nested under filepath, and processes all logs found.

    Parameters:
      cur (psycopg2.cursor()): Cursor of the sparkifydb database
      conn (psycopg2.connect()): Connection to the sparkifycdb database
      filepath (str): Filepath parent of the logs to be analyzed
      func (python function): Function to be used to process each log

    Returns:
      Name of files processed
    """

    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
  
    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()

    return all_files


def process_song_data (cur, datafile):
    """ Reads songs log file row by row, selects needed fields and inserts them into song and artist tables.

    Parameters:
      cur (psycopg2.cursor()): Cursor of the sparkifydb database
      filepath (str): Filepath of the file to be analyzed
    """

    # open song file
    df = pd.read_json(datafile, lines=True)

    for value in df.values:
        num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year = value

        # insert artist record
        artist_data = [artist_id, artist_name, artist_location, artist_latitude, artist_longitude]
        cur.execute(insert_artist, artist_data)

        song_data = [song_id, title, artist_id, year, duration]
        cur.execute(insert_song, song_data)


def process_log_data (cur, datafile):
    """ Reads user activity log file row by row, filters by NextSong, selects needed fields, transforms them and inserts them into time, user and songplay tables.

    Parameters: 
      cur (psycopg2.cursor()): Cursor of the sparkifydb database
      filepath (str): Filepath of the file to be analyzed
    """

    # open log file
    df = pd.read_json(datafile, lines=True)

    # filter by NextSong action
    df = df[df['page'] == 'NextSong']

    # convert timestamp column to datetime
    t = pd.to_datetime(df['ts'], unit='ms')

    # insert time records
    time_data = []
    for line in t:
        time_data.append([line, line.hour, line.day, line.week, line.month, line.year, line.day_name()])
    column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
    time_df = pd.DataFrame(time_data, columns=column_labels)

    for i, row in time_df.iterrows():
        cur.execute(insert_time, list(row))

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(insert_user, list(row))

    # insert songplay records
    for i, row in df.iterrows():

        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()

        if results:
            songid, artistid, duration = results
        else:
            songid, artistid, duration = None, None, None

        # insert songplay record
        # songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
        songplay_data = (i, pd.to_datetime(row.ts, unit='ms'), int(row.userId), row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(insert_songplays, songplay_data)


In [None]:
main_etl()