In [1]:
# Graph Analysis

In [2]:
import numpy as np
import pandas as pd
import datetime as dt
# --- Display / Spark settings -------------------------------------------
pd.set_option('display.max_rows', 500)

# Enable Arrow-based columnar data transfers.
# NOTE(review): Spark 3.x deprecates this key in favour of
# "spark.sql.execution.arrow.pyspark.enabled" — confirm the runtime version.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# --- Region / output location for the geography study --------------------
geo_code = 'philly'
res_folder = '/dbfs/user/yao/graph_research/geography'
# cbsa_code = 37980

In [3]:
end_date = '2020-07-31'
date_fmt = '%Y-%m-%d'

# Last day of the month preceding end_date: go to the 1st, then back one day.
month_start = dt.datetime.strptime(end_date, date_fmt).replace(day=1)
last_month_end = (month_start - dt.timedelta(days=1)).strftime(date_fmt)

In [4]:
%sql
-- Yearly order volume, distinct shops, distinct customers and GMV for
-- CBSA 37980, restricted to live or temporarily suspended freemium/partner
-- shops in the 2020-07-31 dim_shops snapshot; successful, non all-menus orders.
CREATE OR REPLACE TEMPORARY VIEW philly_year AS
(
    SELECT
        year(ao.purchase_date_pt) AS year,
        count(*) AS vol,
        count(DISTINCT ao.shop_id) AS unique_shops,
        count(DISTINCT coalesce(ao.customer_id, ao.customer_phone)) AS unique_customers,
        sum(ao.gmv) AS total_gmv
    FROM aggregates.all_orders AS ao
    JOIN aggregates.dim_shops AS ds
        ON ao.shop_id = ds.shop_id
    JOIN dimension.zipcode_cbsa_regions AS cbsa
        ON ds.postal_code = cbsa.zip
    WHERE cbsa.cbsa_code = 37980
      AND (ds.is_live = TRUE OR ds.is_temporarily_suspended = TRUE)
      AND ds.shop_type IN ('freemium', 'partner')
      AND ds.ds_pt = '2020-07-31'
      -- disabled window filter: AND ao.purchase_date_pt BETWEEN '2019-03-01' AND '2020-02-29'
      AND ao.is_all_menus_order = FALSE
      AND ao.is_successful = TRUE
    GROUP BY 1
    ORDER BY 1
);

In [5]:
# Pull the yearly summary view (one row per year) into pandas and show it.
philly_year_view = spark.table('philly_year')
df = philly_year_view.toPandas()
df

Unnamed: 0,year,vol,unique_shops,unique_customers,total_gmv
0,2010,29,4,27,688.97
1,2011,987,16,673,28820.98
2,2012,4436,33,2723,121529.42
3,2013,18933,65,11522,522877.61
4,2014,44524,91,26250,1249889.82
5,2015,73211,187,42738,2022293.96
6,2016,130867,234,72507,3659944.49
7,2017,253707,347,126654,7132773.49
8,2018,493658,551,213790,14197085.07
9,2019,913109,783,338287,26957804.34


In [6]:
import datetime as dt
from dateutil.relativedelta import relativedelta

date_fmt = '%Y-%m-%d'
year_start = '2015-01-01'
df_arr = []

# End-date overrides per window start year: the 2015 window is widened to
# run through 2016-12-31 (which is also the dim_shops snapshot it queries),
# and the 2020 window is cut off at 2020-07-31.
window_end_overrides = {'2020': '2020-07-31', '2015': '2016-12-31'}

