In [1]:
import datetime as dt
import os
import time
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyhive import presto

# Presto connection shared by every query in this notebook.
#
# Host, port and user can be overridden through the environment
# (PRESTO_HOST / PRESTO_PORT / PRESTO_USERNAME) so the notebook does not have
# to be edited -- or run under somebody else's identity -- when another
# person executes it. The fallbacks are the values that were previously
# hard-coded, so behaviour is unchanged when no variable is set.
#
# Hosts noted alongside the original call:
#   presto.processing.yoda.run
#   bi-presto.serving.data.production.internal
conn = presto.connect(
    host=os.environ.get('PRESTO_HOST', 'presto.processing.yoda.run'),
    port=int(os.environ.get('PRESTO_PORT', '80')),
    username=os.environ.get('PRESTO_USERNAME', 'manoj.ravirajan@rapido.bike'),
)

In [13]:
# Report parameters: these three names are interpolated into the SQL text
# built in the query cell, so re-run that cell after changing them.
service = "auto"           # 'link' or 'auto' pick that lifetime-stage column; any other value falls back to taxi
start_date = "2022-11-14"  # YYYY-MM-DD; here the Monday opening the reporting week
end_date = "2022-11-20"    # YYYY-MM-DD; here the Sunday closing the reporting week

In [14]:
# Weekly lifetime-stage funnel report, rendered as one Presto query.
#
# Interpolated parameters (defined in the parameters cell):
#   start_date / end_date  widened inside the SQL to the full weeks containing them
#   service                'link' / 'auto' select that *_lifetime_stage column of
#                          iallocator_customer_segments; anything else selects taxi
#
# CTE pipeline:
#   v0                   per customer-week ride, discount and session totals (Link + Auto)
#   v1                   lifetime stage per customer from the Sunday snapshot preceding each week
#   v2 / base            v0 full-outer-joined to v1; customers without a stage become '0.NEW'
#   orders .. coins      coin credit / utilisation / expiry rolled up per week and stage
#   discounted_rides_*   dropped orders and how many of them carried a discount
#   detailed / summary   per-stage ratios plus one 'TOTAL' row per week
#
# Correctness notes (see the inline SQL comments at each spot):
#   * v1.week is derived from run_date + 1 day instead of week(run_date) + 1,
#     which produced 53/54 at a year boundary and never matched v0.week = 1.
#   * v1's run_date upper bound is snapped to the week of end_date like every
#     other CTE; "end_date - 5 days" lost the last snapshot whenever end_date
#     fell on Monday-Thursday.
#   * Total_coin_burn and Discounted_Rides are coalesced to 0 and the
#     discounted-rides join is a left join, so a stage without coin activity
#     or without dropped orders keeps its row and its with-coins figures.
code = f"""

with v0 as
(
-- Per customer and week: rides, discount, subtotal and session counts.
select
    customerid,
    extract(week from cast(day as date)) as week,
    date_format(date_trunc('week', cast(day as date)), '%Y-%m-%d') start_date,
    sum(gross_rides_daily ) as gross_rides,
    sum(case when net_rides_daily > 0 then net_rides_daily end) as net_rides,
    sum(case when net_rides_daily > 0 then subscription_rides_daily end) as sub_rides,
    sum(case when net_rides_daily > 0 then discount_daily end) as burn,
    sum(case when subscription_rides_daily > 0 then discount_daily end) as subs_discount,
    sum(case when net_rides_daily > 0 then subtotal_daily end) as subtotal,
    sum(ao_sessions_unique_daily) as ao,
    sum(fe_sessions_unique_daily) as fe,
    sum(rr_sessions_unique_daily) as rr
from
    datasets.customer_rf_daily_kpi
where
    day between date_format(date_trunc('week', cast('{start_date}' as date)), '%Y-%m-%d') --]] date_format((date_trunc('week', CURRENT_DATE) - INTERVAL '14' day), '%Y-%m-%d')
    and date_format(date_trunc('week', cast('{end_date}' as date) + INTERVAL '7' day) - INTERVAL '1' day, '%Y-%m-%d') --]] date_format((date_trunc('week', CURRENT_DATE)), '%Y-%m-%d')
    and service_name in ('Link', 'Auto')
group by
    1,2,3
),

v1 as (
-- Lifetime stage per customer, prefixed with a digit so stages sort in funnel order.
select
    start_date,
    week,
    case
    when lifetime_stage = 'HANDHOLDING' then '1.HANDHOLDING'
    when lifetime_stage = 'HOOK' then '2.HOOK'
    when lifetime_stage = 'DETOX' then '3.DETOX'
    when lifetime_stage = 'SUSTENANCE' then '4.SUSTENANCE'
    when lifetime_stage = 'COMMITTED' then '5.COMMITTED'
    when lifetime_stage = 'CHURN_OTB' then '6.CHURN_OTB'
    when lifetime_stage = 'SOFT_CHURN' then '7.SOFT_CHURN'
    when lifetime_stage = 'DORMANT' then '8.DORMANT'
    when lifetime_stage in ('INACTIVE','UNKNOWN') then '9.INACTIVE'
    end lifetime_stage,
    customer_id
from
    (
    select
        date_format(date_trunc('week', cast(run_date as date) + interval '1' day), '%Y-%m-%d') start_date,
        -- run_date is a Sunday (filter below), so run_date + 1 day is the Monday of the
        -- week the snapshot applies to. Taking the week of that date matches start_date
        -- above and stays correct across a year boundary.
        extract(week from cast(run_date as date) + interval '1' day) as week,
        case
        when lower('{service}') = ('link') then link_lifetime_stage
        when lower('{service}') = ('auto') then auto_lifetime_stage
        else taxi_lifetime_stage end lifetime_stage,
        customer_id,
        count(*) records
    from datasets.iallocator_customer_segments
    where
        -- From the Sunday before the first reporting week to the Sunday before the
        -- last one; both bounds are snapped to the week, as in the other CTEs.
        run_date between date_format(date_trunc('week', cast('{start_date}' as date)) - interval '1' day,'%Y-%m-%d') --]] date_format((date_trunc('week', CURRENT_DATE) - INTERVAL '15' day), '%Y-%m-%d')
        and date_format(date_trunc('week', cast('{end_date}' as date)) - interval '1' day,'%Y-%m-%d') --]] date_format((date_trunc('week', CURRENT_DATE)), '%Y-%m-%d')
        and day_of_week(cast (run_date as date)) = 7
        and case
            when lower('{service}') = ('link') then link_lifetime_rides
            when lower('{service}') = ('auto') then auto_lifetime_rides
            else taxi_lifetime_rides end > 0
    group by 1,2,3,4
    )
),

v2 as (
-- Week x stage aggregates; customers seen in v0 but absent from v1 are '0.NEW'.
select
    coalesce(v1.week, v0.week) week,
    case
    when lifetime_stage is null then '0.NEW'
    when lifetime_stage is not null then lifetime_stage
    end as lifetime_stage,
    count(distinct v1.customer_id) as Base,
    count(distinct case when net_rides > 0 then v0.customerid end) as Net_Customers,
    sum(gross_rides) as GrossRides,
    sum(net_rides) as NetRides,
    sum(sub_rides) as Subs_Rides,
    count(distinct case when sub_rides > 0 then v0.customerid end) as Subs_Customers,
    sum(burn) as Discount,
    sum(subs_discount) as Subs_Discount,
    sum(subtotal) as Subtotal,
    sum(ao) as AO,
    sum(fe) as FE,
    sum(rr) as RR,
    count(distinct case when ao > 0 then v0.customerid end) as AO_Cust,
    count(distinct case when fe > 0 then v0.customerid end) as FE_Cust,
    count(distinct case when rr > 0 then v0.customerid end) as RR_Cust
from
    v0
full outer join v1 on v0.customerid = v1.customer_id and v0.week = v1.week
group by
    1,2
order by
    1,2
),

base as
(
-- Customer-level version of the v0/v1 join, used to attribute orders and coins.
select
    coalesce(v1.week, v0.week) week,
    case
    when lifetime_stage is null then '0.NEW'
    when lifetime_stage is not null then lifetime_stage
    end as lifetime_stage,
    v0.start_date,
    v0.customerid as custs,
    (case when v1.customer_id is null then v0.customerid else v1.customer_id end) as customerid
from
    v0
full outer join v1 on v0.customerid = v1.customer_id and v0.week = v1.week
),

orders as (
-- Link/Auto orders of the customers in base; the window starts 14 days before the first week.
select
    yyyymmdd,
    week(date_parse(yyyymmdd,'%Y%m%d')) week,
    date_format(date_trunc('week',date_parse(yyyymmdd,'%Y%m%d')), '%Y-%m-%d') startdate,
    customer_id,
    order_id,
    city_name,
    discount,
    amount,
    sub_total,
    service_obj_service_name,
    order_status
from orders.order_logs_snapshot
where
    yyyymmdd between date_format(date_trunc('week', cast('{start_date}' as date)) - interval '14' day,'%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE) - INTERVAL '28' day), '%Y%m%d')
    and date_format(date_trunc('week', cast('{end_date}' as date) + INTERVAL '7' day) - INTERVAL '1' day, '%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE)), '%Y%m%d')
    and customer_id in (select distinct customerid from base)
    and service_obj_service_name in ('Link','Auto')
    and order_id is not null
    and service_obj_city_display_name is not null
    and (spd_fraud_flag != true OR spd_fraud_flag IS NULL)
),

order_coin as (
-- Coin wallet movements per week, customer, entity and offer type.
select
    week(date_parse(yyyymmdd,'%Y%m%d')) week_number,
    date_format(date_trunc('week',date_parse(yyyymmdd,'%Y%m%d')), '%Y-%m-%d') start_date,
    owner_id customerid,
    entity_id,
    cast(json_extract(coin_wallet_changes, '$[0].offerType') as varchar) offerType,
    coalesce(round(sum(case when transaction_type = 'credit' then cast(amount as double) end)),0) as coin_credited,
    coalesce(round(sum(case when transaction_type = 'debit' and subtype != 'coinExpired' then cast(amount as double) end)),0) as coin_utilized,
    coalesce(round(sum(case when transaction_type = 'debit' and subtype = 'coinExpired' then cast(amount as double) end)),0) as coin_expired

from payments.transactions_snapshot
where
    yyyymmdd between date_format(date_trunc('week', cast('{start_date}' as date)),'%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE) - INTERVAL '14' day), '%Y%m%d')
    and date_format(date_trunc('week', cast('{end_date}' as date) + INTERVAL '7' day) - INTERVAL '1' day, '%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE)), '%Y%m%d')
    and owner_type ='customer'
    and transaction_status = 'done'
    and json_extract(coin_wallet_changes, '$[0].id') is not null
group by 1,2,3,4,5
),

service_total_coins as (
-- Coin movements matched to an order (entity_id = order_id) of the same customer and week.
select
    start_date,
    week_number,
    customer_id,
    entity_id,
    sum(case when offerType in ('locationOffer','rideOffer','scratchCardOffer') and service_obj_service_name in ('Auto','Link') then coin_credited end) total_coin_credited,
    sum(case when offerType in ('locationOffer','rideOffer','scratchCardOffer') and service_obj_service_name in ('Auto','Link') then coin_expired end) total_coin_expired,
    sum(case when offerType = 'giftOffer' and service_obj_service_name in ('Auto','Link') then coin_utilized end) giftOffer_utilized,
    sum(case when offerType = 'walletRechargeOffer' and service_obj_service_name in ('Auto','Link') then coin_utilized end) walletOffer_utilized,
    sum(case when offerType = 'dashboardCoinsCredit' and service_obj_service_name in ('Auto','Link') then coin_utilized end) dashboardCoinsCredit_utilized
from
    (
    select
        a.*,
        b.*
    from order_coin a
    join orders b on a.customerid = b.customer_id and a.entity_id = b.order_id and a.start_date = b.startdate
    )
where start_date is not null
group by 1,2,3,4
),

coin_credit as
(
-- Coin credit / utilisation per week and stage; coins of customers missing from base are '0.NEW'.
select
    start_date,
    week_number,
    coalesce(lifetime_stage, '0.NEW') lifetime_stage,
    sum(total_coin_credited) total_coin_credited,
    sum(walletOffer_utilized) walletOffer_utilized,
    sum(giftOffer_utilized) giftOffer_utilized,
    sum(dashboardCoinsCredit_utilized) dashboardCoinsCredit_utilized
from
    (
    select
        (case when a.week is null then b.week_number else a.week end) as week,
        (case when a.customerid is null then b.customer_id else a.customerid end) as customerid,
        (case when a.start_date is null then b.start_date else a.start_date end) as start_date,
        a.lifetime_stage,
        b.week_number,
        b.walletOffer_utilized,
        b.giftOffer_utilized,
        b.dashboardCoinsCredit_utilized,
        b.total_coin_credited
    from base a
    right join service_total_coins b on a.start_date = b.start_date and a.customerid = b.customer_id
    )
where start_date is not null
group by 1,2,3
),

coin_expired_customer_level as (
-- Expired coins per week and customer, limited to entities that are orders in scope.
select
    start_date as Start2,
    customerid as cust2,
    sum(coin_expired) coin_expired
from order_coin
where
    entity_id in (select order_id from orders)
    and coin_expired > 0
group by 1,2
),

expired_final as (
select
    a.lifetime_stage,
    b.*
from v1 a
join coin_expired_customer_level b on a.start_date = b.Start2 and a.customer_id = b.cust2
),

coin_expired_agg as (
select
    Start2,
    lifetime_stage,
    sum(coin_expired) coin_expired
from expired_final
group by 1,2
),

coins as (
-- Net coin burn = credited - expired + wallet / gift / dashboard coins utilised.
select
    a.*,
    coalesce(b.coin_expired,0) coin_expired,
    (coalesce(a.total_coin_credited,0) - coalesce(b.coin_expired,0)) + coalesce(a.walletOffer_utilized,0) + coalesce(a.giftOffer_utilized,0) + coalesce(a.dashboardCoinsCredit_utilized,0) Total_coin_burn
from coin_credit a
full outer join coin_expired_agg b on a.start_date = b.Start2 and a.lifetime_stage = b.lifetime_stage
),

final_with_coins as (
select
    a.*,
    -- A stage with no coin rows gets 0 rather than NULL, so the
    -- "Discount + Total_coin_burn" figures downstream stay defined.
    coalesce(b.Total_coin_burn, 0) Total_coin_burn
from v2 a
left join coins b on a.Week = b.week_number and a.lifetime_stage = b.lifetime_stage
),

discounted_rides_v0 as (
-- Dropped orders inside the reporting weeks.
select
    startdate,
    customer_id,
    order_id,
    discount,
    amount,
    sub_total
from orders
where
    yyyymmdd between date_format(date_trunc('week', cast('{start_date}' as date)),'%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE) - INTERVAL '14' day), '%Y%m%d')
    and date_format(date_trunc('week', cast('{end_date}' as date) + INTERVAL '7' day) - INTERVAL '1' day, '%Y%m%d') --]] date_format((date_trunc('week', CURRENT_DATE)), '%Y%m%d')
    and order_id is not null
    and order_status = 'dropped'
),

discounted_rides_v1 AS (
select
    b.StartDate,
    extract(week from date(b.StartDate)) week,
    coalesce(a.lifetime_stage,'0.NEW') lifetime_stage,
    count(distinct b.customer_id) Net_customers,
    count(distinct case when b.discount > 0 then b.order_id end) Discounted_Rides,
    sum(b.sub_total) sub_total,
    sum(b.discount) discount
from v1 a
right join discounted_rides_v0 b on a.customer_id = b.customer_id and a.start_date = b.StartDate
group by 1,2,3
),

final_with_discounted as (
select
    a.*,
    coalesce(b.Discounted_Rides, 0) Discounted_Rides
from final_with_coins a
-- Left join: a week/stage without any dropped order keeps its row (with 0
-- discounted rides) instead of disappearing from the report.
left join discounted_rides_v1 b on a.week = b.week and a.lifetime_stage = b.lifetime_stage
),

detailed as (
-- Per-stage metrics; try(...) + coalesce turns a division by zero into 0.
select
    week,
    lifetime_stage,
    Base,
    Net_Customers,
    coalesce(try(Net_Customers*100.0/Base), 0) "Conversion%",
    GrossRides,
    NetRides,
    Subs_Customers,
    Subs_Rides,
    Discounted_Rides,
    coalesce(try(Discounted_Rides*100.0/NetRides), 0) as "Discounted_Rides%",
    coalesce(try(NetRides*100.0/GrossRides), 0) as "G2N%",
    coalesce(try(NetRides*1.0/Net_Customers), 0) RPC,
    coalesce(try(Subs_Rides*1.0/Subs_Customers), 0) RPC_Subs_Cust,
    coalesce(try(Subs_Rides*100.0/NetRides), 0) as "Subs_Rides%",
    Discount,
    Total_coin_burn,
    Discount + Total_coin_burn Discount_w_coins,
    Subs_Discount,
    coalesce(try(Subs_Discount*100.0/Discount), 0) as "Subs_Discount%",
    Subtotal,
    coalesce(try(Discount*100.0/Subtotal), 0) as "Discount%_w0_coins",
    coalesce(try((Discount + Total_coin_burn)*100.0/Subtotal), 0) "Discount%",
    coalesce(try(Subtotal*1.0/NetRides), 0) ATV,
    coalesce(try(Discount*1.0/NetRides), 0) DPR_wo_coins,
    coalesce(try((Discount + Total_coin_burn)*1.0/NetRides), 0) DPR_w_coins,
    AO as AO_Sessions,
    FE as FE_Sessions,
    RR as RR_Sessions,
    AO_Cust,
    coalesce(try(AO_Cust*100.0/Base), 0) "AO_Cust%",
    FE_Cust,
    coalesce(try(FE_Cust*100.0/Base), 0) "FE_Cust%",
    RR_Cust,
    coalesce(try(RR_Cust*100.0/Base), 0) "RR_Cust%",
    coalesce(try(RR*100.0/FE), 0) "FF_RR"
from final_with_discounted
),

summary as (
-- One 'TOTAL' row per week; ratios are recomputed from the summed components.
select
    week,
    'TOTAL' lifetime_stage,
    sum(Base) Base,
    sum(Net_Customers) Net_Customers,
    coalesce(try(sum(Net_Customers)*100.0/sum(Base)), 0) "Conversion%",
    sum(GrossRides) GrossRides,
    sum(NetRides) NetRides,
    sum(Subs_Customers) Subs_Customers,
    sum(Subs_Rides) Subs_Rides,
    sum(Discounted_Rides) Discounted_Rides,
    coalesce(try(sum(Discounted_Rides)*100.0/sum(NetRides)), 0) as "Discounted_Rides%",
    coalesce(try(sum(NetRides)*100.0/sum(GrossRides)), 0) as "G2N%",
    coalesce(try(sum(NetRides)*1.0/sum(Net_Customers)), 0) RPC,
    coalesce(try(sum(Subs_Rides)*1.0/sum(Subs_Customers)), 0) RPC_Subs_Cust,
    coalesce(try(sum(Subs_Rides)*100.0/sum(NetRides)), 0) as "Subs_Rides%",
    sum(Discount) Discount,
    sum(Total_coin_burn) Total_coin_burn,
    sum(Discount) + sum(Total_coin_burn) Discount_w_coins,
    sum(Subs_Discount) Subs_Discount,
    coalesce(try(sum(Subs_Discount)*100.0/sum(Discount)), 0) as "Subs_Discount%",
    sum(Subtotal) Subtotal,
    coalesce(try(sum(Discount)*100.0/sum(Subtotal)), 0) as "Discount%_w0_coins",
    coalesce(try((sum(Discount) + sum(Total_coin_burn))*100.0/sum(Subtotal)), 0) "Discount%",
    coalesce(try(sum(Subtotal)*1.0/sum(NetRides)), 0) ATV,
    coalesce(try(sum(Discount)*1.0/sum(NetRides)), 0) DPR_wo_coins,
    coalesce(try((sum(Discount) + sum(Total_coin_burn))*1.0/sum(NetRides)), 0) DPR_w_coins,
    sum(AO_Sessions) as AO_Sessions,
    sum(FE_Sessions) as FE_Sessions,
    sum(RR_Sessions) as RR_Sessions,
    sum(AO_Cust) AO_Cust,
    coalesce(try(sum(AO_Cust)*100.0/sum(Base)), 0) "AO_Cust%",
    sum(FE_Cust) FE_Cust,
    coalesce(try(sum(FE_Cust)*100.0/sum(Base)), 0) "FE_Cust%",
    sum(RR_Cust) RR_Cust,
    coalesce(try(sum(RR_Cust)*100.0/sum(Base)), 0) "RR_Cust%",
    coalesce(try(sum(RR_Sessions)*100.0/sum(FE_Sessions)), 0) "FF_RR"
from detailed
group by 1
)

select * from detailed
union
select * from summary
order by 1 desc, 2
"""

