In [1]:
import pandas as pd
import sqlalchemy
import pgspecial

import itertools
import datetime
from dateutil.rrule import rrule, MONTHLY

In [2]:
%load_ext sql

In [3]:
db_connect = "benj@localhost/olist"
connectStr =  f"postgresql://{db_connect}"
%sql $connectStr

'Connected: benj@olist'

# Exploration db

Nombre de clients dans la base

In [4]:
%%sql
select count(distinct customer_unique_id) from customers

 * postgresql://benj@localhost/olist
1 rows affected.


count
96096


In [5]:
nb_orders = %sql select count(*) from orders;
nb_t_id = %sql select count(*) from ( select distinct order_id,customer_id  from orders) t; 
print(f'{nb_orders} {nb_t_id}')

 * postgresql://benj@localhost/olist
1 rows affected.
 * postgresql://benj@localhost/olist
1 rows affected.
+-------+
| count |
+-------+
| 99441 |
+-------+ +-------+
| count |
+-------+
| 99441 |
+-------+


Status des commandes

In [6]:
%sql select distinct order_status,count(*) from orders group by order_status

 * postgresql://benj@localhost/olist
8 rows affected.


order_status,count
approved,2
canceled,625
created,5
delivered,96478
invoiced,314
processing,301
shipped,1107
unavailable,609


Toutes les commandes ont au moins un paiement associé ?

In [7]:
%%sql 
select distinct order_status,count(*) from orders o where
not exists (select 1 from order_payments pa where pa.order_id = o.order_id)
group by order_status

 * postgresql://benj@localhost/olist
1 rows affected.


order_status,count
delivered,1


Les différrents types de paiement

In [8]:
%%sql
select distinct payment_type,count(*) from order_payments
group by payment_type 

 * postgresql://benj@localhost/olist
5 rows affected.


payment_type,count
boleto,19784
credit_card,76795
debit_card,1529
not_defined,3
voucher,5775


Payment en plusieurs versements = payment_installments

Les paiments avec des bons et en liquide se font en une fois.

In [9]:
%%sql
select distinct payment_installments,payment_type,count(*) from order_payments
where payment_type in ('voucher','boleto')
group by payment_installments,payment_type

 * postgresql://benj@localhost/olist
2 rows affected.


payment_installments,payment_type,count
1,voucher,5775
1,boleto,19784


Les paiements d'une commande peuvent se faire avec plusieurs modes de paiement; chaque type de paiement se voit attribué un numéro de séquence

In [10]:
%%sql
select * from order_payments 
where order_id = (select distinct order_id from order_payments where payment_sequential = (select max(payment_sequential) from order_payments LIMIT 1) LIMIT 1)
order by payment_sequential

 * postgresql://benj@localhost/olist
29 rows affected.


order_id,payment_sequential,payment_type,payment_installments,payment_value,id
fa65dad1b0e818e3ccc5cb0e39231352,1,voucher,1,3.71,14322
fa65dad1b0e818e3ccc5cb0e39231352,2,voucher,1,8.51,23075
fa65dad1b0e818e3ccc5cb0e39231352,3,voucher,1,2.95,65642
fa65dad1b0e818e3ccc5cb0e39231352,4,voucher,1,29.16,9986
fa65dad1b0e818e3ccc5cb0e39231352,5,voucher,1,0.66,28331
fa65dad1b0e818e3ccc5cb0e39231352,6,voucher,1,5.02,29649
fa65dad1b0e818e3ccc5cb0e39231352,7,voucher,1,0.32,82594
fa65dad1b0e818e3ccc5cb0e39231352,8,voucher,1,26.02,68854
fa65dad1b0e818e3ccc5cb0e39231352,9,voucher,1,1.08,17275
fa65dad1b0e818e3ccc5cb0e39231352,10,voucher,1,12.86,19566


Nombre de paiements par voucher

In [11]:
%%sql
select count(*) from customers c
join orders o on c.customer_id = o.customer_id
join order_payments pa on pa.order_id = o.order_id
where pa.payment_type = 'voucher'

 * postgresql://benj@localhost/olist
1 rows affected.


count
5775


Nombre de commandes sans commentaires