while year_start < '2020-01-02':
    one_year_later = dt.datetime.strptime(year_start, date_fmt) + relativedelta(years=1)
    year_end = (one_year_later + relativedelta(days=-1)).strftime(date_fmt)
    year_end = window_end_overrides.get(year_start[:4], year_end)
    print(year_start, year_end)
    sql_str = f"""
        CREATE OR REPLACE TEMPORARY VIEW philly_year AS
        (
            select year(ao.purchase_date_pt) year, count(*) vol, count(distinct ao.shop_id) unique_shops, count(distinct coalesce(ao.customer_id, ao.customer_phone)) unique_customers, sum(ao.gmv) total_gmv      
            from aggregates.all_orders ao
            join aggregates.dim_shops ds on ao.shop_id = ds.shop_id    
            join dimension.zipcode_cbsa_regions cbsa on ds.postal_code = cbsa.zip
            where cbsa.cbsa_code = 37980
            and (ds.is_live = True or ds.is_temporarily_suspended = True) and ds.shop_type in ('freemium', 'partner')
            and ds.ds_pt = '{year_end}'
            and ao.purchase_date_pt between '{year_start}' and '{year_end}'
            and ao.is_all_menus_order = false
            and ao.is_successful = true
            group by 1
        )
    """
    _ = spark.sql(sql_str)
    df_arr.append(spark.table('philly_year').toPandas())
    # Next window starts the day after this one ended.
    next_start = dt.datetime.strptime(year_end, date_fmt) + relativedelta(days=1)
    year_start = next_start.strftime(date_fmt)

In [7]:
import pandas as pd
# Stack the per-window frames. Each frame is one row with index 0, so
# ignore_index gives a clean 0..n-1 index instead of repeated 0 labels.
pd.concat(df_arr, ignore_index=True)

Unnamed: 0,year,vol,unique_shops,unique_customers,total_gmv
0,2015,89569,233,52681,2464952.02
1,2016,155891,323,86878,4338397.01
0,2017,297390,475,149809,8336390.28
0,2018,559551,699,244243,16059710.4
0,2019,933559,836,347029,27520609.72
0,2020,834678,899,320345,28473463.28


In [8]:
# Order-level shop/consumer view for CBSA 37980 over 2019-03-01..2020-02-29,
# using the 2020-02-29 dim_shops snapshot. The SQL has no placeholders, so a
# plain string is used instead of an f-string.
sql_str = """
CREATE OR REPLACE TEMPORARY VIEW shop_consumer_geo AS
(
    select ao.purchase_date_pt, ao.shop_id, ds.postal_code shop_zip, ds.shop_lat, ds.shop_lng, coalesce(ao.customer_id, ao.customer_phone) customer_id, ao.customer_zip, ao.gmv,
    case when order_channel like '%app' then 'app' 
    when order_channel = 'call' then 'phone' 
    else 'other' end as channel      
    from aggregates.all_orders ao
    join aggregates.dim_shops ds on ao.shop_id = ds.shop_id    
    join dimension.zipcode_cbsa_regions cbsa on ds.postal_code = cbsa.zip
    where cbsa.cbsa_code = 37980
    and (ds.is_live = True or ds.is_temporarily_suspended = True) and ds.shop_type in ('freemium', 'partner')
    and ds.ds_pt = '2020-02-29'
    and ao.purchase_date_pt between '2019-03-01' and '2020-02-29'
    and ao.is_all_menus_order = false
    and ao.is_successful = true
)
"""
# Submit the statement; previously it was only built, so the
# shop_consumer_geo view was never registered.
_ = spark.sql(sql_str)

In [9]:
import numpy as np
import pandas as pd
import datetime as dt
from dateutil.relativedelta import relativedelta
from graphframes import *
from time import time
from itertools import combinations
import geopy.distance
import math


# --- Display / Spark settings -------------------------------------------
pd.set_option('display.max_rows', 500)

# Enable Arrow-based columnar data transfers.
# NOTE(review): Spark 3.x deprecates this key in favour of
# "spark.sql.execution.arrow.pyspark.enabled" — confirm the runtime version.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# --- Region / output location for the time-window study ------------------
geo_code = 'philly'
cbsa_code = 37980  # CBSA filter interpolated into the per-window SQL
res_folder = '/dbfs/user/yao/graph_research/time'


