# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
import boto3
import time
  
# Reuse the session's SparkContext if one already exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; used below to build the S3 sink and DynamicFrames.
glueContext = GlueContext(sc)
# Plain SparkSession used for the parquet reads.
spark = glueContext.spark_session
# NOTE(review): `job` is not referenced again in the visible cells (no init/commit) — confirm whether it is still needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::114652167878:role/AWSGlueAndS3RoleGrupo2
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 87ba3f3b-6912-456e-8eb6-f4a27f0495f7
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 87ba3f3b

In [2]:
# Glue Data Catalog database and S3 bucket that receive the refined-layer output.
REFINED_DATABASE = "pr2-grupo3-rodaan-refined-db"
S3_REFINED_BUCKET = "pr2-grupo3-rodaan-refined-layer"

# Logical dataset name -> catalog table name used when registering the written data.
CATALOG_TABLES = {"countries":"dim_countries","users":"dim_users","facts":"facts_user","horoscopes":"dim_horoscopes"}

# S3 locations: S3_CURATED_* are read by extract_data, S3_REFINED_* are written by load_refined_data.
S3_CURATED_COUNTRIES = "s3://pr2-grupo3-rodaan-curated-layer/data/dim_countries/"
S3_REFINED_HOROSCOPES = "s3://pr2-grupo3-rodaan-refined-layer/data/dim_horoscopes/"

S3_REFINED_COUNTRIES = "s3://pr2-grupo3-rodaan-refined-layer/data/dim_countries/"
S3_CURATED_HOROSCOPES = "s3://pr2-grupo3-rodaan-curated-layer/data/dim_horoscopes/"

S3_CURATED_USERS = "s3://pr2-grupo3-rodaan-curated-layer/data/dim_users/"
S3_REFINED_USERS = "s3://pr2-grupo3-rodaan-refined-layer/data/dim_users/"

S3_CURATED_FACTS = "s3://pr2-grupo3-rodaan-curated-layer/data/facts_user/"
S3_REFINED_FACTS = "s3://pr2-grupo3-rodaan-refined-layer/data/facts_user/"

# Log location in "<bucket>/<key prefix>" form (no s3:// scheme): write_log_in_s3
# appends "<timestamp>.txt" and splits the result on the first "/" into bucket and key.
PATH_LOG = "pr2-grupo3-rodaan-refined-layer/log/"

# Log message constants. Each one is handed to write_log_in_s3, which puts a
# "(<timestamp>)" prefix directly in front of it — hence the leading space.

# --- Extract -----------------------------------------------------------------
inf_VARIABLE_LECTURA_S3_COUNTRIES = " LOG INF: Reading countries data file to dataframe"
inf_VARIABLE_LECTURA_S3_USERS = " LOG INF: Reading users data file to dataframe"
inf_VARIABLE_LECTURA_S3_FACTS = " LOG INF: Reading facts data file to dataframe"
inf_VARIABLE_LECTURA_S3_HOROSCOPES = " LOG INF: Reading horoscopes data file to dataframe"
inf_VARIABLE_LECTURA_S3_OK = " LOG INF: extracting sources finished ok"

# --- Transform: horoscopes ---------------------------------------------------
inf_VARIABLE_TRANSFORMING_S3_HOROSCOPES = " LOG INF: Starting horoscopes transformation"
inf_VARIABLE_RENAME_COLUMNS_HOROSCOPES = " LOG INF: horoscopes renaming horoscope column to normalize data"
inf_VARIABLE_TRANSFORM_OK_HOROSCOPES = " LOG INF: horoscopes transform finished OK"

# --- Transform: users --------------------------------------------------------
inf_VARIABLE_TRANSFORMING_S3_USERS = " LOG INF: Starting users transformation"
inf_VARIABLE_RENAME_COLUMNS_USERS = " LOG INF: users renaming id column to normalize data"
inf_VARIABLE_ADDING_HOROSCOPES_JOIN_FIELDS = " LOG INF: users adding fields day register and month register to join with horoscopes"
inf_VARIABLE_ADDING_HOROSCOPES_USERS = " LOG INF: Adding horoscopes to users"
inf_VARIABLE_ADDING_CONTINENT_USERS = " LOG INF: Adding continent to users"
inf_VARIABLE_TRANSFORM_OK_USERS = " LOG INF: users cleaning transform OK"

