## Full ETL Project Code

---

**what does this code do?**

- ✅ **Extract**: Ingests data from aws s3 bucket directly using spark.  
- ✅ **Extract**: Uses boto3 package to verify connection and extract basic s3 bucket information
- ✅ **Tranform**: Performs data transformation to enrich the data
- ✅ **Load**: Loads transformed data back to aws 3 bucket

In [None]:
# finding and importing spark on my machine

"""
1. Verify that the IAM role or user has the necessary permisisons to read from the S3 bucket
2. The policy should include `s3:GetObject` and `s3:ListBucket` (for the bucket) at minimum.
3. Check if the bucket policy or any S3 bucket policy denies the access.
4. Version of `hadoop-aws` must match the Hadoop version that Spark is built with. In our case, spark is built with hadoop 3.3.4

"""
import findspark
findspark.init()

In [3]:
# importing packages

import os
import sys

# Set Hadoop home
os.environ['HADOOP_HOME'] = 'C:\\hadoop'
sys.path.append(os.environ['HADOOP_HOME'] + "\\bin")

# Set Hive temp dirs
os.environ['TEMP'] = 'C:\\tmp'
os.environ['TMP'] = 'C:\\tmp'

from pyspark.sql import SparkSession 
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col, year, month, sum, avg, count # type: ignore

import boto3
from botocore.exceptions import ClientError
import json

import configparser
from datetime import datetime

In [110]:
# setting configurations

config = configparser.ConfigParser()
config.read('config.ini')

aws_access_key = config['AWS']['access_key']
aws_secret_key = config['AWS']['Secret_key']

s3_input_bucket = config['S3']['input_bucket'] # top level container name
s3_input_prefix = config['S3']['input_prefix'] # the 'folder path' within the bucket

single_file = config['PATH']['sinlge_file_path']
songs_files = config['PATH']['all_songs_files']
s3_logs_files = config['PATH']['all_log_files']

s3_output_songs_table = config['songs_table']['songs_path']
s3_output_logs_table = config['log_table']['logs_path']
s3_output_combined_table = config['combined_table']['combined_path']

In [5]:
# 3. Get S3 file list using Boto3

s3 = boto3.client('s3',
        aws_access_key_id= aws_access_key,
        aws_secret_access_key= aws_secret_key)

In [6]:
response = s3.list_objects_v2(
        Bucket=s3_input_bucket,
        Prefix=s3_input_prefix )

In [None]:
# finding number of files in the bucket 

files = [f"s3a://{s3_input_bucket}/{obj['Key']}" 
             for obj in response.get('Contents', []) 
             if not obj['Key'].endswith('/')]

print(f"Found {len(files)} files to process")

Found 105 files to process


In [None]:
# extracting bucket location

s3.get_bucket_location(Bucket=s3_input_bucket)

