## Importar librerias utiles

In [2]:
import org.apache.spark.sql.functions._
import sqlContext.implicits._

## Seleccion de tablas de SINFO

El orígen de datos de Plan Uno es principalmente tablas de sinfo y también alguna tabla "core". Estas tablas están persistidas en HDFS en las rutas correspondientes: "/data/master/sfma/" para sinfo y "/data/master/kdo/" para core. Todas las tablas están persistidas en formato "parquet". 

Para leer una tabla en formato parquet usaremos el contexto de SparkSQL: 
>sqlContext.read.parquet("PATH_HDFS"). 

Definimos la variable pathData par reutilizarla a la hora de leer diferentes tablas.

In [None]:
val pathData = "/data/master/sfma/"
val payrolls = sqlContext.read.parquet(pathData + "t_sfma_payroll_m")

val generalParty = sqlContext.read.parquet(pathData + "t_sfma_general_party_m")

Su equivalente en SAS para la lectura de tablas:

proc sql;  
create table payrolls as  
select *  
from sfmapv.vsfmapoa;  
quit;

proc sql;  
create table generalParty as  
select *  
from sfmapv.vsfmaiii(de todos los periodos);  
quit;

## Queries típicas

Para hacer transformaciones sobre las tablas, utilizaremos en este caso la api de dataframes en scala (para la versión de spark 1.6.2 :
https://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame


Seleccionar columnas:

In [None]:
val nominas = payrolls.
select("customer_id","total_credited_1m_amount","closing_date")

PROC SQL;  
create table nominas as  
select cod_persona, imp_apuzm, fec_cierre  
from payrolls;  
quit;

Eliminar columnas: 


Nota: En la versión 1.6.2 no es posible utilizar la sentencia drop para más de una columna, por lo que tendremos que utilizar un drop por cada columna a eliminar. Sin embargo, en la versión de Spark 2.1 (próxima versión de la plataforma) se podran eliminar más de una columna por sentencia.

In [None]:
val nominasSinMes = nominas.
drop("closing_date")

data nominasSinMes;  
set nominas(drop=fec_cierre);  
run;

Filtrar registros de una tabla según una condición:

Filtro de clientes titulares de algun contrato por mes desde el mes de enero

In [None]:
val intervininentesDesdeEnero = generalParty.
select("customer_id","closing_date").
where("closing_date >= '2017-01-31' AND party_type_id = 'TIT'")

PROC SQL;  
create table intervininentesEnero as  
select cod_persona, fec_cierre  
from generalParty  
where cod_mes >= 1 and cod_ano eq 2017 and cod_interv='TIT';  
quit;

Quitar los duplicados:

Eliminar duplicados ya que aparecera un registro por cada contrato que tuviese el cliente

In [None]:
val intervininentesEneroSinDup = intervininentesEnero.
dropDuplicates(Seq("customer_id","closing_date"))

proc sort data=intervininentesEneroSinDup out=intervininentesEnero nodupkey;  
by cod_persona, fec_cierre;  
run;

Operaciones de agregación:

Sumar cantidades por cliente y mes

In [None]:
val nominaMensualPorCliente = payrolls.
select("customer_id","total_credited_1m_amount","closing_date").
groupBy("customer_id","closing_date").
agg(sum("total_credited_1m_amount").
as("nomina_mensual"))

proc sql;
create table nominaMensualPorCliente as
select cod_persona, fec_cierre, sum(imp_apuzm) as nomina_mensual 
from payrolls
group by cod_persona, fec_cierre;
quit;

Tambien se pueden sumar poniendo restricciones

In [None]:
val nominaMensualPorClientePositiva = payrolls.
select("customer_id","total_credited_1m_amount","closing_date").
groupBy("customer_id","closing_date").
agg(sum("total_credited_1m_amount").
as("nomina_mensual")).
where("nomina_mensual>0")

proc sql;  
create table nominaMensualPorClientePositiva as  
select cod_persona, fec_cierre, sum(imp_apuzm) as nomina_mensual  
from payrolls  
group by cod_persona, fec_cierre  
having nomina_mensual>0;  
quit;

Cruce de dos tablas:  


Cruce por variables de cod_persona y mes para sacar la nomina mensual de los intervinientes titulares de algun contrato

In [None]:
val nominasTitulares = nominaMensualPorClientePositiva.
join(intervininentesEneroSinDup, Seq("customer_id","closing_date"))

data nominasTitulares;  
merge nominaMensualPorClientePositiva(in=x)  
intervininentesEneroSinDup(in=y);  
by cod_persona fec_cierre;  
if x and y;  
run;

En Spark el join por defecto es inner join, si queremos otro tipo de join: full, left..) es necesario especificarlo. 
En ocasiones cuando se realiza un full join es necesario sustituir los nulos que se generan a un valor específico.

