# High-Performance Customer Product Big Data Analysis on Azure Databricks

Suppose you are a business owner with several product categories used by numerous customers across the USA. Given one table with the geographic coordinates of your customers and another summarizing the services available to them, the following script provides insights into the product quality experienced by your customers at different Census geographic levels (State, County, Tract, Block Group, and Block). This template is modular and designed to run as a scheduled notebook in Azure Databricks, using the high-performance Delta table format.

In this scenario, products are categorized as A, B, or C, with A the highest quality and C the lowest. Each customer has a unique customer_id and a location given by latitude and longitude; customer_id is the key that associates the product and customer tables before aggregation and analysis. Product power is treated as an indicator of quality for each product (A, B, or C), with the following standards for products consumed by users: Bronze products have a maximum power of at least 100 and a minimum power of at least 50, Gold products have a maximum power of at least 1000 and a minimum power of at least 200, and Silver products fall between Bronze and Gold.

The script has been tested on terabyte-sized data tables on a 16-node cluster with 128 GB RAM on Azure Databricks. The entire aggregation for all Census geographic levels, including the time required to write the CSV results to Azure Blob storage, completed in approximately 45 minutes.
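
To make the geographic roll-up concrete: a 15-digit Census block GEOID contains the coarser levels as prefixes (state in 2 digits, county in 5, tract in 11, block group in 12), which is what lets the script aggregate by truncating the identifier. The sketch below is plain Python rather than Spark; the `rollup_geoid` helper and the sample GEOID are illustrative assumptions, not part of the notebook.

```python
# Number of leading digits of a block GEOID that identify each Census level
# (the same values the notebook stores in subgeo_dict).
GEO_DIGITS = {'Blocks': 15, 'Block Groups': 12, 'Tracts': 11, 'Counties': 5, 'States': 2}

def rollup_geoid(block_geoid):
    """Return the GEOID prefix for every level (hypothetical helper)."""
    if len(block_geoid) != 15 or not block_geoid.isdigit():
        raise ValueError('expected a 15-digit block GEOID, got %r' % (block_geoid,))
    return {level: block_geoid[:n] for level, n in GEO_DIGITS.items()}

# Made-up block GEOID whose first two digits are 11 (the District of Columbia FIPS code).
levels = rollup_geoid('110010062021003')
for level, geoid in levels.items():
    print(level, geoid)
```

Because every coarser identifier is a prefix of the block identifier, one block-level customer table is enough to serve all five aggregation levels.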

In [0]:
#imports
import time
import pandas as pd 
import numpy as np
import os
from itertools import chain

from glob import glob
from pyspark.sql.types import *
import pyspark.sql.functions as f
import matplotlib.pyplot as plt

%matplotlib inline
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.precision', 2) 
pd.set_option('display.max_columns', None)
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

#product model with their corresponding code, in terms of quality A > B > C! 
product_model_dict = {4:'C', 2:'B', 1: 'A'}
#also define the geographic levels and the number of GEOID digits that identify each of them! 
subgeo_dict = {'Blocks':15, 'Block Groups':12, 'Tracts':11, 'Counties':5, 'States':2}
#specify the version of products dataset!
product_data_version = '211'
#specify the version of customers data!
customer_data_version = 'v2'
#All means the entire USA will be processed. To do only one state write state fips code like 11 for DC.
state_fips = 'All'

#if you did not have delta in hand and want to generate it for future faster runs below should be True!
#otherwise if you prefer to run it from csvs, make below False and make sure all CSV sources in section below are valid!
generateDelta = True

############Specify input and output directories within mounted drives
#path to all delta tables!
deltaPath = '/mnt/Delta'
#Specify the mounted folder in which you like your outputs to be stored in! 
outputCSV_location = '/mnt/output/%s'%product_data_version
baseDataCSV_location = "/mnt/base/baselines"
#specify the mounted folder in which the potential customers data are located
customerDataCSV_location = "/mnt/customer/%s"%customer_data_version
#specify the mounted folder in which your products data is located
productDataCSV_location = "/mnt/product/v%s"%product_data_version

In [0]:
def create_state_sdf():
    """
    This function creates a spark dataframe containing state names, abbreviation, and fips code!

    INPUTS:
    This function has no inputs as the inputs are defined inside it. 

    OUTPUTS: 
    a spark dataframe containing the state info!
    """
    # This includes US state names and fips code. Source: https://www.census.gov/library/reference/code-lists/ansi.html
    state_dict = {'01': 'Alabama', '02': 'Alaska', '04': 'Arizona', '05': 'Arkansas', '06': 'California', '08': 'Colorado', '09': 'Connecticut', '10': 'Delaware', 
                '11': 'District of Columbia', '12': 'Florida', '13': 'Georgia', '15': 'Hawaii', '16': 'Idaho', '17': 'Illinois', '18': 'Indiana', '19': 'Iowa', 
                '20': 'Kansas', '21': 'Kentucky', '22': 'Louisiana', '23': 'Maine', '24': 'Maryland', '25': 'Massachusetts', '26': 'Michigan', '27': 'Minnesota', 
                '28': 'Mississippi', '29': 'Missouri', '30': 'Montana', '31': 'Nebraska', '32': 'Nevada', '33': 'New Hampshire', '34': 'New Jersey', '35': 'New Mexico', 
                '36': 'New York', '37': 'North Carolina', '38': 'North Dakota', '39': 'Ohio', '40': 'Oklahoma', '41': 'Oregon', '42': 'Pennsylvania', '44': 'Rhode Island',
                '45': 'South Carolina', '46': 'South Dakota', '47': 'Tennessee', '48': 'Texas', '49': 'Utah', '50': 'Vermont', '51': 'Virginia', '53': 'Washington', 
                '54': 'West Virginia', '55': 'Wisconsin', '56': 'Wyoming', '60': 'American Samoa', '66': 'Guam', '69': 'Commonwealth of the Northern Mariana Islands', 
                '72': 'Puerto Rico', '78': 'United States Virgin Islands'}

    state_abbr_dict = {'01': 'AL', '02': 'AK', '04': 'AZ', '05': 'AR', '06': 'CA', '08': 'CO', '09': 'CT', '10': 'DE', 
                '11': 'DC', '12': 'FL', '13': 'GA', '15': 'HI', '16': 'ID', '17': 'IL', '18': 'IN', '19': 'IA', 
                '20': 'KS', '21': 'KY', '22': 'LA', '23': 'ME', '24': 'MD', '25': 'MA', '26': 'MI', '27': 'MN', 
                '28': 'MS', '29': 'MO', '30': 'MT', '31': 'NE', '32': 'NV', '33': 'NH', '34': 'NJ', '35': 'NM', 
                '36': 'NY', '37': 'NC', '38': 'ND', '39': 'OH', '40': 'OK', '41': 'OR', '42': 'PA', '44': 'RI',
                '45': 'SC', '46': 'SD', '47': 'TN', '48': 'TX', '49': 'UT', '50': 'VT', '51': 'VA', '53': 'WA', 
                '54': 'WV', '55': 'WI', '56': 'WY', '60': 'AS', '66': 'GU', '69': 'MP', 
                '72': 'PR', '78': 'VI'}
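    #build every column from the same key order so names and abbreviations stay aligned with their fips code!
    fips_codes = list(state_dict.keys())
    state_sdf = spark.createDataFrame(
        pd.DataFrame({'STATEFP20': fips_codes,
                    'state_name': [state_dict[k] for k in fips_codes],
                    'state_abbr': [state_abbr_dict[k] for k in fips_codes]}))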
    return state_sdf

In [0]:
def convert_sdf_to_delta(sdf, deltaTablePath):
    """
    This function writes the spark dataframe as a delta table and reads it back to improve the performance! 

    INPUTS:
    sdf: The spark df to be converted to delta format!  
    deltaTablePath: path to the delta table to be written!

    OUTPUTS: 
    a spark dataframe backed by the newly written delta table! 
    """
    # write table to delta 
    sdf.write.format('delta').save(deltaTablePath)
    # read it back explicitly as delta rather than relying on the cluster's default source format
    return spark.read.format('delta').load(deltaTablePath)

In [0]:
def read_base_data(subgeo, generateDelta = False):
    """
    This function reads the static base information for a specific subgeography, generated from a 
    past version of the business dataset. Preloading this base data significantly reduces the 
    computation time since most of these parameters are static and won't change for a long time.

    INPUTS:
    subgeo: the target subgeography from a list of keys in subgeo_dict! 
    generateDelta: if the delta table doesn't exist, this allows the user to generate it and take advantage of it!

    OUTPUTS: 
    a spark dataframe containing the base data for the target subgeography!
    """
    
    delta_location = deltaPath + "/Base/%sBaseData.delta"%subgeo
    csv_location = baseDataCSV_location + "/%sBaseData.csv"%subgeo
    try:
        base_sdf = spark.read.load(delta_location)
    except Exception:  #the delta table is missing or unreadable, so fall back to the CSV source
        
        #read columns and customize schema 
        cols = spark.read.format('csv') \
                    .options(header = True, inferSchema = False) \
                    .option("sep", ",") \
                    .load(csv_location).columns

        #define data types!
        df_schema = StructType([StructField(c, StringType()) for c in cols])

        base_sdf = spark.read.format('csv') \
                        .schema(df_schema) \
                        .options(header = True, inferSchema = False) \
                        .option("sep", ",") \
                        .load(csv_location) 
        #now if generate request is in place: 
        if generateDelta:
            base_sdf = convert_sdf_to_delta(base_sdf, delta_location)
            
    return base_sdf
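
The read-or-build pattern in `read_base_data` (try the fast Delta copy, fall back to the CSV, optionally persist a copy for the next run) can be sketched without Spark. In this stand-in, a JSON file plays the role of the Delta table and Python's `csv` module plays the role of the Spark CSV reader. `load_with_cache`, the file names, and the sample row are hypothetical.

```python
import csv
import json
import os
import tempfile

def load_with_cache(csv_path, cache_path, generate_cache=False):
    """Return (rows, source): rows come from the cache if it exists, else from the CSV."""
    try:
        with open(cache_path) as fh:
            return json.load(fh), 'cache'
    except FileNotFoundError:
        # Fall back to the slower source; every value is kept as a string,
        # mirroring the all-StringType schema used for the base data.
        with open(csv_path, newline='') as fh:
            rows = list(csv.DictReader(fh))
        if generate_cache:
            with open(cache_path, 'w') as fh:
                json.dump(rows, fh)
        return rows, 'csv'

# Made-up one-row base table written to a temporary folder.
workdir = tempfile.mkdtemp()
csv_path = os.path.join(workdir, 'StatesBaseData.csv')
cache_path = os.path.join(workdir, 'StatesBaseData.json')
with open(csv_path, 'w', newline='') as fh:
    fh.write('GEOID,value\n11,42\n')

first_rows, first_source = load_with_cache(csv_path, cache_path, generate_cache=True)
second_rows, second_source = load_with_cache(csv_path, cache_path)
print(first_source, second_source)
```

Catching only the "not found" case, as the sketch does, keeps unrelated failures (for example a malformed file) visible instead of silently triggering the fallback.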

In [0]:
def read_customer_data(customer_data_version, state_fips = 'All', generateDelta = False):
    """
    This function reads the customer data geographic locations for the specified version of the table! 

    INPUTS:
    customer_data_version: The customers data version!  
    state_fips: this is to ensure processing can be narrowed down to a specific state for the testing phase. 
    Default is All means the entire USA but can be any fips code like 11, 51, etc.
    generateDelta: if the delta table doesn't exist, this allows the user to generate it and take advantage of it!
    
    OUTPUTS: 
    a spark dataframe containing the customers positions for the specified version 
    """
    ##AllCustomers: all customer positions
    delta_location = deltaPath + '/customerLocations%s.delta'%customer_data_version    
    csv_location = customerDataCSV_location + "/%s.csv"%customer_data_version
    try:
        AllCustomers = spark.read.load(delta_location)
    except Exception:  #the delta table is missing or unreadable, so fall back to the CSV source

        #read columns and customize schema 
        cols = spark.read.format('csv') \
                    .options(header = True, inferSchema = False) \
                    .option("sep", ",") \
                    .load(csv_location).columns

        #The data dictionary lists customer_id as an integer; it is read as a string here to match customer_id in the products table.
        dtype_dict = {'customer_id': StringType(), 'latitude': FloatType(), 'longitude': FloatType()} 

        df_schema = StructType([StructField(c, dtype_dict[c]) if c in dtype_dict.keys() else StructField(
            c, StringType()) for c in cols]) 

        AllCustomers = spark.read.format('csv') \
                        .schema(df_schema) \
                        .options(header = True, inferSchema = False) \
                        .option("sep", ",") \
                        .load(csv_location) \
                        .filter(f.col('customer_flag') == 'True')  \
                        .withColumnRenamed('block_geoid', 'GEOID')
        #now if generate request is in place: 
        if generateDelta:
            AllCustomers = convert_sdf_to_delta(AllCustomers, delta_location)
    #if one state is selected, then need to narrow down data to one state! 
    if state_fips != 'All':
        AllCustomers = AllCustomers.filter(f.substring(f.col('GEOID'), 1, 2) == str(state_fips))
    
    return AllCustomers.select('customer_id','GEOID') 
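
One detail worth checking when narrowing to a single state: the filter compares the first two GEOID characters with `str(state_fips)`, so a single-digit code passed as an integer (for example 6 for California) becomes `'6'` and matches nothing. A minimal sketch of a normalizer follows, assuming a hypothetical helper `normalize_state_fips` that is not part of the notebook; the GEOIDs are made up.

```python
def normalize_state_fips(state_fips):
    """Return 'All' unchanged, otherwise a two-character, zero-padded FIPS string."""
    if str(state_fips) == 'All':
        return 'All'
    code = str(state_fips).strip().zfill(2)
    if len(code) != 2 or not code.isdigit():
        raise ValueError('not a valid state FIPS code: %r' % (state_fips,))
    return code

def in_state(geoid, state_fips):
    """Pure-Python analogue of substring(GEOID, 1, 2) == state_fips."""
    fips = normalize_state_fips(state_fips)
    return fips == 'All' or geoid[:2] == fips

print(normalize_state_fips(6), normalize_state_fips('11'), normalize_state_fips('All'))
```

Passing the code as a zero-padded string in the configuration cell (for example `'06'`) avoids the issue without any helper.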

In [0]:
def read_products_data(product_data_version, state_fips = 'All', generateDelta = False):
    """
    This function reads business data for a specific version of the dataset and state. 

    INPUTS:
    product_data_version: The products data version! 
    state_fips: this is to ensure processing can be narrowed down to a specific state. 
    Default is All means the entire USA but can be any fips code like 11, 55, etc.
    generateDelta: if the delta table doesn't exist, this allows the user to generate it and take advantage of it!

    OUTPUTS: 
    a spark dataframe containing the product details for the desired version of dataset. 
    """
    ##### Load product detailed Data #####
    delta_location = deltaPath + '/product%s.delta'%product_data_version
    csv_location = productDataCSV_location + "/product_*.csv"

    try:
        productSDF = spark.read.load(delta_location)
    except Exception:  #the delta table is missing or unreadable, so fall back to the CSV source
        #read columns and customize schema 
        cols = spark.read.format('csv') \
                    .options(header = True, inferSchema = False) \
                    .option("sep", ",") \
                    .load(csv_location).columns
        #product_quality: 1 means product has a warranty while 0 means it does not come with a warranty.
        dtype_dict= {'business_id': StringType(),'customer_id': StringType(), 'block_geoid': StringType(), 
                          'product_model': IntegerType(), 'product_max_power': IntegerType(),'product_min_power': IntegerType(), 
                          'product_quality': IntegerType()}

        df_schema = StructType([StructField(c, dtype_dict[c]) for  c in cols]) 

        productSDF = spark.read.format('csv') \
                    .schema(df_schema) \
                    .option("mode", "FAILFAST") \
                    .options(header = True, inferSchema = False) \
                    .option("quote", "\"") \
                    .option("escape", "\"") \
                    .load(csv_location) 
                    

        #now if generate request is in place: 
        if generateDelta:
            productSDF = convert_sdf_to_delta(productSDF, delta_location)
    #if one state is selected, then need to narrow down data to one state! 

    if state_fips != 'All':
        productSDF = productSDF.filter(f.substring(f.col('block_geoid'), 1, 2) == str(state_fips))
    return productSDF.select('customer_id', 'block_geoid', 'product_model', 'product_max_power',
                            'product_min_power', 'product_quality')

In [0]:
def process_subgeo(subgeo):
    """
    This function processes the product quality and availability for a selected Census subgeography level!  

    INPUTS:
    subgeo: the target subgeography from a list of keys in subgeo_dict! 

    OUTPUTS: 
    a spark dataframe at the subgeo level containing the aggregated fields from products  
    data in addition to the baseline information such as socio economic factors like ACS poverty, income etc. 
    """
    start = time.time()  
    AllCustomers_subgeo = AllCustomers.withColumn('GEOID', f.substring(f.col('GEOID'), 1, subgeo_dict[subgeo])) \
                        .distinct() 
    #aggregate the customers data to get the total customers in each geography! 
    TotalServices_subgeo = AllCustomers_subgeo.groupBy('GEOID').agg(f.count('customer_id').alias('TotalCustomers'))
    #create a list to store all the triples of sdf! TotalCustomers should be there already!
    sdfs = [[TotalServices_subgeo]]
    
    #now go through all the model codes from product_model_dict and generate sdfs! 
    for product_model in ['All'] + list(product_model_dict.keys()):
        #build the Bronze/Silver/Gold triple from the customer-level rows (the aggregated totals have no customer_id)!
        quality_sdfs = model_based_serving(AllCustomers_subgeo, product_model)
        #add it to the list!
        sdfs.append(quality_sdfs)

    #flatten the list of lists! 
    sdfs = list(chain(*sdfs))
    #read in the base data for the subgeo!
    base_sdf = read_base_data(subgeo, generateDelta)
    #go in a loop and join all the sdfs 
    for sdf in sdfs:
        base_sdf = base_sdf.join(sdf, on = 'GEOID', how = 'left')
    #now write the output CSV to Azure blob storage!
    _ = generate_output(base_sdf, subgeo)
    
    end = time.time()  
    deltaT = end - start
    print('product data processing for %s was accomplished in %s minutes!'%(
        subgeo, round(deltaT/60, 1)))
    return deltaT

In [0]:
def quality_slicer(sdf, level = 'Bronze', model_code = 'All'):
    """
    This function prepares a quality filter (Bronze or Gold power thresholds) using any specific spark df!

    INPUTS:
    sdf: the spark df to slice!
    level: Bronze or Gold, the two thresholds defined in power_level_dict. Default is Bronze. Silver has no threshold of its own; it is derived in model_based_serving as Bronze-but-not-Gold. 
    model_code: the product model code! See the product_model_dict dictionary! The default is All meaning all models!

    OUTPUTS: 
    A logical selector that helps with slicing the products data. 
    """
    sdf = sdf if model_code == 'All' else sdf.filter(sdf['product_model']== model_code) 
    
    ####define quality level Logics By Min and Max Power###
    power_level_dict = {'Bronze':[100, 50], 'Gold':[1000, 200]}
    #here is the condition based on selected level to filter!
    return ((sdf['product_max_power'] >= power_level_dict[level][0]) & 
                    (sdf['product_min_power'] >= power_level_dict[level][1]) & 
                    (sdf['product_quality'] == 1) &
                    (sdf['product_model'].isin(list(product_model_dict.keys())) if model_code == 'All' else sdf[
                        'product_model'] == model_code))
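
The condition that `quality_slicer` builds can be read as a per-row predicate. Below is a plain-Python sketch of the same logic on dictionaries rather than Spark columns; `meets_level` is a hypothetical helper and the sample rows are made up. Note that only Bronze and Gold have thresholds.

```python
PRODUCT_MODELS = {4: 'C', 2: 'B', 1: 'A'}                    # same codes as product_model_dict
POWER_LEVELS = {'Bronze': (100, 50), 'Gold': (1000, 200)}    # (max power, min power)

def meets_level(row, level='Bronze', model_code='All'):
    """True if the product row passes the power, quality flag, and model checks."""
    max_needed, min_needed = POWER_LEVELS[level]
    if model_code == 'All':
        model_ok = row['product_model'] in PRODUCT_MODELS
    else:
        model_ok = row['product_model'] == model_code
    return (row['product_max_power'] >= max_needed
            and row['product_min_power'] >= min_needed
            and row['product_quality'] == 1
            and model_ok)

rows = [
    {'product_model': 1, 'product_max_power': 1200, 'product_min_power': 250, 'product_quality': 1},
    {'product_model': 2, 'product_max_power': 300, 'product_min_power': 60, 'product_quality': 1},
    {'product_model': 4, 'product_max_power': 1500, 'product_min_power': 400, 'product_quality': 0},
]
bronze_flags = [meets_level(r, 'Bronze') for r in rows]
gold_flags = [meets_level(r, 'Gold') for r in rows]
print(bronze_flags, gold_flags)
```

The third row has enough power for Gold but fails both levels because its `product_quality` flag is 0.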

In [0]:
def model_based_serving(AllCustomers_subgeo, model = 'All'):
    """
    This function prepares a triple of serving sdfs (Bronze/Silver/Gold) for each model (A/B/C). For instance, 
    BronzeCustomersB, SilverCustomersB, and GoldCustomersB are built when model = 2 (B). See product_model_dict for all models and their code! 

    INPUTS:
    AllCustomers_subgeo: a spark df containing all customers at the selected subgeography level!
    model: the product model code! See the product_model_dict dictionary! Default is All meaning all models!

    OUTPUTS: 
    three sdfs for the desired model (e.g. BronzeCustomersB, SilverCustomersB, and GoldCustomersB)! 
    """
    #look up the model name for use in attribute name! 
    model_name = '' if model == 'All' else product_model_dict[model]
    #the power and quality columns live in the products table, so slice the global productSDF once per level!
    bronze_products = productSDF.filter(quality_slicer(productSDF, 'Bronze', model))
    gold_products = productSDF.filter(quality_slicer(productSDF, 'Gold', model))
    ## BronzeCustomers: left_anti keeps the customers with no product meeting the Bronze thresholds! 
    BronzeCustomers = AllCustomers_subgeo.join(bronze_products,
                                how = 'left_anti',
                                on = 'customer_id') \
                                .groupBy('GEOID') \
                                .agg(f.count('customer_id').alias('BronzeCustomers%s'%model_name))
    ## SilverCustomers: customers with a product meeting the Bronze thresholds but none meeting Gold! 
    SilverCustomers = AllCustomers_subgeo.join(bronze_products,
                                how = 'left_semi',
                                on = 'customer_id') \
                                    .join(gold_products,
                                        how = 'left_anti',
                                        on = 'customer_id') \
                                    .groupBy('GEOID') \
                                    .agg(f.count('customer_id').alias('SilverCustomers%s'%model_name))
    ## GoldCustomers: customers with at least one product meeting the Gold thresholds!
    GoldCustomers = AllCustomers_subgeo.join(gold_products,
                                how = 'left_semi',
                                on = 'customer_id') \
                                .groupBy('GEOID') \
                                .agg(f.count('customer_id').alias('GoldCustomers%s'%model_name))
    return [BronzeCustomers, SilverCustomers, GoldCustomers]   
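
The three joins in `model_based_serving` amount to set operations on customer ids: a `left_semi` join keeps customers that appear in the filtered products, and a `left_anti` join keeps those that do not. Since any product passing the Gold thresholds also passes the Bronze ones, the three groups produced by these join types are mutually exclusive and together cover every customer. A sketch with Python sets follows; `split_tiers` is a hypothetical helper and the ids are made up.

```python
def split_tiers(all_customers, bronze_ids, gold_ids):
    """bronze_ids / gold_ids: customers with at least one product passing that threshold."""
    all_customers = set(all_customers)
    bronze_ids, gold_ids = set(bronze_ids), set(gold_ids)
    bronze = all_customers - bronze_ids                 # left_anti on the Bronze slice
    silver = (all_customers & bronze_ids) - gold_ids    # left_semi on Bronze, then left_anti on Gold
    gold = all_customers & gold_ids                     # left_semi on the Gold slice
    return bronze, silver, gold

all_customers = {'c1', 'c2', 'c3', 'c4'}
passes_bronze = {'c2', 'c3', 'c4'}   # includes everyone who passes Gold
passes_gold = {'c4'}

bronze, silver, gold = split_tiers(all_customers, passes_bronze, passes_gold)
print(sorted(bronze), sorted(silver), sorted(gold))
```

In the notebook, each of these groups is then counted per GEOID to give the Bronze, Silver, and Gold customer columns.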

In [0]:
def generate_output(sdf, subgeo, writeDelta =False, writeCSV = True, splitByState = False):
    """
    This function writes the aggregated results for a subgeography to CSV and/or Delta!

    INPUTS:
    sdf: spark df to write!
    subgeo: the target subgeography from a list of keys in subgeo_dict! 
    writeDelta: if True, also writes the results as a Delta table under deltaPath!
    writeCSV: if True (default), writes the national CSV to outputCSV_location!
    splitByState: if True, also writes a CSV partitioned by the StateName column!
    OUTPUTS: 
    returns the location of the national output (the Delta path if writeDelta is True, otherwise the CSV path)!
    """
    file_location_national = outputCSV_location + '/CustomerProduct%s%s%s.csv'%(product_data_version, subgeo, state_fips)
    file_location_bystate = file_location_national[:-4] + '_ByState.csv'
    #get a list of integer count columns! 
    customer_cols = [c for c in sdf.columns if 'Customer' in c]
    #fill null values with zero and get ready for saving! 
    sdf_final = sdf.fillna(0, subset = customer_cols).coalesce(1)

    if writeCSV:
        #now save the national file!
        sdf_final.write.mode("overwrite") \
                                .option("header", True) \
                                .option("delimiter", ",") \
                                .csv(file_location_national)
    if writeDelta:
        file_location_national = deltaPath + '/%s/CustomerProduct%s%s%s.delta'%(product_data_version, product_data_version,
                                                                                subgeo, state_fips)
        _ = convert_sdf_to_delta(sdf_final, file_location_national)

     
    if splitByState:
        #and save a partitioned-by-state version of it!
        sdf_final.write \
                    .option("header", True) \
                    .option("delimiter", ",") \
                    .partitionBy("StateName") \
                    .mode("overwrite") \
                    .csv(file_location_bystate)
    return file_location_national
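
Because the per-tier counts are attached to the base table with left joins, geographies with no matching customers come through with nulls in the count columns, and `generate_output` replaces those with zero before writing. A plain-Python sketch of that fill step on a list of dictionaries follows; `fill_missing_counts` is a hypothetical helper and the rows are made up.

```python
def fill_missing_counts(rows, marker='Customer'):
    """Replace None with 0 in every column whose name contains `marker`."""
    filled = []
    for row in rows:
        filled.append({k: (0 if v is None and marker in k else v) for k, v in row.items()})
    return filled

rows = [
    {'GEOID': '11', 'StateName': None, 'TotalCustomers': 3, 'GoldCustomers': None},
    {'GEOID': '24', 'StateName': 'Maryland', 'TotalCustomers': None, 'GoldCustomers': None},
]
clean = fill_missing_counts(rows)
for row in clean:
    print(row)
```

Columns outside the count set, such as the hypothetical `StateName` here, keep their nulls, which matches restricting the fill to a column subset.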

In [None]:
if __name__ == '__main__':
    print('----Process Started----')
    start = time.time()
    #define below parameter to keep track of total processing time! 
    totalT = 0  
    AllCustomers = read_customer_data(customer_data_version, state_fips, generateDelta).cache()
    print('customer locations data version %s data was read as a spark dataframe!'%customer_data_version)
    productSDF = read_products_data(product_data_version, state_fips,  generateDelta).cache()
    print('products data version %s data was read as a spark dataframe!'%product_data_version)

    #loop through all subgeos and generate results!
    for subgeo in subgeo_dict.keys():
        
        print('Data processing for %s started!'%subgeo)
        totalT += process_subgeo(subgeo)

    end = time.time()  
    deltaT = end - start
    print('----Process Ended----')
    print('----The sub-geography processing for products %s was %s minutes!'%(
        product_data_version, round(totalT/60, 1)))
    print('----The entire products %s process was accomplished in %s minutes!'%(
        product_data_version, round(deltaT/60, 1)))