**TUGAS FILTERING DATA DI HUE (Mba Arniz)**

In [None]:
-- Up to 100 rows from datalake.asrs_fact_savingmaster, restricted to
-- group_type 'Britama', status 1 and ds '202102'.
SELECT
    cifno AS CIF,
    acctno AS "nomor rekening",
    branch,
    cbal_base,
    `class` AS "tipe nasabah",
    group_type,
    ds,
    status,
    open_date
FROM datalake.asrs_fact_savingmaster
WHERE group_type = 'Britama'
  AND status = 1
  AND ds = '202102'
LIMIT 100;

**TUGAS FILTERING DATA DI DSW (Mba Ratih)**

In [None]:
from pyspark.sql import SparkSession, SQLContext, HiveContext
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from calendar import monthrange
import datetime as dt
import pandas as pd
from dateutil.relativedelta import relativedelta
from pyspark.sql import SQLContext

# Hive-enabled Spark session for this cell. Dynamic allocation is switched
# off, so the executor count/size below is exactly what gets requested.
spark = (
    SparkSession.builder
    .appName("Arniz - ETL Product Recommendation")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    # Same 60 seconds as before, now with an explicit unit and passed as a
    # string like every other setting.
    .config("spark.network.timeout", "60s")
    .config("spark.yarn.executor.memoryOverhead", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .enableHiveSupport()
    .getOrCreate()
)

# SQLContext's first argument is a SparkContext. Passing the SparkSession
# itself only worked because it happens to expose `_jsc`/`_jvm`; hand over the
# real context and reuse the existing session explicitly.
sqlContext = SQLContext(spark.sparkContext, spark)

# Pandas display options: emit the table-schema representation for frames and
# show up to 999 rows.
pd.options.display.html.table_schema = True
pd.options.display.max_rows = 999

# Savings accounts (group_type 'Britama', status 1, ds '202102', class 'I',
# opened in 2020) left-joined to the customer-info dimension on CIF number.
#
# * `usia` is the whole number of years between now and `tanggal_lahir`
#   (parsed as dd-MM-yyyy), computed as day difference / 365.25.
# * `alamat` is the four trimmed address parts joined with commas.
# * Aliases use underscores instead of the double-quoted, space-separated
#   names: double quotes denote string literals in default Spark SQL, and
#   column names containing spaces are rejected by older Spark parquet writers.
df = spark.sql("""
SELECT a.cifno AS cif ,a.acctno AS nomor_rekening, a.branch, a.cbal_base,a.`class` AS tipe_nasabah, a.group_type,a.ds,a.status,a.open_date, 
b.tipe_nasabah_desc, b.nama_lengkap, b.jenis_kelamin,b.status_nikah,b.npwp,
b.kecamatan_id,b.kelurahan_id,b.handphone,b.pendidikan,b.jenis_pekerjaan,b.nama_kantor,
b.kode_bidang_pekerjaan,b.bidang_pekerjaan,b.kode_jabatan,b.jabatan,b.penghasilan_per_bulan,
b.omset_per_bulan,b.sumber_penghasilan,b.tujuan_buka_rekening,b.kode_tujuan_buka_rekening, 
CAST(DATEDIFF(now(), FROM_UNIXTIME(UNIX_TIMESTAMP(b.tanggal_lahir , 'dd-MM-yyyy'))) /365.25 AS INT) AS usia,
CONCAT_WS(',',trim(b.alamat_id1),trim(b.alamat_id2),trim(b.alamat_id3),trim(b.alamat_id4)) AS alamat
FROM datalake.asrs_fact_savingmaster AS a
LEFT JOIN datalake.`6969_crm_dim_dly_customer_info` AS b
ON a.cifno=b.`cfcif#`
WHERE a.group_type='Britama' AND a.status=1 AND a.ds='202102' AND a.`class`='I' 
AND year(a.open_date)= 2020
""")

# Preview the frame that was just built (the original referenced an undefined
# `df5`, which raises NameError on a fresh kernel).
df.show(5)

# Persist the result as a parquet-backed table, replacing any previous copy.
df.write.mode('overwrite').format('parquet').saveAsTable('temp.tabel_simpanan_midah')


**TUGAS OPTIMALISASI QUERY (Mba Rizka Azmira)**

In [None]:
#=========================================# 
#           Import Dependencies           #
#=========================================#

from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import *
from dateutil.relativedelta import relativedelta
import datetime
from datetime import *
import pandas as pd
import matplotlib.pyplot as plt
#import seaborn as sns
import numpy as np
from time import sleep

#=========================================#
#              Pandas Config              #
#=========================================#

# Raise the column/row display limits and enable the table-schema output.
pd.set_option("display.max_columns", 999)
pd.set_option("display.max_rows", 999)
pd.set_option("display.html.table_schema", True)

#=========================================#
#              Spark Session              #
#=========================================#

# Hive-enabled session with a fixed executor layout (dynamic allocation off).
session_builder = (
    SparkSession.builder
    .appName("Nanda - Credit Debit Ceria")
    .config('spark.dynamicAllocation.enabled', 'false')
    .config('spark.executor.instances', '6')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '10g')
    .config('spark.yarn.executor.memoryOverhead', '8g')
    .enableHiveSupport()
)
spark = session_builder.getOrCreate()
  