Con na.fill(0) rellenamos con 0s todos los huecos vacios que han quedado de hacer el full join.

In [None]:
val nominasTitulares = nominaMensualPorClientePositiva.
join(intervininentesEneroSinDup, Seq("customer_id","closing_date"),"full").
na.fill(0)

data nominasTitulares(drop=i);  
merge nominaMensualPorClientePositiva(in=x)   
intervininentesEneroSinDup(in=y);  
by cod_persona fec_cierre;  
if x or y;  
ARRAY cero _numeric_;  
DO I=1 TO dim(cero);  
IF cero(i)=. THEN cero(i)=0;  
END;  
run;

Añadir una columna con 1 si la nomina del cliente es mayor que 200€ o 0 si no

In [None]:
val nominasTitulares2 = nominasTitulares.
withColumn("mayor_a_200",when($"nomina_mensual" > 200,1).
           otherwise(0))

data nominasTitulares2;  
set nominasTitulares;  
if nomina_mensual>200 then mayor_a_200=1;  
else mayor_a_200=0;  
run;

Renombrar una columna

In [None]:
val nominasTitulares3 = nominasTitulares2.
withColumnRenamed("mayor_a_200","cumple_nomina_mayor_200")

data nominasTitulares3;  
set nominasTitulares2(rename=(mayor_a_200=cumple_nomina_mayor_200));  
run;

Ordenar un DataFrame de forma ascendente

In [None]:
nominasTitulares3.
orderBy($"nomina_mensual")

proc sort data=nominasTitulares3;  
by nomina_mensual;  
run;

Ordenar un DataFrame de forma descendente

In [None]:
nominasTitulares3.
orderBy($"nomina_mensual".desc)

proc sort data=nominasTitulares3;  
by descending nomina_mensual;  
run;

Es importante saber que en Spark no es necesario ordenar los DataFrame para hacer join. Simplemente nos serviria para quitar duplicados o visualizar los datos

Calcular el número de registros de un dataframe:

In [None]:
nominasTitulares.count()

In [None]:
nominasTitulares.where("closing_date='2017-01-31'").count()

Mostrar registros del dataframe:

In [None]:
nominasTitulares.show()

Ejemplo de mostrar un conteo agrupado y ordenado. Numero de clientes por segmento global en la CGT

In [None]:
sqlContext.read.parquet(pathData + "t_sfma_ownership_h_category_ah_m").
where(closing_date='2017-01-31').
select("global_segment_id","customer_id").
groupBy("global_segment_id").
count().
withColumnRenamed("count()","numero_clientes")
orderBy($"numero_clientes").desc).
show()

Ver la estructura de la tabla, sus columnas y que formato tienen

In [None]:
nominasTitulares.printSchema()

Para hacer operaciones que implican diferentes columnas de una fila se utilizan las User Defined Functions (UDFs):

In [None]:
import org.apache.spark.sql.functions.udf

Un caso típico es generar una columna nueva cuyo valor depende del valor de una o varias columnas: 

Ejemplo: Con varios case para, por ejemplo, añadir una columna con la descripcion del tipo de seguro que equivale a ciertos codigos de producto de la tabla de intervinientes

