<a href="https://colab.research.google.com/github/iGhostlp/Albus/blob/Hermione/Proyecto_BBVA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Armado del entorno

In [1]:
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

In [2]:
# Unzip the file
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [3]:
!readlink -f $(which java) | sed "s:bin/java::"

/usr/lib/jvm/java-11-openjdk-amd64/


In [4]:
# Set up the environment for Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64/"
os.environ["SPARK_HOME"] = '/content/spark-3.3.2-bin-hadoop3'

In [5]:
# Install library for finding Spark
!pip install -q findspark

# Import the libary
import findspark

# Initiate findspark
findspark.init()

In [6]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").config('spark.sql.parquet.datetimeRebaseModeInRead','CORRECTED').getOrCreate()

# Check Spark Session Information
spark

# Importado de funciones

In [49]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, concat_ws, col, row_number, desc, collect_list, to_json, struct, year, current_date, datediff, floor, when, lit
from pyspark.sql.window import Window

In [8]:
#Creación de un SQL context
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)



# Carga de parquets

In [9]:
df_customer = spark.read.parquet('/content/Datasets/customer_basics.snappy.parquet')
df_phones = spark.read.parquet('/content/Datasets/phones.snappy.parquet')
df_address = spark.read.parquet('/content/Datasets/address.snappy.parquet')
df_emails = spark.read.parquet('/content/Datasets/emails.snappy.parquet')
df_marital_status = spark.read.parquet('/content/Datasets/marital_status_type.parquet')
df_segment_type = spark.read.parquet('/content/Datasets/segment_type.parquet')
df_customer_documents = spark.read.parquet('/content/Datasets/customer_documents.parquet')
df_address_type = spark.read.parquet('/content/Datasets/address_type.parquet')
df_gender = spark.read.parquet('/content/Datasets/gender.parquet')
df_nationality = spark.read.parquet('/content/Datasets/nationality.parquet')
df_personal_type = spark.read.parquet('/content/Datasets/personal_type.parquet')
df_phone_type = spark.read.parquet('/content/Datasets/phone_type.parquet')
df_province = spark.read.parquet('/content/Datasets/province.parquet')
df_customer_segment = spark.read.parquet('/content/Datasets/customer_segment.parquet')
df_customer_info_temp = spark.read.parquet('/content/Datasets/customer_info_temp.parquet')

### Normalización de datos

In [10]:
df_customer = df_customer.withColumnRenamed('last_change_date', 'l_c_d_customer')

In [None]:
df_marital_status_ok = df_marital_status.withColumn("marital_status_type", 
                                             when(df_marital_status.martial_status_short_desc == "NO INFORMA", 0)
                                             .when(df_marital_status.martial_status_short_desc == "CASADO/A  ", 1)
                                             .when(df_marital_status.martial_status_short_desc == "VIUDO/A   ", 2)
                                             .when(df_marital_status.martial_status_short_desc == "SEPARADO/A", 3)
                                             .when(df_marital_status.martial_status_short_desc == "DIVORCIADO", 4)
                                             .when(df_marital_status.martial_status_short_desc == "SOLTERO/A ", 5)
                                             .when(df_marital_status.martial_status_short_desc == "CONVIVIENT", 6)
                                             .when(df_marital_status.martial_status_short_desc == "OTROS     ", 7)
                                             .when(df_marital_status.martial_status_short_desc == "CONV. INSC", 8)
                                             .otherwise("-"))

# Extraccion de datos



### - 24 - Extraccion de datos desde parquet, clientes y teléfonos.
####Crear un DataFrame que contenga el JOIN de la tabla t_abtq_customer_basics y tabla t_abtq_customer_phones.

In [12]:
df_customer_phones = df_phones.join(df_customer, 'customer_id')

In [None]:
df_customer_phones.show()

### - 25 - Extraccion de datos desde .parquet, clientes y direcciones
####Crear un DataFrame que contenga el JOIN de la tabla t_abtq_customer_basics y tabla t_abtq_customer_adress.

In [14]:
df_customer_address = df_address.join(df_customer, 'customer_id')

In [None]:
df_customer_address.show()