In [10]:
def construct_shop_edges():
    """Build the shop-to-shop co-purchase graph from the ``philly_year`` view.

    Reads the current ``philly_year`` temporary view into pandas, prints
    descriptive statistics about multi-shop consumers, and links every pair
    of shops that share a consumer who ordered from both ("shop-c-shop").

    Each link is repeated ``int(round(log2(N)) * w)`` times, where N is the
    consumer's total number of orders and w is 1.5 if the consumer placed at
    least one 'app' channel order (1 otherwise). The repetition acts as an
    edge weight for label propagation.

    Returns:
        geo_df: the view as pandas, restricted to rows with a customer_id.
        shop_nodes: DataFrame with columns ``id`` (shop_id) and ``type`` ('shop').
        shop_edges: DataFrame with columns ``src`` and ``dst`` (shop ids),
            possibly containing repeated rows.
    """
    def is_phone_id(customer_id):
        # Ids coming from the customer_phone fallback are assumed to start
        # with '+' (TODO confirm); str() avoids crashing on empty/non-str ids.
        return str(customer_id).startswith('+')

    geo_df = spark.table('philly_year').toPandas()
    geo_shops = (
        geo_df.groupby(['shop_id', 'shop_lat', 'shop_lng'])
        .agg({'customer_id': lambda x: x.nunique(), 'gmv': np.sum})
        .reset_index()
        .rename(columns={'customer_id': 'n_customer', 'gmv': 'total_gmv'})
    )
    print(f'# shops: {geo_shops.shape[0]}')

    # Only orders attributable to a consumer can become graph edges.
    geo_df = geo_df[~geo_df['customer_id'].isnull()]
    app_users = geo_df[geo_df['channel'] == 'app'][['customer_id']].drop_duplicates()

    shop_ids = geo_df['shop_id'].unique()
    shop_nodes = pd.DataFrame({'id': shop_ids, 'type': ['shop'] * len(shop_ids)})
    edges = geo_df.rename(columns={'shop_id': 'src', 'customer_id': 'dst'})

    # find multi-shop consumers (ordered from more than one distinct shop)
    count_shops = (
        edges.groupby('dst')['src'].agg(lambda x: x.nunique()).reset_index()
        .rename(columns={'src': 'n_shops'})
        .sort_values('n_shops', ascending=False)
    )
    multi_shops_c = count_shops[count_shops['n_shops'] > 1]
    multi_shops_c_edges = edges.merge(multi_shops_c[['dst']], on='dst')
    # one row per (consumer, shop, shop location) with that pair's order count
    multi_c_shop = (
        multi_shops_c_edges[['dst', 'src', 'shop_lat', 'shop_lng', 'gmv']]
        .groupby(['dst', 'src', 'shop_lat', 'shop_lng']).agg(len)
        .rename(columns={'gmv': 'one_shop_count'})
        .reset_index()
    )

    # repeat shop-shop edge by log2(N) where N is the # of purchases of the connecting consumer.
    by_c_count = (
        edges[['dst', 'src']].groupby('dst').count()
        .rename(columns={'src': 'total_count'})
        .reset_index()
    )
    by_c_count['n_rep'] = by_c_count['total_count'].apply(lambda x: round(np.log2(x)))
    multi_c = by_c_count.merge(multi_shops_c[['dst']], on='dst')
    multi_c['phone'] = multi_c['dst'].apply(is_phone_id)
    multi_c = multi_c.merge(app_users, left_on='dst', right_on='customer_id', how='left', indicator=True)
    multi_c['app'] = (multi_c['_merge'] == 'both').astype(int)
    n_app_multi = multi_c['app'].sum()
    n_phone_multi = multi_c['phone'].sum()
    # multi_c has one row per consumer, so these count that consumer group's orders
    app_vol = edges.merge(multi_c.loc[multi_c['app'] == 1, ['dst']], on='dst').shape[0]
    phone_vol = edges.merge(multi_c.loc[multi_c['phone'], ['dst']], on='dst').shape[0]
    print(f'# of multi_shop user: {len(multi_c)}, # of multi_shop app user: {n_app_multi}, # of multi_shop phone user: {n_phone_multi}')

    consumers = edges[['dst']].drop_duplicates()
    n_phone_users = consumers['dst'].map(is_phone_id).sum()
    print( f'# of all users: {consumers.shape[0]}, # of app user: {app_users.shape[0]}, # of phone user: {n_phone_users}')
    print(f'# of total vol: {len(edges)},# of multi_shop vol: {len(multi_shops_c_edges)}, # of multi_shop app user vol: {app_vol}, # of multi_shop phone user: {phone_vol}')

    # app users' links weigh 1.5x (truncated to int when repeating edges)
    rep_map = {
        dst: n_rep * (1.5 if is_app else 1)
        for dst, n_rep, is_app in zip(multi_c['dst'], multi_c['n_rep'], multi_c['app'])
    }

    # Group once instead of re-filtering multi_c_shop for every consumer.
    # Consumers whose shops all lack coordinates are absent from the groupby
    # (NaN keys are dropped); they get an empty frame, as the filter did.
    shops_by_consumer = dict(tuple(multi_c_shop.groupby('dst', sort=False)))
    no_shops = multi_c_shop.iloc[0:0]

    s_edges_arr = []
    dist_arr = []
    s_count_arr = []
    for count, dst in enumerate(multi_c['dst'], start=1):
        if not count % 5000:
            print(f'processing {count}th consumer')
        temp = shops_by_consumer.get(dst, no_shops)
        s_count_arr.append(len(temp))
        n_rep = int(rep_map[dst])
        # first location seen per shop (what temp[temp['src']==shop] row 0 gave)
        coords = {}
        for shop, lat, lng in zip(temp['src'], temp['shop_lat'], temp['shop_lng']):
            coords.setdefault(shop, (lat, lng))
        for shop_a, shop_b in combinations(temp['src'], 2):
            s_edges_arr.extend([shop_a, shop_b] for _ in range(n_rep))
            # compute a few metrics on multi-shop consumers
            dist_arr.append(geopy.distance.geodesic(coords[shop_a], coords[shop_b]).miles)
    shop_edges = pd.DataFrame(s_edges_arr, columns=['src', 'dst'])

    print(f'# shop-c-shop links: {len(dist_arr)}, median dist: {np.median(dist_arr)}, mean dist: {np.mean(dist_arr)}')
    print(f'unique shop count by multi-shop consumer - median: {np.median(s_count_arr):.1f}, mean: {np.mean(s_count_arr):.1f}')

    # Per shop: (shop_id, # unique multi-shop consumers, # unique consumers),
    # computed from one groupby instead of a filter + merge per shop.
    multi_ids = multi_shops_c['dst']
    consumers_by_shop = dict(tuple(edges.groupby('src', sort=False)['dst']))
    multi_count_arr = []
    for shop_id in edges['src'].unique():
        shop_consumers = consumers_by_shop[shop_id]
        n_multi = shop_consumers[shop_consumers.isin(multi_ids)].nunique()
        multi_count_arr.append((shop_id, n_multi, shop_consumers.nunique()))

    print('silos: ', [el[0] for el in multi_count_arr if el[1] == 0])
    multi_consumer_count = [el[1] for el in multi_count_arr if el[1] > 0]
    multi_consumer_pct = [el[1] / el[2] for el in multi_count_arr if el[1] > 0]
    print(f'unique multi-shop consumer count by store - median: {np.median(multi_consumer_count):.1f}, mean: {np.mean(multi_consumer_count):.1f}')
    print(f'unique multi-shop consumer pct by store - median: {np.median(multi_consumer_pct)*100:.1f}, mean: {np.mean(multi_consumer_pct)*100:.1f}')

    return geo_df, shop_nodes, shop_edges