In [3]:
val comCategoryUdf = udf((com_category_id: Long) => {
    com_category_id match {
        case t if t == 592 => "HOGAR/INCENDIOS"
        case t if t == 593 => "NEGOCIOS"
        case t if t == 594 || t== 595 => "VIDA"
        case t if t == 596 => "SALUD"
        case t if t == 598 => "AUTO"
        case _ => "OTROS"
    }})

In [None]:
val intervinientesSeguros = sqlContext.read.parquet(pathData + "t_sfma_general_party_m").
where("closing_date = '2017-01-31' AND party_type_id='TIT' AND contract_status_id = 'A'").
select("customer_id", "contract_id", "com_category_id")

proc sql;  
create table intervinientesSeguros as  
select cod_persona, cod_idcontra, cod_ctgcom  
from sfmapv.vsfmaiii004 (periodo que equivalga a enero)  
where cod_interv='TIT' and xti_estado='A';  
quit;

Para utilizar la función udf en una query bastará con nombrarla y pasarle como parámetro la/s columnas que utiliza:

In [None]:
val intervinientesSegurosNombre = intervinientesSeguros.
withColumn("tipo_seguro",comCategoryUdf($"com_category_id"))

data intervinientesSegurosNombre;  
set intervinientesSeguros;  
if cod_ctgcom=592 then tipo_seguro = 'HOGAR/INCENDIOS';  
else if cod_ctgcom=593 then tipo_seguro = 'NEGOCIOS';  
else if cod_ctgcom=594 or cod_ctgcom=595 then tipo_seguro = 'VIDA';  
else if cod_ctgcom=596 then tipo_seguro = 'SALUD';  
else if cod_ctgcom=598 then tipo_seguro = 'AUTO';  
else tipo_seguro='OTROS';  
run;

En el caso de que la operación a realizar es cambiar los valores de una columna según un diccionario, como por ejemplo cmabiar el codigo de segmento por su descripción, crearemos un HashMap para definir la relación de cada código con su descripción, a continuación definiremos una udf que haga uso de ese HashMap:
Nota: se puede definir un valor por defecto del HashMap, en este caso será "PAR_ESTÁNDAR"

In [None]:
val globalSegmentDescription = new HashMap[String,String](){override def default(key:String) = "PAR_ESTÁNDAR"}

globalSegmentDescription +=(
    "1" -> "INSTITUCIONES PRIVADAS",
    "2" -> "I.PÚBLICAS SEGURIDAD SOCIAL",
    "3" -> "I.PÚBLICAS ADMON.CENTRAL",
    "4" -> "I.PÚBLICAS CCAA",
    "5" -> "I.PÚB C.LOC – DIP CABILD CONS",
    "6" -> "I.PÚB C.LOC – AYTOS 500k HAB",
    "7" -> "I.PÚB C.LOCALES – AYTOS 20K H",
    "8" -> "I.PÚB C.LOC – AYTOS 5K HAB",
    "9" -> "I.PÚB C.LOC – AYTOS < 5K HAB",
    "11" -> "BPRI_PATRIMONIOS",
    "12" -> "BPRI_BANCA PRIVADA",
    "13" -> "BPER_ALTO VALOR",
    "14" -> "BPER_VALOR RECURSOS",
    "15" -> "BPER_VALOR TENENCIA", 
    "21" -> "PAR_POTENCIAL VALOR",
    "22" -> "PAR_POTENCIAL RESTO",
    "23" -> "PAR_59+",
    "24" -> "PAR_ESTÁNDAR",
    "25" -> "PAR_JÓVENES",
    "31" -> "BEC_BANCA CORPORATIVA",
    "32" -> "RE_FINANCIACIÓN RESIDENCIAL",
    "33" -> "BEC_GRANDES EMPRESAS",
    "34" -> "BEC_EMPRESAS",
    "35" -> "PYM_PEQUEÑAS EMPRESAS",
    "38" -> "RE_FINANCIACIÓN PATRIMONIAL",
    "63" -> "RESTO",
    "80" -> "PYM_AUTÓNOMOS Y PROFS VALOR",
    "81" -> "PYM_AUTÓNOMOS Y PROFS RESTO",
    "82" -> "PYM_COMERCIOS VALOR",
    "83" -> "PYM_COMERCIOS RESTO",
    "84" -> "PYM_AGRARIOS VALOR",
    "85" -> "PYM_AGRARIOS RESTO",
    "86" -> "PYM_INDUSTRIA VALOR",
    "87" -> "PYM_INDUSTRIA RESTO",
    "88" -> "PYM_OTRAS PYMES VALOR",
    "89" -> "PYM_OTRAS PYMES RESTO")
    
