# PyPika Insight Building Demo

## PyPika Imports

In [1]:
from pypika import (
    Query,
    Table,
    Tables,
    Field,
    AliasedQuery,
    CustomFunction,
    Order,
    Database,
    Schema,
    Case,
    Criterion,
    analytics as an,
    functions as fn
)

from mimo_custom import custom_functions as cf
from mimo_custom import variable_list as vl

## Variables

In [2]:
mcc_list = [5598,5655,5940,5941,5996,7032,7941,7992,20006,20076,20077,20106]

## Initialise Tables

In [3]:
# schema = Schema("consumption_db")

# transactions_table = Table("transactions_v", schema=schema).as_("base")
# pa_table = Table("parties_accounts_relations_v",schema=schema)

transactions_table = Table("transactions_v_incl_cc").as_("base")
pa_table = Table("parties_accounts_relations_v")

## Parties Accounts Relations

In [4]:
pa_mapping = Query.from_(pa_table).select("*").as_("pa_mapping")

# pa_mapping.get_sql(quote_char=None)

## List Filters

In [9]:
mcc_filter = vl.NarrativeCriterions.generate_criterion("",transactions_table.mcc, "IN", mcc_list)

## Join Transactions and Parties Accounts Relations

In [10]:
rank = (
    an.Rank()
    .over(pa_mapping.prophet_party_id)
    .orderby(transactions_table.transaction_booking_timestamp, order=Order.desc)
    .orderby(transactions_table.amount, order=Order.desc)
    .orderby(transactions_table.merchant_brand_name, order=Order.asc)
    .orderby(transactions_table.clean_narrative, order=Order.asc)
    .orderby(transactions_table.prophet_transaction_id, order=Order.desc)
    .as_("_rank")
)

trn_pa_join = (
    Query.from_(transactions_table).inner_join(pa_mapping)
    .on_field("prophet_account_id").select(
    pa_mapping.prophet_party_id.as_("prophet_party_id_a"),
    pa_mapping.brand.as_("brand_a"),
    transactions_table.transaction_booking_timestamp,
    transactions_table.amount,
    transactions_table.merchant_brand_name,
    transactions_table.clean_narrative,
    rank,
    cf.Concat_Ws(':',transactions_table.merchant_system_category1_name, transactions_table.merchant_system_category2_name).as_("category_1"), 
    transactions_table.merchant_system_category3_name.as_("category_2")
    )
    .where(cf.UnixTimestamp(transactions_table.record_creation_timestamp,{0}) >= cf.UnixTimestamp('2023-10-01T00:00:00.000',{0}))
    .where(cf.UnixTimestamp(transactions_table.record_creation_timestamp,{0}) < cf.UnixTimestamp('2023-11-01T00:00:00.00',{0}))
    .where((transactions_table.pos_refund_flag != 1) | (transactions_table.pos_refund_flag.isnull()))
    .where(-(transactions_table.amount)>=10)
    .where(mcc_filter)
).as_("sub_query")

# trn_pa_join.get_sql(quote_char=None)

## Sub Query

In [11]:
sub_query = Query.from_(trn_pa_join).select("*").where(trn_pa_join._rank==1)

# secondary_aggregate.get_sql(quote_char=None)

## Final Query

In [12]:
parent_query = (Query
            .with_(sub_query, "sq")
            .from_(AliasedQuery("sq"))
            .select(
                    sub_query.prophet_party_id_a.as_("prophet_party_id"),
                    sub_query.brand_a.as_("brand"),
                    fn.Cast(None,"String").as_("prophet_account_id"),
                    cf.reflect("java.util.UUID", "randomUUID").as_("insight_id"),
                    cf.FromUnixTime(cf.UnixTimestamp(),{0}).as_("event_timestamp"),
                    fn.Cast(None,"String").as_("transaction_record_creation_timestamp"),
                    fn.Cast(None,"String").as_("transaction_booking_timestamp"),
                    fn.Cast(None,"String").as_("transaction_value_timestamp"),
                    cf.Named_Struct("flag_a",sub_query.category_1,"flag_b",sub_query.category_2).as_('insight_attributes'),
                    fn.Cast("0.0.1","String").as_("user_story_version"),
                    fn.Cast("HttpLIVYOperator","String").as_("created_by"),
                    cf.FromUnixTime(cf.UnixTimestamp(),{0}).as_("record_creation_timestamp"),
                    fn.Cast(None,"String").as_("data_source"),
                    fn.Cast("10007-Sports-Retailer-Pg","String").as_("user_story_id")
))

In [13]:
final_query = parent_query.get_sql(quote_char=None)
final_query.format(""" "yyyy-MM-dd'T'HH:mm:ss.SSS" """)

'WITH sq AS (SELECT * FROM (SELECT pa_mapping.prophet_party_id prophet_party_id_a,pa_mapping.brand brand_a,base.transaction_booking_timestamp,base.amount,base.merchant_brand_name,base.clean_narrative,RANK() OVER(PARTITION BY pa_mapping.prophet_party_id ORDER BY base.transaction_booking_timestamp DESC,base.amount DESC,base.merchant_brand_name ASC,base.clean_narrative ASC,base.prophet_transaction_id DESC) _rank,CONCAT_WS(\':\',base.merchant_system_category1_name,base.merchant_system_category2_name) category_1,base.merchant_system_category3_name category_2 FROM transactions_v_incl_cc base JOIN (SELECT * FROM parties_accounts_relations_v) pa_mapping ON base.prophet_account_id=pa_mapping.prophet_account_id WHERE UNIX_TIMESTAMP(base.record_creation_timestamp, "yyyy-MM-dd\'T\'HH:mm:ss.SSS" )>=UNIX_TIMESTAMP(\'2023-10-01T00:00:00.000\', "yyyy-MM-dd\'T\'HH:mm:ss.SSS" ) AND UNIX_TIMESTAMP(base.record_creation_timestamp, "yyyy-MM-dd\'T\'HH:mm:ss.SSS" )<UNIX_TIMESTAMP(\'2023-11-01T00:00:00.00\', "