# Import Libs

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

In [None]:
# Reporting month, interpolated into the SQL of the cells below via str.format().
# Keep it as the first day of the month ('yyyy-MM-01'): the accessories query
# compares rpt_mth to this literal directly, without normalising it.
report_month = '2021-05-01'

# Every cell below calls `spark`, but nothing in this notebook creates it.
# getOrCreate() returns the session already active in the kernel when there is
# one, and only builds a new session on a fresh plain-Python kernel.
spark = SparkSession.builder.getOrCreate()

# Base

In [None]:
%%time
df1 = spark.sql(""" SELECT DISTINCT CUST_ID,ACCT_NUM,PYMNT_DT,RPT_MTH FROM (
SELECT DISTINCT B.CUST_ID,B.ACCT_NUM,PYMNT_DT,RPT_MTH,
ROW_NUMBER() OVER (PARTITION BY B.CUST_ID, B.ACCT_NUM ORDER BY PYMNT_DT DESC) AS RN
FROM TABLE_NAME B INNER JOIN ANOTHER_TABLE CA
ON B.CUST_ID=CA.CUST_ID AND B.ACCT_NUM=CA.ACCT_NUM 
WHERE B.FIN_UPG_FLAG= 'Y' -- AND OTHER FILTERS
AND B.PYMNT_DT BETWEEN date_format('{0}','yyyy-MM-dd') and last_day('{0}')
AND (CA.ACCT_TERM_DT IS NULL OR CA.ACCT_TERM_DT > last_day(add_months('{0}', -1)))
AND B.RPT_MTH = date_format('{0}', 'yyyy-MM-01'))
WHERE RN=1""".format(report_month))
df1.createOrReplaceTempView('Base')

In [None]:
# Sanity check: number of distinct (cust_id, acct_num) pairs in the Base view.
# The pair is counted as a column tuple instead of cust_id||acct_num, because an
# undelimited concatenation maps different pairs to the same string
# (e.g. '1'||'23' and '12'||'3' are both '123') and so under-counts.
spark.sql('select count(distinct cust_id, acct_num) from base').show()

# Customers who purchased accessories (target)

In [None]:
%%time
df4 = spark.sql("""SELECT * FROM (
SELECT DISTINCT CUST_ID,ACCT_NUM,EQP_CLASS_DESC,PYMNT_DT, 1 AS ACCESSORY,
ROW_NUMBER() OVER (PARTITION BY CUST_ID,ACCT_NUM ORDER BY PYMNT_DT DESC) AS RNUM
FROM TABLE_NAME WHERE 
EQP_CLASS_DESC = 'Accessories' AND
SLS_DIST_CHNL_TYPE_CD = 'N' AND
PYMNT_DT BETWEEN date_format('{0}','yyyy-MM-dd') AND last_day('{0}')
AND rpt_mth = '{0}')
WHERE RNUM=1""".format(report_month))
df4.createOrReplaceTempView('accessories_base')

# Join Base & Target

In [None]:
%%time
df5 = spark.sql("""SELECT A.*,B.ACCESSORY ACC_FLAG FROM BASE A LEFT JOIN ACCESSORIES_BASE B
ON A.CUST_ID = B.CUST_ID AND A.ACCT_NUM = B.ACCT_NUM AND A.PYMNT_DT = B.PYMNT_DT""".format(report_month))
df5.createOrReplaceTempView('acc_base')
print(df5.count(), len(df5.columns))

# Features

# 1st Entity

