In [None]:
# -------------------------
# ----- IMPORT ALL VARIABLES -----
# -------------------------
import os
import json
import time
import requests
import pandas as pd
from dotenv import load_dotenv
from google.cloud import bigquery
import pandas_gbq
from tqdm import tqdm

import config
# Magento 2 connection settings, copied from the local `config` module
M2_BASE_URL = config.M2_BASE_URL
M2_ACCESS_TOKEN = config.M2_ACCESS_TOKEN  # reassigned further down with the result of get_magento_token()
M2_USERNAME = config.M2_USERNAME
M2_PASSWORD = config.M2_PASSWORD

# BigQuery key path and destination table identifiers
BQ_PATH_KEY = config.BQ_PATH_KEY
BQ_PROJECT_ID = config.BQ_PROJECT_ID
BQ_DATASET_ID = config.BQ_DATASET_ID
BQ_TABLE_ID = config.BQ_TABLE_ID

# Date range for data fetching; fetch_orders appends the time of day
# (" 00:00:00" / " 23:59:59") to these values, so they are presumably plain
# YYYY-MM-DD dates -- verify against config.
FROM_DATE = config.FROM_DATE
TO_DATE = config.TO_DATE

# Reset BQ tables (True to reset data in BigQuery, False if incremental load)
RESET = config.RESET

In [None]:

# -------------------------
# ----- 1. GET NEW TOKEN -----
# -------------------------

# Function to obtain a Magento access token via Google Authenticator 2FA
def get_magento_token(timeout=30):
    """
    Prompt for a one-time password and exchange it for a Magento access token.

    Posts M2_USERNAME, M2_PASSWORD and the entered OTP to the Google 2FA
    authenticate endpoint under M2_BASE_URL.

    Args:
        timeout: Seconds to wait for the HTTP request before requests raises
            a timeout error. Defaults to 30.

    Returns:
        The decoded JSON response body on HTTP 200 (expected to be the token
        string -- confirm against your Magento version), otherwise None after
        printing the error response.
    """
    # Get OTP code from user; strip stray whitespace from copy/paste
    M2_OTP_CODE = input("Enter the current 6-digit OTP code from your Google Authenticator app: ").strip()

    # Prepare the payload for 2FA
    payload = {
        "username": M2_USERNAME,
        "password": M2_PASSWORD,
        "otp": M2_OTP_CODE
    }

    # `json=` serialises the payload and sets the JSON content type; the
    # timeout keeps an unreachable host from hanging the kernel indefinitely.
    response = requests.post(
        f"{M2_BASE_URL}/rest/V1/tfa/provider/google/authenticate",
        json=payload,
        timeout=timeout,
    )

    # Check if the authentication is successful
    if response.status_code == 200:
        access_token = response.json()  # Used as the bearer token for subsequent requests
        print("Access token received.")
        return access_token

    print("Error fetching token:", response.text)
    return None


M2_ACCESS_TOKEN = get_magento_token()


In [None]:
# -------------------------------------------
# ----- 2. FETCH ORDER AND ITEM DETAILS --------
# -------------------------------------------

# Request headers shared by the Magento REST calls below: bearer authentication
# plus a JSON content type. The token is read from M2_ACCESS_TOKEN once, when
# this statement runs, so re-run this cell after obtaining a new token.
HEADERS = {
    "Authorization": f"Bearer {M2_ACCESS_TOKEN}",
    "Content-Type": "application/json"
}

def fetch_orders(from_date, to_date, page=1, page_size=50, timeout=60):
    """
    Fetch one page of orders created between two dates.

    Args:
        from_date: Start date; " 00:00:00" is appended to form the lower bound.
        to_date: End date; " 23:59:59" is appended to form the upper bound.
        page: 1-based page number to request.
        page_size: Number of orders per page. Defaults to 50.
        timeout: Seconds to wait for the HTTP request. Defaults to 60.

    Returns:
        The decoded JSON response on HTTP 200, otherwise None after printing
        the error response.
    """
    # Passing the search criteria through `params` lets requests URL-encode
    # the brackets and the space inside the timestamps.
    params = {
        "searchCriteria[filter_groups][0][filters][0][field]": "created_at",
        "searchCriteria[filter_groups][0][filters][0][value]": f"{from_date} 00:00:00",
        "searchCriteria[filter_groups][0][filters][0][condition_type]": "from",
        "searchCriteria[filter_groups][1][filters][0][field]": "created_at",
        "searchCriteria[filter_groups][1][filters][0][value]": f"{to_date} 23:59:59",
        "searchCriteria[filter_groups][1][filters][0][condition_type]": "to",
        # Explicit sort so consecutive pages neither overlap nor skip orders
        "searchCriteria[sortOrders][0][field]": "entity_id",
        "searchCriteria[sortOrders][0][direction]": "ASC",
        "searchCriteria[pageSize]": page_size,
        "searchCriteria[currentPage]": page,
    }

    response = requests.get(
        f"{M2_BASE_URL}/rest/V1/orders",
        headers=HEADERS,
        params=params,
        timeout=timeout,
    )

    if response.status_code == 200:
        return response.json()

    print("Error fetching orders:", response.text)
    return None

