In [4]:
import requests as rs
from zipfile import ZipFile
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
import json
import datetime as dt
import warnings
import duckdb as db
import time


def main():
    """Pull Partner Center invoices and build a July-vs-August reconciliation.

    Steps: read credentials from ``../secrets.json``, open ``../duckdb.db``,
    exchange the refresh token for an access token, list all invoices, fetch
    the line items of the hard-coded invoices, aggregate them per customer
    with DuckDB and write ``./recon_line_items.csv`` and
    ``./recon_header.csv``.

    Returns:
        ``(final_tbl, jul_df, aug_df)`` on success: the DuckDB relation with
        the per-customer comparison plus the two line-item DataFrames it was
        built from. ``None`` when a handled failure occurred (secrets file,
        DuckDB connection, access token or invoice listing); the reason has
        already been printed in that case.

    Raises:
        Exception: anything that is not handled here, e.g. an HTTP error
            while fetching line items or a failing SQL statement.
    """
    script_start_time = dt.datetime.now().strftime("%d %b %Y %I:%M %p")
    print(f'\n\n\n\nmain script started execution at {script_start_time}')

    # Global variables
    base_url = 'https://api.partnercenter.microsoft.com'
    relative_invoices_url = '/v1/invoices'
    relative_invoice_line_items_url = '/v1/invoices/OneTime-<invoiceID>/lineitems/OneTime/BillingLineItems?size=5000'
    # Seconds to wait on any single HTTP call; without it a stalled
    # connection would block the script indefinitely.
    request_timeout = 120

    # Define functions

    def set_display_options():
        """Set display options for Pandas and silence warnings."""
        pd.set_option('max_colwidth', None)
        warnings.filterwarnings('ignore')
        pd.set_option('display.max_columns', None)
        pd.set_option('display.max_rows', None)

    def get_duckdb_client(db_file_path: str) -> db.DuckDBPyConnection:
        """Connect to DuckDB and return the connection object."""
        return db.connect(db_file_path)

    def parse_secrets(secrets_file_path: str) -> dict:
        """Parse the secrets.json file and return the secrets as a dictionary."""
        with open(secrets_file_path, encoding='utf-8') as f:
            return json.load(f)

    def execute_query(sql_string: str) -> db.DuckDBPyRelation:
        """Run a SQL statement on the connection opened by main().

        The SQL may reference DataFrames / relations by the name of the
        Python variable holding them in a calling function.

        Raises:
            Exception: the original DuckDB error, after it has been printed.
                Returning ``None`` instead would only postpone the failure to
                an unrelated ``AttributeError`` in the caller.
        """
        try:
            return cxn.query(sql_string)
        except Exception as e:
            print(f"Query execution failed: {e}")
            raise

    def print_query(sql_string: str) -> None:
        """Run a SQL query on the connection opened by main() and print the result.

        Display only, so failures are reported and otherwise ignored.
        """
        try:
            cxn.query(sql_string).show(max_width=100000, max_rows=100000)
        except Exception as e:
            print(f"Query execution failed: {e}")

    def get_access_token(refresh_token: str, app_id: str, app_secret: str) -> str:
        """Exchange the refresh token for an access token.

        Raises:
            requests.HTTPError: if the token endpoint answers with an error status.
        """
        request_body = {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "scope": "openid",
            "resource": "https://api.partnercenter.microsoft.com",
            "client_id": app_id,
            "client_secret": app_secret,
        }

        response = rs.post(
            "https://login.windows.net/6e75cca6-47f0-47a3-a928-9d5315750bd9/oauth2/token",
            data=request_body,
            timeout=request_timeout,
        )

        response.raise_for_status()
        return response.json()['access_token']

    def get_invoices(base_url: str, headers: dict) -> list:
        """Return the ``items`` list of the invoices endpoint (empty list if absent)."""
        response = rs.get(
            f"{base_url}{relative_invoices_url}",
            headers=headers,
            timeout=request_timeout,
        )
        response.raise_for_status()
        return response.json().get('items', [])

    def get_invoice_line_items(base_url: str, headers: dict, invoice_id: str) -> list:
        """Return the ``items`` list of one invoice's billing line items."""
        relative_url = relative_invoice_line_items_url.replace('<invoiceID>', invoice_id)
        response = rs.get(
            f"{base_url}{relative_url}",
            headers=headers,
            timeout=request_timeout,
        )
        response.raise_for_status()
        return response.json().get('items', [])

    def get_daily_rated_line_items(base_url: str, invoice_id: str, billing_month: str = '2024-06-01') -> None:
        """Page through an invoice's usage line items into temp table ``t_daily_rated``.

        Each response may carry a ``continuationToken``; it is sent back in
        the ``MS-ContinuationToken`` header (together with
        ``seekoperation=next``) to request the following page. Paging stops
        as soon as a response carries no token.

        Args:
            base_url: API root the relative URL is appended to.
            invoice_id: invoice whose usage line items are fetched.
            billing_month: literal stored in the ``billing_month`` column of
                every inserted row.

        Raises:
            requests.HTTPError: if any page request fails.
            Exception: if a DuckDB statement fails (see ``execute_query``).
        """
        first_page_url = f'/v1/invoices/{invoice_id}/lineitems?provider=onetime&invoiceLineItemType=usagelineitems&currencyCode=inr&period=current&size=5000'
        continuation_token = None
        iteration_count = 1
        table_created = False

        while True:
            print(f"iterationCount : {iteration_count}, cToken : {continuation_token}")

            page_headers = {'Authorization': 'Bearer ' + access_token}
            page_url = first_page_url
            if continuation_token:
                # Follow-up pages: hand the token back and ask for the next slice.
                page_headers['MS-ContinuationToken'] = continuation_token
                page_url = f'{first_page_url}&seekoperation=next'

            response = rs.get(
                f"{base_url}{page_url}",
                headers=page_headers,
                timeout=request_timeout,
            )
            response.raise_for_status()
            payload = response.json()

            items = payload.get('items', [])
            if items:
                # The SQL below refers to this DataFrame by its variable
                # name, so the local must stay called ``df``.
                df = pd.DataFrame(items)
                if table_created:
                    execute_query(f"insert into t_daily_rated select *,'{billing_month}' as billing_month from df")
                else:
                    execute_query(f"create or replace temp table t_daily_rated as select *,'{billing_month}' as billing_month from df")
                    table_created = True

            continuation_token = payload.get('continuationToken')
            if not continuation_token:
                break  # no token sent by the API: that was the last page

            iteration_count += 1

    set_display_options()

    # Parse secrets file
    print('Trying to parse secrets file')
    try:
        secrets = parse_secrets('../secrets.json')
        refresh_token = secrets['refresh_token']
        app_id = secrets['app_id']
        app_secret = secrets['app_secret']
        print('Secrets file found')
    except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
        # KeyError: the file parsed but lacks one of the required entries.
        print(f'Secrets file not found or some error in file: {e}')
        return

    # Get DuckDB client
    print('Trying to connect to DuckDB')
    try:
        cxn = get_duckdb_client('../duckdb.db')
        print('Connected to DuckDB')
    except Exception as ex:
        print(f'Unable to connect to DuckDB: {ex}')
        return

    # Get access token
    print('Trying to obtain access token')
    try:
        access_token = get_access_token(refresh_token, app_id, app_secret)
        print(f'Refresh Token valid, access token obtained. \nAccess token: {access_token[:20]}...')
    except Exception as ex:
        print(f'Unable to obtain access token: {ex}')
        cxn.close()
        return

    http_headers = {'Authorization': 'Bearer ' + access_token}

    # Fetch invoices
    print('Fetching invoices')
    try:
        invoices = get_invoices(base_url, http_headers)
        print(f'Fetched {len(invoices)} invoices')
    except Exception as ex:
        print(f'Unable to fetch invoices: {ex}')
        cxn.close()
        return

    # The SQL strings below refer to DataFrames / relations by the name of
    # the local variable holding them; do not rename invoices_df, jul_df,
    # aug_df or final_tbl without updating the queries.
    invoices_df = pd.DataFrame(invoices)
    print_query("SELECT * FROM invoices_df")

    # Map invoice id -> billing period start, keeping only the first 10
    # characters of the timestamp string (the date part).
    invoice_dict = {
        str(invoice_id): period_start[:10]
        for invoice_id, period_start in zip(invoices_df['id'], invoices_df['billingPeriodStartDate'])
    }
    print(invoice_dict)

    # Iterate over dict items and call invoice line items API
    # NOTE(review): res / line_items_df are only used by the commented-out
    # code further down; the request is kept so the run behaves as before.
    res = get_invoice_line_items(base_url, http_headers, 'G049360432')
    line_items_df = pd.DataFrame(res)

    jul_line_items = get_invoice_line_items(base_url, http_headers, 'G055364858')
    aug_line_items = get_invoice_line_items(base_url, http_headers, 'G058476717')
    jul_df = pd.DataFrame(jul_line_items)
    aug_df = pd.DataFrame(aug_line_items)

    # NOTE(review): because of the full join, a customer present in only one
    # of the two months has a NULL total on the other side, which makes
    # `diff` NULL for that customer - confirm this is intended.
    final_tbl = execute_query("""
with jul as (
                select customername,sum(totalForCustomer) total from jul_df group by customername
)
                ,
                aug as (
                    select customername,sum(totalForCustomer) total from aug_df group by customername
                )
                
                select 
                              ifnull(jul.customername,aug.customername) cx_name,round(jul.total,2) as july_total, round(aug.total,2) august_total, round(jul.total - aug.total,2) as diff 
                from jul full join aug on jul.customername = aug.customername order by diff desc
""")

    # print_query("SELECT * FROM line_items_df limit 5")
    # for invoice_id,billing_date in invoice_dict.items():
    #     if invoice_id.startswith('G') and dt.datetime.strptime(billing_date, '%Y-%m-%d').date() >= dt.datetime.strptime('2023-04-01', '%Y-%m-%d').date():
    #         res = get_invoice_line_items(base_url, http_headers, invoice_id)
    #         line_items_df = pd.DataFrame(res)
    #         line_items_df['billing_month'] = billing_date
    #         line_items_df.to_parquet(f'../NCE Recon Files/{invoice_id}_{billing_date}.parquet')
    #         time.sleep(20)
    execute_query("select * from final_tbl").to_csv('./recon_line_items.csv')
    execute_query("select round(sum(july_total),2) jt, round(sum(august_total),2) aug_total, round(sum(diff),2) dt from final_tbl").to_csv('./recon_header.csv')

    # The connection is deliberately left open: final_tbl is a relation
    # created on it and is handed back to the caller.
    return (final_tbl, jul_df, aug_df)

    
    
    

    