In [15]:
# Run the rendered SQL on the shared Presto connection and load the whole
# result set into a DataFrame.
df_code = pd.read_sql(sql=code, con=conn)

# Bare last expression so the notebook renders the report as a table.
df_code

Unnamed: 0,week,lifetime_stage,Base,Net_Customers,Conversion%,GrossRides,NetRides,Subs_Customers,Subs_Rides,Discounted_Rides,...,AO_Sessions,FE_Sessions,RR_Sessions,AO_Cust,AO_Cust%,FE_Cust,FE_Cust%,RR_Cust,RR_Cust%,FF_RR
0,46,0.NEW,0,945423,0.0,3389094,1689838,11377,43683,747387,...,10831866,8348656,2679816,1974257,0.0,1897821,0.0,1296509,0.0,32.1
1,46,1.HANDHOLDING,897943,204486,22.8,998158,437854,4420,18408,133577,...,2721657,2099256,737783,357137,39.8,332723,37.1,273351,30.4,35.1
2,46,2.HOOK,515841,189423,36.7,1053327,452171,6439,26087,99455,...,2422672,1838895,750704,282574,54.8,257389,49.9,238820,46.3,40.8
3,46,3.DETOX,23670,12329,52.1,85759,38176,2348,10324,21218,...,189576,142148,60136,16380,69.2,14729,62.2,14185,59.9,42.3
4,46,4.SUSTENANCE,296290,134034,45.2,903301,336462,4704,21091,64234,...,1925879,1424777,642549,199306,67.3,176332,59.5,178480,60.2,45.1
5,46,5.COMMITTED,250792,179382,71.5,1808722,659980,8944,42022,93466,...,3194509,2234880,1224399,214806,85.7,178359,71.1,206986,82.5,54.8
6,46,6.CHURN_OTB,109798,43284,39.4,238967,108532,1424,6788,27087,...,556908,417106,177683,63935,58.2,57257,52.1,55373,50.4,42.6
7,46,7.SOFT_CHURN,79158,22793,28.8,102421,50393,667,3043,14931,...,264517,204119,76494,35988,45.5,33267,42.0,28996,36.6,37.5
8,46,8.DORMANT,3628099,546481,15.1,2443901,1133911,16031,74143,346320,...,6690793,5201917,1865566,991631,27.3,923346,25.4,747463,20.6,35.9
9,46,9.INACTIVE,6132883,342584,5.6,1254478,642624,9037,38988,209744,...,4013192,3177578,995997,681806,11.1,648846,10.6,466090,7.6,31.3


