## Spark with a Data Lake
This notebook has been set up on an EMR cluster, so that we can work directly within AWS.

In [31]:
import configparser
import os

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

## Set up the Spark session

In [32]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

## Set the credentials
Remember to have them stored in dl.cfg.

In [41]:
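config = configparser.ConfigParser()
# Read the credentials file and close it again once it is parsed
with open('./dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Expose the credentials as environment variables
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']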
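For reference, here is a minimal sketch of the layout dl.cfg is assumed to have, based on the section and key names read above. The values are placeholders and `load_aws_credentials` is a hypothetical helper, not part of the notebook. Note that `configparser` keeps quotes literally, so the values should be written without them.

```python
import configparser

# Placeholder contents standing in for dl.cfg; never commit real keys.
EXAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = my-access-key-id
AWS_SECRET_ACCESS_KEY = my-secret-access-key
"""


def load_aws_credentials(cfg_text):
    """Parse config text and return (access_key_id, secret_access_key)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    aws = parser["AWS"]
    return aws["AWS_ACCESS_KEY_ID"], aws["AWS_SECRET_ACCESS_KEY"]


key_id, secret = load_aws_credentials(EXAMPLE_CFG)
print(key_id)
```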


## Unzip the data
The data ships zipped alongside this notebook, so we need to extract it before we can access it.

In [42]:
input_data = "./data/"
output_data = "./output/"

In [43]:
# Zipped source archives
song_data_zip = input_data + 'song-data.zip'
log_data_zip = input_data + 'log-data.zip'

# Extracted locations; the song files are nested three directory levels deep
song_data = input_data + 'song_data/*/*/*/'
log_data = input_data + 'log_data'


In [44]:
# import zipfile as zf

# files = zf.ZipFile(song_data_zip,'r')
# files.extractall()
# files.close()

# files = zf.ZipFile(log_data_zip,'r')
# files.extractall()
# files.close()
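The commented-out cell above opens and closes each archive by hand. A slightly more robust sketch uses a context manager, so the archive is closed even if extraction fails. `extract_zip` is a hypothetical helper name, and the call at the bottom is left commented out because it assumes the archives exist under `input_data`.

```python
import zipfile
from pathlib import Path


def extract_zip(zip_path, target_dir):
    """Extract zip_path into target_dir and return the archive's member names."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    # The context manager closes the archive even if extraction raises.
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(target)
        return archive.namelist()


# Usage in this notebook would look roughly like:
# extract_zip(song_data_zip, input_data)
# extract_zip(log_data_zip, input_data)
```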

In [45]:
song_df = spark.read.json(song_data)


Now the data lives in the correct folder structure under the data directory. (I copied it over, since it got extracted within this path.)

In [46]:
songs_table = song_df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()

In [47]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



### Now we want to create the artist table

In [48]:
artists_table = song_df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).distinct()
artists_table.show(5)

+------------------+---------------+---------------+---------------+----------------+
|         artist_id|    artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+---------------+---------------+---------------+----------------+
|AR3JMC51187B9AE49D|Backstreet Boys|    Orlando, FL|       28.53823|       -81.37739|
|AR0IAWL1187B9A96D0|   Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARWB3G61187FB49404|    Steve Morse| Hamilton, Ohio|           null|            null|
|AR47JEX1187B995D81|   SUE THOMPSON|     Nevada, MO|       37.83721|       -94.35868|
|ARHHO3O1187B989413|      Bob Azzam|               |           null|            null|
+------------------+---------------+---------------+---------------+----------------+
only showing top 5 rows



Now that we have seen the song data, we can continue with the log data.

## Log data

In [49]:
log_data = input_data + "log_data/"

In [50]:
log_df = spark.read.json(log_data)
log_df.count()

8056

In [51]:
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+

Next, we keep only the NextSong events, i.e. the rows where page is NextSong.

In [52]:
log_df = log_df.where('page="NextSong"')
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+

## User table

In [53]:
users_table = log_df.select(["userId", "firstName", "lastName", "gender", "level"]).distinct()
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



## Time table

In [54]:
log_df = log_df.withColumn('timestamp',( (log_df.ts.cast('float')/1000).cast("timestamp")) )
log_df.select('timestamp').show(5, False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:29:39.712|
|2018-11-15 00:40:35.072|
|2018-11-15 00:44:57.216|
|2018-11-15 03:44:05.12 |
|2018-11-15 05:48:36.224|
+-----------------------+
only showing top 5 rows
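The fractional seconds above deserve a second look. The first log row shown earlier has ts = 1542241826796, which should be 00:30:26.796, not 00:29:39.712. A likely cause is the cast to float: Spark's float is 32-bit single precision, which near 1.5e12 can only represent multiples of 131072 ms. The sketch below reproduces the rounding in plain Python, without Spark. `to_float32` is a hypothetical helper, and the times are printed in UTC on the assumption that the session ran in UTC. Casting to double instead of float should keep the exact milliseconds.

```python
import struct
from datetime import datetime, timezone


def to_float32(value):
    """Round a Python float to the nearest single-precision (32-bit) value."""
    return struct.unpack("f", struct.pack("f", value))[0]


ts_ms = 1542241826796  # ts of the first log row shown earlier

rounded_ms = int(to_float32(float(ts_ms)))
print(rounded_ms)          # 1542241779712
print(ts_ms - rounded_ms)  # 47084 (milliseconds lost to rounding)

exact = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
lossy = datetime.fromtimestamp(rounded_ms / 1000, tz=timezone.utc)
print(exact.strftime("%H:%M:%S.%f")[:-3])  # 00:30:26.796
print(lossy.strftime("%H:%M:%S.%f")[:-3])  # 00:29:39.712
```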



In [55]:
time_table = log_df.select(
                    F.col("timestamp").alias("start_time"),
                    F.hour("timestamp").alias('hour'),
                    F.dayofmonth("timestamp").alias('day'),
                    F.weekofyear("timestamp").alias('week'),
                    F.month("timestamp").alias('month'), 
                    F.year("timestamp").alias('year'), 
                    F.date_format(F.col("timestamp"), "E").alias("weekday")
                )

time_table.show(2, False)


+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-15 00:29:39.712|0   |15 |46  |11   |2018|Thu    |
|2018-11-15 00:40:35.072|0   |15 |46  |11   |2018|Thu    |
+-----------------------+----+---+----+-----+----+-------+
only showing top 2 rows
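As a cross-check, the same breakdown can be sketched in plain Python for the first row above. This assumes that Spark's weekofyear follows ISO 8601 week numbering, as its documentation describes. `time_row` and `WEEKDAYS` are hypothetical names used only for this illustration.

```python
from datetime import datetime

WEEKDAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def time_row(ts):
    """Break a datetime into the same fields as the time table."""
    return {
        "start_time": ts,
        "hour": ts.hour,
        "day": ts.day,
        "week": ts.isocalendar()[1],       # ISO 8601 week number
        "month": ts.month,
        "year": ts.year,
        "weekday": WEEKDAYS[ts.weekday()],  # Monday is index 0
    }


row = time_row(datetime(2018, 11, 15, 0, 29, 39, 712000))
print(row["hour"], row["day"], row["week"], row["month"], row["year"], row["weekday"])
# 0 15 46 11 2018 Thu
```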



## Songplay data

In [56]:
song_df = spark.read.json(input_data+'song_data/*/*/*/*.json')


In [57]:
song_log_joined = log_df.join(
        song_df, 
        (log_df.song == song_df.title) 
        & (log_df.artist == song_df.artist_name) 
        & (log_df.length == song_df.duration), 
        how='inner'
    )
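
To make the join condition concrete, here is a small plain-Python sketch of the same inner join on song title, artist name and duration. The rows are made up for illustration and `inner_join` is a hypothetical helper. Only log events that match a song record on all three fields survive, which is why the songplays table further down ends up with so few rows.

```python
def inner_join(logs, songs):
    """Inner-join log events to songs on (title, artist name, duration)."""
    # Index the songs by the composite key used in the join condition.
    index = {}
    for song in songs:
        key = (song["title"], song["artist_name"], song["duration"])
        index.setdefault(key, []).append(song)

    joined = []
    for event in logs:
        key = (event["song"], event["artist"], event["length"])
        # Events without a matching song are dropped (inner join).
        for song in index.get(key, []):
            joined.append({**event, **song})
    return joined


# Made-up rows, for illustration only.
logs = [
    {"song": "Song A", "artist": "Artist X", "length": 200.5, "userId": "1"},
    {"song": "Song B", "artist": "Artist Y", "length": 180.0, "userId": "2"},
]
songs = [
    {"title": "Song A", "artist_name": "Artist X", "duration": 200.5,
     "song_id": "S1", "artist_id": "A1"},
]

result = inner_join(logs, songs)
print(len(result))  # 1
```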


In [58]:
# A single global window numbers the rows 1..n, so every songplay_id is unique
songplay_window = Window.orderBy("timestamp")

songplays_table = song_log_joined.distinct() \
                    .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent") \
                    .withColumn("songplay_id", F.row_number().over(songplay_window)) \
                    .withColumnRenamed("userId", "user_id") \
                    .withColumnRenamed("timestamp", "start_time") \
                    .withColumnRenamed("sessionId", "session_id") \
                    .withColumnRenamed("userAgent", "user_agent")

songplays_table.show(5)

+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|user_id|          start_time|           song_id|         artist_id|level|session_id|            location|          user_agent|songplay_id|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|     15|2018-11-21 21:56:...|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| paid|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          1|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+



## Store this as parquet files.

In [59]:
try:
    songs_table.write.parquet(output_data + 'songs/' + 'artists.parquet')
    artists_table.write.parquet(output_data + 'artists/' + 'artists.parquet')
    users_table.write.parquet(output_data + 'users/' + 'artists.parquet')
    time_table.write.parquet(output_data + 'time/' + 'artists.parquet')
    songplays_table.write.parquet(output_data + 'songplays/' + 'artists.parquet')
except Exception as e:
    print(e)
    
    

'path file:/home/workspace/output/songs/artists.parquet already exists.;'
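
The message above is what Spark reports when the target path already exists, since its default save mode raises an error instead of overwriting. Note also that every path in the cell above ends in artists.parquet, which looks like a copy-paste slip. A hedged sketch of one way to address both: derive the file name from the table name and write with mode("overwrite"). `table_path` and `write_tables` are hypothetical helpers, not part of the notebook.

```python
def table_path(output_root, table):
    """Build '<root><table>/<table>.parquet' so the file name matches the table."""
    return f"{output_root}{table}/{table}.parquet"


def write_tables(tables, output_root):
    """Write each DataFrame in `tables` (a name -> DataFrame dict) as parquet."""
    for name, df in tables.items():
        # mode("overwrite") replaces existing output instead of raising.
        df.write.mode("overwrite").parquet(table_path(output_root, name))


print(table_path("./output/", "songs"))  # ./output/songs/songs.parquet
```

In the notebook this could be called as `write_tables({"songs": songs_table, "artists": artists_table}, output_data)`, with the remaining tables added to the dict in the same way.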


## AWS 
Now we want to load these files to the AWS S3 bucket. 

In [75]:
import boto3

bucket_root='janneman-udacity-sparkify-data-lake'
s3 = boto3.resource('s3', region_name='us-west-2')

In [62]:
os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [65]:
# os.getenv("AWS_ACCESS_KEY_ID")

In [67]:
client = boto3.client(
    's3',
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

In [74]:
# client.create_bucket(Bucket="mybucket", ....) 


In [76]:
client.create_bucket(ACL='private',Bucket=bucket_root,
                             CreateBucketConfiguration={'LocationConstraint':'us-west-2'})

{'ResponseMetadata': {'RequestId': 'R9MJ44E7ZWXEYSNF',
  'HostId': 'txp0ZwBY7mq5b3WbEQTdXe4SZTGJPaqyoxqykSytLbqUy+H6s5dB9lYIwhtDNXxKLBInXBCXkvk=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'txp0ZwBY7mq5b3WbEQTdXe4SZTGJPaqyoxqykSytLbqUy+H6s5dB9lYIwhtDNXxKLBInXBCXkvk=',
   'x-amz-request-id': 'R9MJ44E7ZWXEYSNF',
   'date': 'Sat, 24 Sep 2022 06:03:36 GMT',
   'location': 'http://janneman-udacity-sparkify-data-lake.s3.amazonaws.com/',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': 'http://janneman-udacity-sparkify-data-lake.s3.amazonaws.com/'}
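
One detail worth knowing about the call above: to my understanding, S3 expects a LocationConstraint for every region except us-east-1, where it has to be left out. The sketch below builds the keyword arguments accordingly. `create_bucket_kwargs` is a hypothetical helper and the bucket name in the example is a placeholder.

```python
def create_bucket_kwargs(bucket, region):
    """Build keyword arguments for an S3 create_bucket call in `region`."""
    kwargs = {"ACL": "private", "Bucket": bucket}
    # us-east-1 is the default region and is not passed as a LocationConstraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


print(create_bucket_kwargs("example-bucket", "us-west-2"))
```

With the client from this notebook, the call would then read `client.create_bucket(**create_bucket_kwargs(bucket_root, 'us-west-2'))`.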

### Now we want to create the folders in the bucket. 

In [78]:
folders_name = ['songs', 'songplays', 'time', 'artists', 'users']

# S3 has no real folders: an empty object whose key ends in '/' acts as one
for f in folders_name:
    client.put_object(Bucket=bucket_root, Key=f'{f}/')
print('Folders created')


Folders created


We check that it worked:

In [83]:
bucket_root

'janneman-udacity-sparkify-data-lake'

In [84]:
objects = client.list_objects(Bucket=bucket_root)

# Print the key of every object in the bucket
for obj in objects['Contents']:
    print(obj['Key'])

artists/
songplays/
songs/
time/
users/
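
The loop above works because the bucket already contains objects. For an empty bucket the response has no 'Contents' entry, so indexing it raises a KeyError. A small defensive sketch follows, using hand-written dicts in place of real responses. `keys_from_listing` is a hypothetical helper. Keep in mind that a single listing call returns at most 1000 keys, so larger buckets need pagination.

```python
def keys_from_listing(response):
    """Return the object keys in a list_objects-style response dict."""
    # 'Contents' is absent when the bucket (or prefix) holds no objects.
    return [obj["Key"] for obj in response.get("Contents", [])]


# Hand-written stand-ins for responses, for illustration only.
print(keys_from_listing({"Contents": [{"Key": "songs/"}, {"Key": "users/"}]}))
# ['songs/', 'users/']
print(keys_from_listing({}))
# []
```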


### Check
Tada! We created the folders via code instead of clicking around in the AWS UI. Very cool.

In [87]:
songplays_table

DataFrame[user_id: string, start_time: timestamp, song_id: string, artist_id: string, level: string, session_id: bigint, location: string, user_agent: string, songplay_id: int]

In [88]:
output_data

's3a://janneman-udacity-sparkify-data-lake/'

## Write data as parquet to AWS S3

In [89]:
# s3a:// scheme plus a trailing slash, so the table folders resolve inside the bucket
output_data = f"s3a://{bucket_root}/"

songplays_table.write.parquet(output_data + 'songplays/' + 'songplays.parquet')
songs_table.write.parquet(output_data + 'songs/' + 'songs.parquet')
artists_table.write.parquet(output_data + 'artists/' + 'artists.parquet')
users_table.write.parquet(output_data + 'users/' + 'users.parquet')
time_table.write.parquet(output_data + 'time/' + 'time.parquet')

## Finished Spark + AWS
1. Loaded data with Spark
2. Transformed it
3. Did IaC (infrastructure as code: used code to create infrastructure)
4. Loaded it into S3 as parquet.