In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as SqlFuncs
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date


# Glue/Spark handles shared by every later cell.
sc = SparkContext.getOrCreate()  # reuse the interactive session's SparkContext if it already exists
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): job.init(...) / job.commit() are never called in this notebook, so the
# transformation_ctx values passed to the readers below are presumably not used for job
# bookmarks — confirm before relying on incremental reads when this runs as a Glue job.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::940173853583:role/AWSGluestudio-datalake
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 1cf47394-31d5-4d06-86ce-9caa465c9c16
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 1cf47394-31d5-4d06-86ce-9caa465c9c16 to get into ready status...
Session 1cf47394-31d5-4d06-86ce-9caa465c9c16 has been created.



In [2]:
# Source tables from the Glue Data Catalog plus the plant -> address lookup CSV on S3.
# Every transformation_ctx equals the name of the variable it is assigned to.
_ELHOY_DB = "gg-data-elhoy"
_PROD_DB = "gg_datalake_db_prod"


def _catalog_frame(database, table_name, ctx):
    """Return catalog table `database.table_name` as a DynamicFrame tagged with `ctx`."""
    return glueContext.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table_name,
        transformation_ctx=ctx,
    )


liq = _catalog_frame(_ELHOY_DB, "liquidacion", "liq")
nota = _catalog_frame(_ELHOY_DB, "nota", "nota")
dt = _catalog_frame(_ELHOY_DB, "detalle_nota", "dt")
gn3api = _catalog_frame(_PROD_DB, "globalnet_gn3_api_3", "gn3api")
u = _catalog_frame(_PROD_DB, "globalnet_unidad", "u")

dire = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://globalgas-datalake/datasets/raw/raw/direccion/Direcciones.csv"],
        "recurse": True,
    },
    format="csv",
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    transformation_ctx="dire",
)




In [3]:
# Gas inventory readings (source of the `diferencia` column used further down).
inv = glueContext.create_dynamic_frame.from_catalog(
    database="gg-data-elhoy", table_name="inventario_gas", transformation_ctx="inv"
)




In [4]:
# Efficiency-target CSV: one row per plant/year, one column per Spanish month name.
_OBJ_EF_PATH = "s3://globalgas-datalake/cleaning_stepts/2022_2023/Presupuesto_eficiencia/Eficiencia_22_23.csv"
obj_ef = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [_OBJ_EF_PATH], "recurse": True},
    format="csv",
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    transformation_ctx="obj_ef",
)




In [5]:
def generate_series(start, stop, interval):
    """Build a one-column DataFrame of evenly spaced timestamps.

    :param start: lower bound, inclusive (any value Spark can cast to timestamp)
    :param stop: upper bound, exclusive (any value Spark can cast to timestamp)
    :param interval: step between consecutive values, in seconds
    :return: DataFrame with a single timestamp column named ``value``
    """
    session = SparkSession.builder.getOrCreate()
    bounds = session.createDataFrame([(start, stop)], ("start", "stop"))
    # Both bounds as epoch seconds, so the series can be produced with range().
    epoch_bounds = [col(name).cast("timestamp").cast("long") for name in ("start", "stop")]
    lo, hi = bounds.select(epoch_bounds).first()
    seconds = session.range(lo, hi, interval)
    return seconds.select(col("id").cast("timestamp").alias("value"))




In [6]:
starting_date = "2022-01-01"  # earliest day kept in the output (compared against date columns)




In [7]:
# Spark DataFrames for the joins below; liquidacion and nota keep only rows whose
# eliminado_logico flag is "NO".
liq00 = liq.toDF()
liq0 = liq00.where(col("eliminado_logico") == "NO")
n00 = nota.toDF()
n0 = n00.where(col("eliminado_logico") == "NO")
d0 = dt.toDF()
u0 = u.toDF()
dire1 = dire.toDF().select("Direccion", "Entidad_Legal", "planta_datalake")