def label_propogation(shop_nodes, shop_edges, year_end=None):
    """Detect shop communities with GraphFrames label propagation.

    Args:
        shop_nodes: pandas DataFrame with an ``id`` column (plus ``type``).
        shop_edges: pandas DataFrame with ``src``/``dst`` shop ids; repeated
            rows act as edge weights.
        year_end: 'YYYY-MM-DD' end of the analysed window; its year goes into
            the output file name. Defaults to the notebook-level ``year_end``
            global, which is what the function always used before.

    Returns:
        (community_df, size_df): community_df is the propagation result as
        pandas (vertex columns plus ``label``); size_df has columns ``size``
        and ``label``, sorted by size descending.

    Side effects:
        Writes community_df to ``{res_folder}/community_{geo_code}_{YYYY}.csv``.
    """
    if year_end is None:
        # Backward-compatible fallback to the hidden global.
        year_end = globals()['year_end']

    # construct shop-to-shop graph
    vertices_df = spark.createDataFrame(shop_nodes)
    edges_df = spark.createDataFrame(shop_edges)
    print(f'shop-shop graph - nodes: {shop_nodes.shape[0]}, edges: {shop_edges.shape[0]}')
    shop_graph = GraphFrame(vertices_df, edges_df)

    # community detection using label propagation
    start = time()
    communities = shop_graph.labelPropagation(maxIter=10)
    print(time()-start)
    print(f"There are {communities.select('label').distinct().count()} communities in sample graph.")

    community_df = communities.toPandas()
    community_df.to_csv(f'{res_folder}/community_{geo_code}_{year_end[:4]}.csv')

    # community sizes = number of shops per label, largest first
    size_df = (
        community_df['label'].value_counts()
        .rename_axis('label')
        .reset_index(name='size')[['size', 'label']]
        .sort_values('size', ascending=False)
    )
    return community_df, size_df