# --- Load (one start/OK pair per table) --------------------------------------
inf_VARIABLE_LOAD_FACTS = " LOG INF: facts loading to refined stage"
inf_VARIABLE_LOAD_OK_FACTS = " LOG INF: facts loading finished OK"

inf_VARIABLE_LOAD_USERS = " LOG INF: users loading to refined stage"
inf_VARIABLE_LOAD_OK_USERS = " LOG INF: users loading finished OK"

inf_VARIABLE_LOAD_COUNTRIES = " LOG INF: countries loading to refined stage"
inf_VARIABLE_LOAD_OK_COUNTRIES = " LOG INF: countries loading finished OK"

inf_VARIABLE_LOAD_HOROSCOPES = " LOG INF: horoscopes loading to refined stage"
inf_VARIABLE_LOAD_OK_HOROSCOPES = " LOG INF: horoscopes loading finished OK"

# These two names used to be assigned twice in this cell; the earlier pair
# (" LOG INF: Writing dataframe to s3" / "... finished OK") was always
# overwritten by the values below, so only the effective values are kept.
# They are logged both by write_df_to_s3 and by load_refined_data.
inf_VARIABLE_WRITING_DF_TO_S3 = " LOG INF: Starting loading to refined stage"
inf_VARIABLE_WRITING_DF_TO_S3_OK = " LOG INF: loading to refined stage finished OK"

inf_VARIABLE_CLEANING_DIRECTORY_PATH = " LOG INF: Cleaning the directory to write the new files"

# Key prefixes inside S3_REFINED_BUCKET; write_df_to_s3 passes them to
# vaciar_carpeta_s3 to clear previous output before each write.
# NOTE(review): these have no trailing "/", while vaciar_carpeta_s3 documents
# the "carpeta/subcarpeta/" form — confirm which form is intended.
DIRECTORY_PATH_COUNTRIES = "data/dim_countries"
DIRECTORY_PATH_USERS = "data/dim_users"
DIRECTORY_PATH_HOROSCOPES = "data/dim_horoscopes"
DIRECTORY_PATH_FACTS = "data/facts_user"





### Función para obtener el timestamp en formato ISO

In [3]:
def get_timestamp():
    """Return the current local time formatted as an ISO-8601-style string.

    Returns:
        str: the current local time rendered with ``%Y-%m-%dT%H:%M:%S%z``,
        e.g. ``2023-05-01T12:34:56+0200``.
    """
    # The previous implementation caught ValueError and *returned* the
    # exception class, so a failure would have silently produced the text
    # "<class 'ValueError'>" as a timestamp. Any error now propagates.
    now = time.localtime(time.time())
    return time.strftime('%Y-%m-%dT%H:%M:%S%z', now)




### Función para vaciar una ruta de S3

In [4]:
def vaciar_carpeta_s3(bucket, carpeta):
    """
    Delete every object stored under a folder of an Amazon S3 bucket.

    Parameters:
    - bucket: name of the S3 bucket
    - carpeta: folder to empty, e.g. "folder/subfolder/". A missing trailing
      "/" is added so that only objects inside that folder are matched.
      An empty string is passed through unchanged and therefore matches
      every key in the bucket.

    """

    # S3 prefixes are plain string prefixes: without the trailing slash,
    # "data/dim_users" would also match "data/dim_users_backup/...".
    prefix = carpeta
    if prefix and not prefix.endswith('/'):
        prefix = prefix + '/'

    s3 = boto3.resource('s3')
    # Separate local name so the `bucket` argument is not rebound.
    target_bucket = s3.Bucket(bucket)

    for obj in target_bucket.objects.filter(Prefix=prefix):
        obj.delete()




### Función para escribir en ficheros de log en S3

