In [None]:
#imports: standard library first, then third-party
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Force the JVM used by PySpark. This is set before the SparkSession cell below runs.
# NOTE(review): hardcoded absolute path to a Java 8 install, and it overrides any JAVA_HOME
# already in the environment — adjust for other machines.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

## Setup: Spark session and data loading (DataFrames)

In [None]:
#Create (or reuse, if one is already running) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("challenge_nupay")
    .getOrCreate()
)

In [None]:
#Load every raw table with pandas; each CSV lives at tables/<folder>/<file>
def _read_table(folder, filename):
    """Read ``tables/<folder>/<filename>`` into a pandas DataFrame."""
    return pd.read_csv("tables/{}/{}".format(folder, filename))


data_acc = _read_table("accounts", "acc.csv")
data_city = _read_table("city", "city.csv")
data_country = _read_table("country", "country.csv")
data_customers = _read_table("customers", "customers.csv")
data_d_month = _read_table("d_month", "d_month.csv")
data_d_time = _read_table("d_time", "d_time.csv")
data_d_week = _read_table("d_week", "d_week.csv")
data_d_weekday = _read_table("d_weekday", "d_weekday.csv")
data_d_year = _read_table("d_year", "d_year.csv")
data_pix_mov = _read_table("pix_movements", "pix_mov.csv")
data_state = _read_table("state", "state.csv")
data_tran_in = _read_table("transfer_ins", "tran_in.csv")
data_tran_out = _read_table("transfer_outs", "tran_out.csv")

In [None]:
#Convert each pandas table into a Spark DataFrame (df_<x> mirrors data_<x>)
to_spark = spark.createDataFrame  # bound-method alias, only to shorten the lines below

df_acc = to_spark(data_acc)
df_city = to_spark(data_city)
df_country = to_spark(data_country)
df_customers = to_spark(data_customers)
df_d_month = to_spark(data_d_month)
df_d_time = to_spark(data_d_time)
df_d_week = to_spark(data_d_week)
df_d_weekday = to_spark(data_d_weekday)
df_d_year = to_spark(data_d_year)
df_pix_mov = to_spark(data_pix_mov)
df_state = to_spark(data_state)
df_tran_in = to_spark(data_tran_in)
# Outflow "amount" is renamed to "amount_out"; later cells and the SQL query refer to it by that name
df_tran_out = to_spark(data_tran_out).withColumnRenamed("amount", "amount_out")

#Named months: month number as a string ("1".."12") -> three-letter English abbreviation
month_abbreviations = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()
df_nome_mes = spark.createDataFrame(
    [(str(number), abbr) for number, abbr in enumerate(month_abbreviations, start=1)],
    ["action_month", "month_name"],
)

In [None]:
# Preview the weekday dimension (first 20 rows, the show() default)
df_d_weekday.show(n=20)

## DataFrame API (no explicit SQL)

In [None]:
#Keep only completed movements; the same predicate is applied to all three movement tables
is_completed = f.col("status") == "completed"
df_tran_out = df_tran_out.where(is_completed)
df_tran_in = df_tran_in.where(is_completed)
df_pix_mov = df_pix_mov.where(is_completed)

In [None]:
#Get month of transactions (in and out): attach the d_time row (which carries month_id)
# matching each transfer's request time
df_in_time = df_tran_in.join(df_d_time, on=[df_tran_in.transaction_requested_at == df_d_time.time_id])
df_out_time = df_tran_out.join(df_d_time, on=[df_tran_out.transaction_requested_at == df_d_time.time_id])

#pix session: split pix movements by direction.
# endswith() matches the literal "_in"/"_out" suffix. The previous LIKE '%_in' treated "_" as a
# single-character wildcard, so it matched any value merely ending in "in".
df_pix_mov_in = df_pix_mov.filter(f.col("in_or_out").endswith("_in"))
df_pix_mov_out = df_pix_mov.filter(f.col("in_or_out").endswith("_out"))

df_pix_mov_in_time = df_pix_mov_in.join(df_d_time, on=[df_pix_mov_in.pix_requested_at == df_d_time.time_id])
df_pix_mov_out_time = df_pix_mov_out.join(df_d_time, on=[df_pix_mov_out.pix_requested_at == df_d_time.time_id])

In [None]:
df_pix_mov_in_time[["account_id", "pix_amount", "month_id"]].show(n=4)

In [None]:
df_in_time[["account_id", "amount", "month_id"]].show(n=4)

In [None]:
df_out_time[["account_id", "amount_out", "month_id"]].show(n=4)

In [None]:
#Grouped by account and month
df_in_time_group = df_in_time.groupby("account_id", "month_id").agg(f.sum("amount").alias("soma_entrada"))
df_out_time_group = df_out_time.groupby("account_id", "month_id").agg(f.sum("amount_out").alias("soma_saida"))


