In [45]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

#os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

In [46]:
# the entry point of a spark application
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

In [3]:
# read song_data s3 input
df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json")
df.printSchema()
df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|              title|year|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+-------------------+----+
|ARJIE2Y1187B994AB7|           null|               |            null|Line Renaud|152.92036|        1|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff| 

In [4]:
# read log_data from s3 input
dff = spark.read.json("s3a://udacity-dend/log_data/2018/11/2018-11-12-events.json")
dff.printSchema()
dff.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------------------+---------+----------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth| firstName|gender|itemInSession|lastName|   le

In [58]:
# Create output s3 pucket which 'abdullahoutput' in our case in region 'us-west-2'
import boto3

s3a = boto3.client('s3')
region='us-west-2'
s3a.create_bucket(Bucket='abdullahoutput', CreateBucketConfiguration={ 'LocationConstraint' : region})

{'ResponseMetadata': {'RequestId': '1ZDFVKE9KZ7M6KS8',
  'HostId': 'XuyFiof3GTEDNjpgtiN7Iw7qic12kUKmnLfGS2IsmJJ84mvAX4XOgCfTOyh/4UGWIrM7Xax8rRY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'XuyFiof3GTEDNjpgtiN7Iw7qic12kUKmnLfGS2IsmJJ84mvAX4XOgCfTOyh/4UGWIrM7Xax8rRY=',
   'x-amz-request-id': '1ZDFVKE9KZ7M6KS8',
   'date': 'Sat, 05 Nov 2022 01:11:46 GMT',
   'location': 'http://abdullahoutput.s3.amazonaws.com/',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': 'http://abdullahoutput.s3.amazonaws.com/'}

In [None]:
# Run the command python etl.py in terminal to execute the ETL steps.
!python etl.py

In [59]:
# explore the objects for songs_table
import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'])

my_bucket = s3.Bucket('abdullahoutput')
for my_bucket_object in my_bucket.objects.filter(Prefix='songs_table'):
    print(my_bucket_object)

s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/_SUCCESS')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/artist_id=AR10USD1187B99F3F1/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/artist_id=AR1C2IX1187B99BF74/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/artist_id=AR1KTV21187B9ACD72/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/artist_id=AR9Q9YC1187FB5609B/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/artist_id=ARA23XO1187B9AF18F/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='songs_table/year=0/arti

In [61]:
# read songs_table
df_song = spark.read.parquet("s3a://abdullahoutput/songs_table/year=2009/artist_id=ARJNIUY12298900C91/part-00000-56d1cdba-91d7-403e-8279-7dce1a734b0f.c000.snappy.parquet")
df_song.printSchema()
df_song.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)

+------------------+------+--------+
|           song_id| title|duration|
+------------------+------+--------+
|SOBLFFE12AF72AA5BA|Scream|213.9424|
+------------------+------+--------+



In [62]:
# explore the objects for artists_table

my_bucket = s3.Bucket('abdullahoutput')
for my_bucket_object in my_bucket.objects.filter(Prefix='artists_table'):
    print(my_bucket_object)

s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/_SUCCESS')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00000-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00007-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00010-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00023-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00030-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00053-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='artists_table/part-00054-955074f5-6b23-4f4e-8983-776aa2

In [64]:
# read artists_table
df_artists = spark.read.parquet("s3a://abdullahoutput/artists_table/part-00175-955074f5-6b23-4f4e-8983-776aa29cfdb2-c000.snappy.parquet")
df_artists.printSchema()
df_artists.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

+------------------+--------------------+---------------+---------------+----------------+
|         artist_id|         artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+---------------+---------------+----------------+
|AR0MWD61187B9B2B12|International Noi...|               |           null|            null|
+------------------+--------------------+---------------+---------------+----------------+



In [65]:
# explore the objects for users_table

my_bucket = s3.Bucket('abdullahoutput')
for my_bucket_object in my_bucket.objects.filter(Prefix='users_table'):
    print(my_bucket_object)

s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/_SUCCESS')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00000-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00002-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00006-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00008-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00009-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00010-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='users_table/part-00012-dbef0345-04d1-4cff-80c2-31ef51e9ca9a-c000.snap

In [24]:
# read users_table

df_users = spark.read.parquet("s3a://abdullahoutput/users_table/part-00194-b428205e-2450-4e0f-bea9-a032a6e788d7-c000.snappy.parquet")
df_users.printSchema()
df_users.show(5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|    101|    Jayden|      Fox|     M| free|
|     20|     Aiden|  Ramirez|     M| paid|
|      3|     Isaac|   Valdez|     M| free|
+-------+----------+---------+------+-----+



In [69]:
# explore the objects for time_table

my_bucket = s3.Bucket('abdullahoutput')
for my_bucket_object in my_bucket.objects.filter(Prefix='time_table'):
    print(my_bucket_object)

s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/_SUCCESS')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00000-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00001-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00002-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00003-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00004-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSummary(bucket_name='abdullahoutput', key='time_table/year=2018/month=11/part-00005-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet')
s3.ObjectSumma

In [70]:
# read time_table

df_time = spark.read.parquet("s3a://abdullahoutput/time_table/year=2018/month=11/part-00000-0dc9f58e-89c4-4b45-95c8-c5293edbd3db.c000.snappy.parquet")
df_time.printSchema()
df_time.show(5)

root
 |-- datetime: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+--------------------+----+---+----+-------+
|            datetime|          start_time|hour|day|week|weekday|
+--------------------+--------------------+----+---+----+-------+
|2018-11-15 15:37:...|2018-11-15 15:37:...|  15| 15|  46|      5|
|2018-11-15 17:42:...|2018-11-15 17:42:...|  17| 15|  46|      5|
|2018-11-21 04:23:...|2018-11-21 04:23:...|   4| 21|  47|      4|
|2018-11-21 08:52:...|2018-11-21 08:52:...|   8| 21|  47|      4|
|2018-11-21 11:42:...|2018-11-21 11:42:...|  11| 21|  47|      4|
+--------------------+--------------------+----+---+----+-------+
only showing top 5 rows



In [71]:
# explore the objects for songplays_table

my_bucket = s3.Bucket('abdullahoutput')
for my_bucket_object in my_bucket.objects.filter(Prefix='songplays_table'):
    print(my_bucket_object)

s3.ObjectSummary(bucket_name='abdullahoutput', key='songplays_table/_SUCCESS')
