# Sparkify - Data Lake with Spark
In this notebook, we'll be testing out migrating JSON data hosted in Udacity's Sparkify S3 bucket to parquet files hosted in our own S3 bucket. This test notebook demonstrates that every step works as expected before we use the same logic directly in etl.py.

## Project Setup
Getting together everything we'll need for this project.

In [1]:
# Importing packages from the etl.py template file (plus zipfile for the local extraction cells below)
import configparser
from datetime import datetime
import os
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
# Creating the Spark session
spark = SparkSession \
    .builder \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0') \
    .getOrCreate()

In [3]:
# Importing AWS credentials
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']
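The cell above expects `dl.cfg` to contain a `KEYS` section with the two credential entries. A minimal sketch of that layout, parsed with `configparser.read_string` so it runs without a file on disk; the placeholder values are hypothetical, and real keys should stay out of version control.

```python
import configparser

# Hypothetical dl.cfg contents: the section and key names match the cell above,
# the values are placeholders only.
SAMPLE_CFG = """
[KEYS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Option lookups are case-insensitive by default, so the uppercase
# names used in the notebook resolve as written.
access_key = config['KEYS']['AWS_ACCESS_KEY_ID']
secret_key = config['KEYS']['AWS_SECRET_ACCESS_KEY']
print(access_key)  # YOUR_ACCESS_KEY_ID
```

Note that `config.read` silently skips a missing file (it returns the list of files it actually parsed), so a missing `dl.cfg` only surfaces later as `KeyError: 'KEYS'`.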

In [5]:
# Creating variables pointing to where data is stored
input_data = 's3a://udacity-dend/'
output_data = 'aws_data/'

song_data = input_data + 'song_data/*/*/*/*.json'
log_data = input_data + 'log_data/*/*/*.json'
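In the paths above, each `*` matches a single directory level, so `song_data/*/*/*/*.json` expects JSON files exactly three directories below `song_data/`. The sketch below approximates that per-segment matching with `fnmatch` so a pattern can be sanity-checked against a few keys; the example keys are hypothetical, and this is an approximation, not Hadoop's own glob implementation.

```python
from fnmatch import fnmatchcase

def glob_matches(pattern: str, key: str) -> bool:
    """Approximate Hadoop-style globbing: '*' never crosses a '/' boundary."""
    pattern_parts = pattern.split('/')
    key_parts = key.split('/')
    # Every path segment must line up with exactly one pattern segment.
    if len(pattern_parts) != len(key_parts):
        return False
    return all(fnmatchcase(k, p) for p, k in zip(pattern_parts, key_parts))

# Hypothetical object keys, for illustration only.
print(glob_matches('song_data/*/*/*/*.json', 'song_data/A/B/C/TRABC.json'))  # True
print(glob_matches('song_data/*/*/*/*.json', 'song_data/A/B/TRABC.json'))    # False
print(glob_matches('log_data/*/*/*.json', 'log_data/2018/11/events.json'))   # True
```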

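In [None]:
# Extracting song data from a local archive
# (zipfile cannot open the S3 glob stored in song_data; 'data/song-data.zip' is an assumed local path)
with zipfile.ZipFile('data/song-data.zip', 'r') as song_zip:
    song_zip.extractall('data/song_data/')

In [None]:
# Extracting log data from a local archive
# ('data/log-data.zip' is an assumed local path)
with zipfile.ZipFile('data/log-data.zip', 'r') as log_zip:
    log_zip.extractall('data/log_data/')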

## Song Data
In this section, we'll walk through the steps to appropriately populate our dimension tables from the song data provided by Udacity.

In [6]:
# Reading in the song data into the 'song_df' DataFrame
song_df = spark.read.json('data/song_data/song_data/A/*/*/*.json')

In [7]:
# Viewing the schema of the newly imported song_df
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
# Viewing first few rows of song_df
song_df.take(5)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

### Song Table
We'll peel off the columns needed for this specific table and write them to the appropriate parquet file, partitioned by year and artist_id.

In [9]:
# Peeling off the appropriate columns from song_df for songs_table (one row per song_id)
songs_table = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates(['song_id'])

In [10]:
# Partitioning by year & artist_id and writing to the parquet file
songs_table.write.partitionBy('year', 'artist_id').parquet(output_data + 'songs/')
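`partitionBy('year', 'artist_id')` writes Hive-style directories, one level per partition column, and leaves those columns out of the Parquet files themselves (Spark restores them from the directory names on read). A rough sketch of the directory a single row lands in, using The Dillinger Escape Plan row from the sample output; `partition_dir` is a hypothetical helper, and it skips the escaping Spark applies to special characters in partition values.

```python
def partition_dir(base: str, row: dict, partition_cols: list) -> str:
    """Build the Hive-style directory for one row, e.g. base/year=2004/artist_id=.../"""
    parts = [f"{c}={row[c]}" for c in partition_cols]
    return base.rstrip('/') + '/' + '/'.join(parts) + '/'

row = {
    'song_id': 'SOBBUGU12A8C13E95D',
    'title': 'Setting Fire to Sleeping Giants',
    'artist_id': 'ARMAC4T1187FB3FA4C',
    'year': 2004,
    'duration': 207.77751,
}

print(partition_dir('aws_data/songs/', row, ['year', 'artist_id']))
# aws_data/songs/year=2004/artist_id=ARMAC4T1187FB3FA4C/

# Columns that stay inside the Parquet files for this partition
data_cols = [c for c in row if c not in ('year', 'artist_id')]
print(data_cols)  # ['song_id', 'title', 'duration']
```

Songs with an unknown year, such as the `year=0` rows in the sample output, all land under `year=0/`.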

### Artist Table
We'll peel off the columns needed for this specific table and write them to the appropriate parquet file.

In [11]:
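# Peeling off the appropriate columns from song_df for artists_table
# (one row per artist_id, since an artist appears once for every song they have)
artists_table = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates(['artist_id'])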

In [12]:
# Writing the artists_table information to the appropriate parquet file
artists_table.write.parquet(output_data + 'artists/')

## Log Data
Following a similar approach to the "Song Data" section, we'll use the log data to populate our fact table and the remaining dimension tables.

In [13]:
# Reading in the log data into the 'log_df' DataFrame
log_df = spark.read.json('data/log_data/*.json')
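By default `spark.read.json` expects JSON Lines: one complete JSON object per line, which is how these log files must be laid out for each event to come back as one row. A stdlib sketch of that parsing, using two abbreviated events from the `log_df` sample output below (most fields omitted); for files where a single object spans several lines, Spark's `multiLine` read option is needed instead.

```python
import json

# Two abbreviated events from the log_df sample output (most fields omitted).
raw = (
    '{"artist": "Harmonia", "song": "Sehr kosmisch", "ts": 1542241826796, "userId": "26"}\n'
    '{"artist": "The Prodigy", "song": "The Big Gundown", "ts": 1542242481796, "userId": "26"}\n'
)

# JSON Lines: parse each non-empty line as an independent JSON object.
events = [json.loads(line) for line in raw.splitlines() if line.strip()]

print(len(events))        # 2
print(events[1]['song'])  # The Big Gundown
```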

In [14]:
# Viewing the schema of the newly imported log_df
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
# Viewing first few rows of log_df
log_df.take(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

### User Table
We'll peel off the columns needed for this specific table and write them to the appropriate parquet file.

In [16]:
# Peeling off the appropriate columns from log_df for users_table (one row per userId)
users_table = log_df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates(['userId'])

In [17]:
# Writing the users_table information to the appropriate parquet file
users_table.write.parquet(output_data + 'users/')
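A user can move between the `free` and `paid` levels, so the log may hold several rows per `userId` with different `level` values. One way to keep exactly one row per user is to take the most recent event by `ts`. A plain-Python sketch of that rule with hypothetical events; in PySpark the same idea is commonly expressed with a window partitioned by `userId` and ordered by `ts`.

```python
def latest_user_rows(events):
    """Keep the most recent event (largest ts) per userId, projected to users_table columns."""
    latest = {}
    for event in events:
        current = latest.get(event['userId'])
        if current is None or event['ts'] > current['ts']:
            latest[event['userId']] = event
    cols = ('userId', 'firstName', 'lastName', 'gender', 'level')
    return {uid: {c: e[c] for c in cols} for uid, e in latest.items()}

# Hypothetical events: user '1' upgrades from free to paid.
events = [
    {'userId': '1', 'firstName': 'A', 'lastName': 'B', 'gender': 'F', 'level': 'free', 'ts': 100},
    {'userId': '1', 'firstName': 'A', 'lastName': 'B', 'gender': 'F', 'level': 'paid', 'ts': 200},
    {'userId': '2', 'firstName': 'C', 'lastName': 'D', 'gender': 'M', 'level': 'free', 'ts': 150},
]

users = latest_user_rows(events)
print(users['1']['level'])  # paid
print(len(users))           # 2
```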

### Time Table
This table is a bit trickier: the log data gives us only a timestamp ('ts', in milliseconds since the Unix epoch), not the individual fields called for by the project. No matter, we'll simply extract that information from the 'ts' field ourselves.

In [18]:
# Creating a UDF that converts the millisecond 'ts' value into a 'YYYY-MM-DD HH:MM:SS' string
get_datetime = udf(lambda x: datetime.fromtimestamp(x / 1000.0).strftime('%Y-%m-%d %H:%M:%S'))

In [19]:
# Creating new 'start_date' column using UDF defined above
log_df = log_df.withColumn('start_date', get_datetime(log_df.ts))

In [20]:
# Creating time_table using PySpark SQL functions to extract information appropriately
time_table = log_df.select('start_date',
                           hour('start_date').alias('hour'),
                           dayofmonth('start_date').alias('day'),
                           weekofyear('start_date').alias('week'),
                           month('start_date').alias('month'),
                           year('start_date').alias('year'),
                           dayofweek('start_date').alias('weekday')
                          )

In [21]:
# Writing the time_table information to the appropriate parquet file and partitioning by year/month
time_table.write.partitionBy('year', 'month').parquet(output_data + 'time/')
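Because `ts` is in milliseconds, the UDF above divides by 1000 before converting. A stdlib sketch deriving every time_table field from one `ts` value in the sample log output. It pins the timezone to UTC, whereas `datetime.fromtimestamp` without a `tz` argument (as in the UDF) uses the machine's local timezone, so the two agree only when that is UTC. The weekday follows Spark's `dayofweek` convention (1 = Sunday through 7 = Saturday) and the week number follows ISO 8601, as Spark's `weekofyear` does.

```python
from datetime import datetime, timezone

def time_fields(ts_ms: int) -> dict:
    """Derive the time_table fields from an epoch-millisecond timestamp, in UTC."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return {
        'start_date': dt.strftime('%Y-%m-%d %H:%M:%S'),
        'hour': dt.hour,
        'day': dt.day,
        'week': dt.isocalendar()[1],         # ISO 8601 week number
        'month': dt.month,
        'year': dt.year,
        'weekday': dt.isoweekday() % 7 + 1,  # Monday=1..Sunday=7 -> Sunday=1..Saturday=7
    }

print(time_fields(1542241826796))
# {'start_date': '2018-11-15 00:30:26', 'hour': 0, 'day': 15, 'week': 46, 'month': 11, 'year': 2018, 'weekday': 5}
```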

### Songplays Table
As the fact table, songplays is a tad trickier to build because it draws on data we already processed above. Let's first read back in the songs parquet files we wrote earlier.

In [22]:
# Reading back in the songs parquet files written above
songs_table_df = spark.read.parquet(output_data + 'songs/')

In [23]:
# Joining the log events to the songs table on song title (inner join: events with no matching title are dropped)
log_df = log_df.join(songs_table_df, (log_df.song == songs_table_df.title))
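The join above is an inner join on the title alone: log events whose song has no matching title are dropped, and a title shared by several songs yields one output row per match. A small plain-Python sketch of those semantics; the song rows reuse IDs and titles from the sample output, while the log events are hypothetical.

```python
from collections import defaultdict

def inner_join_on_title(log_rows, song_rows):
    """Mimic log.join(songs, log.song == songs.title): inner join, one row per match."""
    songs_by_title = defaultdict(list)
    for song in song_rows:
        songs_by_title[song['title']].append(song)
    joined = []
    for event in log_rows:
        for song in songs_by_title.get(event['song'], []):  # no match -> event dropped
            joined.append({**event, 'song_id': song['song_id'], 'artist_id': song['artist_id']})
    return joined

songs = [
    {'song_id': 'SOBBUGU12A8C13E95D', 'title': 'Setting Fire to Sleeping Giants', 'artist_id': 'ARMAC4T1187FB3FA4C'},
    {'song_id': 'SOOLYAZ12A6701F4A6', 'title': 'Laws Patrolling (Album Version)', 'artist_id': 'AREBBGV1187FB523D2'},
]
log = [
    {'song': 'Setting Fire to Sleeping Giants', 'userId': '26'},  # hypothetical event
    {'song': 'Sehr kosmisch', 'userId': '26'},                    # no song with this title here -> dropped
]

result = inner_join_on_title(log, songs)
print(len(result))           # 1
print(result[0]['song_id'])  # SOBBUGU12A8C13E95D
```

Matching on title alone can also attach the wrong `song_id` when two artists share a title; a stricter match would need an artist condition, but `songs_table_df` as written does not carry the artist name.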

In [24]:
# Creating baseline songplay_table WITHOUT songplay_id
songplay_table = log_df['start_date', 'userId', 'level', 'song_id', 'artist_id', 'location', 'userAgent']

*Special thanks to this post for helping me figure out how to add songplay_id column: https://stackoverflow.com/questions/32086578/how-to-add-row-id-in-pyspark-dataframes*

In [25]:
# Adding songplay_id column
songplay_table = songplay_table.withColumn('songplay_id', monotonically_increasing_id())
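`monotonically_increasing_id()` guarantees IDs that are unique and increasing, but not consecutive. Per the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A sketch of that arithmetic; `spark_style_id` is a hypothetical helper, not a Spark function.

```python
def spark_style_id(partition_id: int, row_in_partition: int) -> int:
    """Reproduce the documented layout: partition ID shifted left 33 bits, plus the row offset."""
    return (partition_id << 33) + row_in_partition

print([spark_style_id(0, i) for i in range(3)])  # [0, 1, 2]
print(spark_style_id(1, 0))                      # 8589934592
print(spark_style_id(2, 5))                      # 17179869189
```

So songplay_id values jump by about 8.6 billion between partitions, and because the result depends on partition IDs, they can differ if the DataFrame is recomputed with a different partitioning. They work as surrogate keys, not as row counts.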

In [26]:
# Writing songplay_table to appropriate parquet file
songplay_table.write.parquet(output_data + 'songplay/')