In [11]:
def _shop_geo_table(geo_df):
    """Per-shop location, reach, GMV and app-user share built from order rows.

    One row per (shop_id, shop_lat, shop_lng) with n_customer, total_gmv,
    customer_id (unique customers), app (unique app customers) and
    app_user_ratio. When geo_df comes from construct_shop_edges it excludes
    orders without a customer_id, so total_gmv covers attributable orders only.
    """
    geo_shops = (
        geo_df.groupby(['shop_id', 'shop_lat', 'shop_lng'])
        .agg({'customer_id': lambda x: x.nunique(), 'gmv': np.sum})
        .reset_index()
        .rename(columns={'customer_id': 'n_customer', 'gmv': 'total_gmv'})
    )
    app_users = geo_df.loc[geo_df['channel'] == 'app', ['customer_id']].drop_duplicates()
    geo_user = geo_df[['customer_id']].drop_duplicates().merge(app_users, on='customer_id', how='left', indicator=True)
    geo_user['app'] = (geo_user['_merge'] == 'both').astype(int)
    geo_shop_app = (
        geo_df[['shop_id', 'customer_id']].drop_duplicates()
        .merge(geo_user[['customer_id', 'app']], on='customer_id')
        .groupby('shop_id').agg({'customer_id': len, 'app': 'sum'})
        .reset_index()
    )
    geo_shop_app['app_user_ratio'] = geo_shop_app['app'] / geo_shop_app['customer_id']
    return geo_shops.merge(geo_shop_app, on='shop_id')


def _consumer_edges(geo_df):
    """Consumer edges (src=shop_id, dst=customer_id) and a one-column frame
    ('dst') of consumers who ordered from more than one distinct shop."""
    edges = geo_df.rename(columns={'shop_id': 'src', 'customer_id': 'dst'})
    n_shops = edges.groupby('dst')['src'].nunique()
    multi_shops_c = pd.DataFrame({'dst': n_shops[n_shops > 1].index})
    return edges, multi_shops_c


def _add_center_distance(shops):
    """Copy of ``shops`` with a 'dist' column: geodesic miles from each shop
    to the median (shop_lat, shop_lng) of the frame."""
    shops = shops.copy()
    center = (shops['shop_lat'].median(), shops['shop_lng'].median())
    shops['dist'] = [geopy.distance.geodesic((lat, lng), center).miles
                     for lat, lng in zip(shops['shop_lat'], shops['shop_lng'])]
    return shops


def _ids_by_shop(orders, column):
    """Frame of shop_id -> set of customer_ids in ``orders``; the set column is named ``column``."""
    return (
        orders.groupby('shop_id')['customer_id'].agg(lambda ids: set(ids))
        .reset_index()
        .rename(columns={'customer_id': column})
    )


