In [None]:
# Read the Spark configuration for credentials, used in following cells.
#
# See Azure docs: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-azure-create-spark-configuration
# See Spark docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkConf.html

import os
import traceback

from pyspark.conf import SparkConf
from pyspark.context import SparkContext

storage_acct, storage_key = None, None

try:
    # Read the SparkConf for the Azure Storage credentials.
    # SparkConf.get() returns None when a key is not configured.
    conf = SparkConf()
    storage_acct = conf.get("spark.storage.acct")
    storage_key  = conf.get("spark.storage.key")
except Exception as e:
    print("Exception reading SparkConf: {}".format(str(e)))
    traceback.print_exc()

print(f'storage_acct:       {storage_acct}')
# storage_key is None when the config key is missing or reading failed above;
# slicing None would raise TypeError and abort this cell.
# NOTE(review): even a prefix of an account key is a secret — clear this output before sharing.
if storage_key:
    print(f'storage_key prefix: {storage_key[0:10]}')
else:
    print('storage_key prefix: None (spark.storage.key is not set)')


In [None]:
# Define Python classes in this cell, used in following cells.

from azure.core.exceptions import ResourceExistsError
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from io import StringIO
import os
import os.path
import pandas as pd
import sys
import traceback

# ==============================================================================

class Storage(object):
    """ This class is an interface to Azure Storage (Blob service). """

    def __init__(self, opts=None):
        """
        Create a BlobServiceClient for a storage account.

        opts: dict with 'acct' (account name, used to build the account URL)
              and 'key' (passed to BlobServiceClient as the credential).
        Raises KeyError if either entry is missing.
        """
        # None instead of a shared mutable {} default.
        if opts is None:
            opts = {}
        acct_name = opts['acct']
        acct_key  = opts['key']
        acct_url  = 'https://{}.blob.core.windows.net/'.format(acct_name)

        self.blob_service_client = BlobServiceClient(
            account_url=acct_url, credential=acct_key)

    def account_info(self):
        """Return BlobServiceClient.get_account_information() for this account."""
        return self.blob_service_client.get_account_information()

    def list_containers(self):
        """
        Return a list of the account's containers (listed with metadata).
        If a ResourceNotFoundError occurs, the containers collected so far are returned.
        """
        clist = list()
        try:
            # The listing is consumed inside the try so that errors raised
            # while iterating are handled here.
            for container in self.blob_service_client.list_containers(include_metadata=True):
                clist.append(container)
        except ResourceNotFoundError:
            pass
        return clist

    def create_container(self, cname):
        """Create container cname; do nothing if it already exists."""
        try:
            container_client = self.blob_service_client.get_container_client(cname)
            container_client.create_container()
            print('create_container: {}'.format(cname))
        except ResourceExistsError:
            pass

    def delete_container(self, cname):
        """Delete container cname; do nothing if it does not exist."""
        try:
            container_client = self.blob_service_client.get_container_client(cname)
            container_client.delete_container()
            print('delete_container: {}'.format(cname))
        except ResourceNotFoundError:
            pass

    def list_container(self, cname):
        """
        Return a list of the blobs in container cname,
        or an empty list if the container does not exist.
        """
        try:
            container_client = self.blob_service_client.get_container_client(cname)
            # list_blobs() is lazy: materialize it here so a missing container
            # raises inside this try rather than later in the caller's loop.
            return list(container_client.list_blobs())
        except ResourceNotFoundError:
            return list()

    def upload_blob_from_file(self, local_file_path, cname, blob_name, overwrite=True):
        """
        Upload the local file to cname/blob_name.
        Returns True on success, False if the container does not exist.
        """
        try:
            blob_client = self.blob_service_client.get_blob_client(container=cname, blob=blob_name)
            with open(local_file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=overwrite)
            print('upload_blob_from_file: {} -> {} {}'.format(local_file_path, cname, blob_name))
            return True
        except ResourceNotFoundError:
            return False

    def upload_blob_from_string(self, string_data, cname, blob_name, overwrite=True):
        """
        Upload string_data to cname/blob_name.
        Returns True on success, False if the container does not exist.
        """
        try:
            blob_client = self.blob_service_client.get_blob_client(container=cname, blob=blob_name)
            blob_client.upload_blob(string_data, overwrite=overwrite)
            # Logged after the upload, like upload_blob_from_file, so the message means success.
            print('upload_blob_from_string: {} {}'.format(cname, blob_name))
            return True
        except ResourceNotFoundError:
            return False

    def download_blob(self, cname, blob_name, local_file_path):
        """
        Download cname/blob_name to local_file_path.
        Returns True on success, False if the container or blob does not exist;
        in that case local_file_path is not created or modified.
        """
        try:
            blob_client = self.blob_service_client.get_blob_client(container=cname, blob=blob_name)
            # Fetch before opening the local file so a missing blob does not
            # leave behind an empty/truncated local file.
            data = blob_client.download_blob().readall()
        except ResourceNotFoundError:
            return False
        with open(local_file_path, "wb") as download_file:
            download_file.write(data)
        print('download_blob: {} {} -> {}'.format(cname, blob_name, local_file_path))
        return True

    def download_blob_to_string(self, cname, blob_name):
        """Return the contents of cname/blob_name decoded as UTF-8 text."""
        blob_client = self.blob_service_client.get_blob_client(container=cname, blob=blob_name)
        downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')
        return downloader.readall()

