# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
# The magics above configure the interactive session: idle timeout (in minutes),
# Glue version, worker type and number of workers.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; the cells below read and write through it.
glueContext = GlueContext(sc)
# Spark session handle exposed by the GlueContext.
spark = glueContext.spark_session
# Job object bound to this GlueContext; it is not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: bd6079aa-abef-4308-a6c1-675c7b1ba5bf
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session bd6079aa-abef-4308-a6c1-675c7b1ba5bf to get into ready status...
Session bd6079aa-abef-4308-a6c1-675c7b1ba5bf ha

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [3]:
# Load the catalog table as a DynamicFrame, then print the schema Glue resolved for it.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="universidad-proyecto-t1",
    table_name="ofertas",
)
dyf.printSchema()

root
|-- _id: string
|-- codigo_curso: string
|-- nombre_curso: string
|-- creditos: int
|-- seccion: int
|-- periodo: string
|-- total_cupos: int
|-- nro_inscritos: int
|-- horarios: array
|    |-- element: struct
|    |    |-- salon: string
|    |    |-- dia: int
|    |    |-- hora_inicio: int
|    |    |-- hora_fin: int
|    |    |-- _id: string
|-- __v: int


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [5]:
# Convert the DynamicFrame into a Spark DataFrame so the DataFrame API is available.
df = dyf.toDF()
# Print a tabular preview; long cell values are truncated with "..." in the output.
df.show()