In [16]:
# Bind the query result to a service-specific name. This is a second
# reference to the same DataFrame object as `df_code`, not a copy.
# NOTE(review): presumably done so the result survives re-running the query
# cell with a different `service` -- confirm.
taxi_auto_stage = df_code

In [6]:
# Export the taxi-stage report to CSV (row index omitted).
#
# `taxi_taxi_stage` is not assigned anywhere in this notebook; it only exists
# if an earlier run of the query cells bound it in the current kernel. On a
# fresh kernel (Restart & Run All) the bare call raised NameError, so the
# export is skipped with an explicit message when the frame is absent.
if "taxi_taxi_stage" in globals():
    taxi_taxi_stage.to_csv("taxi-taxi stage_46.csv", index=False)
else:
    print("Skipping 'taxi-taxi stage_46.csv': taxi_taxi_stage is not defined in this session.")

In [11]:
# Export the link-stage report to CSV (row index omitted).
#
# `taxi_link_stage` is not assigned anywhere in this notebook; it only exists
# if an earlier run of the query cells bound it in the current kernel. On a
# fresh kernel (Restart & Run All) the bare call raised NameError, so the
# export is skipped with an explicit message when the frame is absent.
if "taxi_link_stage" in globals():
    taxi_link_stage.to_csv("taxi-link stage_46.csv", index=False)
else:
    print("Skipping 'taxi-link stage_46.csv': taxi_link_stage is not defined in this session.")

In [17]:
# Write the auto-stage report to a CSV in the working directory, leaving
# out the DataFrame's row index.
taxi_auto_stage.to_csv(path_or_buf="taxi-auto stage_46.csv", index=False)