<img src="../images/Spark_logo.png" width="350">

# Best Practices on the DATIO Platform

<div style="background-color:white; text-align:center; padding:10px; color:black; margin-left:0px; border-radius: 10px; font-family:Trebuchet MS; font-size:20px">
<strong>Mexico City, 3 April 2019</strong>
</div>

<table class="table table-bordered table-hover">
  <thead>
    <tr>
      <th scope="col">Authors</th>
      <th scope="col">Organization</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>Marcos Olguín Martínez</td>
      <td>BBVA</td>
    </tr>
    <tr>
      <td>Libertad Pantoja Hernández</td>
      <td>DATIO</td>
    </tr>
    <tr>
      <td>Cesar Castañeda Reyes</td>
      <td>DATIO</td>
    </tr>
  </tbody>
</table>

***

<div style="background-color:white; text-align:center; padding:10px; color:steelblue; margin-left:0px; border-radius: 10px; font-family:Trebuchet MS; font-size:18px">
The purpose of this document is to present programming and code-organization best practices that make better use of the DATIO platform and optimize its performance.
</div>



# How should you organize your Jupyter Notebooks?

- Spark services
- Libraries
- Load JSON information (parameters)
- Spark code
    - Keep related Python functions in the same file
    - Data Frames in Spark
    - Select
    - Create new columns
    - Filter
    - groupBy and aggregate
    - Functions
    - Results throughout the document (graphs, tables, etc.)
    - Save results into Sandbox Manager
- Best practices examples

## Spark services

```python
import pyspark
sc = pyspark.SparkContext('local[*]')
sc
```

## Libraries

```python
import pyspark.sql.functions as F
from pyspark.sql.types import *
from Utilerias import *
import matplotlib.pyplot as plt
import sys
```

## JSON files (parameters)

The structure of the JSON file will be as follows:

```json
{
	"PATHS":
	{
        "INSUMOS" : "/path/of/your/sandbox/data_base_input",
        "SALIDAS" : "/path/of/your/sandbox/data_base_output"
	},
	
	"INPUTS":
	{
		"BALANCES"   : "balances_xxxxxx.csv",
		"REFUNDIDOS" : "cat_refundidos_xxxx_validado.csv",
		"STSM"       : "CLON_XXXXX_EGE_XXXX_300M.csv"
    },
	
	"OUTPUTS":
	{
		"DOC_OUTPUT_1" : "DOC_SERIE_COHORTE_SEGMENTOS.csv",
		"DOC_OUTPUT_2" : "DOC_SERIE_ANUAL_SEGMENTOS.csv"
    },

	"SCHEMAS": 
	{
		"BALANCES" : [["CLIENTE","str"],
                      ["NANOEJER","int"],
                      ["MES_EJER", "str"],
                      ["TIPO_EMPRESA","str"],
                      ["CANT_MESES","int"],
                      ["UNID_MEDIDA","str"],
                      ["COD_DIVISA","str"],
                      ...
     
```

We create a Python function that reads the information contained in the JSON file and store it in a Python class, in a file that we call **"Utilerias.py"** here; you can choose whatever name best fits your code.

```python
import json

class Utilerias(object):

    #>>> Function that reads the JSON file
    @staticmethod
    def fnCargaJson(nexus_url):
        with open(nexus_url) as json_file:
            properties = json.load(json_file)
        return properties
```
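As a quick check of such a loader, the sketch below writes a small parameters file to a temporary directory and reads it back. The function name `load_json_parameters` and the reduced sample dictionary are illustrative assumptions; only the key names follow the JSON structure shown earlier.

```python
import json
import os
import tempfile


def load_json_parameters(path):
    """Read a JSON parameters file and return its content as a dictionary."""
    with open(path, encoding="utf-8") as json_file:
        return json.load(json_file)


# Hypothetical, reduced parameters shaped like the JSON structure shown earlier.
sample_parameters = {
    "PATHS": {"INSUMOS": "/path/of/your/sandbox/data_base_input",
              "SALIDAS": "/path/of/your/sandbox/data_base_output"},
    "INPUTS": {"BALANCES": "balances_xxxxxx.csv"},
}

with tempfile.TemporaryDirectory() as tmp_dir:
    parameters_path = os.path.join(tmp_dir, "Parametros.json")
    # Write the sample file, then read it back with the loader.
    with open(parameters_path, "w", encoding="utf-8") as json_file:
        json.dump(sample_parameters, json_file)
    loaded_parameters = load_json_parameters(parameters_path)

print(loaded_parameters["PATHS"]["INSUMOS"])
```

Using a `with` block for the file guarantees that it is closed even if parsing fails.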

One way to load the information from the JSON file into the notebook is as follows:

```python
# INPUT PATH AND FILE NAMES
sRuta = ""
sNombreParam = "Parametros.json"

# FULL PATHS
sParametros = sRuta + sNombreParam

# LOADING PARAMETERS
sContenidoJson = Utilerias.fnCargaJson(sParametros)

# STORING INPUT AND OUTPUT PATHS (keys as defined under "PATHS" in the JSON file)
sRutaInsumos = sContenidoJson["PATHS"]["INSUMOS"]
sRutaSalidas = sContenidoJson["PATHS"]["SALIDAS"]
```
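Once the folders are known, the full location of each input file can be assembled from the `PATHS` and `INPUTS` entries. This is a minimal sketch, assuming a dictionary shaped like the JSON file above; `build_input_paths` is a hypothetical helper, not part of the platform.

```python
import posixpath


def build_input_paths(parameters):
    """Return {input_name: full_path} by joining the input folder with each file name."""
    input_folder = parameters["PATHS"]["INSUMOS"]
    # posixpath keeps forward slashes regardless of the operating system.
    return {name: posixpath.join(input_folder, file_name)
            for name, file_name in parameters["INPUTS"].items()}


# Hypothetical parameters with the same keys as the JSON file.
parameters = {
    "PATHS": {"INSUMOS": "/path/of/your/sandbox/data_base_input"},
    "INPUTS": {"BALANCES": "balances_xxxxxx.csv",
               "REFUNDIDOS": "cat_refundidos_xxxx_validado.csv"},
}

input_paths = build_input_paths(parameters)
print(input_paths["BALANCES"])
```

Building all paths in one place avoids repeating string concatenation in every cell that reads a file.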

Load the schemas from the JSON file:

```python
# Auxiliary dictionary that maps the Python type names used in the JSON to PySpark types
dTiposDatos = {
               "int" :IntegerType(), "double" : DoubleType(), "date"    : DateType(),
               "str" : StringType(), "float"  : FloatType(),  "decimal" : DecimalType(30,10)
              }

esquemaBalances  = sContenidoJson["SCHEMAS"]["BALANCES"]
```
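The `[name, type]` pairs stored in the JSON file can also be turned into a DDL-formatted schema string, which `spark.read` accepts in its `schema` argument in place of a `StructType`. The sketch below is one possible approach under that assumption, not necessarily the method used by the authors. The mapping `DDL_TYPES` and the function `build_ddl_schema` are hypothetical names, and the decimal precision simply mirrors the `DecimalType(30,10)` entry above.

```python
# Hypothetical mapping from the short type names in the JSON to Spark SQL type names.
DDL_TYPES = {
    "int": "INT", "double": "DOUBLE", "date": "DATE",
    "str": "STRING", "float": "FLOAT", "decimal": "DECIMAL(30,10)",
}


def build_ddl_schema(schema_pairs):
    """Turn [[column, short_type], ...] into a DDL string such as 'A STRING, B INT'."""
    unknown = [short_type for _, short_type in schema_pairs if short_type not in DDL_TYPES]
    if unknown:
        raise ValueError("Unsupported type names: {}".format(sorted(set(unknown))))
    return ", ".join("{} {}".format(column, DDL_TYPES[short_type])
                     for column, short_type in schema_pairs)


# First entries of the BALANCES schema from the JSON file.
balances_schema = [["CLIENTE", "str"], ["NANOEJER", "int"], ["MES_EJER", "str"]]
ddl_schema = build_ddl_schema(balances_schema)
print(ddl_schema)  # CLIENTE STRING, NANOEJER INT, MES_EJER STRING
```

The resulting string could then be passed to the reader, for example `spark.read.csv(path, schema=ddl_schema, header=True)`. Column names containing spaces or special characters would need to be quoted with backticks, which this sketch does not handle.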

<div class="alert alert-success" role="alert">

<h4 class="alert-heading">Thinking about moving to production?</h4>
<hr>
When you migrate to the archetype, \*.py files are going to be used to load the parameters. However, \*.json files provide better organization in the preliminary development stages.
  <button type="button" class="close" data-dismiss="alert" aria-label="Close">
    <span aria-hidden="true">&times;</span>
  </button>
</div>

### How can I load Spark schemas?

<div class="alert alert-danger" role="alert">
Do not load Spark schemas like this, because it does not work:
</div>

<div class="alert alert-success alert-dismissable" role="alert">
Load Spark schemas like this:
</div>

It is important to write Python functions for highly complex mathematical processes and to keep them in a separate class (file) rather than inside the notebook. This gives us better-organized, cleaner code.

If we organize the notebook in this way, it will have separate sections for loading libraries, loading tables and transforming variables.

**Load tables**

**Load schemas**

## Spark code

### Keep related Python functions in the same file

In the file that we call **"Utilerias.py"** here, in addition to the function that loads JSON files, we store functions that help us apply transformations to the columns, such as formatting date columns. We may also include functions that measure how long the code takes to run, such as:

```python
import datetime
from datetime import datetime as dt

import pytz


class Utilerias(object):

    #>>> Function that returns the current time
    @staticmethod
    def fnHoraActual():
        sHoraActual = datetime.datetime.now(pytz.timezone('Mexico/General')).strftime("%H:%M:%S %p")
        return sHoraActual

    #>>> Function that returns the elapsed time
    @staticmethod
    def fnTiempoTranscurrido(HoraInicio, HoraFin):
        sTiempoTranscurrido = dt.strptime(HoraFin[:8], '%H:%M:%S') - dt.strptime(HoraInicio[:8], '%H:%M:%S')
        return str(sTiempoTranscurrido)

    #>>> Function that converts a date string to a datetime
    @staticmethod
    def fnConvierteFecha(fechaConvertir):
        if fechaConvertir is None:
            return None
        if fechaConvertir[2:3] == "/":
            # Format dd/mm/yyyy
            return dt.strptime(fechaConvertir, '%d/%m/%Y')
        if fechaConvertir[4:5] == "-":
            # Format yyyy-mm-dd
            return dt.strptime(fechaConvertir, '%Y-%m-%d')
        # Unrecognized format: return the sentinel date 9999-01-01
        # (assumed intent; the original expression was never returned)
        return dt(9999, 1, 1)
```
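To illustrate how a timing helper might wrap a notebook step, here is a minimal sketch that uses only the standard library. It measures elapsed time with `time.perf_counter` instead of subtracting clock strings, which avoids negative results when a run crosses midnight. The name `timed_step` and the toy computation are hypothetical.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_step(label, results):
    """Store the seconds spent inside the `with` block under results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the step raises an exception.
        results[label] = time.perf_counter() - start


timings = {}
with timed_step("toy_transformation", timings):
    # Stand-in for an expensive notebook step.
    total = sum(value * value for value in range(100000))

print("toy_transformation took {:.4f} s".format(timings["toy_transformation"]))
```

Collecting the timings in a dictionary makes it easy to print a summary of all steps at the end of the notebook.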

### Data Frames in Spark

Starting Spark services (just run once)

In [3]:
import pyspark
sc = pyspark.SparkContext('local[*]')
sc

In [4]:
from pyspark.sql import SparkSession

A DataFrame in Spark can be created in several ways:

- Load data from a flat file (for instance, CSV).
- Connect to a distributed file system (such as HDFS).
- Convert an RDD to a DataFrame.

In [5]:
# This may take some time on your local PC
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [6]:
# Let's start by reading a new file
df = spark.read.json('./data/people2.json')

In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|   7|  Sofia|
|null| Martha|
+----+-------+



In [9]:
df.limit(3).toPandas()
# limit(n) caps the result at "n" rows (in this case n = 3)
# toPandas() converts the limited result to a pandas DataFrame (Python)

Unnamed: 0,age,name
0,,Michael
1,30.0,Andy
2,19.0,Justin


In [10]:
# Show the columns that make up the DataFrame (a.k.a. DF)
df.columns

['age', 'name']

In [11]:
# Ultra short DF analysis
df.describe().show()

+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 3|    5|
|   mean|18.666666666666668| null|
| stddev|11.503622617824933| null|
|    min|                 7| Andy|
|    max|                30|Sofia|
+-------+------------------+-----+



<div class="alert alert-danger" role="alert">
In the Spark version used for this notebook, DataFrames have neither the <strong>isEmpty</strong> nor the <strong>countApprox</strong> method, so to find out whether a DataFrame contains data it is recommended to go through its underlying RDD. In general, it is not a good idea to trigger a <strong>collect</strong> or a <strong>count</strong> action on DataFrames whose size we do not know, since it can involve a high computational cost.
</div>

In [12]:
df.rdd.isEmpty()

False

In [13]:
df.rdd.countApprox(timeout=800)

5

Spark provides tools to define the schema of our table:

In [14]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

Once the tools are loaded, we create the list of structured fields (`StructField`), each of which takes three main parameters.

```
StructField(String name, DataType dataType, boolean nullable)
```


* **name:** field name (String).
* **dataType:** field `DataType`.
* **nullable:** Boolean; whether the field may contain null (None) values.

In [15]:
# Manually defined Schema
data_schema = [StructField("age", IntegerType(), True),
               StructField("name", StringType(), True)]

We then build the complete schema.

In [16]:
final_struc = StructType(fields=data_schema)

Finally, we apply the schema when reading the file.

In [17]:
df = spark.read.json('./data/people2.json', schema=final_struc)

In [18]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



Compare this with the schema Spark inferred earlier, where `age` was typed as `long` instead of `integer`.

<div class="alert alert-info" role="alert">Defining the schema explicitly, as shown above, reduces computational cost because Spark does not have to scan the data to infer the column types.</div>

<div class="alert alert-success" role="alert">

<h4 class="alert-heading">Thinking about moving to production?</h4>
<hr>
Once you are developing the production version, schemas will be needed to check that the data is what we expect and that the process outputs have the desired data types.
  <button type="button" class="close" data-dismiss="alert" aria-label="Close">
    <span aria-hidden="true">&times;</span>
  </button>
</div>
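A schema check of this kind can be sketched without Spark by comparing lists of `(column, type)` pairs, which is the shape returned by `DataFrame.dtypes`. The helper `schema_differences` and the sample lists are hypothetical; in a notebook, the second argument would come from `df.dtypes`.

```python
def schema_differences(expected, actual):
    """Compare two lists of (column, type) pairs, as returned by DataFrame.dtypes."""
    expected_types = dict(expected)
    actual_types = dict(actual)
    missing = sorted(set(expected_types) - set(actual_types))
    unexpected = sorted(set(actual_types) - set(expected_types))
    mismatched = sorted(
        (column, expected_types[column], actual_types[column])
        for column in set(expected_types) & set(actual_types)
        if expected_types[column] != actual_types[column]
    )
    return {"missing": missing, "unexpected": unexpected, "mismatched": mismatched}


expected_dtypes = [("age", "int"), ("name", "string")]
# Stand-in for df.dtypes when the file is read without an explicit schema.
inferred_dtypes = [("age", "bigint"), ("name", "string")]

report = schema_differences(expected_dtypes, inferred_dtypes)
print(report)
```

An empty report (three empty lists) would mean that the data matches the expected schema.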

### Select

In [19]:
df[["age"]].show()

+----+
| age|
+----+
|null|
|  30|
|  19|
|   7|
|null|
+----+



In [20]:
df.select('age')

DataFrame[age: int]

In [21]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
|   7|
|null|
+----+



In [22]:
# Returns list of Row objects
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

#### Select multiple columns

In [23]:
df.select('age','name')

DataFrame[age: int, name: string]

In [24]:
df.select('age','name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|   7|  Sofia|
|null| Martha|
+----+-------+



In [25]:
# Remember to limit the output
df.select('age','name').limit(2).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
+----+-------+



### Create new columns

In [26]:
# Add a new column as a copy of another
df.withColumn('newage',df['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
|   7|  Sofia|     7|
|null| Martha|  null|
+----+-------+------+



In [27]:
# Renaming one column
df.withColumnRenamed('age','supernewage').show()

+-----------+-------+
|supernewage|   name|
+-----------+-------+
|       null|Michael|
|         30|   Andy|
|         19| Justin|
|          7|  Sofia|
|       null| Martha|
+-----------+-------+



In [28]:
df.fillna(0).show()
#df.fillna(0, subset=['column_a', 'column_b'])

+---+-------+
|age|   name|
+---+-------+
|  0|Michael|
| 30|   Andy|
| 19| Justin|
|  7|  Sofia|
|  0| Martha|
+---+-------+



<div class="alert alert-danger" role="alert">
On the DATIO platform, using <code>withColumn</code> is not recommended; use <strong>select</strong> and <strong>alias</strong> instead.
</div>

In [29]:
df.columns

['age', 'name']

In [30]:
listaColumnas = df.columns
listaColumnas

['age', 'name']

In [31]:
import pyspark.sql.functions as F

In [32]:
df.select(*listaColumnas,
          F.lit(1).alias("number_one")
         ).show()

+----+-------+----------+
| age|   name|number_one|
+----+-------+----------+
|null|Michael|         1|
|  30|   Andy|         1|
|  19| Justin|         1|
|   7|  Sofia|         1|
|null| Martha|         1|
+----+-------+----------+



In [33]:
df.select(*listaColumnas,
          (F.col("age")/20).alias("division")
         ).fillna(0, subset=["division"]).toPandas()

Unnamed: 0,age,name,division
0,,Michael,0.0
1,30.0,Andy,1.5
2,19.0,Justin,0.95
3,7.0,Sofia,0.35
4,,Martha,0.0


#### Some operations are slightly more complicated

Remember, we shouldn't use `withColumn`; use `select` and `alias` instead.

In [34]:
#df.withColumn('doubleage',df['age']*2).show()
df.select(*listaColumnas, (df['age']*2).alias("doubleage")).show()

+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|null|Michael|     null|
|  30|   Andy|       60|
|  19| Justin|       38|
|   7|  Sofia|       14|
|null| Martha|     null|
+----+-------+---------+



In [35]:
#df.withColumn('add_one_age',df['age']+1).show()
df.select(*listaColumnas, (df['age']+1).alias('add_one_age')).show()

+----+-------+-----------+
| age|   name|add_one_age|
+----+-------+-----------+
|null|Michael|       null|
|  30|   Andy|         31|
|  19| Justin|         20|
|   7|  Sofia|          8|
|null| Martha|       null|
+----+-------+-----------+



In [36]:
#df.withColumn('half_age',df['age']/2).show()
df.select(*listaColumnas, (df['age']/2).alias('half_age')).show()

+----+-------+--------+
| age|   name|half_age|
+----+-------+--------+
|null|Michael|    null|
|  30|   Andy|    15.0|
|  19| Justin|     9.5|
|   7|  Sofia|     3.5|
|null| Martha|    null|
+----+-------+--------+



In [37]:
#df.withColumn('half_age',df['age']/2)
df.select(*listaColumnas, (df['age']/2).alias('half_age').astype("string")).printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- half_age: string (nullable = true)



### Filter

In [38]:
df = spark.read.csv('./data/tempVol_CorpGar.csv', inferSchema=True, header=True)

In [39]:
df.printSchema()

root
 |-- estado: string (nullable = true)
 |-- FEC_RRENTRAD_CTO: timestamp (nullable = true)
 |-- XTI_SOLUCION: string (nullable = true)
 |-- YINC: integer (nullable = true)
 |-- LGD_OBS_1: double (nullable = true)
 |-- MARCA_MIT_CURVA_INF: string (nullable = true)
 |-- LGDCI_1: double (nullable = true)
 |-- LGDCI_2: double (nullable = true)
 |-- LGDCI_3: double (nullable = true)
 |-- LGDCI_4: double (nullable = true)
 |-- LGDCI_5: double (nullable = true)
 |-- LGDCI_6: double (nullable = true)
 |-- LGDCI_7: double (nullable = true)
 |-- LGDCI_8: double (nullable = true)
 |-- LGDCI_9: double (nullable = true)
 |-- LGDCI_10: double (nullable = true)
 |-- LGDCI_11: double (nullable = true)
 |-- LGDCI_12: double (nullable = true)
 |-- LGDCI_13: double (nullable = true)
 |-- LGDCI_14: double (nullable = true)
 |-- LGDCI_15: double (nullable = true)
 |-- LGDCI_16: double (nullable = true)
 |-- LGDCI_17: double (nullable = true)
 |-- LGDCI_18: double (nullable = true)
 |-- LGDCI_19: double 

In [40]:
df.limit(10).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,LGD_OBS_1,MARCA_MIT_CURVA_INF,LGDCI_1,LGDCI_2,LGDCI_3,LGDCI_4,...,NOMISSING_28,NOMISSING_29,NOMISSING_30,NOMISSING_31,NOMISSING_32,NOMISSING_33,NOMISSING_34,NOMISSING_35,NOMISSING_36,FREQ
0,OBSERVADO,2012-10-24,CERRADO,2012,0.50529,,-0.02044,-0.02044,-0.02044,-0.02044,...,20,20,20,19,19,19,19,18,17,231
1,OBSERVADO,2016-04-30,CERRADO,2016,0.436739,,-0.026093,-0.026093,,,...,20,20,20,19,19,19,19,18,17,231
2,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
3,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
4,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
5,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
6,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
7,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
8,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
9,OBSERVADO,2015-03-31,CERRADO,2015,0.462892,,-0.027922,-0.027922,-0.027922,-0.027922,...,20,20,20,19,19,19,19,18,17,231


In [41]:
print("Filas: {0:,}\tColumnas: {1:,}".format(df.count(), len(df.columns)))

Filas: 610	Columnas: 187


In [42]:
df.select("estado","LGDCI_1","LGDCI_2","LGDCI_3","NOMISSING_1").describe().show()

+-------+------+------------------+-------------------+-------------------+-----------------+
|summary|estado|           LGDCI_1|            LGDCI_2|            LGDCI_3|      NOMISSING_1|
+-------+------+------------------+-------------------+-------------------+-----------------+
|  count|   610|               610|                596|                581|              610|
|   mean|  null|0.4430530852459018|0.46528672651006725| 0.4629620240963857|272.7311475409836|
| stddev|  null|0.5011260894417999| 0.3688855521174224|0.36397779629587207|80.80642307499596|
|    min|ABCAVE|           -7.8573|          -0.316506|          -0.316506|               46|
|    max| QUITA|               1.0|                1.0|                1.0|              333|
+-------+------+------------------+-------------------+-------------------+-----------------+



In [43]:
import pyspark.sql.functions as F

In [44]:
listaColumnas = ["estado","LGDCI_1","LGDCI_2","LGDCI_3","NOMISSING_1"]
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in listaColumnas]).show()

+------+-------+-------+-------+-----------+
|estado|LGDCI_1|LGDCI_2|LGDCI_3|NOMISSING_1|
+------+-------+-------+-------+-----------+
|     0|      0|     14|     29|          0|
+------+-------+-------+-------+-----------+



Another example with **filter**.

In [45]:
df.filter(df.NOMISSING_1 < 100).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1") \
    .limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,QUITA,2016-01-22,QUITA,2016,46
1,QUITA,2010-04-16,QUITA,2010,46
2,QUITA,2015-01-05,QUITA,2015,46
3,QUITA,2014-12-19,QUITA,2014,46
4,QUITA,2014-12-22,QUITA,2014,46


In [46]:
df.filter(df.NOMISSING_1 > 300).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1").limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,ABCAVE,2013-07-29,VENTA,2013,333
1,ABCAVE,2013-07-29,VENTA,2013,333
2,ABCAVE,2013-07-29,VENTA,2013,333
3,ABCAVE,2013-07-29,VENTA,2013,333
4,ABCAVE,2013-07-29,VENTA,2013,333


In [47]:
#Another way to do it
df.filter("NOMISSING_1 > 300").select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1").limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,ABCAVE,2013-07-29,VENTA,2013,333
1,ABCAVE,2013-07-29,VENTA,2013,333
2,ABCAVE,2013-07-29,VENTA,2013,333
3,ABCAVE,2013-07-29,VENTA,2013,333
4,ABCAVE,2013-07-29,VENTA,2013,333


In [48]:
#Another way to do it
df.filter(F.col("NOMISSING_1") > 300).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1").limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,ABCAVE,2013-07-29,VENTA,2013,333
1,ABCAVE,2013-07-29,VENTA,2013,333
2,ABCAVE,2013-07-29,VENTA,2013,333
3,ABCAVE,2013-07-29,VENTA,2013,333
4,ABCAVE,2013-07-29,VENTA,2013,333


If we don't use **select**, this code shows all columns:

In [49]:
df.filter("NOMISSING_1 > 300").limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,LGD_OBS_1,MARCA_MIT_CURVA_INF,LGDCI_1,LGDCI_2,LGDCI_3,LGDCI_4,...,NOMISSING_28,NOMISSING_29,NOMISSING_30,NOMISSING_31,NOMISSING_32,NOMISSING_33,NOMISSING_34,NOMISSING_35,NOMISSING_36,FREQ
0,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
1,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
2,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
3,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333
4,ABCAVE,2013-07-29,VENTA,2013,0.604233,,0.890917,0.890917,0.890917,0.890917,...,307,298,268,266,265,264,259,254,254,333


So, what do the following Spark commands do with our original DF?

In [50]:
df.filter((df['NOMISSING_1'] > 300) & (df['YINC'] == 2012)).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1") \
    .limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,ABCAVE,2012-09-23,CASTIGO,2012,333
1,ABCAVE,2012-09-23,CASTIGO,2012,333
2,ABCAVE,2012-09-23,CASTIGO,2012,333
3,ABCAVE,2012-09-23,CASTIGO,2012,333
4,ABCAVE,2012-09-23,CASTIGO,2012,333


In [51]:
df.filter((df['NOMISSING_1'] > 300) & ~(df['YINC'] == 2012)).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1") \
    .limit(5).toPandas()
# The "~" operator negates the condition

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,ABCAVE,2013-07-29,VENTA,2013,333
1,ABCAVE,2013-07-29,VENTA,2013,333
2,ABCAVE,2013-07-29,VENTA,2013,333
3,ABCAVE,2013-07-29,VENTA,2013,333
4,ABCAVE,2013-07-29,VENTA,2013,333


In [52]:
df.filter((df['XTI_SOLUCION'] == "CERRADO")).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","NOMISSING_1") \
    .limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,NOMISSING_1
0,OBSERVADO,2012-10-24,CERRADO,2012,231
1,OBSERVADO,2016-04-30,CERRADO,2016,231
2,OBSERVADO,2015-03-31,CERRADO,2015,231
3,OBSERVADO,2015-03-31,CERRADO,2015,231
4,OBSERVADO,2015-03-31,CERRADO,2015,231


In [53]:
df.filter(df['MARCA_MIT_CURVA_INF'].isNull()).select("estado","FEC_RRENTRAD_CTO","XTI_SOLUCION","YINC","MARCA_MIT_CURVA_INF") \
    .limit(5).toPandas()

Unnamed: 0,estado,FEC_RRENTRAD_CTO,XTI_SOLUCION,YINC,MARCA_MIT_CURVA_INF
0,OBSERVADO,2012-10-24,CERRADO,2012,
1,OBSERVADO,2016-04-30,CERRADO,2016,
2,ABCAVE,2013-07-29,VENTA,2013,
3,ABCAVE,2013-07-29,VENTA,2013,
4,ABCAVE,2013-07-29,VENTA,2013,


We can save our information into a dictionary:

In [54]:
result = df.filter((df['YINC'] == 2016)).collect()

In [55]:
row = result[0]

In [56]:
row.asDict()

{'FEC_RRENTRAD_CTO': datetime.datetime(2016, 4, 30, 0, 0),
 'FREQ': 231,
 'LGDCIP_1': -0.026093,
 'LGDCIP_10': None,
 'LGDCIP_11': None,
 'LGDCIP_12': None,
 'LGDCIP_13': None,
 'LGDCIP_14': None,
 'LGDCIP_15': None,
 'LGDCIP_16': None,
 'LGDCIP_17': None,
 'LGDCIP_18': None,
 'LGDCIP_19': None,
 'LGDCIP_2': -0.026093,
 'LGDCIP_20': None,
 'LGDCIP_21': None,
 'LGDCIP_22': None,
 'LGDCIP_23': None,
 'LGDCIP_24': None,
 'LGDCIP_25': None,
 'LGDCIP_26': None,
 'LGDCIP_27': None,
 'LGDCIP_28': None,
 'LGDCIP_29': None,
 'LGDCIP_3': None,
 'LGDCIP_30': None,
 'LGDCIP_31': None,
 'LGDCIP_32': None,
 'LGDCIP_33': None,
 'LGDCIP_34': None,
 'LGDCIP_35': None,
 'LGDCIP_36': None,
 'LGDCIP_4': None,
 'LGDCIP_5': None,
 'LGDCIP_6': None,
 'LGDCIP_7': None,
 'LGDCIP_8': None,
 'LGDCIP_9': None,
 'LGDCI_1': -0.026093,
 'LGDCI_10': None,
 'LGDCI_11': None,
 'LGDCI_12': None,
 'LGDCI_13': None,
 'LGDCI_14': None,
 'LGDCI_15': None,
 'LGDCI_16': None,
 'LGDCI_17': None,
 'LGDCI_18': None,
 'LGDCI_19':

In [57]:
row.asDict()['FREQ']

231

### groupBy and aggregate

In [58]:
print("Filas: {0:,}\tColumnas: {1:,}".format(df.count(), len(df.columns)))

Filas: 610	Columnas: 187


In [59]:
df.describe("PESO_1","PESO_11").show()

+-------+-------------------+-------------------+
|summary|             PESO_1|            PESO_11|
+-------+-------------------+-------------------+
|  count|                610|                441|
|   mean| 0.9902107754098362| 0.9864593492063495|
| stddev|0.09830107049049229|0.11542832104976965|
|    min|           0.001633|           0.001633|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



**groupBy**

In [60]:
df.select("MARCA_MIT_CURVA_INF","PESO_1").groupBy("MARCA_MIT_CURVA_INF").mean().show()

+-------------------+------------------+
|MARCA_MIT_CURVA_INF|       avg(PESO_1)|
+-------------------+------------------+
|               null|0.9900641813643928|
|                  X|               1.0|
+-------------------+------------------+



In [61]:
df.select("MARCA_MIT_CURVA_INF","PESO_1").groupBy("MARCA_MIT_CURVA_INF").sum().show()

+-------------------+-----------+
|MARCA_MIT_CURVA_INF|sum(PESO_1)|
+-------------------+-----------+
|               null| 595.028573|
|                  X|        9.0|
+-------------------+-----------+



In [62]:
df.select("XTI_SOLUCION","YINC").groupBy("XTI_SOLUCION").max().show()

+------------+---------+
|XTI_SOLUCION|max(YINC)|
+------------+---------+
|     CASTIGO|     2016|
|       QUITA|     2016|
|     ABIERTO|     2012|
|     CERRADO|     2016|
|       VENTA|     2013|
+------------+---------+



In [63]:
df.groupBy("XTI_SOLUCION").count().show()

+------------+-----+
|XTI_SOLUCION|count|
+------------+-----+
|     CASTIGO|  254|
|       QUITA|   46|
|     ABIERTO|   24|
|     CERRADO|  231|
|       VENTA|   55|
+------------+-----+



**aggregate**

<div class="alert alert-info">When we do not use a groupBy clause, the aggregation is computed over the entire dataset.</div>

In [64]:
df2 = spark.read.csv('./data/sales_info.csv', inferSchema=True, header=True)

In [65]:
df2.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [66]:
df2.agg({'Sales':'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [67]:
df2.agg(F.sum("Sales"), F.avg("Sales")).show()

+----------+-----------------+
|sum(Sales)|       avg(Sales)|
+----------+-----------------+
|    4327.0|360.5833333333333|
+----------+-----------------+



In [68]:
df2.groupBy("Company").agg(F.sum("Sales"), F.avg("Sales")).show()

+-------+----------+-----------------+
|Company|sum(Sales)|       avg(Sales)|
+-------+----------+-----------------+
|   APPL|    1480.0|            370.0|
|   GOOG|     660.0|            220.0|
|     FB|    1220.0|            610.0|
|   MSFT|     967.0|322.3333333333333|
+-------+----------+-----------------+



In [69]:
df2.groupBy("Company").agg(F.sum("Sales").alias("suma"), F.avg("Sales").alias("promedio")).show()

+-------+------+-----------------+
|Company|  suma|         promedio|
+-------+------+-----------------+
|   APPL|1480.0|            370.0|
|   GOOG| 660.0|            220.0|
|     FB|1220.0|            610.0|
|   MSFT| 967.0|322.3333333333333|
+-------+------+-----------------+



In [70]:
df2.groupBy("Company").agg(F.sum("Sales").alias("suma"), F.avg("Sales").alias("promedio")).toPandas()

Unnamed: 0,Company,suma,promedio
0,APPL,1480.0,370.0
1,GOOG,660.0,220.0
2,FB,1220.0,610.0
3,MSFT,967.0,322.333333


In [71]:
df2.groupBy("Company").agg(F.sum("Sales").alias("suma"), F.avg("Sales").alias("promedio")).sort("promedio").show()

+-------+------+-----------------+
|Company|  suma|         promedio|
+-------+------+-----------------+
|   GOOG| 660.0|            220.0|
|   MSFT| 967.0|322.3333333333333|
|   APPL|1480.0|            370.0|
|     FB|1220.0|            610.0|
+-------+------+-----------------+



In [72]:
df2.groupBy("Company").agg(F.sum("Sales").alias("suma"), F.avg("Sales").alias("promedio")).sort(F.desc("promedio")).show()

+-------+------+-----------------+
|Company|  suma|         promedio|
+-------+------+-----------------+
|     FB|1220.0|            610.0|
|   APPL|1480.0|            370.0|
|   MSFT| 967.0|322.3333333333333|
|   GOOG| 660.0|            220.0|
+-------+------+-----------------+



Another example:

In [73]:
group_data = df2.groupBy("Company")

In [74]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



There are more functions, like:

In [75]:
from pyspark.sql.functions import countDistinct, avg, stddev

#### What happens with null values?
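
Spark's aggregate functions skip null values: `count(column)` counts only non-null entries and `avg` divides by that count, which is why `describe()` reported a count of 3 for `age` earlier even though the table has 5 rows. Nulls in a grouping column, in contrast, form their own group, as the `groupBy("MARCA_MIT_CURVA_INF")` output above shows. The plain-Python sketch below only mimics the aggregate behaviour with `None` values to make the arithmetic explicit; it is not Spark code, and the helper names are hypothetical.

```python
def count_non_null(values):
    """Mimic count(column): number of entries that are not None."""
    return sum(1 for value in values if value is not None)


def mean_non_null(values):
    """Mimic avg(column): mean over the non-None entries, or None if there are none."""
    present = [value for value in values if value is not None]
    if not present:
        return None
    return sum(present) / len(present)


# The 'age' column of people2.json as displayed earlier in this notebook.
ages = [None, 30, 19, 7, None]

print(len(ages))             # 5 rows in total
print(count_non_null(ages))  # 3
print(mean_non_null(ages))   # 18.666666666666668, the mean reported by describe()
```

If nulls should count as zeros instead, they have to be replaced explicitly (for example with `fillna`) before aggregating.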

### Functions

Python functions should have no more than 100 lines. If a function exceeds that limit, split it into two or more functions, depending on the situation.

<img src="../images/large_functions.png" width="600">
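
As a small illustration of splitting, the sketch below breaks one routine into short, single-purpose functions that a top-level function chains together. All names and the toy cleaning rules are hypothetical.

```python
def normalize_names(records):
    """Strip whitespace and upper-case the 'name' field of every record."""
    return [dict(record, name=record["name"].strip().upper()) for record in records]


def drop_missing_age(records):
    """Keep only the records whose 'age' is not None."""
    return [record for record in records if record["age"] is not None]


def average_age(records):
    """Return the mean age, or None when there are no records."""
    if not records:
        return None
    return sum(record["age"] for record in records) / len(records)


def summarize(records):
    """Top-level step: each stage is a short function that can be tested alone."""
    cleaned = drop_missing_age(normalize_names(records))
    return {"names": [record["name"] for record in cleaned],
            "average_age": average_age(cleaned)}


people = [{"name": " Andy ", "age": 30}, {"name": "Michael", "age": None},
          {"name": "Justin", "age": 19}]
summary = summarize(people)
print(summary)
```

Each helper stays far below the line limit and can be reused or tested independently of the others.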

Another important practice is to always add a description to each function.

<img src="../images/desc_function.png" width="900">
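
One common way to describe a function in Python is a docstring that states what it does, its arguments and its return value. The function below is a hypothetical example written for this purpose, not one of the platform's utilities.

```python
from datetime import datetime


def elapsed_seconds(start_time, end_time):
    """Return the number of seconds between two clock times of the same day.

    Args:
        start_time (str): Start time formatted as 'HH:MM:SS'.
        end_time (str): End time formatted as 'HH:MM:SS'.

    Returns:
        float: Seconds elapsed from start_time to end_time
            (negative if end_time is earlier than start_time).
    """
    time_format = "%H:%M:%S"
    delta = datetime.strptime(end_time, time_format) - datetime.strptime(start_time, time_format)
    return delta.total_seconds()


print(elapsed_seconds("09:15:00", "09:17:30"))  # 150.0
# The description is available at run time, for example through help() or __doc__.
print(elapsed_seconds.__doc__.splitlines()[0])
```

Because the docstring travels with the function, `help(elapsed_seconds)` shows the description in any notebook that imports it.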

<div class="alert alert-info" role="alert">Persist intermediate results</div>

<div class="alert alert-info" role="alert">Be careful with Order</div>

## Best Practices Examples

### Multiple operations

**SAS** code:

```sas

/* WE COMPUTE THE WEIGHTED STANDARD DEVIATION PER POINT */

PROC SQL;
     CREATE TABLE SDTPXESTADO_&SEGMENTO.  AS 
     SELECT DISTINCT ESTADO, %IF &SEGMENTO. EQ EMP_NGAR %THEN %DO;  SEGM, %END;
	 		/* STANDARD DEVIATION */
			%DO I=1 %TO 36;
				SQRT( SUM(PESO_&I.*(LGDCIP_&I.-LGDM_&I.)**2)
						/
						( (NOMISSING_&I.-1)*SUM(PESO_&I.)/NOMISSING_&I. )
                    ) AS SDTP_&I.,
			%END;
			/* STANDARD ERROR */
			%DO I=1 %TO 36;
				CALCULATED SDTP_&I. / SQRT( SUM(PESO_&I.) ) AS EESDT_&I. ,
			%END;
			COUNT(*) AS FREQ
     FROM TEMP_VOL
	 GROUP BY %IF &SEGMENTO. EQ EMP_NGAR %THEN %DO;  SEGM, %END; ESTADO;
QUIT;

```
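
The two quantities computed in the SAS step can be checked by hand on a tiny sample. The plain-Python sketch below evaluates the same expressions, `sqrt(sum(w*(x-m)^2) / ((n-1)*sum(w)/n))` for the weighted standard deviation and that value divided by `sqrt(sum(w))` for the standard error. It assumes that `n` is the number of non-missing observations (the role played by `NOMISSING`) and that `means` holds the reference mean for each row (the role played by `LGDM`); the function names and sample values are hypothetical.

```python
import math


def weighted_std(weights, values, means):
    """Weighted standard deviation: sqrt(sum(w*(x-m)^2) / ((n-1) * sum(w) / n))."""
    n = len(weights)
    if n < 2:
        raise ValueError("At least two observations are needed.")
    weighted_squares = sum(w * (x - m) ** 2 for w, x, m in zip(weights, values, means))
    return math.sqrt(weighted_squares / ((n - 1) * sum(weights) / n))


def standard_error(weights, values, means):
    """Standard error: weighted standard deviation divided by sqrt(sum(w))."""
    return weighted_std(weights, values, means) / math.sqrt(sum(weights))


# With unit weights the formula reduces to the usual sample standard deviation.
weights = [1.0, 1.0, 1.0, 1.0]
values = [2.0, 4.0, 4.0, 6.0]
means = [4.0, 4.0, 4.0, 4.0]

sdtp = weighted_std(weights, values, means)
eesdt = standard_error(weights, values, means)
print(sdtp, eesdt)
```

Checking the formula on a case with a known answer (unit weights) is a cheap way to validate a migration before running it on the full table.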

**Spark** code:

In [76]:
desv_est = [(F.sqrt(F.sum(F.col("PESO_"+str(x))*(F.col("LGDCIP_"+str(x)) - F.col("LGDM_"+str(x)))**2) /
             ((F.count(F.col("NOMISSING_"+str(x)))-1)*F.sum(F.col("PESO_"+str(x)))/F.count(F.col("NOMISSING_"+str(x)))))
             ).alias("SDTP_"+str(x)) for x in range(1,37)]

error_est = [((F.sqrt(F.sum(F.col("PESO_"+str(x))*(F.col("LGDCIP_"+str(x)) - F.col("LGDM_"+str(x)))**2) /
              ((F.count(F.col("NOMISSING_"+str(x)))-1)*F.sum(F.col("PESO_"+str(x)))/F.count(F.col("NOMISSING_"+str(x)))))) /
              F.sqrt(F.sum(F.col("PESO_"+str(x))))
             ).alias("EESDT_"+str(x)) for x in range(1,37)]

table_results = df \
    .groupBy("estado") \
    .agg(*desv_est,
         *error_est,
         (F.count(F.col("estado"))).alias("FREQ")
        ) \
    .orderBy(F.asc("estado"))

In [77]:
error_est

[Column<b'(SQRT((sum((PESO_1 * POWER((LGDCIP_1 - LGDM_1), 2))) / (((count(NOMISSING_1) - 1) * sum(PESO_1)) / count(NOMISSING_1)))) / SQRT(sum(PESO_1))) AS `EESDT_1`'>,
 Column<b'(SQRT((sum((PESO_2 * POWER((LGDCIP_2 - LGDM_2), 2))) / (((count(NOMISSING_2) - 1) * sum(PESO_2)) / count(NOMISSING_2)))) / SQRT(sum(PESO_2))) AS `EESDT_2`'>,
 Column<b'(SQRT((sum((PESO_3 * POWER((LGDCIP_3 - LGDM_3), 2))) / (((count(NOMISSING_3) - 1) * sum(PESO_3)) / count(NOMISSING_3)))) / SQRT(sum(PESO_3))) AS `EESDT_3`'>,
 Column<b'(SQRT((sum((PESO_4 * POWER((LGDCIP_4 - LGDM_4), 2))) / (((count(NOMISSING_4) - 1) * sum(PESO_4)) / count(NOMISSING_4)))) / SQRT(sum(PESO_4))) AS `EESDT_4`'>,
 Column<b'(SQRT((sum((PESO_5 * POWER((LGDCIP_5 - LGDM_5), 2))) / (((count(NOMISSING_5) - 1) * sum(PESO_5)) / count(NOMISSING_5)))) / SQRT(sum(PESO_5))) AS `EESDT_5`'>,
 Column<b'(SQRT((sum((PESO_6 * POWER((LGDCIP_6 - LGDM_6), 2))) / (((count(NOMISSING_6) - 1) * sum(PESO_6)) / count(NOMISSING_6)))) / SQRT(sum(PESO_6))) AS `EESD

In [78]:
table_results.select("estado","SDTP_1","SDTP_2","SDTP_3","EESDT_1","EESDT_2","EESDT_3","FREQ").toPandas()

Unnamed: 0,estado,SDTP_1,SDTP_2,SDTP_3,EESDT_1,EESDT_2,EESDT_3,FREQ
0,ABCAVE,0.234424,0.234106,0.234259,0.012865,0.012867,0.012895,333
1,OBSERVADO,0.410424,0.392338,0.369797,0.027121,0.026695,0.026019,231
2,QUITA,0.249383,0.249383,0.249383,0.038028,0.038028,0.038028,46


In [79]:
table_results.columns

['estado',
 'SDTP_1',
 'SDTP_2',
 'SDTP_3',
 'SDTP_4',
 'SDTP_5',
 'SDTP_6',
 'SDTP_7',
 'SDTP_8',
 'SDTP_9',
 'SDTP_10',
 'SDTP_11',
 'SDTP_12',
 'SDTP_13',
 'SDTP_14',
 'SDTP_15',
 'SDTP_16',
 'SDTP_17',
 'SDTP_18',
 'SDTP_19',
 'SDTP_20',
 'SDTP_21',
 'SDTP_22',
 'SDTP_23',
 'SDTP_24',
 'SDTP_25',
 'SDTP_26',
 'SDTP_27',
 'SDTP_28',
 'SDTP_29',
 'SDTP_30',
 'SDTP_31',
 'SDTP_32',
 'SDTP_33',
 'SDTP_34',
 'SDTP_35',
 'SDTP_36',
 'EESDT_1',
 'EESDT_2',
 'EESDT_3',
 'EESDT_4',
 'EESDT_5',
 'EESDT_6',
 'EESDT_7',
 'EESDT_8',
 'EESDT_9',
 'EESDT_10',
 'EESDT_11',
 'EESDT_12',
 'EESDT_13',
 'EESDT_14',
 'EESDT_15',
 'EESDT_16',
 'EESDT_17',
 'EESDT_18',
 'EESDT_19',
 'EESDT_20',
 'EESDT_21',
 'EESDT_22',
 'EESDT_23',
 'EESDT_24',
 'EESDT_25',
 'EESDT_26',
 'EESDT_27',
 'EESDT_28',
 'EESDT_29',
 'EESDT_30',
 'EESDT_31',
 'EESDT_32',
 'EESDT_33',
 'EESDT_34',
 'EESDT_35',
 'EESDT_36',
 'FREQ']

### Build the contract ID (concat) and the segment (classify)

**SAS** code:

```sas
/**** INPUT TABLES MACRO ****/
%MACRO IFRS9(CARTE,MES,APROV,LIBN,SALIDA,AA,MM);

	DATA &CARTE._&SALIDA._&MES.;
		SET &LIBN..F&APROV._MXMAEST_CONTR_V16_F4&AA.
    /*	F_CURR.IFRS9_MINORISTA_BE */
        (KEEP =  CMCO_COD_COFICI CMCO_COD_CBANCO CMCO_COD_COFICI CMCO_NUM_CUOTIMP
				 CMCO_COD_CCONTR CMCO_COD_FAMILIA 
				 CMCO_POR_PUNHER_SCO CMCO_PLZ_INCUMP_CLI CMCO_IND_STAGE_FINAL
				 CMCO_COD_HERRAM_SCO CMCO_IMP_PROV_FINAL CMCO_IMP_RACREG_CON 
				 CMCO_IMP_DBLEDU_CON CMCO_IMP_SDFUBA_CON CMCO_IND_PREAPROB
				 CMCO_IMP_EAD_ACTUAL  CMCO_IMP_LIMITE_CON CMCO_IMP_EAD_Y01 
				 CMCO_POR_PD_MARGINAL CMCO_POR_LGD CMCO_POR_CCF
				 CMCO_PLZ_INCUMP_CON CMCO_IND_REESTR_CON CMCO_POR_PD_LT_ACTUAL_ORIG CMCO_FEC_APERTURA
				 CMCO_PLZ_TRANSCUR CMCO_IND_TIPO_MORA CMCO_COD_MOTIVO_TL
				 CMCO_COD_CLAVE_PD CMCO_COD_CLAVE_LGD CMCO_COD_CLAVE_CCF
				 CMCO_IND_REESTR_CON CMCO_POR_RIESGO_Y01
				 CMCO_POR_DECAIM_EAD_Y01 CMCO_POR_TIPINT_INI CMCO_POR_EIR CMCO_IND_TIPINT &MM.
																															);


		IF CMCO_COD_FAMILIA IN (&FAMILIAS.);

		format CCONTUNI $18. CARTERA $20. ;
	/* Build the contract ID */
		IF CMCO_COD_COFICI <=9 then CCONTUNI = "00"||trim(left(CMCO_COD_CBANCO))||"000"||trim(left(CMCO_COD_COFICI))||trim(left(CMCO_COD_CCONTR));
		ELSE IF CMCO_COD_COFICI <=99 then CCONTUNI = "00"||trim(left(CMCO_COD_CBANCO))||"00"||trim(left(CMCO_COD_COFICI))||trim(left(CMCO_COD_CCONTR));
		ELSE IF CMCO_COD_COFICI <=999 then CCONTUNI = "00"||trim(left(CMCO_COD_CBANCO))||"0"||trim(left(CMCO_COD_COFICI))||trim(left(CMCO_COD_CCONTR));
		ELSE IF CMCO_COD_COFICI <=9999 then CCONTUNI = "00"||trim(left(CMCO_COD_CBANCO))||trim(left(CMCO_COD_COFICI))||trim(left(CMCO_COD_CCONTR));

	/* Build the segment */
		IF cmco_cod_familia in ('0010003' '0010004' '0010005') then CARTERA = 'TdC Banco';
		IF cmco_cod_familia in ('0010006' '0010007') then CARTERA =  "TdC Finanzia";
		IF cmco_cod_familia in ('0010008') then CARTERA =  "TdC Infinity";

		IF CMCO_COD_FAMILIA IN ('0020101' '0020102' '0020501' '0020701') THEN CARTERA = 'Personal';
		IF CMCO_COD_FAMILIA IN ('0020201' '0020202' ) THEN CARTERA = 'Nomina';
		IF CMCO_COD_FAMILIA IN ('0020901' ) THEN CARTERA = 'Nomina Pemex';
		IF CMCO_COD_FAMILIA IN ('0020301' '0020302' '0020401' '0020801') THEN CARTERA = 'Autos';
		IF CMCO_COD_FAMILIA IN ('0020601' '0020602' '0020603' '0020604' '0020605' '0020606') THEN CARTERA = 'Consumo Otros';

		CUOTAS_IMP = CMCO_NUM_CUOTIMP * 1;

	RUN; 

%MEND;
%IFRS9(&CART.,&MES2.,&APROV2.,F_CURR  ,FIN ,_F, MARCA);
```

**Spark** code:

In [80]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import pyspark.sql.functions as F
from pyspark.sql.functions import col as c
import json
import itertools
from pyspark.sql.types import StringType,FloatType, IntegerType
from pyspark.sql.window import Window
from functools import reduce

from Utils import Utils

In [81]:
#read data
all_ifrs9_df = sqlContext.read.format("csv").option("header", "true").load('data/Jul_2018.csv')
#read params
names = "data/schemaNames.json"
names_json = Utils.load_json(names)
params = "data/params.json"
params_json = Utils.load_json(params)
#choose selected columns
old_columns = list(names_json.keys())
new_columns = [names_json[i] for i in old_columns]
all_ifrs9_df = reduce(lambda all_ifrs9_df, idx: all_ifrs9_df.withColumnRenamed(old_columns[idx], new_columns[idx]), range(len(old_columns)), all_ifrs9_df)
#choose working fields
fields = params_json["working_fields"]
ifrs9_df = all_ifrs9_df.select(*fields)

In [84]:
# When family_id belongs to any of the members of "portfolios"
column_family = F.col("family_id")
meta_con = [ F.when(
        column_family.isin(y["portfolio"]),y["name"])
            for x,y in params_json["portfolios"].items()]

ifrs9_cartera_df = ifrs9_df.select(*fields,
    Utils.recursive_when(meta_con,len(meta_con)-1).alias("Cartera")
                                   )
#CCountuni
ccountuni= F.when(c('branch_id') <= 9, 
     F.concat(F.lit("00"),c("entity_id"),F.lit("000"),c('branch_id'),c('contract_number_id'))).otherwise(
     F.when(c('branch_id') <= 99,
     F.concat(F.lit("00"),c("entity_id"),F.lit("00"),c('branch_id'),c('contract_number_id'))).otherwise(
     F.when(c('branch_id') <= 999,
     F.concat(F.lit("00"),c("entity_id"),F.lit("0"),c('branch_id'),c('contract_number_id'))).otherwise(
     F.when(c('branch_id') <= 9999,
     F.concat(F.lit("00"),c("entity_id"),c('branch_id'),c('contract_number_id'))).otherwise(None
    )))).alias("ccountuni")

ifrs9_cartera_ccountuni_df = ifrs9_cartera_df.select('*',
                       ccountuni
                       )

In [85]:
ifrs9_cartera_df.select("family_id", "Cartera").limit(10).toPandas()

| | family_id | Cartera |
|---|---|---|
| 0 | 27 | |
| 1 | 27 | |
| 2 | 27 | |
| 3 | 27 | |
| 4 | 27 | |
| 5 | 27 | |
| 6 | 27 | |
| 7 | 27 | |
| 8 | 27 | |
| 9 | 27 | |


In [86]:
meta_con

[Column<b'CASE WHEN (family_id IN (0010003, 0010004, 0010005)) THEN TdC banco END'>,
 Column<b'CASE WHEN (family_id IN (0010006, 0010007)) THEN TdC finanzia END'>,
 Column<b'CASE WHEN (family_id IN (0010008)) THEN TdC infinity END'>,
 Column<b'CASE WHEN (family_id IN (0020101, 0020102, 0020501, 0020701)) THEN Personal END'>,
 Column<b'CASE WHEN (family_id IN (0020201, 0020202)) THEN Nomina END'>,
 Column<b'CASE WHEN (family_id IN (0020901)) THEN Nomina Pemex END'>,
 Column<b'CASE WHEN (family_id IN (0020301, 0020302, 0020401, 0020801)) THEN Autos END'>,
 Column<b'CASE WHEN (family_id IN (0020601, 0020602, 0020603, 0020604, 0020605, 0020606)) THEN Consumo Otros END'>]
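
The segment assignment above is a first-match lookup from family code to portfolio name. The plain-Python sketch below shows the same idea without Spark. The dictionary layout (`"name"` and `"portfolio"` keys) follows the way `params_json["portfolios"]` is accessed in the cell above, but the outer keys and the `classify` helper are hypothetical. It is not the implementation of `Utils.recursive_when`.

```python
# Hypothetical stand-in for params_json["portfolios"]; codes copied from the output above
portfolios = {
    "p1": {"name": "TdC banco", "portfolio": ["0010003", "0010004", "0010005"]},
    "p2": {"name": "TdC finanzia", "portfolio": ["0010006", "0010007"]},
    "p3": {"name": "TdC infinity", "portfolio": ["0010008"]},
    "p4": {"name": "Nomina Pemex", "portfolio": ["0020901"]},
}

def classify(family_id, portfolios):
    """Return the name of the first portfolio that lists family_id, else None."""
    for entry in portfolios.values():
        if family_id in entry["portfolio"]:
            return entry["name"]
    return None

print(classify("0010006", portfolios))
print(classify("27", portfolios))
```

Because the codes are zero-padded strings, a value such as `27` matches no portfolio and yields no segment, which is consistent with the empty `Cartera` column in the preview above.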

In [87]:
ifrs9_cartera_ccountuni_df.select("ccountuni").limit(10).toPandas()

| | ccountuni |
|---|---|
| 0 | 50000100374 |
| 1 | 50000101228 |
| 2 | 50000101433 |
| 3 | 50000101436 |
| 4 | 50000101441 |
| 5 | 50000101475 |
| 6 | 50000101485 |
| 7 | 50000101499 |
| 8 | 50000101517 |
| 9 | 50000101521 |
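
The four nested branches that build the contract ID all do the same thing: they left-pad the branch code with zeros to four digits. The plain-Python sketch below makes that equivalence explicit. It assumes the branch code is a non-negative integer, and `build_ccontuni` is a hypothetical helper name. In Spark, `F.lpad` could probably play the same role, although that has not been checked against the notebook's data.

```python
def build_ccontuni(entity_id, branch_id, contract_number_id):
    """Concatenate "00", entity, 4-digit zero-padded branch and contract number."""
    branch = int(branch_id)
    if branch > 9999:
        # The SAS and Spark versions leave the ID empty beyond four digits
        return None
    return "00" + str(entity_id).strip() + str(branch).zfill(4) + str(contract_number_id).strip()

# Hypothetical values, chosen only to show the padding
print(build_ccontuni("12", 7, "345"))
print(build_ccontuni("12", 1234, "345"))
```

A single padding step avoids repeating the concatenation four times and keeps the rule in one place if the branch width ever changes.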


### Profiles

**SAS** code:


```sas
proc transpose data=aux_tmens out=tmens_tr prefix=cohor_;
by cod_cclien;
var p_ventas;
id orden;
run;


%let na5 = 2005;  %let na6 = 2006;  %let na7 = 2007;
%let na8 = 2008;  %let na9 = 2009;  %let na10 = 2010;
%let na11 = 2011; %let na12 = 2012; %let na13 = 2013;
%let na14 = 2014; %let na15 = 2015; %let NA16 = 2016;

%let nmes1 = 01;  %let nmes2=02;    %let nmes3 = 03;
%let nmes4 = 04;  %let nmes5 = 05;  %let nmes6 = 06;
%let nmes7 = 07;  %let nmes8 = 08;  %let nmes9 = 09;
%let nmes10 = 10; %let nmes11 = 11; %let nmes12 = 12;

data tmens_tr;
format  cod_cclien
		cohor_200501 - cohor_200512
		cohor_200601 - cohor_200612
		cohor_200701 - cohor_200712
		cohor_200801 - cohor_200812
		cohor_200901 - cohor_200912
		cohor_201001 - cohor_201012
		cohor_201101 - cohor_201112
		cohor_201201 - cohor_201212
		cohor_201301 - cohor_201312
		cohor_201401 - cohor_201412
		cohor_201501 - cohor_201512
		cohor_201601 - cohor_201612
;


set tmens_tr;


RUN;


%MACRO PERFIL;

DATA AUX;
SET TMENS_TR;

FORMAT 	%DO i= 5 %to 16;
			%DO J=1 %TO 12;
				COHOR_&&NA&I.&&NMES&J $15.
			%END;
		%END;
;

	%DO i= 5 %to 15;
		%LET K=%EVAL(&i.+1);
		
		%DO J=1 %TO 12;
			%IF &i. Le 15 %THEN %DO;
			IF COHOR_&&NA&K&&NMES&j EQ ""  THEN DO; 
												COHOR_&&NA&K&&NMES&j = COHOR_&&NA&I.&&NMES12;
												END;
								%END;
		%END;
	%END;

	%LET i=5;

	%DO J=1 %TO 12;
			%IF &i. Le 15 %THEN %DO;
			IF COHOR_&&NA&i&&NMES&j EQ ""  THEN DO; 
												COHOR_&&NA&i&&NMES&j = COHOR_&&NA&I.&&NMES12;
												END;
								%END;
	%END;

RUN;


data AUX_perfiles;

FORMAT cod_cclien	%DO i= 5 %to 16;
			%DO J=1 %TO 12;
				COHOR_&&NA&I.&&NMES&J $15.
			%END;
		%END;
;
SET AUX;
run;

%MEND;

%PERFIL;

proc transpose data=aux_perfiles out= perf_tmens prefix=perfil_;
by cod_cclien;
var cohor_200501 - cohor_200512
	cohor_200601 - cohor_200612
	cohor_200701 - cohor_200712
	cohor_200801 - cohor_200812
	cohor_200901 - cohor_200912
	cohor_201001 - cohor_201012
	cohor_201101 - cohor_201112
	cohor_201201 - cohor_201212
	cohor_201301 - cohor_201312
	cohor_201401 - cohor_201412
	cohor_201501 - cohor_201512
	
;
run;

```

**Spark** code:

In [88]:
aux_tmens_df=spark.read.parquet("data/AUX_TMENS_DF.parquet")

In [89]:
from pyspark.sql.types import *

In [90]:
aux_tmens_df.filter("CLIENTE='02418800'").select(
    "CLIENTE",
    "ORDEN",
    "P_VENTAS"
).show()

+--------+------+-----------+
| CLIENTE| ORDEN|   P_VENTAS|
+--------+------+-----------+
|02418800|200712|CORPORATIVO|
|02418800|200512|CORPORATIVO|
|02418800|200812|CORPORATIVO|
|02418800|200612|CORPORATIVO|
+--------+------+-----------+



In [91]:
def fn_fill_years(l_orden, l_perfil):
    l_years=[200512, 200612, 200712, 200812, 200912, 201012, 201112, 201212, 201312, 201412, 201512]
    l_tmp=[]
    i=0
    # Fill in the 11 year-end cut-offs for each client
    for x in l_years:
        if any(x==j for j in l_orden):
            i+=1    
            l_tmp.append(str(x)+"|"+l_perfil[i-1])
        else:
            l_tmp.append(str(x)+"|")
    return l_tmp

In [92]:
def fn_fill_months_year(perfil_act,perfil_sig,anio,inicio,l_final):
    if inicio:
        for x in range(1,13):
#            if (x == 12) & (perfil_act != ""):
#                l_final.append(str((anio+2005)*100+x)+"|"+perfil_act)
            if perfil_act != "":
                l_final.append(str((anio+2005)*100+x)+"|"+perfil_act)
            else:
                l_final.append(str((anio+2005)*100+x)+"|")
    else:
        for x in range(1,13):
            if (x == 12) and (perfil_sig != ""):
                l_final.append(str((anio+2005)*100+x)+"|"+perfil_sig)
            else:
                l_final.append(str((anio+2005)*100+x)+"|"+perfil_act)           

In [93]:
def fn_fill_profile(l_order, l_profile):
    l_tmp_orden=fn_fill_years(l_order, l_profile)
    actual=None
    siguiente=None
    num_orden=len(l_tmp_orden)
    inicio=True
    l_final=[]
    for i in range(0,num_orden-1):
        actual=l_tmp_orden[i].split("|")
        perfil_act=actual[1]
        orden_act=actual[0]
    
        siguiente=l_tmp_orden[i+1].split("|")
        perfil_sig=siguiente[1]
        orden_sig=siguiente[0]
        if inicio:        
            fn_fill_months_year(perfil_act,perfil_sig,i,inicio,l_final)        
            inicio=False
        if perfil_sig == "":
            l_tmp_orden[i+1]=orden_sig+"|"+perfil_act
        fn_fill_months_year(perfil_act,perfil_sig,i+1,inicio,l_final)
    return l_final
udf_fn_fill_profile=F.udf(lambda l_order,l_profile: fn_fill_profile(l_order,l_profile), ArrayType(StringType()))

In [94]:
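# collect_list does not guarantee element order after a shuffle, so collect (ORDEN, P_VENTAS) pairs and sort them
profiles_df=aux_tmens_df.groupBy("CLIENTE").agg(
    F.sort_array(F.collect_list(F.struct("ORDEN","P_VENTAS"))).alias("PAIRS")
).select("CLIENTE",F.col("PAIRS.ORDEN").alias("ORDEN"),F.col("PAIRS.P_VENTAS").alias("P_VENTAS"))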

In [None]:
profiles_df.limit(10).show()

In [97]:
def fn_split_by(column,char_split,index):
    return column.split(char_split)[index]
udf_fn_split_by=F.udf(fn_split_by,StringType())

In [98]:
profiles_df=profiles_df.select(
    "CLIENTE",F.explode(udf_fn_fill_profile(F.col("ORDEN"),F.col("P_VENTAS"))).alias("X")
).select(
    F.col("CLIENTE"),
    udf_fn_split_by(F.col("X"),F.lit("|"),F.lit(0)).alias("ORDEN"),
    udf_fn_split_by(F.col("X"),F.lit("|"),F.lit(1)).alias("P_VENTAS")
).select(
    F.col("CLIENTE"),
    F.col("ORDEN"),
    F.when(F.col("P_VENTAS")=='',None).otherwise(F.col("P_VENTAS")).alias("P_VENTAS")
)

In [None]:
profiles_df.filter("CLIENTE='02418800'").limit(50).toPandas()
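
The fill rule implemented by the functions above can be summarized as follows. Every month carries the last known December profile, the months of the first year take that year's December value, and each December is updated when an observation exists. The plain-Python sketch below shows that rule on its own. The function name, the tuple output and the sample profile `"PYME"` are hypothetical, and it is a simplified illustration rather than a replacement for the UDF.

```python
def fill_monthly_profiles(observed, first_year, last_year):
    """observed maps a December period (yyyymm as int) to a profile; returns (period, profile) rows."""
    rows = []
    # Months of the first year take that year's December value, if there is one
    current = observed.get(first_year * 100 + 12)
    for year in range(first_year, last_year + 1):
        december = observed.get(year * 100 + 12)
        for month in range(1, 13):
            if month == 12 and december is not None:
                current = december  # a new observation replaces the carried value
            rows.append((year * 100 + month, current))
    return rows

# Hypothetical client with two observed cut-offs
observed = {200612: "PYME", 200812: "CORPORATIVO"}
monthly = dict(fill_monthly_profiles(observed, 2005, 2009))
print(monthly[200611], monthly[200612], monthly[200811], monthly[200812])
```

Keeping the rule in a small pure function like this makes it easy to unit test before wrapping it in a UDF.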

### Weighted mean curve (Curva Media Ponderada)

**SAS** code:

```sas
/* Generate the weighted mean curves */
%MACRO CURVA_MEDPOND (BIN, BOUT, ETIQUETA, CONDICION);
PROC SUMMARY DATA=&BIN. ORDER=INTERNAL NWAY MISSING;
WHERE &CONDICION.;
OUTPUT  OUT=SUMA_&BOUT.
%DO I=1 %TO 84;
SUM(PESO_&I.)=
SUM(&ETIQUETA._&I.)=
%END;
;
RUN;

/* Generate the mean curve weighted by the outlier weight */
DATA  &BOUT. (DROP=PESO_:);
       SET SUMA_&BOUT.;
	   ARRAY P(84) PESO_1 - PESO_84;
	   ARRAY LGD(84) &ETIQUETA._1 - &ETIQUETA._84;
	   	DO I=1 TO 84;
 			IF P(I) > 0 THEN LGD(I)=LGD(I)/P(I);
			ELSE LGD(I)=.;
		END;
			DO I=2 TO 84;
				
			
				IF LGD(I) < LGD(I-1) AND I <= ULTMES_INF THEN DO;
					LGD(I)=MAX( LGD(I-1), LGD(I) );
					CORR_TENDENCIA='S';
				END;
				IF LGD(I) > 1.5 THEN DO;
					LGD(I)=1.5;
					ACOT_TENDENCIA='S';
				END;
				IF . < LGD(I) < -2 THEN DO;
					LGD(I)=-2;
					ACOT_TENDENCIA='I';
				END;
		END;		
RUN;
%MEND;

```

**Spark** code:

In [100]:
    def fn_get_columns(tag, start, end):
        return [tag+str(x) for x in range(start,end+1)]

In [101]:
    def fn_extract_columns(root_column, column_name, start, end):
        l_columns=[]
        for i in range(start,end+1):
            l_columns.append(F.col(root_column)[column_name + str(i)].alias(column_name + str(i)))
        return l_columns

    def fn_curva_medpond_part_1(l_lgd, l_peso):#, last_month_inf):
        l_lgd_tmp=[]
        corr_tendencia=None
        acot_tendencia=None
        for i in range(0,84):
            # Copy the current LGD value into the working list
            l_lgd_tmp.append(l_lgd[i])
            
            if Utils.fn_is_null(l_peso[i],0) > 0:
                l_lgd_tmp[i]=Utils.fn_safe_div(l_lgd_tmp[i],l_peso[i])
            else:
                l_lgd_tmp[i]=None
            if i > 0:
                if (Utils.fn_is_null(l_lgd_tmp[i],0) < Utils.fn_is_null(l_lgd_tmp[i-1],0)) :#& (i < last_month_inf) :
                    l_lgd_tmp[i]=Utils.fn_max(l_lgd_tmp[i-1],l_lgd_tmp[i])
                    corr_tendencia="S"
                if Utils.fn_is_null(l_lgd_tmp[i],0) > 1.5:
                    l_lgd_tmp[i]=1.5
                    acot_tendencia="S"
                if (l_lgd_tmp[i] is not None) and (Utils.fn_is_null(l_lgd_tmp[i],0) < -2):
                    l_lgd_tmp[i]=-2.0
                    acot_tendencia="I"
        l_lgd_tmp.append(corr_tendencia)
        l_lgd_tmp.append(acot_tendencia)
        return l_lgd_tmp
    
    def fn_curva_medpond(df_source, tag, condition):
        #l_columns=[x for x in df_source.columns if re.search("(PESO|"+tag+")\_[0-9]{1,2}",x) is None]
        l_cols_peso=[F.sum("PESO"+"_"+str(x)).alias("PESO"+"_"+str(x)) for x in range(1,85)]
        l_cols_lgd=[F.sum(tag+"_"+str(x)).alias(tag+"_"+str(x)) for x in range(1,85)]
        
        df_source_sum=df_source.filter(condition).agg(
            *l_cols_peso,
            *l_cols_lgd
            )
        
        l_lgd=fn_get_columns(tag + "_", 1, 84)
        l_peso=fn_get_columns("PESO_", 1, 84)
        
        l_fields=[StructField(x,DoubleType(),True) for x in l_lgd]
        # The trend flags hold "S"/"I" strings, so they are declared as StringType
        l_fields.append(StructField("CORR_TENDENCIA",StringType(),True))
        l_fields.append(StructField("ACOT_TENDENCIA",StringType(),True))
        schema_lgd=StructType(l_fields)
        
        udf_fn_curva_medpond_part_1=F.udf(fn_curva_medpond_part_1,schema_lgd)
        
        return df_source_sum.select(
            udf_fn_curva_medpond_part_1(F.struct(l_lgd),F.struct(l_peso)#, F.col("ULTMES_INF")
                                ).alias("LGDS")
        ).select(
            F.col("LGDS")["CORR_TENDENCIA"].alias("CORR_TENDENCIA"),
            F.col("LGDS")["ACOT_TENDENCIA"].alias("ACOT_TENDENCIA"),
            *fn_extract_columns("LGDS", tag + "_", 1, 84)
        )

In [102]:
curvas_x_ciclo_df=sqlContext.read.parquet("data/CURVAS_X_CICLO_DF.parquet")

In [103]:
c_when= (F.col("PERFIL_VENTAS") == F.lit("EMPRESARIAL")) & F.col("GAR_ELEGIBLE").isNull()
curva_emp_nogar_antinf_df=fn_curva_medpond(curvas_x_ciclo_df,"LGDCP",c_when).select("*",F.lit("EMP_NOGAR").alias("SEGMENTO"))
#SAS
#1

In [104]:
curva_emp_nogar_antinf_df.toPandas()

Unnamed: 0,CORR_TENDENCIA,ACOT_TENDENCIA,LGDCP_1,LGDCP_2,LGDCP_3,LGDCP_4,LGDCP_5,LGDCP_6,LGDCP_7,LGDCP_8,...,LGDCP_76,LGDCP_77,LGDCP_78,LGDCP_79,LGDCP_80,LGDCP_81,LGDCP_82,LGDCP_83,LGDCP_84,SEGMENTO
0,,,0.593868,0.611014,0.65748,0.702431,0.727555,0.73675,0.7484,0.753374,...,0.985943,0.985943,0.985943,0.985943,0.985943,0.985943,0.985943,0.985943,0.985981,EMP_NOGAR


### Scope

In [105]:
from datetime import datetime as dt

In [106]:
birthday = dt.strptime("09-10-1987","%d-%m-%Y")

In [107]:
#globals()

In [108]:
globals()["birthday"]

datetime.datetime(1987, 10, 9, 0, 0)

In [109]:
print("My birthday is: "+dt.strftime(globals()["birthday"],"%d-%m-%Y"))

My birthday is: 09-10-1987


In [110]:
#Local scope
def add(x, y):
    result=x+y
    del(x)
    del(y)
    d_local=locals()
    return d_local

#call to function
d_result=add(5,10)
d_result

{'result': 15}

In [111]:
#Global scope
def subtract(x,y):
    global result
    result=x-y

#call to function
subtract(15,5)
print(result)

10
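
The `global` statement works, but it makes a function depend on, and modify, state outside its own scope. A common alternative is to return the value and let the caller decide where to keep it. The sketch below shows that pattern. The `summarize` helper is hypothetical and only illustrates how local names stay local.

```python
def subtract(x, y):
    """Return the difference instead of writing it to a module-level name."""
    return x - y

def summarize(x, y):
    # total and difference exist only while summarize runs
    total = x + y
    difference = subtract(x, y)
    return {"total": total, "difference": difference}

summary = summarize(15, 5)
print(summary)
```

Only `summary` ends up in the notebook's global namespace, so re-running cells in a different order cannot silently overwrite an intermediate result.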


### Create a DataFrame from a list

In [112]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [113]:
l_peoples=[
    ["cesar", "castañeda reyes", 31.0],
    ["juana", "reyes vazquez", 61.0],
    ["cesar", "castañeda vazquez", 0.5]
]

l_peoples_rdd=sc.parallelize(l_peoples)

schema_peoples=StructType([
    StructField("name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", DoubleType(), True)
])

In [114]:
peoples_df=spark.createDataFrame(l_peoples_rdd, schema_peoples)
#peoples_df=sqlContext.createDataFrame(l_peoples, schema_peoples)
peoples_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: double (nullable = true)



In [115]:
peoples_df.show()

+-----+-----------------+----+
| name|        last_name| age|
+-----+-----------------+----+
|cesar|  castañeda reyes|31.0|
|juana|    reyes vazquez|61.0|
|cesar|castañeda vazquez| 0.5|
+-----+-----------------+----+



<div style="background-color:white; text-align:center; padding:10px; color:black; margin-left:0px; border-radius: 10px; font-family:Trebuchet MS; font-size:45px">
<strong>Thank you!</strong>
</div>

<div style="background-color:white; text-align:center; padding:30px; color:black; margin-left:0px; border-radius: 10px; font-family:Trebuchet MS; font-size:90px">
<strong>END</strong>
</div>