In [30]:
from common_functions import google_sheets
from datetime import datetime, timedelta

In [14]:
import boto3
import base64
import os
import snowflake.connector
import numpy as np
import pandas as pd
import json

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  warn_incompatible_dep(


In [8]:
def get_secret(secret_name):
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except Exception as e:
        raise e
    else:
        if "SecretString" in get_secret_value_response:
            return get_secret_value_response["SecretString"]
        else:
            return base64.b64decode(get_secret_value_response["SecretBinary"])
        
def snowflake_query(query, columns=[]):
    config = {
            'user': os.environ["sagemaker_service_user"],
            'account': os.environ["sagemaker_account"],
            'private_key_file': '/tmp/sagemaker_service.p8',
            'database': os.environ["EGYPT_SNOWFLAKE_DATABASE"],
            'role': os.environ["sagemaker_role"],
            'schema': 'PUBLIC'
        }
        
    con = snowflake.connector.connect(**config)
    
    try:
        cur = con.cursor()
        cur.execute("USE WAREHOUSE COMPUTE_WH")
        cur.execute(query)
        column_names = [col[0] for col in cur.description]
        if len(columns) == 0:
            out = pd.DataFrame(np.array(cur.fetchall()), columns=column_names)
            out.columns = out.columns.str.lower() 
        else:
            out = pd.DataFrame(np.array(cur.fetchall()), columns=columns)
            out.columns = out.columns.str.lower()
        return out
    except Exception as e:
        print("Error: ", e)
    finally:
        cur.close()
        con.close()


In [7]:
snowflake_sagemaker_secrets = json.loads(get_secret("Snowflake-sagemaker"))

os.environ["SNOWFLAKE_AIRFLOW_WAREHOUSE"] = snowflake_sagemaker_secrets["airflow_scripts_main_warehouse"]
os.environ[f"EGYPT_SNOWFLAKE_DATABASE"] = snowflake_sagemaker_secrets[f"egypt_database"]
os.environ["sagemaker_key_bucket"] = snowflake_sagemaker_secrets["private_key_bucket"]
os.environ["sagemaker_key_path"] = snowflake_sagemaker_secrets["private_key_path"]
os.environ["sagemaker_service_user"] = snowflake_sagemaker_secrets["service_username"]
os.environ["sagemaker_account"] = snowflake_sagemaker_secrets["account"]
os.environ["sagemaker_role"] = snowflake_sagemaker_secrets["role"]
boto3.client('s3').download_file(os.environ["sagemaker_key_bucket"], os.environ["sagemaker_key_path"], '/tmp/sagemaker_service.p8')

snowflake_columns_encryption_key_secret = json.loads(get_secret("prod/snowflake/columns_encryption_key"))

os.environ["KEY"] = snowflake_columns_encryption_key_secret['ENCRYPTION_KEY']
os.environ["IV"] = snowflake_columns_encryption_key_secret['IV']
os.environ["AAD"] = snowflake_columns_encryption_key_secret['AAD']

In [9]:
query = f"""with ret_info as 
(
select 
	retailer_id as rid,
	min (created_at::date) as first_day,
	coalesce(count (case when created_at::date > current_date - interval '45 day' and status = 8 then id end)
		* 1.0 / nullif(count (case when created_at::date > current_date - interval '45 day' then id end),0),0) as cancelation_rate,
	max(case when r=2 and status=8 then 1 else 0 end) as canceled_last_order,
	max(case when r=2 then created_at::Date end) as last_order_date
from (
select 
	*,
	rank () over (partition by retailer_id order by created_at desc) as r
from egypt_marketplace.sales_orders
) as sub 
	group by 1
)  

, seller_base as 
(
select 
	seller_id,
	coalesce(sum (case when created_at >= current_date - interval '30 day' and status = 6 then total_price end),0)
		* 1.0 / nullif(sum (case when created_at >= current_date - interval '30 day' then total_price end),0) as nmv_to_gmv_actual,
	floor((coalesce(sum (case when created_at >= current_date - interval '30 day' and status = 6 then total_price end),0)
		* 1.0 / nullif(sum (case when created_at >= current_date - interval '30 day' then total_price end),0)) /0.1) *0.1 as nmv_to_gmv_last30,
	floor ((coalesce(sum (case when created_at >= current_date - interval '7 day' and status = 6 then total_price end),0)
		* 1.0 / nullif(sum (case when created_at >= current_date - interval '7 day' then total_price end),0)) /0.1) *0.1 as nmv_to_gmv_last7,
	count (case when status not in (3,6,7,8) then id end) as nonterminal_orders,
	count (case when estimated_delivery_date< current_date then sales_orders.id end) as due_until_yesterday,
	floor((count (case when estimated_delivery_Date is not null and status not in (3,6,7,8) and ESTIMATED_delivery_date < current_date then id end) 
		* 1.0 / nullif (count (case when status not in (3,6,7,8) then id end),0)) /0.05) *0.05 as late_orders_perc
from egypt_marketplace.saleS_orders
	where (created_at >= current_date - interval '30 day' or status not in (3,6,7,8))  
		and seller_id in (select distinct seller_id from egypt_marketplace.sales_orders where status not in (3,6,7,8) and (sales_orders.status = 2 or estimated_delivery_date <=current_date))
group by 1
)

, percentiles as (
select
	percentile_cont (0.7) within group (order by nonterminal_orders) as operc70,
	percentile_cont (0.35) within group (order by nonterminal_orders) as operc35,
	percentile_cont (0.7) within group (order by due_until_yesterday) as sperc70,
	percentile_cont (0.35) within group (order by due_until_yesterday)  as  sperc35
from seller_base
)

, seller_info as (
select 
	*, 
	row_number () over (order by nmv_to_gmv_ammended,nonterminal_percentile,size_percentile,late_orders_perc) as seller_priority ,
	concat ('nmv to gmv (last 30 days): ',concat(round(nmv_to_gmv_actual*100),'%'),
		'/ seller_size: ', case when size_percentile = 1 then 'large' when size_percentile = 2 then 'medium' else 'small' end
	) as extra_info
from (
select 
	*,
	case
		when nmv_to_gmv_last30 <= 0.5 then 0.5
		else nmv_to_gmv_last30 end as nmv_to_gmv_ammended,
	case 
		when nonterminal_orders>=operc70 then 1
		when nonterminal_orders>=operc35 then 2
		else 3
		end as nonterminal_percentile,
	case 
		when due_until_yesterday>=sperc70 then 1
		when due_until_yesterday>=sperc70 then 2
		else 3 
		end as size_percentile
from seller_base
	cross join percentiles
) as sub 
)

, spikes as 
(
select 
	seller_id,
	case when avg ( case when day <> current_date - interval '1 day' then placed end)*1.5 < max( case when day = current_date - interval '1 day' then placed end)  then 1 else 0 end as spiked
from (
SELECT
	seller_id,
	date_trunc ('day',created_at) as day,
	count (sales_orders.id) as placed,
	count (case when status in (1,2,4) then sales_orders.id end) as not_delivering
from egypt_marketplace.sales_orders 
	where created_at::date >= current_date - interval '7 day' and created_at::date <> current_date
group by 1,2
) as sub 
group by 1
)

, polygon_info as 
(
select 
	seller_id,
	array_agg (polygon_name) as polygons
from (
select 
	seller_id,
	polygon_id,
	polygons.name as polygon_name,
	sum (case when status in (3,7,8) then total_price end) * 1.0 / sum (total_price) as failed_gmv,
	sum (case when sales_orders.created_at >= current_date - interval '3 day' and status in (3,7,8) then total_price end) 
		* 1.0 / sum (case when sales_orders.created_at >= current_date - interval '3 day' then total_price end) as failed_last3
from egypt_marketplace.sales_orders
	join egypt_marketplace.polygons on polygons.id = sales_orders.polygon_id
where saleS_orders.created_at >= current_date - interval '30 day' and polygons.deleted_at is null 
group by 1,2,3
) as sub 
	where failed_last3 is not null and polygon_name is not null and ((failed_last3 >0.3) or (failed_last3 >0.1 and failed_gmv>0.5))
group by 1
)

, recalls as 
(
select distinct seller_id from (
select 
	t.seller_id,
	todays_deliveries,
	late_deliveries,
	t.status,
	case
		when (t.todays_deliveries::int*0.5 < count (case when estimated_delivery_date is not null and estimated_delivery_date=current_date then sales_orders.id end ))
		or (t.late_deliveries::int*0.5<count (case when estimated_delivery_date is not null and estimated_delivery_date<current_date then sales_orders.id end )) 
    	or (count (case when sales_orders.status = 2 then sales_orders.id end) <> 0) then 1 else 0 end as recall_flag
from (
	select 
		*,
		dense_rank () over (order by t.created_at desc) as r
	from  MATERIALIZED_VIEWS.liveops_tasks as t
		where t.created_at::Date = date(CONVERT_TIMEZONE('UTC', 'Africa/Cairo',current_timestamp))
			and t.type = 'seller_followup' 
	) as t
left join egypt_marketplace.sales_orders on sales_orders.seller_id::varchar = t.seller_id and sales_orders.created_at < t.created_at
	where r=1 and sales_orders.status not in (3,6,7,8)
	group by 1,2,3,4
	) as sub
where status <> 'done' or (status = 'done' and recall_flag =1)
)

-- --/*
select 
	*,
	rank () over (partition by "agent_email" order by "type" desc, draft_rk) as "priority"
from (
select 		
	uuid_string()::text as "Id", 
	unioned.*, 
	case when right (extra_draft,2) = ' /' then left (extra_draft,length (extra_draft)-2) else extra_draft end as "extra_info" ,
	case
		when row_number () over (partition by "type" order by draft_rk) <= (count (*) over (partition by "type") *1.0 / 2) 	
			then  1
		when row_number () over (partition by "type" order by draft_rk) > (count (*) over (partition by "type") *1.0 / 2) 	
			then  2
		-- when row_number () over (partition by "type" order by priority) > (count (*) over (partition by "type") *1.0 / 2) 	
		-- 	and date_part('hour',current_timestamp) >= 18 then  'shady_mansour@maxab.io'
		end as "agent_email"
from (
select
	'seller_followup' as "type",
	'new' as "status",
    CONVERT_TIMEZONE('UTC', 'Africa/Cairo',current_timestamp) as "created_at",
    concat ('Follow up on seller: ', entity_name) as "task_title",
    seller_priority as draft_rk,
    sales_orders.seller_id::varchar as "seller_id",
    sellers.entity_name as "seller_name",
    null as "order_id",
    null as "retailer_number",
    null as "retailer_name",
    count (distinct case when estimated_delivery_date is not null and estimated_delivery_date=current_date then sales_orders.id end )::varchar as "todays_deliveries",
	count (distinct case when estimated_delivery_date is not null and estimated_delivery_date<current_date then sales_orders.id end )::varchar as "late_deliveries",
    count (distinct case when sales_orders.status = 2 then sales_orders.id end)::varchar as "new_orders",
    polygons::varchar as "critical_polygons",
	concat(extra_info, case when spiked =1 then ' / Has order spike' end) as extra_draft
from egypt_marketplace.sales_orders
    left join egypt_marketplace.sellers on sellers.id = sales_orders.seller_id
	join seller_info on seller_info.seller_id = sales_orders.seller_id
	left join polygon_info on polygon_info.seller_id = sales_orders.seller_id 
	left join spikes on spikes.seller_id = sales_orders.seller_id and spiked = 1
where sales_orders.status not in (3,6,7,8) and  sales_orders.seller_id not in (74,101)
	and (sales_orders.status = 2 or estimated_delivery_date <=current_date)
	and ((select max(created_at::date) from  MATERIALIZED_VIEWS.liveops_tasks) <> date(CONVERT_TIMEZONE('UTC', 'Africa/Cairo',current_timestamp)) or (sales_orders.seller_id in (select seller_id::int from recalls)))
group by 1,2,3,4,5,6,7,8,9,10,14,15

-- -- select * from egypt_marketplace.sales_orders where seller_id =110 and sales_orders.status = 2
	
union all 


select 
	'order_followup',
	'new',
	CONVERT_TIMEZONE('UTC', 'Africa/Cairo',current_timestamp),
	concat ('Follow up on retailer: ', name),
	r,
	array_agg(distinct seller_id)::varchar,
	replace(replace(array_agg(distinct entity_name)::varchar,'"',''),',',' / '),
	array_agg(id)::varchar,
	mobile,
	name,
	null,
	null,
	null,
	null, 
	concat ( 
			case when coalesce(estimated_delivery_Date,current_date)<current_date then 'late_order /' else '' end,
			case when created_at::Date = first_day then ' first_order /' else '' end,
			case when canceled_last_order = 1 then ' canceled last order /' else '' end, 
			case when cancelation_rate > 0.3 then ' high cancelation rate /' else '' end,
			case when coalesce(last_order_date,current_date) < current_date - interval '20 day' then ' returning (last order > 20 days)' else '' end
		) as extras 
from 
	(select * ,row_number () over (order by late,cancels,fo) as r from(
		select 
			sales_orders.*,
			entity_name,
			retailers.name,
			retailers.mobile,
			ret_info.*,
			case when estimated_delivery_date < current_date then 1 else 2 end as late,
			case when sales_orders.created_at::Date = first_day then 1 else  2 end as fo,
			case when cancelation_Rate > 0.3 or canceled_last_order = 1 then 1 else 2 end as cancels
		from egypt_marketplace.sales_orders
			left join egypt_marketplace.sellers on sellers.id = sales_orders.seller_id
			left join public.retailers on retailers.id = sales_orders.retailer_id
			left join ret_info on ret_info.rid = sales_orders.retailer_id
		where sales_orders.status not in (3,6,7,8) and seller_id not in (74,101)
			and (sales_orders.created_at::Date = first_day or order_price >= 15000 
			--or cancelation_Rate > 0.3 or canceled_last_order = 1 or coalesce(last_order_date,current_date) < current_date - interval '20 day'
			)
			and sales_orders.id not in (sELECT value::INT AS flattened_order_id FROM MATERIALIZED_VIEWS.liveops_tasks,
       			LATERAL FLATTEN(input => SPLIT(order_id, ',')) WHERE type = 'order_followup') 
		)	as sub 
	) as ranker
group by 1,2,3,4,5,9,10,11,12,13,14,15
) as unioned
) as final
order by   "type" desc,draft_rk --"seller_id", "task_title" 
--*/

-- select 	
-- 	*
-- from fintech.Tasks"""

In [36]:
df =snowflake_query(query)

In [40]:
df.columns = ['ID', 'TYPE', 'STATUS', 'CREATED_AT', 'TASK_TITLE', 'DRAFT_RK',
              'SELLER_ID', 'SELLER_NAME', 'ORDER_ID', 'RETAILER_NUMBER',
              'RETAILER_NAME', 'Feedback', 'LATE_DELIVERIES', 'NEW_ORDERS',
              'CRITICAL_POLYGONS', 'EXTRA_DRAFT', 'EXTRA_INFO', 'AGENT_EMAIL',
              'PRIORITY']


ValueError: Length mismatch: Expected axis has 20 elements, new values have 19 elements

In [41]:
print(df.columns.tolist)

<bound method IndexOpsMixin.tolist of Index(['ID', 'TYPE', 'STATUS', 'CREATED_AT', 'TASK_TITLE', 'DRAFT_RK',
       'SELLER_ID', 'SELLER_NAME', 'ORDER_ID', 'RETAILER_NUMBER',
       'RETAILER_NAME', 'Feedback', 'LATE_DELIVERIES', 'NEW_ORDERS',
       'CRITICAL_POLYGONS', 'EXTRA_DRAFT', 'EXTRA_INFO', 'AGENT_EMAIL',
       'PRIORITY', 'UPDATED_AT'],
      dtype='object')>


In [27]:
df = df.head(1)

In [31]:
now = datetime.now() + timedelta(hours=3)

In [39]:
df["UPDATED_AT"] = now

In [43]:
df.head()

Unnamed: 0,ID,TYPE,STATUS,CREATED_AT,TASK_TITLE,DRAFT_RK,SELLER_ID,SELLER_NAME,ORDER_ID,RETAILER_NUMBER,RETAILER_NAME,Feedback,LATE_DELIVERIES,NEW_ORDERS,CRITICAL_POLYGONS,EXTRA_DRAFT,EXTRA_INFO,AGENT_EMAIL,PRIORITY,UPDATED_AT
0,a45c144c-5123-4055-af0d-258bde2c43df,seller_followup,new,2025-06-24 15:34:08.044,Follow up on seller: جملة الاخوة المنشية - كشك...,1,3291,جملة الاخوة المنشية - كشكول كامل,,,,11,0,8,"[""امبابة"",""العمرانية""]",,,1,1,2025-06-24 12:33:27.246717
1,d3d718e1-5d5c-4ef5-b8b8-2ea2b51e8a45,seller_followup,new,2025-06-24 15:34:08.044,Follow up on seller: شركة القدس - كشكول كامل,2,4862,شركة القدس - كشكول كامل,,,,5,0,21,"[""العوايد"",""العجمي"",""المراغي"",""العجمي 2""]",nmv to gmv (last 30 days): 50%/ seller_size: s...,nmv to gmv (last 30 days): 50%/ seller_size: s...,1,2,2025-06-24 12:33:27.246717
2,812a1753-b9d6-4a26-adab-1f383e3e77b7,seller_followup,new,2025-06-24 15:34:08.044,Follow up on seller: اولاد همام,3,3194,اولاد همام,,,,6,5,16,"[""النزهة"",""مدينة نصر""]",,,1,3,2025-06-24 12:33:27.246717
3,48083724-0920-45a3-af66-e4d83af4989e,seller_followup,new,2025-06-24 15:34:08.044,Follow up on seller: شركة الفنار - كشكول كامل,4,4965,شركة الفنار - كشكول كامل,,,,6,6,0,"[""سيدي بشر بحري""]",,,1,4,2025-06-24 12:33:27.246717
4,47218003-ceb0-44a8-b966-3ec2bb684612,seller_followup,new,2025-06-24 15:34:08.044,Follow up on seller: الاستاذ الاسكندرية - كشكو...,5,4852,الاستاذ الاسكندرية - كشكول كامل,,,,6,13,14,"[""السيوف""]",,,1,5,2025-06-24 12:33:27.246717


In [42]:
google_sheets("Liveops Tasks", "Sheet2", "append", df=df)

/home/ec2-user/service_account_key.json


  updated_data = pd.concat([existing_data, df], ignore_index=True)


'Data is appended to the sheet successfully'