def format_order_data(orders_data):
    """
    Flatten an orders response into a DataFrame with one row per order item.

    Order-level fields (id, date, total, status, customer, billing city and
    country, payment method) are repeated on every item row of that order.
    Orders without items contribute no rows.
    """
    rows = []

    for order in orders_data.get('items', []):
        currency = order.get('order_currency_code')
        first_name = order.get('customer_firstname', '')
        last_name = order.get('customer_lastname', '')
        billing = order.get('billing_address', {})
        payment = order.get('payment', {})

        # Fields shared by every item row of this order
        order_fields = {
            "Order_ID": order.get('entity_id'),
            "Date": order.get('created_at'),
            "Order_Total": f"{order.get('grand_total')} {currency}",
            "Order_Status": order.get('status'),
            "Customer_Name": f"{first_name} {last_name}".strip(),
            "Customer_Email": order.get('customer_email'),
            "City": billing.get('city', ''),
            "Country": billing.get('country_id', ''),
            "Payment_Method": payment.get('method', 'N/A'),
        }

        # One output row per purchased item, order fields first
        rows.extend(
            {
                **order_fields,
                "Item_Name": line.get('name'),
                "SKU": line.get('sku'),
                "Quantity": line.get('qty_ordered'),
                "Price_per_Unit": f"{line.get('price')} {currency}",
                "Total_Item_Price": f"{line.get('row_total')} {currency}",
            }
            for line in order.get('items', [])
        )

    return pd.DataFrame(rows)

def fetch_all_orders(from_date, to_date):
    """
    Fetch every order in a date range by walking the result pages in order.

    The date window stays fixed for the whole run and only the page number
    advances. Moving the start date while also incrementing the page would
    skip a page of the narrowed window on every request.

    Args:
        from_date: Start date passed unchanged to fetch_orders.
        to_date: End date passed unchanged to fetch_orders.

    Returns:
        A DataFrame with one row per order item (see format_order_data), or
        an empty DataFrame when nothing was fetched.
    """
    frames = []
    fetched = 0
    page = 1
    while True:
        print(f"Fetching orders for {from_date} (page {page})...")
        orders_data = fetch_orders(from_date, to_date, page)

        if orders_data is None:
            # fetch_orders already printed the error response
            print("Stopping: request failed, results may be incomplete.")
            break

        items = orders_data.get('items')
        if not items:
            print("No more orders found.")
            break

        frames.append(format_order_data(orders_data))
        fetched += len(items)

        # Stop once the reported total has been collected. An empty page is
        # not a reliable end marker: Magento reportedly serves the last page
        # again when currentPage exceeds the page count -- TODO confirm for
        # the deployed version.
        total_count = orders_data.get('total_count')
        if total_count is not None and fetched >= total_count:
            break

        page += 1
        time.sleep(1)  # brief pause between requests

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# Pull every order in the configured date range
print("Starting to fetch orders...")
df_new = fetch_all_orders(FROM_DATE, TO_DATE)

# Write the CSV only when at least one row came back
if df_new.empty:
    print("No orders found.")
else:
    df_new.to_csv("magento_orders.csv", index=False)
    print("Formatted orders saved to CSV file.")


In [None]:
# -------------------------------------------
# -----      3. SEND DATA TO BQ         --------
# -------------------------------------------
# All imports (os, pandas, pandas_gbq) live in the first cell of the notebook.

# BigQuery settings: an environment variable wins when it is set, otherwise the
# value loaded from `config` in the first cell is kept. Reading only from the
# environment would yield None whenever the variables are not exported.
BQ_PATH_KEY = os.getenv("BQ_PATH_KEY", BQ_PATH_KEY)
BQ_PROJECT_ID = os.getenv("BQ_PROJECT_ID", BQ_PROJECT_ID)
BQ_DATASET_ID = os.getenv("BQ_DATASET_ID", BQ_DATASET_ID)
BQ_TABLE_ID = os.getenv("BQ_TABLE_ID", BQ_TABLE_ID)

# Point the Google client libraries at the service-account key file.
# Assigning None to os.environ raises TypeError, so skip it when no path is set.
if BQ_PATH_KEY:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = BQ_PATH_KEY

# Define BigQuery parameters
project_id = BQ_PROJECT_ID
dataset_id = BQ_DATASET_ID
table_id = BQ_TABLE_ID

# Full table ID in the format 'project.dataset.table'
table_full_id = f"{project_id}.{dataset_id}.{table_id}"

# Load the rows written by the fetch step
df_magento_orders = pd.read_csv("magento_orders.csv")

# Honour the RESET flag: replace the table on reset, append on incremental load
if_exists = 'replace' if RESET else 'append'

# Upload the DataFrame to BigQuery (pandas_gbq.to_gbq supersedes DataFrame.to_gbq)
pandas_gbq.to_gbq(
    df_magento_orders,
    destination_table=table_full_id,
    project_id=project_id,
    if_exists=if_exists,
)
print('Data uploaded successfully!')