# 03 Measuring Customers

In [1]:
import json
import os
from pprint import pprint

import pandas as pd
import sqlalchemy

In [2]:
# Make a SQL connection with SQLAlchemy.
# CHURN_DB_URL lets the connection string (and the credentials inside it) be
# supplied from the environment instead of the notebook; the literal below is
# only the fallback used when that variable is not set.
conn_string = os.environ.get(
    "CHURN_DB_URL",
    "postgresql://postgres-db/churn?user=postgres&password=password",
)
engine = sqlalchemy.create_engine(
    conn_string, connect_args={'options': '-csearch_path={}'.format("socialnet7,public")}
)
conn = engine.connect()

# Query with Pandas, e.g. list all tables
tables = pd.read_sql_query("SELECT * FROM information_schema.tables;", conn)
tables.head(3)

Unnamed: 0,table_catalog,table_schema,table_name,table_type,self_referencing_column_name,reference_generation,user_defined_type_catalog,user_defined_type_schema,user_defined_type_name,is_insertable_into,is_typed,commit_action
0,churn,socialnet7,active_period,BASE TABLE,,,,,,YES,NO,
1,churn,socialnet7,event_type,BASE TABLE,,,,,,YES,NO,
2,churn,socialnet7,metric_name,BASE TABLE,,,,,,YES,NO,


In [3]:
# Read the listing configuration and keep the chapter 3 section at hand.
with open("/app/fightchurn/listings/conf/socialnet7_listings.json", "r") as conf_file:
    conf = json.load(conf_file)

chap3 = conf["chap3"]

## Dependent Data

In [4]:
# Sample of the raw event table (capped at 15000 rows by the query itself).
event = pd.read_sql_query(sql="SELECT * FROM event LIMIT 15000", con=conn)
display(event.shape)
event.head()

(15000, 3)

Unnamed: 0,account_id,event_time,event_type_id
0,32,2020-03-11 14:35:16,6
1,32,2020-03-11 19:15:36,6
2,32,2020-03-11 02:19:26,6
3,32,2020-03-11 18:07:15,6
4,32,2020-03-11 23:32:31,6


In [5]:
# Number of sampled events per event type id.
event["event_type_id"].value_counts()

6    8421
7    1906
2    1813
0     986
3     898
4     699
1     261
5      16
Name: event_type_id, dtype: int64

In [6]:
# Lookup table that maps event_type_id to a readable event name.
event_type = pd.read_sql_query(sql="SELECT * FROM event_type", con=conn)
display(event_type.shape)
event_type.head()

(8, 2)

Unnamed: 0,event_type_id,event_type_name
0,0,post
1,1,newfriend
2,2,like
3,3,adview
4,4,dislike


### Result Data

In [29]:
# Rows per metric in the metric table (filled by the insert cells further down).
# ORDER BY makes the row order deterministic; GROUP BY alone does not guarantee one.
tmp = pd.read_sql_query(
    "SELECT metric_name_id, COUNT(*) FROM metric GROUP BY metric_name_id ORDER BY metric_name_id",
    conn,
)
tmp

Unnamed: 0,metric_name_id,count
0,0,163450
1,1,144501
2,2,160936
3,3,160510
4,4,154133
5,5,39979
6,6,155804
7,7,124663


In [8]:
# Contents of the metric_name lookup table.
tmp = pd.read_sql_query(sql="SELECT * FROM metric_name", con=conn)
display(tmp.shape)
tmp.head()

(9, 2)

Unnamed: 0,metric_name_id,metric_name
0,0,like_per_month
1,1,newfriend_per_month
2,2,post_per_month
3,3,adview_per_month
4,4,dislike_per_month


## Insert Queries

In [9]:
# Chapter-wide default parameters.
chap3_defaults = chap3["defaults"]
pprint(chap3_defaults)

{'%event2measure': 'like',
 '%field2sum': '0',
 '%from_yyyy-mm-dd': '2020-02-02',
 '%to_yyyy-mm-dd': '2020-05-10',
 'mode': 'top',
 'type': 'sql'}


### Listing 3: metric table

In [10]:
# Parameters configured for listing 3 (one "vN" entry per event type).
list3_conf = chap3["list3"]
pprint(list3_conf)

