In [2]:
import json
import requests

# Load Shopify credentials; config.json must provide the keys
# "shopify_store", "api_key" and "password" (a missing key raises KeyError).
# The encoding is explicit so the read does not depend on the platform default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

SHOPIFY_STORE = config["shopify_store"]
API_KEY = config["api_key"]
PASSWORD = config["password"]

# The API version is pinned here by hand; nothing below discovers it at runtime.
SHOPIFY_API_VERSION = "2024-01"  # Update as needed
SHOPIFY_API_URL = f"https://{SHOPIFY_STORE}.myshopify.com/admin/api/{SHOPIFY_API_VERSION}/orders.json"

def fetch_shopify_orders():
    """Fetch a small sample of orders from the Shopify API and pretty-print them.

    Requests up to 5 orders of any status from ``SHOPIFY_API_URL`` using HTTP
    basic auth with ``API_KEY`` / ``PASSWORD``.

    Returns:
        list: the ``orders`` array from the response body. An empty list is
        returned when the API answers with a non-200 status (the status code
        and body are printed) or when the body has no ``orders`` key.
    """
    response = requests.get(
        SHOPIFY_API_URL,
        params={"status": "any", "limit": 5},
        auth=(API_KEY, PASSWORD),
        timeout=30,  # without a timeout a stalled connection blocks the kernel indefinitely
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return []

    orders = response.json().get("orders", [])
    print(json.dumps(orders, indent=4))  # Pretty print sample data
    # Return the same type on success as on the error path above.
    return orders

# The function prints its own report; the trailing ';' keeps the notebook from
# additionally echoing the call's return value as the cell output.
fetch_shopify_orders();

[
    {
        "id": 6002412257506,
        "admin_graphql_api_id": "gid://shopify/Order/6002412257506",
        "app_id": 580111,
        "browser_ip": "172.56.179.126",
        "buyer_accepts_marketing": true,
        "cancel_reason": null,
        "cancelled_at": null,
        "cart_token": "Z2NwLXVzLXdlc3QxOjAxSk1KNTgxR1lCM1dRNEo4QVFGRVZSNkgy",
        "checkout_id": 35677846601954,
        "checkout_token": "b196101e74ae5c56567a12094ff992f6",
        "client_details": {
            "accept_language": "en-US",
            "browser_height": null,
            "browser_ip": "172.56.179.126",
            "browser_width": null,
            "session_hash": null,
            "user_agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Mobile Safari/537.36"
        },
        "closed_at": null,
        "confirmation_number": "JKEY5AWXP",
        "confirmed": true,
        "contact_email": "sportdad4life@yahoo.com",
        "created_at": "2025-0

In [1]:
import json
import requests
import pandas as pd

# Load Shopify credentials; config.json must provide the keys
# "shopify_store", "api_key" and "password" (a missing key raises KeyError).
# The encoding is explicit so the read does not depend on the platform default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

SHOPIFY_STORE = config["shopify_store"]
API_KEY = config["api_key"]
PASSWORD = config["password"]

# The API version is pinned here by hand; nothing below discovers it at runtime.
SHOPIFY_API_VERSION = "2024-01"  # Update as needed
SHOPIFY_API_URL = f"https://{SHOPIFY_STORE}.myshopify.com/admin/api/{SHOPIFY_API_VERSION}/orders.json"

def fetch_shopify_orders(limit=250, timeout=30):
    """Fetch one page of orders from the Shopify API as a flattened DataFrame.

    Only a single request is made (no pagination), so at most ``limit`` orders
    are returned.

    Args:
        limit: Page size sent as the ``limit`` query parameter. Defaults to
            250, the value this notebook has always requested.
        timeout: Seconds to wait for the HTTP request before ``requests``
            raises; prevents a stalled connection from hanging the kernel.

    Returns:
        pandas.DataFrame: one row per order, with nested dicts flattened into
        dotted column names by ``pd.json_normalize``. An empty DataFrame is
        returned when the API answers with a non-200 status or with no orders
        (a message is printed in both cases).
    """
    response = requests.get(
        SHOPIFY_API_URL,
        params={"status": "any", "limit": limit},
        auth=(API_KEY, PASSWORD),
        timeout=timeout,
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code}, {response.text}")
        return pd.DataFrame()  # Return an empty DataFrame on error

    orders_json = response.json().get("orders", [])

    if not orders_json:
        print("No orders found.")
        return pd.DataFrame()  # Return an empty DataFrame if no orders

    # Nested dicts become dotted columns; list values (e.g. line items) stay as lists.
    return pd.json_normalize(orders_json)

# Pull one page of orders; an empty frame signals an API error or no orders.
orders_df = fetch_shopify_orders()

if not orders_df.empty:
    print(orders_df.head()) # Print the first few rows of the DataFrame
    # info() writes its report to stdout itself and returns None, so wrapping it
    # in print() would emit a stray "None" line after the report.
    orders_df.info() # Print information about the DataFrame
    # Now you can perform pandas operations on orders_df
    # Example: orders_df['total_price'] = pd.to_numeric(orders_df['total_price'])
    # Example: print(orders_df['total_price'].sum())
else:
    print("Failed to retrieve or process order data.")

              id               admin_graphql_api_id  app_id  \
0  6002412257506  gid://shopify/Order/6002412257506  580111   
1  6002019369186  gid://shopify/Order/6002019369186  580111   
2  6001818763490  gid://shopify/Order/6001818763490  580111   
3  6001784914146  gid://shopify/Order/6001784914146  580111   
4  6001626972386  gid://shopify/Order/6001626972386  580111   

                               browser_ip  buyer_accepts_marketing  \
0                          172.56.179.126                     True   
1  2601:648:4200:95b0:750c:14a6:7df5:7238                     True   
2                            12.75.243.83                     True   
3                            68.67.95.188                     True   
4  2601:206:8381:5850:f98b:26a0:6ff0:bc9f                     True   

  cancel_reason cancelled_at  \
0          None         None   
1          None         None   
2          None         None   
3          None         None   
4          None         None   

       

In [3]:
# Summarize the frame: index range, number of columns, per-dtype column counts
# and memory usage.
orders_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250 entries, 0 to 249
Columns: 191 entries, id to customer.email_marketing_consent
dtypes: bool(9), float64(7), int64(9), object(166)
memory usage: 357.8+ KB


In [4]:
# Materialize every column label as a plain Python list so the notebook shows
# all of them (the Index repr elides the middle of long label sets).
df_columns = list(orders_df.columns)
df_columns

['id',
 'admin_graphql_api_id',
 'app_id',
 'browser_ip',
 'buyer_accepts_marketing',
 'cancel_reason',
 'cancelled_at',
 'cart_token',
 'checkout_id',
 'checkout_token',
 'closed_at',
 'confirmation_number',
 'confirmed',
 'contact_email',
 'created_at',
 'currency',
 'current_subtotal_price',
 'current_total_additional_fees_set',
 'current_total_discounts',
 'current_total_duties_set',
 'current_total_price',
 'current_total_tax',
 'customer_locale',
 'device_id',
 'discount_codes',
 'email',
 'estimated_taxes',
 'financial_status',
 'fulfillment_status',
 'landing_site',
 'landing_site_ref',
 'location_id',
 'merchant_of_record_app_id',
 'name',
 'note',
 'note_attributes',
 'number',
 'order_number',
 'order_status_url',
 'original_total_additional_fees_set',
 'original_total_duties_set',
 'payment_gateway_names',
 'phone',
 'po_number',
 'presentment_currency',
 'processed_at',
 'reference',
 'referring_site',
 'source_identifier',
 'source_name',
 'source_url',
 'subtotal_price',

In [None]:
# Script version 2: fetched 12322 records since 2024-12-28
'''

import json
import requests
import google.auth
import time
from datetime import datetime, timezone
from google.cloud import bigquery

# Load Shopify credentials; config.json must provide the keys
# "shopify_store", "api_key" and "password" (a missing key raises KeyError).
# The encoding is explicit so the read does not depend on the platform default.
with open("config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

SHOPIFY_STORE = config["shopify_store"]
API_KEY = config["api_key"]
PASSWORD = config["password"]
# API version is pinned by hand; bump it here when upgrading.
SHOPIFY_API_VERSION = "2024-01"
SHOPIFY_API_URL = f"https://{SHOPIFY_STORE}.myshopify.com/admin/api/{SHOPIFY_API_VERSION}/orders.json"

# BigQuery Setup
# No project or credentials are passed, so the client resolves both from the environment.
client = bigquery.Client()
DATASET_ID = "Shopify"           # dataset holding the orders table
TABLE_ID = "shopify_orders"      # destination table for order rows
# dataset.table of the bookkeeping table that stores the last sync timestamp
CONTROL_TABLE = "pipeline_metadata.sync_control"

class ShopifyETLPipeline:
    """Copy Shopify orders into BigQuery, tracking progress in a control table.

    The class relies on names defined at module level outside of it:
    ``client`` (BigQuery client), ``bigquery``, ``requests``, ``SHOPIFY_API_URL``,
    ``API_KEY``, ``PASSWORD``, ``DATASET_ID``, ``TABLE_ID`` and ``CONTROL_TABLE``.

    Attributes:
        control_table: Name of the BigQuery table holding the sync watermark.
        last_fetch_complete: True when the most recent ``fetch_shopify_data``
            call walked every page; False when it stopped on an API error.
    """

    # Seconds to wait for a single Shopify request before requests raises.
    REQUEST_TIMEOUT = 30
    # Consecutive HTTP 429 answers tolerated for one page before giving up.
    MAX_RATE_LIMIT_RETRIES = 5

    def __init__(self):
        self.control_table = CONTROL_TABLE
        self.last_fetch_complete = True

    def get_last_sync_timestamp(self):
        """Return the newest ``last_sync_timestamp`` recorded for shopify_orders.

        Returns:
            The timestamp value from the control table, or None when the table
            has no row for ``shopify_orders`` (i.e. no sync has been recorded).
        """
        query = f"""
        SELECT last_sync_timestamp 
        FROM `{self.control_table}`
        WHERE table_name = 'shopify_orders'
        ORDER BY last_sync_timestamp DESC
        LIMIT 1
        """
        query_job = client.query(query)
        results = list(query_job.result())
        return results[0]["last_sync_timestamp"] if results else None

    def update_sync_timestamp(self, records_processed, sync_timestamp=None):
        """Record a successful sync in the control table (insert or update).

        Args:
            records_processed: Number of rows loaded by this run.
            sync_timestamp: Timezone-aware datetime to store as the watermark.
                Pass the moment the fetch *started* so that orders modified
                while the fetch was running are picked up by the next run.
                Defaults to the current UTC time.
        """
        if sync_timestamp is None:
            sync_timestamp = datetime.now(timezone.utc)

        query = f"""
        MERGE INTO `{self.control_table}` T
        USING (SELECT 'shopify_orders' AS table_name, @sync_timestamp AS last_sync_timestamp, @records_processed AS records_processed) S
        ON T.table_name = S.table_name
        WHEN MATCHED THEN UPDATE SET T.last_sync_timestamp = S.last_sync_timestamp, T.records_processed = S.records_processed
        WHEN NOT MATCHED THEN INSERT (table_name, last_sync_timestamp, records_processed) VALUES (S.table_name, S.last_sync_timestamp, S.records_processed)
        """
        # Values travel as query parameters instead of being formatted into the SQL text.
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("sync_timestamp", "TIMESTAMP", sync_timestamp),
                bigquery.ScalarQueryParameter("records_processed", "INT64", int(records_processed)),
            ]
        )
        client.query(query, job_config=job_config).result()
        print(f"Updated last sync timestamp: {sync_timestamp}")

    @staticmethod
    def _parse_retry_after(header_value, default=60.0):
        """Convert a Retry-After header value to seconds, falling back to ``default``."""
        # The header may be absent or carry a decimal such as "2.0", which int() rejects.
        try:
            return max(float(header_value), 0.0)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _transform_order(order):
        """Project one Shopify order payload onto the flat row shape that gets loaded."""
        return {
            "order_id": str(order["id"]),
            "created_at": order["created_at"],
            "updated_at": order["updated_at"],
            "processed_at": order.get("processed_at"),
            "total_price": float(order["total_price"]),
            "subtotal_price": float(order["subtotal_price"]),
            "total_tax": float(order["total_tax"]),
            "total_discounts": float(order["total_discounts"]),
            "currency": order["currency"],
            "financial_status": order.get("financial_status", ""),
            "fulfillment_status": order.get("fulfillment_status", ""),
            "customer_locale": order.get("customer_locale", ""),
            "customer_email": order.get("contact_email", ""),
            "customer_id": str(order["customer"]["id"]) if order.get("customer") else None,
            "buyer_accepts_marketing": order.get("buyer_accepts_marketing", False),
            # List-valued fields are stored as JSON text.
            "discount_codes": json.dumps(order.get("discount_codes", [])),
            "payment_gateway_names": json.dumps(order.get("payment_gateway_names", [])),
            "source_name": order.get("source_name"),
            "referring_site": order.get("referring_site")
        }

    def fetch_shopify_data(self, start_date=None):
        """Fetch Shopify orders page by page and return them as flat row dicts.

        Args:
            start_date: Optional lower bound sent as ``updated_at_min``. May be
                a string or a date/datetime (sent in ISO-8601 form).

        Returns:
            list[dict]: rows built by ``_transform_order``. When the API
            returns an error, or keeps answering 429 beyond
            ``MAX_RATE_LIMIT_RETRIES``, the rows collected so far are returned
            and ``self.last_fetch_complete`` is left False so callers can tell
            the result is partial.
        """
        params = {
            "status": "any",
            "limit": 250,
            "order": "asc"
        }
        if start_date:
            # Values read back from BigQuery are datetimes; send them as ISO-8601.
            if hasattr(start_date, "isoformat"):
                start_date = start_date.isoformat()
            params["updated_at_min"] = start_date

        all_orders = []
        page_count = 1
        url = SHOPIFY_API_URL
        rate_limit_retries = 0
        # Flipped to True only when pagination reaches its natural end.
        self.last_fetch_complete = False

        while True:
            print(f"Fetching page {page_count}...")

            response = requests.get(
                url,
                params=params,
                auth=(API_KEY, PASSWORD),
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code == 429:
                rate_limit_retries += 1
                if rate_limit_retries > self.MAX_RATE_LIMIT_RETRIES:
                    print("Rate limit retries exhausted, aborting fetch.")
                    break
                retry_after = self._parse_retry_after(response.headers.get("Retry-After"))
                print(f"Rate limit hit. Retrying in {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            rate_limit_retries = 0

            if response.status_code != 200:
                print(f"API Error: {response.status_code}, {response.text}")
                break

            orders = response.json().get("orders", [])
            if not orders:
                print("No more orders found, exiting pagination.")
                self.last_fetch_complete = True
                break  # No more records, exit loop

            all_orders.extend(self._transform_order(order) for order in orders)

            print(f"Fetched {len(orders)} orders on page {page_count}. Total collected: {len(all_orders)}")

            # requests parses the Link header into response.links keyed by rel.
            next_url = response.links.get("next", {}).get("url")
            if not next_url:
                print("Pagination complete.")
                self.last_fetch_complete = True
                break  # No more pages, exit loop

            # The next-page URL already carries limit and page_info, so the
            # original filter params must not be sent again alongside it.
            url, params = next_url, None
            page_count += 1

        return all_orders

    def load_to_bigquery(self, orders):
        """Append order rows to the BigQuery orders table.

        This is a plain append through ``insert_rows_json``, not an upsert: an
        order that is fetched again (for example because it was updated) is
        written as an additional row with the same ``order_id``.

        Returns:
            bool: True when BigQuery reported no row errors, False otherwise.
        """
        if not orders:
            print("No rows to insert.")
            return True

        table_ref = client.dataset(DATASET_ID).table(TABLE_ID)
        errors = client.insert_rows_json(table_ref, orders)

        if errors:
            print(f"BigQuery insertion errors: {errors}")
            return False

        print(f"Successfully inserted {len(orders)} rows into {TABLE_ID}")
        return True

    def execute(self, force_full_load=False):
        """Run one sync: full load when forced or never synced, else incremental.

        The sync watermark is advanced only when the fetch walked every page
        and the load reported no errors; otherwise the next run starts from
        the previous watermark again.
        """
        last_sync = self.get_last_sync_timestamp() if not force_full_load else None
        # Captured before fetching so orders changed during the fetch are not skipped next run.
        sync_started_at = datetime.now(timezone.utc)

        if last_sync is None:
            print("Performing full historical load from 2024-01-01")
            orders = self.fetch_shopify_data(start_date="2024-01-01")
        else:
            print(f"Performing incremental load since {last_sync}")
            orders = self.fetch_shopify_data(start_date=last_sync)

        if not getattr(self, "last_fetch_complete", True):
            print("Fetch did not complete; nothing loaded and sync timestamp left unchanged.")
            return

        if orders:
            if self.load_to_bigquery(orders):
                self.update_sync_timestamp(len(orders), sync_timestamp=sync_started_at)
            else:
                print("Load reported errors; sync timestamp left unchanged.")
        else:
            print("No new orders to load.")

# Usage:
pipeline = ShopifyETLPipeline()

# For historical load (only run once):
# NOTE(review): as written, this call and the incremental call below both run
# unconditionally every time the script executes. load_to_bigquery appends rows
# with insert_rows_json, so repeating the full load adds the same orders again.
# Comment this line out after the first run.
pipeline.execute(force_full_load=True)

# For regular incremental runs (new data only):
pipeline.execute()
'''

In [5]:
# Summary statistics for the numeric columns only (describe() default).
# NOTE: identifier columns such as id, checkout_id and customer.id are numeric,
# so they show up here even though their mean/std carry no meaning.
orders_df.describe()

Unnamed: 0,id,app_id,checkout_id,number,order_number,total_weight,user_id,billing_address.latitude,billing_address.longitude,customer.id,customer.sms_marketing_consent,customer.default_address.id,customer.default_address.customer_id,shipping_address.latitude,shipping_address.longitude,customer.email_marketing_consent
count,250.0,250.0,250.0,250.0,250.0,250.0,6.0,91.0,91.0,250.0,0.0,250.0,250.0,228.0,228.0,0.0
mean,5987192000000.0,625188.1,35636680000000.0,12203.5,13203.5,521.28,94605990000.0,38.403551,-117.650261,7839510000000.0,,9413559000000.0,7839510000000.0,38.501571,-118.393124,
std,9235861000.0,316952.9,25287340000.0,72.312977,72.312977,2068.184005,173880500.0,2.295584,11.595448,46718710000.0,,23212180000.0,46718710000.0,2.638087,10.004304,
min,5972349000000.0,580111.0,35573610000000.0,12079.0,13079.0,0.0,94251060000.0,30.548465,-123.355566,7794139000000.0,,9349154000000.0,7794139000000.0,30.589738,-124.246177,
25%,5978110000000.0,580111.0,35613390000000.0,12141.25,13141.25,0.0,94676980000.0,37.744138,-122.103994,7794656000000.0,,9409519000000.0,7794656000000.0,37.783883,-122.01742,
50%,5987117000000.0,580111.0,35635570000000.0,12203.5,13203.5,0.0,94676980000.0,38.35919,-121.376799,7844440000000.0,,9417958000000.0,7844440000000.0,38.444121,-121.321099,
75%,5995434000000.0,580111.0,35658330000000.0,12265.75,13265.75,0.0,94676980000.0,38.748221,-120.98816,7867402000000.0,,9428522000000.0,7867402000000.0,38.910004,-120.522773,
max,6002412000000.0,3890849.0,35677850000000.0,12328.0,13328.0,21047.0,94676980000.0,47.561335,-71.849987,7925031000000.0,,9436685000000.0,7925031000000.0,49.121667,-69.774546,


In [6]:
# line_items holds one Python list of line-item dicts per order: json_normalize
# flattened nested dicts into dotted columns but left list values untouched.
orders_df['line_items']

0      [{'id': 14577930404066, 'admin_graphql_api_id'...
1      [{'id': 14577224876258, 'admin_graphql_api_id'...
2      [{'id': 14576878026978, 'admin_graphql_api_id'...
3      [{'id': 14576813310178, 'admin_graphql_api_id'...
4      [{'id': 14576518758626, 'admin_graphql_api_id'...
                             ...                        
245    [{'id': 14523626881250, 'admin_graphql_api_id'...
246    [{'id': 14523417264354, 'admin_graphql_api_id'...
247    [{'id': 14523388428514, 'admin_graphql_api_id'...
248    [{'id': 14523160264930, 'admin_graphql_api_id'...
249    [{'id': 14522158383330, 'admin_graphql_api_id'...
Name: line_items, Length: 250, dtype: object

In [7]:
# Inspect the raw structure of a few orders' line items. Only the first rows are
# converted: dumping the whole column floods the notebook output with nested dicts.
orders_df['line_items'].head(3).to_list()

[[{'id': 14577930404066,
   'admin_graphql_api_id': 'gid://shopify/LineItem/14577930404066',
   'attributed_staffs': [],
   'current_quantity': 2,
   'fulfillable_quantity': 2,
   'fulfillment_service': 'manual',
   'fulfillment_status': None,
   'gift_card': False,
   'grams': 0,
   'name': 'Trix Rigging Kit',
   'pre_tax_price': '11.00',
   'pre_tax_price_set': {'shop_money': {'amount': '11.00',
     'currency_code': 'USD'},
    'presentment_money': {'amount': '11.00', 'currency_code': 'USD'}},
   'price': '5.50',
   'price_set': {'shop_money': {'amount': '5.50', 'currency_code': 'USD'},
    'presentment_money': {'amount': '5.50', 'currency_code': 'USD'}},
   'product_exists': True,
   'product_id': 8788626440418,
   'properties': [],
   'quantity': 2,
   'requires_shipping': True,
   'sku': '',
   'taxable': True,
   'title': 'Trix Rigging Kit',
   'total_discount': '0.00',
   'total_discount_set': {'shop_money': {'amount': '0.00',
     'currency_code': 'USD'},
    'presentment_mone

In [12]:
# Line items of the order at index label 0.
# NOTE(review): the saved output starts with line-item id 14576518758626, which
# belonged to row 4 when the column was displayed earlier, so orders_df appears
# to have been rebuilt or reordered in between - re-run top to bottom to confirm.
orders_df['line_items'][0]

[{'id': 14576518758626,
  'admin_graphql_api_id': 'gid://shopify/LineItem/14576518758626',
  'attributed_staffs': [],
  'current_quantity': 1,
  'fulfillable_quantity': 1,
  'fulfillment_service': 'manual',
  'fulfillment_status': None,
  'gift_card': False,
  'grams': 0,
  'name': 'Trix Rigging Kit',
  'pre_tax_price': '5.50',
  'pre_tax_price_set': {'shop_money': {'amount': '5.50',
    'currency_code': 'USD'},
   'presentment_money': {'amount': '5.50', 'currency_code': 'USD'}},
  'price': '5.50',
  'price_set': {'shop_money': {'amount': '5.50', 'currency_code': 'USD'},
   'presentment_money': {'amount': '5.50', 'currency_code': 'USD'}},
  'product_exists': True,
  'product_id': 8788626440418,
  'properties': [],
  'quantity': 1,
  'requires_shipping': True,
  'sku': '',
  'taxable': True,
  'title': 'Trix Rigging Kit',
  'total_discount': '0.00',
  'total_discount_set': {'shop_money': {'amount': '0.00',
    'currency_code': 'USD'},
   'presentment_money': {'amount': '0.00', 'currency

In [13]:
# Column labels as a pandas Index; the repr elides the middle of the 191 labels,
# so use .to_list() when every name is needed.
orders_df.columns

Index(['id', 'admin_graphql_api_id', 'app_id', 'browser_ip',
       'buyer_accepts_marketing', 'cancel_reason', 'cancelled_at',
       'cart_token', 'checkout_id', 'checkout_token',
       ...
       'shipping_address.latitude', 'shipping_address.longitude',
       'shipping_address.name', 'shipping_address.country_code',
       'shipping_address.province_code',
       'customer.sms_marketing_consent.state',
       'customer.sms_marketing_consent.opt_in_level',
       'customer.sms_marketing_consent.consent_updated_at',
       'customer.sms_marketing_consent.consent_collected_from',
       'customer.email_marketing_consent'],
      dtype='object', length=191)

In [14]:
# created_at is still text (dtype object) holding ISO-8601 timestamps with a UTC
# offset; parse it with pd.to_datetime before doing any date arithmetic.
orders_df['created_at']

0      2025-02-20T09:19:23-08:00
1      2025-02-20T02:02:31-08:00
2      2025-02-19T20:11:52-08:00
3      2025-02-19T19:22:24-08:00
4      2025-02-19T15:45:28-08:00
                 ...            
245    2025-01-27T09:13:51-08:00
246    2025-01-27T07:59:18-08:00
247    2025-01-27T07:42:41-08:00
248    2025-01-27T05:21:19-08:00
249    2025-01-26T15:32:28-08:00
Name: created_at, Length: 250, dtype: object

In [15]:
# processed_at uses the same string format as created_at; in the rows shown it
# is equal to or a few seconds earlier than created_at.
orders_df['processed_at']

0      2025-02-20T09:19:21-08:00
1      2025-02-20T02:02:19-08:00
2      2025-02-19T20:11:49-08:00
3      2025-02-19T19:22:21-08:00
4      2025-02-19T15:45:23-08:00
                 ...            
245    2025-01-27T09:13:50-08:00
246    2025-01-27T07:59:17-08:00
247    2025-01-27T07:42:41-08:00
248    2025-01-27T05:21:15-08:00
249    2025-01-26T15:32:24-08:00
Name: processed_at, Length: 250, dtype: object

In [16]:
# NOTE(review): this repeats the preceding cell's expression verbatim; possibly
# another timestamp column was meant to be inspected here - confirm or remove.
orders_df['processed_at']

0      2025-02-20T09:19:21-08:00
1      2025-02-20T02:02:19-08:00
2      2025-02-19T20:11:49-08:00
3      2025-02-19T19:22:21-08:00
4      2025-02-19T15:45:23-08:00
                 ...            
245    2025-01-27T09:13:50-08:00
246    2025-01-27T07:59:17-08:00
247    2025-01-27T07:42:41-08:00
248    2025-01-27T05:21:15-08:00
249    2025-01-26T15:32:24-08:00
Name: processed_at, Length: 250, dtype: object