def community_summary(label, community_df, shop_edges, edges=None, multi_shops_c=None):
    """Graph statistics for one community.

    Args:
        label: community label (a value of community_df['label']).
        community_df: label-propagation output with columns 'id' and 'label'.
        shop_edges: shop-shop edges ('src', 'dst'); repeated rows are weights.
            Each link is stored in one direction only.
        edges: consumer edges with 'src' (shop_id) and 'dst' (customer_id).
            Defaults to the notebook-level ``edges`` global.
        multi_shops_c: frame whose 'dst' column lists multi-shop consumers.
            Defaults to the notebook-level ``multi_shops_c`` global.

    Returns:
        [label, n_nodes, in_edges, out_edges, unique_in_edges,
         unique_out_edges, median multi-shop-consumer share over the member
         shops that touch an intra-community edge (NaN if there are none)]
    """
    if edges is None:
        edges = globals()['edges']
    if multi_shops_c is None:
        multi_shops_c = globals()['multi_shops_c']

    members = community_df.loc[community_df['label'] == label, 'id']
    member_ids = set(members)
    src_in = shop_edges['src'].isin(member_ids)
    dst_in = shop_edges['dst'].isin(member_ids)
    # Links are stored one-directionally, so a link leaves the community when
    # exactly one endpoint is a member, whichever column that endpoint is in.
    intra = shop_edges[src_in & dst_in]
    inter = shop_edges[src_in ^ dst_in]

    intra_shops = pd.unique(pd.concat([intra['src'], intra['dst']]))
    multi_ids = multi_shops_c['dst']
    multi_consumer_pct_sub = []
    for shop_id in intra_shops:
        shop_consumers = edges.loc[edges['src'] == shop_id, 'dst']
        n_multi = shop_consumers[shop_consumers.isin(multi_ids)].nunique()
        if n_multi > 0:
            multi_consumer_pct_sub.append(n_multi / shop_consumers.nunique())

    median_pct = np.median(multi_consumer_pct_sub) if multi_consumer_pct_sub else np.nan
    return [label, len(members), len(intra), len(inter),
            len(intra[['src', 'dst']].drop_duplicates()),
            len(inter[['src', 'dst']].drop_duplicates()),
            median_pct]


