# Importando Clientes

In [None]:
# Load the clients CSV from S3 (first row is the header, column types are
# inferred, fields are comma-separated). Despite the "rdd_" prefix the result
# is used as a DataFrame throughout this notebook.
clients_reader = spark.read.format("com.databricks.spark.csv").options(
    header="true",
    inferSchema="true",
    delimiter=',',
)
rdd_clients = clients_reader.load('s3://treinamento-big-data/clients.csv')
# Echo the row count as the cell output.
rdd_clients.count()

In [None]:
# Inspect the table metadata: print the column names and types of the clients data.
rdd_clients.printSchema()

In [None]:
# Preview 20 rows of the clients table (show() is called with its default row count).
rdd_clients.show()

In [None]:
# Echo the `spark` handle used by every read/sql call in this notebook so the
# cell output displays it. It is not created in this file — presumably it is
# injected by the notebook runtime (confirm).
spark

In [None]:
# Load the payments CSV from S3 with the same reader settings as the other
# source files: header row, inferred column types, comma delimiter.
rdd_payments = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ',')
    .load('s3://treinamento-big-data/payments.csv')
)

# Echo the row count as the cell output.
rdd_payments.count()

In [None]:
# Print the column names and types of the payments data.
rdd_payments.printSchema()

In [None]:
# Preview the first rows of the payments data.
rdd_payments.show()

In [None]:
# Load the loans CSV from S3: header row, inferred column types, comma delimiter.
loans_csv_options = {
    "header": "true",
    "inferSchema": "true",
    "delimiter": ',',
}
loans_reader = spark.read.format("com.databricks.spark.csv")
loans_reader = loans_reader.options(**loans_csv_options)
rdd_loan = loans_reader.load('s3://treinamento-big-data/loans.csv')
# Echo the row count as the cell output.
rdd_loan.count()

In [None]:
# Print the column names and types of the loans data.
rdd_loan.printSchema()

In [None]:
# Preview the first rows of the loans data.
rdd_loan.show()

In [None]:
# Expose the three source DataFrames to Spark SQL under the view names used by
# the queries in this notebook.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable. The call returns None, so its result is no longer bound
# to a variable (the previous payments_dfm / clients_dfm / loans_dfm Python
# names were always None).
rdd_payments.createOrReplaceTempView("payments_dfm")
rdd_clients.createOrReplaceTempView("clients_dfm")
rdd_loan.createOrReplaceTempView("loans_dfm")


In [None]:
# Inspect the 'home' loans of client 46109, ordered by start date, together
# with the loan duration expressed in days, approximate months (days / 30.5)
# and approximate years (days / 365).
consulta_emprestimos_home = """SELECT 
                    *,
                    cast(loan_start as date) as DT_INICIO,
                    cast(loan_end as date) as DT_FIM,
                    datediff(cast(loan_end as date),cast(loan_start as date)) as QT_DIAS_START_END,
                    round(datediff(cast(loan_end as date),cast(loan_start as date))/30.5) as QT_MESES_START_END,
                    round(datediff(cast(loan_end as date),cast(loan_start as date))/365) as QT_ANOS_START_END
                FROM
                    loans_dfm 
                WHERE 
                    client_id = 46109 and loan_type in ('home') 
                order by cast(loan_start as date)    
             """
emprestimos_home = spark.sql(consulta_emprestimos_home)
emprestimos_home.show()

In [None]:
# List every payment row of loan 10243, adding the payment date cast to DATE.
# The two "--" lines inside the SQL text are SQL comments (disabled min/max
# columns) and are not executed.
consulta_pagamentos = """SELECT 
                    *,
                    cast(payment_date as date ) as DT_PGTO
                    --min(payment_date) as DT_PRIMEIRO_PAGAMENTO,
                    --max(payment_date) as DT_ULTIMO_PAGAMENTO,
                FROM
                    payments_dfm 
                WHERE 
                    loan_id = 10243 
             """
pagamentos_emprestimo = spark.sql(consulta_pagamentos)
pagamentos_emprestimo.show()

In [None]:
# Summarise the payments of loan 10243: amount in missed payments, amount in
# non-missed payments, total amount, first/last payment date and payment count.
# Fix: the two conditional sums were attached to the wrong aliases
# (VL_PGTO_MISS was summing the rows where missed <> 1 and VL_PGTO_NOT_MISS the
# rows where missed <> 0). Each alias now sums the rows its name describes.
# Assumes `missed` is a 0/1 flag where 1 means the payment was missed — confirm.
spark.sql(
             """SELECT
                    loan_id,
                    sum(case when missed = 1 then payment_amount else 0 end) as VL_PGTO_MISS,
                    sum(case when missed = 0 then payment_amount else 0 end) as VL_PGTO_NOT_MISS,
                    sum(payment_amount) as VL_TOT,
                    cast(min(payment_date) as date) as DT_PRIMEIRO_PAGAMENTO,
                    cast(max(payment_date) as date) as DT_ULTIMO_PAGAMENTO,
                    count(*) as QT_PGTOS
                FROM
                    payments_dfm
                WHERE
                    loan_id = 10243
                GROUP BY
                    loan_id
             """
         ).show()

