# Plantilla Notebook para desarrollo a medida del proyecto EYP0007


#### Uso
Notebook generado para realizar transformaciones asociadas al unit conversion


#### Nota

In [0]:
// Notebook parameters. Every value defaults to an empty/neutral placeholder;
// presumably the calling pipeline overrides them at run time -- TODO confirm.

// applicationName, uuid and puid are bundled into the implicit `uid_app` tuple in a later cell.
val applicationName:String = ""
val uuid:String = "N/A"
val puid:String = "N/A"
// uniqueKey is only echoed to the output by the cells visible in this notebook.
val uniqueKey: String = ""
// When non-blank, these are passed to ControlHelper().setCustomLinkedService / setCustomDigitalCase.
val linkedService: String = ""
val digitalCase: String = ""

// JSON array of source descriptors; the process cell reads positions 0 to 3.
val sources:String = "[{}]"
// JSON object describing the write target; handed to writeDataFrame after parsing.
val target:String = "{}"

// Prefix used in the final log/exception messages.
val name: String = ""

// JSON object; the process cell reads "class_name", "fields_fixed" and "fields_transposed" from it.
val auxParams: String = "{}"
// JSON object; parsed into paramsFinal (a blank string is treated as "{}").
val params: String = "{}"
// JSON object of Spark configuration entries applied through spark.conf.set (blank is treated as "{}").
val sparkProperties: String = """{}"""
// NOTE(review): not referenced by any cell visible in this notebook -- confirm whether it is still needed.
val referenceDate: String = ""

In [0]:
// For tests

/*
var applicationName: String = "TEST"
var uuid:String = "N/A"
var puid:String = "N/A"
var step: Integer = 0
var uniqueKey: String = ""
var linkedService: String = "DL_EYP"
var digitalCase: String = "EYP0007"

val sources:String = """[
         {
          "source_linked_service_name": "DL_EYP",
          "source_container": "lakehouse",
          "source_path": "EYP0007/trn/eyp0007_dl_fac_m_str_ngl",
          "source_format": "parquet",
          "source_format_options": {}
        },
        {
          "source_linked_service_name": "DL_EYP",
          "source_container": "lakehouse",
          "source_path": "EYP0007/trn/eyp0007_dl_dts_d_class_attr_presentation",
          "source_format": "parquet",
          "source_format_options": {}
        },
        {
          "source_linked_service_name": "DL_EYP",
          "source_container": "lakehouse",
          "source_path": "EYP0007/trn/eyp0007_dl_dts_d_uom_setup",
          "source_format": "parquet",
          "source_format_options": {}
        },
        {
          "source_linked_service_name": "DL_EYP",
          "source_container": "lakehouse",
          "source_path": "EYP0007/trn/eyp0007_dl_dts_d_unit_conversion",
          "source_format": "parquet",
          "source_format_options": {}
        }
        ]"""
val target:String = """{
        "target_linked_service_name": "DL_EYP",
        "target_container": "lakehouse",
        "target_path": "EYP0007/trn/eyp0007_dl_fac_m_fld_plan_budg_uc",
        "target_format": "parquet"
      }"""

val auxParams:String = """{
            "class_name": "STRM_GAS_MTH",
            "fields_fixed" : ["fec_production_day", "id_stream", "fec_create_date", "fec_update_date"],
            "fields_transposed" : ["alloc_vol", "calc_net_vol", "grs_vol", "o_alloc_liq_vol", "p_alloc_liq_vol"]
        }""" 

val name: String = ""
*/

In [0]:
%run eyp0007_nb_gen_gen_functions

In [0]:
// Deserialize the JSON notebook parameters with the shared `mapper`.
val sourcesFinal: List[Map[String, Any]] = mapper.readValue[List[Map[String, Any]]](sources)
val targetFinal: Map[String, Any] = mapper.readValue[Map[String, Any]](target)
val auxParamsFinal: Map[String, Any] = mapper.readValue[Map[String, Any]](auxParams)

// `params` and `sparkProperties` may arrive blank; a blank value is read as an empty JSON object.
val paramsFinal: Map[String, Any] = {
    val paramsJson = if (params.trim.isEmpty) "{}" else params
    mapper.readValue[Map[String, Any]](paramsJson)
}
val sparkPropertiesFinal: Map[String, String] = {
    val sparkPropertiesJson = if (sparkProperties.trim.isEmpty) "{}" else sparkProperties
    mapper.readValue[Map[String, String]](sparkPropertiesJson)
}