val udfglobalSegmentDescription = udf((segmentCode: String)=> {
        globalSegmentDescription(segmentCode)})

También se podría definir una udf con sentencias if-else, sin embargo es más lento de ejecutar dado que tiene que ir recorriendo las diferentes sentencias hasta que se cumple la sentencia correcta:

In [None]:
val globalSegmentDescription = udf((segmentCode: String)=> {
    if (segmentCode == "1") "INSTITUCIONES PRIVADAS"
    else if (segmentCode == "2") "I.PÚBLICAS SEGURIDAD SOCIAL"
    else if (segmentCode == "3") "I.PÚBLICAS ADMON.CENTRAL"
    else if (segmentCode == "4") "I.PÚBLICAS CCAA"
    else if (segmentCode == "5") "I.PÚB C.LOC – DIP CABILD CONS"
    else if (segmentCode == "6") "I.PÚB C.LOC – AYTOS 500k HAB"
    else if (segmentCode == "7") "I.PÚB C.LOCALES – AYTOS 20K H"
    else if (segmentCode == "8") "I.PÚB C.LOC – AYTOS 5K HAB"
    else if (segmentCode == "9") "I.PÚB C.LOC – AYTOS < 5K HAB"
    else if (segmentCode == "11") "BPRI_PATRIMONIOS"
    else if (segmentCode == "12") "BPRI_BANCA PRIVADA"
    else if (segmentCode == "13") "BPER_ALTO VALOR"
    else if (segmentCode == "14") "BPER_VALOR RECURSOS"
    else if (segmentCode == "15") "BPER_VALOR TENENCIA"
    else if (segmentCode == "21") "PAR_POTENCIAL VALOR"
    else if (segmentCode == "22") "PAR_POTENCIAL RESTO"
    else if (segmentCode == "23") "PAR_59+"
    else if (segmentCode == "24") "PAR_ESTÁNDAR"
    else if (segmentCode == "25") "PAR_JÓVENES"
    else if (segmentCode == "31") "BEC_BANCA CORPORATIVA"
    else if (segmentCode == "32") "RE_FINANCIACIÓN RESIDENCIAL"
    else if (segmentCode == "33") "BEC_GRANDES EMPRESAS"
    else if (segmentCode == "34") "BEC_EMPRESAS"
    else if (segmentCode == "35") "PYM_PEQUEÑAS EMPRESAS"
    else if (segmentCode == "38") "RE_FINANCIACIÓN PATRIMONIAL"
    else if (segmentCode == "63") "RESTO"
    else if (segmentCode == "80") "PYM_AUTÓNOMOS Y PROFS VALOR"
    else if (segmentCode == "81") "PYM_AUTÓNOMOS Y PROFS RESTO"
    else if (segmentCode == "82") "PYM_COMERCIOS VALOR"
    else if (segmentCode == "83") "PYM_COMERCIOS RESTO"
    else if (segmentCode == "84") "PYM_AGRARIOS VALOR"
    else if (segmentCode == "85") "PYM_AGRARIOS RESTO"
    else if (segmentCode == "86") "PYM_INDUSTRIA VALOR"
    else if (segmentCode == "87") "PYM_INDUSTRIA RESTO"
    else if (segmentCode == "88") "PYM_OTRAS PYMES VALOR"
    else if (segmentCode == "89") "PYM_OTRAS PYMES RESTO"
    else ""})