In [None]:
# Enrich the clients view with derived columns:
#   VL_LOG_RENDA  - log(income) rounded to 4 decimals
#   DT_JOINED     - `joined` cast to DATE
#   VL_MES_JOINED - characters 6-7 of the DATE text, i.e. the month of `joined`
clients_00 = spark.sql(
                     """SELECT
                            *,
                            round(log(income),4) as VL_LOG_RENDA,
                            cast(joined as date) as DT_JOINED,
                            substr(cast(joined as date),6,2) as VL_MES_JOINED
                        FROM
                            clients_dfm
                     """
                     )
# Publish the result for the following SQL cells. createOrReplaceTempView
# replaces the deprecated registerTempTable; it returns None, so the result is
# not assigned (the former clients_00_dfm Python name was always None).
clients_00.createOrReplaceTempView("clients_00_dfm")
clients_00.show()

In [None]:
# Per-client loan statistics: average (4 decimals), minimum and maximum
# loan_amount over the rows of loans_dfm.
loans_00 = spark.sql(
                     """SELECT
                            client_id,
                            round(avg(loan_amount),4) as VL_MED_EMP_CLI,
                            min(loan_amount) as VL_MIN_EMP_CLI,
                            max(loan_amount) as VL_MAX_EMP_CLI
                        FROM
                            loans_dfm
                        group by client_id
                     """
                     )
# Publish the result for the following SQL cells. createOrReplaceTempView
# replaces the deprecated registerTempTable; it returns None, so the result is
# not assigned (the former loans_00_dfm Python name was always None).
loans_00.createOrReplaceTempView("loans_00_dfm")
loans_00.show(5)

In [None]:
# Attach the per-client loan statistics (loans_00_dfm) to the enriched clients
# view. The LEFT JOIN keeps every client row, including clients without a
# match in loans_00_dfm.
consulta_clientes_01 = """SELECT 
                            a.*,
                            b.VL_MED_EMP_CLI,
                            b.VL_MIN_EMP_CLI,
                            b.VL_MAX_EMP_CLI
                        FROM
                            clients_00_dfm as a  
                        left join  loans_00_dfm as b
                        on a.client_id = b.client_id
                     """
clientes_01 = spark.sql(consulta_clientes_01)

clientes_01.show()

In [None]:
# Join every loan with its payments. The LEFT JOIN keeps loans without any
# payment row; a loan with several payments yields one output row per payment.
loans_02 = spark.sql(
                     """SELECT
                            a.*,
                            b.payment_amount,
                            b.payment_date,
                            b.missed
                        FROM
                            loans_dfm as a
                        left join  payments_dfm as b
                        on a.loan_id = b.loan_id
                     """
                     )
# Publish the result for the following SQL cells. createOrReplaceTempView
# replaces the deprecated registerTempTable; it returns None, so the result is
# not assigned (the former loans_02_dfm Python name was always None).
loans_02.createOrReplaceTempView("loans_02_dfm")
loans_02.show()

In [None]:
# Per loan type: average payment amount (2 decimals) and number of contracts.
# Fix: loans_02_dfm holds one row per (loan, payment), so count(*) was counting
# payment rows while being labelled as a contract count. Counting distinct
# loan_id values gives the number of contracts.
# The grouping column is also named explicitly instead of by position.
loans_03 = spark.sql(
                     """SELECT
                            loan_type,
                            round(avg(payment_amount),2) as VL_MED_PAGAMENTOS,
                            count(distinct loan_id) as QT_CONTRATOS_TIPO
                        FROM
                            loans_02_dfm
                        group by loan_type
                     """
                     )
# createOrReplaceTempView replaces the deprecated registerTempTable; it returns
# None, so the result is not assigned (loans_03_dfm was always None).
loans_03.createOrReplaceTempView("loans_03_dfm")
loans_03.show()

In [None]:
# Per client: number of 'cash' loans, average amount of those 'cash' loans and
# total number of contracts.
# Fixes:
#   * The aggregation now reads loans_dfm instead of the loan x payment join
#     (loans_02_dfm). Only loan columns are used here, and on the join every
#     loan was repeated once per payment, inflating both counts and weighting
#     the average by the number of payments.
#     Assumes loans_dfm has one row per loan — confirm.
#   * The average no longer substitutes 0 for non-cash loans, which pulled the
#     mean towards zero. avg() ignores NULLs, so VL_MEDEMP_CASH is now the mean
#     of cash loans only, and NULL for clients that have no cash loan.
loans_04 = spark.sql(
                     """SELECT
                            client_id,
                            sum(case when loan_type in ('cash') then 1 else 0 end) as QT_EMP_CASH,
                            avg(case when loan_type in ('cash') then loan_amount end) as VL_MEDEMP_CASH,
                            count(*) as QT_CONTRATOS_TIPO
                        FROM
                            loans_dfm
                        group by client_id
                     """
                     )
