####Import required libraries and set spark context

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

In [0]:
spark = SparkSession.builder.appName('Read CSV').getOrCreate()

####Import data set of customer into dataframe

In [0]:
df_customer=sqlContext.sql("SELECT * FROM arl_feature_store.customer_data_credit_debit_src")

####Display data set of the customer

In [0]:
display(df_customer)
df_customer.printSchema()


customer_id,age,location,gender,transacted_amount,transacted_date,credit/debit,zip,ssn,transacted_at,fname,lname,email
CUST301488,36,MN,M,12832,2022-09-19,1,419154,8887475444,Pay with Tap,James,Rogers,james.rogers@olson.com
CUST235925,38,MA,F,48233,2022-10-23,1,199520,7323084732,Pay with Tap,Margaret,Vasquez,margaret.vasquez@day.com
CUST768307,41,WA,M,46000,2023-02-07,1,522663,1425823974,Zelle QR Code,Derek,Rivera,derek.rivera@greene.com
CUST854589,18,OK,F,54991,2023-01-22,1,236362,8475385450,Help Center,Sarah,Campbell,sarah.campbell@reese.com
CUST301488,36,MN,M,71251,2022-09-21,1,419154,8887475444,Food Street,James,Rogers,james.rogers@olson.com
CUST235925,38,MA,F,35028,2023-01-25,1,199520,7323084732,Fuel,Margaret,Vasquez,margaret.vasquez@day.com
CUST768307,41,WA,M,22869,2023-02-13,1,522663,1425823974,Cloths,Derek,Rivera,derek.rivera@greene.com
CUST854589,18,OK,F,49695,2023-03-27,1,236362,8475385450,Pay with Tap,Sarah,Campbell,sarah.campbell@reese.com
CUST301488,36,MN,M,73480,2022-11-03,1,419154,8887475444,Pay with Tap,James,Rogers,james.rogers@olson.com
CUST235925,38,MA,F,13099,2022-07-16,1,199520,7323084732,Zelle QR Code,Margaret,Vasquez,margaret.vasquez@day.com


