In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date, monotonically_increasing_id

from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import shutil
import urllib 


In [None]:
# Create (or reuse) the Spark session. The hadoop-aws package is added to the
# classpath, presumably for the s3n filesystem configured in a later cell.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Load AWS credentials from a local INI file. The context manager closes the
# file handle once it has been parsed (the original left it open). open() raises
# FileNotFoundError if the file is missing, which is preferable to
# ConfigParser.read() silently skipping it.
config = configparser.ConfigParser()
with open('aws/credentials/dl.cfg') as cfg_file:
    config.read_file(cfg_file)

In [None]:
# Export both credentials from the [default] section of the config file as
# environment variables of the same name.
for aws_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[aws_key] = config.get('default', aws_key)

In [None]:
# Register Hadoop's native S3 filesystem for the s3n:// scheme and pass it the
# AWS credentials taken from the environment.
# NOTE(review): spark._jsc is a private PySpark attribute; the input/output
# paths used in this notebook are local, so this only matters for s3n:// paths.
hadoop_conf = spark._jsc.hadoopConfiguration()
s3n_settings = {
    "fs.s3n.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
    "fs.s3n.awsAccessKeyId": os.environ['AWS_ACCESS_KEY_ID'],
    "fs.s3n.awsSecretAccessKey": os.environ['AWS_SECRET_ACCESS_KEY'],
}
for conf_key, conf_value in s3n_settings.items():
    hadoop_conf.set(conf_key, conf_value)

In [None]:
# Glob for the raw event-log JSON files (local path, relative to the working directory).
log_data = "data/log_data/*.json"

In [None]:
# Glob for the song-metadata JSON files, nested three directory levels below song_data/.
song_data = "data/song_data/*/*/*/*.json"


In [None]:
# Load every song-metadata JSON file into a single DataFrame (schema inferred by Spark).
df = spark.read.format('json').load(song_data)

In [None]:
# Inspect the schema Spark inferred for the song data.
df.printSchema()

In [None]:
# Number of song records loaded (runs a Spark job over the input files).
df.count()

In [None]:
# Songs table: the song-level columns projected from the raw song data.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
df_songs = df.select(*song_columns)

In [None]:
# Confirm the songs table has the expected columns and types.
df_songs.printSchema()

In [None]:
# Persist the songs table as Parquet, partitioned by year then artist_id and
# replacing any previous output.
df_songs.write.parquet('output/songs', mode='overwrite', partitionBy=['year', 'artist_id'])

