These pipelines extract supermarket sales data from the Imonggo API as per https://help.imonggo.com/en/collections/154448-api, cleans the data, transforms it, and then convert it to a format more amenable for analytics. The data is then stored in an Azure Blob Storage container. The code (ex installation i.e. pip install ...) can be implemented manually as ipynb script, a local .py executable, or on Azure Functions, Azure Synapse Spark pools, or Databricks (analogous services on other cloud providers can be used as well. 

#Code for extracting invoices from Imonggo and storing in Azure Storage as flat file
For each page of products (50 invoices per page):

1)Retrieve the pages of data using Imonggo API as XML

2)Convert XML to JSON

3)Select columns to extract and transform

4)Flatten JSON

5)Convert JSON to CSV

6)Upload to Azure Blob Storage container

In [None]:
IMONGGO_UNAME = "9c59fd765e471bf004c85a218454780d29b582ac"
AZR_CONN_STR = 'DefaultEndpointsProtocol=https;AccountName=shopmobetl;AccountKey=pYhOEPZXRNUO3cxrXeCvPMaPSHuid2bWyHAnLGnCAoisRgB8Mw97eAsl4tgINm6Mh4TLqzTNyxVP+AStQdDPSg==;EndpointSuffix=core.windows.net' #os.getenv('AZURE_STORAGE_CONNECTION_STRING')

In [None]:
'''IMONGGO Extract Invoices into Staging Area PIPELINE'''
#install relevant packages
!pip install xmltodict
!pip install azure-storage-blob
#get the data
import requests
import os
#set Imonggo auth details
username = IMONGGO_UNAME#Check imonggo API support site on how to get your username
password = "x"

# Create a local directory to hold blob data
try:
  local_path = "/content/invoices"
  os.mkdir(local_path)
except Exception as e:
  print('Local path creation exception:')
  print(e)

#retrieve, flatten, and convert data
try:
  for i in range(1, 10000):
    r = requests.get(url = "https://shopmob1.imonggo.com/api/invoices.xml?after=2021-09-01%2023:59&page="+str(i), auth=(username, password) )
    #convert xml to json
    import xmltodict
    dict_data = xmltodict.parse(r.content)
    #clean the data
    if dict_data['invoices'] != None:
      try:
        data_arr = dict_data['invoices']['invoice']
        clean_data = []
        for item in data_arr:
          clean_item = {}
          item_obj = item['invoice_lines']['invoice_line']
          if not isinstance(item_obj, list):
            keys = []
            for key in item_obj.keys():
              keys.append(key)
            relevant_keys = keys[-12:]
            for key in relevant_keys:
              clean_item[key] = item_obj[key]
          else:
            for sub_item in item_obj:
              keys = []
              for key in sub_item.keys():
                keys.append(key)
              relevant_keys = keys[-12:]
              for key in relevant_keys:
                clean_item[key] = sub_item[key]
          clean_item['date'] = item['utc_invoice_date'] 
          try:
            clean_item['payment_type'] = item['payments']['payment']['payment_type_name'] 
          except Exception as e:
            print('Data cleaning error:')
            print('Failed to set payment type')
            clean_item['payment_type'] = 'unknown' 
          clean_data.append(clean_item)
        #flatten json - cleaned data
        from pandas.io.json import json_normalize
        flat_data = json_normalize(clean_data)
      except Exception as e:
        print('Data Cleaning Error: skipping...')
        print(e)
        #flatten json - no cleaning
        from pandas.io.json import json_normalize
        flat_data = json_normalize(dict_data)
      #load json to csv
      flat_data.to_csv('invoices/invoices'+str(i)+'.csv')
      print('downloaded invoices page '+ str(i))
    else:
      print('Max number of invoice pages exceeded at '+ str(i-1))
      max_page_count = i
      break
except Exception as e:
  print(e)
  


#upload the data to Azure Storage
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__

try:
    print("Starting upload process...")
    # Retrieve the connection string for use with the application. The storage
    connect_str = AZR_CONN_STR
    # Create the BlobServiceClient object which will be used to create a container client
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Create a unique name for the container
    container_name = "shopmobetlcont" #+ str(uuid.uuid4())
    try:
      container_client = blob_service_client.create_container(container_name)
    except Exception as e:
      print('Storage container creation exception:')
      print(e)
    
    #upload the files
    for i in range(1, max_page_count):
      try:
        # Create a file in the local data directory to upload and download
        local_file_name = "invoices" + str(i) + ".csv"
        upload_file_path = os.path.join(local_path, local_file_name)

        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=upload_file_path)

        print("\nUploading to Azure Storage as blob:\n\t" + local_file_name)

        # Upload the created file
        with open(upload_file_path, "rb") as data:
            blob_client.upload_blob(data)
            
      except Exception as e:
        print('Upload to container exception:')
        print(e)


    # List the blobs in the container
    print("\nListing blobs...")
    blob_list = container_client.list_blobs()
    for blob in blob_list:
        print("\t" + blob.name)
        
except Exception as e:
    print('Exception: ')
    print(e)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Local path creation exception:
[Errno 17] File exists: '/content/invoices'
Max number of invoice pages exceeded at 450
Starting upload process...
Storage container creation exception:
The specified container already exists.
RequestId:3bc1874f-101e-0007-4f24-ba4e06000000
Time:2022-08-27T14:51:00.3526758Z
ErrorCode:ContainerAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>ContainerAlreadyExists</Code><Message>The specified container already exists.
RequestId:3bc1874f-101e-0007-4f24-ba4e06000000
Time:2022-08-27T14:51:00.3526758Z</Message></Error>

