# Create datasets e import in Hive - transfer_monthly_balance

## Initial imports

In [1]:
 import java.io.File
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val warehouseLocation = "hdfs://namenode:8020/user/hive/warehouse"
val data_base = "db_nubank"
var table = ""
var dir_path = warehouseLocation + "/" + data_base + ".db/" + table
var db_table = data_base + "." + table

spark.sql("use db_nubank")

Intitializing Scala interpreter ...

Spark Web UI available at http://8ea9f1e3afef:4040
SparkContext available as 'sc' (version = 3.0.2, master = local[*], app id = local-1615150323907)
SparkSession available as 'spark'


import java.io.File
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3b5516a0
warehouseLocation: String = hdfs://namenode:8020/user/hive/warehouse
data_base: String = db_nubank
table: String = ""
dir_path: String = hdfs://namenode:8020/user/hive/warehouse/db_nubank.db/
db_table: String = db_nubank.
res0: org.apache.spark.sql.DataFrame = []


## Import data to datasets (transfer tables) and union all

In [2]:
val sql_transfer_ins = """select  t.id,t.account_id,t.status,t.amount
,time.action_timestamp, mon.action_month, year.action_year
,0 flag_is_pix
from transfer_ins t
inner join d_time time
on t.transaction_completed_at = time.time_id
inner join d_month mon
on mon.month_id = time.month_id
inner join d_year year
on year.year_id = time.year_id"""

val sql_transfer_outs = """select t.id,t.account_id,t.status,t.amount*-1
,time.action_timestamp, mon.action_month, year.action_year
,0 flag_is_pix
from transfer_outs t
inner join d_time time
on t.transaction_completed_at = time.time_id
inner join d_month mon
on mon.month_id = time.month_id
inner join d_year year
on year.year_id = time.year_id"""

val sql_pix_movements = """select t.id,t.account_id,t.status
,case when t.in_or_out = 'pix_out' then t.pix_amount*-1
  else t.pix_amount end as amount, time.action_timestamp, mon.action_month, year.action_year
,1 flag_is_pix
from pix_movements t
inner join d_time time
on t.pix_completed_at = time.time_id
inner join d_month mon
on mon.month_id = time.month_id
inner join d_year year
on year.year_id = time.year_id"""




val df_transfer_ins = spark.sql(sql_transfer_ins)
val df_transfer_outs = spark.sql(sql_transfer_outs)
val df_pix_movements = spark.sql(sql_pix_movements)
var df_transfer_union = df_transfer_ins.union(df_transfer_outs.union(df_pix_movements))

// Test accounts without transfer on month 2
// df_transfer_union = df_transfer_union.where(!($"action_month" === 2))

df_transfer_union.createOrReplaceTempView("tmp_transfer_union")

sql_transfer_ins: String =
select  t.id,t.account_id,t.status,t.amount
,time.action_timestamp, mon.action_month, year.action_year
,0 flag_is_pix
from transfer_ins t
inner join d_time time
on t.transaction_completed_at = time.time_id
inner join d_month mon
on mon.month_id = time.month_id
inner join d_year year
on year.year_id = time.year_id
sql_transfer_outs: String =
select t.id,t.account_id,t.status,t.amount*-1
,time.action_timestamp, mon.action_month, year.action_year
,0 flag_is_pix
from transfer_outs t
inner join d_time time
on t.transaction_completed_at = time.time_id
inner join d_month mon
on mon.month_id = time.month_id
inner join d_year year
on year.year_id = time.year_id
sql_pix_movements: String =
select t.id,t.account_id,t.status
,case when t.in_or_out = 'pix_out' then t.pix_a...


## Includes months without transactions

In [3]:
val sql_accounts_month_year = """select * from accounts,d_month,d_year
                     where concat(cast(year(created_at) as string),lpad(cast(month(created_at) as string),2,'0'))
                           <= concat(cast(action_year as string),lpad(cast(action_month as string),2,'0'))
                    """
val df_accounts_month_year = spark.sql(sql_accounts_month_year)
df_accounts_month_year.createOrReplaceTempView("tmp_accounts")