# ==============================================================================

class AmCdbPartitionKeyStatsAggregator(object):
    """
    This class is used to read and parse the Cosmos DB Partition Key Statistics
    that are in Azure Storage via the automatic copy from Azure Monitor.
    """

    # Column names, in order, of the DataFrame built by read_parse_storage_partition_key_stats().
    OUTPUT_COLUMNS = (
        'year', 'month', 'day', 'hour', 'minute', 'subscription', 'resource_group',
        'account', 'region', 'database', 'container', 'pk', 'kb', 'gb')

    # Prefixes of the consecutive path segments that carry year/month/day/hour/minute.
    _TIME_PREFIXES = ('y', 'm', 'd', 'h', 'm')

    def __init__(self, stor: 'Storage'):
        # String annotation so the class can be defined even if Storage is not yet in scope.
        self.stor = stor
        # Only containers whose names start with this prefix are read.
        self.folder_name = 'am-cdbpartitionkeystatistics'
        self.output_columns = list(self.OUTPUT_COLUMNS)
        self.output_tuples = list()

    def read_parse_storage_partition_key_stats(self, max_tuples=1_000_000):
        """
        Return a Pandas DataFrame from the aggregated data of many blobs produced every 5-minutes.
        
        The raw JSON blob data looks like the following; one blob can have n-number of JSON lines:

        { "TimeGenerated": "2023-06-22T17:28:57.8885796Z", "AccountName": "gbbcjcdbnosql", "RegionName": "East US", "PartitionKey": "[\"triple|123\"]", "SizeKb": 13277, "DatabaseName": "dev", "CollectionName": "npm_graph", "_Internal_WorkspaceResourceId": "/subscriptions/xxx/resourcegroups/gbbcjcore/providers/microsoft.operationalinsights/workspaces/gbbcjmonitor", "Type": "CDBPartitionKeyStatistics", "TenantId": "ttt", "_ResourceId": "/SUBSCRIPTIONS/xxx/RESOURCEGROUPS/GBBCJCOSMOS/PROVIDERS/MICROSOFT.DOCUMENTDB/DATABASEACCOUNTS/GBBCJCDBNOSQL"}
        { "TimeGenerated": "2023-06-22T17:29:07.7421709Z", "AccountName": "gbbcjcdbnosql", "RegionName": "East US", "PartitionKey": "[\"movie_seed\"]", "SizeKb": 418946, "DatabaseName": "dev", "CollectionName": "imdb_seed", "_Internal_WorkspaceResourceId": "/subscriptions/xxx/resourcegroups/gbbcjcore/providers/microsoft.operationalinsights/workspaces/gbbcjmonitor", "Type": "CDBPartitionKeyStatistics", "TenantId": "ttt", "_ResourceId": "/SUBSCRIPTIONS/xxx/RESOURCEGROUPS/GBBCJCOSMOS/PROVIDERS/MICROSOFT.DOCUMENTDB/DATABASEACCOUNTS/GBBCJCDBNOSQL"}
        { "TimeGenerated": "2023-06-22T17:31:21.5493742Z", "AccountName": "gbbcjcdbnosql", "RegionName": "East US", "PartitionKey": "[\"triple|123\"]", "SizeKb": 13278, "DatabaseName": "dev", "CollectionName": "npm_graph", "_Internal_WorkspaceResourceId": "/subscriptions/xxx/resourcegroups/gbbcjcore/providers/microsoft.operationalinsights/workspaces/gbbcjmonitor", "Type": "CDBPartitionKeyStatistics", "TenantId": "ttt", "_ResourceId": "/SUBSCRIPTIONS/xxx/RESOURCEGROUPS/GBBCJCOSMOS/PROVIDERS/MICROSOFT.DOCUMENTDB/DATABASEACCOUNTS/GBBCJCDBNOSQL"}
        { "TimeGenerated": "2023-06-22T17:31:11.6379249Z", "AccountName": "gbbcjcdbnosql", "RegionName": "East US", "PartitionKey": "[\"movie_seed\"]", "SizeKb": 418946, "DatabaseName": "dev", "CollectionName": "imdb_seed", "_Internal_WorkspaceResourceId": "/subscriptions/xxx/resourcegroups/gbbcjcore/providers/microsoft.operationalinsights/workspaces/gbbcjmonitor", "Type": "CDBPartitionKeyStatistics", "TenantId": "ttt", "_ResourceId": "/SUBSCRIPTIONS/xxx/RESOURCEGROUPS/GBBCJCOSMOS/PROVIDERS/MICROSOFT.DOCUMENTDB/DATABASEACCOUNTS/GBBCJCDBNOSQL"}
        { "TimeGenerated": "2023-06-22T17:31:26.4979342Z", "AccountName": "gbbcjcdbnosql", "RegionName": "East US", "PartitionKey": "[\"movie_seed\"]", "SizeKb": 418945, "DatabaseName": "dev", "CollectionName": "imdb_seed", "_Internal_WorkspaceResourceId": "/subscriptions/xxx/resourcegroups/gbbcjcore/providers/microsoft.operationalinsights/workspaces/gbbcjmonitor", "Type": "CDBPartitionKeyStatistics", "TenantId": "ttt", "_ResourceId": "/SUBSCRIPTIONS/xxx/RESOURCEGROUPS/GBBCJCOSMOS/PROVIDERS/MICROSOFT.DOCUMENTDB/DATABASEACCOUNTS/GBBCJCDBNOSQL"}

        Each JSON line becomes one output row with the columns in OUTPUT_COLUMNS
        (all values as strings). The year/month/day/hour/minute come from the
        blob's path, not from TimeGenerated.

        Args:
            max_tuples: soft cap on the number of rows collected; once reached,
                no further blobs are read (a single blob may push past it).

        Returns:
            A pandas DataFrame. Blobs whose names lack the y/m/d/h/m path
            segments, or whose JSON cannot be parsed, are skipped with a message.
            Returns None (after printing the traceback) if listing or
            downloading fails.
        """
        try:
            columns = self._standard_output_columns()
            tuples, download_count, skipped_count = list(), 0, 0

            for c in self.stor.list_containers():
                if not c.name.startswith(self.folder_name):
                    continue
                print('container: {}'.format(c.name))
                for blob in self.stor.list_container(c.name):
                    if len(tuples) >= max_tuples:
                        break
                    print('container: {} blob name: {} size: {}'.format(
                        blob.container, blob.name, blob.size))
                    download_count += 1
                    blob_tuples = self._parse_blob(c.name, blob.name)
                    if blob_tuples is None:
                        skipped_count += 1
                    else:
                        tuples.extend(blob_tuples)

            print('blobs processed:  {}'.format(download_count))
            print('blobs skipped:    {}'.format(skipped_count))
            print('parsed row count: {}'.format(len(tuples)))
            return pd.DataFrame(tuples, columns=columns)

        except Exception as e:
            print("exception: {}".format(str(e)))
            traceback.print_exc()
            return None

    def _parse_blob(self, cname, blob_name):
        """Download one blob and return its rows as output tuples, or None if it must be skipped."""
        time_values = self._parse_year_month_day_hour_minute_values(blob_name, 'blob')
        if time_values is None:
            # Previously this made time_values[0] raise and aborted the whole run.
            print('skipping blob without y/m/d/h/m path segments: {}'.format(blob_name))
            return None
        blob_text = self.stor.download_blob_to_string(cname, blob_name)
        try:
            df = pd.read_json(StringIO(blob_text), lines=True)
            return [self._record_to_tuple(time_values, rec)
                    for rec in df.to_dict(orient='records')]
        except (ValueError, KeyError, TypeError) as e:
            # Malformed JSON or missing fields in one blob should not lose all other blobs.
            print('skipping blob {}, parse error: {}'.format(blob_name, e))
            return None

    def _record_to_tuple(self, time_values, rec):
        """Convert one parsed JSON record into a tuple ordered like OUTPUT_COLUMNS."""
        subs_rg = self._parse_subscription_and_resource_group(rec['_ResourceId'])
        return (
            *(str(v) for v in time_values),
            str(subs_rg[0]),
            str(subs_rg[1]),
            str(rec['AccountName']),
            str(rec['RegionName']),
            str(rec['DatabaseName']),
            str(rec['CollectionName']),
            self._scrub_partition_key(rec['PartitionKey']),
            str(rec['SizeKb']),
            str(self._kb_to_gb(rec['SizeKb'])),
        )

    def _standard_output_columns(self):
        """Return a fresh list of the output column names."""
        return list(self.OUTPUT_COLUMNS)

    def _parse_year_month_day_hour_minute_values(self, filename: str, blob_or_file: str):
        """
        Return [year, month, day, hour, minute] as ints, parsed from the first run of
        consecutive path segments starting with y, m, d, h, m; None if there is none.

        For 'blob' names the number starts at index 2 of each segment (e.g. 'y=2023');
        for any other value of blob_or_file it starts at index 1 (e.g. 'y2023').
        """
        int_idx = 2 if blob_or_file == 'blob' else 1
        if not all(marker in filename for marker in ('/y', '/m', '/d', '/h')):
            return None
        tokens = filename.split('/')
        width = len(self._TIME_PREFIXES)
        for start in range(len(tokens) - width + 1):
            window = tokens[start:start + width]
            if all(tok.startswith(p) for tok, p in zip(window, self._TIME_PREFIXES)):
                try:
                    return [int(tok[int_idx:]) for tok in window]
                except ValueError:
                    # Segments matched by prefix but were not numeric; keep scanning.
                    continue
        return None

    def _scrub_partition_key(self, pk):
        """Strip the JSON-array brackets and quotes, e.g. '["triple|123"]' -> 'triple|123'."""
        return str(pk).replace('[', '').replace(']', '').replace("\"", '')

    def _parse_subscription_and_resource_group(self, resource_id):
        """
        Return [subscription, resource_group_lowercased] from an Azure resource id;
        '?' for any part not present.
        """
        # str() so a missing (NaN) value yields ['?', '?'] instead of raising.
        tokens = str(resource_id).split('/')
        values = ['?', '?']
        # Skip the last token: a marker there has no value after it.
        for idx, token in enumerate(tokens[:-1]):
            upper = token.upper()
            if upper == 'SUBSCRIPTIONS':
                values[0] = tokens[idx + 1]
            elif upper == 'RESOURCEGROUPS':
                values[1] = tokens[idx + 1].lower()
        return values

    def _kb_to_gb(self, kb_str):
        """Convert a KB value (number or numeric string) to GB; -1.0 if not numeric."""
        try:
            return float(kb_str) / 1024.0 / 1024.0
        except (TypeError, ValueError):
            return float(-1.0)