// Apply every requested Spark setting to the active session, echoing each one.
for ((propertyKey, propertyValue) <- sparkPropertiesFinal) {
    println(s"Setting spark property $propertyKey to $propertyValue")
    spark.conf.set(propertyKey, propertyValue)
}

var error:String = ""

In [0]:
import com.repsol.datalake.log._
import com.repsol.datalake.control._

// Run identifiers exposed as an implicit tuple
// (presumably picked up by the imported log/control helpers -- confirm).
implicit val uid_app = (uuid, puid, applicationName)

// Register the custom linked service / digital case only when a non-blank value was supplied.
Some(linkedService).filter(_.trim.nonEmpty).foreach { service =>
    ControlHelper().setCustomLinkedService(service)
}

Some(digitalCase).filter(_.trim.nonEmpty).foreach { caseId =>
    ControlHelper().setCustomDigitalCase(caseId)
}

// Echo the received parameters, one "label = value" line each.
Seq(
    "applicationName" -> applicationName,
    "uuid" -> uuid,
    "puid" -> puid,
    "uniqueKey" -> uniqueKey,
    "sources" -> sources,
    "target" -> target,
    "auxParams" -> auxParams,
    "params" -> params,
    "name" -> name
).foreach { case (label, value) => println(s"$label = $value") }

In [0]:
// Unit-conversion process cell.
// Reads four sources (fact, class-attribute presentation, UOM setup, unit conversion),
// computes `value_final = value * mult_fact + add_numb` per attribute and writes the
// result to the target. Any non-fatal failure is stored in `error`, printed and reported
// through LogHelper().logEndKO; the following cell turns a non-empty `error` into an exception.
import scala.util.control.NonFatal

var error: String = ""

var df: DataFrame  = null

println("*"*25 + "\nSTART UNIT CONVERSION PROCESS\n" + "*"*25)