{'name': 'count_metric_insert',
 'params': {'%new_metric_id': 0, 'mode': 'run'},
 'v1': {'%event2measure': 'like'},
 'v2': {'%event2measure': 'newfriend', '%new_metric_id': 1},
 'v3': {'%event2measure': 'post', '%new_metric_id': 2},
 'v4': {'%event2measure': 'adview', '%new_metric_id': 3},
 'v5': {'%event2measure': 'dislike', '%new_metric_id': 4},
 'v6': {'%event2measure': 'unfriend', '%new_metric_id': 5},
 'v7': {'%event2measure': 'message', '%new_metric_id': 6},
 'v8': {'%event2measure': 'reply', '%new_metric_id': 7}}


In [11]:
# Date window comes from the chapter defaults; metric id and event name come
# from listing 3 ("params" and its "v1" entry).
chap3_defaults = chap3["defaults"]
list3_conf = chap3["list3"]

from_yyyy_mm_dd = chap3_defaults["%from_yyyy-mm-dd"]
to_yyyy_mm_dd = chap3_defaults["%to_yyyy-mm-dd"]
new_metric_id = list3_conf["params"]["%new_metric_id"]
event2measure = list3_conf["v1"]["%event2measure"]

(from_yyyy_mm_dd, to_yyyy_mm_dd, new_metric_id, event2measure)

('2020-02-02', '2020-05-10', 0, 'like')

In [12]:
# Pair every metric id with the event it counts.
# The "vN" entries are discovered from the config instead of a hard-coded range,
# and an entry that omits %new_metric_id (v1 here) falls back to the value
# under "params".
list3 = chap3["list3"]
version_keys = sorted(
    (key for key in list3 if key[:1] == "v" and key[1:].isdigit()),
    key=lambda key: int(key[1:]),  # numeric sort so that e.g. v11 follows v8
)
metric_ids = [
    list3[key].get("%new_metric_id", list3["params"]["%new_metric_id"])
    for key in version_keys
]
event_names = [list3[key]["%event2measure"] for key in version_keys]

(zipped := list(zip(metric_ids, event_names)))

[(0, 'like'),
 (1, 'newfriend'),
 (2, 'post'),
 (3, 'adview'),
 (4, 'dislike'),
 (5, 'unfriend'),
 (6, 'message'),
 (7, 'reply')]

In [13]:
%%time
# listing_3_3_count_metric_insert.sql
#
# Preview of the count metric for a single event type (`event2measure`).
# The INSERT and ON CONFLICT lines are commented out inside the SQL, so this
# statement only selects rows and nothing is written to the metric table.
# For every weekly date between the two bounds it counts, per account, the
# events of that type in the 28 days before the date.

query = f"""\
WITH date_vals AS (
  SELECT 
      i::timestamp AS metric_date 
    FROM 
      generate_series('{from_yyyy_mm_dd}', '{to_yyyy_mm_dd}', '7 day'::interval) AS i
)
-- INSERT INTO metric (account_id, metric_time, metric_name_id, metric_value)

    SELECT 
        account_id, 
        metric_date AS metric_time, 
        {new_metric_id} AS metric_name_id,  
        count(*) AS metric_value
      FROM event
INNER JOIN date_vals AS date
        ON event.event_time < date.metric_date 
       AND event.event_time >= (date.metric_date - interval '28 day')
INNER JOIN event_type AS type 
        ON type.event_type_id = event.event_type_id
    WHERE type.event_type_name = '{event2measure}'
 GROUP BY event.account_id, date.metric_date

-- ON CONFLICT DO NOTHING;
"""
# Load the full preview result into a DataFrame and show its first rows.
res = pd.read_sql_query(query, conn)
res.head()

CPU times: user 805 ms, sys: 64.2 ms, total: 869 ms
Wall time: 19 s


Unnamed: 0,account_id,metric_time,metric_name_id,metric_value
0,1,2020-02-02,0,1
1,1,2020-02-09,0,1
2,1,2020-02-16,0,1
3,1,2020-02-23,0,2
4,1,2020-03-01,0,1


#### insert execution