# Algorithm
# 1. Take whitelist_account_number from datamart.ceria_Master_customer as acctno.
# 2. Pull transaction columns from datalake.`8000_rc_trx_as4_ddhist` by joining
#    on acctno.
# 3. Restrict ddhist to one month (ds) at a time, for every month in list_ds.
#    NOTE(review): the original note said "2020 and 2021 and month < 4", but
#    list_ds runs through 202104 -- confirm which bound is intended.
# 4. From steps 2 and 3, total the outgoing (Debit) and incoming (Credit)
#    amounts per account per month.

# Number of rows printed after each monthly write.
PREVIEW_ROWS = 1000

# Start from a clean target so re-running the cell does not append duplicates.
spark.sql('drop table if exists temp.try_midah_ceria3')

list_ds = ['202001', '202002','202003','202004','202005','202006','202007','202008','202009','202010','202011','202012',
           '202101','202102','202103','202104']

# The whitelist does not depend on the month, so its view is registered once
# instead of on every loop iteration.
acc_whitelist=spark.sql("""
select CAST(whitelist_account_number AS DECIMAL(19,0)) AS acctno 
FROM datamart.ceria_Master_customer
""")
acc_whitelist.createOrReplaceTempView('acc_whitelist')

# Iterate over the months directly rather than over a hard-coded range(16).
for ds in list_ds:
  print(ds)

  # Transactions whose year + zero-padded month match the current ds.
  df_ddhist=spark.read.table('datalake.`8000_rc_trx_as4_ddhist`')\
                .filter("concat(year,lpad(month,2,'0')) like '{}%'".format(ds))
  df_ddhist.createOrReplaceTempView('df_ddhist')

  # Distinct transactions of whitelisted accounts. Only ddhist columns are
  # selected, so the former LEFT JOIN turned every whitelisted account without
  # a transaction into an all-NULL row, which then surfaced as one
  # (NULL, NULL) group appended to the target each month. An inner join keeps
  # exactly the matched transactions.
  fitur_whitelist=spark.sql("""
  SELECT DISTINCT tracct,trdorc,trdate,
                      CONCAT(year, '-', month, '-', day) AS dt,
                      trtime,trremk,seq,amt
  FROM acc_whitelist a
  JOIN df_ddhist b
  ON trim(a.acctno) = trim(b.tracct)""")
  fitur_whitelist.createOrReplaceTempView('df_whitelist_features')

  # Per account and calendar month: debit total, credit total, and
  # credit minus debit (disp_income_amt).
  table_final = spark.sql("""
  SELECT tracct,TRUNC(dt, 'month') AS month_dt,
             SUM(CASE WHEN trdorc = 'D' THEN amt END) AS debit_amt,
             SUM(CASE WHEN trdorc = 'C' THEN amt END) AS credit_amt,
             SUM(CASE WHEN trdorc = 'C' THEN amt ELSE 0 END) - SUM(CASE WHEN trdorc = 'D' THEN amt ELSE 0 END) AS disp_income_amt 
  FROM df_whitelist_features
  GROUP BY tracct,TRUNC(dt, 'month')""")

  # The frame is used by two actions (write + show). Without caching, show()
  # would re-run the whole join and aggregation a second time.
  table_final.cache()
  try:
    # Append this month's aggregates to the parquet-backed target table.
    table_final\
      .write\
      .mode('append')\
      .format('parquet')\
      .saveAsTable("temp.try_midah_ceria3")

    spark.sql('refresh table temp.try_midah_ceria3')
    table_final.show(PREVIEW_ROWS)
  finally:
    # Release the cached month before moving on to the next one.
    table_final.unpersist()