In [5]:
def write_log_in_s3(path, texto, timestamp):
    """Append one line to the run's log object in S3.

    The log object is ``path + timestamp + ".txt"``, where ``path`` has the
    form ``"<bucket>/<key prefix>"`` (no ``s3://`` scheme). The line written is
    ``"(<current timestamp>)" + texto + "\\n"``.

    S3 objects cannot be appended to, so the existing content is downloaded,
    extended and uploaded again; if the object does not exist yet it is
    created with just the new line.

    Parameters:
    - path: "<bucket>/<key prefix>" of the log location
    - texto: message to log
    - timestamp: run identifier used as the log file name
    """
    log_path = path + timestamp + ".txt"
    line = "(" + str(get_timestamp()) + ")" + texto + '\n'
    # Everything before the first "/" is the bucket, the rest is the object key.
    bucket_name, object_key = log_path.split('/', 1)

    s3 = boto3.client('s3')
    try:
        obj = s3.get_object(Bucket=bucket_name, Key=object_key)
        contenido_actual = obj['Body'].read().decode('utf-8')
    except s3.exceptions.NoSuchKey:
        # First message of this run: start from an empty log.
        contenido_actual = ""

    # Single upload. The previous version uploaded the appended content twice
    # (once inside the try and again in the else branch).
    contenido_nuevo = contenido_actual + line
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=contenido_nuevo.encode('utf-8'))




### Función para guardar un df en S3

In [6]:
def write_df_to_s3(log_timestamp, directory_path, s3_path, glueContext, spark, catalog_database, catalog_table, df_to_write, partition_keys=None):
    """Replace the contents of an S3 location with a DataFrame and update the catalog.

    Builds a Glue S3 sink (glueparquet format, snappy compression, catalog
    update enabled), empties ``directory_path`` inside ``S3_REFINED_BUCKET``
    and then writes ``df_to_write`` to ``s3_path``. Progress and errors are
    logged through ``write_log_in_s3``.

    Parameters:
    - log_timestamp: run identifier used as the log file name
    - directory_path: key prefix inside S3_REFINED_BUCKET to empty before writing
    - s3_path: destination path handed to the Glue sink
    - glueContext: GlueContext used to create the sink and the DynamicFrame
    - spark: not used by this function; kept for interface compatibility
    - catalog_database / catalog_table: catalog entry to create or update
    - df_to_write: Spark DataFrame to persist
    - partition_keys: optional list of partition column names (default: none)

    Raises:
    - Any exception from the sink, the cleanup or the write is logged and re-raised.
    """
    # Avoid a shared mutable default, and never hand the caller's own list to the sink.
    partition_keys = list(partition_keys) if partition_keys else []
    try:
        write_log_in_s3(PATH_LOG, inf_VARIABLE_WRITING_DF_TO_S3, log_timestamp)
        s3output = glueContext.getSink(
          path=s3_path,
          connection_type="s3",
          updateBehavior="UPDATE_IN_DATABASE",
          partitionKeys= partition_keys,
          compression="snappy",
          enableUpdateCatalog=True,
          transformation_ctx="s3output",
        )
        s3output.setCatalogInfo(
          catalogDatabase= catalog_database , catalogTableName= catalog_table
        )
        dynamic_frame = glueContext.create_dynamic_frame_from_rdd(df_to_write.rdd, "dynamic_frame")
        s3output.setFormat("glueparquet")
        write_log_in_s3(PATH_LOG, inf_VARIABLE_CLEANING_DIRECTORY_PATH, log_timestamp)
        # NOTE(review): the old files are deleted before the new ones are written,
        # so a failure in writeFrame leaves the location empty.
        vaciar_carpeta_s3(S3_REFINED_BUCKET, directory_path)
        s3output.writeFrame(dynamic_frame)
        write_log_in_s3(PATH_LOG, inf_VARIABLE_WRITING_DF_TO_S3_OK, log_timestamp)
    except Exception as e:
        error = "LOG: ERROR: " + str(e)
        write_log_in_s3(PATH_LOG, error, log_timestamp)
        # Bare raise keeps the original traceback intact.
        raise
    




### Función para leer un fichero en S3 a un df

