In [51]:
%%sh
# Uncomment the lines below to extract the zipped datasets under data/.
#unzip data/log-data.zip
#unzip data/song-data.zip

In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

# Read the [AWS] section of dl.cfg and copy both credential entries into the
# process environment.
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_section = config['AWS']
for key_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[key_name] = aws_section[key_name]

In [4]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    Uses ``getOrCreate``, so an already-running session is reused instead of
    starting a second one.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [5]:
# Session object used by the later cells to read the JSON inputs.
spark = create_spark_session()

In [None]:
# Read the song JSON files straight from the S3 bucket using a glob path.
input_data = "s3a://udacity-dend/"
song_data = input_data + 'song-data/*/*/*.json'
df = spark.read.json(song_data, multiLine=True)

In [4]:
import glob
def get_files(filepath):
    """Recursively collect the absolute paths of all ``*.json`` files under a directory.

    Args:
        filepath: Root directory to walk.

    Returns:
        A sorted list of absolute paths. Sorting makes the result independent
        of filesystem enumeration order, so positional slices of the list are
        reproducible. An empty list is returned when the directory does not
        exist or contains no JSON files.
    """
    all_files = []
    for root, _dirs, _names in os.walk(filepath):
        # Escape the directory part: without this, characters such as '[' or
        # '*' in a directory name are interpreted as glob metacharacters and
        # the files inside that directory are silently missed.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return sorted(all_files)

In [5]:
# Directories: list the JSON files found under each local data folder
log_data = get_files('log_data')
song_data = get_files('song_data')

# Song Data

In [15]:
# Load only list entries 1 through 4 of the local song files, then project
# the song and artist column sets from that frame.
df = spark.read.json(song_data[1:5], multiLine=True)
songs_table = df.select(
    'song_id', 'title', 'artist_id', 'year', 'duration'
)
artist_table = df.select(
    'artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'
)

In [18]:
# Preview at most 10 rows of the artist table.
artist_table.limit(10).show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|           null|            null|
|ARMJAGH1187FB546F3|        The Box Tops|         Memphis, TN|       35.14968|       -90.04892|
|ARNTLGG11E2835DDB9|                 Clp|                    |           null|            null|
|AR8ZCNI1187B9A069B|    Planet P Project|                    |           null|            null|
+------------------+--------------------+--------------------+---------------+----------------+



In [20]:
# Write the songs table as CSV, partitioned by year and artist_id, replacing
# any previous output at the same location.
output_data = "s3a://myawsbuckethermit97/"
songs_writer = songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').format("csv")
songs_writer.save(output_data + 'songs_table')

# Log Data

In [8]:
# Load the local log files and keep only the rows whose page is 'NextSong'.
df = spark.read.json(log_data, multiLine=True)
df = df.where(df.page == 'NextSong')

In [None]:
# NOTE(review): despite its name, this variable holds the user columns
# (userId, firstName, lastName, gender, level) selected from df — presumably
# the filtered log data when the cells are run in order; confirm.
artists_table=df.select('userId','firstName','lastName','gender','level')

