# Exercise 3 - Data Lake on S3

In [7]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [9]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWSUSER']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWSUSER']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [10]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

## The etl.py script runs without errors.

The script, etl.py, runs in the terminal without errors. The script reads song_data and load_data from S3, transforms them to create five different tables, and writes them to partitioned parquet files in table directories on S3.

## Analytics tables are correctly organized on S3.

Each of the five tables are written to parquet files in a separate analytics directory on S3. Each table has its own folder within the directory. Songs table files are partitioned by year and then artist. Time table files are partitioned by year and month. Songplays table files are partitioned by year and month.

## The correct data is included in all tables.

Each table includes the right columns and data types. Duplicates are addressed where appropriate.

# Load data from S3

In [11]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://spark-project4/"
    

In [16]:
df = spark.read.json('s3a://udacity-dend/song_data/*/*/*.json') # s3a://udacity-dend/song_data/*/*/*/*.json to read the whole data

Py4JJavaError: An error occurred while calling o199.json.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: CE8D752C065CC226, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: gWjkFIMTsi7dsDrwkrlhSbGOWGzBL41aj1VeBZO0AbP2y1L7O3QSE30Om3WM2zBb2LzTYuj1TJ4=
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
	at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
	at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
	at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1657)
	at org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:245)
	at org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:255)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:549)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [9]:
df.printSchema()
df.show(5)

NameError: name 'df' is not defined

# schema

In [7]:

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType


songSchema = R([
        Fld("artist_id",Str()),
        Fld("artist_latitude",Dbl()),
        Fld("artist_location",Str()),
        Fld("artist_longitude",Dbl()),
        Fld("artist_name",Str()),
        Fld("duration",Dbl()),
        Fld("num_songs",Int()),
        Fld("title",Str()),
        Fld("year",Int()),
    ])

df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/*.json",
                     schema= songSchema)

In [8]:
df.printSchema()
df.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,title,year
0,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,A Whiter Shade Of Pale (Live @ Fillmore West),0
1,ARIOZCU1187FB3A3DC,,"Hamlet, NC",,JOHN COLTRANE,220.44689,1,Giant Steps (Alternate Version_ Take 5_ Altern...,0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,Streets On Fire (Explicit Album Version),0
3,AR5S9OB1187B9931E3,34.05349,"Los Angeles, CA",-118.24532,Bullet Boys,156.62975,1,All Day & All Of The Night,0
4,AR5T40Y1187B9996C6,,"Lulea, Sweden",,The Bear Quartet,249.3122,1,I Remember Nights Wide Open,1998


# saving the data

In [9]:
import  pyspark.sql.functions as F


song_fields = ["title", "artist_id","year", "duration"]

songs_table = (df
               .select(song_fields)
               .dropDuplicates()
               .withColumn("song_id", monotonically_increasing_id())
              )



In [14]:
songs_table.limit(5).toPandas()

Unnamed: 0,title,artist_id,year,duration,song_id
0,Intro,AR558FS1187FB45658,2003,75.67628,51539607552
1,I Need A Mother,ARZGTK71187B9AC7F5,2010,158.01424,137438953472
2,Intro,ARAADXM1187FB3ECDB,1999,67.63057,335007449088
3,XXX,ARZJDBC1187FB52056,1984,327.00036,395136991232
4,Race Of Hate,AREH0O41187FB4C405,1997,311.90159,481036337152


In [13]:
songs_table.count()

23

### Apache Parquet Introduction

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems.




### Spark Write DataFrame to Parquet file format

Using spark.write.parquet() function we can write Spark DataFrame to Parquet file, and parquet() function is provided in DataFrameWriter class. 




### Spark parquet partition – Improving performance

> Partitioning is a feature of many databases and data processing frameworks and it is key to make jobs work at scale. We can do a parquet file partition using spark partitionBy function.

> Parquet Partition creates a folder hierarchy for each spark partition; we have mentioned the first partition as year followed by artist_id hence, it creates a year folder inside the artist_id folder.

In [None]:
    
(songs_table
 .write
 .partitionBy("year", "artist_id")
 .parquet(output_data + 'songs/')
)

In [None]:
artists_fields = ["artist_id",
                  "artist_name as name",
                  "artist_location as location",
                  "artist_latitude as latitude",
                  "artist_longitude as longitude"]

artists_table = df.selectExpr(artists_fields).dropDuplicates()

artists_table.write.parquet(output_data + 'artists/')