In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
# Load AWS credentials from the local dl.cfg file and export them as
# environment variables so later cells can read them from os.environ.
config = configparser.ConfigParser()
# read_file() raises FileNotFoundError when dl.cfg is absent, whereas
# ConfigParser.read() silently skips missing files and the failure would only
# surface later as an opaque KeyError: 'AWS'.
with open('dl.cfg') as config_file:
    config.read_file(config_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build (or reuse) the Spark session with the hadoop-aws package so that
# s3a:// paths can be read and written.
# The S3A connector reads its credentials from fs.s3a.access.key and
# fs.s3a.secret.key; the previous awsAccessKeyId / awsSecretAccessKey names are
# not S3A properties in hadoop-aws 2.7 and were ignored by the connector.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

In [4]:
# Root S3 location of the raw input; the song and log JSON paths used below
# are built relative to it.
input_data = "s3a://udacity-dend/"

In [5]:
# Path of a single song JSON file under input_data (one file keeps the
# exploration fast). The bare name on the last line echoes the resolved path.
song_data = os.path.join(input_data, "song_data/A/A/A/TRAAAAK128F9318786.json")
song_data

's3a://udacity-dend/song_data/A/A/A/TRAAAAK128F9318786.json'

In [6]:
 # read song data file
# Load the song JSON into a DataFrame. NOTE: the name `df` is rebound to the
# log data in a later cell, so the song-derived tables must be built before then.
df = spark.read.json(song_data)

   

In [7]:
# Peek at the first song record to check the available fields.
df.head()

Row(artist_id='ARJNIUY12298900C91', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Adelitas Way', duration=213.9424, num_songs=1, song_id='SOBLFFE12AF72AA5BA', title='Scream', year=2009)

In [8]:
# Destination S3 location; every table below is written to a sub-path of it.
output_data = "s3a://udacitydan"

In [9]:
 # extract columns to create songs table
# Songs table: project the song attributes and discard repeated rows.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = df.select(*song_columns).dropDuplicates()

# Store as parquet, partitioned first by year and then by artist_id,
# replacing whatever a previous run left at the target path.
songs_output = os.path.join(output_data, 'songs')
(
    songs_table.write
    .mode('overwrite')
    .partitionBy("year", "artist_id")
    .parquet(songs_output)
)

In [10]:
# Extract columns to create the artists table.
# The song data carries one row per song, so an artist with several songs
# would appear several times; drop the repeated rows, as is done for the
# songs table.
artists_table = df[
    'artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'
].dropDuplicates()

In [11]:
# Persist the artists table as parquet, replacing any earlier output.
artists_output = os.path.join(output_data, 'artists')
artists_writer = artists_table.write.mode('overwrite')
artists_writer.parquet(artists_output)

In [12]:
# Load one day of event logs; from here on `df` refers to the log data,
# no longer to the song data.
log_file = "log_data/2018/11/2018-11-12-events.json"
log_data = os.path.join(input_data, log_file)

df = spark.read.json(log_data)
df.head()

Row(artist=None, auth='Logged In', firstName='Celeste', gender='F', itemInSession=0, lastName='Williams', length=None, level='free', location='Klamath Falls, OR', method='GET', page='Home', registration=1541077528796.0, sessionId=438, song=None, status=200, ts=1541990217796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='53')

In [13]:

# Keep only the events that represent a song play (page == 'NextSong').
is_song_play = col('page') == 'NextSong'
df = df.filter(is_song_play)
df.head()


Row(artist='Pavement', auth='Logged In', firstName='Sylvie', gender='F', itemInSession=0, lastName='Cruz', length=99.16036, level='free', location='Washington-Arlington-Alexandria, DC-VA-MD-WV', method='PUT', page='NextSong', registration=1540266185796.0, sessionId=345, song='Mercy:The Laundromat', status=200, ts=1541990258796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', userId='10')

In [14]:

# Extract columns for the users table.
# The log has one row per event, so without de-duplication the same user
# record would be written once per song play.
users_table = df['userId', 'firstName', 'lastName', 'gender', 'level'].dropDuplicates()

# Write the users table to parquet files, replacing any earlier output.
users_table.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))



In [15]:
from pyspark.sql.functions import expr, to_timestamp

# `ts` holds epoch milliseconds; dividing by 1000 yields seconds, which the
# SQL cast turns into a timestamp column named start_time.
start_time_expr = expr("cast(ts/1000.0 as timestamp)")
df = df.withColumn("start_time", start_time_expr)

df.head()

Row(artist='Pavement', auth='Logged In', firstName='Sylvie', gender='F', itemInSession=0, lastName='Cruz', length=99.16036, level='free', location='Washington-Arlington-Alexandria, DC-VA-MD-WV', method='PUT', page='NextSong', registration=1540266185796.0, sessionId=345, song='Mercy:The Laundromat', status=200, ts=1541990258796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', userId='10', start_time=datetime.datetime(2018, 11, 12, 2, 37, 38, 796000))

In [23]:
# Re-inspect the first event row; it should now carry the derived start_time.
df.head()

Row(artist='Pavement', auth='Logged In', firstName='Sylvie', gender='F', itemInSession=0, lastName='Cruz', length=99.16036, level='free', location='Washington-Arlington-Alexandria, DC-VA-MD-WV', method='PUT', page='NextSong', registration=1540266185796.0, sessionId=345, song='Mercy:The Laundromat', status=200, ts=1541990258796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', userId='10', start_time=datetime.datetime(2018, 11, 12, 2, 37, 38, 796000))

In [17]:
# Register the event data as the "time_temp" view; the songplays query
# further down reads from it.
df.createOrReplaceTempView("time_temp")

# Time table: one row per distinct start_time together with its calendar parts.
time_parts = [
    "start_time",
    "hour(start_time) AS hour",
    "day(start_time) AS day",
    "weekofyear(start_time) AS week",
    "month(start_time) AS month",
    "year(start_time) AS year",
    "dayofweek(start_time) AS weekday",
]
time_table = df.selectExpr(*time_parts).distinct()

time_table.head()

Row(start_time=datetime.datetime(2018, 11, 12, 16, 50, 55, 796000), hour=16, day=12, week=46, month=11, year=2018, weekday=2)

In [18]:
# Persist the time table as parquet, partitioned by year then month,
# replacing any earlier output.
time_output = os.path.join(output_data, 'time')
(
    time_table.write
    .mode('overwrite')
    .partitionBy("year", "month")
    .parquet(time_output)
)

KeyboardInterrupt: 

In [27]:
# Reload the song data and expose it as the "songs" view.
song_df = spark.read.json(song_data)
song_df.createOrReplaceTempView("songs")

# Songplays table: every event in the "time_temp" view, left-joined to the
# song whose title and artist name match, so plays without a matching song
# keep null song_id / artist_id.
plays = spark.table("time_temp").alias("sp")
songs = spark.table("songs").alias("s")
same_song = (col("sp.song") == col("s.title")) & (col("sp.artist") == col("s.artist_name"))

songplays_table = (
    plays.join(songs, on=same_song, how="left")
    .select(
        col("sp.start_time"),
        month(col("sp.start_time")).alias("month"),
        year(col("sp.start_time")).alias("year"),
        col("sp.userId").alias("user_id"),
        col("sp.level"),
        col("s.song_id"),
        col("s.artist_id"),
        col("sp.sessionId").alias("session_id"),
        col("sp.location"),
        col("sp.userAgent").alias("user_agent"),
    )
    .distinct()
)

songplays_table.head()

Row(start_time=datetime.datetime(2018, 11, 12, 21, 3, 22, 796000), month=11, year=2018, user_id='58', level='paid', song_id=None, artist_id=None, session_id=494, location='Augusta-Richmond County, GA-SC', user_agent='"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"')

In [28]:
# Persist the songplays table as parquet, partitioned by year then month,
# replacing any earlier output.
songplays_output = os.path.join(output_data, 'songplays')
(
    songplays_table.write
    .mode('overwrite')
    .partitionBy("year", "month")
    .parquet(songplays_output)
)

KeyboardInterrupt: 