# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
import logging
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import date,timedelta,datetime
from zoneinfo import ZoneInfo
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext.
glueContext = GlueContext(sc)
# Logger obtained from the GlueContext; later cells call log.info / log.error.
log= glueContext.get_logger()
# SparkSession taken from the GlueContext; later cells use it for reads and config.
spark = glueContext.spark_session
# Job object bound to this GlueContext.
# NOTE(review): no job.init(...) / job.commit() call is visible in this notebook - confirm whether one is needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: d397699c-65fa-4bb7-9dcf-06e0f67795d0
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session d397699c-65fa-4bb7-9dcf-06e0f67795d0 to get into ready status...
Session d397699c-65fa-4bb7-9dcf-06e0f67795d0 ha

#### Define JSON Schema

In [3]:

# Each input line is read whole into a single nullable string column named
# 'raw_json'; the JSON fields are pulled out of it later with get_json_object.
schema = StructType().add('raw_json', StringType())




#### Ad hoc: create a DataFrame from all raw S3 data for the Silver table, parsing each file's directory name to determine the Date

In [26]:
# Run this only ad hoc, when all raw S3 data needs to be loaded into the silver table

def create_dataframe_historic(directory_path='s3://spotify-s3-poc/2025*'):
    """Read raw JSON lines from S3 and tag every row with its folder date.

    Each line of the matched files is read as one string (column 'raw_json'
    of the notebook-level ``schema``). The ``name`` and ``followers.total``
    JSON fields are extracted from it, and the date is parsed from the first
    ``/YYYY-MM-DD`` component of the file path instead of from a fixed
    character offset, so it no longer depends on the bucket name's length.

    Args:
        directory_path: S3 path or glob of the raw folders to read. Defaults
            to the glob this function originally hard-coded.

    Returns:
        DataFrame with columns ``Name`` and ``Followers`` (as extracted by
        get_json_object) and ``Date`` (a date, matching the type that
        create_dataframe_todays_data produces with ``lit(date)``).

    Side effects:
        Prints the distinct dates found, which runs a Spark job.
    """
    raw_df = (
        spark.read
        .option("recursiveFileLookup", "true")
        .schema(schema)
        .text(directory_path)
    )
    # The bucket name follows '//', so the first '/YYYY-MM-DD' match is the
    # date folder. A path with no such component yields an empty string,
    # which to_date turns into NULL under default (non-ANSI) settings.
    folder_date = to_date(
        regexp_extract(input_file_name(), r'/(\d{4}-\d{2}-\d{2})', 1),
        'yyyy-MM-dd',
    )
    df = raw_df.select(
        get_json_object(col("raw_json"), "$.name").alias("Name"),
        get_json_object(col("raw_json"), "$.followers.total").alias("Followers"),
        folder_date.alias("Date"),
    )
    # Quick sanity check of which dates were picked up.
    df.select('Date').distinct().show()
    return df





#### Create Dataframe with today's data 

In [4]:
def create_dataframe_todays_data():
    """Read today's raw JSON lines from S3 into a DataFrame.

    "Today" is the current date in the America/Costa_Rica time zone; the
    folder read is ``s3://spotify-s3-poc/<YYYY-MM-DD>``. Each line is read as
    one string (column 'raw_json' of the notebook-level ``schema``) and the
    ``name`` and ``followers.total`` JSON fields are extracted from it.

    Returns:
        DataFrame with columns ``Name``, ``Followers`` and ``Date``, where
        ``Date`` is the literal run date on every row.

    Raises:
        Exception: any error from the read or count is logged and re-raised,
            so a failed load is not mistaken for an empty result (the
            function previously logged and implicitly returned None).
    """
    try:
        tz = ZoneInfo("America/Costa_Rica")
        # Named run_date so it does not shadow the star-imported Spark
        # function current_date.
        run_date = datetime.now(tz).date()
        directory_path = f"s3://spotify-s3-poc/{run_date}"
        log.info(f'Reading from: {directory_path}')
        raw_df = (
            spark.read
            .option("recursiveFileLookup", "true")
            .schema(schema)
            .text(directory_path)
        )
        df = raw_df.select(
            get_json_object(col("raw_json"), "$.name").alias("Name"),
            get_json_object(col("raw_json"), "$.followers.total").alias("Followers"),
            lit(run_date).alias("Date"),
        )
        # count() is an action: it runs a Spark job just to log the size.
        log.info(f'Dataframe record count: {df.count()}')
        return df
    except Exception as e:
        log.error(f'Following error encountered: {e}')
        raise
        





#### Write the DataFrame as Parquet to the Silver table, overwriting today's partition to avoid data duplication

In [None]:
def write_silver(df):
    """Write ``df`` to the Parquet table ``silver_glue_table``, partitioned by Date.

    When the table already exists, only the ``Date`` partitions present in
    ``df`` are replaced (dynamic partition overwrite), so re-running the
    notebook for the same day does not duplicate rows. When the table does
    not exist yet, it is created at the silver S3 path.

    Args:
        df: DataFrame containing the table's columns, including ``Date``.

    Raises:
        Exception: any write error is logged through the Glue logger and
            re-raised so the failure is visible to the caller.
    """
    table_name = "silver_glue_table"
    table_path = 's3://spotify-s3-poc/silver/silver_glue_table'
    try:
        # With "dynamic", an overwrite replaces only the partitions that
        # receive data instead of the whole table.
        spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        if spark.catalog.tableExists(table_name):
            # insertInto matches columns by position, not by name, so align
            # df with the table's column order (partition column last).
            aligned = df.select(*spark.table(table_name).columns)
            # insertInto is used here because saveAsTable in overwrite mode
            # would replace the whole table rather than single partitions.
            aligned.write.insertInto(table_name, overwrite=True)
        else:
            # First load: create the partitioned external table.
            (df
             .write
             .partitionBy('Date')
             .mode("append")
             .format('parquet')
             .option("path", table_path)
             .saveAsTable(table_name)
            )
        log.info('Silver table sucessfully populated with todays data')
    except Exception as e:
        log.error(f'Unexpected error when writting to the silver table: {e}')
        raise
        
# Build today's DataFrame and load it into the silver table.
df = create_dataframe_todays_data()
if df is None:
    # No DataFrame came back, so there is nothing to write. Skipping avoids
    # an unrelated AttributeError inside write_silver.
    log.error('No DataFrame was produced for today; skipping the silver table write')
else:
    write_silver(df)