{'ResponseMetadata': {'RequestId': 'ECRXTEYZB483QZJG',
  'HostId': 'XM1QMA8G6n3xnnyHaWnEQ0UYzxZawM2/9Hw69rPYha2RyaLGcoqDiFDFNf9Es0BrbaYFf9rdVZA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'XM1QMA8G6n3xnnyHaWnEQ0UYzxZawM2/9Hw69rPYha2RyaLGcoqDiFDFNf9Es0BrbaYFf9rdVZA=',
   'x-amz-request-id': 'ECRXTEYZB483QZJG',
   'date': 'Mon, 23 Jun 2025 09:14:49 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'LocationConstraint': 'eu-west-2'}

In [None]:
# extracting bucket policy

s3.get_bucket_policy(Bucket=s3_input_bucket)

{'ResponseMetadata': {'RequestId': '9DAW4FJV534Y4QF6',
  'HostId': '0HEXANQj5AKoODtXAuEld9IZaxYvhtWaY+GPiWwDZ1TNz6ymXIT39f/77JfRfBe8Q5RbiF0jW2mKD/NebqZWXwWv29x3GFEH',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '0HEXANQj5AKoODtXAuEld9IZaxYvhtWaY+GPiWwDZ1TNz6ymXIT39f/77JfRfBe8Q5RbiF0jW2mKD/NebqZWXwWv29x3GFEH',
   'x-amz-request-id': '9DAW4FJV534Y4QF6',
   'date': 'Mon, 23 Jun 2025 09:14:52 GMT',
   'content-type': 'application/json',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Policy': '{"Version":"2012-10-17","Statement":[{"Sid":"Statement1","Effect":"Allow","Principal":"*","Action":"s3:*","Resource":"arn:aws:s3:::spark-datalake-bucket"}]}'}

In [None]:
# initializing Spark Session with enhanced S3 configurations

spark =SparkSession.builder \
        .appName("S3-403-Fix") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", f"s3.{"eu-west-2"}.amazonaws.com") \
        .config("spark.hadoop.fs.s3a.region", "eu-west-2") \
        .config("spark.hadoop.fs.s3a.path.style.access", "false") \
        .config("spark.hadoop.fs.s3a.acl.default", "BucketOwnerFullControl") \
        .config("spark.sql.warehouse.dir", "C:/spark-warehouse") \
        .config("hive.exec.scratchdir", "C:/tmp/hive") \
        .enableHiveSupport() \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
        .config("spark.hadoop.fs.s3a.attempts.maximum", "20") \
        .config("spark.hadoop.fs.s3a.connection.timeout", "100000") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
        .config("spark.hadoop.fs.s3a.change.detection.version.required", "false") \
        .getOrCreate()

In [None]:
# reading a sinlge file

single_song = spark.read.json(single_file) # path is saved in config.ini file

single_song.printSchema()
single_song.show()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+----------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|           title|year|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+----------------+----+
|ARD7TVE1187B99BFB1|           NULL|California - LA|            NULL|     Casual|218.93179|        1|SOMZWCG12A8C13C480|I Didn't Mean To|   0|
+------

In [None]:
# reading all files in s3 song_data directory

all_songs = spark.read.option("recursiveFileLookup", "true").json(songs_files)
all_songs.printSchema()
all_songs.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



71

In [9]:
# creating a temporary sql view for data processing

all_songs.createOrReplaceTempView("songs")

In [10]:
# Discarding any duplicate songs using distinct function

songs_table_query = (spark.sql (
    '''
    select distinct
    song_id,
    title as song_title,
    artist_id,
    year,
    duration
    from songs

    '''
)
) 

songs_table_query.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|          song_title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
|SODUJBS12A8C132150|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SOGXHEG12AB018653E|It Makes No Diffe...|AR0RCMP1187FB3F427|1992|133.32853|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
|SODREIN12A5

In [11]:
# discarding any duplicate artists by using distinct function

artist_table_query = (spark.sql (
    '''
    select distinct
    artist_id,
    artist_name,
    artist_location,
    artist_latitude,
    artist_longitude
    from songs

    '''
)
) 

artist_table_query.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|       40.82624|       -74.47995|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|       32.67828|       -83.22295|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|       51.50632|        -0.12714|
|ARPFHN61187FB575F6|         Lupe Fiasco

In [None]:
# creating a spark SQL table directly via SQL using CTAS

spark.sql(
    '''
  CREATE TABLE songs_table
  AS
  select distinct
    song_id,
    title as song_title,
    artist_id,
    year,
    duration
    from songs
'''
)

In [None]:
# confirming if the table is craeated successfully. Below list will show both persistent table and temp view

spark.sql("SHOW TABLES").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|songs_table|      false|
|         |      songs|       true|
+---------+-----------+-----------+



In [14]:
# querying the persistent table

spark.sql(
    '''
select*
from songs_table

'''
).show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|          song_title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [15]:
# verifying number of songs created

spark.sql(
    '''
select
count(*) as total_song_records
from songs_table

'''
).show()

+------------------+
|total_song_records|
+------------------+
|                71|
+------------------+



In [None]:
# reading the table and writing to s3

(
   spark.table("songs_table") # picking up the source table to write
     .write
     .mode("overwrite")   # this mode replaces existing data. 'Apend' mode is used to add more data to existing data
     .format("parquet")   # Use "csv", "json", etc. for other formats. Parquet is recommended for datalakes
     .option("path", s3_output_songs_table) # specifiying s3 destination and outcome table name
     .save() # executing the write operation
)

In [None]:
# reading all files in s3 log_data directory

all_song_logs = spark.read.option("recursiveFileLookup", "true").json(s3_logs_files)
all_song_logs.printSchema()
all_song_logs.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



8056

In [18]:
# creating a temporary sql view for data processing

all_song_logs.createOrReplaceTempView("logs")

In [66]:
logs_query = (spark.sql (
    '''
    select 
    song,
    artist as artistName,
    concat(firstName,' ',lastName) as userName,
    case when gender = 'F' then 'Female' else 'Male' end as userGender,
    cast(ts/1000 as Timestamp) as timestamp,
    day(cast(ts/1000 as Timestamp)) as day,
    hour(cast(ts/1000 as Timestamp)) as hour,
    month(cast(ts/1000 as Timestamp)) as month,
    year(cast(ts/1000 as Timestamp)) as year
    from logs

    '''
)
) 
logs_query.show(15)

+--------------------+--------------------+---------------+----------+--------------------+---+----+-----+----+
|                song|          artistName|       userName|userGender|           timestamp|day|hour|month|year|
+--------------------+--------------------+---------------+----------+--------------------+---+----+-----+----+
|       Sehr kosmisch|            Harmonia|     Ryan Smith|      Male|2018-11-15 00:30:...| 15|   0|   11|2018|
|     The Big Gundown|         The Prodigy|     Ryan Smith|      Male|2018-11-15 00:41:...| 15|   0|   11|2018|
|            Marry Me|               Train|     Ryan Smith|      Male|2018-11-15 00:45:...| 15|   0|   11|2018|
|                NULL|                NULL|    Wyatt Scott|      Male|2018-11-15 01:57:...| 15|   1|   11|2018|
|                NULL|                NULL| Austin Rosales|      Male|2018-11-15 03:29:...| 15|   3|   11|2018|
|           Blackbird|         Sony Wonder|Samuel Gonzalez|      Male|2018-11-15 03:44:...| 15|   3|   1

In [None]:
# spark.sql("DROP TABLE IF EXISTS log_table PURGE")

DataFrame[]

In [82]:
# creating a persistent table using CTAS

spark.sql(
    '''
  CREATE TABLE logs_table
  AS
  select 
    song,
    artist as artistName,
    concat(firstName,' ',lastName) as userName,
    case when gender = 'F' then 'Female' else 'Male' end as userGender,
    cast(ts/1000 as Timestamp) as timestamp,
    day(cast(ts/1000 as Timestamp)) as day,
    hour(cast(ts/1000 as Timestamp)) as hour,
    month(cast(ts/1000 as Timestamp)) as month,
    year(cast(ts/1000 as Timestamp)) as year
    from logs
'''
)


DataFrame[]

In [83]:
# confirming if the table is craeated successfully. It will show you both persistent tables and temp views

spark.sql("SHOW TABLES").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default| logs_table|      false|
|  default|songs_table|      false|
|         |       logs|       true|
|         |      songs|       true|
+---------+-----------+-----------+



In [84]:
# querying the persistent table

spark.sql(
    '''
select*
from logs_table

'''
).show(5)

+---------------+-----------+--------------+----------+--------------------+---+----+-----+----+
|           song| artistName|      userName|userGender|           timestamp|day|hour|month|year|
+---------------+-----------+--------------+----------+--------------------+---+----+-----+----+
|  Sehr kosmisch|   Harmonia|    Ryan Smith|      Male|2018-11-15 00:30:...| 15|   0|   11|2018|
|The Big Gundown|The Prodigy|    Ryan Smith|      Male|2018-11-15 00:41:...| 15|   0|   11|2018|
|       Marry Me|      Train|    Ryan Smith|      Male|2018-11-15 00:45:...| 15|   0|   11|2018|
|           NULL|       NULL|   Wyatt Scott|      Male|2018-11-15 01:57:...| 15|   1|   11|2018|
|           NULL|       NULL|Austin Rosales|      Male|2018-11-15 03:29:...| 15|   3|   11|2018|
+---------------+-----------+--------------+----------+--------------------+---+----+-----+----+
only showing top 5 rows



In [85]:
# verifying number of logs created

spark.sql(
    '''
select
count(*) as total_logs_created
from logs_table

'''
).show()

+------------------+
|total_logs_created|
+------------------+
|              8056|
+------------------+



In [86]:
# reading the table and writing to s3

(
   spark.table("logs_table") # picking up the source table to write
     .write
     .mode("overwrite")   # this mode replaces existing data. 'Apend' mode is used to add more data to existing data
     .format("parquet")   # Use "csv", "json", etc. for other formats. Parquet is recommended for datalakes
     .option("path", s3_output_logs_table) # specifiying s3 destination and outcome table name
     .save() # executing the write operation
)

In [105]:
# joining both tables by using inner join (unique key = song_title)

spark.sql(
    '''
  CREATE TABLE combined_processed_dataset
  AS
  select
  b.song_title,
  b.song_id,
  b.duration,
  a.artistName,
  a.userName,
  a.timestamp,
  a.hour,
  a.day,
  a.month,
  a.year
  
  from
  logs_table a
  inner join songs_table b
  on a.song =b.song_title 
    
'''
)

DataFrame[]

In [None]:
# spark.sql("DROP TABLE IF EXISTS your-table-name PURGE")

DataFrame[]

In [106]:
# confirming if the table is craeated successfully. It will show you both persistent tables and temp views

spark.sql("SHOW TABLES").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|combined_processe...|      false|
|  default|          logs_table|      false|
|  default|         songs_table|      false|
|         |                logs|       true|
|         |               songs|       true|
+---------+--------------------+-----------+



In [107]:
# querying the persistent table

spark.sql(
    '''
select*
from combined_processed_dataset

'''
).show(20)

+--------------+------------------+---------+-----------------+-------------+--------------------+----+---+-----+----+
|    song_title|           song_id| duration|       artistName|     userName|           timestamp|hour|day|month|year|
+--------------+------------------+---------+-----------------+-------------+--------------------+----+---+-----+----+
|Setanta matins|SOZCTXZ12AB0182364|269.58322|            Elena|    Lily Koch|2018-11-21 21:56:...|  21| 21|   11|2018|
|         Intro|SOGDBUF12A8C140FAA| 75.67628|        Percubaba|  Sylvie Cruz|2018-11-14 05:06:...|   5| 14|   11|2018|
|         Intro|SOGDBUF12A8C140FAA| 75.67628|Calvin Richardson|Layla Griffin|2018-11-19 09:14:...|   9| 19|   11|2018|
|         Intro|SOGDBUF12A8C140FAA| 75.67628|      Samy Deluxe| Tegan Levine|2018-11-27 22:35:...|  22| 27|   11|2018|
+--------------+------------------+---------+-----------------+-------------+--------------------+----+---+-----+----+



In [108]:
# verifying number of records

spark.sql(
    '''
select
count(*) as total_records
from combined_processed_dataset

'''
).show()

+-------------+
|total_records|
+-------------+
|            4|
+-------------+



In [112]:
# reading the table and writing to s3

(
   spark.table("combined_processed_dataset") # picking up the source table to write
     .write
     .mode("overwrite")   # this mode replaces existing data. 'Apend' mode is used to add more data to existing data
     .format("parquet")   # Use "csv", "json", etc. for other formats. Parquet is recommended for datalakes
     .option("path", s3_output_combined_table) # specifiying s3 destination and outcome table name
     .save() # executing the write operation
)

In [113]:
spark.stop()
print ('spark session is stopped sucessfully')

spark session is stopped sucessfully
