In [8]:
from pyathena import connect
from pyathena.util import as_pandas
import s3fs
import pandas as pd
import sqlalchemy

In [13]:
# Get catalog identifiers (GTINs) for a product type group
def get_gtins(product_group):
    """
    Fetch catalog products and their identifiers for one product type group.

    Joins ``catalogs_api.products_catalogproduct`` with
    ``catalogs_api.products_identifier`` and keeps the rows whose
    ``product_type_group`` equals ``product_group``.

    Parameters
    ----------
    product_group : str
        Value compared for equality against ``product_type_group``.

    Returns
    -------
    pandas.DataFrame
        One row per matching product/identifier pair, with the columns
        ``odin``, ``product_type``, ``product_type_group``,
        ``identifier_type`` and ``value``.
    """
    # NOTE(review): the connection URL is blank in this notebook; it has to
    # be supplied (ideally from the environment) before this can run.
    engine = sqlalchemy.create_engine('')

    # The group is sent as a bound parameter rather than being pasted into
    # the SQL text, so quotes inside the value cannot break or alter the query.
    sql = sqlalchemy.text("""
select
    cp.odin,
    cp.product_type,
    cp.product_type_group,
    pi.identifier_type,
    pi."value"
from catalogs_api.products_catalogproduct as cp
join catalogs_api.products_identifier as pi
on pi.catalog_product_id = cp.id
where product_type_group = :product_group
""").bindparams(product_group=product_group)

    df = pd.read_sql(sql, con=engine)

    return df

In [40]:
def get_competition(product_group, gtins):
    """
    Fetch active price history for origins whose slug contains ``b2w`` and
    store it as a CSV file on S3.

    Parameters
    ----------
    product_group : str
        Used for both the S3 folder and the file name:
        ``s3://bsa-correlation-one/<product_group>/competition_<product_group>.csv``.
    gtins : iterable
        GTIN values to keep; must contain at least one value.

    Returns
    -------
    pandas.DataFrame
        Columns ``updated_at``, ``gtin``, ``value`` and ``slug``.

    Raises
    ------
    ValueError
        If ``gtins`` is empty (an empty ``IN`` list is not valid SQL).
    """
    gtin_values = list(gtins)
    if not gtin_values:
        raise ValueError('gtins must contain at least one value')

    # NOTE(review): key/secret and the engine URL are blank in this notebook;
    # they have to be supplied (ideally from the environment) before running.
    fs = s3fs.S3FileSystem(anon=False, key='', secret='')

    engine = sqlalchemy.create_engine('')

    # An "expanding" bind parameter renders one placeholder per value.
    # Formatting the Python tuple into the SQL text produced invalid SQL for a
    # single value -- ('x',) -- and mis-quoted values containing quotes.
    sql = sqlalchemy.text("""
select
    ph.updated_at,
    ph.gtin,
    ph.value,
    po.slug
from
    products_api.price_analytics_origin as po
join
    products_api.price_analytics_pricinghistory as ph
    on ph.origin_id = po.id
where
    po.slug like '%b2w%'
    and ph.active = true
    and ph.gtin in :gtins""").bindparams(
        sqlalchemy.bindparam('gtins', value=gtin_values, expanding=True)
    )

    df = pd.read_sql(sql, con=engine)

    with fs.open('s3://bsa-correlation-one/{}/competition_{}.csv'.format(product_group,product_group), 'w') as f:
        df.to_csv(f)

    return df
    
    

In [7]:
def get_history(gtins):
    """
    Fetch channel product history rows for the given GTINs from Athena.

    The same column list is selected from the three
    ``channels_api_products_channelproducthistory*`` tables (base, ``_2019``
    and ``_2020``) of ``olist-datalake-athena`` and combined with ``UNION``,
    so exact duplicate rows are collapsed.

    Parameters
    ----------
    gtins : iterable
        GTIN values to keep; must contain at least one value.

    Returns
    -------
    pandas.DataFrame
        The query result converted with ``pyathena.util.as_pandas``.

    Raises
    ------
    ValueError
        If ``gtins`` is empty (an empty ``IN`` list is not valid SQL).
    """
    gtin_values = list(gtins)
    if not gtin_values:
        raise ValueError('gtins must contain at least one value')

    # NOTE(review): the credentials, staging dir and region are blank in this
    # notebook; they have to be supplied before this can run.
    cursor = connect(aws_access_key_id='',
                     aws_secret_access_key='',
                     s3_staging_dir='',
                     region_name='').cursor()

    # Single source of truth for the projection shared by all three tables.
    columns = ', '.join([
        'availability_days', 'brand', 'branded_store_slug', 'canonical_sku',
        'catalog_feed_date', 'catalog_feed_id', 'category', 'category_info',
        'channel_slug', 'commission_plan', 'created_at', 'currency',
        'description', 'external_id', 'group_id', 'gtin', 'id', 'name',
        'offer', 'offer_discount', 'parent_id', 'part_number', 'partition_0',
        'price', 'price_freight_shift', 'reject_reason', 'seller_product_sku',
        'sent_error_reason', 'status', 'stock', 'updated_at',
    ])
    tables = [
        'channels_api_products_channelproducthistory',
        'channels_api_products_channelproducthistory_2019',
        'channels_api_products_channelproducthistory_2020',
    ]

    # %(gtins)s is a PyAthena (pyformat) placeholder: the driver renders the
    # list as a quoted, escaped "(v1, v2, ...)". Formatting the Python tuple
    # into the text produced invalid SQL for a single value -- ('x',).
    query = '\nUNION\n'.join(
        'SELECT {}\nFROM "olist-datalake-athena".{}\nwhere gtin in %(gtins)s'.format(columns, table)
        for table in tables
    )

    cursor.execute(query, {'gtins': gtin_values})

    df = as_pandas(cursor)

    return df

