# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys,json
import pyspark
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext,SparkConf
from awsglue.context import GlueContext
from pyspark.sql import Window,SparkSession
from pyspark.sql.types import StringType,StructType,StructField,ArrayType
from pyspark.sql.functions import arrays_zip, explode,col,collect_list,when,expr,row_number,concat_ws,array_distinct,lit,trim,when,array_contains,size,arrays_zip,explode,col,collect_list,when,expr,row_number,concat_ws,array_distinct,lit,trim,array_contains,from_json,to_json,monotonically_increasing_id,length,desc, substring
from awsglue import DynamicFrame
from awsglue.job import Job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 0b64fe94-9a50-4dfc-9368-f9eead66141a
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 0b64fe94-9a50-4dfc-9368-f9eead66141a to get into ready status...
Session 0b64fe94-9a50-4dfc-9368-f9eead66141a ha

In [2]:
source_config = {
    'type': 's3',
    's3_path': 's3://laive/',
    'source': '0mat_plant_attr'
}




In [7]:
import boto3, json
from awsglue import DynamicFrame
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import col, row_number, col, row_number

def get_dyf_from_s3(glueContext, source_config, format='csv'):
    cnn_options = {
        "paths": [source_config['s3_path'] + source_config['source'] + "/"], 
        "recurse": True,
        "compression": "gzip"
    }
    if ('groupFiles' in source_config):
        cnn_options['groupFiles'] = source_config['groupFiles']
    if ('groupSize' in source_config):
        cnn_options['groupSize'] = source_config['groupSize']
    if ('useS3ListImplementation' in source_config):
        cnn_options['useS3ListImplementation'] = source_config['useS3ListImplementation']
    return glueContext.create_dynamic_frame.from_options( \
            connection_type="s3", \
            connection_options=cnn_options, \
            format=format, \
            format_options={"withHeader": True})

def read_json_from_s3(bucket_name, path):
    s3 = boto3.resource('s3')
    content_object = s3.Object(bucket_name, path)
    file_content = content_object.get()['Body'].read().decode('utf-8')
    return json.loads(file_content)


def transform(glueContext, source_config, config, source_inclussions = None):
    dyf_map = {}
    for el_name, el_config in config.items():
        if el_name in source_inclussions:
            source = el_config["source"]
            destination = el_config["destination"]
            mappings = el_config["mappings"]
#           autoincrement_column = el_config["autoincrement_column"]

            # Read CSV from the source path
            if source_config['type'] == 's3':
                dyf = get_dyf_from_s3(glueContext, source_config)

            # Apply column renaming and data type transformations
            dyf_schema = dyf.schema()
            dyf_t = dyf.apply_mapping(list(map(lambda t: (t[0], dyf_schema.getField(t[0]).dataType.typeName(), t[2], t[3]) if t[1] == "Dynamic" else tuple(t), mappings)))
#            if autoincrement_column:
#                autoincrement_order_by = el_config["autoincrement_order_by"]
#                window_spec_id = Window.orderBy(autoincrement_order_by)
#                df_t = dyf_t.toDF().withColumn(autoincrement_column, row_number().over(window_spec_id))\
#                       .withColumn(autoincrement_column, col(autoincrement_column))
#                
#               dyf_t = DynamicFrame.fromDF(df_t, glueContext, "dyf_t")            
            dyf_map[el_name] = {'table': destination, 'data': dyf_t}
    return dyf_map




In [5]:
#Probar la funcion
dynamic_frame = get_dyf_from_s3(glueContext, source_config)
dynamic_frame.show(1)

{"MATNR": "000000000040007660", "WERKS": "2000", "PSTAT": "DELBV", "LVORM": "", "BWTTY": "", "XCHAR": "", "MMSTA": "", "MMSTD": "null", "MAABC": "", "KZKRI": "", "EKGRP": "G02", "AUSME": "", "DISPR": "", "DISMM": "PD", "DISPO": "C02", "KZDIE": "", "PLIFZ": "0", "WEBAZ": "0", "PERKZ": "M", "AUSSS": "0.0", "DISLS": "EX", "BESKZ": "F", "SOBSL": "", "SBDKZ": "2", "LAGPR": "", "ALTSL": "", "KZAUS": "", "AUSDT": "null", "NFMAT": "", "KZBED": "T", "MISKZ": "", "FHORI": "", "PFREI": "", "FFREI": "", "RGEKZ": "", "FEVOR": "", "BEARZ": "0.0", "RUEZT": "0.0", "TRANZ": "0.0", "DZEIT": "0", "MAXLZ": "0", "LZEIH": "", "KZPRO": "", "GPMKZ": "", "UEETO": "0.0", "UEETK": "", "UNETO": "0.0", "WZEIT": "0", "ATPKZ": "", "VZUSL": "0.0", "HERBL": "", "INSMK": "", "SPROZ": "0.0", "QUAZT": "0", "SSQSS": "", "MPDAU": "0", "KZPPV": "", "KZDKZ": "", "WSTGH": "0", "PRFRQ": "0", "NKMPR": "null", "LADGR": "Z001", "XCHPF": "", "USEQU": "", "LGRAD": "0.0", "AUFTL": "", "PLVAR": "", "OTYPE": "", "OBJID": "00000000", "

In [11]:
config = read_json_from_s3("aws-glue-assets-637423333664-us-east-2", "scripts/stage/etl-config.json")
source_inclussions = ['0mat_plant_attr']
dyf_roots = transform(glueContext, source_config, config, source_inclussions)




In [12]:
dyf_roots.keys()

dict_keys(['0mat_plant_attr'])


In [13]:
dyf_roots['0mat_plant_attr']['data'].printSchema()

root
|-- Número de material: string
|-- Centro: string
|-- Status de actualización: string
|-- Marcar para borrado material a nivel de centro: string
|-- Tipo de valoración: string
|-- Indicador para la gestión de lotes (interno): string
|-- Status material específico centro: string
|-- Fecha inicio validez de estado material específico centro: string
|-- Indicador ABC: string
|-- Indicador: Parte crítica: string
|-- Grupo de compras: string
|-- Unidad de medida de salida: string
|-- Material: Perfil de planificación de necesidades: string
|-- Característica de planificación de necesidades: string
|-- Planificador de necesidades: string
|-- Ind: Planificador necesidades encargado compras (no activ): string
|-- Plazo de entrega previsto en días: string
|-- Tiempo de tratamiento para la entrada de mercancía en días: string
|-- Indicador de período: string
|-- Rechazo a nivel de conjunto en porcentaje: string
|-- Cálculo de tamaño de lote en planificación de necesidades: string
|-- Clase 