root
 |-- customer_id: string (nullable = true)
 |-- age: long (nullable = true)
 |-- location: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- transacted_amount: long (nullable = true)
 |-- transacted_date: date (nullable = true)
 |-- credit/debit: long (nullable = true)
 |-- zip: long (nullable = true)
 |-- ssn: long (nullable = true)
 |-- transacted_at: string (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- email: string (nullable = true)



####Write function to filter data based on date range provided and to filter by credit transaction if credit/debit=1 then credit credit/debit=0 then debit. In this notebook we are concentrating on credit transaction.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone

def filter_df_by_date_credit(df, ts_column, credit_debit, start_date, end_date):
    df = df.filter(col(credit_debit)== 1)
    if ts_column and start_date:
        df = df.filter(col(ts_column) <= start_date)
    if ts_column and end_date:
        df = df.filter(col(ts_column) >= end_date)
    return df
 

####Data preparation to pass parameters for 7 days, 30 days and 60 days

In [0]:
from datetime import *
start_date=date.today()
print(start_date)
end_date_7_days=start_date-timedelta(days=7)
print(end_date_7_days)
end_date_30_days=start_date-timedelta(days=30)
print(end_date_30_days)
end_date_60_days=start_date-timedelta(days=60)
print(end_date_60_days)

2023-02-01
2023-01-25
2023-01-02
2022-12-03


####Check if data filtering function is working properly

In [0]:
from datetime import datetime
filtered_data= filter_df_by_date_credit(
    df_customer, ts_column="transacted_date", credit_debit="credit/debit", start_date=start_date, end_date=end_date_60_days)
display(filtered_data)

customer_id,age,location,gender,transacted_amount,transacted_date,credit/debit,zip,ssn,transacted_at,fname,lname,email
CUST854589,18,OK,F,54991,2023-01-22,1,236362,8475385450,Help Center,Sarah,Campbell,sarah.campbell@reese.com
CUST235925,38,MA,F,35028,2023-01-25,1,199520,7323084732,Fuel,Margaret,Vasquez,margaret.vasquez@day.com
CUST854589,36,MN,M,14554,2023-01-16,1,419154,8887475444,Cloths,James,Rogers,james.rogers@olson.com
CUST235925,36,MN,M,87181,2022-12-25,1,419154,8887475444,Help Center,James,Rogers,james.rogers@olson.com
CUST854589,18,OK,F,21406,2023-01-25,1,236362,8475385450,Pay with Tap,Sarah,Campbell,sarah.campbell@reese.com
CUST301488,36,MN,M,54401,2022-12-22,1,419154,8887475444,Zelle QR Code,James,Rogers,james.rogers@olson.com


#### Build feature to get sum of amount transacted by each customer over given period of time(input)

In [0]:
def transacted_amt_by_customer(df, ts_column, credit_debit, start_date, end_date):

    """
    Computes the total amount transacted by customer over the time frame passed as input parameter
    """
    df = filter_df_by_date_credit(df, ts_column, credit_debit, start_date, end_date)
 
    trans_amt_feature = (
        df.groupBy(
            "customer_id") 
        .agg(
            sum("transacted_amount").alias("amount_transacted_by_cust_over_given_period"),
            count("transacted_date").alias("total_no_of_transactions"),
        )
        .select(
            col("customer_id").alias("cust_id"),
            col("amount_transacted_by_cust_over_given_period").cast(FloatType()),
            col("total_no_of_transactions").cast(IntegerType())
        )
    )
    trans_amt_feature=trans_amt_feature.withColumn("todays_date",current_date())

    return trans_amt_feature

####Run the code to realise the feature and store it in data frame for 7, 30 and 60 days

In [0]:
from datetime import datetime
transacted_amt_feature_credit_7_days = transacted_amt_by_customer(
    df_customer, ts_column="transacted_date", credit_debit="credit/debit", start_date=start_date, end_date=end_date_7_days)

transacted_amt_feature_credit_30_days = transacted_amt_by_customer(
    df_customer, ts_column="transacted_date", credit_debit="credit/debit", start_date=start_date, end_date=end_date_30_days)

transacted_amt_feature_credit_60_days = transacted_amt_by_customer(
    df_customer, ts_column="transacted_date", credit_debit="credit/debit", start_date=start_date, end_date=end_date_60_days)

####Rename the dataframe columns to give meaningful column names.

In [0]:
transacted_amt_feature_credit_7_days=transacted_amt_feature_credit_7_days.withColumnRenamed("amount_transacted_by_cust_over_given_period","amount_transacted_by_cust_over_7_days_credit").withColumnRenamed("total_no_of_transactions","total_no_of_credit_transactions_7_days")

transacted_amt_feature_credit_30_days=transacted_amt_feature_credit_30_days.withColumnRenamed("amount_transacted_by_cust_over_given_period","amount_transacted_by_cust_over_30_days_credit").withColumnRenamed("total_no_of_transactions","total_no_of_credit_transactions_30_days")

transacted_amt_feature_credit_60_days=transacted_amt_feature_credit_60_days.withColumnRenamed("amount_transacted_by_cust_over_given_period","amount_transacted_by_cust_over_60_days_credit").withColumnRenamed("total_no_of_transactions","total_no_of_credit_transactions_60_days")

####Results of the features for 7,30 and 60 days.

In [0]:
display(transacted_amt_feature_credit_7_days)
display(transacted_amt_feature_credit_30_days)
display(transacted_amt_feature_credit_60_days)

cust_id,amount_transacted_by_cust_over_7_days_credit,total_no_of_credit_transactions_7_days,todays_date
CUST235925,35028.0,1,2023-02-01
CUST854589,21406.0,1,2023-02-01


cust_id,amount_transacted_by_cust_over_30_days_credit,total_no_of_credit_transactions_30_days,todays_date
CUST235925,35028.0,1,2023-02-01
CUST854589,90951.0,3,2023-02-01


cust_id,amount_transacted_by_cust_over_60_days_credit,total_no_of_credit_transactions_60_days,todays_date
CUST235925,122209.0,2,2023-02-01
CUST301488,54401.0,1,2023-02-01
CUST854589,90951.0,3,2023-02-01


####Use Feature Store library to create new feature table

In [0]:
%sql 
CREATE DATABASE IF NOT EXISTS arl_feature_store;

####Install feature store

In [0]:
pip install databricks-feature-store

Python interpreter will be restarted.
Python interpreter will be restarted.


####Set the context for the feature store.

In [0]:
from databricks import feature_store
fs = feature_store.FeatureStoreClient()

####Create a feature store table

In [0]:
fs.drop_table('arl_feature_store.customer_transaction_feature')

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-295300276217568>:1[0m
[0;32m----> 1[0m [43mfs[49m[38;5;241;43m.[39;49m[43mdrop_table[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43marl_feature_store.customer_transaction_feature[39;49m[38;5;124;43m'[39;49m[43m)[49m

File [0;32m/databricks/python/lib/python3.9/site-packages/databricks/feature_store/client.py:338[0m, in [0;36mFeatureStoreClient.drop_table[0;34m(self, name)[0m
[1;32m    318[0m [38;5;124;03m"""[39;00m
[1;32m    319[0m [38;5;124;03mDelete the specified feature table. This API also drops the underlying Delta table.[39;00m
[1;32m    320[0m 
[0;32m   (...)[0m
[1;32m    331[0m [38;5;124;03m    published online stores separately.[39;00m
[1;32m    332[0m [38;5;124;03m"""[39;00m
[1;32m    333[0m name [38;5;241m=[39m uc_utils[38;5;241m.[39mg

####Create a feature table with 7 day feature

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

fs.create_table(
    name="arl_feature_store.customer_transaction_feature",
    primary_keys=["cust_id"],
    df=transacted_amt_feature_credit_7_days,
    description="This table will hold the data of total amount transacted by customer for give timerange"
)


2023/02/01 13:31:08 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.arl_feature_store.customer_transaction_feature'.
  yield prop, self.__getattribute__(prop)
Out[14]: <FeatureTable: keys=['cust_id'], tags={}>

#### Merge the feature table with 30 days feature data.

In [0]:
fs.write_table(
  name = "arl_feature_store.customer_transaction_feature",
  df = transacted_amt_feature_credit_30_days,
  mode = "merge")

####Merge the feature table with 60 days feature data.

In [0]:
fs.write_table(
  name = "arl_feature_store.customer_transaction_feature",
  df = transacted_amt_feature_credit_60_days,
  mode = "merge")

####Check if data is stored in feature store table

In [0]:
%sql
select 
todays_date, 
cust_id,
amount_transacted_by_cust_over_7_days_credit,
total_no_of_credit_transactions_7_days,
amount_transacted_by_cust_over_30_days_credit,
total_no_of_credit_transactions_30_days,
amount_transacted_by_cust_over_60_days_credit,
total_no_of_credit_transactions_60_days
from arl_feature_store.customer_transaction_feature;

todays_date,cust_id,amount_transacted_by_cust_over_7_days_credit,total_no_of_credit_transactions_7_days,amount_transacted_by_cust_over_30_days_credit,total_no_of_credit_transactions_30_days,amount_transacted_by_cust_over_60_days_credit,total_no_of_credit_transactions_60_days
2023-02-01,CUST235925,35028.0,1.0,35028.0,1.0,122209.0,2
2023-02-01,CUST301488,,,,,54401.0,1
2023-02-01,CUST854589,21406.0,1.0,90951.0,3.0,90951.0,3


In [0]:
%sql
describe  arl_feature_store.customer_transaction_feature;

col_name,data_type,comment
cust_id,string,
amount_transacted_by_cust_over_7_days_credit,float,
total_no_of_credit_transactions_7_days,int,
todays_date,date,
amount_transacted_by_cust_over_30_days_credit,float,
total_no_of_credit_transactions_30_days,int,
amount_transacted_by_cust_over_60_days_credit,float,
total_no_of_credit_transactions_60_days,int,