### - 26 - Extraccion de datos desde .parquet, clientes y correos electrónicos
#### Crear un DataFrame que contenga el JOIN de la tabla t_abtq_customer_basics y tabla t_abtq_customer_email.   

In [16]:
df_customer_emails = df_emails.join(df_customer, 'customer_id')

In [None]:
df_customer_emails.show()

# Filtrar datos, reducir volumen

####- 27 - Filtrar el DataFrame de contactos telefónicos de clientes y resguardar los 3 contactos más actuales por cliente.

In [19]:
df_customer_phones_sorted = df_customer_phones.orderBy([df_customer_phones.customer_id, desc('last_change_date')])

In [20]:
window = Window.partitionBy(df_customer_phones_sorted.customer_id).orderBy(desc(df_customer_phones_sorted.last_change_date))

In [21]:
df_phone_contact = df_customer_phones_sorted.withColumn('row_num', row_number().over(window))
df_phone_contact = df_phone_contact.filter(df_phone_contact.row_num <= 3)

In [22]:
df_phone_contact = df_phone_contact.withColumn("full_phone", concat_ws("-", 'prefix_phone_id', 'phone_area_id', 'phone_exchange_id', 'phone_line_id'))

In [23]:
df_pivot_pc = df_phone_contact.groupBy('customer_id').agg(collect_list('full_phone').alias('last_3_changes_list'))

In [24]:
df_pivot_phone = df_pivot_pc.selectExpr('customer_id', 'last_3_changes_list[0] as phone_1', 'last_3_changes_list[1] as phone_2', 'last_3_changes_list[2] as phone_3')

In [None]:
df_pivot_phone = df_pivot_phone.na.fill('---')
df_pivot_phone.show()

####- 28 - Filtrar el DataFrame de direcciones de clientes y resguardar los 3 contactos más actuales por cliente.

In [26]:
df_customer_address_sorted = df_customer_address.orderBy([df_customer_address.customer_id, desc('last_change_date')])

In [27]:
window = Window.partitionBy(df_customer_address_sorted.customer_id).orderBy(desc(df_customer_address_sorted.last_change_date))

In [28]:
df_address_contact = df_customer_address_sorted.withColumn('row_num', row_number().over(window))
df_address_contact = df_address_contact.filter(df_address_contact.row_num <= 3)

In [29]:
df_address_contact = df_address_contact.withColumn("full_address", concat('street_name', 'address_outdoor_id', 'address_indoor_id', 'indoor_number', 'address_department_name', 'province_id', 'zipcode_id'))

In [30]:
df_pivot_ad = df_address_contact.groupBy('customer_id').agg(collect_list('full_address').alias('last_3_changes_list'))

In [31]:
df_pivot_address = df_pivot_ad.selectExpr('customer_id', 'last_3_changes_list[0] as address_1', 'last_3_changes_list[1] as address_2', 'last_3_changes_list[2] as address_3')

In [None]:
df_pivot_address = df_pivot_address.na.fill('---')
df_pivot_address.toPandas()

####- 29 - Filtrar el DataFrame de correos electrónicos de clientes  y resguardar los 3 contactos más actuales por cliente. 

In [35]:
df_customer_emails_sorted = df_customer_emails.orderBy([df_customer_emails.customer_id, desc('last_change_date')])

In [36]:
window = Window.partitionBy(df_customer_emails_sorted.customer_id).orderBy(desc(df_customer_emails_sorted.last_change_date))

In [38]:
df_email_contact = df_customer_emails_sorted.withColumn('row_num', row_number().over(window))
df_email_contact = df_email_contact.filter(df_email_contact.row_num <= 3)

In [41]:
df_pivot_ec = df_email_contact.groupBy('customer_id').agg(collect_list('email_desc').alias('last_3_changes_list'))

In [42]:
df_pivot_email = df_pivot_ec.selectExpr('customer_id', 'last_3_changes_list[0] as email_1', 'last_3_changes_list[1] as email_2', 'last_3_changes_list[2] as email_3')

In [None]:
df_pivot_email = df_pivot_email.na.fill('---')
df_pivot_email.toPandas()

