In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from google.cloud import bigquery

In [72]:
spark = SparkSession.builder.getOrCreate()

In [73]:
spark

In [74]:
schema_name = "maximal-beach-351616.dwh."
storage_name = "gs://etl-dmc-ultima-clase-erps"

In [75]:
# 4.1 Estructura del dataframe.
df_schema = StructType([
StructField("BANCO", StringType(),True),
StructField("CARTERA", StringType(),True),
StructField("PARTICIPACION", StringType(),True),
StructField("RANKING", StringType(),True),
StructField("PERIODO", StringType(),True),
StructField("TIPO_ENTIDAD", StringType(),True),

])

In [76]:
# 4.2 Definimos ruta del archivo
#Archivo en Cloud Storage - Google Cloud Platform
ruta_colocaciones_raw = storage_name+"/datalake/WORKLOAD/SAP/colocaciones.csv"

In [77]:
#Leer el archivo de origen
df_colocaciones = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema).load(ruta_colocaciones_raw)

In [78]:
#4.4 Mostramos la estructura del dataframe.
df_colocaciones.printSchema()

root
 |-- BANCO: string (nullable = true)
 |-- CARTERA: string (nullable = true)
 |-- PARTICIPACION: string (nullable = true)
 |-- RANKING: string (nullable = true)
 |-- PERIODO: string (nullable = true)
 |-- TIPO_ENTIDAD: string (nullable = true)



In [79]:
# 4.5 Mostraremos todos los datos del dataframe.
df_colocaciones.show(5)

+-----+----------+-------------------+-------+-------+----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|    TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+----------------+
|  BNB|18984734.0|0.10331274633647858|    2.0| DIC-19|BANCOS MÚLTIPLES|
|  BUN|18388820.0|0.10006985065406573|    3.0| DIC-19|BANCOS MÚLTIPLES|
|  BME|25457551.0|0.13853707451529038|    1.0| DIC-19|BANCOS MÚLTIPLES|
|  BIS|16021005.0|0.08718447283066778|    4.0| DIC-19|BANCOS MÚLTIPLES|
|  BCR|15642151.0|0.08512279278813675|    5.0| DIC-19|BANCOS MÚLTIPLES|
+-----+----------+-------------------+-------+-------+----------------+
only showing top 5 rows



In [80]:
#Archivo en Cloud Storage - Google Cloud Platform
ruta_colocaciones_landing = storage_name+"/datalake/LANDING/SAP/colocaciones"

df_colocaciones.write.mode("overwrite").format("parquet").save(ruta_colocaciones_landing)

                                                                                

In [81]:
print("=============================================")
print("Termino procesamiento de Colocaciones Landing")
print("=============================================")
print(" ")
print("============================================")
print("Inicio procesamiento de Depositos Landing")
print("============================================")

Termino procesamiento de Colocaciones Landing
 
Inicio procesamiento de Depositos Landing


In [82]:
# 4.2 Definimos ruta del archivo
#Archivo en Cloud Storage - Google Cloud Platform
ruta_depositos_raw = storage_name+"/datalake/WORKLOAD/SAP/depositos.csv"

In [83]:
#Leer el archivo de origen
df_depositos = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema).load(ruta_depositos_raw)

In [84]:
#4.4 Mostramos la estructura del dataframe.
df_depositos.printSchema()

root
 |-- BANCO: string (nullable = true)
 |-- CARTERA: string (nullable = true)
 |-- PARTICIPACION: string (nullable = true)
 |-- RANKING: string (nullable = true)
 |-- PERIODO: string (nullable = true)
 |-- TIPO_ENTIDAD: string (nullable = true)



In [85]:
df_depositos.show(5)

+-----+----------+-------------------+-------+-------+-----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|     TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+-----------------+
|  BNB|21153859.0|0.12184589234486297|    2.0| DIC-19|BANCOS MÚLTIPLES |
|  BUN|19183763.0|0.11049817063011366|    3.0| DIC-19|BANCOS MÚLTIPLES |
|  BME|27828839.0|0.16029367128127894|    1.0| DIC-19|BANCOS MÚLTIPLES |
|  BIS|17249380.0|0.09935615522896472|    4.0| DIC-19|BANCOS MÚLTIPLES |
|  BCR|15902397.0|0.09159755451179248|    5.0| DIC-19|BANCOS MÚLTIPLES |
+-----+----------+-------------------+-------+-------+-----------------+
only showing top 5 rows



