# ELT S3 => S3 parquet

In [None]:
from pyspark.sql import SparkSession
import os
import configparser
import boto3

## Load credentials

In [None]:
# Read AWS credentials from the local dl.cfg file ([default] section) and
# export them as environment variables, presumably so boto3 and the S3A
# connector can authenticate against S3.
config = configparser.ConfigParser()
# Context manager guarantees the config file handle is closed after parsing.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config.get("default", "AWS_ACCESS_KEY_ID")
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get("default", "AWS_SECRET_ACCESS_KEY")

## Create a Spark session with the hadoop-aws package

In [None]:
# Build (or reuse) a SparkSession that downloads the hadoop-aws package,
# which supplies the s3a:// filesystem used for every read/write below.
hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
spark_builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
spark = spark_builder.getOrCreate()

# Check out the sample data sources on S3

In [None]:
# boto3 S3 resource (us-east-1) used below to list objects in the source bucket.
s3 = boto3.resource('s3', region_name='us-east-1')

## Load data from S3

In [None]:
# Collect every object key in the udacity-dend bucket that starts with "song_dat".
songDataDbBucket = s3.Bucket("udacity-dend")
s3_song_files = [
    obj_summary.key
    for obj_summary in songDataDbBucket.objects.filter(Prefix="song_dat")
]
print(s3_song_files[:4])

In [None]:
# Number of song object keys found; displayed as the cell output.
num_song_files = len(s3_song_files)
num_song_files

In [None]:
# For each key (skipping the first one — presumably the bare prefix entry;
# TODO confirm), join path segments 1-3 without separators and keep the
# distinct values in first-seen order, to see how the song files are laid out.
results = ("".join(key.split("/")[1:4]) for key in s3_song_files[1:])
d = dict.fromkeys(results, 0)
print(d.keys())

## Load an example song file

In [None]:
# Read one song JSON file (the third key in the listing) to inspect its shape.
sample_song_path = f"s3a://udacity-dend/{s3_song_files[2]}"
df = spark.read.json(sample_song_path)
print(df.count())
df.printSchema()

In [None]:
# Display the sample song rows (PySpark defaults: 20 rows, truncated cells).
df.show(n=20, truncate=True)

In [None]:
# Read only the song_data/A/B sub-tree, presumably to keep exploration fast.
# The full dataset glob is "s3a://udacity-dend/song_data/*/*/*/*.json".
path = "s3a://udacity-dend/song_data/A/B/*/*.json"
print(path)
dfrw_songs = spark.read.json(path)
song_record_count = dfrw_songs.count()
print(song_record_count)
dfrw_songs.printSchema()

### Songs table

> Table: songs - songs in music database <br>
>   fields: song_id, title, artist_id, year, duration

In [None]:
# songs dimension: the listed fields, keeping a single row per song_id.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
dfsongs = (
    dfrw_songs
    .select(*song_columns)
    .dropDuplicates(["song_id"])
)
dfsongs.show()

In [None]:
# Write songs as Parquet partitioned by year, then artist_id, replacing
# whatever is already at the target path.
songs_output_path = "s3a://data-lake-udacity-sparkify/song/songs.parquet"
(
    dfsongs.write
    .partitionBy("year", "artist_id")
    .mode("overwrite")
    .parquet(songs_output_path)
)

In [None]:
# Read the written songs table back to check the output.
dfsongs_read = (
    spark.read
    .format("parquet")
    .load("s3a://data-lake-udacity-sparkify/song/songs.parquet")
)
dfsongs_read.show()

### Artist table

> Table: artists in music database <br>
>   fields: artist_id, name, location, lattitude, longitude

In [None]:
# Pull the artist_* columns out of the raw song records.
artist_source_columns = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]
dfartists = dfrw_songs.select(*artist_source_columns)
dfartists.show()

In [None]:
# Rename positionally to the artists table fields (the "lattitude" spelling
# matches the table description) and keep one row per artist_id.
dfartists = (
    dfartists
    .toDF("artist_id", "name", "location", "lattitude", "longitude")
    .dropDuplicates(["artist_id"])
)
dfartists.printSchema()

In [None]:
# Write the artists table as (unpartitioned) Parquet, overwriting prior output.
artists_output_path = "s3a://data-lake-udacity-sparkify/artist/artists.parquet"
dfartists.write.mode("overwrite").parquet(artists_output_path)

In [None]:
# Read the written artists table back to check the output.
dfartists_read = (
    spark.read
    .format("parquet")
    .load("s3a://data-lake-udacity-sparkify/artist/artists.parquet")
)
dfartists_read.show()

## Load an example of the log data

In [None]:
# Collect every object key in the udacity-dend bucket that starts with "log_data".
logDataDbBucket = s3.Bucket("udacity-dend")
# List through the log bucket handle created just above, so it is actually used.
s3_log_files = [
    obj_summary.key
    for obj_summary in logDataDbBucket.objects.filter(Prefix="log_data")
]
print(s3_log_files[:4])

In [None]:
# Read every JSON log file under log_data/2018/11/.
path = 's3a://udacity-dend/log_data/2018/11/*.json'
print(path)
dfrw_log = spark.read.json(path)
log_event_count = dfrw_log.count()
print(log_event_count)
dfrw_log.printSchema()

In [None]:
# Show the raw ts values before conversion.
dfrw_log.select(dfrw_log.ts).show()

In [None]:
from pyspark.sql.functions import from_unixtime