In [None]:
# UDF: divide the input by 1000 (milliseconds -> seconds), build a datetime
# from it and format it as 'YYYY-MM-DD HH:MM:SS'.
get_timestamp = udf(
    lambda ms: datetime.fromtimestamp(ms / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
)

# Add the formatted value as a "timestamp" column computed from the ts column.
df = df.withColumn("timestamp", get_timestamp(df.ts))

In [None]:
# Break the timestamp column into its calendar parts, keeping the timestamp
# itself as the last column.
time_table = df.select(
    year(df.timestamp).alias('year'),
    month(df.timestamp).alias('month'),
    dayofmonth(df.timestamp).alias('day_of_month'),
    hour(df.timestamp).alias('hour'),
    weekofyear(df.timestamp).alias('weekofyear'),
    df.timestamp,
)

In [None]:
# Combine songs with artists on artist_id, dropping the songs-side copy of
# the key so only one artist_id column remains.
songs_df = songs_table.join(
    artist_table,
    songs_table.artist_id == artist_table.artist_id,
).drop(songs_table.artist_id)
songs_df = songs_df.select('artist_id', 'song_id', 'title', 'artist_name', 'duration')

# Right join against df so every row of df is kept, matched on title,
# artist name and duration where possible.
songs_df = songs_df.join(
    df,
    (songs_df.title == df.song)
    & (songs_df.artist_name == df.artist)
    & (songs_df.duration == df.length),
    'right',
)

In [None]:
# Project the songplay columns out of the joined frame.
songplays_table = songs_df.select(
    'timestamp',
    'userId',
    'level',
    'song_id',
    'artist_id',
    'sessionId',
    'location',
    'userAgent',
)

In [11]:
# Show the song-data location that results from the S3 base path.
input_data = "s3a://udacity-dend/"
"{}song_data".format(input_data)

's3a://udacity-dend/song_data'

In [None]:

def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the song dataset and write them as CSV.

    Args:
        spark: Session object used to read the input JSON.
        input_data: Base location of the input data; ``'song_data'`` is
            appended to it.
        output_data: Base location for the output, with or without a trailing
            slash. The tables are written to ``songstable/`` and
            ``artiststable/`` beneath it.
    """
    # Normalise the output root so both "s3://bucket" and "s3://bucket/" work.
    output_root = output_data.rstrip('/') + '/'

    # get filepath to song data file
    song_data = input_data + 'song_data'

    # read song data file
    df = spark.read.json(song_data, multiLine=True)

    # extract columns to create songs table
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')

    # write songs table to CSV files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').format("csv").save(output_root + 'songstable/')

    # extract columns to create artists table
    artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')

    # write artists table to CSV files (the same name is now used for the
    # select above and the write below; previously they differed)
    artists_table.write.mode('overwrite').format("csv").save(output_root + 'artiststable/')


def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the log dataset and write them as CSV.

    The song dataset is read again here to resolve song and artist ids, so the
    function does not rely on variables left behind by other notebook cells.

    Args:
        spark: Session object used to read the input JSON.
        input_data: Base location of the input data; ``'log_data'`` and
            ``'song_data'`` are appended to it.
        output_data: Base location for the output, with or without a trailing
            slash. The tables are written to ``userstable/``, ``timetable/``
            and ``songplaystable/`` beneath it.
    """
    # Normalise the output root so both "s3://bucket" and "s3://bucket/" work.
    output_root = output_data.rstrip('/') + '/'

    # get filepath to log data file
    log_data = input_data + 'log_data'

    # read log data file
    # NOTE(review): multiLine=True treats each file as one JSON document;
    # confirm this matches the layout of the log files.
    df = spark.read.json(log_data, multiLine=True)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')

    # extract columns for users table
    users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level')

    # write users table to CSV files
    users_table.write.mode('overwrite').format("csv").save(output_root + 'userstable/')

    # create timestamp column from original timestamp column: ts is divided by
    # 1000 (milliseconds -> seconds) and formatted as 'YYYY-MM-DD HH:MM:SS'
    get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
    df = df.withColumn("timestamp", get_timestamp(df.ts))

    # extract columns to create time table
    time_table = df.select(
        year(df.timestamp).alias('year'),
        month(df.timestamp).alias('month'),
        dayofmonth(df.timestamp).alias('day_of_month'),
        hour(df.timestamp).alias('hour'),
        weekofyear(df.timestamp).alias('weekofyear'),
        df.timestamp,
    )

    # write time table to CSV files partitioned by year and month
    time_table.write.partitionBy('year', 'month').mode('overwrite').format("csv").save(output_root + 'timetable/')

    # read in song data to use for songplays table; the needed song and artist
    # columns all live in the same frame, so no songs/artists join is required
    # (that join multiplied rows for artists with more than one song)
    song_df = spark.read.json(input_data + 'song_data', multiLine=True)
    songs_df = song_df.select('artist_id', 'song_id', 'title', 'artist_name', 'duration')

    # right join keeps every song play, with null ids when no song matches
    songs_df = songs_df.join(
        df,
        (songs_df.title == df.song)
        & (songs_df.artist_name == df.artist)
        & (songs_df.duration == df.length),
        'right',
    )

    # extract columns from joined song and log datasets to create songplays table;
    # year and month are derived here because the write below partitions by them
    songplays_table = songs_df.select(
        'timestamp', 'userId', 'level', 'song_id', 'artist_id',
        'sessionId', 'location', 'userAgent',
        year(songs_df.timestamp).alias('year'),
        month(songs_df.timestamp).alias('month'),
    )

    # write songplays table to CSV files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').mode('overwrite').format("csv").save(output_root + 'songplaystable/')


def main():
    """Create the Spark session, then run the song and log processing steps in that order."""
    session = create_spark_session()
    source_root = "s3a://udacity-dend/"
    target_root = "s3://myawsbuckethermit97"

    # Both steps take the same (session, input, output) arguments.
    for step in (process_song_data, process_log_data):
        step(session, source_root, target_root)