#pix session
df_pix_mov_in_time_group = df_pix_mov_in_time.groupby("account_id", "month_id").agg(f.sum("pix_amount").alias("soma_entrada_pix"))
df_pix_mov_out_time_group = df_pix_mov_out_time.groupby("account_id", "month_id").agg(f.sum("pix_amount").alias("soma_saida_pix"))

#join in and out (pix and nonpix)
df_joined_in = df_pix_mov_in_time_group.join(df_in_time_group, on=["account_id", "month_id"], how="left").fillna(0)
df_joined_out = df_pix_mov_out_time_group.join(df_out_time_group, on=["account_id", "month_id"], how="left").fillna(0)


#sum in
df_joined_in_sum = df_joined_in.withColumn("soma_in", f.col("soma_entrada_pix") + f.col("soma_entrada"))
df_joined_out_sum = df_joined_out.withColumn("soma_out", f.col("soma_saida_pix") + f.col("soma_saida"))

In [None]:
df_joined_in_sum.show(n=4)

In [None]:
df_joined_out_sum.show(n=4)

In [None]:
#Join IN and OUT per account and month.
# Uses the combined pix + non-pix totals (soma_in / soma_out). The previous select kept
# soma_entrada / soma_saida, which are the non-pix sums only, so pix was ignored.
# The totals are exposed under the names the balance step expects.
# A full outer join keeps months that only have outflows (a left join from IN dropped them).
df_in_out_grouped = (
    df_joined_in_sum
    .join(df_joined_out_sum, on=["account_id", "month_id"], how="outer")
    .select(
        "account_id",
        "month_id",
        f.col("soma_in").alias("soma_entrada"),
        f.col("soma_out").alias("soma_saida"),
    )
    .fillna(0)
)

In [None]:
df_in_out_grouped.show(n=4)

In [None]:
#Net monthly flow per account: inflow total minus outflow total
df_balance = df_in_out_grouped.withColumn("saldo", f.expr("soma_entrada - soma_saida"))

In [None]:
df_balance.show(n=4)

In [None]:
#Translate month_id into the calendar month number (action_month) via the month dimension
df_balance_action_month = (
    df_balance
    .join(df_d_month, on=["month_id"], how="inner")
    .select("account_id", "saldo", "action_month")
)

In [None]:
#Calculate the running balance per account using a window.
# "antes"       = sum of saldo over all *earlier* months of the account (null -> 0 for the first month)
# "saldo_final" = antes + this month's saldo, i.e. the accumulated balance at the end of the month.
# lag("saldo", 1) only added the immediately preceding month, so anything older was lost.
# NOTE(review): ordering is by action_month only. If the data spans several years, the year must
# also be part of the order. If action_month were a string, "10" would sort before "2" — confirm.
window = Window.partitionBy("account_id").orderBy("action_month")
earlier_months = window.rowsBetween(Window.unboundedPreceding, -1)
df_balance_action_month_win = (
    df_balance_action_month
    .withColumn("antes", f.sum("saldo").over(earlier_months))
    .fillna(0)
    .withColumn("saldo_final", f.col("saldo") + f.col("antes"))
)


In [None]:
df_balance_action_month_win.show(n=20)

In [None]:
#Attach the month abbreviation and keep only (account_id, saldo, mes),
# where saldo is the balance computed in the window step (saldo_final)
df_final = (
    df_balance_action_month_win
    .join(df_nome_mes, on=["action_month"])
    .select(
        "account_id",
        f.col("saldo_final").alias("saldo"),
        f.col("month_name").alias("mes"),
    )
)

In [None]:
df_final.show(n=20)

## Explicit SQL

In [None]:
#Expose each DataFrame to Spark SQL under its Python variable name.
# registerTempTable is deprecated since Spark 2.0. createOrReplaceTempView is its replacement
# and, like it, replaces an existing view of the same name.
sql_views = {
    "df_acc": df_acc,
    "df_city": df_city,
    "df_country": df_country,
    "df_customers": df_customers,
    "df_d_month": df_d_month,
    "df_d_time": df_d_time,
    "df_d_week": df_d_week,
    "df_d_weekday": df_d_weekday,
    "df_d_year": df_d_year,
    "df_pix_mov": df_pix_mov,
    "df_state": df_state,
    "df_tran_in": df_tran_in,
    "df_tran_out": df_tran_out,
    "df_nome_mes": df_nome_mes,
}
for view_name, view_df in sql_views.items():
    view_df.createOrReplaceTempView(view_name)

In [None]:
# Print the column names/types of the pix movements DataFrame, which the SQL query below reads
df_pix_mov.printSchema()