In [16]:
def insert_metric_query(metric_id, event_name, from_yyyy_mm_dd, to_yyyy_mm_dd):
    """Build the INSERT statement that fills one event-count metric.

    For every weekly date between the two bounds the statement counts, per
    account, the events named ``event_name`` in the 28 days before that date
    and inserts the counts into ``metric`` under ``metric_id``.

    The values are embedded in the SQL text as literals, so they are sanitised
    first: ``metric_id`` must convert to an integer and embedded single quotes
    in the string values are doubled so they cannot terminate the literal.

    Args:
        metric_id: Integer id stored in ``metric_name_id``.
        event_name: Value matched against ``event_type.event_type_name``.
        from_yyyy_mm_dd: First metric date passed to ``generate_series``.
        to_yyyy_mm_dd: Upper bound passed to ``generate_series``.

    Returns:
        The SQL statement as a string.

    Raises:
        ValueError: If ``metric_id`` cannot be converted to an integer.
    """

    def _quote(value):
        # Doubling single quotes is the standard SQL escape inside '...' literals.
        return str(value).replace("'", "''")

    metric_id = int(metric_id)
    event_name = _quote(event_name)
    from_yyyy_mm_dd = _quote(from_yyyy_mm_dd)
    to_yyyy_mm_dd = _quote(to_yyyy_mm_dd)

    query = f"""\
    WITH date_vals AS (
      SELECT 
          i::timestamp AS metric_date 
        FROM 
          generate_series('{from_yyyy_mm_dd}', '{to_yyyy_mm_dd}', '7 day'::interval) AS i
    )
    
    INSERT INTO metric (account_id, metric_time, metric_name_id, metric_value)

        SELECT 
            account_id, 
            metric_date AS metric_time, 
            {metric_id} AS metric_name_id,  
            count(*) AS metric_value
          FROM event
    INNER JOIN date_vals AS date
            ON event.event_time < date.metric_date 
           AND event.event_time >= (date.metric_date - interval '28 day')
    INNER JOIN event_type AS type 
            ON type.event_type_id = event.event_type_id
        WHERE type.event_type_name = '{event_name}'
     GROUP BY event.account_id, date.metric_date

    ON CONFLICT DO NOTHING;
    """
    return query

In [25]:
# Optional reset, disabled on purpose: uncomment to delete every row from
# `metric` before re-running the insert cell below (its guard asserts that the
# table is empty).
# with engine.begin() as connection:
#     result = connection.execute("DELETE FROM metric")
# result

In [26]:
%%time
 
tmp = pd.read_sql_query("SELECT * FROM metric", conn) 
assert tmp.shape[0] == 0, "Table 'metric' already populated with data"

results = []
for mid, name in zipped: 
    m_query = insert_metric_query(mid, name, from_yyyy_mm_dd, to_yyyy_mm_dd)
    
    with engine.begin() as connection:
        result = connection.execute(m_query)
    results.append(result)
    
results

CPU times: user 35.1 ms, sys: 11.9 ms, total: 47.1 ms
Wall time: 2min 10s


[<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55fdf8370>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55fdf8220>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55d20b250>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55a219e50>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55a219a90>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55a219fd0>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55a219a30>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55a219280>]

In [27]:
# Rows inserted per metric.
# ORDER BY makes the row order deterministic; GROUP BY alone does not guarantee one.
tmp = pd.read_sql_query(
    "SELECT metric_name_id, COUNT(*) FROM metric GROUP BY metric_name_id ORDER BY metric_name_id",
    conn,
)
tmp

Unnamed: 0,metric_name_id,count
0,0,163450
1,1,144501
2,2,160936
3,3,160510
4,4,154133
5,5,39979
6,6,155804
7,7,124663


### Listing 4: metric_name table

In [12]:
# Parameters configured for listing 4 (metric id and name pairs).
list4_conf = chap3["list4"]
pprint(list4_conf)

{'name': 'metric_name_insert',
 'params': {'%new_metric_id': 0,
            '%new_metric_name': 'like_per_month',
            'mode': 'run'},
 'v1': {},
 'v11': {'%new_metric_id': 8, '%new_metric_name': 'account_tenure'},
 'v2': {'%new_metric_id': 1, '%new_metric_name': 'newfriend_per_month'},
 'v3': {'%new_metric_id': 2, '%new_metric_name': 'post_per_month'},
 'v4': {'%new_metric_id': 3, '%new_metric_name': 'adview_per_month'},
 'v5': {'%new_metric_id': 4, '%new_metric_name': 'dislike_per_month'},
 'v6': {'%new_metric_id': 5, '%new_metric_name': 'unfriend_per_month'},
 'v7': {'%new_metric_id': 6, '%new_metric_name': 'message_per_month'},
 'v8': {'%new_metric_id': 7, '%new_metric_name': 'reply_per_month'}}


In [25]:
# Default metric id and name from the "params" entry of listing 4.
list4_params = chap3["list4"]["params"]
new_metric_id = list4_params["%new_metric_id"]
new_metric_name = list4_params["%new_metric_name"]

(new_metric_id, new_metric_name)

(0, 'like')