val sql_transfer = """select 
                      ac.account_id
                     ,ac.account_branch
                     ,ac.account_check_digit
                     ,ac.account_number
                     ,coalesce(tr.amount,0) amount
                     ,coalesce(tr.action_timestamp
                          ,cast(
                              concat(cast(ac.action_year as string)
                                    ,'-'
                                    ,lpad(cast(ac.action_month as string),2,'0')
                                    ,'-01')                             
                                    as timestamp)
                              ) as action_timestamp
                     ,ac.action_month
                     ,ac.action_year
                     ,tr.flag_is_pix
                     ,coalesce(tr.status,"completed") status
                    from tmp_accounts ac
                    left join tmp_transfer_union tr
                    on ac.account_id = tr.account_id
                    and ac.action_month = tr.action_month
                    and ac.action_year = tr.action_year
                    """

val df_transfer = spark.sql(sql_transfer)
df_transfer.createOrReplaceTempView("tmp_transfer")

sql_accounts_month_year: String =
"select * from accounts,d_month,d_year
                     where concat(cast(year(created_at) as string),lpad(cast(month(created_at) as string),2,'0'))
                           <= concat(cast(action_year as string),lpad(cast(action_month as string),2,'0'))
                    "
df_accounts_month_year: org.apache.spark.sql.DataFrame = [account_id: string, customer_id: string ... 9 more fields]
sql_transfer: String =
"select
                      ac.account_id
                     ,ac.account_branch
                     ,ac.account_check_digit
                     ,ac.account_number
                     ,coalesce(tr.amount,0) amount
                     ,coalesce(tr.action_timestamp
                          ,cast(
                              concat(...


## Calculates amount_balance column

In [4]:
val sql_balance = """
 select
  t1.account_id
 ,t1.account_branch
 ,t1.account_check_digit
 ,t1.account_number
 ,t1.amount
 ,t1.action_timestamp
 ,t1.action_month
 ,t1.action_year
 ,t1.flag_is_pix
 ,t1.status
 ,round(sum(t2.amount),2) amount_balance
 from tmp_transfer as t1
 inner join tmp_transfer as t2
 ON t1.action_timestamp >= t2.action_timestamp
 and t1.account_id = t2.account_id
 WHERE t1.status = "completed"
 GROUP BY
  t1.account_id
 ,t1.account_branch
 ,t1.account_check_digit
 ,t1.account_number
 ,t1.amount
 ,t1.action_timestamp
 ,t1.action_month
 ,t1.action_year
 ,t1.flag_is_pix
 ,t1.status
"""
var df_transfer_balance = spark.sql(sql_balance).persist()

df_transfer_balance.printSchema()

df_transfer_balance.createOrReplaceTempView("tmp_transfer_balance")

df_transfer_balance.show(10,false)

root
 |-- account_id: string (nullable = true)
 |-- account_branch: string (nullable = true)
 |-- account_check_digit: string (nullable = true)
 |-- account_number: string (nullable = true)
 |-- amount: float (nullable = false)
 |-- action_timestamp: timestamp (nullable = true)
 |-- action_month: integer (nullable = true)
 |-- action_year: string (nullable = true)
 |-- flag_is_pix: integer (nullable = true)
 |-- status: string (nullable = false)
 |-- amount_balance: double (nullable = true)

+-------------------+--------------+-------------------+--------------+--------+----------------------+------------+-----------+-----------+---------+--------------+
|account_id         |account_branch|account_check_digit|account_number|amount  |action_timestamp      |action_month|action_year|flag_is_pix|status   |amount_balance|
+-------------------+--------------+-------------------+--------------+--------+----------------------+------------+-----------+-----------+---------+--------------+
|1029

sql_balance: String =
"
 select
  t1.account_id
 ,t1.account_branch
 ,t1.account_check_digit
 ,t1.account_number
 ,t1.amount
 ,t1.action_timestamp
 ,t1.action_month
 ,t1.action_year
 ,t1.flag_is_pix
 ,t1.status
 ,round(sum(t2.amount),2) amount_balance
 from tmp_transfer as t1
 inner join tmp_transfer as t2
 ON t1.action_timestamp >= t2.action_timestamp
 and t1.account_id = t2.account_id
 WHERE t1.status = "completed"
 GROUP BY
  t1.account_id
 ,t1.account_branch
 ,t1.account_check_digit
 ,t1.account_number
 ,t1.amount
 ,t1.action_timestamp
 ,t1.action_month
 ,t1.action_year
 ,t1.flag_is_pix
 ,t1.status
"
df_transfer_balance: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [account_id: string, account_branch: string ... 9 more fields]


## Generates the "Account Monthly Balance"

In [5]:
var sql = """
select   
 t1.account_id
,t1.account_branch
,t1.account_check_digit
,t1.account_number
,t1.action_month
,t1.action_year
,t1.amount_balance
from tmp_transfer_balance t1
inner join 
 (select account_id,action_month,action_year,max(action_timestamp) action_timestamp 
  from tmp_transfer_balance
  group by  account_id,action_month,action_year) t2
on t1.account_id = t2.account_id
and t1.action_timestamp = t2.action_timestamp
where 1=1
--and t1.account_id = "2481861278853952512" 
and t1.action_year = 2020
order by t1.account_id, action_year, action_month
"""

val df_transfer_monthly_balance = spark.sql(sql).persist()
df_transfer_monthly_balance.show()
// df_transfer_monthly_balance.createOrReplaceTempView("tmp_transfer_monthly_balance")

+-------------------+--------------+-------------------+--------------+------------+-----------+--------------+
|         account_id|account_branch|account_check_digit|account_number|action_month|action_year|amount_balance|
+-------------------+--------------+-------------------+--------------+------------+-----------+--------------+
|1000667155163612544|          1565|                  7|         52530|           1|       2020|       5159.38|
|1000667155163612544|          1565|                  7|         52530|           2|       2020|      10677.62|
|1000667155163612544|          1565|                  7|         52530|           3|       2020|      11662.26|
|1000667155163612544|          1565|                  7|         52530|           4|       2020|      11585.97|
|1000667155163612544|          1565|                  7|         52530|           5|       2020|      12496.18|
|1000667155163612544|          1565|                  7|         52530|           6|       2020|       -

sql: String =
"
select
 t1.account_id
,t1.account_branch
,t1.account_check_digit
,t1.account_number
,t1.action_month
,t1.action_year
,t1.amount_balance
from tmp_transfer_balance t1
inner join
 (select account_id,action_month,action_year,max(action_timestamp) action_timestamp
  from tmp_transfer_balance
  group by  account_id,action_month,action_year) t2
on t1.account_id = t2.account_id
and t1.action_timestamp = t2.action_timestamp
where 1=1
--and t1.account_id = "2481861278853952512"
and t1.action_year = 2020
order by t1.account_id, action_year, action_month
"
df_transfer_monthly_balance: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [account_id: string, account_branch: string ... 5 more fields]


## Saving results

In [7]:
df_transfer_monthly_balance
    .repartition(1)
    .write
    .option("header", "true")
    .csv("file:////dataset/nubank/out_transfer_monthly_balance")

df_transfer_union
    .repartition(1)
    .write
    .option("header", "true")
    .csv("file:////dataset/nubank/out_transfer_union")

table = "transfer_monthly_balance"
dir_path = warehouseLocation + "/" + data_base + ".db/" + table
db_table = data_base + "." + table

spark.sql("DROP TABLE IF EXISTS " + db_table)
df_transfer_monthly_balance.write
  .option("path", dir_path)
  .mode("Overwrite")
  .saveAsTable(db_table)

table = "transfer_union"
dir_path = warehouseLocation + "/" + data_base + ".db/" + table
db_table = data_base + "." + table

spark.sql("DROP TABLE IF EXISTS " + db_table)
df_transfer_union.write
  .option("path", dir_path)
  .mode("Overwrite")
  .saveAsTable(db_table)

table: String = transfer_union
dir_path: String = hdfs://namenode:8020/user/hive/warehouse/db_nubank.db/transfer_union
db_table: String = db_nubank.transfer_union
table: String = transfer_union
dir_path: String = hdfs://namenode:8020/user/hive/warehouse/db_nubank.db/transfer_union
db_table: String = db_nubank.transfer_union


In [None]:
spark.sql(sql).explain()