In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.sql.window import Window

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

### Nombre archivos

In [3]:
dir_archivo = '/home/tonatiuh/Documents/Desarrollo/ZophiaLearning/ejercicios/'
dir_complemento = 'prestamos/curated/'

### Funciones

In [4]:
def df_almacenamiento_parquet(dir_archivo, nombre_archivo, df):
    nombre_destino = f'prestamos/curated/{nombre_archivo}'
    df.write.mode('overwrite').parquet(dir_archivo+nombre_destino)
    print(nombre_destino)

    
def df_almacenamiento_csv(nombre_archivo, df):
    df_filtrado = df.limit(10)
    df_pandas = df_filtrado.toPandas()
    nombre_output = nombre_archivo.replace('.parquet', '')
    nombre_csv = f'output/{nombre_output}.csv'
    df_pandas.to_csv(nombre_csv, index=False)
    print(nombre_csv)


def df_almacenamiento(dir_archivo, nombre_archivo, df):
    df_almacenamiento_parquet(dir_archivo, nombre_archivo, df)
    df_almacenamiento_csv(nombre_archivo, df)

### Tabla prestamos_destinatarios

In [5]:
nombre_archivo = 'prestamos_destinatarios.parquet'
df_destinatarios = spark.read.format('parquet')\
                        .load(dir_archivo+dir_complemento+nombre_archivo)


df_destinatarios = df_destinatarios.select(
    F.col('ID').alias('ID_SOLICITANTE'),
    F.col('NOMBRE').alias('NOMBRE_SOLICITANTE'),
    'DIRECCION',
    'CP',
    'FECHA_NACIMIENTO',
    'CORREO',
    F.col('BANCO').alias('ID_BANCO'),
    F.col('PLAZA').alias('ID_PLAZA'),
    'CLABE')


#NOMBRE_SOLICITANTE
df_destinatarios = df_destinatarios.withColumn('nombre_split', F.split(F.col('NOMBRE_SOLICITANTE'), ' '))
df_destinatarios = df_destinatarios.withColumn(
    'nombre_indicador',
    F.when(F.col('nombre_split').getItem(0) == 'Ing.',   2)
     .when(F.col('nombre_split').getItem(0) == 'Dr.',    2)
     .when(F.col('nombre_split').getItem(0) == 'Sr(a).', 2)
     .when(F.col('nombre_split').getItem(0) == 'Lic.',   2)
     .when(F.col('nombre_split').getItem(0) == 'Mtro.',  2)
    .otherwise(1))
df_destinatarios = df_destinatarios.withColumn('nombre_elementos', F.size(F.col('nombre_split')))
df_destinatarios = df_destinatarios.withColumn(
    'nombre_procesado',
    F.when(F.col('nombre_indicador') == 2,
           F.concat_ws(' ',
                       F.expr("slice(nombre_split, nombre_indicador, nombre_elementos)")))
    .otherwise(F.col('NOMBRE_SOLICITANTE')))

# df_destinatarios = df_destinatarios.withColumn('nombre_split_drop', F.expr("slice(nombre_split, nombre_indicador, nombre_elementos)"))
# df_destinatarios = df_destinatarios.withColumn('nombre_procesado', F.concat_ws(' ', 'nombre_split_drop'))


#ESTADO
df_destinatarios = df_destinatarios.withColumn('direccion_split', F.split(F.col('DIRECCION'), ','))
df_destinatarios = df_destinatarios.withColumn('numero_elementos', F.size(F.col('direccion_split')))
df_destinatarios = df_destinatarios.withColumn('estado', F.col('direccion_split').getItem(F.col('numero_elementos') - 1 ))
df_destinatarios = df_destinatarios.withColumn('estado', F.trim(F.col('estado')))

df_destinatarios = df_destinatarios.withColumn('direccion_split_drop', F.expr("slice(direccion_split, 1, numero_elementos-1)"))
df_destinatarios = df_destinatarios.withColumn('direccion_procesado', F.concat_ws(',', 'direccion_split_drop'))

In [6]:
df_destinatarios.printSchema()

root
 |-- ID_SOLICITANTE: integer (nullable = true)
 |-- NOMBRE_SOLICITANTE: string (nullable = true)
 |-- DIRECCION: string (nullable = true)
 |-- CP: string (nullable = true)
 |-- FECHA_NACIMIENTO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- ID_BANCO: integer (nullable = true)
 |-- ID_PLAZA: integer (nullable = true)
 |-- CLABE: long (nullable = true)
 |-- nombre_split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nombre_indicador: integer (nullable = false)
 |-- nombre_elementos: integer (nullable = false)
 |-- nombre_procesado: string (nullable = true)
 |-- direccion_split: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- numero_elementos: integer (nullable = false)
 |-- estado: string (nullable = true)
 |-- direccion_split_drop: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- direccion_procesado: string (nullable = false)



In [7]:
df_destinatarios.show(3)

+--------------+--------------------+--------------------+----------+----------------+--------------------+--------+--------+-----------+--------------------+----------------+----------------+--------------------+--------------------+----------------+------+--------------------+--------------------+
|ID_SOLICITANTE|  NOMBRE_SOLICITANTE|           DIRECCION|        CP|FECHA_NACIMIENTO|              CORREO|ID_BANCO|ID_PLAZA|      CLABE|        nombre_split|nombre_indicador|nombre_elementos|    nombre_procesado|     direccion_split|numero_elementos|estado|direccion_split_drop| direccion_procesado|
+--------------+--------------------+--------------------+----------+----------------+--------------------+--------+--------+-----------+--------------------+----------------+----------------+--------------------+--------------------+----------------+------+--------------------+--------------------+
|             0|     Sr(a). Luz Leon|Diagonal Argelia ...|     76193|      1960-12-12|   pablo10@

