# Sparkify Data Lake with Spark

Exploratory data analysis and first version of the project

In [1]:
from pyspark.sql import SparkSession
import os
import configparser
import pyspark.sql.functions as F

from pyspark.sql.types import IntegerType,BooleanType,DateType,DoubleType

## AWS Credentials

In [2]:
config = configparser.ConfigParser()

config.read_file(open("dl.cfg"))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
INPUT_FILE = "s3a://udacity-dend/"

## Create spark session with hadoop-aws package

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [5]:
spark

## Staging Tables

### Song Table

In [6]:
#reading data from S3 bucket (AWS)
df_song =  spark.read.json(INPUT_FILE + "song_data/A/A/*/*.json")

In [7]:
# total records
df_song.count()

604

In [8]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
# casting some column types
df_song = df_song.withColumn("year",df_song.year.cast('integer'))

df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [10]:
df_song.show(1)

+------------------+---------------+---------------+----------------+--------------------+--------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|         artist_name|duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+--------------------+--------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|      Tennessee|       -85.97874|Royal Philharmoni...|94.56281|        1|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|   0|
+------------------+---------------+---------------+----------------+--------------------+--------+---------+------------------+--------------------+----+
only showing top 1 row



### Log Table

In [11]:
#reading data from S3 bucket (AWS)
df_log = spark.read.json(INPUT_FILE + "log_data/*/*/*.json")

In [12]:
# total records
df_log.count()

8056

In [13]:
# filter by Page = "NextSongs"
df_log = df_log[df_log.page == 'NextSong']
df_log.count()

6820

In [14]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
# casting some column types
df_log = df_log.withColumn("ts",F.to_timestamp(df_log.ts.cast('bigint')/1000))
df_log = df_log.withColumn("userId",df_log.userId.cast('integer'))
df_log = df_log.withColumn("sessionId",df_log.userId.cast('integer'))

df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)



## Data Lake Tables

### Artists table

In [16]:
# selecting columns necessary to create the dimension table: songs
artist_table = df_song.select(df_song.artist_id, df_song.artist_name, df_song.artist_location, \
                              df_song.artist_latitude, df_song.artist_longitude)

In [17]:
# total records
artist_table.count()

604

In [18]:
# removing duplicate records by "artist_id"
artist_table = artist_table.dropDuplicates(['artist_id'])

In [19]:
# total records after removing duplicates
artist_table.count()

587

In [20]:
#showing an example of the table
artist_table.createOrReplaceTempView("artist")
spark.sql("""
    SELECT *
    FROM artist
""").show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR06EB01187FB40150|                NOFX|        Berkeley, CA|           null|            null|
|AR06XSY1187B9B279E|   Little River Band|Melbourne, Australia|           null|            null|
|AR08LXJ1187B9995A4|           Tungtvann|                    |           null|            null|
|AR08VNE1187FB45C2F|Dance With A Stra...|                    |           null|            null|
|AR0IT221187B999C4D|      The Weathermen|             BELGIUM|       50.50101|         4.47684|
|AR0L04E1187B9AE90C|           The Verve|Wigan, Lancashire...|           null|            null|
|AR0MWD61187B9B2B12|The (Internationa...|                    |           null|            null|
|AR0TKGM1187B98B40E|           Stereolab

### Song table

In [21]:
# selecting columns necessary to create the dimension table: songs
song_table = df_song.select(df_song.song_id, df_song.title, df_song.artist_id, df_song.year, df_song.duration)      

In [22]:
# total records
song_table.count()

604

In [23]:
# removing duplicate records by "song_id"
song_table = song_table.dropDuplicates(['song_id'])

In [24]:
# total records after removing duplicates
song_table.count()

604

In [25]:
#showing an example of the table
song_table.createOrReplaceTempView("songs")
spark.sql("""
    SELECT *
    FROM songs
    LIMIT 10
""").show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAADAD12A8C13D5B0|One Shot (Album V...|ARQTC851187B9B03AF|2005|263.99302|
|SOABCEU12A8C132027|          Cold Waste|ARL6NP61187B98C1FC|2007|385.43628|
|SOABWAP12A8C13F82A|           Take Time|AR5LMPY1187FB573FE|1978|258.89914|
|SOABYIT12AB0183026|        Vilda vindar|AR98ZSW1187B98E82C|1985|266.13506|
|SOAESJW12A8C137CC2|     Musical Episode|AR3PN3R1187FB4CEBD|2005|234.44853|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOAFBKM12AB01837A7|          Brain Dead|ARL14X91187FB4CF14|1995| 94.22322|
|SOAFLZM12A8C132A2D|        Rock Rumberu|ARG17O11187FB4A8DA|2006|292.17914|
|SOAFQQN12A58A7CA7C|      At The Unicorn|ARROV6A1187B9913A7|   0| 245.4722|
|SOAFUVR12AB01800F9|         Star Surfer|ARXRKEA11F50C4AE9E|   0|273.55383|
+-----------

### Users table

In [26]:
# selecting columns necessary to create the dimension table: users
users_table = df_log.select(df_log.userId, df_log.firstName, df_log.lastName, df_log.level, df_log.gender)

In [27]:
# total records
users_table.count()

6820

In [28]:
# removing duplicate records by "UserId"
users_table = users_table.dropDuplicates(['userId'])

In [29]:
# total records after removing duplicates
users_table.count()

96

In [30]:
#showing an example of the table
users_table.createOrReplaceTempView("users")
spark.sql("""
    SELECT *
    FROM users
    LIMIT 10
""").show()

+------+---------+--------+-----+------+
|userId|firstName|lastName|level|gender|
+------+---------+--------+-----+------+
|     2|  Jizelle|Benjamin| free|     F|
|     3|    Isaac|  Valdez| free|     M|
|     4|   Alivia| Terrell| free|     F|
|     5|   Elijah|   Davis| free|     M|
|     6|  Cecilia|   Owens| free|     F|
|     7|   Adelyn|  Jordan| free|     F|
|     8|   Kaylee| Summers| free|     F|
|     9|    Wyatt|   Scott| free|     M|
|    10|   Sylvie|    Cruz| free|     F|
|    11|Christian|  Porter| free|     F|
+------+---------+--------+-----+------+



### Time table

In [31]:
# getting some columns like year, month, week of the year, etc.. from timestamp colum and creating the time table
time_table = df_log.select(df_log.ts
                           , F.hour("ts").alias('hour')
                           , F.dayofmonth("ts").alias('day')
                           , F.weekofyear("ts").alias('week')
                           , F.month("ts").alias('month')
                           , F.year("ts").alias('year')
                           , F.dayofweek("ts").alias('weekday'))

In [32]:
time_table.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



### Songsplay Table _(fact table)_

In [33]:
# selecting only the columns necessary from songs table to be used to create the fact table
songs_table = df_song.select(df_song.song_id
                             , df_song.artist_id
                             , df_song.title
                             , df_song.artist_name )
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_name: string (nullable = true)



In [34]:
# selecting only the columns necessary from logs table to be used to create the fact table
events_table = df_log.select(df_log.ts
                             , F.year("ts").alias('year')
                             , F.month("ts").alias('month')
                             , df_log.artist
                             , df_log.song
                             , df_log.userId
                             , df_log.level
                             , df_log.sessionId
                             , df_log.location
                             , df_log.userAgent)
events_table.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- artist: string (nullable = true)
 |-- song: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)



In [35]:
# joining both dataframes to build the fact table
songsplay_table = events_table.join(songs_table
                                , (events_table.artist == songs_table.artist_name) & (events_table.song == songs_table.title ) )

In [36]:
# selecting only the columns necessary to create the fact table
songsplay_table = songsplay_table.select(songsplay_table.ts
                                         , songsplay_table.year
                                         , songsplay_table.month
                                         , songsplay_table.userId
                                         , songsplay_table.level
                                         , songsplay_table.song_id
                                         , songsplay_table.artist_id
                                         , songsplay_table.sessionId 
                                         , songsplay_table.location
                                         , songsplay_table.userAgent)

songsplay_table.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)