# createOrReplaceTempView replaces the deprecated registerTempTable; it returns
# None, so the result is not assigned (loans_04_dfm was always None).
loans_04.createOrReplaceTempView("loans_04_dfm")
loans_04.show()

In [None]:
import time
from datetime import datetime
from datetime import date, timedelta
# Reference timestamp: the current local time minus two hours.
# NOTE(review): the two-hour shift looks like a timezone adjustment — confirm.
duas_horas = timedelta(hours=2)
dataref = datetime.now() - duas_horas
# Reference date as a YYYYMMDD string; used below as the PK_DATREF value.
anomesdia = dataref.strftime("%Y%m%d")
# Echo the timestamp as the cell output.
dataref

In [None]:
# Final ETL table: every column of loans_04_dfm plus the reference date
# (anomesdia, a YYYYMMDD string) as the constant column PK_DATREF.
consulta_etl_final = """SELECT 
                            *,
                            '{}' as PK_DATREF
                        FROM
                            loans_04_dfm 
                     """
etl_final = spark.sql(consulta_etl_final.format(anomesdia))

etl_final.show()

In [None]:
# Persist the final table as Parquet under a relative path. No save mode is
# set, so the writer's default mode applies.
destino_parquet = "etl_final_00.parquet"
etl_final.write.parquet(destino_parquet)
# To check the written files on HDFS:
# hdfs dfs -ls /user/zeppelin

In [None]:
from pyspark.sql import HiveContext
# Build the HiveContext that the following cells use through hive_context.sql(...).
# `sc` is not defined in this file — presumably the SparkContext injected by
# the notebook runtime (confirm).
hive_context = HiveContext(sc)

# Disabled alternative: register the DataFrame as a temporary table instead.
#hive_context.registerDataFrameAsTable(etl_final, "etl_final_00")

# Append etl_final to the permanent table default.etl_final_00.
# NOTE(review): an earlier version of this note named a "treinamento" database,
# but the code writes to "default" — confirm which database is intended.
etl_final.write.mode('append').saveAsTable("default.etl_final_00")
# List the tables of the default database to check the write.
hive_context.sql("show tables in default").show()
# Disabled cleanup statement, kept for reference.
#hive_context.sql("drop table etl_final_01_").show()

In [None]:
# List the databases visible to the session.
spark.sql("show databases").show()

In [None]:
# Clean up: drop every temporary view registered by this notebook, in the same
# order as before.
views_temporarias = (
    "clients_00_dfm",
    "clients_dfm",
    "loans_00_dfm",
    "loans_02_dfm",
    "loans_03_dfm",
    "loans_04_dfm",
    "loans_dfm",
    "payments_dfm",
)
for nome_view in views_temporarias:
    spark.catalog.dropTempView(nome_view)

In [None]:
# List the tables of the default database.
hive_context.sql("show tables in default").show()

In [None]:
# Read the permanent table back from the default database and display it.
# Fix: show() only prints and returns None, so chaining it into the assignment
# left tab_hive bound to None. The DataFrame is now kept in tab_hive and
# displayed in a separate step.
tab_hive = hive_context.sql("select * from default.etl_final_00")
tab_hive.show()

In [None]:
# Append etl_final to the permanent table default.etl_final_01, partitioned by
# the PK_DATREF column.
# NOTE(review): an earlier version of this note named a "treinamento" database,
# but the code writes to "default" — confirm which database is intended.
etl_final.write.mode('append').partitionBy('PK_DATREF').saveAsTable("default.etl_final_01")

# List the tables of the default database to check the write.
hive_context.sql("show tables in default").show()

In [None]:
# Ler partição Hive
df_hive_parquet = sqlContext.read.option("mergeSchema", "true").parquet("/user/hive/warehouse/etl_final_01/PK_DATREF=20200117")
df_hive_parquet.count()


In [None]:
# Preview the rows read from the Hive partition.
df_hive_parquet.show()

In [None]:
# Append the final table to S3 as Parquet, partitioned by PK_DATREF. The target
# folder name is 'UserEJL' followed by the reference date (YYYYMMDD).
nm_path_s3 = 's3://treinamento-big-data/'
Tabela = 'UserEJL' + anomesdia
destino_s3 = nm_path_s3 + Tabela
etl_final.write.mode('append').partitionBy('PK_DATREF').parquet(destino_s3)

In [None]:
# Exercise prompt for the reader; the comment lines below list the requested fields.
print('Exercício')
# Metadata:
# PK_CLIENTE
# Average, minimum and maximum contract value per loan type
# Average amount paid per loan type
# Average amount not paid per loan type
# Average contract duration (in months)
# Average number of payments