+--------------------+------------+--------------------+--------+-------+-------+-----------+-------------+--------------------+---+
|                 _id|codigo_curso|        nombre_curso|creditos|seccion|periodo|total_cupos|nro_inscritos|            horarios|__v|
+--------------------+------------+--------------------+--------+-------+-------+-----------+-------------+--------------------+---+
|66fda77d4e43a48ad...|      CS2031|desarrollo basado...|       4|      1| 2024-1|         30|            0|[{AUDITORIO, 2, 1...|  0|
|66fda7dc4e43a48ad...|      CC1103|      álgebra lineal|       2|      1| 2024-1|         45|            0|[{VIRTUAL, 1, 7, ...|  0|
|66fda7ed4e43a48ad...|      CC1103|      álgebra lineal|       2|      2| 2024-1|         45|            0|[{VIRTUAL, 1, 7, ...|  0|
|66fda8ac4e43a48ad...|      HH1102|laboratorio de co...|       3|      1| 2020-1|         45|            0|[{A907, 1, 7, 8, ...|  0|
+--------------------+------------+--------------------+--------+----

#### Relationalize the nested JSON (flatten the `horarios` array into its own table)

In [7]:
# Flatten the nested `horarios` array: relationalize() returns a collection of
# DynamicFrames keyed by table name ("root" plus one table per nested array).
# The S3 path is passed as the second positional argument (the staging location).
collection = dyf.relationalize(
    "root",
    "s3://proyecto-parcial-out/temp/",
)
collection.keys()

dict_keys(['root', 'root_horarios'])


In [8]:
# Preview both relationalized tables, parent first and then the child rows.
for relationalized_name in ("root", "root_horarios"):
    collection.select(relationalized_name).toDF().show()

+--------------------+------------+--------------------+--------+-------+-------+-----------+-------------+--------+---+
|                 _id|codigo_curso|        nombre_curso|creditos|seccion|periodo|total_cupos|nro_inscritos|horarios|__v|
+--------------------+------------+--------------------+--------+-------+-------+-----------+-------------+--------+---+
|66fda77d4e43a48ad...|      CS2031|desarrollo basado...|       4|      1| 2024-1|         30|            0|       1|  0|
|66fda7dc4e43a48ad...|      CC1103|      álgebra lineal|       2|      1| 2024-1|         45|            0|       2|  0|
|66fda7ed4e43a48ad...|      CC1103|      álgebra lineal|       2|      2| 2024-1|         45|            0|       3|  0|
|66fda8ac4e43a48ad...|      HH1102|laboratorio de co...|       3|      1| 2020-1|         45|            0|       4|  0|
+--------------------+------------+--------------------+--------+-------+-------+-----------+-------------+--------+---+

+---+-----+------------------+-

#### Rename the `horarios.val.*` columns to short names

In [10]:
# Pull the two relationalized tables out of the collection as separate DynamicFrames.
dyf_root = collection.select("root")
dyf_root_horarios = collection.select("root_horarios")
# Inspect the child table: its value columns carry a "horarios.val." prefix.
dyf_root_horarios.printSchema()

root
|-- id: long
|-- index: int
|-- horarios.val.salon: string
|-- horarios.val.dia: int
|-- horarios.val.hora_inicio: int
|-- horarios.val.hora_fin: int
|-- horarios.val._id: string


In [17]:
# Preview the child rows; `id` holds the key that root's `horarios` column points to,
# and `index` is the position of the entry within the original array.
dyf_root_horarios.toDF().show()

+---+-----+------------------+----------------+------------------------+---------------------+--------------------+
| id|index|horarios.val.salon|horarios.val.dia|horarios.val.hora_inicio|horarios.val.hora_fin|    horarios.val._id|
+---+-----+------------------+----------------+------------------------+---------------------+--------------------+
|  1|    0|         AUDITORIO|               2|                      14|                   16|66fda77d4e43a48ad...|
|  1|    1|              M301|               2|                       7|                    9|66fda77d4e43a48ad...|
|  1|    2|              M603|               5|                       7|                    9|66fda77d4e43a48ad...|
|  2|    0|           VIRTUAL|               1|                       7|                    8|66fda7dc4e43a48ad...|
|  2|    1|              M301|               2|                       7|                    9|66fda7dc4e43a48ad...|
|  3|    0|           VIRTUAL|               1|                       7|

# Rename the relationalized "horarios.val.*" columns to short names.
# Each tuple is (source_field, source_type, target_field, target_type).
# Source names must match the schema of dyf_root_horarios exactly. Because they
# contain dots, they are wrapped in backticks so ApplyMapping treats each one as a
# single column name instead of a path into a nested struct.
transformed_dyf_root_horarios = dyf_root_horarios.apply_mapping([
    ("id", "long", "id", "long"),
    ("index", "int", "index", "int"),
    ("`horarios.val.salon`", "string", "salon", "string"),
    ("`horarios.val.dia`", "int", "dia", "int"),
    ("`horarios.val.hora_inicio`", "int", "hora_inicio", "int"),
    ("`horarios.val.hora_fin`", "int", "hora_fin", "int"),
    ("`horarios.val._id`", "string", "_id", "string"),
])
# Preview the renamed columns.
transformed_dyf_root_horarios.toDF().show()

#### Join the root and horarios DynamicFrames

In [25]:
# Attach every schedule row to its course: root's `horarios` column is matched
# against the child table's `id` column.
dyf_ofertas = dyf_root.join(
    paths1=["horarios"],
    paths2=["id"],
    frame2=dyf_root_horarios,
)

+-------------+------------------------+--------+-------+----------------+---+---------------------+-------+------------------+-----+--------+------------+-----------+--------------------+--------------------+--------------------+---+
|nro_inscritos|horarios.val.hora_inicio|horarios|seccion|horarios.val.dia|__v|horarios.val.hora_fin|periodo|horarios.val.salon|index|creditos|codigo_curso|total_cupos|                 _id|    horarios.val._id|        nombre_curso| id|
+-------------+------------------------+--------+-------+----------------+---+---------------------+-------+------------------+-----+--------+------------+-----------+--------------------+--------------------+--------------------+---+
|            0|                      14|       1|      1|               2|  0|                   16| 2024-1|         AUDITORIO|    0|       4|      CS2031|         30|66fda77d4e43a48ad...|66fda77d4e43a48ad...|desarrollo basado...|  1|
|            0|                       7|       1|      1|   

In [28]:
# Preview the joined frame: course columns together with the horarios.val.* columns.
dyf_ofertas.toDF().show()

+-------------+------------------------+--------+-------+----------------+---+---------------------+-------+------------------+-----+--------+------------+-----------+--------------------+--------------------+--------------------+---+
|nro_inscritos|horarios.val.hora_inicio|horarios|seccion|horarios.val.dia|__v|horarios.val.hora_fin|periodo|horarios.val.salon|index|creditos|codigo_curso|total_cupos|                 _id|    horarios.val._id|        nombre_curso| id|
+-------------+------------------------+--------+-------+----------------+---+---------------------+-------+------------------+-----+--------+------------+-----------+--------------------+--------------------+--------------------+---+
|            0|                      14|       1|      1|               2|  0|                   16| 2024-1|         AUDITORIO|    0|       4|      CS2031|         30|66fda77d4e43a48ad...|66fda77d4e43a48ad...|desarrollo basado...|  1|
|            0|                       7|       1|      1|   

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [32]:
# Convert to a Spark DataFrame and collapse it to one partition so that a single
# CSV part file is produced.
df_ofertas = dyf_ofertas.toDF().coalesce(1)

# Write a comma-delimited CSV with a header row and every field quoted,
# overwriting whatever already exists under the target prefix.
(
    df_ofertas.write
    .mode('overwrite')
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("quoteAll", "true")
    .option("delimiter", ",")
    .csv("s3://proyecto-parcial-out/ofertas-etl-out/")
)




In [34]:
# Write the joined DynamicFrame to S3 as CSV through the Glue writer:
# unpartitioned output, comma separator, double-quote quoting/escaping, header row.
glueContext.write_dynamic_frame.from_options(
    frame=dyf_ofertas,
    connection_type="s3",
    connection_options=dict(
        path="s3://proyecto-parcial-out/ofertas-etl-out/",
        partitionKeys=[],
    ),
    format="csv",
    format_options=dict(
        separator=",",
        quoteChar='"',
        escaper='"',
        writeHeader=True,
    ),
)

<awsglue.dynamicframe.DynamicFrame object at 0x7fa8172adf30>


# Write dyf_ofertas as CSV to the same S3 prefix used by the other write cells,
# this time overriding only the quote character.
glueContext.write_dynamic_frame.from_options(
    frame=dyf_ofertas,
    connection_type="s3",
    connection_options={"path": "s3://proyecto-parcial-out/ofertas-etl-out/"},
    format="csv",
    format_options ={
        # NOTE(review): the Glue CSV format options describe quoteChar=-1 as
        # turning quoting off entirely — confirm against the Glue version in use.
        "quoteChar":-1,
    },
)

# Write the renamed horarios frame through a Data Catalog table.
# NOTE(review): connection_type, connection_options and format are not among the
# documented parameters of write_dynamic_frame.from_catalog; the target location
# and format presumably come from the catalog table definition. Confirm whether
# these arguments have any effect, and that the "ofertas-transformed" table exists.
glueContext.write_dynamic_frame.from_catalog(
    frame=transformed_dyf_root_horarios,
    database="universidad-proyecto-t1",
    table_name="ofertas-transformed",
    connection_type="s3",
    connection_options={"path": "s3://proyecto-parcial-out/ofertas-etl-out/"},
    format="csv"
)

# Write the joined frame as snappy-compressed Parquet through a Glue sink that is
# configured to update the Data Catalog (enableUpdateCatalog / UPDATE_IN_DATABASE).
# NOTE(review): the S3 path, catalog database and table name below look like
# placeholder values copied from a sample — replace them with this project's real
# targets before running this cell.
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
# The original wrote `DyF`, a name never defined in this notebook (NameError).
# Write the joined DynamicFrame produced by the join cell instead.
s3output.writeFrame(dyf_ofertas)