In [43]:
# Pair every metric id with its metric name.
# The "vN" entries are discovered from the config instead of a hard-coded
# range(2, 9) + [11]; an entry that omits a value (v1 is empty here) falls back
# to the value under "params".
list4 = chap3["list4"]
list4_defaults = list4["params"]
version_keys = sorted(
    (key for key in list4 if key[:1] == "v" and key[1:].isdigit()),
    key=lambda key: int(key[1:]),  # numeric sort so that v11 follows v8
)
metric_ids = [
    list4[key].get("%new_metric_id", list4_defaults["%new_metric_id"])
    for key in version_keys
]
metric_names = [
    list4[key].get("%new_metric_name", list4_defaults["%new_metric_name"])
    for key in version_keys
]

(zipped := list(zip(metric_ids, metric_names)))

[(0, 'like_per_month'),
 (1, 'newfriend_per_month'),
 (2, 'post_per_month'),
 (3, 'adview_per_month'),
 (4, 'dislike_per_month'),
 (5, 'unfriend_per_month'),
 (6, 'message_per_month'),
 (7, 'reply_per_month'),
 (8, 'account_tenure')]

In [44]:
# listing_3_4_metric_name_insert.sql
#
# Statement for the single (new_metric_id, new_metric_name) pair currently in
# scope. This cell only builds the string; it does not execute it.

query = f"""\
INSERT INTO metric_name VALUES ({new_metric_id},'{new_metric_name}')
ON CONFLICT DO NOTHING;
"""

#### insert execution

In [45]:
def insert_metric_name_query(metric_id, metric_name):
    """Build the INSERT statement that registers one metric name.

    The values are embedded in the SQL text as literals, so they are sanitised
    first: ``metric_id`` must convert to an integer and embedded single quotes
    in ``metric_name`` are doubled so they cannot terminate the literal.

    Args:
        metric_id: Integer id of the metric.
        metric_name: Name stored next to the id.

    Returns:
        The SQL statement as a string.

    Raises:
        ValueError: If ``metric_id`` cannot be converted to an integer.
    """
    metric_id = int(metric_id)
    # Doubling single quotes is the standard SQL escape inside '...' literals.
    metric_name = str(metric_name).replace("'", "''")

    query = f"""\
    INSERT INTO metric_name VALUES ({metric_id},'{metric_name}')
    ON CONFLICT DO NOTHING;
    """
    return query


In [46]:
%%time
 
tmp = pd.read_sql_query("SELECT * FROM metric_name", conn) 
assert tmp.shape[0] == 0, "Table 'metric_name' already populated with data"

results = []
for mid, mname in zipped: 
    m_query = insert_metric_name_query(mid, mname)
    
    with engine.begin() as connection:
        result = connection.execute(m_query)
    results.append(result)
    
results

CPU times: user 12.5 ms, sys: 425 µs, total: 12.9 ms
Wall time: 21.4 ms


[<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f326c371100>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f3225abe940>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f32250a0160>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f32250acf70>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f322586a880>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f322586a640>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f322586a190>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f322586a070>,
 <sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f322586adc0>]

### Listing 13: account_tenure metric

In [47]:
# Parameters configured for listing 13 (account tenure).
list13_conf = chap3["list13"]
pprint(list13_conf)

{'name': 'account_tenure_insert',
 'params': {'%new_metric_id': 8, 'mode': 'run'}}


In [48]:
# Date window from the chapter defaults, metric id from listing 13.
chap3_defaults = chap3["defaults"]

from_yyyy_mm_dd = chap3_defaults["%from_yyyy-mm-dd"]
to_yyyy_mm_dd = chap3_defaults["%to_yyyy-mm-dd"]
new_metric_id = chap3["list13"]["params"]["%new_metric_id"]

(from_yyyy_mm_dd, to_yyyy_mm_dd, new_metric_id)

('2020-02-02', '2020-05-10', 8)

In [49]:
%%time
# listing_3_13_account_tenure_insert.sql
#
# Preview of the account_tenure metric. The INSERT and ON CONFLICT lines are
# commented out inside the SQL, so this statement only selects rows.
# For each weekly metric date the recursive CTE starts from the earliest
# subscription active on that date, then keeps adding earlier subscriptions of
# the same account whose end_date is no more than 31 days before the later
# start_date. Tenure is the number of days between the metric date and the
# earliest start_date found that way.