In [None]:
# Feature entity 1, registered as temp view 'FEATURE_ENTITY1'.
# For each (CUST_ID, ACCT_NUM) in ACC_BASE, takes TOTAL_LINES_ON_ACCT,
# ACCT_ACTIVE_LOAN_CNT and ACCT_TENURE_MNTHS (exposed as ACCT_TENURE_MTHS) from
# the FEATURE_TABLE snapshot of the month before report_month, NULLs mapped to 0.
#
# NOTE(review): the WHERE clause filters on B.RPT_MTH / B.BASE_MTH, so ACC_BASE
#   rows without a FEATURE_TABLE match are dropped here (the LEFT JOIN behaves
#   like an INNER JOIN). The final join re-adds them with 0 defaults.
# NOTE(review): the join key is CUST_ID only and ROW_NUMBER orders by a constant,
#   so if FEATURE_TABLE holds several rows per CUST_ID for the month, the row
#   kept by RNUM=1 is arbitrary. Confirm the table's grain before relying on it.
df6 = spark.sql("""SELECT * FROM (
SELECT DISTINCT A.CUST_ID,A.ACCT_NUM,COALESCE(B.TOTAL_LINES_ON_ACCT,0) AS TOTAL_LINES_ON_ACCT,
COALESCE(B.ACCT_ACTIVE_LOAN_CNT,0) AS ACCT_ACTIVE_LOAN_CNT,
COALESCE(B.ACCT_TENURE_MNTHS,0) AS ACCT_TENURE_MTHS,
ROW_NUMBER() OVER (PARTITION BY A.CUST_ID,A.ACCT_NUM ORDER BY date_format('{0}','yyyy-MM-dd') desc) AS RNUM
FROM ACC_BASE A LEFT JOIN FEATURE_TABLE B
ON A.CUST_ID = B.CUST_ID
WHERE B.RPT_MTH = date_format(add_months('{0}',-1), 'yyyy-MM-01')
AND B.BASE_MTH IS NOT NULL) AB
WHERE RNUM=1""".format(report_month))
# The DataFrame method is createOrReplaceTempView (capital "O"); the previous
# spelling raised AttributeError, so FEATURE_ENTITY1 was never registered.
df6.createOrReplaceTempView('FEATURE_ENTITY1')
df6.printSchema()
print("Rows & Columns:\n", df6.count(), len(df6.columns))

# 2nd Entity

In [None]:
%%time
df8 = spark.sql("""SELECT * FROM (
SELECT A.CUST_ID,A.ACCT_NUM,COALESCE(B.REV_LAST_2YR_PURCHASE,0) AS REV_LAST_2YR_PURCHASE,
COALESCE(B.TOT_ACCSSRY_PURCHASE,0) AS TOT_ACCSSRY_PURCHASE,
ROW_NUMBER() OVER(PARTITION BY A.CUST_ID,A.ACCT_NUM ORDER BY '{0}' DESC) AS RN1
FROM ACC_BASE A
LEFT JOIN
(SELECT * FROM 
  (SELECT CUST_ID,ACCT_NUM,SLS_DIST_CHNL_TYPE_CD,SUM(ITEM_PRICE_AMT) AS REV_LAST_2YR_PURCHASE,
  SUM(NET_SALES) AS TOT_ACCSSRY_PURCHASE
  FROM TABLE WHERE
  LOWER(EQP_CLASS_DESC) = 'accessories' AND 
  PYMNT_DT>=date_format(add_months('{0}',-24),'yyyy-MM-dd') AND PYMNT_DT<date_format('{0}','yyyy-MM-dd')
  AND RPT_MTH<'{0}'
  GROUP BY 1,2,3) AB
  WHERE TOT_ACCSSRY_PURCHASE>0)B
  ON A.CUST_ID=B.CUST_ID AND A.ACCT_NUM=B.ACCT_NUM)X
  WHERE RN1=1""".format(report_month))
df8.createOrReplaceTempView('FEATURE_ENTITY2')


# Join all entities

In [None]:
%%time
df200 = spark.sql("""
SELECT DISTINCT A.CUST_ID,A.ACCT_NUM,A.ACC_FLAG,
COALESCE(B.TOTAL_LINES_ON_ACCT,0) AS TOTAL_LINES_ON_ACCT,
COALESCE(B.ACCT_ACTIVE_LOAN_CNT,0) AS ACCT_ACTIVE_LOAN_CNT,
COALESCE(B.ACCT_TENURE_MTHS,0) AS ACCT_TENURE_MTHS,
COALESCE(ROUND(D.REV_LAST_2YR_PURCHASE,2),0) AS REV_LAST_2YR_PURCHASE,
COALESCE(ROUND(D.TOT_ACCSSRY_PURCHASE,2),0) AS TOT_ACCSSRY_PURCHASE
FROM ACC_BASE A LEFT JOIN FEATURE_ENTITY1 B
ON A.CUST_ID = B.CUST_ID AND A.ACCT_NUM = B.ACCT_NUM
LEFT JOIN FEATURE_ENTITY2 D
ON A.CUST_ID = D.CUST_ID AND A.ACCT_NUM = D.ACCT_NUM
""")
df200.createOrReplaceTempView('FINAL_DATASET')
df200.printSchema()
print("Rows & Columns:\n", df200.count(), len(df200.columns))

# Write to parquet

In [None]:
%%time
d = spark.sql("SELECT * FROM FINAL_DATASET")
d.write.parquet("path/final_dataset.parquet")

# Grant read-only access to all

In [None]:
!hdfs dfs -chmod -R 777 'file_path'