In [7]:
def read_s3_csv_to_df(log_timestamp, spark, s3_path, format = 'csv', header = 'true', delimiter = '\t'):
    """Load a file from S3 into a Spark DataFrame.

    Parameters:
    - log_timestamp: run identifier, used only to log a failure
    - spark: SparkSession whose reader is used
    - s3_path: location to load
    - format: reader format (default 'csv')
    - header: value for the reader's "header" option (default 'true')
    - delimiter: value for the reader's "delimiter" option (default tab)

    Returns the loaded DataFrame. Any exception is logged with
    write_log_in_s3 and re-raised.
    """
    try:
        # Configure the reader one step at a time, then load.
        reader = spark.read.format(format)
        reader = reader.option("header", header)
        reader = reader.option("delimiter", delimiter)
        return reader.load(s3_path)
    except Exception as exc:
        write_log_in_s3(PATH_LOG, "LOG: ERROR: " + str(exc), log_timestamp)
        raise exc




### Extraer la información de la capa curated

In [8]:
def extract_data(log_timestamp, spark):
    """Read the four curated datasets as parquet, logging each step.

    Parameters:
    - log_timestamp: run identifier used as the log file name
    - spark: SparkSession used for the reads

    Returns a tuple ``(users, horoscopes, facts, countries)`` of DataFrames,
    read in that order.
    """
    # (log message, curated location) in the order the datasets are read.
    sources = (
        (inf_VARIABLE_LECTURA_S3_USERS, S3_CURATED_USERS),
        (inf_VARIABLE_LECTURA_S3_HOROSCOPES, S3_CURATED_HOROSCOPES),
        (inf_VARIABLE_LECTURA_S3_FACTS, S3_CURATED_FACTS),
        (inf_VARIABLE_LECTURA_S3_COUNTRIES, S3_CURATED_COUNTRIES),
    )

    frames = []
    for message, location in sources:
        write_log_in_s3(PATH_LOG, message, log_timestamp)
        frames.append(read_s3_csv_to_df(log_timestamp, spark, location, 'parquet'))

    write_log_in_s3(PATH_LOG, inf_VARIABLE_LECTURA_S3_OK, log_timestamp)

    users_df, horoscopes_df, facts_df, countries_df = frames
    return users_df, horoscopes_df, facts_df, countries_df
    




### Generar ficheros de usuarios con sus respectivos horóscopos

In [9]:
def day(date):
    """Extract the day of the month from a ``YYYY-MM-DD`` style value.

    Parameters:
    - date: a date string, or any object whose ``str()`` starts with
      ``YYYY-MM-DD`` (e.g. ``datetime.date`` / ``datetime.datetime``).
      A time component after the day ("2006-08-13 10:00:00",
      "2006-08-13T10:00:00") is ignored.

    Returns:
    - the day as an int, or None when ``date`` is None (previously this
      raised IndexError because "None" was split as if it were a date).
    """
    if date is None:
        return None
    day_token = str(date).split("-")[2]
    # Keep only the digits before an optional "T" or space separated time part.
    return int(day_token.replace("T", " ").split()[0])