Usamos la cgt con datos de enero y seleccionamos el codigo de persona y el codigo de segmento global, para pegarle el significado de ese codigo creamos una nueva columna "global_segment_type" con el Udf globalSegmentDescription y la columna de entrada $"global_segment_id"

In [None]:
val planUnoCustomersBase = sqlContext.read.parquet(pathData + "t_sfma_ownership_h_category_ah_m").
where(closing_date='2017-01-31').
select("global_segment_id","customer_id").
withColumn("global_segment_type", globalSegmentDescription($"global_segment_id"))

Para buscar el maximo entre columnas de la misma fila necesitamos tambien haremos uso de una UDF, que definaremos previamente:

In [None]:
val maxColumnsUdf = udf((x: Double, y: Double, z: Double) => {val a = Array(x,y,z) ; a.reduceLeft(_ max _)})
val tabla = tablaEntrada.
withColumn("maximo", maxColumnsUdf($"columna1",$"columna2",$"columna3"))

data tabla;  
set tablaEntrada;  
maximo=max(columna1,columna2,columna3);  
run;

## Algoritmo para seleccion de los meses

Este algoritmo crea una lista con un numero del 1 a la longitud que queramos darle (las veces que se repita el bucle, en este ejemplo 6) que equivale al ultimo dia de cada mes hacia atras. Si estamos en el mes Mayo, 31 de Mayo sera 1, 30 de Abril 2, 31 de Marzo 3, 28 de Febrero 4, etc.
Si queremos que el mes anterior sea el 1, en la linea "calendar.add(Calendar.MONTH, -(i))" habria que cambiar i por i+1.
Para ello creamos un map, donde para el valor i le asignaremos una fecha, en este caso el último día de mes.

In [None]:
import java.util.Calendar
import java.sql.Date

In [None]:
var lastDateMonth = collection.mutable.Map[Int, Date]()
for(i <- 1 to 6){
val calendar = Calendar.getInstance()
calendar.add(Calendar.MONTH, -(i))
val max = calendar.getActualMaximum(Calendar.DAY_OF_MONTH)
calendar.set(Calendar.DAY_OF_MONTH, max)
lastDateMonth += (i-> new Date(calendar.getTimeInMillis()))
}

## Definicion de variables para filtros

Cada vez que creas una variable tienes que poner val seguido del nombre. Esto seria algo como las macrovariables de SAS, ademas hay que hacerlo al crear tablas, textos, etc. Aqui se crean el codigo de banco 0182 en formato string que equivaldria al filtro necesario para cod_entalfa y el codigo de area de negocio de Consumer Finance para poder usarlos como filtros luego.

Ademas definimos las variables que contienen los códigos de pensiones y desempleo y la cantidad minima para cumplir el criterio de pension y desempleo en Plan UNO

In [None]:
val bbvaBankCode = "0182"
val consumerFinanceBusinessArea = 6057

val pensionGroupingId = (72002, 72003)
val unemploymentGroupingId =  72004
val minPension = 540
val minUnemployment = 300

## Ejemplo tipico, criterios de pension y desempleo de Plan UNO

Para realizar cualquier transformación (select,filter, group...) de un dataframe, se aplicará un método sobre el dataframe, cada transformación va precedida por un "."-
Aqui creamos las tablas payrolls6m, que seria una tabla que coge informacion de t_sfma_payroll_m (la POA) de los últimos 6 meses (${lastDateMonth(6)}) con código de banco 0182 y área de negocio diferente de Consumer Finance. Además seleccionamos las columnas que vamos a usar despues.

Para incluir variables definidas previamente en un string (como la fecha, o los filtros de banco y area de negocio) tenemos que incluir una s previo al string que define la query, a continuación vemos el ejemplo en la sentencia where, las variables las indicaremos dentro de la sentenci precedidas por el símbolo $. Si además la variable de entrada es de tipo string la tenemos que meter dentro de ''.

In [None]:
val payrolls6m = sqlContext.read.parquet("/data/master/sfma/t_sfma_payroll_m").
where(s"closing_date >= '${lastDateMonth(6)}' AND entity_id= $bbvaBankCode AND business_area_id!= $consumerFinanceBusinessArea").
select("customer_id","total_credited_1m_amount","closing_date")