### Tabla prestamos_bancos

In [8]:
nombre_archivo = 'prestamos_bancos.parquet'
df_bancos = spark.read.format('parquet')\
                .load(dir_archivo+dir_complemento+nombre_archivo)

df_bancos = df_bancos.select(
    F.col('DIGITOS').alias('ID_BANCO').cast(IntegerType()),
    F.col('BANCO').alias('NOMBRE_BANCO'))

In [9]:
df_bancos.printSchema()

root
 |-- ID_BANCO: integer (nullable = true)
 |-- NOMBRE_BANCO: string (nullable = true)



In [10]:
df_bancos.show(3)

+--------+------------+
|ID_BANCO|NOMBRE_BANCO|
+--------+------------+
|       2|     BANAMEX|
|       6|   BANCOMEXT|
|       9|    BANOBRAS|
+--------+------------+
only showing top 3 rows



### Tabla plaza

In [11]:
nombre_archivo = 'prestamos_plaza.parquet'
df_plaza = spark.read.format('parquet')\
                .load(dir_archivo+dir_complemento+nombre_archivo)

df_plaza = df_plaza.select(
    F.col('DIGITOS').alias('ID_PLAZA').cast(IntegerType()),
    F.col('PLAZA').alias('NOMBRE_PLAZA'))

############################################################
# Se agrega código temporal para hacer ID_PLAZA único
############################################################
w1 = Window.partitionBy('ID_PLAZA').orderBy('NOMBRE_PLAZA')
df_plaza = df_plaza\
            .withColumn('row', F.row_number().over(w1))\
            .filter(F.col('row') == 1).drop('row')

In [12]:
df_plaza.printSchema()

root
 |-- ID_PLAZA: integer (nullable = true)
 |-- NOMBRE_PLAZA: string (nullable = true)



In [13]:
df_plaza.show(3)

+--------+------------+
|ID_PLAZA|NOMBRE_PLAZA|
+--------+------------+
|     463|    Zumpango|
|     496|   La Piedad|
|     540|  Cuernavaca|
+--------+------------+
only showing top 3 rows



### Cruce tablas

In [14]:
df_destinatarios_b = df_destinatarios.join(df_bancos, how='left', on=['ID_BANCO'])
df_destinatarios_p = df_destinatarios_b.join(df_plaza, how='left', on=['ID_PLAZA'])

In [15]:
df_destinatarios_p = df_destinatarios_p.select(
    'ID_SOLICITANTE',
    F.col('nombre_procesado').alias('NOMBRE_SOLICITANTE'),
    F.col('direccion_procesado').alias('DIRECCION'),
    F.col('estado').alias('ESTADO'),
    'CP',
    'FECHA_NACIMIENTO',
    'CORREO',
    'ID_PLAZA',
    'NOMBRE_PLAZA',
    'ID_BANCO',
    'NOMBRE_BANCO',
    'CLABE')

df_destinatarios_p.cache()

DataFrame[ID_SOLICITANTE: int, NOMBRE_SOLICITANTE: string, DIRECCION: string, ESTADO: string, CP: string, FECHA_NACIMIENTO: string, CORREO: string, ID_PLAZA: int, NOMBRE_PLAZA: string, ID_BANCO: int, NOMBRE_BANCO: string, CLABE: bigint]

In [16]:
df_destinatarios_p.printSchema()

root
 |-- ID_SOLICITANTE: integer (nullable = true)
 |-- NOMBRE_SOLICITANTE: string (nullable = true)
 |-- DIRECCION: string (nullable = false)
 |-- ESTADO: string (nullable = true)
 |-- CP: string (nullable = true)
 |-- FECHA_NACIMIENTO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- ID_PLAZA: integer (nullable = true)
 |-- NOMBRE_PLAZA: string (nullable = true)
 |-- ID_BANCO: integer (nullable = true)
 |-- NOMBRE_BANCO: string (nullable = true)
 |-- CLABE: long (nullable = true)



In [17]:
df_destinatarios_p.show(n=2, vertical=True, truncate=False)

-RECORD 0---------------------------------------------------------------------------------
 ID_SOLICITANTE     | 0                                                                   
 NOMBRE_SOLICITANTE | Luz Leon                                                            
 DIRECCION          | Diagonal Argelia 602 655San Caridad de la Montaña                   
 ESTADO             | BC                                                                  
 CP                 | 76193                                                               
 FECHA_NACIMIENTO   | 1960-12-12                                                          
 CORREO             | pablo10@gmail.com                                                   
 ID_PLAZA           | 396                                                                 
 NOMBRE_PLAZA       | Tepatitlan                                                          
 ID_BANCO           | 147                                                                 

### Almacenamiento

In [18]:
nombre_archivo = 'prestamos_destinatarios_expandida.parquet'
df_almacenamiento(dir_archivo, nombre_archivo, df_destinatarios_p)

prestamos/curated/prestamos_destinatarios_expandida.parquet
output/prestamos_destinatarios_expandida.csv


### Unpersist

In [19]:
df_destinatarios_p.unpersist()

DataFrame[ID_SOLICITANTE: int, NOMBRE_SOLICITANTE: string, DIRECCION: string, ESTADO: string, CP: string, FECHA_NACIMIENTO: string, CORREO: string, ID_PLAZA: int, NOMBRE_PLAZA: string, ID_BANCO: int, NOMBRE_BANCO: string, CLABE: bigint]