In [19]:
def transform_users(df_dim_users, df_dim_horoscopes, df_dim_contries, log_timestamp):
    """Build the refined users dimension.

    Steps:
    1. Rename column "#id" to "user_id".
    2. Add "register_month" and "register_day", derived from "register_date".
    3. Left-join horoscopes on (register_day == Day, register_month == Month),
       drop the horoscope-side "Date", "Day" and "Month" columns and rename
       "Horoscope" to "horoscope".
    4. Left-join countries on country_name == country, then drop the
       countries-side "country" column plus "nombre", "continente" and "pais".

    Parameters:
    - df_dim_users: curated users DataFrame
    - df_dim_horoscopes: curated horoscopes DataFrame
    - df_dim_contries: curated countries DataFrame (the parameter name keeps
      its original spelling so existing callers are unaffected)
    - log_timestamp: run identifier used as the log file name

    Returns the refined users DataFrame. Any exception is logged and re-raised.
    """
    # The body used to read `df_dim_countries`, which is not a parameter: it only
    # worked when a notebook global of that name happened to exist. Bind it
    # explicitly to the argument instead.
    df_dim_countries = df_dim_contries

    write_log_in_s3(PATH_LOG,inf_VARIABLE_TRANSFORMING_S3_USERS, log_timestamp)
    try:
        write_log_in_s3(PATH_LOG,inf_VARIABLE_RENAME_COLUMNS_USERS, log_timestamp)
        df_dim_users_rename_id = (
            df_dim_users
            .withColumnRenamed("#id","user_id")
        )
        write_log_in_s3(PATH_LOG,inf_VARIABLE_ADDING_HOROSCOPES_JOIN_FIELDS, log_timestamp)
        # No return type is given, so the UDF yields a string column.
        dayUDF = udf(lambda date: day(date))

        df_dim_users_month_day_register = (
            df_dim_users_rename_id
            .withColumn("register_month", month("register_date"))
            .withColumn("register_day", dayUDF("register_date"))
        )
        write_log_in_s3(PATH_LOG,inf_VARIABLE_ADDING_HOROSCOPES_USERS, log_timestamp)
        df_users_join_horoscopes = (
            df_dim_users_month_day_register
            .join(df_dim_horoscopes, (df_dim_users_month_day_register.register_day == df_dim_horoscopes.Day) & (df_dim_users_month_day_register.register_month == df_dim_horoscopes.Month),"left")
            .drop("Date","Day","Month")
            # Was "Horocope": withColumnRenamed silently does nothing for a
            # missing column, so the column kept its original name.
            .withColumnRenamed("Horoscope","horoscope")
        )
        write_log_in_s3(PATH_LOG,inf_VARIABLE_ADDING_CONTINENT_USERS, log_timestamp)
        df_dim_users_refined = (
            df_users_join_horoscopes
            .join(df_dim_countries, df_users_join_horoscopes.country_name == df_dim_countries.country, "left")
            .drop(df_dim_countries.country)
            .drop("nombre","continente","pais")
        )
        write_log_in_s3(PATH_LOG,inf_VARIABLE_TRANSFORM_OK_USERS, log_timestamp)
    except Exception as e:
        write_log_in_s3(PATH_LOG, str(e), log_timestamp)
        # Bare raise keeps the original traceback intact.
        raise
    return df_dim_users_refined
    




### Generar horóscopos en refined

In [11]:
def transform_horoscopes(df_dim_horoscopes, log_timestamp):
    """Normalize the horoscope column name of the horoscopes dimension.

    Parameters:
    - df_dim_horoscopes: curated horoscopes DataFrame
    - log_timestamp: run identifier used as the log file name

    Returns the DataFrame with the horoscope column renamed to lower case.
    Any exception is logged and re-raised.
    """
    write_log_in_s3(PATH_LOG,inf_VARIABLE_TRANSFORMING_S3_HOROSCOPES, log_timestamp)
    try:
        write_log_in_s3(PATH_LOG,inf_VARIABLE_RENAME_COLUMNS_HOROSCOPES, log_timestamp)
        # withColumnRenamed is a no-op when the source column is absent, so both
        # spellings can be chained safely. The original only handled "Horoscopes".
        # NOTE(review): the users output shows the joined column as "Horoscope"
        # (singular), which suggests the original rename never matched — confirm
        # against the curated schema.
        df_dim_horoscopes_refined = (
            df_dim_horoscopes
            .withColumnRenamed("Horoscopes","horoscopes")
            .withColumnRenamed("Horoscope","horoscope")
        )
        write_log_in_s3(PATH_LOG,inf_VARIABLE_TRANSFORM_OK_HOROSCOPES, log_timestamp)
    except Exception as e:
        write_log_in_s3(PATH_LOG, str(e), log_timestamp)
        # Bare raise keeps the original traceback intact.
        raise
    return df_dim_horoscopes_refined
    




### Load data in refined layer