Uploading to Azure Storage as blob:
	invoices1.csv
Upload to container exception:
The specified blob already exists.
RequestId:3bc187a7-101e-0007-1a24-ba4e06000000
Time:2022-08-27T14:51:00.4696083Z
ErrorCode:BlobAlreadyExists


#Code for extracting products from Imonggo and storing in Azure Storage as flat file
For each page of products (50 products per page):


1)Retrieve the pages of data using Imonggo API as XML

2)Convert XML to JSON

3)Select columns to extract and transform

4)Flatten JSON

5)Convert JSON to CSV

6)Upload to Azure Blob Storage container

In [None]:
'''IMONGGO Extract Products into Staging Area PIPELINE'''
#install relevant packages
!pip install xmltodict
!pip install azure-storage-blob
#get the data
import requests
import os
#set Imonggo auth details
username = "9c59fd765e471bf004c85a218454780d29b582ac"#Check imonggo API support site on how to get your username
password = "x"

# Create a local directory to hold blob data
try:
  local_path = "/content/products"
  os.mkdir(local_path)
except Exception as e:
  print('Local path creation exception:')
  print(e)

#retrieve, flatten, and convert data
try:
  for i in range(1, 10000):
    r = requests.get(url = "https://shopmob1.imonggo.com/api/products.xml?active_only=1&page="+ str(i), auth=(username, password) )
    #convert xml to json
    import xmltodict
    dict_data = xmltodict.parse(r.content)
    #flatten json
    if dict_data['products'] != None:
      try:
        data_arr = dict_data['products']['product']
        keys = []
        for key in data_arr[0]['inventories']['inventory'].keys():
          keys.append(key)
        #print(keys[:])
        clean_data = []
        for item in data_arr:
          clean_item = {}
          item_obj = item['inventories']['inventory']
          keys = []
          for key in item_obj.keys():
            keys.append(key)
          relevant_keys = keys[:]
          for key in relevant_keys:
            clean_item[key] = item_obj[key]
          clean_data.append(clean_item)
          from pandas.io.json import json_normalize
          flat_data = json_normalize(clean_data)
      except Exception as e:
        print('Data Cleaning Error: skipping...')
        print(e)
        #flatten json - no cleaning
        from pandas.io.json import json_normalize
        flat_data = json_normalize(dict_data)
      #load json to csv
      flat_data.to_csv('products/products'+str(i)+'.csv')
      print('downloaded products page '+ str(i))
    else:
      print('Max number of product pages exceeded at '+ str(i-1))
      max_page_count = i
      break
except Exception as e:
  print(e)


#upload the data to Azure Storage
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__

try:
    print("Starting upload process...")
    # Retrieve the connection string for use with the application. The storage
    connect_str = 'DefaultEndpointsProtocol=https;AccountName=shopmobetl;AccountKey=pYhOEPZXRNUO3cxrXeCvPMaPSHuid2bWyHAnLGnCAoisRgB8Mw97eAsl4tgINm6Mh4TLqzTNyxVP+AStQdDPSg==;EndpointSuffix=core.windows.net' #os.getenv('AZURE_STORAGE_CONNECTION_STRING')
    # Create the BlobServiceClient object which will be used to create a container client
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Create a unique name for the container
    container_name = "shopmobetlcont" #+ str(uuid.uuid4())
    try:
      container_client = blob_service_client.create_container(container_name)
    except Exception as e:
      print('Storage container creation exception:')
      print(e)
    
    #upload the files
    for i in range(1, max_page_count):
      try:
        # Create a file in the local data directory to upload and download
        local_file_name = "products" + str(i) + ".csv"
        upload_file_path = os.path.join(local_path, local_file_name)

        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=upload_file_path)

        print("\nUploading to Azure Storage as blob:\n\t" + local_file_name)

        # Upload the created file
        with open(upload_file_path, "rb") as data:
            blob_client.upload_blob(data)
            
      except Exception as e:
        print('Upload to container exception:')
        print(e)


    # List the blobs in the container
    print("\nListing blobs...")
    blob_list = container_client.list_blobs()
    for blob in blob_list:
        print("\t" + blob.name)
        
except Exception as e:
    print('Exception: ')
    print(e)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Local path creation exception:
[Errno 17] File exists: '/content/products'




downloaded products page 1
downloaded products page 2
Data Cleaning Error: skipping...
'inventory'
downloaded products page 3




downloaded products page 4
downloaded products page 5
downloaded products page 6
downloaded products page 7
downloaded products page 8
Data Cleaning Error: skipping...
'inventory'
downloaded products page 9
Max number of product pages exceeded at 9
Starting upload process...
Storage container creation exception:
The specified container already exists.
RequestId:a8ce58e9-301e-002f-1457-be2fae000000
Time:2022-09-01T23:03:27.6506949Z
ErrorCode:ContainerAlreadyExists
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>ContainerAlreadyExists</Code><Message>The specified container already exists.
RequestId:a8ce58e9-301e-002f-1457-be2fae000000
Time:2022-09-01T23:03:27.6506949Z</Message></Error>

Uploading to Azure Storage as blob:
	products1.csv

Uploading to Azure Storage as blob:
	products2.csv

Uploading to Azure Storage as blob:
	products3.csv

Uploading to Azure Storage as blob:
	products4.csv

Uploading to Azure Storage as blob:
	products5.csv

Uploading to Azure Storage as blob

1