In [8]:
obef1 = obj_ef.toDF()  # efficiency-target CSV as a Spark DataFrame




In [9]:
api = gn3api.toDF()  # globalnet_gn3_api_3 catalog table as a Spark DataFrame




In [10]:
# Inventory: drop rows flagged eliminado_logico, reduce timestamps to days, build the
# "Planta<code>" join key and discard anything before starting_date.
inv1 = inv.toDF()
inv2 = inv1.where(col("eliminado_logico") == "NO").select("fecha_hora", "planta", "diferencia")
inv3 = (
    inv2.withColumn("fecha1", to_date(col("fecha_hora")))
    .drop("fecha_hora")
    .withColumn("planta_datalake1", concat(lit("Planta"), col("planta")))
    .drop("planta")
    .filter(col("fecha1") >= starting_date)
)




In [11]:
# One inventory row per plant and day: add up every `diferencia` of that day.
inv3 = (
    inv3.groupBy("planta_datalake1", "fecha1")
    .sum("diferencia")
    .withColumnRenamed("sum(diferencia)", "diferencia")
)




In [13]:
# GN3 API quantities per plant and day: one column per unit type (suffix "_e") plus
# their total in `totalquantity`.
# The pivot values are listed explicitly so the output schema is fixed: a unit type with
# no rows in the data still yields a column (0 after fillna) instead of breaking the
# total below with a missing-column error, and Spark skips the extra job it would
# otherwise run to discover the distinct unitType values. Unit types outside this list
# are not pivoted (downstream cells only select these six columns anyway).
API_UNIT_TYPES = ["ANDEN", "ESTACION", "ESTACIONARIO", "MAYORISTA_E", "MAYORISTA_P", "PORTATIL"]

api1 = api.withColumn("date1", to_date(col("date")))
api2 = (
    api1.groupBy("planta_datalake", "date1")
    .pivot("unitType", API_UNIT_TYPES)
    .sum("totalQuantity")
    .withColumnRenamed("date1", "date")
)
for _unit in API_UNIT_TYPES:
    api2 = api2.withColumnRenamed(_unit, _unit + "_e")
api2 = api2.fillna(0)  # plant/day combinations without a given unit type count as 0
api2 = api2.withColumn(
    "totalquantity",
    api2.ANDEN_e + api2.ESTACION_e + api2.ESTACIONARIO_e
    + api2.MAYORISTA_E_e + api2.MAYORISTA_P_e + api2.PORTATIL_e,
)




In [14]:
api2.columns  # schema check: planta_datalake, date, the six *_e unit columns and totalquantity

['planta_datalake', 'date', 'ANDEN_e', 'ESTACION_e', 'ESTACIONARIO_e', 'MAYORISTA_E_e', 'MAYORISTA_P_e', 'PORTATIL_e', 'totalquantity']


In [15]:
# Suffix nota's join keys with "1" so they do not clash with liquidacion's columns.
n1 = n0
for _old, _new in (("id_planta", "id_planta1"), ("id_liquidacion", "id_liquidacion1"), ("planta", "planta1")):
    n1 = n1.withColumnRenamed(_old, _new)




In [16]:
# Full outer join liquidacion <-> nota on liquidacion id and plant.
ln = liq0.join(
    n1,
    on=[liq0.id_liquidacion == n1.id_liquidacion1, liq0.id_planta == n1.id_planta1],
    how="full_outer",
)
ln1 = ln.select("id_planta", "id_liquidacion", "fecha_liquidacion", "id_nota", "id_unidad", "planta")




In [17]:
# Suffix detalle_nota's join keys with "1" to avoid name clashes in the next join.
d1 = d0
for _old, _new in (("id_planta", "id_planta1"), ("id_nota", "id_nota1"), ("planta", "planta1")):
    d1 = d1.withColumnRenamed(_old, _new)




