# Exercise 3 - Data Lake on S3

In [1]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

## The etl.py script runs without errors.

The script, etl.py, runs in the terminal without errors. The script reads song_data and load_data from S3, transforms them to create five different tables, and writes them to partitioned parquet files in table directories on S3.

## Analytics tables are correctly organized on S3.

Each of the five tables are written to parquet files in a separate analytics directory on S3. Each table has its own folder within the directory. Songs table files are partitioned by year and then artist. Time table files are partitioned by year and month. Songplays table files are partitioned by year and month.

## The correct data is included in all tables.

Each table includes the right columns and data types. Duplicates are addressed where appropriate.

# Load data from S3

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://project4dend/"
    

In [5]:
df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/*.json")# s3a://udacity-dend/song_data/*/*/*/*.json to read the whole data

In [6]:
df.printSchema()
df.limit(3).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
1,ARIOZCU1187FB3A3DC,,"Hamlet, NC",,JOHN COLTRANE,220.44689,1,SOCEMJV12A6D4F7667,Giant Steps (Alternate Version_ Take 5_ Altern...,0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0


# schema

Spark DataFrames schemas are defined as a collection of typed columns. The entire schema is stored as a StructType and individual columns are stored as StructFields.



In [14]:

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType


songSchema = R([
        Fld("artist_id",Str()),
        Fld("artist_latitude",Dbl()),
        Fld("artist_location",Str()),
        Fld("artist_longitude",Dbl()),
        Fld("artist_name",Str()),
        Fld("duration",Dbl()),
        Fld("num_songs",Int()),
        Fld("title",Str()),
        Fld("year",Int()),
    ])

df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/*.json",
                     schema= songSchema)

In [15]:
df.printSchema()
df.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,title,year
0,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,A Whiter Shade Of Pale (Live @ Fillmore West),0
1,ARIOZCU1187FB3A3DC,,"Hamlet, NC",,JOHN COLTRANE,220.44689,1,Giant Steps (Alternate Version_ Take 5_ Altern...,0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,Streets On Fire (Explicit Album Version),0
3,AR5S9OB1187B9931E3,34.05349,"Los Angeles, CA",-118.24532,Bullet Boys,156.62975,1,All Day & All Of The Night,0
4,AR5T40Y1187B9996C6,,"Lulea, Sweden",,The Bear Quartet,249.3122,1,I Remember Nights Wide Open,1998


# saving the data

In [19]:
import  pyspark.sql.functions as F


song_fields = ["title", "artist_id","year", "duration"]

songs_table = (df
               .select(song_fields)
               .dropDuplicates()
               .withColumn("song_id", monotonically_increasing_id())
              )



In [20]:
songs_table.show(4)

+---------------+------------------+----+---------+------------+
|          title|         artist_id|year| duration|     song_id|
+---------------+------------------+----+---------+------------+
|          Intro|AR558FS1187FB45658|2003| 75.67628| 51539607552|
|I Need A Mother|ARZGTK71187B9AC7F5|2010|158.01424|137438953472|
|          Intro|ARAADXM1187FB3ECDB|1999| 67.63057|335007449088|
|            XXX|ARZJDBC1187FB52056|1984|327.00036|395136991232|
+---------------+------------------+----+---------+------------+
only showing top 4 rows



In [21]:
songs_table.count()


23

### Apache Parquet Introduction

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems.




### Spark Write DataFrame to Parquet file format

Using spark.write.parquet() function we can write Spark DataFrame to Parquet file.




### Spark parquet partition – Improving performance

> Partitioning is a feature of many databases and data processing frameworks and it is key to make jobs work at scale. We can do a parquet file partition using spark partitionBy function.

> Parquet Partition creates a folder hierarchy for each spark partition; we have mentioned the first partition as year followed by artist_id hence, it creates a artist_id folder inside the year folder.

In [22]:
    
(songs_table
 .write
 .partitionBy("year", "artist_id")
 .parquet(output_data + 'songs/')
)

In [31]:
artists_fields = ["artist_id",
                  "artist_name as name",
                  "artist_location as location",
                  "artist_latitude as latitude",
                  "artist_longitude as longitude"]

artists_table = df.selectExpr(artists_fields).dropDuplicates()

artists_table.write.parquet(output_data + 'artists/')

In [32]:
artists_table.show(3)

+------------------+----------------+---------------+--------+---------+
|         artist_id|            name|       location|latitude|longitude|
+------------------+----------------+---------------+--------+---------+
|AR5T40Y1187B9996C6|The Bear Quartet|  Lulea, Sweden|    null|     null|
|AREH0O41187FB4C405|         Skinlab|        Oakland|    null|     null|
|ARZGTK71187B9AC7F5|            Eels|California, USA|    null|     null|
+------------------+----------------+---------------+--------+---------+
only showing top 3 rows