In [None]:
# Artists table: one row per artist_id. The source data has one row per song,
# so an artist with several songs would otherwise be repeated once per song.
# NOTE: if an artist's attributes differ between song records, which row is
# kept by dropDuplicates is arbitrary.
df_artists = (
    df.selectExpr('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
    .dropDuplicates(['artist_id'])
)

In [None]:
# Confirm the artists table has the expected columns and types.
df_artists.printSchema()

In [None]:
# Persist the artists table as Parquet, replacing any previous output.
df_artists.write.parquet('output/artists', mode='overwrite')

In [None]:
# Glob for the event-log JSON files. This is the same value as the earlier
# log_data cell; it is re-assigned here, presumably so the log-processing
# section can be run on its own.
log_data = "data/log_data/*.json"

In [None]:
# Load the event-log JSON files.
# NOTE: this rebinds `df`, replacing the song-data DataFrame loaded earlier.
df = spark.read.format('json').load(log_data)

In [None]:
# Inspect the schema Spark inferred for the event log.
df.printSchema()

In [None]:
# Keep only events whose page is 'NextSong' (presumably the song-play events).
df = df.where("page = 'NextSong'")

In [None]:
# Users table: one row per userId. The log has one row per event, so each user
# would otherwise be repeated once per song they played.
# NOTE(review): if a user's level changes over time, which value dropDuplicates
# keeps is arbitrary; consider keeping each user's latest event instead.
df_users = (
    df.selectExpr('userId', 'firstName', 'lastName', 'gender', 'level')
    .dropDuplicates(['userId'])
)

In [None]:
# Persist the users table as Parquet, replacing any previous output.
df_users.write.parquet('output/users', mode='overwrite')

In [None]:
# Convert the numeric epoch column `ts` to a Spark timestamp.
# When to_timestamp is given a numeric column and no format, it casts the value
# as *seconds* since the epoch. The event log's `ts` holds milliseconds
# (NOTE(review): assumed from the log format, i.e. 13-digit values; confirm with
# df.select('ts').show()). Passing it through unscaled yields dates tens of
# thousands of years in the future, so divide by 1000 first. The fractional
# part keeps millisecond precision.
df_log = df.withColumn('new_ts', to_timestamp(col('ts') / 1000))

In [None]:
# Check that new_ts was added with a timestamp type.
df_log.printSchema()

In [None]:
# Calendar date of each event.
# to_date's optional second argument is a datetime *parse pattern*. The string
# "timestamp" is not a valid pattern, and a pattern is meaningless for a column
# that is already a timestamp (at best it is ignored; stricter Spark versions
# may reject it). Use the plain timestamp-to-date conversion instead.
df_log = df_log.withColumn('new_date', to_date('new_ts'))

In [None]:
# Time table: each event timestamp alongside its calendar breakdown.
# Hour and weekday come from the timestamp; the other parts come from new_date.
time_parts = [
    hour('new_ts').alias('hour'),
    dayofmonth('new_date').alias('day'),
    weekofyear('new_date').alias('week'),
    month('new_date').alias('month'),
    year('new_date').alias('year'),
    date_format('new_ts', 'EEEE').alias('weekday'),  # full weekday name
]
df_times = df_log.select('new_ts', *time_parts)

In [None]:
# Confirm the time table's columns and types.
df_times.printSchema()

In [None]:
# Spot-check the first 5 rows of the time table.
df_times.show(5)

In [None]:
# Persist the time table as Parquet, partitioned by year then month and
# replacing any previous output.
df_times.write.parquet('output/times', mode='overwrite', partitionBy=['year', 'month'])

In [None]:
# Read the songs table back from its Parquet output for the songplays join.
df_song = spark.read.format('parquet').load('output/songs')

In [None]:
# Peek at one row of each input to the songplays join (songs first, then events).
for preview_frame in (df_song, df_log):
    preview_frame.show(1)

In [None]:
# Expose both DataFrames to Spark SQL under the view names used by the query.
for view_name, frame in (('songs', df_song), ('events', df_log)):
    frame.createOrReplaceTempView(view_name)

# Songplays: events matched to songs on exact title and duration (no artist
# check). Events with no matching song are dropped by the inner JOIN.
# NOTE(review): `id` is row_number() partitioned *and* ordered by new_ts, so it
# restarts at 1 for every distinct timestamp and is not a unique key.
song_plays_sql = """
SELECT 
        row_number() over (partition by new_ts order by new_ts) as id,
        new_ts,
        userId,
        level,
        song_id,
        artist_id,
        sessionId,
        location,
        userAgent
        FROM songs s JOIN events e
        ON s.title = e.song
        AND s.duration = e.length
"""
song_plays = spark.sql(song_plays_sql)

In [None]:
# Attach a surrogate key. monotonically_increasing_id() values are unique and
# increasing, but not consecutive. withColumn replaces the column on re-run.
songplay_key = monotonically_increasing_id()
song_plays = song_plays.withColumn("songplay_id", songplay_key)

In [None]:
# Confirm the songplays columns, including the new songplay_id.
song_plays.printSchema()

In [None]:
# Display the first rows of songplays (Spark's default of 20).
song_plays.show()

In [None]:
# Write the songplays fact table as Parquet. Year and month columns are derived
# from the event timestamp so the output can be partitioned by them. These
# columns are added only to the written data; song_plays itself is unchanged.
(
    song_plays
    .select('*', year('new_ts').alias('year'), month('new_ts').alias('month'))
    .write
    .parquet('output/song_plays', mode='overwrite', partitionBy=['year', 'month'])
)

In [None]:
# song_plays was not reassigned by the write cell, so its schema has no year/month columns.
song_plays.printSchema()