In [35]:
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime
from src.common.cloud_storage_connector import CloudStorage
from src.common.bigquery_connector import BigQueryManager
from src.config import settings
import json

In [60]:
import aiofiles as aio
aio.__version__ 


AttributeError: module 'aiofiles' has no attribute '__version__'

In [22]:
test_data = {
  "access_token": None,
  "client_id": "4959083987776428",
  "client_secret": "Hw9wWSydd8PMvMEJewWoMvKGYMAWyKEw",
  "seller_id": 189643563,
  "store_name": "hubsmarthome"
}

In [43]:
async def process_date(date, storage, bucket_name, blob_shipping_cost):
    try:
        print(f'Processing date: {date}')
        blob_prefix = blob_shipping_cost + f'date={date}/'

        # Use run_in_executor to call synchronous methods without blocking
        loop = asyncio.get_event_loop()
        blobs = await loop.run_in_executor(None, storage.list_blobs, bucket_name, blob_prefix)

        df_processed_data = pd.DataFrame()

        for blob in blobs:
            print(f"Reading file: {blob.name}")
            content = await loop.run_in_executor(None, storage.download_json, bucket_name, blob.name)

            for json_content in content:
                df_ = await process_orders(json_content['results'])
                df_processed_data = pd.concat([df_processed_data, df_], ignore_index=True)

        if not df_processed_data.empty:
            df_processed_data['processed_json'] = pd.to_datetime(date)
            df_processed_data['process_time'] = datetime.now()

        print(f'*** Finished treating all data for date {date}. {df_processed_data.shape[0]} sales ***')

        return df_processed_data
    except Exception as e:
        print(f'Error processing date {date}: {e}')
        return pd.DataFrame()

def process_orders(json_data):
    try:
        df_ = pd.DataFrame()

        for sale in json_data:
            structured_sale = {
                # Your structured_sale dictionary here
            }

            # Create a new DataFrame for the structured sale
            df_sale = pd.DataFrame([structured_sale])

            # Concatenate only if df_ is not empty
            if df_.empty:
                df_ = df_sale  # Assign the first entry directly if df_ is empty
            else:
                df_ = pd.concat([df_, df_sale], ignore_index=True)

        return df_
    except Exception as e:
        print(f'Error processing json: {e}')
        return pd.DataFrame()



In [24]:
data = test_data
store_name = data.get('store_name')
seller_id = data.get('seller_id')

print('** Connecting to storage and BigQuery... **')
# Initialize storage and BigQuery
storage = CloudStorage(credentials_path=settings.PATH_SERVICE_ACCOUNT)
bigquery = BigQueryManager(credentials_path=settings.PATH_SERVICE_ACCOUNT)
# Define paths and table names from the config
bucket_name = settings.BUCKET_STORES
table_management = settings.TABLE_MANAGEMENT
destiny_table = settings.TABLE_ORDERS
blob_shipping_cost = settings.BLOB_ORDERS(store_name)
# Get dates to treat
list_dates_to_process = bigquery.get_list_dates_to_process(seller_id, table_management, destiny_table)
list_dates_to_process = [date.strftime('%Y-%m-%d') for date in list_dates_to_process]
print(f'*** Starting to process dates: {len(list_dates_to_process)} dates to process  ***')
# Use asyncio.gather to process dates asynchronously
tasks = [process_date(date, storage, bucket_name, blob_shipping_cost) for date in list_dates_to_process]
results = await asyncio.gather(*tasks)
# Combine all DataFrames
df_all_processed_data = pd.concat(results, ignore_index=True)
print(f'*** Finished processing all dates. Total sales: {df_all_processed_data.shape[0]} ***')

In [47]:
date = list_dates_to_process[3]
print(f'Processing date: {date}')
blob_prefix = blob_shipping_cost + f'date={date}/'
# Use run_in_executor to call synchronous methods without blocking
loop = asyncio.get_event_loop()
blobs = await loop.run_in_executor(None, storage.list_blobs, bucket_name, blob_prefix)
df_processed_data = pd.DataFrame()
for blob in blobs:
    print(f"Reading file: {blob.name}")
    content = await loop.run_in_executor(None, storage.download_json, bucket_name, blob.name)
    for json_content in content:
        df_ = process_orders(json_content['results'])
        df_processed_data = pd.concat([df_processed_data, df_], ignore_index=True)
if not df_processed_data.empty:
    df_processed_data['processed_json'] = pd.to_datetime(date)
    df_processed_data['process_time'] = datetime.now()
print(f'*** Finished treating all data for date {date}. {df_processed_data.shape[0]} sales ***')

df_processed_data

Processing date: 2024-04-27
Reading file: hubsmarthome/meli/api_response/orders/date=2024-04-27/total_sales=384__data=2024-04-27__processing-time=2024-09-17T17:42:42.561756-03:00.json
*** Finished treating all data for date 2024-04-27. 8 sales ***


0
1
2
3
4
5
6
7


In [52]:
df_ = pd.DataFrame()
for sale in json_content:
    structured_sale = {
        # Your structured_sale dictionary here
    }
    # Create a new DataFrame for the structured sale
    df_sale = pd.DataFrame([structured_sale])
    # Concatenate only if df_ is not empty
    if df_.empty:
        df_ = df_sale  # Assign the first entry directly if df_ is empty
    else:
        df_ = pd.concat([df_, df_sale], ignore_index=True)

structured_sale

{}

In [44]:
process_orders(json_content['results'])

0


# Fetch historical orders

In [26]:
import aiohttp
import asyncio
from datetime import datetime, timedelta
import pandas as pd
import json
import aiofiles
from concurrent.futures import ThreadPoolExecutor
from flask import escape
import requests
import gc  # Garbage collection to free memory
from src.common.cloud_storage_connector import CloudStorage
from src.common.bigquery_connector import BigQueryManager
from src.common.utils import log_process, authenticate
from src.config import settings