In [18]:
# Full outer join (liquidacion + nota) <-> detalle_nota on note id and plant.
ld = ln1.join(d1, on=[ln1.id_nota == d1.id_nota1, ln1.id_planta == d1.id_planta1], how="full_outer")
ld1 = ld.select("id_planta", "planta", "id_liquidacion", "fecha_liquidacion", "id_nota", "id_unidad", "kilos")




In [19]:
# Suffix unidad's join keys with "1" to avoid name clashes in the next join.
u1 = u0
for _old, _new in (("id_unidad", "id_unidad1"), ("id_planta", "id_planta1"), ("planta", "planta1")):
    u1 = u1.withColumnRenamed(_old, _new)




In [20]:
# Full outer join with unidad on unit id and plant to pick up tipo_unidad.
lu = ld1.join(u1, on=[ld1.id_unidad == u1.id_unidad1, ld1.id_planta == u1.id_planta1], how="full_outer")
lu1 = lu.select(
    "id_planta", "id_liquidacion", "planta", "fecha_liquidacion",
    "id_nota", "id_unidad", "kilos", "tipo_unidad",
)
# Aggregation of lu happens in the next cell.




In [21]:
# NOTE(review): lu2 is not referenced anywhere below; lu3 aggregates lu1, not lu2.
lu2 = lu1.fillna(0)
# Total kilos per liquidacion and unit, keeping plant, date and unit type.
lu3 = (
    lu1.groupBy("id_planta", "planta", "id_liquidacion", "fecha_liquidacion", "id_unidad", "tipo_unidad")
    .sum("kilos")
    .withColumnRenamed("sum(kilos)", "kilos")
)




In [22]:
lu4 = lu3.withColumn("fecha_liquidacion_day", to_date("fecha_liquidacion"))  # date part only




In [23]:
lu5 = lu4.groupby("fecha_liquidacion_day", "planta").sum("kilos")  # daily kilos per plant




In [24]:
# Daily kilos keyed by the "Planta<code>" plant identifier.
lu6 = (
    lu5.withColumn("planta_datalake", concat(lit("Planta"), col("planta")))
    .withColumnRenamed("sum(kilos)", "kilos")
    .drop("planta")
)




### Build the daily time series for every plant (generar la serie de tiempo para todos los registros)

In [25]:
# Calendar of days from starting_date up to, but excluding, today (generate_series treats
# `stop` as exclusive). "Today" comes from current_date() on the Spark session, read with
# a one-row query.
# The calendar starts at starting_date because every row dated earlier is removed by the
# `>= starting_date` filters downstream; generating earlier days would be wasted work.
ONE_DAY_SECONDS = 60 * 60 * 24
l1 = spark.range(1).select(current_date().alias("current_date")).collect()
df2 = generate_series(starting_date, l1[0].current_date, ONE_DAY_SECONDS)
df3 = df2.withColumn("fecha", to_date(col("value")))
df4 = df3.drop("value")
# Distinct plant codes from the unfiltered liquidacion table (may contain null).
liq3 = liq.toDF()
a = liq3.select("planta").distinct().orderBy("planta").collect()




In [26]:
# Guard for the day x plant grid built in the next cell: fail early with a clear message
# if liquidacion produced no plant codes at all.
assert len(a) > 0, "no 'planta' values found in liquidacion"




In [27]:
# Day x plant grid: every calendar day in df4 paired with every distinct `planta` from
# liquidacion (the same values collected into `a`). One crossJoin keeps the logical plan
# a fixed size no matter how many plants exist, unlike a union per plant, and yields an
# empty grid instead of an IndexError when there are no plants.
df5 = df4.crossJoin(liq3.select("planta").distinct())
df7 = df5.distinct()
df8 = df7.withColumnRenamed("planta", "planta1")




In [28]:
# "Planta<code>" key, the form used to join against lu6 and api2 below.
df9 = df8.withColumn("planta_datalake1", concat(lit("Planta"), col("planta1")))




