In [2]:
import pandas as pd
# Widen pandas' text columns so long string values are not truncated when Spark
# results are previewed through toPandas(). The fully qualified option key is used
# because abbreviated keys are resolved by pattern matching and raise an error as
# soon as the pattern matches more than one option.
pd.set_option('display.max_colwidth', 800)

In [3]:
import os
import glob
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [4]:
# Load the AWS credentials from the local dl.cfg file and export them as
# environment variables. `config` is reused further down when the boto3 resource
# is created.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed, so check the result to fail with a clear message
# instead of a bare KeyError('AWS') on the lookups below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found or unreadable in the working directory")
for _key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[_key] = config['AWS'][_key]

In [5]:
# Create the Spark session, or reuse the one that is already running.
# spark.jars.packages requests the hadoop-aws artifact -- presumably needed for
# the s3a:// paths used below; TODO confirm that 2.7.0 matches the Hadoop build
# of the Spark installation.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [6]:
# Echo the session object as the cell output.
spark

In [7]:
def get_files(filepath):
    """Return the absolute paths of everything matching ``*.json`` under ``filepath``.

    The directory tree is walked recursively. Each visited directory is escaped
    before it is placed in the glob pattern, so directories whose names contain
    glob metacharacters (``[``, ``*``, ``?``) are searched literally instead of
    being interpreted as wildcards (which silently dropped their files).

    Args:
        filepath: Root directory to search.

    Returns:
        list[str]: Absolute paths of the matches, in the order the walk yields
        them. Empty when nothing matches or ``filepath`` does not exist.
    """
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        # glob (rather than the walk's own file list) keeps the original matching
        # rules, e.g. names starting with a dot are still skipped.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))
    return all_files

In [8]:
# Collect the local JSON files of both datasets; the paths are relative to the
# working directory.
log_files = get_files('data/log-data')
song_files = get_files('data/song-data')

In [9]:
# Load the local event-log JSON files into a Spark DataFrame.
log_data = spark.read.json(log_files)

In [10]:
# Load the local song-metadata JSON files into a Spark DataFrame.
songs_data = spark.read.json(song_files)

In [18]:
# Root of the source bucket, addressed through the s3a:// scheme.
input_data = "s3a://udacity-dend/"

In [14]:
# Root of the bucket that receives the output tables (see the parquet write below).
output_data = "s3a://sparkify-partitioned-tables/"

In [64]:
# Glob for the event logs in the bucket: JSON files three directory levels below log-data/.
# NOTE(review): the 'log-data' prefix mirrors the local data/log-data folder --
# confirm the bucket really uses the same prefix and nesting.
log_data_input = os.path.join(input_data, 'log-data/*/*/*.json')

In [None]:
# Glob for the song metadata in the bucket: JSON files four directory levels below song-data/.
# This name is not referenced by any later cell visible in this notebook.
songs_data_input = os.path.join(input_data, 'song-data/*/*/*/*.json')

In [65]:
# Read the event logs straight from S3. The result is kept separate from the
# locally loaded `log_data` and is not used by any later cell visible here.
log_data_s3 = spark.read.json(log_data_input)

In [11]:
# Keep only the rows whose `page` is 'NextSong'; every table derived below is
# built from this subset.
log_data = log_data.where(col('page') == 'NextSong')

In [16]:
# Users table: one row per distinct (userId, level) pair, with userId cast to int.
# NOTE(review): because `level` is part of the de-duplication key, a user who
# appears with more than one level yields several rows -- confirm this is intended
# if userId is meant to be unique.
user_columns = [
    col('userId').cast('int').alias('userId'),
    col('firstName'),
    col('lastName'),
    col('gender'),
    col('level'),
]
user_df = log_data.select(*user_columns).dropDuplicates(['userId', 'level'])

In [17]:
# Preview five user rows as a pandas DataFrame.
user_df.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,49,Chloe,Cuevas,F,free
1,14,Theodore,Harris,M,free
2,80,Tegan,Levine,F,paid
3,73,Jacob,Klein,M,paid
4,39,Walter,Frye,M,free


In [19]:
# Persist the users table as parquet under the output bucket; 'overwrite' mode
# replaces whatever already exists at that path.
user_df.write.mode('overwrite').parquet(output_data + 'users_table/')

In [17]:
# Inspect the log schema. The captured output already lists `timestamp` and
# `datetime`, which are only added by cells further down: this cell was last
# executed after them, so a top-to-bottom run would not show those two columns here.
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)



In [14]:
def _format_event_time(ts_ms):
    """Render an epoch-millisecond value as a 'YYYY-MM-DD HH:MM:SS[.ffffff]' string.

    The conversion uses ``datetime.fromtimestamp`` without a tz argument, so the
    result is expressed in the local time zone of the Python process that runs it.

    Args:
        ts_ms: Epoch time in milliseconds (anything ``int()`` accepts), or None.

    Returns:
        The formatted string, or None when ``ts_ms`` is None, so that null
        timestamps produce a null column value instead of failing the Spark job
        with a TypeError.
    """
    if ts_ms is None:
        return None
    return str(datetime.fromtimestamp(int(ts_ms) / 1000))