In [86]:
#Archivo en Cloud Storage - Google Cloud Platform
ruta_depositos_landing = storage_name+"/datalake/LANDING/SAP/depositos"

df_depositos.write.mode("overwrite").format("parquet").save(ruta_depositos_landing)

                                                                                

In [87]:
print("=============================================")
print("Termino procesamiento de Depositos Landing")
print("=============================================")

Termino procesamiento de Depositos Landing


In [88]:
print("=============================================")
print("Inicio Poblamiento Curated")
print("=============================================")

Inicio Poblamiento Curated


In [89]:
df_colocaciones_landing = spark.read.format("parquet").option("header","true").load(ruta_colocaciones_landing)

In [90]:
df_colocaciones_landing.show(10)

+-------+----------+--------------------+-------+-------+----------------+
|  BANCO|   CARTERA|       PARTICIPACION|RANKING|PERIODO|    TIPO_ENTIDAD|
+-------+----------+--------------------+-------+-------+----------------+
|    BNB|18984734.0| 0.10331274633647858|    2.0| DIC-19|BANCOS MÚLTIPLES|
|    BUN|18388820.0| 0.10006985065406573|    3.0| DIC-19|BANCOS MÚLTIPLES|
|    BME|25457551.0| 0.13853707451529038|    1.0| DIC-19|BANCOS MÚLTIPLES|
|    BIS|16021005.0| 0.08718447283066778|    4.0| DIC-19|BANCOS MÚLTIPLES|
|    BCR|15642151.0| 0.08512279278813675|    5.0| DIC-19|BANCOS MÚLTIPLES|
|    BGA| 9807274.0| 0.05337006096658197|    9.0| DIC-19|BANCOS MÚLTIPLES|
|    BEC| 8454669.0| 0.04600933959653525|   10.0| DIC-19|BANCOS MÚLTIPLES|
|    BSO|11767753.0| 0.06403876296814771|    8.0| DIC-19|BANCOS MÚLTIPLES|
|    BNA|  118014.0|0.000642218660854...|   44.0| DIC-19|BANCOS MÚLTIPLES|
|BDB (3)|   80778.0|0.000439584616964...|   51.0| DIC-19|BANCOS MÚLTIPLES|
+-------+----------+-----

In [91]:
df_colocaciones_landing.printSchema()

root
 |-- BANCO: string (nullable = true)
 |-- CARTERA: string (nullable = true)
 |-- PARTICIPACION: string (nullable = true)
 |-- RANKING: string (nullable = true)
 |-- PERIODO: string (nullable = true)
 |-- TIPO_ENTIDAD: string (nullable = true)



In [92]:
df_colocaciones_procesado = df_colocaciones_landing.withColumn('CARTERA', regexp_replace('CARTERA', '- - -', '0'))
df_colocaciones_procesado = df_colocaciones_procesado.withColumn('PARTICIPACION', regexp_replace('PARTICIPACION', '- - -', '0'))
df_colocaciones_procesado = df_colocaciones_procesado.withColumn('RANKING', regexp_replace('RANKING', '- - -', '0'))
df_colocaciones_procesado.show(5)

+-----+----------+-------------------+-------+-------+----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|    TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+----------------+
|  BNB|18984734.0|0.10331274633647858|    2.0| DIC-19|BANCOS MÚLTIPLES|
|  BUN|18388820.0|0.10006985065406573|    3.0| DIC-19|BANCOS MÚLTIPLES|
|  BME|25457551.0|0.13853707451529038|    1.0| DIC-19|BANCOS MÚLTIPLES|
|  BIS|16021005.0|0.08718447283066778|    4.0| DIC-19|BANCOS MÚLTIPLES|
|  BCR|15642151.0|0.08512279278813675|    5.0| DIC-19|BANCOS MÚLTIPLES|
+-----+----------+-------------------+-------+-------+----------------+
only showing top 5 rows



In [93]:
#Archivo en Cloud Storage - Google Cloud Platform
ruta_colocaciones_curated = storage_name+"/datalake/CURATED/SAP/colocaciones"

In [94]:
df_colocaciones_procesado.write.mode("overwrite").format("parquet").save(ruta_colocaciones_curated)

                                                                                