if __name__ == "__main__":
    # Run main() once; retry with exponential backoff only when it raises.
    # A handled failure inside main() is reported there and makes it return
    # None - retrying would not help, so the loop stops in that case too.
    max_attempts = 5
    keep_trying = 0      # number of attempts made so far
    wait_time = 10       # seconds before the next attempt, doubled each time
    keep_going = True
    # Pre-set so the names exist for later cells even when every attempt fails.
    final_tbl = jdf = adf = None

    while keep_going and keep_trying < max_attempts:
        keep_trying += 1
        try:
            result = main()
        except Exception as ex:
            print(f"Error occurred: {ex}")
            if keep_trying < max_attempts:
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                wait_time *= 2  # Exponential backoff
            else:
                print(f"Giving up after {max_attempts} attempts.")
            continue

        keep_going = False
        if result is None:
            print("main() stopped after a handled failure; not retrying.")
        else:
            final_tbl, jdf, adf = result
            print(type(final_tbl))






main script started execution at 07 Sep 2024 07:47 PM
Trying to parse secrets file
Secrets file found
Trying to connect to DuckDB
Connected to DuckDB
Trying to obtain access token
Refresh Token valid, access token obtained. 
Access token: eyJ0eXAiOiJKV1QiLCJh...
Fetching invoices
Fetched 36 invoices
┌────────────┬──────────────────────────────┬────────────────────────┬──────────────────────────────┬──────────────┬─────────────┬──────────────┬────────────────┬──────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