In [12]:
def load_refined_data(log_timestamp, glueContext, spark, df_dim_users_refined, df_dim_horoscopes_refined, df_dim_countries_refined, df_dim_facts_refined):
    """Write the four refined tables to S3 and register them in the catalog.

    Tables are written in the order countries, horoscopes, users, facts; each
    write is bracketed by its own start/OK log messages. Only the facts table
    is partitioned (by "user_id").

    Parameters:
    - log_timestamp: run identifier used as the log file name
    - glueContext, spark: forwarded to write_df_to_s3
    - df_dim_*_refined: DataFrames to persist
    """
    write_log_in_s3(PATH_LOG, inf_VARIABLE_WRITING_DF_TO_S3, log_timestamp)

    # One row per table:
    # (start message, OK message, cleanup prefix, destination, catalog key, frame, extra kwargs)
    load_plan = (
        (inf_VARIABLE_LOAD_COUNTRIES, inf_VARIABLE_LOAD_OK_COUNTRIES,
         DIRECTORY_PATH_COUNTRIES, S3_REFINED_COUNTRIES, "countries",
         df_dim_countries_refined, {}),
        (inf_VARIABLE_LOAD_HOROSCOPES, inf_VARIABLE_LOAD_OK_HOROSCOPES,
         DIRECTORY_PATH_HOROSCOPES, S3_REFINED_HOROSCOPES, "horoscopes",
         df_dim_horoscopes_refined, {}),
        (inf_VARIABLE_LOAD_USERS, inf_VARIABLE_LOAD_OK_USERS,
         DIRECTORY_PATH_USERS, S3_REFINED_USERS, "users",
         df_dim_users_refined, {}),
        (inf_VARIABLE_LOAD_FACTS, inf_VARIABLE_LOAD_OK_FACTS,
         DIRECTORY_PATH_FACTS, S3_REFINED_FACTS, "facts",
         df_dim_facts_refined, {"partition_keys": ["user_id"]}),
    )

    for start_msg, done_msg, directory, destination, table_key, frame, extra in load_plan:
        write_log_in_s3(PATH_LOG, start_msg, log_timestamp)
        write_df_to_s3(
            log_timestamp, directory, destination, glueContext, spark,
            REFINED_DATABASE, CATALOG_TABLES[table_key], frame, **extra
        )
        write_log_in_s3(PATH_LOG, done_msg, log_timestamp)

    write_log_in_s3(PATH_LOG, inf_VARIABLE_WRITING_DF_TO_S3_OK, log_timestamp)
    
    
    
    




#### MAIN

In [21]:
# One timestamp per run: it is passed to every step and names the run's log file.
timestamp = str(get_timestamp())

# Extract: read the four curated datasets.
df_dim_users, df_dim_horoscopes, df_dim_facts, df_dim_countries = extract_data(timestamp, spark)

# Transform: only users and horoscopes have a transform step.
df_dim_users_refined = transform_users(df_dim_users, df_dim_horoscopes, df_dim_countries, timestamp)
df_dim_horoscopes_refined = transform_horoscopes(df_dim_horoscopes, timestamp)

# Load: countries and facts are written exactly as read from the curated layer.
load_refined_data(timestamp, glueContext, spark, df_dim_users_refined, df_dim_horoscopes_refined, df_dim_countries, df_dim_facts)




+-----------+------+----+------------------+------------+--------------------+-------------+--------------+------------+---------+
|    user_id|gender| age|           country|  registered|        country_name|register_date|register_month|register_day|Horoscope|
+-----------+------+----+------------------+------------+--------------------+-------------+--------------+------------+---------+
|user_000001|     m|null|             Japan|Aug 13, 2006|               Japan|   2006-08-13|             8|          13|      Leo|
|user_000002|     f|null|              Peru|Feb 24, 2006|                Peru|   2006-02-24|             2|          24|   Pisces|
|user_000003|     m|  22|     United States|Oct 30, 2005|United States of ...|   2005-10-30|            10|          30|  Scorpio|
|user_000004|     f|null|              null|Apr 26, 2006|                null|   2006-04-26|             4|          26|   Taurus|
|user_000005|     m|null|          Bulgaria|Jun 29, 2006|            Bulgaria|   20