# Test Code for Unzipping Local Files

In [12]:
import configparser
from pyspark.sql import SparkSession, Window
import pandas as pd
import numpy as np
from zipfile import ZipFile

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf, col, monotonically_increasing_id, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql import functions as F

from datetime import datetime

import os

## General

In [5]:
configure = SparkConf().setAppName('name').setMaster('local')
sc = SparkContext(conf = configure)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=name, master=local) created by __init__ at <ipython-input-2-504fa353233c>:2 

In [6]:
# getOrCreate() modifies existing spark session 
spark = SparkSession.builder.appName('local_name').config('config option', 'config value').getOrCreate()

## Song Data

In [7]:
with ZipFile('data/song-data.zip', 'r') as zip_ref:
    zip_ref.extractall('data/local_song_data')

In [8]:
df = spark.read.json('data/local_song_data/song_data/*/*/*/*.json')

In [9]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
df.describe().show()

+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|summary|         artist_id|   artist_latitude|artist_location|  artist_longitude|artist_name|          duration|num_songs|           song_id|               title|             year|
+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|  count|                71|                31|             71|                31|         71|                71|       71|                71|                  71|               71|
|   mean|              null| 36.55297161290323|           null|-73.25123258064517|       null|239.72967605633804|      1.0|              null|                null|785.9577464788732|
| stddev|              null|12.431023413063544|           null| 36.05807592882607|       n

#### Create songs_table

In [11]:
songs_table = df['song_id', 'title', 'artist_id', 'year', 'duration']

#### Partition table and export to parquet file

In [12]:
output_data = 'output/'

#songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').parquet(os.path.join(output_data, 'songs'))

#### Create artists_table

In [13]:
artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

In [14]:
output_data = 'output/'

#artists_table.write.parquet(os.path.join(output_data, 'artists'))

## Log Data

In [15]:
with ZipFile('data/log-data.zip', 'r') as zip_ref:
    zip_ref.extractall('data/local_log_data')

In [16]:
spark = SparkSession.builder.getOrCreate()

In [17]:
df = spark.read.json('data/local_log_data')

In [18]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [19]:
df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


#### Filter by actions for song plays

In [20]:
df = df[df['page'] == 'NextSong']

#### Extract columns for users table    

In [21]:
users_table = df['userId', 'firstName', 'lastName', 'gender', 'level']

In [28]:
users_table = users_table.dropDuplicates(['userId'])

#### Write users table to parquet files

In [20]:
output_data = 'output/'

#users_table.write.parquet(os.path.join(output_data, 'users'))

#### Create timestamp column from original timestamp column

In [21]:
def format_datetime(ts):
    '''
    Returns integer as datetime object.
    '''
    return datetime.fromtimestamp(ts / 1000.0)

In [22]:
get_timestamp = udf(lambda dt: format_datetime(int(dt)), TimestampType())

In [23]:
df = df.withColumn('timestamp', get_timestamp(df.ts))

#### Create datetime column from original timestamp column

In [24]:
get_datetime = udf(lambda dt: format_datetime(int(dt)), DateType())
df = df.withColumn('datetime', get_datetime(df.ts))

In [25]:
df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15


#### Extract columns to create time table

time_table columns = start_time, hour, day, week, month, year, weekday

In [26]:
df = df.withColumn('day_of_week', date_format(col('timestamp'), 'E'))

In [27]:
df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime,day_of_week
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15,Thu
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15,Thu
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15,Thu


In [28]:
time_table = df.select(col('timestamp').alias('start_time'),
                       hour(df.timestamp).alias('hour'),
                       dayofmonth(df.timestamp).alias('day'),
                       weekofyear(df.timestamp).alias('week'),
                       month(df.timestamp).alias('month'),
                       year(df.timestamp).alias('year'),
                       'day_of_week')


#### Write time table to parquet files partitioned by year and month

In [29]:
output_data = 'output/'

time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(os.path.join(output_data, 'time'))

#### Read in song data to use for songplays table

In [47]:
#song_df = spark.read.parquet('output/songs/year=*/artist_id=*/*.parquet')
song_df = spark.read.json('data/local_song_data/song_data/*/*/*/*.json')

#### Extract columns from joined song and log datasets to create songplays table 

**songplays** columns: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [51]:
df.createOrReplaceTempView('log_data_table')
song_df.createOrReplaceTempView('song_data_table')

In [68]:
songplays_table = spark.sql('''
                            SELECT
                                l.timestamp AS start_time,
                                l.userId,
                                l.level,
                                s.song_id,
                                s.artist_id,
                                l.sessionId,
                                l.location,
                                l.userAgent
                            FROM log_data_table l
                                LEFT JOIN song_data_table s
                                    ON l.song = s.title
                            ''')

In [69]:
songplays_table = songplays_table.withColumn('mono_increasing_id', monotonically_increasing_id())
window = Window.orderBy(col('mono_increasing_id'))
songplays_table = songplays_table.withColumn('songplay_id', row_number().over(window))
songplays_table = songplays_table.drop('mono_increasing_id')

#### write songplays table to parquet files partitioned by year and month

In [73]:
songplays_table = songplays_table.withColumn('month', month(col('start_time')))
songplays_table = songplays_table.withColumn('year', year(col('start_time')))

In [75]:
#time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(os.path.join(output_data, 'time'))
output_data = 'output/'

songplays_table.write.partitionBy('year', 'month').mode('overwrite').parquet(os.path.join(output_data, 'songplays'))

## Test AWS connections

In [13]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

In [21]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.9.0") \
        .getOrCreate()
    return spark

In [22]:
aws_spark = create_spark_session()

In [23]:


df = aws_spark.read.json("s3a://{}:{}@udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY))

IllegalArgumentException: 'The bucketName parameter must be specified.'