In [None]:
#Monthly running balance per account as one Spark SQL query; output columns: account_id, saldo, mes.
# Compared with the nested version this:
#  - sums pix + regular transfers with FULL OUTER joins, so account-months with only pix,
#    only transfers, or only outflows are no longer dropped;
#  - matches the in_or_out suffix literally (RLIKE '_in$'); in LIKE '%_in' the "_" is a wildcard;
#  - accumulates saldo over ALL previous months, instead of only the previous one (LAG(saldo, 1)).
# NOTE(review): ordering is by action_month only; confirm the data covers a single year.
df = spark.sql("""
    WITH tran_in_month AS (
        SELECT t_in.account_id,
               d_time.month_id,
               SUM(t_in.amount) AS soma_entrada
        FROM df_tran_in t_in
        INNER JOIN df_d_time d_time ON t_in.transaction_requested_at = d_time.time_id
        WHERE t_in.status = 'completed'
        GROUP BY t_in.account_id, d_time.month_id
    ),
    pix_in_month AS (
        SELECT pix.account_id,
               d_time.month_id,
               SUM(pix.pix_amount) AS soma_pix_entrada
        FROM df_pix_mov pix
        INNER JOIN df_d_time d_time ON pix.pix_requested_at = d_time.time_id
        WHERE pix.status = 'completed' AND pix.in_or_out RLIKE '_in$'
        GROUP BY pix.account_id, d_time.month_id
    ),
    tran_out_month AS (
        SELECT t_out.account_id,
               d_time.month_id,
               SUM(t_out.amount_out) AS soma_saida
        FROM df_tran_out t_out
        INNER JOIN df_d_time d_time ON t_out.transaction_requested_at = d_time.time_id
        WHERE t_out.status = 'completed'
        GROUP BY t_out.account_id, d_time.month_id
    ),
    pix_out_month AS (
        SELECT pix.account_id,
               d_time.month_id,
               SUM(pix.pix_amount) AS soma_pix_saida
        FROM df_pix_mov pix
        INNER JOIN df_d_time d_time ON pix.pix_requested_at = d_time.time_id
        WHERE pix.status = 'completed' AND pix.in_or_out RLIKE '_out$'
        GROUP BY pix.account_id, d_time.month_id
    ),
    entrada AS (
        SELECT COALESCE(t.account_id, p.account_id) AS account_id,
               COALESCE(t.month_id, p.month_id) AS month_id,
               COALESCE(t.soma_entrada, 0) + COALESCE(p.soma_pix_entrada, 0) AS soma_entrada
        FROM tran_in_month t
        FULL OUTER JOIN pix_in_month p
          ON t.account_id = p.account_id AND t.month_id = p.month_id
    ),
    saida AS (
        SELECT COALESCE(t.account_id, p.account_id) AS account_id,
               COALESCE(t.month_id, p.month_id) AS month_id,
               COALESCE(t.soma_saida, 0) + COALESCE(p.soma_pix_saida, 0) AS soma_saida
        FROM tran_out_month t
        FULL OUTER JOIN pix_out_month p
          ON t.account_id = p.account_id AND t.month_id = p.month_id
    ),
    saldo_mes AS (
        SELECT COALESCE(e.account_id, s.account_id) AS account_id,
               COALESCE(e.month_id, s.month_id) AS month_id,
               COALESCE(e.soma_entrada, 0) - COALESCE(s.soma_saida, 0) AS saldo
        FROM entrada e
        FULL OUTER JOIN saida s
          ON e.account_id = s.account_id AND e.month_id = s.month_id
    ),
    saldo_acumulado AS (
        SELECT b.account_id,
               d_month.action_month,
               SUM(b.saldo) OVER (
                   PARTITION BY b.account_id
                   ORDER BY d_month.action_month
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS saldo_final
        FROM saldo_mes b
        INNER JOIN df_d_month d_month ON b.month_id = d_month.month_id
    )
    SELECT account_id,
           saldo_final AS saldo,
           CASE action_month
               WHEN '1' THEN 'Jan'
               WHEN '2' THEN 'Feb'
               WHEN '3' THEN 'Mar'
               WHEN '4' THEN 'Apr'
               WHEN '5' THEN 'May'
               WHEN '6' THEN 'Jun'
               WHEN '7' THEN 'Jul'
               WHEN '8' THEN 'Aug'
               WHEN '9' THEN 'Sep'
               WHEN '10' THEN 'Oct'
               WHEN '11' THEN 'Nov'
               WHEN '12' THEN 'Dec'
           END AS mes
    FROM saldo_acumulado
""")

In [None]:
# Spot-check the monthly balances of a single account
df.where(f.col("account_id") == "24030010077126692").show()