In [8]:
def get_orders(gtins):
    """
    Fetch every column of ``"olist-dw-athena".orders`` for the given GTINs.

    Parameters
    ----------
    gtins : iterable
        Values matched against ``seller_item_gtin``; must contain at least
        one value.

    Returns
    -------
    pandas.DataFrame
        The query result converted with ``pyathena.util.as_pandas``.

    Raises
    ------
    ValueError
        If ``gtins`` is empty (an empty ``IN`` list is not valid SQL).
    """
    gtin_values = list(gtins)
    if not gtin_values:
        raise ValueError('gtins must contain at least one value')

    # NOTE(review): the credentials, staging dir and region are blank in this
    # notebook; they have to be supplied before this can run.
    cursor = connect(aws_access_key_id='',
                     aws_secret_access_key='',
                     s3_staging_dir='',
                     region_name='').cursor()

    # %(gtins)s is a PyAthena (pyformat) placeholder: the driver renders the
    # list as a quoted, escaped "(v1, v2, ...)". Formatting the Python tuple
    # into the text produced invalid SQL for a single value -- ('x',).
    cursor.execute("""
SELECT
*
FROM "olist-dw-athena".orders
WHERE
    seller_item_gtin IN %(gtins)s;
""", {'gtins': gtin_values})

    df = as_pandas(cursor)

    return df
    

In [40]:
def batch_execution(product_type, gtins, product_group='electronics'):
    """
    Export the product history and the orders of one product type to S3.

    Two CSV files are written under ``s3://bsa-correlation-one/<product_group>/``:
    ``<product_group>_<product_type>_history.csv`` (result of ``get_history``)
    and ``<product_group>_<product_type>_orders.csv`` (result of
    ``get_orders`` with the customer, payer, shipment and shipping columns
    listed below removed).

    Parameters
    ----------
    product_type : str
        Product type label; used in the log messages and in the file names.
    gtins : iterable
        GTIN values forwarded to ``get_history`` and ``get_orders``.
    product_group : str, optional
        S3 folder and file-name prefix. Defaults to ``'electronics'``, the
        value that used to be hard-coded.

    Raises
    ------
    KeyError
        Raised by ``DataFrame.drop`` when one of the columns to remove is
        missing from the orders result; nothing is written for the orders
        in that case.
    """
    fs = s3fs.S3FileSystem(anon=False)
    base_path = 's3://bsa-correlation-one/{}/{}_{}'.format(product_group, product_group, product_type)

    # history
    print('EXECUTING HISTORY: {}'.format(product_type))
    history = get_history(gtins)
    print('SAVING HISTORY')

    with fs.open('{}_history.csv'.format(base_path), 'w') as f:
        history.to_csv(f)

    # orders
    print('EXECUTING ORDERS: {}'.format(product_type))
    orders = get_orders(gtins)

    # Customer / payer / shipment columns that must not be exported.
    to_drop = ['customer_id', 'payer_id', 'shipment_id',
       'shipping_id', 'olist_shipment_id',
       'olist_shipment_tracking_protocol', 'olist_shipment_tracking_url', 'olist_shipping_id',
       'olist_shipping_shipping_estimate_id',
       'olist_shipping_shipping_method_id',
       'olist_shipping_shipping_method_name',
        'olist_payer_id',
       'olist_payer_name', 'olist_payer_document_number', 'olist_payer_email',
       'olist_payer_birth_date', 'olist_payer_address_id', 'olist_customer_id',
       'olist_customer_name', 'olist_customer_document_number',
       'olist_customer_email', 'olist_customer_address_id', 'olist_order_raw_order']

    # Deliberately strict: a missing column raises instead of being ignored,
    # so a schema change cannot silently let such a column through.
    orders_export = orders.drop(columns=to_drop)
    print('SAVING {} ORDERS'.format(orders_export.shape[0]))

    with fs.open('{}_orders.csv'.format(base_path), 'w') as f:
        orders_export.to_csv(f)
    print('FINISHED {}'.format(product_type))
    print('')
    

In [None]:
### FULL EXECUTION

### Get GTINS
products = get_gtins('eletronicos')

# Distinct, non-null identifier values: repeated or missing values only make
# the IN list longer (or invalid) without changing the result.
gtins = tuple(products.value.dropna().drop_duplicates())

### GET Competition prices

product_group = 'electronics'
competition = get_competition(product_group, gtins)

### Export history and orders, one product type at a time

types = products.product_type.unique().tolist()
failed_types = []

# 1-based counter against the real number of types, so the progress line
# runs from 1/N to N/N.
for i, product_type in enumerate(types, start=1):
    current = products[products.product_type == product_type]
    # Separate name so the full `gtins` tuple above is not overwritten.
    type_gtins = tuple(current.value.dropna().drop_duplicates())
    print('Executing... {}/{}'.format(i, len(types)))
    print('Total gtins: ', len(type_gtins))
    if not type_gtins:
        # Nothing to query for this type (e.g. a missing product_type label).
        print('Skipping {}: no gtins'.format(product_type))
        print()
        continue
    try:
        batch_execution(product_type, type_gtins)
    except Exception as e:
        # Best effort: one failing type must not stop the remaining exports,
        # but it is recorded so it can be re-run afterwards.
        failed_types.append(product_type)
        print('Failed {}'.format(product_type))
        print(e)
        print()

if failed_types:
    print('Failed product types: {}'.format(failed_types))