def get_metrics(geo_df, community_df, size_df, shop_edges, year_end):
    """Compute per-community business, geometry, graph and app-user metrics.

    Args:
        geo_df: order rows (shop_id, shop_lat, shop_lng, customer_id, gmv,
            channel, purchase_date_pt) as returned by construct_shop_edges.
        community_df: label-propagation output ('id', 'label').
        size_df: community sizes ('size', 'label').
        shop_edges: shop-shop edge list ('src', 'dst').
        year_end: 'YYYY-MM-DD' end of the analysed window.

    Returns:
        The combined metrics frame, also written to
        ``{res_folder}/metrics_{geo_code}_{YYYY}.csv``.
    """
    fmt = '%Y-%m-%d'
    geo_shops = _shop_geo_table(geo_df)
    edges, multi_shops_c = _consumer_edges(geo_df)

    # business metrics summary
    shops_by_label = community_df.merge(geo_shops, left_on='id', right_on='shop_id')
    business_df = (
        shops_by_label.groupby('label')
        .agg(gmv_mean=('total_gmv', 'mean'), app_user_ratio_mean=('app_user_ratio', 'mean'))
        .reset_index()
    )

    # radius of community: 80th-percentile distance to the median location
    per_label = [_add_center_distance(grp) for _, grp in shops_by_label.groupby('label', sort=False)]
    located = pd.concat(per_label) if per_label else shops_by_label.assign(dist=np.nan)
    radius_df = (
        located.groupby('label')['dist'].agg(lambda d: np.percentile(d, 80))
        .reset_index().rename(columns={'dist': 'radius'})
        .merge(size_df, on='label')
        .sort_values('size', ascending=False)
    )

    # community statistics for communities with more than one shop
    multi_shop_labels = size_df.loc[size_df['size'] > 1, 'label'].unique()
    summary_df = pd.DataFrame(
        [community_summary(label, community_df, shop_edges, edges=edges, multi_shops_c=multi_shops_c)
         for label in multi_shop_labels],
        columns=['label', 'nodes', 'in_edges', 'out_edges', 'u_in_edges', 'u_out_edges', 'mc_pct'])

    # app retention & new users. The last month is (year_end - 1 month,
    # year_end]; the previous month is the month before that.
    # NOTE(review): purchase_date_pt is compared with 'YYYY-MM-DD' strings,
    # so it is assumed to hold ISO date strings — confirm.
    window_end = dt.datetime.strptime(year_end, fmt)
    one_month_back = (window_end + relativedelta(months=-1)).strftime(fmt)
    two_months_back = (window_end + relativedelta(months=-2)).strftime(fmt)
    app_df = geo_df[geo_df['channel'] == 'app']
    order_date = app_df['purchase_date_pt']
    prev_month = _ids_by_shop(app_df[(order_date > two_months_back) & (order_date <= one_month_back)], 'last_id')
    this_month = _ids_by_shop(app_df[(order_date > one_month_back) & (order_date <= year_end)], 'customer_id')
    prior_months = _ids_by_shop(app_df[order_date <= one_month_back], 'prior_id')

    # retention: share of the previous month's app customers who ordered again
    retention = prev_month.merge(this_month, on='shop_id', how='left')
    retention['retention'] = [
        len(prev & cur) / len(prev) if isinstance(cur, set) else 0
        for prev, cur in zip(retention['last_id'], retention['customer_id'])
    ]

    # new users: last-month app customers never seen before in the window
    new = prior_months.merge(this_month, on='shop_id', how='left')
    new['new_pct'] = [
        len(cur - prior) / len(cur) if isinstance(cur, set) else 0
        for prior, cur in zip(new['prior_id'], new['customer_id'])
    ]
    new['new_count'] = [
        len(cur - prior) if isinstance(cur, set) else 0
        for prior, cur in zip(new['prior_id'], new['customer_id'])
    ]

    shop_retention = community_df.merge(retention, left_on='id', right_on='shop_id', how='left')
    community_retention = shop_retention.dropna(subset=['retention']).groupby('label')['retention'].mean().reset_index()
    shop_new = community_df.merge(new, left_on='id', right_on='shop_id', how='left')
    community_new_pct = shop_new.dropna(subset=['new_pct']).groupby('label')['new_pct'].mean().reset_index()
    community_new_count = shop_new.dropna(subset=['new_count']).groupby('label')['new_count'].mean().reset_index()

    # combine all metrics
    performance_metrics = (
        radius_df.merge(summary_df, on='label')
        .merge(business_df, on='label')
        .merge(community_retention, on='label', how='left')
        .merge(community_new_pct, on='label', how='left')
        .merge(community_new_count, on='label', how='left')
    )
    performance_metrics.to_csv(f'{res_folder}/metrics_{geo_code}_{year_end[:4]}.csv')
    return performance_metrics