# Enrequecimiento de datos

####- 30 - Agregar una nueva columna a los DataFrame de contactos, indicando el contact_type según corresponda (address, email, phone)

In [None]:
df_phones_contact_col = df_phones.withColumn('contact_type_phones', lit('phone'))
df_phones_contact_col.select('customer_id','contact_type_phones').show()

In [None]:
df_emails_contact_col = df_emails.withColumn('contact_type_emails', lit('e-mail'))
df_emails_contact_col.select('customer_id','contact_type_emails').show()

In [None]:
df_address_contact_col = df_address.withColumn('contact_type_address', lit('address'))
df_address_contact_col.select('customer_id','contact_type_address').show()

####- 31 - Agregar una nueva columna al DataFrame de contactos telefónicos de clientes, resguardando el contacto en formato json contenido en string, con los datos: Phone_type (mobile, landline ), Código país., Código de Área, Número teléfono.

In [45]:
df_phone_contact = df_phone_contact.withColumn("phone_contact", to_json(struct(df_phone_contact.phone_type, df_phone_contact.phone_country_id, df_phone_contact.prefix_phone_id, df_phone_contact.phone_area_id, df_phone_contact.cellphone_prefix_id, df_phone_contact.phone_exchange_id, df_phone_contact.phone_line_id)))

In [None]:
df_phone_contact.show(truncate=False)

####- 32 - Agregar una nueva columna al DataFrame de direcciones de clientes, resguardando el contacto en formato json contenido en string, con los datos: Calle, Número, Piso, Depto, Localidad, Provincia, Código postal

####- 33 - Combinar los DataFrame de contactos telefónicos de clientes, direcciones de clientes y email de clientes en uno solo.

In [84]:
df_contacts = df_pivot_phone.join(df_pivot_email, "customer_id", how='full').join(df_pivot_address, "customer_id", how='full')

In [None]:
df_contacts.toPandas()

In [None]:
df_short_contacts = df_contacts.select('customer_id','Phone_1','Email_1','Address_1')
df_short_contacts.toPandas()

----

In [74]:
df_contact_types = df_phones_contact_col.join(df_address_contact_col, 'customer_id', how='full').join(df_emails_contact_col, 'customer_id', how='full')

In [None]:
df_contact_types.select('customer_id','contact_type_phones','contact_type_address','contact_type_emails').toPandas()

# Creacion de vistas temporales

####- 35 - Generar una vista temporal a partir del DataFrame de contactos.

In [88]:
df_short_contacts.createTempView('tw_contacts')

####- 36 - Generar una vista temporal a partir del archivo t_abtq_customer_basics.

In [90]:
df_customer.createOrReplaceTempView('tw_customer_basics')

####- 37 - Generar una vista temporal a partir del archivo t_acog_marital_status_type.

In [148]:
df_marital_status_ok.createOrReplaceTempView('tw_marital_status')

####- 38 - Generar una vista temporal a partir del archivo t_acog_nationality.

In [94]:
df_nationality.createOrReplaceTempView("tw_nationality")

# Querys en spark SQL.

####- 39 - Generar un público objetivo (1) que cumpla los siguientes puntos:
* Cliente
* Antigüedad superior a 5 años
* Asalariado Fijo.
* Estado civil: Casado


In [157]:
publico_obj1 = sqlContext.sql('SELECT cb.customer_id, floor(datediff(current_date(), admission_date)/365) as years_diff, customer_condition_type, job_type, ms.martial_status_short_desc FROM tw_customer_basics as cb INNER JOIN tw_marital_status as ms ON cb.marital_status_type = ms.marital_status_type WHERE floor(datediff(current_date(), admission_date)/365) > 5 AND cb.marital_status_type = 01 AND cb.job_type = 001 AND cb.customer_condition_type = 1')

In [158]:
publico_obj1.toPandas()

Unnamed: 0,customer_id,years_diff,customer_condition_type,job_type,martial_status_short_desc
0,07867437,15,1,001,CASADO/A
1,06813266,17,1,001,CASADO/A
2,02746969,12,1,001,CASADO/A
3,03933825,11,1,001,CASADO/A
4,02568856,12,1,001,CASADO/A
...,...,...,...,...,...
245,19821561,10,1,001,CASADO/A
246,07772581,15,1,001,CASADO/A
247,07162212,16,1,001,CASADO/A
248,07788948,15,1,001,CASADO/A


