In [1]:
from mkt_meas_sdk.task import BQTask, PythonTask

In [2]:

class CONTRAFACT_MLB(BQTask):  # A BQTask runs a query and dumps the results to a table named after the class
    """MLB dataset keyed by order creation date: distinct order count, sessions by channel,
    new buyers, competitor GMV, free-shipping order share, marketing cost and calendar dummy columns.

    Per the SDK template this class docstring is the dataset description that is written on Fury.
    """

    # Configures whether the destination table is dropped or appended to.
    # NOTE(review): every source is filtered with `>= dt`, so with "incremental" a re-run using an
    # overlapping dt may append rows that already exist -- confirm how the SDK de-duplicates.
    materialization = "incremental"

    def depends_on(self):
        """
        Return the upstream tasks of this task as a dict (may be empty).

            Key:
                Any string you want. It can be used in the template through the ref function.
                Example: select * from {{ref("key")}}

            Value:
                An imported task class. It doesn't need to be instantiated.

        Since the SDK controls the name of the table, we ask the ancestor task for the table name.
        This task reads its sources by fully-qualified table name, so it declares no dependencies.
        """
        return {}

    @staticmethod  # REQUIRED: attaches the function to the class without passing self as first argument
    def get_dql_template():
        """
        Return the query template as a string. It is called DQL because it must only
        describe how the data is queried.

        DO NOT:
        1. Create temp tables here
        2. Use DECLARE statements

        DO:
        1. Use Common Table Expressions (WITH cte AS (...))

        Jinja templating:
            1. All jinja variables must be delimited by {{}}
            2. Favor using the ref function {{ref("my dependency")}}

        Template variables:
            dt: inclusive lower bound applied to the date column of every source.

        Returns:
            str: the SQL template; `{{dt}}` placeholders are left unrendered.
        """
        # Notes on the template:
        # * CTEs carry no ORDER BY: ordering inside a CTE does not affect the joined result.
        # * nb and sesiones filter SIT_SITE_ID = 'MLB' themselves; the final query only keeps MLB
        #   rows and joins nb on the site id, so the result is the same with less data scanned.
        # * FREE_SHIPPING uses SAFE_DIVIDE so a day whose orders all have a NULL free-shipping
        #   flag (both sums are 0) yields NULL instead of a division-by-zero error.
        # * The final GROUP BY is positional: keep the SELECT column order in sync with it.
        sql = """
with nb as (
SELECT
  SIT_SITE_ID,
  CAST(ORDER_CREATED_DTTM AS DATE) ORDER_CREATED_DATE,
  COUNT(DISTINCT CUS_CUST_ID) NEW_BUYERS_AGMV
FROM `meli-marketing.OPSECOM.LK_ORDER_CUST_NRB_FLAGS`
WHERE
  DATE(ORDER_CREATED_DTTM) >= '{{dt}}'
  AND SIT_SITE_ID = 'MLB'
  AND FLAG_NEW_BUYER_AGMV
GROUP BY 1, 2
),

sesiones as (
SELECT
  TIM_DAY,
  SIT_SITE_ID,
  SUM(SESSIONS_CANAL_INTERO) AS SESSIONS_INTERNO,
  SUM(SESSIONS_MARCARIAS) AS SESSIONS_MARCARIAS,
  SUM(SESSIONS_ORGANIC) AS SESSIONS_ORGANIC,
  SUM(SESSIONS_OTROS) AS SESSIONS_OTHERS,
  SUM(SESSIONS_PAGAS) AS SESSIONS_PAGAS
FROM `meli-marketing.MKTPUBLIC.V_BT_KPI_DAILY_ML_SUMMARY`
WHERE
  TIM_DAY >= '{{dt}}'
  AND SIT_SITE_ID = 'MLB'
GROUP BY 1, 2
),

gmv_competencia as (
SELECT
  DATA,
  sum(gmv_compre_confie) AS GMV_COMPETENCIA
FROM `meli-bi-data.SBOX_MKPBR.DM_MARKETSHARE_DAILY`
where data >= '{{dt}}'
GROUP BY 1
),

ORDERS_DATA AS (
SELECT
  ORD.ORD_CREATED_DT,
  ORD_ITEM.SHIPPING.FREE_SHIPPING_FLG,
  CASE WHEN ORD_ITEM.SHIPPING.FREE_SHIPPING_FLG IS TRUE THEN COUNT(DISTINCT ORD.ORD_ORDER_ID) ELSE 0 END AS ORDERS_FREE_SHIPPING,
  CASE WHEN ORD_ITEM.SHIPPING.FREE_SHIPPING_FLG IS FALSE THEN COUNT(DISTINCT ORD.ORD_ORDER_ID) ELSE 0 END AS ORDERS_NO_FREE_SHIPPING,
  SUM(ORD_SHIPPING.COST) AS BUYER_SHIPPING_COST
FROM `meli-bi-data.WHOWNER.BT_ORD_ORDERS` ORD
LEFT JOIN `meli-bi-data.WHOWNER.LK_CURRENCY_CONVERTION` cc
  ON cc.SIT_SITE_ID = ORD.SIT_SITE_ID AND cc.TIM_DAY = ORD.ORD_CREATED_DT
WHERE ORD.SIT_SITE_ID = 'MLB'
  AND ORD_TGMV_FLG IS TRUE
  AND ORD_AUTO_OFFER_FLG IS FALSE
  AND ORD_CLOSED_DT IS NOT NULL
  AND ORD.ORD_CREATED_DT >= '{{dt}}'
  AND CASE
        WHEN ORD.ORD_GMV_FLG = FALSE THEN FALSE
        WHEN ( COALESCE(ORD.ORD_ITEM.BASE_CURRENT_PRICE, COALESCE(ORD.CC_USD_RATIO, cc.USD_RATIO) * ORD.ORD_ITEM.UNIT_PRICE, 0.0) < 10000.0
          AND ( ORD.ORD_CATEGORY.LEVELS[SAFE_OFFSET(0)].ID NOT IN (1459, 1743, 1540, 28, 5973) OR ORD.ORD_CATEGORY.LEVELS[SAFE_OFFSET(0)].ID IS NULL )
          AND ORD.SIT_SITE_ID NOT IN ('ABN', 'ASM', 'CCM')
        ) THEN TRUE
        ELSE FALSE
      END = TRUE
GROUP BY 1, 2
),

COST AS (
SELECT
  TIM_DAY,
  SIT_SITE_ID,
  cast(sum(COST_LC) as INT64) INVERSION
FROM `meli-marketing.MATT.V_BT_MATT_ML_TOOL` TB
WHERE
  ATTRIBUTION_TYPE = 'TIME DECAY 7 DAYS CALIB'
  AND TIM_DAY >= '{{dt}}'
  AND BUSINESS_UNIT = 'ML'
  and SIT_SITE_ID = 'MLB'
group by 1, 2
),

DUMMIES AS (
SELECT *
FROM `mkt-measurement.DUMMIES.MLB`
WHERE DATE >= '{{dt}}'
)

SELECT
  ODR_CREATED_DT,
  COUNT(DISTINCT ODR_ORDER_ID) AS ORDERS_COUNT,
  sessions_others,
  SESSIONS_INTERNO,
  SESSIONS_MARCARIAS,
  SESSIONS_ORGANIC,
  SESSIONS_PAGAS,
  NEW_BUYERS_AGMV,
  INVERSION,
  cast(g.GMV_COMPETENCIA as int64) GMV_COMPETENCIA,
  SAFE_DIVIDE(SUM(ORDERS_FREE_SHIPPING), SUM(ORDERS_NO_FREE_SHIPPING) + SUM(ORDERS_FREE_SHIPPING)) FREE_SHIPPING,
  ISO_WEEK,
  BB_AMERICANAS,
  COMMERCIAL_DATE,
  WEEKDAY,
  HOLIDAY,
  TEST_DUMMY
FROM
  sesiones a
  inner join `meli-bi-data.WHOWNER.BT_MATT_ATTRIBUTION_ML` b
  on a.SIT_SITE_ID = b.SIT_SITE_ID and a.TIM_DAY = b.ODR_CREATED_DT
  inner join nb
  on a.SIT_SITE_ID = nb.SIT_SITE_ID and a.TIM_DAY = nb.ORDER_CREATED_DATE
  inner join gmv_competencia g
  on a.TIM_DAY = g.DATA
  inner join ORDERS_DATA d
  on g.data = d.ord_created_dt
  inner join COST inv
  on inv.tim_day = d.ord_created_dt
  inner join DUMMIES dm
  on inv.tim_day = dm.date
WHERE
  a.TIM_DAY >= '{{dt}}'
  AND a.SIT_SITE_ID = 'MLB'
group by 1,3,4,5,6,7,8,9,10,12,13,14,15,16,17
ORDER BY 1
"""
        return sql
    