query = f"""\

with RECURSIVE date_vals AS (    
    SELECT 
        i::timestamp AS metric_date     
      FROM 
        generate_series('{from_yyyy_mm_dd}', '{to_yyyy_mm_dd}', '7 day'::interval) AS i
),
earlier_starts AS (
    SELECT 
        account_id, 
        metric_date, 
        MIN(start_date) AS start_date
      FROM subscription 
INNER JOIN date_vals
        ON start_date <= metric_date    
       AND (end_date > metric_date OR end_date IS null)
  GROUP BY account_id, metric_date

    UNION

    SELECT 
        sub.account_id, 
        metric_date, 
        sub.start_date    
      FROM 
        subscription AS sub 
INNER JOIN 
        earlier_starts AS early -- self recursive join
        ON sub.account_id = early.account_id
       AND sub.start_date < early.start_date
       AND sub.end_date >= (early.start_date - 31)
)

-- INSERT INTO metric (account_id,metric_time,metric_name_id, metric_value)

    SELECT 
        account_id,
        metric_date AS metric_time, 
        {new_metric_id} AS metric_name_id, 
        EXTRACT(days FROM metric_date-MIN(start_date)) AS metric_value
      FROM 
        earlier_starts
  GROUP BY account_id, metric_date
  ORDER BY account_id, metric_date

-- ON CONFLICT DO NOTHING;
"""

# Load the full preview result into a DataFrame and show its first rows.
res = pd.read_sql_query(query, conn)
res.head()

CPU times: user 750 ms, sys: 43.3 ms, total: 794 ms
Wall time: 3.34 s


Unnamed: 0,account_id,metric_time,metric_name_id,metric_value
0,1,2020-02-02,8,5.0
1,1,2020-02-09,8,12.0
2,1,2020-02-16,8,19.0
3,1,2020-02-23,8,26.0
4,1,2020-03-01,8,33.0


#### insert execution

In [30]:
# Same values as the listing 13 preview cell, read again so the insert below
# does not depend on what an earlier cell left in these variables.
chap3_defaults = chap3["defaults"]

from_yyyy_mm_dd = chap3_defaults["%from_yyyy-mm-dd"]
to_yyyy_mm_dd = chap3_defaults["%to_yyyy-mm-dd"]
new_metric_id = chap3["list13"]["params"]["%new_metric_id"]

(from_yyyy_mm_dd, to_yyyy_mm_dd, new_metric_id)

('2020-02-02', '2020-05-10', 8)

In [31]:
# Executable form of the account_tenure listing: the same SQL as the preview
# cell, with the INSERT INTO metric and ON CONFLICT DO NOTHING lines active, so
# the computed tenure rows are written to the metric table.
query = f"""\

with RECURSIVE date_vals AS (    
    SELECT 
        i::timestamp AS metric_date     
      FROM 
        generate_series('{from_yyyy_mm_dd}', '{to_yyyy_mm_dd}', '7 day'::interval) AS i
),
earlier_starts AS (
    SELECT 
        account_id, 
        metric_date, 
        MIN(start_date) AS start_date
      FROM subscription 
INNER JOIN date_vals
        ON start_date <= metric_date    
       AND (end_date > metric_date OR end_date IS null)
  GROUP BY account_id, metric_date

    UNION

    SELECT 
        sub.account_id, 
        metric_date, 
        sub.start_date    
      FROM 
        subscription AS sub 
INNER JOIN 
        earlier_starts AS early -- self recursive join
        ON sub.account_id = early.account_id
       AND sub.start_date < early.start_date
       AND sub.end_date >= (early.start_date - 31)
)

INSERT INTO metric (account_id,metric_time,metric_name_id, metric_value)

    SELECT 
        account_id,
        metric_date AS metric_time, 
        {new_metric_id} AS metric_name_id, 
        EXTRACT(days FROM metric_date-MIN(start_date)) AS metric_value
      FROM 
        earlier_starts
  GROUP BY account_id, metric_date
  ORDER BY account_id, metric_date

ON CONFLICT DO NOTHING;
"""

# Run the insert inside an engine.begin() block and show the result object.
with engine.begin() as connection:
    result = connection.execute(query)
result

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fc55d1d92b0>

In [32]:
# Rows per metric after the account_tenure insert.
# ORDER BY makes the row order deterministic; GROUP BY alone does not guarantee one.
tmp = pd.read_sql_query(
    "SELECT metric_name_id, COUNT(*) FROM metric GROUP BY metric_name_id ORDER BY metric_name_id",
    conn,
)
tmp

Unnamed: 0,metric_name_id,count
0,0,163450
1,1,144501
2,2,160936
3,3,160510
4,4,154133
5,5,39979
6,6,155804
7,7,124663
8,8,158237