In [None]:
# Replace ts with a TimestampType column. ts is divided by 1000 to get
# seconds, so it is presumably epoch milliseconds (TODO confirm).
# Casting the fractional seconds directly keeps sub-second precision;
# from_unixtime would format to a "yyyy-MM-dd HH:mm:ss" string and drop the
# milliseconds before the cast back to timestamp.
dfrw_log = dfrw_log.withColumn(
    "ts", (dfrw_log.ts.cast('bigint') / 1000).cast('timestamp')
)
dfrw_log.printSchema()

In [None]:
# Show ts after conversion to a timestamp.
dfrw_log.select(dfrw_log.ts).show()

### Users table

> users - users in the app <br>
>    user_id, first_name, last_name, gender, level

In [None]:
# users dimension: one row per userId.
# Rows with a null or empty userId are dropped first — presumably events from
# logged-out sessions (TODO confirm against the data). Otherwise they would
# become a bogus user record. The cast keeps the emptiness check valid even
# if userId were inferred as a numeric type.
# NOTE(review): dropDuplicates keeps an arbitrary row per userId, so the
# retained `level` is not necessarily the user's most recent one.
dfusers = dfrw_log.select("userId", "firstName", "lastName", "gender", "level")
has_user_id = dfusers.userId.isNotNull() & (dfusers.userId.cast("string") != "")
dfusers = dfusers.filter(has_user_id)
dfusers = dfusers.dropDuplicates(["userId"])
dfusers.show(4)

In [None]:
# Write the users table as (unpartitioned) Parquet, overwriting prior output.
users_output_path = "s3a://data-lake-udacity-sparkify/user/users.parquet"
dfusers.write.mode("overwrite").parquet(users_output_path)

In [None]:
# Read the written users table back to check the output.
dfusers_read = (
    spark.read
    .format("parquet")
    .load("s3a://data-lake-udacity-sparkify/user/users.parquet")
)
dfusers_read.show()

### Time table

> time - timestamps of records in songplays broken down into specific units <br>
>    start_time, hour, day, week, month, year, weekday

In [None]:
from pyspark.sql.functions import *

In [None]:
# time dimension: each distinct event timestamp broken into calendar units.
# Deduplicating on start_time keeps the table keyed on start_time; events
# that share a timestamp would otherwise produce repeated rows.
dftime = dfrw_log.select(
    dfrw_log.ts.alias('start_time'),
    hour(dfrw_log.ts).alias('hour'),
    dayofmonth(dfrw_log.ts).alias('day'),
    weekofyear(dfrw_log.ts).alias('week'),
    month(dfrw_log.ts).alias('month'),
    year(dfrw_log.ts).alias('year'),
    # dayofweek numbers days 1 (Sunday) through 7 (Saturday).
    dayofweek(dfrw_log.ts).alias('weekday'),
).dropDuplicates(['start_time'])
dftime.show()

In [None]:
# Write the time table as Parquet partitioned by year, then month,
# replacing whatever is already at the target path.
time_output_path = "s3a://data-lake-udacity-sparkify/time/time.parquet"
(
    dftime.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(time_output_path)
)

In [None]:
# Read the written time table back to check the output.
dftime_read = (
    spark.read
    .format("parquet")
    .load("s3a://data-lake-udacity-sparkify/time/time.parquet")
)
dftime_read.show()

### Songplays table

> songplays - records in log data associated with song plays i.e. records with page NextSong <br>
>    songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [None]:
# Add a year column derived from ts (used to partition songplays).
dfrw_log = dfrw_log.withColumn("year", year(dfrw_log.ts))

In [None]:
# Add a month column derived from ts (used to partition songplays).
dfrw_log = dfrw_log.withColumn("month", month(dfrw_log.ts))

In [None]:
# Attach song_id/artist_id to each log event by matching the song title AND
# the artist name. Matching on title alone would attach the wrong ids
# whenever different artists have songs with the same title.
# NOTE(review): assumes the log data carries the artist name in an `artist`
# column — confirm with dfrw_log.printSchema() above.
song_lookup = (
    dfrw_songs
    .select("song_id", "title", "artist_id", "artist_name", "duration")
    .dropDuplicates(["song_id"])
    .alias("songs")
)
dfsongplays = dfrw_log.join(
    song_lookup,
    on=(dfrw_log["song"] == song_lookup["title"])
    & (dfrw_log["artist"] == song_lookup["artist_name"]),
    how="inner",
)

In [None]:
# Keep only song-play events (page == "NextSong").
dfsongplays = dfsongplays.where(dfsongplays.page == "NextSong")

In [None]:
# Inspect the joined and filtered songplays schema before projecting columns.
dfsongplays.printSchema()

In [None]:
# Project the songplays fact columns (year/month are kept for partitioning)
# and drop exact duplicates. Then add the songplay_id surrogate key listed in
# the table description. monotonically_increasing_id yields unique but
# not consecutive ids.
dfsongplays = (
    dfsongplays
    .select("year", "month", "ts", "userId", "level", "songs.song_id",
            "songs.artist_id", "sessionId", "location", "userAgent")
    .distinct()
    .withColumn("songplay_id", monotonically_increasing_id())
)

In [None]:
# Preview the first four songplay rows.
dfsongplays.show(n=4)

In [None]:
# Write songplays as Parquet partitioned by year, then month,
# replacing whatever is already at the target path.
songplays_output_path = "s3a://data-lake-udacity-sparkify/songplays/songplays.parquet"
(
    dfsongplays.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(songplays_output_path)
)

In [None]:
# Read the written songplays table back to check the output.
dfsongplays_read = (
    spark.read
    .format("parquet")
    .load("s3a://data-lake-udacity-sparkify/songplays/songplays.parquet")
)
dfsongplays_read.show()