try {

    // Adds `codeColumn` with a code derived from `buColumn`:
    // ARG -> "1", BRA -> "2", RNAS -> "3", any other value (including null) -> "4".
    def withBusinessUnitCode(data: DataFrame, buColumn: String, codeColumn: String): DataFrame =
        data.withColumn(codeColumn,
            when(col(buColumn) === "ARG", "1")
                .when(col(buColumn) === "BRA", "2")
                .when(col(buColumn) === "RNAS", "3")
                .otherwise("4"))

    // Source 0: fact data, with the business unit renamed and its code added.
    val dfFact = withBusinessUnitCode(
        getDataFromSource(sourcesFinal.apply(0)).withColumnRenamed("id_bu", "id_bu_fac"),
        "id_bu_fac",
        "id_bu_fac_aux")

    // Source 1: class-attribute presentation rows for the requested class name.
    // The comparison uses the Column API instead of an interpolated SQL string, so quotes or
    // backslashes in `class_name` cannot break or alter the predicate.
    val className = auxParamsFinal.getOrElse("class_name", "ERROR class_name").toString
    val dfClassAttrPresentation = getDataFromSource(sourcesFinal.apply(1))
        .filter(col("class_name") === className)
        .withColumnRenamed("id_bu", "id_bu_class_attr")
    // Disabled check, kept for reference: fail when no row exists for the class name.
    /* if (dfClassAttrPresentation.count == 0) {throw new Exception(s"$name: No se ha encontrado ninguno registro para el class name: $className")}*/

    // Source 2: UOM setup, once reduced to the distinct unit columns (with the business-unit
    // code) and once complete.
    // NOTE(review): the source is read twice; it could probably be read once and reused -- confirm
    // that getDataFromSource has no side effects before changing this.
    val dfUomSetup = withBusinessUnitCode(
        getDataFromSource(sourcesFinal.apply(2)).withColumnRenamed("id_bu", "id_bu_uom"),
        "id_bu_uom",
        "id_bu_uom_aux")
        .select("db_unit", "measurement_type", "id_bu_uom_aux", "unit").distinct
    val dfUomSetupFull = getDataFromSource(sourcesFinal.apply(2)).withColumnRenamed("id_bu", "id_bu_uom")

    // Source 3: unit conversion factors.
    val dfUnitConversion = getDataFromSource(sourcesFinal.apply(3)).withColumnRenamed("id_bu", "id_bu_unit_conversion")

    val countReads = dfFact.count()
    println(s"Process init with: $countReads rows read base")

    // Columns kept as-is (plus the two business-unit columns) and columns to transpose.
    val fieldsFixed: Seq[String] = auxParamsFinal.getOrElse("fields_fixed", List()).asInstanceOf[List[String]] ++ List("id_bu_fac", "id_bu_fac_aux")
    val fieldsTransposed: Seq[String] = auxParamsFinal.getOrElse("fields_transposed", List()).asInstanceOf[List[String]]

    // Presumably produces one row per transposed field with "attribute"/"value" columns,
    // which the steps below rely on -- TODO confirm against toTransposeDataframe.
    df = toTransposeDataframe(dfFact, fieldsFixed, fieldsTransposed)

    // Attach the "uom" of each attribute for the fact's business unit.
    df = joinDataframes(df, dfClassAttrPresentation, Map("attribute" -> "attribute_name", "id_bu_fac" -> "id_bu_class_attr"), Seq("uom"), "left")

    // Get the DB unit (UOM setup rows flagged db_unit = 'Y'); parked as "db_unit_aux" for now.
    df = joinDataframes(df, dfUomSetup.filter("db_unit = 'Y'"), Map("uom" -> "measurement_type", "id_bu_fac_aux" -> "id_bu_uom_aux"), Seq("unit"), "left")
    df = df.withColumnRenamed("unit", "db_unit_aux")

    // Get the view units: every unit flagged as DB or view unit for each attribute of the class.
    val dfUomSetupWithAttr = joinDataframes(dfClassAttrPresentation, dfUomSetupFull, Map("uom" -> "measurement_type"), Seq("db_unit", "view_unit", "unit"), "inner")
                            .filter("db_unit == 'Y' OR view_unit == 'Y'")
                            .select("class_name", "attribute_name", "unit").distinct
                            .withColumnRenamed("unit", "unit_aux")

    df = joinDataframes(df, dfUomSetupWithAttr, Map("attribute" -> "attribute_name"), Seq("unit_aux"), "inner")
    df = df.withColumnRenamed("db_unit_aux", "db_unit").withColumnRenamed("unit_aux", "unit")

    // Look up the conversion from the DB unit to the target unit.
    df = joinDataframes(df, dfUnitConversion.select("from_unit","to_unit", "mult_fact","add_numb").distinct, Map("db_unit" -> "from_unit",  "unit" -> "to_unit"), Seq("mult_fact", "add_numb"), "left")
    // When source and target unit are the same and no conversion row matched, use the identity (x * 1 + 0).
    df = df.withColumn("mult_fact", when(col("db_unit").equalTo(col("unit"))  && col("mult_fact").isNull, lit(1)).otherwise(col("mult_fact")))
           .withColumn("add_numb", when(col("db_unit").equalTo(col("unit"))  && col("add_numb").isNull, lit(0)).otherwise(col("add_numb")))
    df = df.withColumn("value_final", (col("value") * col("mult_fact")) + col("add_numb"))

    df = toPivotDataframe(df,fieldsFixed)

    writeDataFrame(df, targetFinal)

    val countWrites = df.count()
    println(s"Process complete with: $countWrites rows writes")

    if (countReads != countWrites) {println("WARNING: Base rows read are different to rows write.")}

} catch {
    // Only non-fatal failures are recorded; fatal JVM errors (e.g. OutOfMemoryError,
    // InterruptedException) propagate and fail the cell directly.
    case NonFatal(e) =>
        error = e.toString
        println(error)
        LogHelper().logEndKO(error)
}

println("*"*25 + "\nEND UNIT CONVERSION PROCESS\n" + "*"*25)

In [0]:

// Final cell: fail the notebook when the process cell recorded an error,
// otherwise log the JSON result and hand it back as the notebook exit value.
val result = s"""{"error": "$error"}"""

if (error.nonEmpty) {
    val failureMessage = s"$name: $error"
    LogHelper().logError(failureMessage)
    throw new Exception(failureMessage)
}

LogHelper().logInfo(s"$name: $result")
mssparkutils.notebook.exit(result)