In [29]:
# Attach daily kilos (lu6) to the grid; keys from either side survive the full outer join.
df10 = df9.where(col("planta1").isNotNull())
df11 = (
    df10.join(
        lu6,
        (df10.planta_datalake1 == lu6.planta_datalake) & (df10.fecha == lu6.fecha_liquidacion_day),
        "fullouter",
    )
    .select(
        coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake1"),
        coalesce("fecha_liquidacion_day", "fecha").alias("fecha"),
        "kilos",
    )
)




In [30]:
# Attach per-unit-type API quantities (api2) by plant and day, keeping keys from both sides.
df12 = (
    df11.join(api2, (df11.planta_datalake1 == api2.planta_datalake) & (df11.fecha == api2.date), "fullouter")
    .select(
        coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake1"),
        coalesce("fecha", "date").alias("date"),
        "kilos", "totalQuantity",
        "ANDEN_e", "ESTACION_e", "ESTACIONARIO_e", "MAYORISTA_E_e", "MAYORISTA_P_e", "PORTATIL_e",
    )
)




In [31]:
df14 = df12.where(col("planta_datalake1").isNotNull() & (col("date") >= starting_date))




In [37]:
# Add address and legal entity per plant (dire1 is joined on the plant key only).
# NOTE(review): if Direcciones.csv holds several rows per planta_datalake this join
# multiplies the daily rows — confirm the CSV has one row per plant.
df15 = (
    df14.join(dire1, df14.planta_datalake1 == dire1.planta_datalake, "fullouter")
    .select(
        "date", "kilos", "totalQuantity",
        "ANDEN_e", "ESTACION_e", "ESTACIONARIO_e", "MAYORISTA_E_e", "MAYORISTA_P_e", "PORTATIL_e",
        "Direccion", "Entidad_Legal",
        coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake"),
    )
)




In [38]:
# Full outer join with the daily inventory (inv3) by plant and day.
df16 = df15.join(inv3, on=[df15.planta_datalake == inv3.planta_datalake1, df15.date == inv3.fecha1], how="outer")




### Select features

In [39]:
# Merge the join keys from both sides; note `diferencia` is not kept here.
df17 = df16.select(
    coalesce("date", "fecha1").alias("fecha"),
    "kilos",
    "ANDEN_e", "ESTACION_e", "ESTACIONARIO_e", "MAYORISTA_E_e", "MAYORISTA_P_e", "PORTATIL_e",
    "totalQuantity", "Direccion", "Entidad_Legal",
    coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake"),
)




In [40]:
df18 = df17.where(col("planta_datalake").isNotNull() & (col("fecha") >= starting_date))




In [41]:
# Join inventory again, this time keeping its `diferencia` column (df17 dropped it).
# NOTE(review): df16 already joined inv3 on the same keys; one join that keeps
# `diferencia` would presumably be enough — confirm before merging the two.
df19 = df18.join(inv3, on=[df18.planta_datalake == inv3.planta_datalake1, df18.fecha == inv3.fecha1], how="outer")




In [46]:
df20 = df19.select(
    "kilos",
    "ANDEN_e", "ESTACION_e", "ESTACIONARIO_e", "MAYORISTA_E_e", "MAYORISTA_P_e", "PORTATIL_e",
    "totalQuantity", "Direccion", "Entidad_Legal", "diferencia",
    coalesce("fecha", "fecha1").alias("fecha"),
    coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake1"),
)




### Add year_month