In [95]:
df_depositos_landing = spark.read.format("parquet").option("header","true").load(ruta_depositos_landing)

In [96]:
df_depositos_landing.show(10)

+-------+----------+--------------------+-------+-------+-----------------+
|  BANCO|   CARTERA|       PARTICIPACION|RANKING|PERIODO|     TIPO_ENTIDAD|
+-------+----------+--------------------+-------+-------+-----------------+
|    BNB|21153859.0| 0.12184589234486297|    2.0| DIC-19|BANCOS MÚLTIPLES |
|    BUN|19183763.0| 0.11049817063011366|    3.0| DIC-19|BANCOS MÚLTIPLES |
|    BME|27828839.0| 0.16029367128127894|    1.0| DIC-19|BANCOS MÚLTIPLES |
|    BIS|17249380.0| 0.09935615522896472|    4.0| DIC-19|BANCOS MÚLTIPLES |
|    BCR|15902397.0| 0.09159755451179248|    5.0| DIC-19|BANCOS MÚLTIPLES |
|    BGA| 8937264.0| 0.05147849889713359|    8.0| DIC-19|BANCOS MÚLTIPLES |
|    BEC| 7626117.0| 0.04392631297161097|   10.0| DIC-19|BANCOS MÚLTIPLES |
|    BSO| 8024847.0|0.046222991447848666|    9.0| DIC-19|BANCOS MÚLTIPLES |
|    BNA|   66850.0|0.000385054939775...|   45.0| DIC-19|BANCOS MÚLTIPLES |
|BDB (3)|   55899.0|0.000321977353455...|   48.0| DIC-19|BANCOS MÚLTIPLES |
+-------+---

In [97]:
df_depositos_landing.printSchema()

root
 |-- BANCO: string (nullable = true)
 |-- CARTERA: string (nullable = true)
 |-- PARTICIPACION: string (nullable = true)
 |-- RANKING: string (nullable = true)
 |-- PERIODO: string (nullable = true)
 |-- TIPO_ENTIDAD: string (nullable = true)



In [98]:
df_depositos_procesado = df_depositos_landing.withColumn('CARTERA', regexp_replace('CARTERA', '- - -', '0'))
df_depositos_procesado = df_depositos_procesado.withColumn('PARTICIPACION', regexp_replace('PARTICIPACION', '- - -', '0'))
df_depositos_procesado = df_depositos_procesado.withColumn('RANKING', regexp_replace('RANKING', '- - -', '0'))
df_depositos_procesado.show(5)

+-----+----------+-------------------+-------+-------+-----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|     TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+-----------------+
|  BNB|21153859.0|0.12184589234486297|    2.0| DIC-19|BANCOS MÚLTIPLES |
|  BUN|19183763.0|0.11049817063011366|    3.0| DIC-19|BANCOS MÚLTIPLES |
|  BME|27828839.0|0.16029367128127894|    1.0| DIC-19|BANCOS MÚLTIPLES |
|  BIS|17249380.0|0.09935615522896472|    4.0| DIC-19|BANCOS MÚLTIPLES |
|  BCR|15902397.0|0.09159755451179248|    5.0| DIC-19|BANCOS MÚLTIPLES |
+-----+----------+-------------------+-------+-------+-----------------+
only showing top 5 rows



In [99]:
#Archivo en Cloud Storage - Google Cloud Platform
ruta_depositos_curated = storage_name+"/datalake/CURATED/SAP/depositos"

In [100]:
df_depositos_procesado.write.mode("overwrite").format("parquet").save(ruta_depositos_curated)

                                                                                

In [101]:
print("=============================================")
print("Fin Poblamiento Curated")
print("=============================================")

Fin Poblamiento Curated


In [102]:
df_colocaciones_curated = spark.read.format("parquet").option("header","true").load(ruta_colocaciones_curated)

df_depositos_curated = spark.read.format("parquet").option("header","true").load(ruta_depositos_curated)

In [103]:
df_colocaciones_curated.show(5)
df_depositos_curated.show(5)

