In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS CREDS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()
    return spark

In [2]:
spark = create_spark_session()
input_data = "s3a://udacity-dend/"

In [3]:
song_data = "s3a://udacity-dend/song_data/A/A/*/*.json"

In [4]:
df =spark.read.json(song_data)

In [5]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [6]:
songs_table = df.select(['song_id','title','duration','year','artist_id'])

In [7]:
songs_table.write.mode('overwrite').parquet('songs.parquet')

In [8]:
artists_table = df.select(['artist_id','artist_name','artist_latitude','artist_longitude','artist_location'])

In [9]:
artists_table.write.mode('overwrite').parquet('artists.parquet')

In [10]:
df = spark.read.json(input_data+ "log_data/2018/*/*.json")

In [11]:
len(df.collect())

8056

In [12]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [13]:
df = df.where(df.page == 'NextSong')

In [14]:
len(df.collect())

6820

In [15]:
users_table = df.select(['userId','firstName','lastName','gender','level'])

In [16]:
users_table.write.mode('overwrite').parquet('users.parquet')

In [17]:
get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts/1000),TimestampType())
df = df.withColumn('timestamp', get_timestamp("ts"))

In [18]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [19]:
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn('datetime', get_datetime("ts"))

In [20]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)



In [21]:
df.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), datetime='2018-11-15 00:30:26')]

In [22]:
df.createOrReplaceTempView('time_df')

In [23]:
time_table = spark.sql('''
SELECT datetime as start_time, 
hour(timestamp) as hour,
dayofmonth(timestamp) as day,
dayofweek(timestamp) as weekday,
weekofyear(timestamp) as week,
month(timestamp) as month,
year(timestamp) as year
FROM time_df''')

In [24]:
song_df = spark.read.parquet('songs.parquet')

In [25]:
# extract columns from joined song and log datasets to create songplays table 


In [26]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)



In [27]:
song_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: long (nullable = true)
 |-- artist_id: string (nullable = true)



In [28]:
songplay_df = df.join(song_df, (df.artist == song_df.artist_id) & (df.song == song_df.song_id))

In [29]:
songplay_df = songplay_df.withColumn('songplay_id', monotonically_increasing_id())

NameError: name 'artists_songs_logs' is not defined

In [30]:
songplay_df = songplay_df.join(
        time_table,
        songplay_df.ts == time_table.start_time, 'left'
    ).drop(songplay_df.year)

In [31]:
songplay_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- songplay_id: long (nullable = false)
 |-- start_time: stri

In [None]:
songplay_df.createOrReplaceTempView('songplay_df')

In [None]:
spark.sql('''
SELECT songplay_id AS songplay_id,
timestamp   AS start_time,
userId      AS user_id,
level       AS level,
song_id     AS song_id,
artist_id   AS artist_id,
sessionId   AS session_id,
location    AS location,
userAgent   AS user_agent
FROM songplay_df
ORDER BY (user_id, session_id)
''').collect()