####- 40 -  Generar un público objetivo (2) que cumpla los siguientes puntos:
* Potencial Cliente
* Sexo Femenino
* Entre 30 y 45 años de Edad.
* Nacionalidad No Argentina

In [162]:
publico_obj2 = sqlContext.sql('SELECT cb.customer_id, customer_condition_type, gender_type, floor(datediff(current_date(), birth_date)/365) as customer_age, n.country_name FROM tw_customer_basics AS cb INNER JOIN tw_nationality AS n ON cb.country_nationality_id == n.country_nationality_id WHERE floor(datediff(current_date(), birth_date)/365) > 30 AND floor(datediff(current_date(), birth_date)/365) < 45 AND gender_type = "F" AND customer_condition_type = 1 AND n.country_nationality_id <> 80')
publico_obj2.toPandas()

Unnamed: 0,customer_id,customer_condition_type,gender_type,customer_age,country_name
0,8576564,1,F,44,PARAGUAY
1,22184167,1,F,33,CHINA


####- 41 - Generar un público objetivo (3) que cumpla los siguientes puntos:
* Potencial Cliente
* Sexo Masculino.
* Mayor a 25 años.
* Estado civil Soltero

In [165]:
publico_obj3 = sqlContext.sql('SELECT cb.customer_id, customer_condition_type, gender_type, floor(datediff(current_date(), birth_date)/365) as customer_age, ms.martial_status_short_desc FROM tw_customer_basics AS cb INNER JOIN tw_marital_status as ms ON cb.marital_status_type = ms.marital_status_type WHERE customer_condition_type = 1 AND floor(datediff(current_date(), birth_date)/365) > 25 AND gender_type = "M" AND cb.marital_status_type = 5')
publico_obj3.toPandas()

Unnamed: 0,customer_id,customer_condition_type,gender_type,customer_age,martial_status_short_desc
0,29604216,1,M,53,SOLTERO/A
1,28366580,1,M,26,SOLTERO/A
2,23441482,1,M,27,SOLTERO/A
3,25734307,1,M,27,SOLTERO/A
4,03342631,1,M,57,SOLTERO/A
...,...,...,...,...,...
294,29422473,1,M,66,SOLTERO/A
295,00468968,1,M,90,SOLTERO/A
296,27283661,1,M,27,SOLTERO/A
297,08546725,1,M,42,SOLTERO/A


####- 42 - Generar un público objetivo (4) que cumpla los siguientes puntos:
* Cliente
* Edad superior a 52 años
* Nacionalidad Argentina
* Estado civil Viudo

In [169]:
publico_obj4 = sqlContext.sql('SELECT cb.customer_id, floor(datediff(current_date(), birth_date)/365) as customer_age, n.country_name, ms.martial_status_short_desc FROM tw_customer_basics as cb INNER JOIN tw_nationality AS n ON cb.country_nationality_id == n.country_nationality_id INNER JOIN tw_marital_status as ms ON cb.marital_status_type = ms.marital_status_type WHERE floor(datediff(current_date(), birth_date)/365) > 52 AND cb.country_nationality_id = 80 AND cb.marital_status_type = 2')
publico_obj4.toPandas()

Unnamed: 0,customer_id,customer_age,country_name,martial_status_short_desc
0,4161701,87,ARGENTINA,VIUDO/A
1,29561692,57,ARGENTINA,VIUDO/A
2,29632753,57,ARGENTINA,VIUDO/A
3,4566883,96,ARGENTINA,VIUDO/A
4,93940,101,ARGENTINA,VIUDO/A
5,29580724,73,ARGENTINA,VIUDO/A
6,104838,95,ARGENTINA,VIUDO/A
7,14664,102,ARGENTINA,VIUDO/A
8,29434917,102,ARGENTINA,VIUDO/A
9,4076446,104,ARGENTINA,VIUDO/A