def read_parse_storage_partition_key_stats():
    """
    Aggregate the partition-key-stats blobs into a DataFrame and write it as CSV,
    both to the 'wrangled' container and locally to tmp/partition_key_stats.csv.

    Raises RuntimeError if aggregation fails or the blob upload reports failure.
    """
    stor = _get_storage_object()
    agg = AmCdbPartitionKeyStatsAggregator(stor)
    df = agg.read_parse_storage_partition_key_stats()
    if df is None:
        # The aggregator prints the traceback and returns None on failure;
        # stop with a clear message instead of an AttributeError on None.to_csv().
        raise RuntimeError('partition key stats aggregation failed; see traceback above')
    cname, blobname = 'wrangled', 'partition_key_stats.csv'
    print(f'writing DataFrame to container: {cname} blob: {blobname}')
    # NOTE(review): the blob CSV includes the index column while the local copy
    # below uses index=False — confirm which format downstream consumers expect.
    if not stor.upload_blob_from_string(df.to_csv(), cname, blobname):
        raise RuntimeError(f'upload failed; does container {cname!r} exist?')
    print(f'writing DataFrame to tmp/{blobname}')
    # to_csv does not create missing parent directories.
    os.makedirs('tmp', exist_ok=True)
    df.to_csv(f'tmp/{blobname}', encoding='utf-8', index=False)