# Spark UDF wrapper; with no explicit return type, udf() produces a string column.
get_datetime = udf(_format_event_time)

In [15]:
# Add a native Spark timestamp column: `ts` holds epoch milliseconds, so it is
# scaled to seconds before the cast.
ts_seconds = col("ts") / 1000
log_data = log_data.withColumn('timestamp', ts_seconds.cast("timestamp"))

In [16]:
# Add the string rendering of `ts` produced by the get_datetime UDF.
log_data = log_data.withColumn('datetime', get_datetime(col('ts')))

In [18]:
# Time table: one row per distinct `ts`, broken down into calendar parts.
# The parts are taken from the native `timestamp` column rather than the
# UDF-produced `datetime` string. This avoids the implicit string-to-timestamp
# parsing and lets Spark skip the Python UDF for this table.
# NOTE(review): the parts now follow the Spark session time zone; the string column
# followed the Python worker's local time zone. Presumably these are the same in
# this setup -- confirm if the job runs on a cluster.
event_time = col("timestamp")
time_df = log_data.select(
    col("ts"),
    year(event_time).alias("year"),
    month(event_time).alias("month"),
    dayofmonth(event_time).alias("dayofmonth"),
    hour(event_time).alias("hour"),
    weekofyear(event_time).alias("weekofyear"),
).dropDuplicates()

In [19]:
# Preview ten rows of the time table as a pandas DataFrame.
time_df.limit(10).toPandas()

Unnamed: 0,ts,year,month,dayofmonth,hour,weekofyear
0,1542787214796,2018,11,21,8,47
1,1542841824796,2018,11,21,23,47
2,1542178191796,2018,11,14,6,46
3,1542185846796,2018,11,14,8,46
4,1542186926796,2018,11,14,9,46
5,1542201595796,2018,11,14,13,46
6,1542236727796,2018,11,14,23,46
7,1543426351796,2018,11,28,17,48
8,1543432234796,2018,11,28,19,48
9,1541413092796,2018,11,5,10,45


In [35]:
# Number of distinct rows in the time table.
time_df.count()

6813

In [20]:
# Inspect the schema of the song metadata.
songs_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [21]:
# Register both DataFrames as temporary views so the SQL query below can refer
# to them as `songs` and `logs`.
songs_data.createOrReplaceTempView("songs")
log_data.createOrReplaceTempView("logs")

In [23]:
# Song-plays table: every log row with a non-null `ts`, enriched with the ids of
# the matching song and artist. A song matches when title, artist name and
# duration all equal the log's song, artist and length. Because this is a LEFT
# JOIN, log rows without a match are kept with NULL song_id / artist_id.
# DISTINCT removes duplicate rows.
songsplay_df = spark.sql("""
    SELECT DISTINCT
            logs.ts, 
            logs.userid, 
            logs.level, 
            songs.song_id, 
            songs.artist_id, 
            logs.sessionid, 
            logs.location, 
            logs.useragent
    FROM logs LEFT JOIN songs
    ON songs.title=logs.song AND songs.artist_name=logs.artist AND songs.duration=logs.length
    WHERE logs.ts IS NOT NULL
""")

In [None]:
# Preview ten song-play rows as a pandas DataFrame.
songsplay_df.limit(10).toPandas()

In [25]:
# Inspect the column names and types produced by the SQL query.
songsplay_df.printSchema()

root
 |-- ts: long (nullable = true)
 |-- userid: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionid: long (nullable = true)
 |-- location: string (nullable = true)
 |-- useragent: string (nullable = true)



In [33]:
# Same schema check as earlier in the notebook, repeated before the song and
# artist tables are built.
songs_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [35]:
# Songs table: the song attributes, reduced to one row per song_id.
song_columns = ('song_id', 'title', 'artist_id', 'year', 'duration')
songs_df = songs_data.select(*song_columns).dropDuplicates(['song_id'])

In [None]:
# Preview five song rows as a pandas DataFrame.
songs_df.limit(5).toPandas()

In [37]:
# Number of distinct songs (by song_id).
songs_df.count()

71

In [39]:
# Artists table: the artist attributes carried on each song record, reduced to
# one row per artist_id.
artist_columns = (
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
)
artists_df = songs_data.select(*artist_columns).dropDuplicates(['artist_id'])

In [21]:
import boto3
# boto3 S3 resource for browsing buckets, authenticated with the key pair read
# from dl.cfg.
aws_settings = config['AWS']
s3 = boto3.resource(
    's3',
    region_name="us-east-1",
    aws_access_key_id=aws_settings['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=aws_settings['AWS_SECRET_ACCESS_KEY'],
)

In [None]:
# Peek at the layout of the source bucket. objects.all() would page through and
# print every key in the bucket, so the preview is capped to keep the cell quick
# and its output short; raise the limit (or add .filter(Prefix=...)) to see more.
sampleDbBucket = s3.Bucket("udacity-dend")
for obj in sampleDbBucket.objects.limit(25):
    print(obj)