In [37]:
#showing an example of the table
songsplay_table.createOrReplaceTempView("songsplay")
spark.sql("""
    SELECT *
    FROM songsplay
    LIMIT 10
""").show()

+--------------------+----+-----+------+-----+------------------+------------------+---------+--------------------+--------------------+
|                  ts|year|month|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+--------------------+----+-----+------+-----+------------------+------------------+---------+--------------------+--------------------+
|2018-11-21 22:56:...|2018|   11|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       15|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|2018-11-05 18:49:...|2018|   11|    73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|       73|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|
|2018-11-13 23:39:...|2018|   11|    55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|       55|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|
|2018-11-16 15:21:...|2018|   11|    85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|       85|       Red Bluff, CA|"Mozilla/5.0 (Mac...|
|2018-11-20 18:46:...|2018|   11|    49| 

## Parquet files 

Saving pyspark dataframes already created as parquet files in my own machine, some of them with partitions

In [38]:
artist_table.write.mode('overwrite').parquet('data-lake/' + 'artists')

In [39]:
song_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet('data-lake/' + 'songs')

In [40]:
users_table.write.mode('overwrite').parquet('data-lake/' + 'users')

In [41]:
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet('data-lake/' + 'time')

In [42]:
songsplay_table.write.mode('overwrite').partitionBy('year', 'month').parquet('data-lake/' + 'songsplay')

## Parquet files to S3 (AWS)

Previously it is necessary to create a S3 bucket

AWS CLI --> ```aws s3 mb s3://udacity-datalake-sparkify-edls --region us-west-2```

To configure the AWS CLI following this [AWS Tutorial](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html)

In [1]:
OUTPUT_FILE = 's3a://udacity-datalake-sparkify-edls/'

In [44]:
artist_table.write.mode('overwrite').parquet(OUTPUT_FILE  + 'artists')

In [None]:
song_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(OUTPUT_FILE + 'songs')

In [None]:
users_table.write.mode('overwrite').parquet(OUTPUT_FILE + 'users')

In [None]:
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(OUTPUT_FILE + 'time')

In [None]:
songsplay_table.write.mode('overwrite').partitionBy('year', 'month').parquet(OUTPUT_FILE + 'songsplay')

## Documentation

- Install Spark: [Spark Pre-built for Apache Hadoop 2.7](https://spark.apache.org/downloads.html)

- Problems running Hadoop on Windows: [WINUTILS.EXE](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems)

- Transform epoch dates: [PySpark dataframe epoch dates](https://stackoverflow.com/questions/49971903/converting-epoch-to-datetime-in-pyspark-data-frame-using-udf)

- Join pyspark dataframes with multiples conditions: [PySpark Join With Multiple Columns & Conditions](https://sparkbyexamples.com/pyspark/pyspark-join-two-or-multiple-dataframes/)

- SaveMode parquet files: [Append or Overwrite](https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/)

- Saving parquet files with partitions: [PySpark Partitions](https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/)