In [12]:
import duckdb
def print_query(sql_string: str) -> None:
    """Run *sql_string* through the ``duckdb`` module and print the result.

    Any error is reported on stdout instead of being raised.
    """
    try:
        relation = duckdb.query(sql_string)
        return relation.show(max_width=100000, max_rows=100000)
    except Exception as err:
        print(f"Query execution failed: {err}")
# Self-assignment: leaves the three objects untouched, but raises NameError
# right here if they have not been defined yet.
final_tbl, jdf, adf = final_tbl, jdf, adf



In [6]:
# Preview of the rows of jdf and adf stacked with UNION ALL; as the last
# expression of the cell, the resulting relation is what gets displayed.
# NOTE(review): the trailing LIMIT presumably caps the combined result at
# 10 rows rather than only the adf branch - confirm.
duckdb.query("select * from jdf union all select * from adf limit 10")

┌──────────────────────┬──────────────────────┬───┬─────────────────┬─────────────┬──────────────────────┐
│      partnerId       │      customerId      │ … │ billingProvider │ promotionId │      attributes      │
│       varchar        │       varchar        │   │     varchar     │   varchar   │ struct(objecttype …  │
├──────────────────────┼──────────────────────┼───┼─────────────────┼─────────────┼──────────────────────┤
│ 6e75cca6-47f0-47a3…  │ a11bcc1d-8378-4456…  │ … │ one_time        │             │ {'objectType': One…  │
│ 6e75cca6-47f0-47a3…  │ 7e2eeedf-390e-4643…  │ … │ one_time        │             │ {'objectType': One…  │
│ 6e75cca6-47f0-47a3…  │ f89c7ce4-d61f-4e19…  │ … │ one_time        │             │ {'objectType': One…  │
│ 6e75cca6-47f0-47a3…  │ 81c384b6-a5e8-4c99…  │ … │ one_time        │             │ {'objectType': One…  │
│ 6e75cca6-47f0-47a3…  │ 76d5d933-2ddc-40cb…  │ … │ one_time        │             │ {'objectType': One…  │
│ 6e75cca6-47f0-47a3…  │ 04ef952a-d3a

In [23]:
# Show the pricing columns of every line item whose customer name starts
# with 'Kaveri', taken from adf and from jdf, ordered by SKU name and
# subscription id.
print_query("select customername, subscriptionid, skuname, unitprice,quantity,totalforcustomer from adf where customername like 'Kaveri%'  union all  select customername, subscriptionid, skuname, unitprice,quantity,totalforcustomer from jdf where customername like 'Kaveri%' order by skuname,subscriptionid")

┌────────────────────┬──────────────────────────────────────┬───────────────────────────────────────────┬───────────┬──────────┬──────────────────┐
│    customerName    │            subscriptionId            │                  skuName                  │ unitPrice │ quantity │ totalForCustomer │
│      varchar       │               varchar                │                  varchar                  │  double   │  int64   │      double      │
├────────────────────┼──────────────────────────────────────┼───────────────────────────────────────────┼───────────┼──────────┼──────────────────┤
│ Kaveri University  │ 608900df-5d94-4d5a-de6c-78e0b4c4cdd9 │ Office 365 A1 (Education Faculty Pricing) │       0.0 │      200 │              0.0 │
│ Kaveri University  │ 608900df-5d94-4d5a-de6c-78e0b4c4cdd9 │ Office 365 A1 (Education Faculty Pricing) │       0.0 │      200 │              0.0 │
│ Kaveri University  │ 30d3a82c-89f4-40f0-c2c2-3be495ca06ff │ Office 365 A3 (Education Faculty Pricing) │     21