# "private" helper functions (leading underscore) below:

def _get_storage_object():
    opts = dict()
    opts['acct'] = storage_acct  # os.environ['AZURE_AML_STORAGE_NAME']
    opts['key']  = storage_key   # os.environ['AZURE_AML_STORAGE_KEY']
    stor = Storage(opts)
    print(stor.account_info())
    return stor

def _overwrite():
    return True


# Confirm the account name from the first cell is visible, and mark this cell done.
print('storage_acct: {}'.format(storage_acct))
print('define classes cell completed')


In [None]:
# Main logic. Use the classes and code defined in the above cells.
# Read the many Azure Storage Blobs populated from Azure Monitor
# regarding Cosmos DB partition key stats.
# Merge these into a Pandas DataFrame (df).

stor = _get_storage_object()
agg = AmCdbPartitionKeyStatsAggregator(stor)
df = agg.read_parse_storage_partition_key_stats()
# The aggregator prints a traceback and returns None on failure; stop here
# rather than failing later on df.shape / df.to_csv() in the next cell.
if df is None:
    raise RuntimeError('partition key stats aggregation failed; see traceback above')



In [None]:
# Display the shape of the DataFrame, and upload it to another
# Azure Storage blob in CSV format.

print('df shape: {}'.format(df.shape))
cname, blobname = 'wrangled', 'partition_key_stats_from_synapse.csv'
print(f'writing DataFrame to container: {cname} blob: {blobname}')
# upload_blob_from_string() returns False (instead of raising) when it hits
# ResourceNotFoundError, e.g. a missing container; don't let that pass silently.
if not stor.upload_blob_from_string(df.to_csv(), cname, blobname):
    raise RuntimeError(f'upload failed; does container {cname!r} exist?')
