# ETL com AWS Glue e PySpark
Neste projeto, é feito um processo de ETL utilizando o AWS Glue e PySpark.
É coletado um arquivo armazenado em um bucket S3, onde este será transformado a fim de ser tratado, utilizando o Glue e PySpark, e, depois, armazenado em um outro bucket S3.

## Criando o ambiente (sessões e variáveis de ambiente)
É necessário criar a sessão do Spark e do Glue para o funcionamento.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from dotenv import load_dotenv
import os

#cria a sessão do spark
sc = SparkContext.getOrCreate()
#cria a sessão do glue associada ao spark
glueContext = GlueContext(sc)
#armazena a sessão na variável 'spark'
spark = glueContext.spark_session
#armazena o job do glue na variável 'job'
job = Job(glueContext)

In [None]:
#armazena as variáveis de ambiente presentes no arquivo .env
load_dotenv()

In [None]:
#armazena as variáveis de ambiente em variáveis locais
database_name = os.environ.get('DATABASE_NAME')
table_name = os.environ.get('TABLE_NAME')
transformation_ctx = os.environ.get('TRANSFORMATION_CTX')
path_destination = os.environ.get('PATH_S3_DESTINATION')

## Trabalhando com os dados

In [None]:
#cria um DynamicFrame puxando os dados do catálogo, que foi povoado através de um Crawler
university_ranking_source = glueContext.create_dynamic_frame.from_catalog(
    database = database_name,
    table_name = table_name,
    transformation_ctx=transformation_ctx
)

In [None]:
#mostra que o DynamicFrame, criado a partir do catálogo do Crawler, está em JSON
university_ranking_source.show(5)

In [None]:
#para facilitar, esse DynamicFrame é convertido em um DataFrame e armazenado na variável 'rankings_df'
rankings_df = university_ranking_source.toDF()

In [None]:
#mostra que está no formato DataFrame
rankings_df.show(5)

In [None]:
#cria uma view chamada 'university_ranking', para facilitar o trabalho de transformação
rankings_df.createOrReplaceTempView("university_ranking")

In [None]:
#realiza a transformação dos dados utilizando o SQL do Spark e armazena o resultado limpo no DataFrame
#chamado 'clean_rankings_df'
clean_rankings_df = spark.sql("""SELECT 
    university,
    coalesce(int(year),9999) as year,
    rank_display,
    coalesce(int(split(rank_display,'-')[0]),9999) as n_rank,
    coalesce(float(score),-1) as score,
    country, city, region, type,
    research_output,
    coalesce(float(student_faculty_ratio),-1) as student_faculty_ratio,
    coalesce(int(regexp_replace(international_students,'[.,]','')),-1) as international_students,
    size,
    coalesce(int(regexp_replace(faculty_count,'[.,]','')),-1) as faculty_count
    FROM university_ranking""")

In [None]:
#retorna os dados limpos para DynamicFrame, utilizando a variável 'clean_dynamic_frame'
clean_dynamic_frame = DynamicFrame.fromDF(clean_rankings_df, glueContext, "university_ranking_clean")

## Armazenando os dados após tratamento e finalizando

In [None]:
#carrega o DynamicFrame limpo para um bucket S3, em formato .csv
university_ranking_destn = glueContext.write_dynamic_frame.from_options(
    frame=clean_dynamic_frame,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": path_destination,
        "partitionKeys": [],
    },
    transformation_ctx="university_ranking_dest",
)

In [None]:
#dá o commit no job do Glue, para que ele armazene que estes dados já foram processados (bookmark)
job.commit()