In [47]:
# Build a "Mon-yy" label (e.g. "Jan-22") used to join the monthly efficiency targets,
# whose labels are produced by the stack() expression in the efficiency section
# ('Jan' ... 'Dec'). The month abbreviations must match those labels exactly — December
# is "Dec"; the previous "Dic" meant December rows never matched their target.
_MONTH_ABBR = {
    "01": "Jan", "02": "Feb", "03": "Mar", "04": "Apr", "05": "May", "06": "Jun",
    "07": "Jul", "08": "Aug", "09": "Sep", "10": "Oct", "11": "Nov", "12": "Dec",
}
lc8 = df20.withColumn("Mes", df20.fecha.cast("string").substr(6, 2))    # MM of yyyy-MM-dd
lc99 = lc8.withColumn("Year", lc8.fecha.cast("string").substr(3, 2))   # yy of yyyy-MM-dd
# Two-digit year > 21, i.e. 2022 onward (assumes 20xx dates).
lc9 = lc99.filter(lc99.Year.cast("int") > 21)
# CASE WHEN Mes = '01' THEN 'Jan' ... ELSE '0' END, built from the table above.
_month_expr = None
for _num, _abbr in _MONTH_ABBR.items():
    _is_month = lc9.Mes == _num
    _month_expr = when(_is_month, _abbr) if _month_expr is None else _month_expr.when(_is_month, _abbr)
lc10 = lc9.withColumn("Month", _month_expr.otherwise("0"))
lc11 = lc10.withColumn("year-month", concat(lc10.Month, lit("-"), lc10.Year))




In [48]:
df21 = (
    lc11.where(col("fecha") >= starting_date)
    .where(col("planta_datalake1").isNotNull())
    .drop("Mes", "Year", "Month")  # helper columns; only the "year-month" label is kept
)




### Monthly efficiency targets (objetivo de eficiencia)

In [49]:
# Unpivot the twelve Spanish month columns into (Mes, proyeccion_ef) rows labelled
# 'Jan'..'Dec', then key each row by a "Mon-yy" label.
unpivot_ef = "stack(12, 'Jan', Enero, 'Feb', Febrero, 'Mar', Marzo, 'Apr', Abril, 'May', Mayo, 'Jun', Junio, 'Jul', Julio, 'Aug', Agosto, 'Sep', Septiembre, 'Oct', Octubre, 'Nov', Noviembre, 'Dec', Diciembre) as (Mes,proyeccion_ef)"
obef22 = obef1.select("planta_datalake", "Year", expr(unpivot_ef))
obef33 = obef22.withColumn(
    "month_year2",
    concat(col("Mes"), lit("-"), col("Year").cast("string").substr(3, 2)),
)
# Parse the target from at most its first 8 characters, then drop the helper Year column.
obef3 = (
    obef33.withColumn("proy_ef", col("proyeccion_ef").cast("string").substr(1, 8).cast("double"))
    .drop("proyeccion_ef")
    .withColumnRenamed("proy_ef", "proyeccion_ef")
    .drop("Year")
)





In [50]:
# Attach the monthly efficiency target by plant and "Mon-yy" label; unmatched days get null.
df22 = df21.join(
    obef3,
    on=[df21.planta_datalake1 == obef3.planta_datalake, df21["year-month"] == obef3.month_year2],
    how="left",
)




In [51]:
df23 = df22.select(
    "kilos",
    "ANDEN_e", "ESTACION_e", "ESTACIONARIO_e", "MAYORISTA_E_e", "MAYORISTA_P_e", "PORTATIL_e",
    "totalQuantity", "Direccion", "Entidad_Legal", "diferencia", "proyeccion_ef", "fecha",
    coalesce("year-month", "month_year2").alias("year-month"),
    coalesce("planta_datalake1", "planta_datalake").alias("planta_datalake"),
)




In [52]:
df23.columns  # final schema check before writing to S3

['kilos', 'ANDEN_e', 'ESTACION_e', 'ESTACIONARIO_e', 'MAYORISTA_E_e', 'MAYORISTA_P_e', 'PORTATIL_e', 'totalQuantity', 'Direccion', 'Entidad_Legal', 'diferencia', 'proyeccion_ef', 'fecha', 'year-month', 'planta_datalake']


In [53]:
# Persist the daily efficiency dataset as Parquet, replacing whatever is at the prefix.
df23.write.parquet("s3://globalgas-datalake/datasets/raw/raw/eficiencia_diaria/", mode="overwrite")


