# **Challenge I**

### 1. Datasets

The source data consists of two CSV files with the same structure and column types.

* trade_details: the original dataset, with real financial trade data.
* trade_details_snapshot: a backup copy kept in case of data loss.

### 2. Columns and their meaning:

* mfamily: the family of trades the trade belongs to.
* mgroup: the group of trades within the family.
* mtype: the trade type within the group.
* origin_trade_number: the trade number (the same operation can have several trade numbers).
* origin_contract_number: the contract number of the trade (identical for all trades that belong to the same contract).
* maturity: the contract end date for each trade.

### 3. Problem description:

These datasets contain financial trades of several kinds, which we tell apart by the values of the mfamily, mgroup and mtype columns.

One special kind of trade, the FXSwap, is identified by the following values:

**mfamily = CURR** \
**mgroup = FXD** \
**mtype = SWLEG**

In our dataset these trades appear duplicated, that is, with the same **origin_contract_number** but a different **origin_trade_number**. Of these trades duplicated at source, we want to keep only one.

The *maturity* column decides which one to keep. Of the two trades (different origin_trade_number) under the same contract (origin_contract_number), we want only the *long leg*, that is, the one with the later maturity date (the latest date in the maturity column).
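As a rough illustration of the long-leg rule, the sketch below takes the latest maturity per contract with a window and keeps the matching trade. The sample rows, the `fx` and `longLegs` names and the app name are invented for the example. It assumes maturities are ISO `yyyy-MM-dd` strings, so string order matches date order, and that none of them is missing.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder().appName("long-leg-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical FXSwap legs, invented for this example.
val fx = Seq(
  ("1", "101", "2020-04-29"),
  ("1", "102", "2020-12-30"),
  ("2", "201", "2021-05-11"),
  ("2", "202", "2021-11-05")
).toDF("origin_contract_number", "origin_trade_number", "maturity")

// One partition per contract; ISO date strings compare like dates.
val byContract = Window.partitionBy("origin_contract_number")

val longLegs = fx
  .withColumn("max_maturity", max(col("maturity")).over(byContract))
  .filter(col("maturity") === col("max_maturity")) // keep only the long leg
  .drop("max_maturity")

longLegs.orderBy("origin_trade_number").show()
```

With this approach, two legs that share the same maturity would both be kept; a `row_number` ranking is one alternative if exactly one row per contract is required.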

Our trade_details dataset has a problem we need to work around: for some trades the maturity field arrives as *null*, that is, not populated. In those cases we have to look the trade up in the trade_details_snapshot dataset and use its maturity value to work out which of the two trades is the *long leg*, and filter out the *short leg*.
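One way to picture the lookup is to left-join each trade to its snapshot row and build an auxiliary comparison column, leaving `maturity` itself untouched. The sketch assumes the missing value is loaded as the literal string `NULL` (which is what the code cells and the final assertions in this notebook compare against) and that `origin_trade_number` identifies a single row in the snapshot. The sample rows and the `effective_maturity` and `maturity_snapshot` column names are invented for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

val spark = SparkSession.builder().appName("backfill-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val names = Seq("origin_contract_number", "origin_trade_number", "maturity")

// Hypothetical rows: trade 201 has lost its maturity in the main dataset.
val trades = Seq(("2", "201", "NULL"), ("2", "202", "2021-11-05")).toDF(names: _*)
val snapshot = Seq(("2", "201", "2021-12-01"), ("2", "202", "2021-11-05")).toDF(names: _*)

// Keep only the snapshot columns needed for the lookup, renamed to avoid a clash.
val snap = snapshot.select(col("origin_trade_number"), col("maturity").as("maturity_snapshot"))

val filled = trades
  .join(snap, Seq("origin_trade_number"), "left")
  // Comparison-only column: snapshot value when the original is "NULL".
  .withColumn("effective_maturity",
    when(col("maturity") === "NULL", col("maturity_snapshot")).otherwise(col("maturity")))

filled.orderBy("origin_trade_number").show()
```

Because the original `maturity` column is never overwritten, dropping the two helper columns afterwards returns the rows exactly as they came from trade_details.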

**NOTE: For more background on these financial operations, see https://es.wikipedia.org/wiki/Swap_(finanzas)**

### 4. Challenge:

* Obtain a final dataframe containing all the original trades except the short legs of the FXSwap contracts.
* Even though we use the maturity value from trade_details_snapshot when it is *null* in trade_details, the final dataframe must carry the original value from trade_details.
* Do it as efficiently as possible in computational terms.
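The three requirements above could be combined roughly as follows. This is a sketch under stated assumptions, not the reference solution: it works out the short legs on the FXSwap subset only, then removes them from the full dataset with an anti-join so every surviving row keeps its original values. The sample rows and the names `trades`, `snapshot`, `shortLegs` and `effective_maturity` are invented. It assumes a missing maturity is the literal string `NULL`, that maturities are ISO `yyyy-MM-dd` strings, that `origin_trade_number` is unique, and that the snapshot is small enough for a broadcast hint to make sense.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{broadcast, col, row_number, when}

val spark = SparkSession.builder().appName("fxswap-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val names = Seq("mfamily", "mgroup", "mtype",
  "origin_trade_number", "origin_contract_number", "maturity")

// Hypothetical rows, invented for this example.
val trades = Seq(
  ("CURR", "FXD", "SWLEG", "101", "1", "2020-04-29"),
  ("CURR", "FXD", "SWLEG", "102", "1", "2020-12-30"),
  ("CURR", "FXD", "SWLEG", "201", "2", "NULL"),
  ("CURR", "FXD", "SWLEG", "202", "2", "2021-11-05"),
  ("IRD", "BOND", "FWD", "301", "3", "2021-06-12")
).toDF(names: _*)

val snapshot = Seq(
  ("CURR", "FXD", "SWLEG", "201", "2", "2021-12-01"),
  ("CURR", "FXD", "SWLEG", "202", "2", "2021-11-05")
).toDF(names: _*)

// Null-safe FXSwap predicate (mtype is null for some other products).
val isFxSwap = col("mfamily") <=> "CURR" && col("mgroup") <=> "FXD" && col("mtype") <=> "SWLEG"

// Only the two snapshot columns needed for the lookup.
val snap = snapshot.select(col("origin_trade_number"), col("maturity").as("maturity_snapshot"))

// Rank the legs of each contract by effective maturity, latest first.
val latestFirst = Window.partitionBy("origin_contract_number")
  .orderBy(col("effective_maturity").desc_nulls_last)

val shortLegs = trades.filter(isFxSwap)
  .join(broadcast(snap), Seq("origin_trade_number"), "left")
  .withColumn("effective_maturity",
    when(col("maturity") === "NULL", col("maturity_snapshot")).otherwise(col("maturity")))
  .withColumn("rank", row_number().over(latestFirst))
  .filter(col("rank") > 1) // everything after the long leg
  .select("origin_trade_number")

// Anti-join: drop the short legs, leave every other row and value untouched.
val resultado = trades
  .join(broadcast(shortLegs), Seq("origin_trade_number"), "left_anti")
  .select(names.map(col): _*)

resultado.orderBy("origin_trade_number").show()
```

Filtering to FXSwap rows before the join and the window keeps the shuffle limited to the rows that can actually be removed. The `broadcast` calls are only hints; on larger inputs they may be ignored or should be dropped. `row_number` keeps exactly one leg per contract and breaks maturity ties arbitrarily.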

### SparkSession initialization:

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .appName("Reto 1")
                        .master("local")
                        .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://L2110017.bosonit.local:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1641542631138)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6d2d97e3


### Loading the CSVs

In [2]:
val trade_details = spark.read.format("csv")
                              .option("header", "true")
                              .option("delimiter", ";")
                              .load("./reto1/trade_details.csv")

val trade_details_snapshot = spark.read.format("csv")
                                       .option("header", "true")
                                       .option("delimiter", ";")
                                       .load("./reto1/trade_details_snapshot.csv")

trade_details: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 4 more fields]
trade_details_snapshot: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 4 more fields]


In [3]:
val a =trade_details.select("*").where(col("mfamily") === "CURR" && col("mgroup") === "FXD" && col("mtype") === "SWLEG")
.groupBy("mfamily","mgroup","mtype","origin_contract_number").count()
a.show()

+-------+------+-----+----------------------+-----+
|mfamily|mgroup|mtype|origin_contract_number|count|
+-------+------+-----+----------------------+-----+
|   CURR|   FXD|SWLEG|              18622136|    2|
|   CURR|   FXD|SWLEG|              18724280|    2|
|   CURR|   FXD|SWLEG|              19883451|    2|
|   CURR|   FXD|SWLEG|              21622649|    2|
|   CURR|   FXD|SWLEG|              19622128|    2|
+-------+------+-----+----------------------+-----+



a: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 3 more fields]


In [34]:
val a =trade_details.select("*").groupBy("mfamily","mgroup","mtype","origin_contract_number").agg(max("maturity").alias("a"))
                    .withColumn("a",when(col("a").isNull,datediff(col("a"), col("a"))).otherwise(col("a")))
a.show()

+-------+------+-----+----------------------+----+
|mfamily|mgroup|mtype|origin_contract_number|   a|
+-------+------+-----+----------------------+----+
|    IRD| LN_BR| null|              18176215|   0|
|    IRD|   IRS| null|              17356077|   0|
|    IRD|   IRS| null|               2222222|   0|
|    IRD| LN_BR| null|                  2222|   0|
|    IRD|    CF| null|              19453781|   0|
|    IRD|   IRS| null|             556111214|null|
|    IRD|   IRS| null|             222333111|   0|
|    IRD|   IRS| null|             111222333|   0|
|    IRD|   IRS| null|             555111222|null|
|   CURR|   FXD|SWLEG|              18622136|   0|
|    IRD|   IRS| null|             333111222|   0|
|   CURR|   FXD|SWLEG|              18724280|null|
|    IRD|  BOND|  FWD|              10000009|   0|
|    IRD|    CF| null|              19433281|   0|
|    IRD|   IRS| null|              20077630|   0|
|    IRD|   IRS| null|             564367838|   0|
|   CURR|   FXD|SWLEG|         

a: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 3 more fields]


In [108]:
val resultado =trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number","A.origin_trade_number","A.maturity")
                    .agg(max("B.maturity").alias("ad"))
                    .orderBy("ad")
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number")
                    .agg(first("origin_trade_number").alias("origin_trade_number"),first("maturity").alias("maturity"))
                    
                   
resultado.orderBy("A.origin_contract_number").show()

+-------+------+-----+----------------------+-------------------+----------+
|mfamily|mgroup|mtype|origin_contract_number|origin_trade_number|  maturity|
+-------+------+-----+----------------------+-------------------+----------+
|    EQD| EQUIT|  FWD|              10000001|           10000001|2019-07-02|
|    IRD|  BOND|  FWD|              10000009|           10000009|2021-06-12|
|    IRD|   IRS| null|             111222333|          111222333|2024-10-15|
|    IRD| LN_BR| null|              13774383|           14596583|2020-12-29|
|    IRD|   IRS| null|              17356077|           18343978|2024-10-15|
|    IRD| LN_BR| null|              18176215|           19203839|2022-10-06|
|   CURR|   FXD|SWLEG|              18622136|           19665185|2020-04-29|
|   CURR|   FXD|SWLEG|              18724280|           19772400|2021-11-05|
|    IRD|    CF| null|              19433281|           20513130|2021-07-06|
|    IRD|    CF| null|              19453781|           20533916|2023-07-06|

resultado: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 4 more fields]


In [96]:
val a =trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number","A.origin_trade_number","A.maturity")
                    .agg(max("B.maturity").alias("ad"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number")
                    .agg(max("ad").alias("maturity2"))
                    .join(trade_details.as("C"),col("A.origin_contract_number") === col("C.origin_contract_number"))
                    .select("A.mfamily","A.mgroup","A.mtype","C.origin_contract_number","C.origin_trade_number","C.maturity")
a.orderBy("A.origin_contract_number").show()

+-------+------+-----+----------------------+-------------------+----------+----------+
|mfamily|mgroup|mtype|origin_contract_number|origin_trade_number|  maturity|        ad|
+-------+------+-----+----------------------+-------------------+----------+----------+
|    EQD| EQUIT|  FWD|              10000001|           10000001|2019-07-02|2019-07-02|
|    IRD|  BOND|  FWD|              10000009|           10000009|2021-06-12|2021-06-12|
|    IRD|   IRS| null|             111222333|          111222333|2024-10-15|2024-10-15|
|    IRD| LN_BR| null|              13774383|           14596583|2020-12-29|2020-12-29|
|    IRD|   IRS| null|              17356077|           18343978|2024-10-15|2024-10-15|
|    IRD| LN_BR| null|              18176215|           19203839|2022-10-06|2022-10-06|
|   CURR|   FXD|SWLEG|              18622136|           19665185|2020-04-29|2020-12-30|
|   CURR|   FXD|SWLEG|              18622136|           19665186|2020-12-30|2020-12-30|
|   CURR|   FXD|SWLEG|          

a: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 5 more fields]


In [22]:
val a =trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number","A.origin_trade_number","A.maturity")
                    .pivot("A.maturity")
                    .agg(max("B.maturity").alias("ad"))
a.orderBy("A.origin_contract_number").show()

+-------+------+-----+----------------------+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|mfamily|mgroup|mtype|origin_contract_number|origin_trade_number|  maturity|2019-07-02|2020-04-29|2020-12-12|2020-12-29|2020-12-30|2021-06-12|2021-07-06|2021-09-22|2021-11-05|2022-10-06|2023-07-06|2024-10-15|2040-07-13|      NULL|
+-------+------+-----+----------------------+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|    EQD| EQUIT|  FWD|              10000001|           10000001|2019-07-02|2019-07-02|      null|      null|      null|      null|      null|      null|      null|      null|      null|      null|      null|      null|      null|
|    IRD|  BOND|  FWD|              10000009|           10000009|2021-06-12|

a: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 18 more fields]


In [82]:
val resultado = trade_details.alias("fst").join( trade_details_snapshot.alias("snd"),Seq("origin_contract_number"),"outer")
.where(col("fst.maturity").isNotNull)
.withColumn("maturity_noNull", when(col("fst.maturity") === "NULL",col("snd.maturity"))
.otherwise(col("fst.maturity")))
.withColumn("maturity_Null", when(col("fst.maturity")==="NULL","NULL").otherwise(col("maturity_noNull")))
.select("origin_contract_number","fst.*","maturity_noNull","maturity_Null")
.groupBy("mfamily", "mgroup", "mtype","origin_contract_number", "origin_trade_number")
.agg(max("maturity_Null").alias("maturity")).orderBy("maturity")
.groupBy("mfamily", "mgroup", "mtype","origin_contract_number")
.agg(first("origin_trade_number").alias("origin_trade_number"),
first("maturity").alias("maturity"))

resultado.orderBy("origin_trade_number").show(50)

+-------+------+-----+----------------------+-------------------+----------+
|mfamily|mgroup|mtype|origin_contract_number|origin_trade_number|  maturity|
+-------+------+-----+----------------------+-------------------+----------+
|    EQD| EQUIT|  FWD|              10000001|           10000001|2019-07-02|
|   CURR|   FXD|SWLEG|              19622128|           20665177|2020-04-29|
|   CURR|   FXD|SWLEG|              18622136|           19665185|2020-04-29|
|    IRD|   IRS| null|             333111222|          333111222|2020-12-12|
|    IRD| LN_BR| null|              13774383|           14596583|2020-12-29|
|   CURR|   FXD|SWLEG|              19622128|           20665178|2020-12-30|
|   CURR|   FXD|SWLEG|              18622136|           19665186|2020-12-30|
|    IRD|   IRS| null|             222333111|          222333111|2020-12-30|
|    IRD|  BOND|  FWD|              10000009|           10000009|2021-06-12|
|    IRD|    CF| null|              19433281|           20513130|2021-07-06|

resultado: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [mfamily: string, mgroup: string ... 4 more fields]


In [34]:
import org.apache.spark.sql.expressions.Window
val windowP = Window.partitionBy("origin_contract_number")
val resultado = trade_details.join(trade_details_snapshot,when(trade_details("maturity")==="NULL",trade_details("origin_contract_number")===trade_details_snapshot("origin_contract_number") && trade_details("origin_trade_number")===trade_details_snapshot("origin_trade_number")),"left") // fill in the NULL values
.select(trade_details("mfamily"),trade_details("mgroup"),trade_details("mtype"),trade_details("origin_trade_number"),trade_details("origin_contract_number"),trade_details("maturity"),trade_details_snapshot("maturity").alias("maturity_snapshot")) // select the join fields we need (the filled-in values go in a separate column)
.withColumn("maturity_not_null",when(col("maturity")==="NULL",col("maturity_snapshot"))
.otherwise(col("maturity"))) // column that merges the original and the filled-in maturity values
.withColumn("maturity_not_null",when((col("maturity_not_null") === max("maturity_not_null").over(windowP)),col("maturity_not_null"))
.otherwise("DELETE")) // leave the long leg as is and mark the short leg for deletion
.filter(!(col("maturity_not_null")==="DELETE")) // remove the rows marked DELETE (short legs)
.drop("maturity_snapshot","maturity_not_null") // drop the helper columns

resultado.orderBy("origin_trade_number").show()

+-------+------+-----+-------------------+----------------------+----------+-----------------+-----------------+
|mfamily|mgroup|mtype|origin_trade_number|origin_contract_number|  maturity|maturity_snapshot|maturity_not_null|
+-------+------+-----+-------------------+----------------------+----------+-----------------+-----------------+
|    EQD| EQUIT|  FWD|           10000001|              10000001|2019-07-02|             null|       2019-07-02|
|    IRD|  BOND|  FWD|           10000009|              10000009|2021-06-12|             null|       2021-06-12|
|    IRD| LN_BR| null|               1111|                  2222|2022-10-06|             null|       2022-10-06|
|    IRD|   IRS| null|          111222333|             111222333|2024-10-15|             null|       2024-10-15|
|    IRD| LN_BR| null|           14596583|              13774383|2020-12-29|             null|       2020-12-29|
|    IRD|   IRS| null|           18343978|              17356077|2024-10-15|             null|  

import org.apache.spark.sql.expressions.Window
windowP: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@15da8116
resultado: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 6 more fields]


### Result:

**INSTRUCTIONS**: The resulting DataFrame must be stored in the variable `resultado`, replacing the value `None` with whatever code you see fit. That way you can check whether the result is correct.


In [52]:
val resultado = trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number","A.origin_trade_number","A.maturity")
                    .agg(max("B.maturity").alias("ad"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number")
                    .agg(first("origin_trade_number").alias("origin_trade_number"),max("maturity").alias("maturity"))

resultado: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 4 more fields]


In [54]:
val resultado = trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number","A.origin_trade_number","A.maturity")
                    .agg(max("B.maturity").alias("ad"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number")
                    .agg(max("ad").alias("maturity"),first("origin_trade_number").alias("origin_trade_number"))

resultado: org.apache.spark.sql.DataFrame = [mfamily: string, mgroup: string ... 4 more fields]


In [90]:
// good
val resultado =trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_trade_number","A.origin_contract_number","A.maturity")
                    .agg(max("B.maturity").alias("aux"))
                    .orderBy("aux")
                    .groupBy("A.mfamily","A.mgroup","A.mtype","A.origin_contract_number")
                    .agg(first("origin_trade_number").alias("origin_trade_number"),first("maturity").alias("maturity"))
                    .select("A.mfamily","A.mgroup","A.mtype","origin_trade_number","A.origin_contract_number","maturity")
resultado.orderBy("origin_contract_number").show(50)

+-------+------+-----+-------------------+----------------------+----------+----------+
|mfamily|mgroup|mtype|origin_trade_number|origin_contract_number|  maturity|       aux|
+-------+------+-----+-------------------+----------------------+----------+----------+
|    EQD| EQUIT|  FWD|           10000001|              10000001|2019-07-02|2019-07-02|
|    IRD|  BOND|  FWD|           10000009|              10000009|2021-06-12|2021-06-12|
|    IRD|   IRS| null|          111222333|             111222333|2024-10-15|2024-10-15|
|    IRD| LN_BR| null|           14596583|              13774383|2020-12-29|2020-12-29|
|    IRD|   IRS| null|           18343978|              17356077|2024-10-15|2024-10-15|
|    IRD| LN_BR| null|           19203839|              18176215|2022-10-06|2022-10-06|
|   CURR|   FXD|SWLEG|           19665186|              18622136|2020-12-30|2020-12-30|
|   CURR|   FXD|SWLEG|           19665185|              18622136|2020-04-29|2020-12-30|
|   CURR|   FXD|SWLEG|          

resultado: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [mfamily: string, mgroup: string ... 5 more fields]


In [4]:
// good
val resultado = trade_details.as("A").join(trade_details_snapshot.as("B"),col("A.origin_contract_number") === col("B.origin_contract_number"))
                    .withColumn("aux", when(col("A.maturity") === "NULL","NULL").otherwise(col("B.maturity")))
                    .groupBy("A.mfamily", "A.mgroup", "A.mtype","A.origin_contract_number","A.origin_trade_number")
                    .agg(max("aux").alias("maturity"),max("B.maturity").alias("maturity_snapshot"))
                    .orderBy(desc("maturity"),desc("maturity_snapshot"))
                    
                    
resultado.show(50)

+-------+------+-----+----------------------+-------------------+----------+-----------------+
|mfamily|mgroup|mtype|origin_contract_number|origin_trade_number|  maturity|maturity_snapshot|
+-------+------+-----+----------------------+-------------------+----------+-----------------+
|   CURR|   FXD|SWLEG|              18724280|           19772399|      NULL|       2021-11-05|
|   CURR|   FXD|SWLEG|              21622649|           22798005|      NULL|       2021-05-11|
|   CURR|   FXD|SWLEG|              21622649|           22798004|      NULL|       2021-05-11|
|    IRD|   IRS| null|             555111222|          555111222|      NULL|       2021-05-04|
|    SCF|   SCF|  SCF|               3672136|            3815982|      NULL|       2021-05-04|
|    IRD|   IRS| null|             556111214|          556111214|      NULL|       2021-05-04|
|    IRD|   IRS| null|             444111222|          444111222|      NULL|       2021-05-04|
|   CURR|   FXD|SWLEG|              19883451|     

resultado: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [mfamily: string, mgroup: string ... 5 more fields]


Run the following cell (do not modify its code) and it will tell you whether your solution is correct. If it is, the cell runs cleanly and prints nothing; if it is not, it raises an error. Additional (hidden) tests will also be run when the exercise is graded, but evaluating this cell is a fairly reliable indicator of whether you have implemented the correct solution.

In [104]:
assert(resultado.count() == 26)
assert(resultado.orderBy("origin_contract_number").collect()(24)(4) == "564367838")
assert(resultado.orderBy("origin_contract_number").collect()(19)(5) == "NULL")
assert(resultado.orderBy("origin_trade_number").collect()(16)(5) == "NULL")