+-----+----------+-------------------+-------+-------+----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|    TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+----------------+
|  BNB|18984734.0|0.10331274633647858|    2.0| DIC-19|BANCOS MÚLTIPLES|
|  BUN|18388820.0|0.10006985065406573|    3.0| DIC-19|BANCOS MÚLTIPLES|
|  BME|25457551.0|0.13853707451529038|    1.0| DIC-19|BANCOS MÚLTIPLES|
|  BIS|16021005.0|0.08718447283066778|    4.0| DIC-19|BANCOS MÚLTIPLES|
|  BCR|15642151.0|0.08512279278813675|    5.0| DIC-19|BANCOS MÚLTIPLES|
+-----+----------+-------------------+-------+-------+----------------+
only showing top 5 rows

+-----+----------+-------------------+-------+-------+-----------------+
|BANCO|   CARTERA|      PARTICIPACION|RANKING|PERIODO|     TIPO_ENTIDAD|
+-----+----------+-------------------+-------+-------+-----------------+
|  BNB|21153859.0|0.12184589234486297|    2.0| DIC-19|BANCOS MÚLTIPLES |
|  BUN|19183763.0|0.110498170630113

In [104]:
df_colocaciones_curated.createOrReplaceTempView("tb_colocaciones")
df_depositos_curated.createOrReplaceTempView("tb_depositos")

df_union = spark.sql("Select banco, cast(cartera as decimal(10,2)) monto,\
                     cast(participacion as decimal(10,4)) participacion, \
                     cast(ranking as decimal(10,2)) ranking, periodo, 'colocaciones' as tipoproducto  \
                     from tb_colocaciones\
                     union \
                     Select banco, cast(cartera as decimal(10,2)) monto,\
                     cast(participacion as decimal(10,4)) participacion, \
                     cast(ranking as decimal(10,2)) ranking, periodo, 'depositos' as tipoproducto  \
                     from tb_depositos")
df_union.show(5)



+-------+---------+-------------+-------+-------+------------+
|  banco|    monto|participacion|ranking|periodo|tipoproducto|
+-------+---------+-------------+-------+-------+------------+
|    IID|340924.92|       0.0017|  27.00| DIC-21|colocaciones|
|COO (7)| 33692.68|       0.0002|  62.00| MAY-22|colocaciones|
|    CSR|191170.72|       0.0009|  37.00| MAY-22|colocaciones|
|    CCP|271022.98|       0.0013|  30.00| MAY-22|colocaciones|
|    ICI|729548.00|       0.0040|  22.00| DIC-19|colocaciones|
+-------+---------+-------------+-------+-------+------------+
only showing top 5 rows



                                                                                

In [105]:
df_bancos = spark.sql("Select distinct c.banco, c.tipo_entidad \
                       from (SELECT * FROM tb_colocaciones union SELECT * FROM tb_depositos) as c")
df_bancos.show(5)

+-------+--------------------+
|  banco|        tipo_entidad|
+-------+--------------------+
|    BIS|    BANCOS MÚLTIPLES|
|    BUN|   BANCOS MÚLTIPLES |
|IDI (6)|INSTITUCIONES FIN...|
|    BSO|   BANCOS MÚLTIPLES |
|    CJN|COOPERATIVAS DE A...|
+-------+--------------------+
only showing top 5 rows



In [106]:
df_periodo = spark.sql("Select distinct c.periodo from \
                      (SELECT * FROM tb_colocaciones union SELECT * FROM tb_depositos) as c")
df_periodo.show(5)

+-------+
|periodo|
+-------+
| MAY-22|
| DIC-20|
| DIC-19|
| DIC-21|
+-------+



In [107]:
ruta_functional_captaciones_depositos =  storage_name+"/datalake/FUNCTIONAL/SAP/captaciones_depositos"

df_union.write.mode("overwrite").format("parquet").save(ruta_functional_captaciones_depositos)

                                                                                

In [108]:
ruta_functional_bancos = storage_name+"/datalake/FUNCTIONAL/SAP/bancos/"

df_bancos.write.mode("overwrite").format("parquet").save(ruta_functional_bancos)

                                                                                

In [109]:
ruta_functional_periodo = storage_name+"/datalake/FUNCTIONAL/SAP/periodo/"

df_periodo.write.mode("overwrite").format("parquet").save(ruta_functional_periodo)

                                                                                

In [110]:
df_colocaciones_depositos = spark.read.format("parquet").option("header","true").load(ruta_functional_captaciones_depositos)
df_colocaciones_depositos.show(10)

                                                                                