def get_map_markers(size_df, community_df=None, geo_shops=None, year_end=None, min_size=10):
    """Write map-marker rows for the largest communities to CSV.

    Each selected community gets a distinct (color, shape) pair, so at most
    len(colors) * len(mshapes) communities are drawn. Communities with at
    least ``min_size`` shops are used; if there are more than fit, the first
    entries of size_df (assumed sorted by size, descending) are taken.

    community_df, geo_shops and year_end default to the notebook-level
    globals of the same name (the previous behaviour).

    Returns the marker frame written to ``{res_folder}/gmap_{geo_code}_{YYYY}.csv``,
    or None when no community qualifies.
    """
    if community_df is None:
        community_df = globals()['community_df']
    if geo_shops is None:
        geo_shops = globals()['geo_shops']
    if year_end is None:
        year_end = globals()['year_end']

    colors = ['red', 'blue', 'yellow', 'green', 'purple', 'teal', 'gray', 'olive', 'maroon', 'navy', 'black', 'white']
    mshapes = ['circle', 'square', 'marker']
    max_markers = len(colors) * len(mshapes)

    top_communities = size_df[size_df['size'] >= min_size]
    if len(top_communities) > max_markers:
        top_labels = size_df['label'].tolist()[:max_markers]
    else:
        top_labels = top_communities['label'].tolist()

    sub_arr = []
    for i, label in enumerate(top_labels):
        sub = community_df[community_df['label'] == label].merge(geo_shops, left_on='id', right_on='shop_id')
        sub = _add_center_distance(sub)
        color = colors[i % len(colors)]
        mshape = mshapes[i // len(colors)]
        sub['mshape'] = mshape
        sub['map_str'] = [f'{lat},{lng},{color},{mshape},' for lat, lng in zip(sub['shop_lat'], sub['shop_lng'])]
        sub_arr.append(sub)

    if not sub_arr:
        print(f'no communities with at least {min_size} shops; nothing written')
        return None
    gmap_final = pd.concat(sub_arr)
    gmap_final.to_csv(f'{res_folder}/gmap_{geo_code}_{year_end[:4]}.csv', index=False)
    return gmap_final


def process_time_window(year_end):
    """Run the graph pipeline for the window currently in the ``philly_year`` view.

    label_propogation is called with its original two arguments; when it
    falls back to the notebook-level ``year_end``, the driving loop has set
    that global to this same value.
    """
    geo_df, shop_nodes, shop_edges = construct_shop_edges()
    community_df, size_df = label_propogation(shop_nodes, shop_edges)
    get_metrics(geo_df, community_df, size_df, shop_edges, year_end)
    get_map_markers(size_df, community_df=community_df, geo_shops=_shop_geo_table(geo_df), year_end=year_end)

In [12]:
# Run the graph pipeline over yearly purchase windows.
# NOTE(review): with the bound '2015-01-02' only the 2015 window runs; the
# yearly-volume loop earlier uses '2020-01-02' to walk 2015 through 2020.
# df_arr is deliberately not reset here: it holds that earlier loop's output.
year_start = '2015-01-01'
date_fmt = '%Y-%m-%d'
while year_start < '2015-01-02':
    year_end = (dt.datetime.strptime(year_start, date_fmt)+relativedelta(years=1)+relativedelta(days=-1)).strftime(date_fmt)
    if year_start[:4] == '2020':
        # the 2020 window is truncated at 2020-07-31
        year_end = '2020-07-31'
    # dim_shops snapshot used to select shops. Take it after the 2020
    # truncation so 2020 uses the 2020-07-31 snapshot (as the yearly-volume
    # loop does) rather than 2020-12-31.
    ds_end = year_end
    if year_start[:4] == '2015':
        ds_end = '2016-12-31'
    print(year_start, year_end)
    sql_str = f"""
        CREATE OR REPLACE TEMPORARY VIEW philly_year AS
        (
            select ao.purchase_date_pt, ao.shop_id, ds.postal_code shop_zip, ds.shop_lat, ds.shop_lng, coalesce(ao.customer_id, ao.customer_phone) customer_id, ao.customer_zip, ao.gmv,
            case when order_channel like '%app' then 'app' 
                when order_channel = 'call' then 'phone' 
                else 'other' end as channel      
            from aggregates.all_orders ao
            join aggregates.dim_shops ds on ao.shop_id = ds.shop_id    
            join dimension.zipcode_cbsa_regions cbsa on ds.postal_code = cbsa.zip
            where cbsa.cbsa_code = {cbsa_code}
            and (ds.is_live = True or ds.is_temporarily_suspended = True) and ds.shop_type in ('freemium', 'partner')
            and ds.ds_pt = '{ds_end}'
            and ao.purchase_date_pt between '{year_start}' and '{year_end}'
            and ao.is_all_menus_order = false
            and ao.is_successful = true
        )
    """
    _ = spark.sql(sql_str)
    process_time_window(year_end)
    year_start = (dt.datetime.strptime(year_end, date_fmt)+relativedelta(days=1)).strftime(date_fmt)