In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from framework.feature_factory.helpers import Helpers
from channelDemoMarket import Store

# Number of shuffle partitions used by joins/aggregations in this notebook.
# NOTE(review): 96 * 2 presumably means "2x the available cores" — TODO confirm and tune per cluster.
SHUFFLE_PARTITIONS = 96 * 2

spark = SparkSession.builder.appName('Test').getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [79]:
# Build a custom, nested configuration object.
# Optional for this demo: it explores whether custom configs can enable nested features.
from framework.configobj import ConfigObj

config = ConfigObj()

# Dotted keys create the intermediate levels automatically, so a single call
# can place a value several levels deep.
config.add("level1.newlevel2.config", "test_value")
config.add("level1.newlevel3", {"newDict":"adding_dict_vals"})
config.add("level1.simple.value", "simple_string_value")
config.add("level1.simple.list_value", ["this", "is", "a", "list", "of", "strings"])

# Pretty-print the resulting nested settings.
config.print()

{
    "level1": {
        "newlevel2": {
            "config": "test_value"
        },
        "newlevel3": {
            "newDict": "adding_dict_vals"
        },
        "simple": {
            "list_value": [
                "this",
                "is",
                "a",
                "list",
                "of",
                "strings"
            ],
            "value": "simple_string_value"
        }
    }
}


In [36]:
# Instantiate the store at a fixed snapshot date.
# NOTE: this construction works, but does not (yet) provide nested features.
store = Store(_snapshot_date="2018-01-01")


In [37]:
# The store's feature factory; used below to append feature sets to dataframes.
ff = store.ff

In [38]:
# Fetch the sales feature sets: `mult_features` is aggregated and multiplied below;
# `base_features` is not used in the cells that follow in this section.
mult_features, base_features = store.Sales().get_all()

In [39]:
### For visualization only.
# Load the raw "issuer" core, report its size, then preview a few rows.
example_df = store.get_core("issuer")
print("n_rows: {}".format(example_df.count()))
print("n_cols: {}".format(len(example_df.columns)))
example_df.show(n=5, truncate=True)

n_rows: 5613
n_cols: 7
+--------------------+-----------+------------------+-----------+--------------------+----------------+----------------+
|header_capture_month|issuer_name|chip_indicator_uid|product_uid|         description|purchase_txn_amt|purchase_txn_cnt|
+--------------------+-----------+------------------+-----------+--------------------+----------------+----------------+
| 2016-09-01 00:00:00| Royal Bank|                 7|          1|     Hardware Stores|   7.127944081E7|          919933|
| 2017-04-01 00:00:00| Scotiabank|                 9|          1|Drug Stores, Phar...|   1.672346653E7|          784273|
| 2018-12-01 00:00:00| Royal Bank|                 3|          1|Grocery Stores, S...|       100088.58|            1194|
| 2018-08-01 00:00:00|       CIBC|                 7|          1|     Hardware Stores|   6.163271108E7|          706047|
| 2017-09-01 00:00:00|       CIBC|                 3|          1|Fuel Dispenser, A...|             0.0|               0|
+--------

In [40]:
### For visualization only.
# Bank lookup table (issuer_name -> issuer_id), aliased as 'bank' so the join
# further down can select its columns through that qualifier.
bank_df = (
    store.get_core("bank_id")
    .alias('bank')
)
print("n_rows: {}".format(bank_df.count()))
print("n_cols: {}".format(len(bank_df.columns)))
bank_df.show(n=5, truncate=True)

n_rows: 5
n_cols: 2
+--------------------+---------+
|         issuer_name|issuer_id|
+--------------------+---------+
|          Royal Bank|        1|
|BMO Bank of Montreal|        2|
|          Scotiabank|        3|
|     TD Canada Trust|        4|
|                CIBC|        5|
+--------------------+---------+



In [41]:
example_df.select(F.col("issuer_name")).distinct().show()  # distinct issuers in the raw core

+--------------------+
|         issuer_name|
+--------------------+
|BMO Bank of Montreal|
|                CIBC|
|          Scotiabank|
|          Royal Bank|
|     TD Canada Trust|
+--------------------+



In [42]:
# Keep only rows with a positive purchase amount (a null amount also fails the
# comparison and is dropped), aliased as 'clean_amount' so its columns can be
# selected through that qualifier after the join.
store_sales_df = (
    store.get_core("issuer")
    .filter(F.col('purchase_txn_amt') > 0)
    .alias('clean_amount')
)

In [43]:
# Size and preview of the filtered sales dataframe.
print("n_rows: {}".format(store_sales_df.count()))
print("n_cols: {}".format(len(store_sales_df.columns)))
store_sales_df.show(n=5, truncate=True)

n_rows: 5342
n_cols: 7
+--------------------+---------------+------------------+-----------+--------------------+----------------+----------------+
|header_capture_month|    issuer_name|chip_indicator_uid|product_uid|         description|purchase_txn_amt|purchase_txn_cnt|
+--------------------+---------------+------------------+-----------+--------------------+----------------+----------------+
| 2016-09-01 00:00:00|     Royal Bank|                 7|          1|     Hardware Stores|   7.127944081E7|          919933|
| 2017-04-01 00:00:00|     Scotiabank|                 9|          1|Drug Stores, Phar...|   1.672346653E7|          784273|
| 2018-12-01 00:00:00|     Royal Bank|                 3|          1|Grocery Stores, S...|       100088.58|            1194|
| 2018-08-01 00:00:00|           CIBC|                 7|          1|     Hardware Stores|   6.163271108E7|          706047|
| 2018-10-01 00:00:00|TD Canada Trust|                 3|          1|Eating Places, Re...|           9

In [44]:
store_sales_df.agg(F.min("header_capture_month")).first()[0]  # earliest capture month

datetime.datetime(2016, 1, 1, 0, 0)

### Continue with example

Join the filtered sales data with the bank lookup table to attach each issuer's `issuer_id`.

In [45]:
# Attach issuer_id: keep every filtered-sales column plus the bank's id.
base_df = (
    store_sales_df
    .join(bank_df, on=['issuer_name'])
    .select('clean_amount.*', 'bank.issuer_id')
)
base_df.show(n=5, truncate=True)

+---------------+--------------------+------------------+-----------+--------------------+----------------+----------------+---------+
|    issuer_name|header_capture_month|chip_indicator_uid|product_uid|         description|purchase_txn_amt|purchase_txn_cnt|issuer_id|
+---------------+--------------------+------------------+-----------+--------------------+----------------+----------------+---------+
|     Royal Bank| 2016-09-01 00:00:00|                 7|          1|     Hardware Stores|   7.127944081E7|          919933|        1|
|     Scotiabank| 2017-04-01 00:00:00|                 9|          1|Drug Stores, Phar...|   1.672346653E7|          784273|        3|
|     Royal Bank| 2018-12-01 00:00:00|                 3|          1|Grocery Stores, S...|       100088.58|            1194|        1|
|           CIBC| 2018-08-01 00:00:00|                 7|          1|     Hardware Stores|   6.163271108E7|          706047|        5|
|TD Canada Trust| 2018-10-01 00:00:00|                 

### Show distinct categoricals

In [46]:
base_df.select(F.col("chip_indicator_uid")).distinct().collect()  # distinct chip-indicator codes

[Row(chip_indicator_uid=5),
 Row(chip_indicator_uid=7),
 Row(chip_indicator_uid=3),
 Row(chip_indicator_uid=9),
 Row(chip_indicator_uid=4),
 Row(chip_indicator_uid=2),
 Row(chip_indicator_uid=6)]

In [47]:
base_df.select(F.col("product_uid")).distinct().collect()  # distinct product codes

[Row(product_uid=2), Row(product_uid=1)]

### Build a Features Dataframe

Here we simply apply the aggregations defined in `mult_features` over a group-by on `issuer_name`.

In [48]:
# Aggregate the sales features per issuer over the joined dataframe.
feature_df = ff.append_features(
    base_df,
    groupBy_cols=['issuer_name'],
    feature_sets=[mult_features],
)
feature_df.show()

+--------------------+--------------------+--------------+
|         issuer_name|           net_sales|total_quantity|
+--------------------+--------------------+--------------+
|BMO Bank of Montreal|  8.85825478646001E9|     286346816|
|                CIBC|1.428790330563999E10|     483494762|
|          Scotiabank|1.126012862541997...|     385629905|
|          Royal Bank|1.919939227528001...|     636845558|
|     TD Canada Trust|2.102817429950998...|     734890180|
+--------------------+--------------------+--------------+



### Write to Disk and Read the Aggregated Data

A simple example of how to write the aggregated features DataFrame to disk (as parquet) and read it back.

In [49]:
# Round-trip the aggregated features through parquet. Using one path constant
# for both the write and the read keeps them from silently drifting apart
# (a mismatch would read stale data from an earlier run).
BASE_FEATS_PATH = "./temp/base_feats_df_out"
feature_df.write.format("parquet").mode("overwrite").save(BASE_FEATS_PATH)
read_df = spark.read.format("parquet").load(BASE_FEATS_PATH)
read_df.show(truncate=True, n=5)

+--------------------+--------------------+--------------+
|         issuer_name|           net_sales|total_quantity|
+--------------------+--------------------+--------------+
|BMO Bank of Montreal|  8.85825478646001E9|     286346816|
|     TD Canada Trust|2.102817429950998...|     734890180|
|          Royal Bank|1.919939227528001...|     636845558|
|          Scotiabank|1.126012862541997...|     385629905|
|                CIBC|1.428790330563999E10|     483494762|
+--------------------+--------------------+--------------+



### Using Multipliers

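We will create a new features DataFrame using composite aggregations: each base feature is combined with a multiplier (here, the 1-month and 3-month date ranges ending at the snapshot date).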

In [50]:
store.config.get_config("time_helpers").configs  # snapshot date and the derived 1m/3m date ranges

{'snapshot_type': 'DAILY',
 'snapshot_date': '2018-01-01',
 'partition_col': None,
 'date_col': 'header_capture_month',
 'date_col_format': '%Y-%m-%d',
 'partition_col_format': '%Y%m',
 'date_filters': {'ranges': {'1m': {'start': '2017-12-01',
    'end': '2018-01-01'},
   '3m': {'start': '2017-10-01', 'end': '2018-01-01'}}},
 'partition_lower': '201710',
 'partition_upper': '201801'}

In [51]:
time_multipliers = store.get_daterange_multiplier()  # presumably built from the 1m/3m ranges above

In [52]:
mult_by_time_features = mult_features.multiply(time_multipliers, "STORE")  # yields e.g. STORE_1M_NET_SALES

In [53]:
# Per chip indicator (qualified through the 'clean_amount' alias): plain sales
# features plus their date-range variants.
feature_df = ff.append_features(
    store_sales_df,
    groupBy_cols=['clean_amount.chip_indicator_uid'],
    feature_sets=[mult_features, mult_by_time_features],
)
feature_df.show()

+------------------+--------------------+--------------+--------------------+--------------------+-----------------------+-----------------------+
|chip_indicator_uid|           net_sales|total_quantity|  STORE_1M_NET_SALES|  STORE_3M_NET_SALES|STORE_1M_TOTAL_QUANTITY|STORE_3M_TOTAL_QUANTITY|
+------------------+--------------------+--------------+--------------------+--------------------+-----------------------+-----------------------+
|                 5|      1.4351703091E8|       3915814|          4527384.78|       1.252198736E7|                 102887|                 311881|
|                 7|5.751287592964993E10|    1518959975|1.8397529930000005E9| 5.008232215580001E9|               41209251|              122181830|
|                 3| 5.409171115000006E7|       1696271|  390674.47000000003|  1088949.6099999999|                   6348|                  20523|
|                 9|1.683412248285999...|    1000494061| 5.750419964200001E8|1.5866615026900005E9|               31548

### Write and read the new features dataframe

In [54]:
# Persist and reload the time-windowed features. A single path constant keeps
# the write and read locations in sync.
AGG_PATH = "./temp/agg_df_out"
feature_df.write.format("parquet").mode("overwrite").save(AGG_PATH)
spark.read.format("parquet").load(AGG_PATH).show()

+------------------+--------------------+--------------+--------------------+--------------------+-----------------------+-----------------------+
|chip_indicator_uid|           net_sales|total_quantity|  STORE_1M_NET_SALES|  STORE_3M_NET_SALES|STORE_1M_TOTAL_QUANTITY|STORE_3M_TOTAL_QUANTITY|
+------------------+--------------------+--------------+--------------------+--------------------+-----------------------+-----------------------+
|                 9|1.683412248285999...|    1000494061| 5.750419964200001E8|1.5866615026900005E9|               31548693|               92396379|
|                 5|      1.4351703091E8|       3915814|          4527384.78|       1.252198736E7|                 102887|                 311881|
|                 2|2.5820206799999997E7|        243088|  1044744.5299999999|  2558333.0100000007|                   9274|                  23823|
|                 7|5.751287592964993E10|    1518959975|1.8397529930000005E9| 5.008232215580001E9|               41209

### Categorical Multiplier
Now let's assume we have several categorical columns for which we want to calculate aggregates.

The categorical multiplier lets you either specify the columns to which the multiplier should be applied, or provide a distinct-value threshold n together with a list of columns to ignore; in the latter case it efficiently finds all columns with fewer than n distinct values.

In [71]:
# Categorical multiplier over two low-cardinality columns of the issuer core.
# NOTE(review): categories are discovered on the unfiltered issuer core, while the
# features below are computed on base_df (purchase_txn_amt > 0) — confirm intended.
categorical_multiplier = Helpers().get_categoricals_multiplier(
    df=store.get_core("issuer"),
    col_list=['chip_indicator_uid', 'product_uid'],
)
mult_by_cat_features = mult_features.multiply(categorical_multiplier, "STORE")

In [74]:
# Per-issuer features: plain, date-range and categorical variants.
# Note from earlier experiments: also grouping by 'chip_indicator_uid' and
# 'product_uid' just adds raw columns holding the grouped-by values.
feature_df = ff.append_features(
    base_df,
    groupBy_cols=['issuer_name'],
    feature_sets=[mult_features, mult_by_time_features, mult_by_cat_features],
)

In [75]:
# Width of the combined feature set, then a small preview.
print("n cols = {}".format(len(feature_df.columns)))
feature_df.show(n=5, truncate=True)

n cols = 27
+--------------------+------------------+-----------+--------------------+--------------+------------------+------------------+-----------------------+-----------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+-----------------------------+-----------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+----------------------------------+----------------------------------+
|         issuer_name|chip_indicator_uid|product_uid|           net_sales|total_quantity|STORE_1M_NET_SALES|STORE_3M_NET_SALES|ST

### Nested features?

The DataFrame above applies each multiplier to each aggregation individually, i.e. the number of columns follows an additive pattern rather than a multiplicative one. Can we change that?

In [76]:
#Try using the config file?