### Day 01: Processing Large Volumes of Data
* Engine: PySpark (Python API for Spark)
* In-memory processing

### Scenario:
We have 3 tables and want to run an ETL that produces an analytical base table (ABT) at the client level (client_id):

* clients: table of clients
* loan: table of loans taken out by the clients
* payments: table of payments made by the clients

#### Starting the Spark application - Turing Lab environment

In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1571701187419_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fc8074dd190>

#### Reading the clients table (CSV file stored in the lake)

In [2]:
rdd_clients = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/Exemplo/clients.csv')
rdd_clients.count()

25

#### Checking the table schema

In [3]:
rdd_clients.printSchema()

root
 |-- client_id: integer (nullable = true)
 |-- joined: timestamp (nullable = true)
 |-- income: integer (nullable = true)
 |-- credit_score: integer (nullable = true)

#### Checking the first 5 rows of the table

In [4]:
rdd_clients.show(5)

+---------+-------------------+------+------------+
|client_id|             joined|income|credit_score|
+---------+-------------------+------+------------+
|    46109|2002-04-16 00:00:00|172677|         527|
|    49545|2007-11-14 00:00:00|104564|         770|
|    41480|2013-03-11 00:00:00|122607|         585|
|    46180|2001-11-06 00:00:00| 43851|         562|
|    25707|2006-10-06 00:00:00|211422|         621|
+---------+-------------------+------+------------+
only showing top 5 rows

#### Reading the table of loans taken out by the clients (CSV file stored in the lake)

In [5]:
rdd_loan = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/Exemplo/loans.csv')
rdd_loan.count()

443

In [6]:
rdd_loan.printSchema()

root
 |-- client_id: integer (nullable = true)
 |-- loan_type: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- repaid: integer (nullable = true)
 |-- loan_id: integer (nullable = true)
 |-- loan_start: timestamp (nullable = true)
 |-- loan_end: timestamp (nullable = true)
 |-- rate: double (nullable = true)

In [7]:
rdd_loan.show()

+---------+---------+-----------+------+-------+-------------------+-------------------+----+
|client_id|loan_type|loan_amount|repaid|loan_id|         loan_start|           loan_end|rate|
+---------+---------+-----------+------+-------+-------------------+-------------------+----+
|    46109|     home|      13672|     0|  10243|2002-04-16 00:00:00|2003-12-20 00:00:00|2.15|
|    46109|   credit|       9794|     0|  10984|2003-10-21 00:00:00|2005-07-17 00:00:00|1.25|
|    46109|     home|      12734|     1|  10990|2006-02-01 00:00:00|2007-07-05 00:00:00|0.68|
|    46109|     cash|      12518|     1|  10596|2010-12-08 00:00:00|2013-05-05 00:00:00|1.24|
|    46109|   credit|      14049|     1|  11415|2010-07-07 00:00:00|2012-05-21 00:00:00|3.13|
|    46109|     home|       6935|     0|  11501|2006-09-17 00:00:00|2008-11-26 00:00:00|1.94|
|    46109|     cash|       6177|     1|  11141|2007-03-12 00:00:00|2009-04-26 00:00:00|9.48|
|    46109|     home|      12656|     0|  11658|2006-05-26 0

#### Reading the table of payments made by the clients (CSV file stored in the lake)

In [8]:
rdd_payments = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/Exemplo/payments.csv')
rdd_payments.count()

3456

In [9]:
rdd_payments.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- payment_amount: integer (nullable = true)
 |-- payment_date: timestamp (nullable = true)
 |-- missed: integer (nullable = true)

In [10]:
rdd_payments.show()

+-------+--------------+-------------------+------+
|loan_id|payment_amount|       payment_date|missed|
+-------+--------------+-------------------+------+
|  10243|          2369|2002-05-31 00:00:00|     1|
|  10243|          2439|2002-06-18 00:00:00|     1|
|  10243|          2662|2002-06-29 00:00:00|     0|
|  10243|          2268|2002-07-20 00:00:00|     0|
|  10243|          2027|2002-07-31 00:00:00|     1|
|  10243|          2243|2002-09-16 00:00:00|     1|
|  10984|          1466|2003-12-29 00:00:00|     0|
|  10984|          1887|2004-02-01 00:00:00|     1|
|  10984|          1360|2004-03-09 00:00:00|     1|
|  10984|          1350|2004-03-29 00:00:00|     0|
|  10984|          1728|2004-05-03 00:00:00|     1|
|  10990|          1981|2006-04-05 00:00:00|     0|
|  10990|          2284|2006-04-27 00:00:00|     1|
|  10990|          1304|2006-05-12 00:00:00|     0|
|  10990|          2253|2006-06-05 00:00:00|     1|
|  10990|          2453|2006-07-17 00:00:00|     0|
|  10990|   

### Let's run the whole ETL through Hive (which lets us use SQL)

#### Making the in-memory tables (the `rdd_*` DataFrames) available to Hive as temporary views

In [11]:
# Register each DataFrame as a temporary view so it can be queried with SQL
rdd_clients.createOrReplaceTempView("clientes")
rdd_loan.createOrReplaceTempView("emprestimos")
rdd_payments.createOrReplaceTempView("pagamentos")

#### Let's use SQL to explore the loans taken out by the client with id 46109

In [12]:
spark.sql(
             """SELECT 
                    client_id,
                    loan_type,                    
                    cast(loan_start as date) as DT_INICIO,
                    cast(loan_end as date) as DT_FIM,
                    datediff(cast(loan_end as date),cast(loan_start as date)) as QT_DIAS_START_END,
                    round(datediff(cast(loan_end as date),cast(loan_start as date))/30.5) as QT_MESES_START_END,
                    round(datediff(cast(loan_end as date),cast(loan_start as date))/365) as QT_ANOS_START_END
                FROM
                    emprestimos 
                WHERE 
                    client_id = 46109 
                order by cast(loan_start as date)    
             """
         ).show()

+---------+---------+----------+----------+-----------------+------------------+-----------------+
|client_id|loan_type| DT_INICIO|    DT_FIM|QT_DIAS_START_END|QT_MESES_START_END|QT_ANOS_START_END|
+---------+---------+----------+----------+-----------------+------------------+-----------------+
|    46109|   credit|2001-03-25|2003-10-04|              923|                30|              3.0|
|    46109|   credit|2001-09-24|2003-08-31|              706|                23|              2.0|
|    46109|     home|2002-04-16|2003-12-20|              613|                20|              2.0|
|    46109|   credit|2003-10-21|2005-07-17|              635|                21|              2.0|
|    46109|    other|2003-12-06|2005-08-19|              622|                20|              2.0|
|    46109|    other|2004-04-05|2006-10-13|              921|                30|              3.0|
|    46109|     cash|2005-06-17|2007-03-01|              622|                20|              2.0|
|    46109
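
The duration columns in the query above can be reproduced in plain Python to check a row by hand. This is a minimal sketch, not Spark code: the helper names `round_half_up` and `loan_durations` are hypothetical, and the rounding helper is there because Spark SQL's `round` rounds halves up, while Python's built-in `round` rounds halves to the nearest even number.

```python
import math
from datetime import date


def round_half_up(x):
    """Round a non-negative number to the nearest integer, halves going up."""
    return int(math.floor(x + 0.5))


def loan_durations(start, end):
    """Return (days, approximate months, approximate years) between two dates."""
    days = (end - start).days            # same idea as datediff(end, start)
    months = round_half_up(days / 30.5)  # 30.5 = average month length used in the query
    years = round_half_up(days / 365)    # ignores leap years, like the query
    return days, months, years


# Third row of the output above: home loan, 2002-04-16 to 2003-12-20
print(loan_durations(date(2002, 4, 16), date(2003, 12, 20)))  # (613, 20, 2)
```

Note that `QT_ANOS_START_END` is a rounded value: a loan lasting 613 days (about 1.7 years) is reported as 2.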

#### Question 01) Which loan types exist in the loans table, and how many are there?

In [13]:
spark.sql(
             """SELECT 
                    loan_type as TP_EMPRESTIMO, 
                    count(*) as QTD_EMPRESTIMOS
                FROM
                    emprestimos 
                group by loan_type
             """
         ).show(100)

+-------------+---------------+
|TP_EMPRESTIMO|QTD_EMPRESTIMOS|
+-------------+---------------+
|         cash|            108|
|        other|            107|
|         home|            121|
|       credit|            107|
+-------------+---------------+
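
The result answers both parts of the question: there are four loan types, and the per-type counts should add up to the 443 rows reported by `rdd_loan.count()`. A quick sanity check in plain Python, with the counts copied from the output above (the variable names are only illustrative):

```python
# Counts per loan type, copied from the query output above
loans_per_type = {"cash": 108, "other": 107, "home": 121, "credit": 107}

n_types = len(loans_per_type)           # number of distinct loan types
n_loans = sum(loans_per_type.values())  # total loans across all types

print(n_types, n_loans)  # 4 443
```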

#### Let's bring the payment information into the loans table

In [14]:
rdd_loan_payments = spark.sql(
                             """SELECT 
                                    a.client_id,
                                    a.loan_id,
                                    a.loan_type,
                                    a.loan_amount,
                                    a.repaid,
                                    a.rate, 
                                    datediff(cast(a.loan_end as date),cast(a.loan_start as date)) as QT_DIAS_START_END,
                                    datediff(cast(b.payment_date as date),cast(a.loan_start as date)) as QT_DIAS_START_PAYMENT,
                                    b.payment_amount,
                                    b.missed
                                FROM
                                    emprestimos as a
                                LEFT JOIN
                                    pagamentos as b
                                ON a.loan_id = b.loan_id
                             """
                         )
rdd_loan_payments.createOrReplaceTempView("loan_payments")
rdd_loan_payments.count()

3456

In [15]:
rdd_loan_payments.show()

+---------+-------+---------+-----------+------+----+-----------------+---------------------+--------------+------+
|client_id|loan_id|loan_type|loan_amount|repaid|rate|QT_DIAS_START_END|QT_DIAS_START_PAYMENT|payment_amount|missed|
+---------+-------+---------+-----------+------+----+-----------------+---------------------+--------------+------+
|    46109|  10243|     home|      13672|     0|2.15|              613|                  153|          2243|     1|
|    46109|  10243|     home|      13672|     0|2.15|              613|                  106|          2027|     1|
|    46109|  10243|     home|      13672|     0|2.15|              613|                   95|          2268|     0|
|    46109|  10243|     home|      13672|     0|2.15|              613|                   74|          2662|     0|
|    46109|  10243|     home|      13672|     0|2.15|              613|                   63|          2439|     1|
|    46109|  10243|     home|      13672|     0|2.15|              613| 
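
A `LEFT JOIN` from loans to payments returns one row per payment, and keeps any loan that has no payment at all, with `NULL` in the payment columns. The sketch below illustrates this on toy data. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster, and the rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE emprestimos (client_id INTEGER, loan_id INTEGER, loan_type TEXT);
    CREATE TABLE pagamentos (loan_id INTEGER, payment_amount INTEGER, missed INTEGER);

    -- toy rows, not taken from the real files
    INSERT INTO emprestimos VALUES (1, 100, 'home'), (1, 101, 'cash'), (2, 200, 'credit');
    INSERT INTO pagamentos VALUES (100, 50, 0), (100, 60, 1), (100, 70, 0), (200, 40, 0);
""")

rows = conn.execute("""
    SELECT a.client_id, a.loan_id, b.payment_amount
    FROM emprestimos AS a
    LEFT JOIN pagamentos AS b ON a.loan_id = b.loan_id
    ORDER BY a.loan_id, b.payment_amount
""").fetchall()

for row in rows:
    print(row)
# Loan 100 appears three times (one row per payment);
# loan 101 has no payments and appears once with payment_amount = None.
```

In the notebook the joined table has 3456 rows, the same count as `pagamentos`, which is consistent with every loan having at least one payment.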

#### Aggregating the loans and payments table (rdd_loan_payments) to the client level

In [16]:
rdd_loan_payments_agg = spark.sql(
                             """SELECT
                                    client_id,
                                    sum(case when loan_type = 'home' then  1 else 0 end) as QT_EMP_HOME,
                                    sum(case when loan_type = 'credit' then  1 else 0 end) as QT_EMP_CRED,
                                    sum(case when loan_type = 'cash' then  1 else 0 end) as QT_EMP_CASH,
                                    sum(case when loan_type = 'other' then  1 else 0 end) as QT_EMP_OTHER,
                                    mean(loan_amount) as mean_loan_amount,
                                    max(loan_amount) as max_loan_amount,
                                    min(loan_amount) as min_loan_amount,
                                    mean(payment_amount) as mean_payment_amount,
                                    max(payment_amount) as max_payment_amount,
                                    min(payment_amount) as min_payment_amount, 
                                    max(QT_DIAS_START_PAYMENT) as MAX_QT_DIAS_START_PAYMENT,
                                    mean(missed) as mean_missed,
                                    mean(repaid) as mean_repaid,
                                    mean(rate) as mean_rate,
                                    max(QT_DIAS_START_END) as MAX_QT_DIAS_START_END                                    
                                FROM
                                    loan_payments 
                                GROUP BY
                                    client_id
                             """
                         )
rdd_loan_payments_agg.createOrReplaceTempView("loan_payments_agg")
rdd_loan_payments_agg.count()

25

In [17]:
rdd_loan_payments_agg.printSchema()

root
 |-- client_id: integer (nullable = true)
 |-- QT_EMP_HOME: long (nullable = true)
 |-- QT_EMP_CRED: long (nullable = true)
 |-- QT_EMP_CASH: long (nullable = true)
 |-- QT_EMP_OTHER: long (nullable = true)
 |-- mean_loan_amount: double (nullable = true)
 |-- max_loan_amount: integer (nullable = true)
 |-- min_loan_amount: integer (nullable = true)
 |-- mean_payment_amount: double (nullable = true)
 |-- max_payment_amount: integer (nullable = true)
 |-- min_payment_amount: integer (nullable = true)
 |-- MAX_QT_DIAS_START_PAYMENT: integer (nullable = true)
 |-- mean_missed: double (nullable = true)
 |-- mean_repaid: double (nullable = true)
 |-- mean_rate: double (nullable = true)
 |-- MAX_QT_DIAS_START_END: integer (nullable = true)
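
One point to keep in mind when aggregating `loan_payments`: the table has one row per payment, so `sum(case when loan_type = 'home' then 1 else 0 end)` counts payment rows on home loans rather than home loans themselves. If the number of loans is what is wanted, counting distinct `loan_id` values is one option. The sketch below compares the two on toy data, again using `sqlite3` as a stand-in for Spark SQL. The rows and the column name `qt_pay_rows_home` are made up, and whether the ABT should count payments or loans is an assumption to confirm.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE loan_payments (client_id INTEGER, loan_id INTEGER, loan_type TEXT);

    -- toy rows: client 1 has two home loans (3 + 2 payments) and one cash loan (1 payment)
    INSERT INTO loan_payments VALUES
        (1, 100, 'home'), (1, 100, 'home'), (1, 100, 'home'),
        (1, 101, 'home'), (1, 101, 'home'),
        (1, 102, 'cash');
""")

result = conn.execute("""
    SELECT
        client_id,
        -- counts payment rows that belong to home loans
        sum(CASE WHEN loan_type = 'home' THEN 1 ELSE 0 END) AS qt_pay_rows_home,
        -- counts home loans: NULLs from the CASE are ignored by count(DISTINCT ...)
        count(DISTINCT CASE WHEN loan_type = 'home' THEN loan_id END) AS qt_emp_home
    FROM loan_payments
    GROUP BY client_id
""").fetchone()

print(result)  # (1, 5, 2)
```

The same caveat applies to `mean(loan_amount)` and `mean(rate)`: over payment rows, loans with more payments weigh more in the average.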

#### Bringing the loan and payment information into the clients table (only a few variables, as an example)

In [18]:
# Creating the current reference timestamp

import datetime

# Shifting by 3 hours to account for the time zone difference between Brazil and the USA
now = datetime.datetime.now() + datetime.timedelta(hours=-3)
anomesdia = now.strftime("%Y%m%d")
timestpref = now.strftime('%Y-%m-%dT%H:%M:%S')


print('Reference date (YYYYMMDD): ',anomesdia)

('Reference date (YYYYMMDD): ', '20191021')
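
The manual three-hour shift works as long as the cluster clock stays three hours ahead of local time. A possible alternative is to convert from UTC with an explicit offset. The sketch below assumes Python 3, assumes the server clock is in UTC, and uses a fixed UTC-3 offset for Brasília time. The helper `reference_strings` is hypothetical, and the fixed instant is there only to make the example reproducible.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-3 offset (assumption: Brasília time, no daylight saving)
BRT = timezone(timedelta(hours=-3))


def reference_strings(utc_instant):
    """Return (YYYYMMDD, ISO-like timestamp) for a UTC instant, expressed in UTC-3."""
    local = utc_instant.astimezone(BRT)
    return local.strftime("%Y%m%d"), local.strftime("%Y-%m-%dT%H:%M:%S")


# In the ETL this would be datetime.now(timezone.utc); a fixed instant is used here
instant = datetime(2019, 10, 22, 0, 14, 35, tzinfo=timezone.utc)
anomesdia, timestpref = reference_strings(instant)
print(anomesdia, timestpref)  # 20191021 2019-10-21T21:14:35
```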

In [19]:
rdd_final = spark.sql(
                             """SELECT
                                    a.client_id as PK_ID,
                                    a.credit_score as VL_SCORE,
                                    a.income as VL_RENDA,
                                    datediff(cast('{}' as date),cast(a.joined as date)) as QT_DIAS_JOINED_NOW,
                                    b.QT_EMP_HOME,
                                    b.QT_EMP_CRED,
                                    round(b.mean_missed,2) as VL_MED_MISSED,
                                    round(b.mean_repaid,2) as VL_MED_REPAID,
                                    {} as PK_DATREF,
                                    '{}' as PK_DATPROC
                                FROM
                                    clientes as a
                                LEFT JOIN
                                    loan_payments_agg as b
                                ON a.client_id = b.client_id
                             """.format(now,anomesdia,timestpref)
                         )
rdd_final.createOrReplaceTempView("rdd_final")
rdd_final.count()

25
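
Positional `{}` placeholders are easy to mix up as a query grows, since the order of the arguments to `format` has to match the order of appearance in the SQL text. A small sketch of the same idea with named placeholders follows. The helper `build_select` and the shortened query are hypothetical; only the three parameters come from the cell above.

```python
from datetime import datetime

QUERY_TEMPLATE = """
SELECT
    a.client_id AS PK_ID,
    datediff(cast('{dt_ref}' AS date), cast(a.joined AS date)) AS QT_DIAS_JOINED_NOW,
    {anomesdia} AS PK_DATREF,
    '{timestpref}' AS PK_DATPROC
FROM clientes AS a
"""


def build_select(now):
    """Fill the template from a single datetime so all three values stay consistent."""
    return QUERY_TEMPLATE.format(
        dt_ref=now.strftime("%Y-%m-%d"),
        anomesdia=now.strftime("%Y%m%d"),
        timestpref=now.strftime("%Y-%m-%dT%H:%M:%S"),
    )


query = build_select(datetime(2019, 10, 21, 21, 14, 35))
print(query)  # the resulting string would be passed to spark.sql(query)
```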

In [20]:
rdd_final.show()

+-----+--------+--------+------------------+-----------+-----------+-------------+-------------+---------+-------------------+
|PK_ID|VL_SCORE|VL_RENDA|QT_DIAS_JOINED_NOW|QT_EMP_HOME|QT_EMP_CRED|VL_MED_MISSED|VL_MED_REPAID|PK_DATREF|         PK_DATPROC|
+-----+--------+--------+------------------+-----------+-----------+-------------+-------------+---------+-------------------+
|35214|     696|   95849|              5918|         93|         21|         0.51|         0.39| 20191021|2019-10-21T21:14:35|
|44601|     518|  156341|              5479|         47|         58|         0.46|         0.73| 20191021|2019-10-21T21:14:35|
|26945|     806|  214516|              6903|         30|         43|         0.51|         0.34| 20191021|2019-10-21T21:14:35|
|49545|     770|  104564|              4359|         49|         41|         0.58|         0.59| 20191021|2019-10-21T21:14:35|
|49624|     800|   49036|              2634|         24|         35|         0.47|         0.62| 20191021|2019-

In [21]:
rdd_final.printSchema()

root
 |-- PK_ID: integer (nullable = true)
 |-- VL_SCORE: integer (nullable = true)
 |-- VL_RENDA: integer (nullable = true)
 |-- QT_DIAS_JOINED_NOW: integer (nullable = true)
 |-- QT_EMP_HOME: long (nullable = true)
 |-- QT_EMP_CRED: long (nullable = true)
 |-- VL_MED_MISSED: double (nullable = true)
 |-- VL_MED_REPAID: double (nullable = true)
 |-- PK_DATREF: integer (nullable = false)
 |-- PK_DATPROC: string (nullable = false)

## Ways to save the final ETL result

* CSV (partitioned or not)
* PARQUET (partitioned or not)
* HDFS

Recommended: partitioned Parquet

#### Saving unpartitioned PARQUET to S3
Remember to change the folder name from BrunoJ to your own name!

In [22]:
rdd_final.write.parquet("s3://turing-bkt-treinamentos-etl/Alunos/Turma_20191021/ABT_CLI_EMP_PAG",mode="overwrite")

#### Saving PARQUET partitioned by date (YYYYMMDD) to S3
For each new date, the code automatically creates a new partition with the data.

In [23]:
nm_path_s3 = 's3://turing-bkt-treinamentos-etl/Alunos/Turma_20191021/PARTIT_ABT_CLI_EMP_PAG/'
rdd_final.write.partitionBy('PK_DATREF').parquet(nm_path_s3, mode='append')
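
With `partitionBy('PK_DATREF')`, each distinct value of the column is written to its own subfolder named `PK_DATREF=<value>`, which is why a new date produces a new partition. A small sketch of that layout follows. The helper `partition_dir` is hypothetical and only builds the folder name; it does not touch S3.

```python
def partition_dir(base_path, column, value):
    """Build the Hive-style partition folder name used by partitioned writes."""
    return "{}/{}={}".format(base_path.rstrip("/"), column, value)


nm_path_s3 = "s3://turing-bkt-treinamentos-etl/Alunos/Turma_20191021/PARTIT_ABT_CLI_EMP_PAG/"
print(partition_dir(nm_path_s3, "PK_DATREF", 20191021))
# s3://turing-bkt-treinamentos-etl/Alunos/Turma_20191021/PARTIT_ABT_CLI_EMP_PAG/PK_DATREF=20191021
```

Because the write uses `mode='append'`, running the cell twice for the same date adds new files to the existing partition, so the rows for that date end up duplicated.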

#### Creating a Hive schema (database) to store the tables produced by the ETL

In [24]:
spark.sql("create database if not exists treinamento").show()

++
||
++
++

#### Saving partitioned PARQUET through Hive

In [25]:
rdd_final.write.mode('append').partitionBy('PK_DATREF').saveAsTable("treinamento.PARTIT_ABT_CLI_EMP_PAG")

In [26]:
spark.sql("show tables in treinamento").show(10,False)

+-----------+----------------------+-----------+
|database   |tableName             |isTemporary|
+-----------+----------------------+-----------+
|treinamento|partit_abt_cli_emp_pag|false      |
|           |clientes              |true       |
|           |emprestimos           |true       |
|           |loan_payments         |true       |
|           |loan_payments_agg     |true       |
|           |pagamentos            |true       |
|           |rdd_final             |true       |
+-----------+----------------------+-----------+

### To check the files saved in the directory (use a command prompt)

* hdfs dfs -ls /user/spark/warehouse/treinamento.db

* hdfs dfs -ls /user/spark/warehouse/treinamento.db/partit_abt_cli_emp_pag

* hdfs dfs -ls /user/spark/warehouse/treinamento.db/partit_abt_cli_emp_pag/PK_DATREF=20191021

### To clear the temporary tables from Hive memory

In [27]:
# Drop every temporary view registered during the ETL
spark.catalog.dropTempView("clientes")
spark.catalog.dropTempView("emprestimos")
spark.catalog.dropTempView("loan_payments")
spark.catalog.dropTempView("loan_payments_agg")
spark.catalog.dropTempView("pagamentos")
spark.catalog.dropTempView("rdd_final")


spark.sql("show tables in treinamento").show(10,False)

+-----------+----------------------+-----------+
|database   |tableName             |isTemporary|
+-----------+----------------------+-----------+
|treinamento|partit_abt_cli_emp_pag|false      |
+-----------+----------------------+-----------+

#### Remember: if the ETL is meant to produce an ABT for modeling, the sample should be drawn here, to reduce machine time/effort later in the Analytics interface

#### Sampling example
* Draw a 10% sample, without replacement, from the payments table

In [28]:
rdd_payments_amt_10pct = rdd_payments.sample(withReplacement=False, fraction=0.1, seed=12345)
rdd_payments_amt_10pct.count()

324
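
The sample has 324 rows rather than exactly 10% of 3456 (345.6). That is expected: sampling without replacement keeps each row independently with the given probability, so the fraction is a target, not a guarantee. The plain-Python simulation below illustrates the behaviour. It uses the standard `random` module rather than Spark's own generator, so its count will not match the 324 above, and the helper `bernoulli_sample` is hypothetical.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (no row is repeated)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(3456))  # stand-in for the 3456 payment rows
sample = bernoulli_sample(rows, 0.1, 12345)

# The size lands near 345 but is not fixed; the same seed always gives the same sample
print(len(sample))
```

If an exact sample size is required, one option is to oversample slightly and then apply `limit` to the result.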