In [12]:
%%sql
select count(distinct order_id) from orders o
where not exists(select 1 from order_reviews r where r.order_id = o.order_id)

 * postgresql://benj@localhost/olist
1 rows affected.


count
768


In [13]:
order_dates = %sql select date_trunc('day',min(order_delivered_customer_date)),date_trunc('day',max(order_delivered_customer_date)) + INTERVAL '1 day' from ordersa
print(f'order_dates: \n{order_dates}')
review_dates = %sql select min(review_creation_date),max(review_creation_date) from order_reviews
print(f'review_dates: \n{review_dates}')

 * postgresql://benj@localhost/olist
(psycopg2.errors.UndefinedTable) relation "ordersa" does not exist
LINE 1: ...er_delivered_customer_date)) + INTERVAL '1 day' from ordersa
                                                                ^

[SQL: select date_trunc('day',min(order_delivered_customer_date)),date_trunc('day',max(order_delivered_customer_date)) + INTERVAL '1 day' from ordersa]
(Background on this error at: https://sqlalche.me/e/14/f405)
order_dates: 
None
 * postgresql://benj@localhost/olist
1 rows affected.
review_dates: 
+---------------------+---------------------+
|         min         |         max         |
+---------------------+---------------------+
| 2016-10-02 00:00:00 | 2018-08-31 00:00:00 |
+---------------------+---------------------+


Nombre de dates de livraison en foncttion du statut de la commande

In [14]:
%%sql 
select count( distinct order_delivered_customer_date),order_status,count(*) from orders group by order_status


 * postgresql://benj@localhost/olist
8 rows affected.


count,order_status,count_1
0,approved,2
6,canceled,625
0,created,5
95658,delivered,96478
0,invoiced,314
0,processing,301
0,shipped,1107
0,unavailable,609


Ordres annulés

In [15]:
%%sql
select distinct order_purchase_timestamp,order_approved_at,order_delivered_customer_date from orders where order_status = 'canceled'

 * postgresql://benj@localhost/olist
625 rows affected.


order_purchase_timestamp,order_approved_at,order_delivered_customer_date
2016-09-05 00:15:34,2016-10-07 13:17:15,
2016-09-13 15:24:19,2016-10-07 13:16:46,
2016-10-02 22:07:52,2016-10-06 15:50:56,
2016-10-03 21:01:41,2016-10-04 10:18:57,2016-11-08 10:58:34
2016-10-04 10:05:45,2016-10-04 10:26:40,
2016-10-04 11:44:01,2016-10-05 08:45:09,
2016-10-04 14:51:15,2016-10-06 15:56:40,
2016-10-04 19:41:32,,
2016-10-04 20:41:45,2016-10-06 15:57:38,
2016-10-05 11:23:13,,


In [16]:
#def payment_types(main_table,agg_table):
#    """
#    """
#
#    req = """
#        select c.customer_unique_id,pa.payment_type,count(*) 
#        from customers c
#        join orders o  on c.customer_id = o.customer_id
#        join order_payments pa on pa.order_id = o.order_id
#        group by c.customer_unique_id,pa.payment_type
#        order by c.customer_unique_id    
#    """
#
#    agg = f"""{agg_table} as ({req})"""
#
#    return (agg,
#            list(
#            map( 
#                lambda payment_type : (
#                    f"select {agg_table}.count from {agg_table} where {agg_table}.payment_type = '{payment_type}' and {agg_table}.customer_unique_id = {main_table}.customer_unique_id",
#                    f'nb_{payment_type}'
#                ),
#                selected_payment_types()
#            )
#            )
#    )

#def payment_type_max_amount(main_table,agg_table):
#
#    req = """
#        select c.customer_unique_id,pa.payment_type,max(payment_value) 
#        from customers c
#        join orders o  on c.customer_id = o.customer_id
#        join order_payments pa on pa.order_id = o.order_id
#        group by c.customer_unique_id,pa.payment_type
#        order by c.customer_unique_id
#    """
#
#    
#
#    return (agg,
#            list(
#            map( 
#                lambda payment_type : (
#                    f"select {agg_table}.count from {agg_table} where {agg_table}.payment_type = '{payment_type}' and {agg_table}.customer_unique_id = {main_table}.customer_unique_id",
#                    f'nb_{payment_type}'
#                ),
#                selected_payment_types()
#            )
#            )
#    )

In [17]:
#payment types
def take(n,res):
    return list(map(lambda t: t[n],res))

def make_sub_reqs(agg_req,fields_reqs):
    def f (main_table,agg_table,histo_date):
        agg = f"{agg_table} as ({agg_req((histo_date,))})"
        return (
            agg,
            [ fields_req((agg_table,main_table)) for fields_req in fields_reqs]
        )

    return f

def build_req(*args,histo_date,limit=None):
    def make_args(*args,main_table):
        return list(zip(
                    args,
                    map(
                        lambda r: {'main_table':f'{main_table}','agg_table':f'agg_{str(r)}','histo_date':histo_date},
                        range(len(args))
                    )
                )
        )

    def join_sub_req(reqs):
        return ',\n'.join(map(lambda t: f"({t[0]}) as {t[1]}",reqs))
    
    def call_f(farg):
        f    = farg[0]
        args = farg[1]
        return f(**args) 

    # main request
    main_table = 'M'
    
    #generate sub queries
    fargs = make_args(*args, main_table = main_table)
    sub_reqs = list(map(lambda farg: call_f(farg),fargs))

    # Common table expressions
    ctes  =  "WITH\n" + ',\n'.join(take(0,sub_reqs)) + "\n"
    # fields
    fields   =  "\n" + join_sub_req(itertools.chain(*take(1,sub_reqs))) + "\n"
    #limits
    limits_e =  f'LIMIT {limit}' if limit is not None else ''

    return f"""
    {ctes} select {main_table}.c_customer_unique_id,{main_table}.c_customer_city,{main_table}.c_customer_state,{fields} from olist_view {main_table}
    where {main_table}.o_order_purchase_timestamp <= '{histo_date}'
    {limits_e}
    """


In [18]:
# configs = [(req_agg, [field_req(t)]]
# req_agg is an agregation request (that will be executed as a CTE)
# field_req a callable that: 
# -expects a tuple as parameter t[0]: name of the cte t[1]:name of the main table 
# -returns (req,field_name)
#  where req is a request that should join with the previous CTE
#        and  field_name is the feature name in the final Dataframe

def selected_payment_types():
    types = %sql select distinct payment_type from order_payments
    return list(filter(lambda p : p != 'not_defined',take(0,types)))

def make_payment_type(payment_type):
    return lambda t: (
        f"select {t[0]}.count from {t[0]} where {t[0]}.pa_payment_type = '{payment_type}' and {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"
        ,
        f'nb_{payment_type}'
    )

configs = [ 
    (
    lambda t:
    f"""
        select c_customer_unique_id,pa_payment_type,count(*) 
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered' 
        group by c_customer_unique_id,pa_payment_type
    """,
        [ make_payment_type(payment_type) for payment_type in selected_payment_types() ]
    ),
    (
    lambda t:    
    f"""
        select t.c_customer_unique_id,t.pa_payment_type,max(t.sum) from (
		select c_customer_unique_id,pa_payment_type,sum(pa_payment_value)
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered' 
        group by c_customer_unique_id,pa_payment_type
	  ) t group by t.c_customer_unique_id,t.pa_payment_type
    """,
        [
            lambda t: (
                f"select {t[0]}.pa_payment_type from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "payment_type_max"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,max(pa_payment_installments) as m1,max(pa_payment_sequential) as m2
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id
    """,
        [
            lambda t: (
                f"select {t[0]}.m1 from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"
                ,
                "payment_installments_max"
            ),
            lambda t: (
                f"select {t[0]}.m2 from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"
                ,
                "payment_sequential_max"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,sum(i_price+i_freight_value) as order_value
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id,i_order_id

    """,
        [
            lambda t: (
                f"select max({t[0]}.order_value) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "order_value_max"
            ),
            lambda t: (
                f"select min({t[0]}.order_value) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "order_value_min"
            ),
            lambda t: (
                f"select avg({t[0]}.order_value) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "order_value_mean"
            ),
            lambda t: (
                f"select stddev_pop({t[0]}.order_value) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "order_value_stddev"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,count(*)
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id
    """,
        [
            lambda t: (
                f"select {t[0]}.count from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"
                ,
                "nb_orders"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,count(*)
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id,i_order_id
    """,
        [
            lambda t: (
                f"select avg({t[0]}.count) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"
                ,
                "nb_items_order_avg"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,i_freight_value/i_price as r
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered' 
    """,
        [
            lambda t: (
                f"select max({t[0]}.r) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"
                ,
                "r_freight_price_max"
            )
        ]
    ),
    (
    lambda t:
    f"""
        select c_customer_unique_id,p_product_category_name,sum(i_price)
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id,p_product_category_name

    """,
        [
            lambda t: (
                f"""select {t[0]}.p_product_category_name from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id and
                {t[0]}.sum = (select max({t[0]}.sum) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1) 
                LIMIT 1
                """
                ,
                "product_cat_total_max"
            )
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,r_review_score
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered' and r_review_creation_date <= '{t[0]}' 
    """,
        [
            lambda t: (
                f"""select min({t[0]}.r_review_score) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "review_score_min"
            ),
            lambda t: (
                f"""select avg({t[0]}.r_review_score) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "review_score_avg"
            ),
            lambda t: (
                f"""select max({t[0]}.r_review_score) from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "review_score_max"
            )            
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,count(*)
        from olist_view
        where o_order_purchase_timestamp <= '{t[0]}' and o_order_status = 'cancel' 
        group by c_customer_unique_id
    """,
        [
            lambda t: (
                f"""select {t[0]}.count from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id"""
                ,
                "nb_orders_canceled"
            )      
        ]
    ), 
    (
    lambda t:    
    f"""
        select c_customer_unique_id,'{t[0]}'::date - min(o_order_purchase_timestamp)::date as R
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id
    """,
        [
            lambda t: (
                f"""select {t[0]}.R from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "nb_days_since_last_order"
            )      
        ]
    ),
    (
    lambda t:    
    f"""
        select v.c_customer_unique_id,sum(v.pa_payment_value)
        from olist_view v
        join (
            select c_customer_unique_id,max(o_order_delivered_customer_date)
            from olist_view
            where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
            group by c_customer_unique_id
        ) t on t.c_customer_unique_id = v.c_customer_unique_id
        where v.o_order_delivered_customer_date = t.max and v.o_order_status = 'delivered'    
        group by v.c_customer_unique_id
    """,
        [
            lambda t: (
                f"""select {t[0]}.sum from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "amount_last_order"
            )      
        ]
    ),
    (
    lambda t:    
    f"""
        select c_customer_unique_id,max(p_product_weight_g) as weight,max(p_product_length_cm*p_product_height_cm*p_product_width_cm) as volume
        from olist_view
        where o_order_delivered_customer_date <= '{t[0]}' and o_order_status = 'delivered'       
        group by c_customer_unique_id
    """,
        [
            lambda t: (
                f"""select {t[0]}.weight from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "weight_max"
            ),
            lambda t: (
                f"""select {t[0]}.volume from {t[0]} where {t[1]}.c_customer_unique_id = {t[0]}.c_customer_unique_id LIMIT 1"""
                ,
                "volume_max"
            )     
        ]
    ),         
]


 * postgresql://benj@localhost/olist
5 rows affected.


In [19]:
def dates_histo():
    raw_date = %sql select date_trunc('day',min(order_delivered_customer_date))+ INTERVAL '1 month',date_trunc('day',max(order_delivered_customer_date)) + INTERVAL '1 day' from orders
    for row in  raw_date:
        (min_date,max_date) = row
    return [dt for dt in rrule(MONTHLY, dtstart=min_date, until=max_date)] 

def generate_df(histo_date,limit=None):
    main_req = build_req(*list(map(lambda config:make_sub_reqs(*config), configs)),histo_date=histo_date,limit=limit)
    #print(main_req)
    r = %sql $main_req
    if r is not None:
        return r.DataFrame()

def generate_full_df(limit=None):
    d = dates_histo()[-1]
    return generate_df(d,limit=limit)

In [22]:

full_df = generate_full_df(limit=100)

 * postgresql://benj@localhost/olist
1 rows affected.
 * postgresql://benj@localhost/olist
100 rows affected.


Dataset complet