### Import

In [3]:
import configparser
import os
import pyspark.sql.functions as F

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

### Spark Session

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

### Data

In [5]:
input_data = "data/"
output_data = "output/"

### Process Data

In [6]:
#decompress data
import zipfile
with zipfile.ZipFile("data/song-data.zip","r") as zip_ref:
    zip_ref.extractall("data/song_data")

In [7]:
#read in song file and check
song_data = input_data + "song_data/*/*/*/*/"

In [8]:
song_df = spark.read.json(song_data)

In [14]:
song_df.count()

71

In [9]:
song_df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [10]:
songs_table = song_df.select(["song_id", "title", "artist_id", "year", "duration"])
songs_table.show(5, False)

+------------------+----------------------------------------------------+------------------+----+---------+
|song_id           |title                                               |artist_id         |year|duration |
+------------------+----------------------------------------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fingevo di dormire                     |ARDR4AC1187FB371A1|0   |511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (Album Version)                     |AREBBGV1187FB523D2|0   |173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                     |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand In Mine [Live At Royal Albert Hall]|ARPBNLO1187FB3D52F|2000|43.36281 |
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Around On Me (Taco Hell) |ARDNS031187B9924F0|2005|186.48771|
+------------------+----------------------------------------------------+------------------+----+---------+
only showing top 5 rows



In [11]:
artists_table = song_df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])
artists_table.show(5, truncate = False)

+------------------+----------------------------------------------------------------------------------------------+-----------------+---------------+----------------+
|artist_id         |artist_name                                                                                   |artist_location  |artist_latitude|artist_longitude|
+------------------+----------------------------------------------------------------------------------------------+-----------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti|                 |null           |null            |
|AREBBGV1187FB523D2|Mike Jones (Featuring CJ_ Mello & Lil' Bran)                                                  |Houston, TX      |null           |null            |
|ARMAC4T1187FB3FA4C|The Dillinger Escape Plan                                                                     |Morris Plains, NJ|40.82624       |-74.47995       

In [12]:
log_data = input_data + "log_data/"

In [13]:
with zipfile.ZipFile("data/log-data.zip","r") as zip_ref:
    zip_ref.extractall("data/log_data")

In [14]:
log_df = spark.read.json(log_data)
log_df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [15]:
log_df.count()

8056

In [17]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [16]:
log_df = log_df.where('page="NextSong"')
log_df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [23]:
users_table = log_df.select(["userId", "firstName", "lastName", "gender", "level"]).distinct()
users_table.show(5, truncate = False)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|57    |Katherine|Gay     |F     |free |
|84    |Shakira  |Hunt    |F     |free |
|22    |Sean     |Wilson  |F     |free |
|52    |Theodore |Smith   |M     |free |
|80    |Tegan    |Levine  |F     |paid |
+------+---------+--------+------+-----+
only showing top 5 rows



In [24]:
log_df = log_df.withColumn('timestamp',( (log_df.ts.cast('float')/1000).cast("timestamp")) )
log_df.select('timestamp').show(3, False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:29:39.712|
|2018-11-15 00:40:35.072|
|2018-11-15 00:44:57.216|
+-----------------------+
only showing top 3 rows



In [25]:
time_table = log_df.select(
                    F.col("timestamp").alias("start_time"),
                    F.hour("timestamp").alias('hour'),
                    F.dayofmonth("timestamp").alias('day'),
                    F.weekofyear("timestamp").alias('week'),
                    F.month("timestamp").alias('month'), 
                    F.year("timestamp").alias('year'), 
                    F.date_format(F.col("timestamp"), "E").alias("weekday")
                )


time_table.show(5, False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-15 00:29:39.712|0   |15 |46  |11   |2018|Thu    |
|2018-11-15 00:40:35.072|0   |15 |46  |11   |2018|Thu    |
|2018-11-15 00:44:57.216|0   |15 |46  |11   |2018|Thu    |
|2018-11-15 03:44:05.12 |3   |15 |46  |11   |2018|Thu    |
|2018-11-15 05:48:36.224|5   |15 |46  |11   |2018|Thu    |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [27]:
song_df = spark.read.json(input_data+'song_data/*/*/*/*/*.json')

In [28]:
song_log_joined_table = log_df.join(song_df, (log_df.song == song_df.title) & (log_df.artist == song_df.artist_name) & (log_df.length == song_df.duration), how='inner')

In [29]:
songplays_table = song_log_joined_table.distinct() \
                    .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
                    .withColumn("songplay_id", F.row_number().over( Window.partitionBy("timestamp").orderBy("timestamp"))) \
                    .withColumnRenamed("userId","user_id")        \
                    .withColumnRenamed("timestamp","start_time")  \
                    .withColumnRenamed("sessionId","session_id")  \
                    .withColumnRenamed("userAgent", "user_agent") \
                    

songplays_table.show(5)

+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|user_id|          start_time|           song_id|         artist_id|level|session_id|            location|          user_agent|songplay_id|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|     15|2018-11-21 21:56:...|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| paid|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          1|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+



In [30]:
songs_table.write.parquet(output_data + 'songs/' + 'artists.parquet')
artists_table.write.parquet(output_data + 'artists/' + 'artists.parquet')
users_table.write.parquet(output_data + 'users/' + 'artists.parquet')
time_table.write.parquet(output_data + 'time/' + 'artists.parquet')
songplays_table.write.parquet(output_data + 'songplays/' + 'artists.parquet')

In [None]:
# aws

In [9]:
import boto3
import configparser
import os

config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [10]:
!aws configure list

      Name                    Value             Type    Location
      ----                    -----             ----    --------
   profile                <not set>             None    None
access_key     ****************NNMV              env    
secret_key     ****************l9GO              env    
    region                <not set>             None    None


In [14]:
s3 = boto3.resource('s3', region_name='us-west-2')
for bucket in s3.buckets.all():
    print(bucket.name)


aws-emr-resources-063864077654-us-east-1
aws-logs-063864077654-us-east-1


In [12]:
!aws sts get-caller-identity


{
    "UserId": "AIDAQ5XUYPVLEVXQFOMP2",
    "Account": "063864077654",
    "Arn": "arn:aws:iam::063864077654:user/s3_user"
}