%macro meses(periodo);  
proc sql;  
create table payrolls&periodo.m as  
select cod_persona, imp_apuzm, fec_cierre  
from sfmapv.vsfmaiii0&periodo  
where cod_entalfa eq '0182' and cod_areanego ne 6057;  
quit;  
%mend;  
%meses(01);  
%meses(02);  
%meses(03);  
%meses(04);  
%meses(05);  
%meses(06);  

data payrolls6m;  
set payrolls01m payrolls02m payrolls03m payrolls04m payrolls05m payrolls06m;  
run;

Creamos las tablas pension y unemployment añadiendo los filtros de la agregacion comercial (com_grouping_id). Luego sumamos la cantidad percibida por cliente al mes y eliminamos los clientes que no tengan ingresos >0.

In [None]:
val pension = payrolls6m.
where(s"com_grouping_id in $pensionGroupingId").
select("customer_id","total_credited_1m_amount","closing_date").
groupBy("customer_id","closing_date").
agg(sum("total_credited_1m_amount").
as("pension_amount_m1")).
where("pension_amount_m1>0")

proc sql;  
create table pension as  
select cod_persona, fec_cierre, sum(imp_apuzm) as pension_amount_m1  
from payrolls6m  
group by cod_persona, fec_cierre  
having pension_amount_m1>0;  
quit;

In [None]:
val unemployment = payrolls6m.
where(s"com_grouping_id = $unemploymentGroupingId").
select("customer_id","total_credited_1m_amount","closing_date").
groupBy("customer_id","closing_date").
agg(sum("total_credited_1m_amount").
as("unemployment_amount_m1")).
where("unemployment_amount_m1>0")

proc sql;  
create table unemployment as  
select cod_persona, fec_cierre, sum(imp_apuzm) as unemployment_amount_m1  
from payrolls6m  
group by cod_persona, fec_cierre  
having unemployment_amount_m1>0;  
quit;

Cruzamos ambas tablas para crear una con un full join (necesitamos los clientes de ambas tablas), rellenamos con 0s los ingresos vacios de clientes que no tengan ambos ingresos de pension y desempleo. Despues creamos un indicador que nos va a mostrar con 1 los clientes que tienen ingresos de pension o desempleo mayores que los minimos necesarios para cumplir el criterio o 0 si no lo cumplen. El resultado seria una tabla por cliente y mes con los indicadores y los ingresos en €.

In [None]:
val definitivePensionUnemploymentSchema = pension.
join(unemployment,Seq("customer_id","closing_date"),"full").
na.fill(0).
withColumn("pension_type",when($"pension_amount_m1">minPension,1).otherwise(0)).
withColumn("unemployment_type",when($"unemployment_amount_m1">minUnemployment,1).otherwise(0))

data definitivePensionUnemploymentSchema(drop=i);  
merge pension(in=x)  
unemployment(in=y);  
by cod_persona fec_cierre;  
if x or y;  
ARRAY cero numeric;  
DO I=1 TO dim(cero);  
IF cero(i)=. THEN cero(i)=0;  
END;  
if pension_amount_m1>300 then pension_type=1; else pension_type=0;  
if unemployment_amount_m1>300 then unemployment_type=1; else unemployment_type=0;  
run;

Spark se ejecuta de forma "lazy", esto quiero decir, que hasta que no se realiza una accion sobre un dataframe no se "ejecutan" las transformaciones que le preceden. Acciones son por ejemplo count(), show(), take(), etc. Cuando lancemos una query con una de estas operaciones se ejecutarán todas las transformaciones anteriores. Es decir, cuando lanzamos una celda que incluye una acción, en realidad se ejecutan todas las transformaciones que estén involucradas para realizar dicha accion. Las operaciones más costosas en un sistema distribuido como es Spark, son aquellas que requieren movimiento de datos entre máquinas, es lo que se denomina "shuffle", estas operaciones incluyen, groupBy, sort, dropDuplicates, join...etc. 