# Carga de datos, generacion de archivos .csv
#### Los archivos deben cumplir los siguientes requerimientos: 
* Contener cabecera. 
* Separador “|”. 
* Máximo de 1000 registros por archivo (si el público objetivo tiene más de 1000 registros, se deberá crear más de un archivo)
* Ser guardados en HDFS.

#### - 43 -  Generar archivos .csv a partir de los público objetivo 1. 


#### - 44 - Generar archivos .csv a partir del público objetivo 2. 

####- 45 - Generar archivos .csv a partir del público objetivo 3. 

####- 46 - Generar archivos .csv a partir del público objetivo 4. 

# Extraccion de datos desde .csv, públicos objetivo

#### - 47 - Cargar los datos de los publicos generados en un DataFrame.

# Calculo de agregaciones.

####- 48 -  Contar al cantidad de personas contactada por publico objetivo.

####- 49 - Contar la cantidad de direcciones, mails y telefonos por publico objetivo.

####- 50 - Contar la cantidad de hombres y mujeres por publico objetivo.

#### - 51 - Calcular la edad promedio por publico objetivo.

# Carga de datos, generacion de archivos .parquet

####- 52 - Guardar las agregaciones generadas en un archivo parquet, particionado por mes de campaña y publico objetivo.

####- 53 - Guardar las agregaciones generadas en un archivo parquet, particionado por mes de campaña y publico objetivo.

####- 54 - Guardar las agregaciones generadas en un archivo parquet, particionado por mes de campaña y publico objetivo.

####- 55 - Guardar las agregaciones generadas en un archivo parquet, particionado por mes de campaña y publico objetivo.

# Borrador

In [159]:
df_customer.show() # admission_date marital_status_type gender_type birth_date job_type profession_id
#df_phones.toPandas()
#df_address.toPandas()
#df_emails.toPandas()
#df_marital_status.toPandas() # marital_status_type martial_status_short_desc
#df_segment_type.toPandas() 
#df_customer_documents.toPandas() 
#df_address_type.toPandas() # address_type address_type_short_desc ??
#df_gender.toPandas() # gender_type gender_desc
#df_nationality.toPandas() # country_nationality_id # country_nationality_id
#df_personal_type.toPandas()
#df_phone_type.toPandas() # phone_type	phone_type_desc	 ??
#df_province.toPandas()
#df_customer_segment.show() 
#df_customer_info_temp.show()

# VER DE NO IMPORTAR PARQUETS QUE NO USEMOS

+-----------+-------------+-------------+--------------+---------------+--------------------+---------+----------------+-----------+-----------------------+-----------------------+----------+-------------------+-----------------+--------------------------+------------+----------------+-------------------+-------------------+-----------+----------+----------------------+--------------------+---------------------+-----------+--------------------------+----------------------------+-------------------------------+------------------------+------------------------+--------------------------+------------------------+--------------------------+----------------------------+------------------+------------------------+-------------------+-------------------+---------------------------+-----------------+-------------------+-----------------+-------------------------------+--------------------+---------------------+------------------+----------------+--------------------+--------------------------+

Unnamed: 0,country_nationality_id,country_group_branch_type,phone_1_prefix_id,country_name,country_short_desc,country_id,operational_load_date
0,1,,44,REINO UNIDO,GBR,GB,2021-11-22 16:07:38.994
1,2,,1,ESTADOS UNIDOS,USA,US,2021-11-22 16:07:38.994
2,3,,49,ALEMANIA,DEU,DE,2021-11-22 16:07:38.994
3,4,,33,FRANCIA,FRA,FR,2021-11-22 16:07:38.994
4,5,,41,SUIZA,CHE,CH,2021-11-22 16:07:38.994
...,...,...,...,...,...,...,...
256,348,,,UNITED STATES MINOR OUTLAYING ISLAN,,UM,2021-11-22 16:07:38.994
257,349,,683,NIUE,NIU,NU,2021-11-22 16:07:38.994
258,350,,,KOSOVO,KOS,XK,2021-11-22 16:07:38.994
259,351,,420,REPUBLICA CHECA,CZE,CZ,2021-11-22 16:07:38.994