In [3]:
from melitk.connectors import BigQuery
import json
import base64
from melitk.melipass import get_secret

# The 'GCP_CREDENTIALS' secret is stored base64-encoded; decode it, then parse the JSON payload.
_encoded_credentials = get_secret('GCP_CREDENTIALS')
_credentials_json = base64.b64decode(_encoded_credentials)
credentials = json.loads(_credentials_json)

# BigQuery connector that is handed to the task when it is instantiated.
engine = BigQuery(credentials)

In [4]:
# Instantiate the task with the BigQuery engine built in the previous cell.
task = CONTRAFACT_MLB(engine)
# Render the query template for dt='2023-01-01'. The return value is not displayed
# because this call is not the last expression of the cell.
task.render(dt='2023-01-01')

# List the parameters the task requires; as the last expression this is what the cell displays.
task.get_task_inputs()


[2023-04-20 10:07:44-0400] INFO - prefect.FlowRunner | Beginning Flow run for '`meli-marketing.MEASUREMENT.contrafact_mlb`_render'
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'get_dql_template': Starting task run...
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'get_dql_template': Finished task run for task with final state: 'Success'
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'dt': Starting task run...
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'dt': Finished task run for task with final state: 'Success'
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'cartesian_product_of_parameters': Starting task run...
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'cartesian_product_of_parameters': Finished task run for task with final state: 'Success'
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'render_dql': Starting task run...
[2023-04-20 10:07:44-0400] INFO - prefect.TaskRunner | Task 'ren

['dt']

In [5]:
# Call the task for dt='2023-01-01'.
# NOTE(review): CONTRAFACT_MLB declares materialization = "incremental" and its query keeps
# every row with date >= dt -- confirm that re-running with an overlapping dt does not append duplicates.
task(dt='2023-01-01')

[2023-04-20 10:12:27-0400] INFO - prefect.FlowRunner | Beginning Flow run for '`meli-marketing.MEASUREMENT.contrafact_mlb`_render'
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'dt': Starting task run...
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'get_dql_template': Starting task run...
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'dt': Finished task run for task with final state: 'Success'
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'get_dql_template': Finished task run for task with final state: 'Success'
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'cartesian_product_of_parameters': Starting task run...
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'cartesian_product_of_parameters': Finished task run for task with final state: 'Success'
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'render_dql': Starting task run...
[2023-04-20 10:12:28-0400] INFO - prefect.TaskRunner | Task 'ren