+-------+-----------+-------------+-------+-------+------------+
|  banco|      monto|participacion|ranking|periodo|tipoproducto|
+-------+-----------+-------------+-------+-------+------------+
|    BCR|15642151.00|       0.0851|   5.00| DIC-19|colocaciones|
|    BEC| 8881678.85|       0.0433|  10.00| MAY-22|colocaciones|
|CVE (6)|   20105.10|       0.0001|  64.00| MAY-22|colocaciones|
|    CCM|  112742.00|       0.0006|  43.00| DIC-20|colocaciones|
|    CMG|   97577.61|       0.0005|  49.00| DIC-21|colocaciones|
|CEY (8)|  110513.37|       0.0005|  46.00| MAY-22|colocaciones|
|    BFS|13755402.00|       0.0749|   6.00| DIC-19|colocaciones|
|CLC (7)|   17584.76|       0.0001|  65.00| MAY-22|colocaciones|
|    ISA|  327314.00|       0.0018|  25.00| DIC-19|colocaciones|
|    IID|  272962.00|       0.0014|  27.00| DIC-20|colocaciones|
+-------+-----------+-------------+-------+-------+------------+
only showing top 10 rows



In [111]:
df_bancos = spark.read.format("parquet").option("header","true").load(ruta_functional_bancos)
df_bancos.show(10)

[Stage 108:>                                                        (0 + 1) / 1]

+--------+--------------------+
|   banco|        tipo_entidad|
+--------+--------------------+
|COO (10)|COOPERATIVAS DE A...|
|     CIH|COOPERATIVAS DE A...|
|     CME|COOPERATIVAS DE A...|
|     IIM|INSTITUCIONES FIN...|
|     CTR|COOPERATIVAS DE A...|
|     CSR|COOPERATIVAS DE A...|
|     BEC|   BANCOS MÚLTIPLES |
|     PEF|         BANCOS PYME|
| CVE (6)|COOPERATIVAS DE A...|
|     IFO|INSTITUCIONES FIN...|
+--------+--------------------+
only showing top 10 rows



                                                                                

In [112]:
df_periodo = spark.read.format("parquet").option("header","true").load(ruta_functional_periodo)
df_periodo.show(10)

+-------+
|periodo|
+-------+
| MAY-22|
| DIC-21|
| DIC-20|
| DIC-19|
+-------+



In [113]:
print("=============================================")
print("Fin Poblamiento Functional")
print("=============================================")

Fin Poblamiento Functional


In [114]:
print("=============================================")
print("Inicio Big Query")
print("=============================================")

Inicio Big Query


# BigData

In [115]:
# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
#table_id = "your-project.your_dataset.your_table_name"
table_id = schema_name+"tbl_bancos"

schema = [
    bigquery.SchemaField("banco", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("tipo_entidad", "STRING", mode="REQUIRED"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table maximal-beach-351616.dwh.tbl_bancos


In [116]:
# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
#table_id = "your-project.your_dataset.your_table_name"
table_id = schema_name+"tbl_periodo"

schema = [
    bigquery.SchemaField("periodo", "STRING", mode="REQUIRED"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table maximal-beach-351616.dwh.tbl_periodo


In [117]:
# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
#table_id = "your-project.your_dataset.your_table_name"
table_id = schema_name+"tbl_colocaciones_depositos"

schema = [
    bigquery.SchemaField("banco", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("monto", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("participacion", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("ranking", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("periodo", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("tipoproducto", "STRING", mode="REQUIRED"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table maximal-beach-351616.dwh.tbl_colocaciones_depositos


In [118]:
client = bigquery.Client()

# TODO: MODIFICAR EL NOMBRE DE TABLA DE BIGQUERY
table_id = schema_name+"tbl_bancos"

job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
                                    source_format=bigquery.SourceFormat.PARQUET,
                                   )
uri = storage_name+"/datalake/FUNCTIONAL/SAP/bancos/*.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  

load_job.result()

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 93 rows.


In [119]:
table_id = schema_name+"tbl_periodo"

uri = storage_name+"/datalake/FUNCTIONAL/SAP/periodo/*.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  

load_job.result()

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 4 rows.


In [120]:
table_id = schema_name+"tbl_colocaciones_depositos"

uri = storage_name+"/datalake/FUNCTIONAL/SAP/captaciones_depositos/*.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  

load_job.result()

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 508 rows.


In [121]:
print("=============================================")
print("Termino Poblamiento Big Query")
print("=============================================")

Termino Poblamiento Big Query