In [27]:
timezone_offset = "-03:00"
limit = 50

def fetch_data(date, offset, access_token, seller_id):
    formatted_date_start = date.strftime(f"%Y-%m-%dT00:00:00.000{timezone_offset}")
    formatted_date_end = (date + timedelta(days=1)).strftime(f"%Y-%m-%dT00:00:00.000{timezone_offset}")
    params = {
        'limit': limit,
        'offset': offset,
        'order.date_created.from': formatted_date_start,
        'order.date_created.to': formatted_date_end
    }
    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    url = settings.URL_ORDERS(seller_id)

    response = requests.get(url, params=params, headers=headers) 
    print(response.json)
    return response

In [28]:
data = test_data
client_id = data.get('client_id')
client_secret = data.get('client_secret')
store_name = data.get('store_name')
seller_id = data.get('seller_id')
access_token = data.get('access_token')
print('** Defining authentication... **')
# Authenticate (assuming this is now centralized in utils.py or a similar file)
if not access_token:
    access_token = authenticate(client_id, client_secret)  # You can add this to a common module
print('** Connecting to storage and BigQuery... **')
# Initialize storage and BigQuery
storage = CloudStorage(credentials_path=settings.PATH_SERVICE_ACCOUNT)
bigquery = BigQueryManager(credentials_path=settings.PATH_SERVICE_ACCOUNT)
# Define paths and table names from the config
bucket_name = settings.BUCKET_STORES
table_management = settings.TABLE_MANAGEMENT
destiny_table = settings.TABLE_ORDERS
start_date = datetime.today() - timedelta(days=365)
end_date = datetime.today() - timedelta(days=1)
blob_name = settings.BLOB_ORDERS(store_name)

In [29]:
offset = 0
responses = []
sales = []
date = end_date
# while True:
response = fetch_data(date, offset, access_token, seller_id)

response.json()

In [30]:
response

In [55]:
# Provided dictionary keys from the code
code_keys = [
    'reason', 'status_code', 'total_paid_amount', 'operation_type', 'transaction_amount', 
    'transaction_amount_refunded', 'date_approved', 'collector_id', 'coupon_id', 'installments',
    'authorization_code', 'taxes_amount', 'payment_id', 'date_last_modified', 'coupon_amount', 
    'payment_shipping_cost', 'installment_amount', 'activation_uri', 'overpaid_amount', 'card_id', 
    'payment_status_detail', 'issuer_id', 'payment_method_id', 'payment_type', 'deferred_period', 
    'atm_transfer_reference_transaction_id', 'atm_transfer_reference_company_id', 'site_id', 
    'payer_id', 'order_id', 'currency_id', 'payment_status', 'shipping_id', 'fulfilled', 'seller_id', 
    'buyer_id', 'item_id', 'item_title', 'item_category_id', 'item_variation_id', 'seller_custom_field', 
    'global_price', 'net_weight', 'warranty', 'condition', 'seller_sku', 'quantity', 'unit_price', 
    'full_unit_price', 'manufacturing_days', 'requested_quantity_measure', 'requested_quantity_value', 
    'sale_fee', 'listing_type_id', 'base_exchange_rate', 'base_currency_id', 'bundle', 'element_id', 
    'date_created', 'date_closed', 'status', 'expiration_date', 'date_last_updated', 'last_updated', 
    'comment', 'pack_id', 'coupon_amount', 'coupon_id', 'shipping_cost', 'pickup_id', 'status_detail', 
    'total_amount', 'paid_amount', 'context_application', 'context_product_id', 'context_channel', 
    'context_site'
]

# Keys from the schema
schema_keys = [
    'reason', 'status_code', 'total_paid_amount', 'operation_type', 'transaction_amount',
    'transaction_amount_refunded', 'date_approved', 'collector_id', 'coupon_id', 'installments',
    'authorization_code', 'taxes_amount', 'payment_id', 'date_last_modified', 'coupon_amount', 
    'shipping_cost', 'installment_amount', 'activation_uri', 'overpaid_amount', 'card_id', 
    'status_detail', 'issuer_id', 'payment_method_id', 'payment_type', 'deferred_period', 
    'atm_transfer_reference_transaction_id', 'atm_transfer_reference_company_id', 'site_id', 
    'payer_id', 'order_id', 'currency_id', 'payment_status', 'shipping_id', 'fulfilled', 'seller_id', 
    'buyer_id', 'item_id', 'item_title', 'item_category_id', 'item_variation_id', 'seller_custom_field', 
    'global_price', 'net_weight', 'warranty', 'condition', 'seller_sku', 'quantity', 'unit_price', 
    'full_unit_price', 'manufacturing_days', 'requested_quantity_measure', 'requested_quantity_value', 
    'sale_fee', 'listing_type_id', 'base_exchange_rate', 'base_currency_id', 'bundle', 'element_id', 
    'date_created', 'date_closed', 'status', 'expiration_date', 'date_last_updated', 'last_updated', 
    'comment', 'pack_id', 'pickup_id', 'total_amount', 'paid_amount', 'context_application', 
    'context_product_id', 'context_channel', 'context_site', 'processed_json', 'process_time'
]

# Find keys that are in the dictionary but not in the schema
extra_keys = set(code_keys) - set(schema_keys)
extra_keys = set(schema_keys) - set(code_keys)  

extra_keys

{'process_time', 'processed_json'}

In [58]:
extra_keys_ = set(code_keys) - set(schema_keys)
extra_keys = set(schema_keys) - set(code_keys)  
extra_keys

{'process_time', 'processed_json'}

In [59]:
extra_keys_

{'payment_shipping_cost', 'payment_status_detail'}