<a href="https://colab.research.google.com/github/dave-killough/databricks-colab/blob/main/Databricks_Parallel_Charting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Databricks Pipeline in Colab
This notebook is a local development environment for code that will also run in Databricks.  The table of contents on the left can be used as an index for each step in the pipeline.  


# Setup

In [None]:
# Install PySpark (pinned to 3.5.0) for local runs of this notebook.
%pip install pyspark==3.5.0

# EO 990 Ingest CSV

In [None]:
# EO 990 Ingest CSV
import requests
import logging
import os

# Pick the output location: on a Databricks runtime, write through the /dbfs
# prefix into the mounted pipeline folder; elsewhere use the working directory.
dbfs, folder_out = (
    ("/dbfs", "/mnt/eo990pipeline")
    if 'DATABRICKS_RUNTIME_VERSION' in os.environ
    else ("", ".")  # local setup - no cluster charges!!
)

# INFO level so each download's success/failure message is shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("EO-990-Ingest-Master")

def ingest(url, filename=None, *, timeout=60):
    """Download ``url`` and save the response body in the pipeline folder.

    The file is written to ``f"{dbfs}{folder_out}/{filename}"`` using the
    module-level ``dbfs`` and ``folder_out`` globals.

    Args:
        url: HTTP(S) address of the file to fetch.
        filename: Local file name; defaults to the last path segment of ``url``.
        timeout: Seconds ``requests`` waits for the connection / for the next
            bytes from the server before giving up, so a stalled server cannot
            hang the pipeline indefinitely.

    Returns:
        True if the file was downloaded and written; False if the server
        answered with a status other than 200 (logged, not raised).

    Raises:
        requests.exceptions.RequestException: on connection errors/timeouts.
    """
    if filename is None:
        filename = url.split("/")[-1]  # Extract filename from URL
    response = requests.get(url, timeout=timeout)  # Download CSV file
    if response.status_code != 200:
        logger.error("Failed to download %s (HTTP %s)", url,
                     response.status_code)
        return False
    with open(f"{dbfs}{folder_out}/{filename}", "wb") as file:
        file.write(response.content)
    logger.info("Successfully downloaded %s", url)
    return True

# The four IRS EO files eo1.csv .. eo4.csv, saved under their own names.
irs_soi = "https://www.irs.gov/pub/irs-soi"
eo_urls = [f"{irs_soi}/eo{n}.csv" for n in range(1, 5)]
for url in eo_urls:
    ingest(url)

# Files from the benevolentmachines bucket; the 990 extract is saved under a
# different local name, gcst.csv keeps its own (None -> derived from URL).
bucket = "https://storage.googleapis.com/benevolentmachines"
for src, dest in ((f"{bucket}/e990_extract.csv", "eo990extract.csv"),
                  (f"{bucket}/gcst.csv", None)):
    ingest(src, dest)
# end

# EO 990 Prepare E990

In [None]:
# EO 990 Prepare E990
from pyspark.sql import SparkSession
import os

appName = "eo990-prepare-e990"
spark = SparkSession.builder.appName(appName).getOrCreate()
folder_in = (
    "/mnt/eo990pipeline"
    if "DATABRICKS_RUNTIME_VERSION" in os.environ
    else "."  # local spark - no cluster charges!!
)

from pyspark.sql.types import \
    StructType, StructField, StringType, IntegerType, LongType

# Column layout of the pipe-delimited 990 extract. Spark applies an explicit
# CSV schema by position, so this order must match the file's column order:
# 10 text fields, 2 integer counts, 32 long amounts, then the mission text.
e990_text_cols = [
    "BLOBI", "EIN", "ReturnTypeCd", "TaxPeriodEndDt", "BusinessName",
    "BusinessStreet", "CityNm", "StateAbbreviationCd", "ZIPCd",
    "WebsiteAddressTxt",
]
e990_count_cols = ["TotalEmployeeCnt", "TotalVolunteersCnt"]
e990_amount_cols = [
    "GrossReceiptsAmt",
    "PYContributionsGrantsAmt", "CYContributionsGrantsAmt",
    "PYProgramServiceRevenueAmt", "CYProgramServiceRevenueAmt",
    "PYInvestmentIncomeAmt", "CYInvestmentIncomeAmt",
    "PYOtherRevenueAmt", "CYOtherRevenueAmt",
    "PYTotalRevenueAmt", "CYTotalRevenueAmt",
    "PYGrantsAndSimilarPaidAmt", "CYGrantsAndSimilarPaidAmt",
    "PYBenefitsPaidToMembersAmt", "CYBenefitsPaidToMembersAmt",
    "PYSalariesCompEmpBnftPaidAmt", "CYSalariesCompEmpBnftPaidAmt",
    "PYTotalProfFndrsngExpnsAmt", "CYTotalProfFndrsngExpnsAmt",
    "CYTotalFundraisingExpenseAmt",
    "PYOtherExpensesAmt", "CYOtherExpensesAmt",
    "PYTotalExpensesAmt", "CYTotalExpensesAmt",
    "PYRevenuesLessExpensesAmt", "CYRevenuesLessExpensesAmt",
    "TotalAssetsBOYAmt", "TotalAssetsEOYAmt",
    "TotalLiabilitiesBOYAmt", "TotalLiabilitiesEOYAmt",
    "NetAssetsOrFundBalancesBOYAmt", "NetAssetsOrFundBalancesEOYAmt",
]
schema = StructType(
    [StructField(name, StringType(), True) for name in e990_text_cols]
    + [StructField(name, IntegerType(), True) for name in e990_count_cols]
    + [StructField(name, LongType(), True) for name in e990_amount_cols]
    + [StructField("ActivityOrMissionDesc", StringType(), True)]
)

reader = (
    spark.read.format("csv")
    .option("delimiter", "|")
    .option("header", "true")
    .schema(schema)
)
e990_df = reader.load(f"{folder_in}/eo990extract.csv")

# Drop first so re-running the cell replaces the table.
spark.sql("DROP TABLE IF EXISTS e990")
e990_df.write.saveAsTable('e990')
# end

# EO 990 Prepare EOMF

In [None]:
# EO 990 Prepare EOMF
from pyspark.sql import SparkSession
import os

appName = "eo990-prepare-eomf"
spark = SparkSession.builder.appName(appName).getOrCreate()
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    folder_in = "/mnt/eo990pipeline"
else: # local spark - no cluster charges!!
    folder_in = "."

from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import col, substring

# Target column names and types for the eomf table, in output order.
schema = StructType([
    StructField("EIN", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("ICO", StringType(), True),
    StructField("STREET", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("STATE", StringType(), True),
    StructField("ZIP", StringType(), True),
    StructField("RULING", StringType(), True),
    StructField("TAX_PERIOD", StringType(), True),
    StructField("GROUP", StringType(), True),
    StructField("SUBSECTION", StringType(), True),
    StructField("AFFILIATION", StringType(), True),
    StructField("CLASSIFICATION", StringType(), True),
    StructField("DEDUCTIBILITY", StringType(), True),
    StructField("FOUNDATION", StringType(), True),
    StructField("ACTIVITY", StringType(), True),
    StructField("ORGANIZATION", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("ASSET_CD", StringType(), True),
    StructField("INCOME_CD", StringType(), True),
    StructField("FILING_REQ_CD", StringType(), True),
    StructField("PF_FILING_REQ_CD", StringType(), True),
    StructField("ACCT_PD", StringType(), True),
    StructField("ASSET_AMT", LongType(), True),
    StructField("INCOME_AMT", LongType(), True),
    StructField("REVENUE_AMT", LongType(), True),
    StructField("NTEE_CD", StringType(), True),
    StructField("SORT_NAME", StringType(), True)
])

# Spark applies a user-supplied CSV schema by *position* (enforceSchema
# defaults to true) and ignores the header names, so any difference between
# the order above and the file's column order silently relabels columns.
# NOTE(review): the order above does not appear to match the published IRS
# EO BMF layout (... ZIP, GROUP, SUBSECTION, AFFILIATION, CLASSIFICATION,
# RULING, ...). To be order-independent, read every column as a string named
# by the header row, then select and cast each schema field by name; a header
# name that is missing now fails loudly instead of shifting data.
raw_eomf_df = spark.read.format("csv") \
    .option("header", "true") \
    .load(f"{folder_in}/eo[1234].csv") # Concatenate input files - yes!
eomf_df = raw_eomf_df.select(
    [col(f.name).cast(f.dataType).alias(f.name) for f in schema.fields])

eomf_df = eomf_df.fillna({'NTEE_CD': 'Z99'})  # missing NTEE codes -> 'Z99'
eomf_df = eomf_df.withColumn("NTEE3", substring(col("NTEE_CD"), 1, 3))
eomf_df = eomf_df.withColumnRenamed("GROUP", "GROUP_NUM") # GROUP is reserved

# Drop first so re-running the cell replaces the table.
spark.sql("DROP TABLE IF EXISTS eomf")
eomf_df.write.saveAsTable('eomf')
# end

# EO 990 Prepare GCST

In [None]:
# EO 990 Prepare GCST
from pyspark.sql import SparkSession
import os

appName = "eo990-prepare-gcst"
spark = SparkSession.builder.appName(appName).getOrCreate()
folder_in = (
    "/mnt/eo990pipeline"
    if "DATABRICKS_RUNTIME_VERSION" in os.environ
    else "."  # local spark - no cluster charges!!
)

from pyspark.sql.types import \
    StructType, StructField, StringType, IntegerType, LongType

# Two-column, pipe-delimited lookup: a code (joined downstream against 1- and
# 3-character NTEE prefixes) and its description, exposed as SECTOR.
schema = StructType(
    [StructField(name, StringType(), True) for name in ("code", "desc")]
)
gcst_df = (
    spark.read.format("csv")
    .option("delimiter", "|")
    .option("header", "true")
    .schema(schema)
    .load(f"{folder_in}/gcst.csv")
    .withColumnRenamed("desc", "SECTOR")  # desc is reserved in SQL
)

# Drop first so re-running the cell replaces the table.
spark.sql("DROP TABLE IF EXISTS gcst")
gcst_df.write.saveAsTable('gcst')

# EO 990 Transform

In [None]:
# EO 990 Transform
from pyspark.sql import SparkSession

# Build the wide eo990 table: every eomf column (plus NTEE3), the gcst SECTOR
# description, and the e990 extract fields.
spark = SparkSession.builder.appName("eo990-transform").getOrCreate()
# INNER JOIN keeps only organizations present in both eomf and e990.
# NOTE(review): if e990 holds more than one filing per EIN, this join yields
# one row per filing for that organization -- confirm the extract is one row
# per EIN.
# LEFT JOIN leaves SECTOR null when an NTEE3 code has no gcst entry.
eo990_df = spark.sql("""
SELECT
    eomf.EIN, eomf.NAME, eomf.ICO, eomf.STREET, eomf.CITY, eomf.STATE, eomf.ZIP,
    eomf.RULING, eomf.TAX_PERIOD, eomf.GROUP_NUM, eomf.SUBSECTION,
    eomf.AFFILIATION, eomf.CLASSIFICATION, eomf.DEDUCTIBILITY, eomf.FOUNDATION,
    eomf.ACTIVITY, eomf.ORGANIZATION, eomf.STATUS, eomf.ASSET_CD,
    eomf.INCOME_CD, eomf.FILING_REQ_CD, eomf.PF_FILING_REQ_CD, eomf.ACCT_PD,
    eomf.ASSET_AMT, eomf.INCOME_AMT, eomf.REVENUE_AMT, eomf.NTEE_CD,
    eomf.SORT_NAME, eomf.NTEE3, gcst.SECTOR,
    e990.BLOBI, e990.ReturnTypeCd, e990.TaxPeriodEndDt, e990.BusinessName,
    e990.BusinessStreet, e990.CityNm, e990.StateAbbreviationCd, e990.ZIPCd,
    e990.WebsiteAddressTxt, e990.TotalEmployeeCnt, e990.TotalVolunteersCnt,
    e990.GrossReceiptsAmt,
    e990.PYContributionsGrantsAmt, e990.CYContributionsGrantsAmt,
    e990.PYProgramServiceRevenueAmt, e990.CYProgramServiceRevenueAmt,
    e990.PYInvestmentIncomeAmt, e990.CYInvestmentIncomeAmt,
    e990.PYOtherRevenueAmt, e990.CYOtherRevenueAmt,
    e990.PYTotalRevenueAmt, e990.CYTotalRevenueAmt,
    e990.PYGrantsAndSimilarPaidAmt, e990.CYGrantsAndSimilarPaidAmt,
    e990.PYBenefitsPaidToMembersAmt, e990.CYBenefitsPaidToMembersAmt,
    e990.PYSalariesCompEmpBnftPaidAmt, e990.CYSalariesCompEmpBnftPaidAmt,
    e990.PYTotalProfFndrsngExpnsAmt, e990.CYTotalProfFndrsngExpnsAmt,
    e990.CYTotalFundraisingExpenseAmt,
    e990.PYOtherExpensesAmt, e990.CYOtherExpensesAmt,
    e990.PYTotalExpensesAmt, e990.CYTotalExpensesAmt,
    e990.PYRevenuesLessExpensesAmt, e990.CYRevenuesLessExpensesAmt,
    e990.TotalAssetsBOYAmt, e990.TotalAssetsEOYAmt,
    e990.TotalLiabilitiesBOYAmt, e990.TotalLiabilitiesEOYAmt,
    e990.NetAssetsOrFundBalancesBOYAmt, e990.NetAssetsOrFundBalancesEOYAmt,
    e990.ActivityOrMissionDesc
FROM eomf
INNER JOIN e990 ON eomf.EIN = e990.EIN
LEFT JOIN gcst ON eomf.NTEE3 = gcst.code
""")
# Drop first so re-running the cell replaces the table.
spark.sql("DROP TABLE IF EXISTS eo990")
eo990_df.write.saveAsTable('eo990')

# EO 990 Learn

In [None]:
# EO 990 Learn
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import log, col
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Launch the awesome Spark Engine!!

spark = SparkSession.builder \
    .appName("eo990-cluster-simple") \
    .config("spark.databricks.photon.enabled", "false") \
    .getOrCreate()

# The objective is a KMeans clustering of public US tax-exempt organizations
# based on the employee count, volunteer count, and contribution/grant amount
# reported in their 990 filing. Only rows with positive values for all three
# features (and fewer than 1,000,000 volunteers) are used.
# NOTE(review): nothing here picks the most recent filing per EIN; that relies
# on eo990 holding one filing per organization -- confirm.

df = spark.sql("""
    SELECT eo990.EIN, eo990.CYContributionsGrantsAmt,
           eo990.TotalEmployeeCnt, eo990.TotalVolunteersCnt
    FROM eo990
    WHERE TotalVolunteersCnt < 1000000
    AND TotalEmployeeCnt > 0
    AND TotalVolunteersCnt > 0
    AND CYContributionsGrantsAmt > 0
""")

# All of these features exhibit a long-tailed distribution: the larger the
# value, the less frequently it occurs. As with most machine-learning tasks, a
# more even distribution helps the algorithm work effectively, so each feature
# x gets a log(x + 1) companion column named log_<feature>.

raw_cols = ["CYContributionsGrantsAmt", "TotalEmployeeCnt", "TotalVolunteersCnt"]
feature_cols = [f"log_{name}" for name in raw_cols]
df_transformed = df
for raw_col, feature_col in zip(raw_cols, feature_cols):
    df_transformed = df_transformed.withColumn(
        feature_col, log(col(raw_col) + 1))

# Null values and outliers are removed. A value is an outlier when it lies
# outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] for its feature. The quartiles of all
# three features are computed exactly (relativeError=0) in a single pass.

df_transformed = df_transformed.dropna()
quartiles = df_transformed.approxQuantile(feature_cols, [0.25, 0.75], 0)
if any(len(q) != 2 for q in quartiles):
    # approxQuantile returns no quantiles when there are no rows to rank.
    raise ValueError(
        "No eo990 rows left to cluster after filtering; check the eo990 table")
bounds = {}
for c, (q1, q3) in zip(feature_cols, quartiles):
    iqr = q3 - q1
    bounds[c] = {
        'q1': q1,
        'q3': q3,
        'lower_bound': q1 - (1.5 * iqr),
        'upper_bound': q3 + (1.5 * iqr),
    }
for c in bounds:
    df_transformed = df_transformed.filter(
        (col(c) >= bounds[c]['lower_bound']) &
        (col(c) <= bounds[c]['upper_bound'])
    )

# A pipeline assembles the features into one vector column and rescales it
# with StandardScaler (Spark defaults: unit standard deviation, no centering).

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
pipeline = Pipeline(stages=[assembler, scaler])
pipelineModel = pipeline.fit(df_transformed)
df_kmeans_ready = pipelineModel.transform(df_transformed)

# KMeans is fit once with num_clusters clusters; each row's cluster index (the
# `prediction` column) is saved with the features to eo990clustering.

def apply_kmeans(df, k):
    """Fit a seeded KMeans with ``k`` clusters on the ``scaledFeatures`` column.

    Returns ``df`` with an added ``prediction`` column (the cluster index).
    """
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("scaledFeatures")
    model = kmeans.fit(df)
    return model.transform(df)

num_clusters = 4
# Share of rows to cluster; 1.0 sampled without replacement keeps every row.
sample_fraction = 1.0
sample = df_kmeans_ready.sample(
    withReplacement=False, fraction=sample_fraction, seed=1)
eo990clustering_df = apply_kmeans(sample, num_clusters)

spark.sql("DROP TABLE IF EXISTS eo990clustering")
eo990clustering_df.write.saveAsTable('eo990clustering')

# EO 990 Hydrate Clustering

In [None]:
from pyspark.sql import SparkSession
import os
import plotly.express as px

appName = "eo990-hydrate"
spark = SparkSession.builder.appName(appName).getOrCreate()

dbfs, folder_out = (
    ("/dbfs", "/mnt/eo990pipeline")
    if 'DATABRICKS_RUNTIME_VERSION' in os.environ
    else ("", ".")  # local setup - no cluster charges!!
)

# Friendly column names for the chart; the cluster index is cast to a string
# so it matches the string keys of color_map below.
query = """
    SELECT CYContributionsGrantsAmt AS ContributionsGrants,
         TotalEmployeeCnt AS Employees,
         TotalVolunteersCnt AS Volunteers,
         CAST(prediction AS STRING) AS Cluster
    FROM eo990clustering
"""
hydrate_df = spark.sql(query)

# Chart a seeded 50% sample of the clustered rows.
subset_df = hydrate_df.sample(withReplacement=False, fraction=0.5, seed=1)
pdf = subset_df.toPandas()

# Cluster -> color: green, red, gold, blue.
color_map = dict(zip("0123", ['#1ac958', '#db5d51', '#fac828', '#5385f4']))

fig = px.scatter_3d(
    pdf,
    x='ContributionsGrants', y='Employees', z='Volunteers',
    color='Cluster', color_discrete_map=color_map,
    title='Exempt Organization Kmeans Clustering K=4',
)
html_path = f'{dbfs}{folder_out}/clustering.html'
fig.write_html(html_path, include_plotlyjs='cdn')

# and that ends the pipeline!

fig.show() # can be commented out in production

# EO 990 Hydrate Parallel Charting

## Charting Functions

In [None]:
import numpy as np
import pandas as pd
import plotly.express as px
#from IPython.core.display import HTML

def treemap_replaces(s): # restyle plotly output
    """Restyle a full plotly HTML page.

    Sets ``margin:0`` and ``overflow:hidden`` on ``<body>`` and injects a
    ``<style>`` rule hiding overflow on plotly's ``.svg-container``.

    Args:
        s: HTML text, e.g. from ``fig.to_html(full_html=True, ...)``.

    Returns:
        The restyled HTML string.
    """
    # CSS declarations use `property:value`; the attribute value must not
    # contain bare double quotes or it terminates early.
    s = s.replace('<body>', '<body style="margin:0; overflow:hidden;">')
    s = s.replace('</head>',
        '<style>.svg-container { overflow: hidden; }</style></head>')
    return s

def giving_treemap_nonprofits(df, *, threshold=800): # called by parallel process
    """Build a plotly treemap of contributions for one NTEE3 code.

    Organizations are bucketed into contribution-size levels. The largest
    organizations get their own box (labelled by EIN); all others in a level
    are collapsed into one "Total" box. Box size is the current-year
    contribution/grant amount and color is year-over-year growth in percent,
    capped at +100.

    Args:
        df: pandas DataFrame for a single NTEE3 code with at least the columns
            NTEE3, SECTOR, EIN, BusinessName, CITY, STATE,
            PYContributionsGrantsAmt and CYContributionsGrantsAmt. The
            caller's frame is not modified.
        threshold: Roughly how many organizations to show individually
            (ties at the cutoff amount are all kept).

    Returns:
        The plotly treemap figure.

    Raises:
        ValueError: if ``df`` has no rows.
    """
    if df.empty:
        raise ValueError("giving_treemap_nonprofits needs at least one row")
    df = df.copy()  # columns are added below; keep the caller's frame intact

    ntee3 = df['NTEE3'].iloc[0]
    sector_desc = df['SECTOR'].iloc[0]
    cy_amt_col = 'CYContributionsGrantsAmt'
    amounts = df[cy_amt_col]

    # create groupings by contribution amount
    df["level"] = pd.cut(amounts,
        [-np.inf, 999_999.999, 2_499_999.999, 9_999_999.999,
               49_999_999.999, np.inf],
        labels=[
            '<b>Under $1M</b>', '<b>$1M-<$2.5M</b>', '<b>$2.5M-<$10M</b>',
            '<b>$10M-<$50M</b>','<b>$50M and Up</b>']
    )

    # limit to the largest organizations: use the smallest tier boundary that
    # leaves fewer than `threshold` organizations at or above it
    if len(df) < threshold:
        cutoff = 0
    else:
        for tier in (1_000_000, 2_500_000, 10_000_000, 50_000_000):
            if (amounts >= tier).sum() < threshold:
                cutoff = tier
                break
        else:
            # Even the top tier is too crowded: use the threshold-th largest
            # amount. (df is not sorted, so a positional lookup would pick an
            # arbitrary row's amount.)
            cutoff = amounts.nlargest(threshold).iloc[-1]
    df["nonprofit"] = np.where(amounts >= cutoff, df["EIN"], "Total")

    # regroup to collapse smaller organizations into one box per level;
    # observed=True skips empty level/nonprofit combinations up front
    adf2 = df.groupby(['level', 'nonprofit'], observed=True).agg(
        py_amount=('PYContributionsGrantsAmt', 'sum'),
        cy_amount=(cy_amt_col, 'sum'),
        count=('EIN', 'count'),
    )

    # calculate contribution growth (0 when there is no prior-year amount)
    adf2 = adf2[adf2["cy_amount"] > 0].copy()
    adf2['py_amount'] = adf2['py_amount'].round()
    adf2['cy_amount'] = adf2['cy_amount'].round()
    adf2["growth"] = np.where(adf2["py_amount"] == 0, 0,
        (adf2["cy_amount"] - adf2["py_amount"]) * 100.0
        / adf2["py_amount"])

    # cap growth to 100% for color balance
    adf2["growth"] = np.minimum(adf2["growth"], 100).astype(float)

    adf2 = adf2.reset_index()

    # attach display details; de-duplicate EINs first so an organization with
    # several rows in df does not duplicate (and double-count) its box
    details = df[['EIN', 'BusinessName', 'CITY', 'STATE']].drop_duplicates('EIN')
    adf2 = adf2.merge(details, left_on='nonprofit', right_on='EIN', how='left')
    # "Total" boxes have no matching EIN, so their details come back missing
    adf2 = adf2.fillna(
        {'BusinessName': '', 'CITY': '', 'STATE': '', 'EIN': 'Total'})
    adf2["link"] = np.where(adf2["nonprofit"] == "Total", "", "🔗")

    # chart it in plotly!
    sector_label = f'{sector_desc} ({ntee3})'
    fig = px.treemap(
        adf2,
        path=[px.Constant(sector_label), 'level', 'EIN'],
        values='cy_amount',
        color='growth',
        color_continuous_scale=['red', 'white', 'green'],
        range_color=[-100, 100],
        color_continuous_midpoint=0,
        custom_data=['BusinessName', 'CITY', 'STATE', 'link']
    )

    fig.update_layout(margin=dict(t=0, l=0, r=0, b=0))
    fig.update_traces(marker_line_width=0, tiling_pad=1)

    # hover: name and amount; box text: name, amount and a ProPublica link
    # built from the box label (the EIN for organization boxes)
    fig.data[0].hovertemplate = "%{customdata[0]}<br>$%{value:,.0f}"
    fig.data[0].texttemplate = """\
%{customdata[0]}<br>$%{value:,.0f}<br><a
href="https://projects.propublica.org/nonprofits/organizations/%{label}"
style="cursor: help; color: blue;" rel="noopener noreferrer"
>%{customdata[3]}</a>"""

    return fig

## Serial Testing

In [None]:
from pyspark.sql import SparkSession

appName = "eo990-hydrate-parallel-charting-testing"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Smoke-test the chart on a single NTEE3 code before running it in parallel.
# The two inner gcst joins keep only rows whose 1- and 3-character NTEE3
# prefixes both have a gcst entry.
test_query = """
    SELECT EIN, NTEE3, ntee1.SECTOR AS SECTOR1, eo990.SECTOR,
        ReturnTypeCd, TaxPeriodEndDt, BusinessName, CITY, STATE,
        PYContributionsGrantsAmt, CYContributionsGrantsAmt,
        PYGrantsAndSimilarPaidAmt, CYGrantsAndSimilarPaidAmt,
        PYTotalProfFndrsngExpnsAmt, CYTotalProfFndrsngExpnsAmt,
        CYTotalFundraisingExpenseAmt,
        TotalEmployeeCnt, TotalVolunteersCnt
    FROM eo990
    JOIN gcst AS ntee1 ON LEFT(eo990.NTEE3,1) = ntee1.code
    JOIN gcst AS ntee3 ON LEFT(eo990.NTEE3,3) = ntee3.code
    WHERE CYContributionsGrantsAmt > 0
    AND NTEE3 = 'K31' -- just one for testing
"""
pandas_df = spark.sql(test_query).toPandas()

test_fig = giving_treemap_nonprofits(pandas_df)
test_fig.show()

## Parallel Processing

In [None]:
import os
import shutil
from pyspark.sql import SparkSession

if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
    dbfs = "/dbfs"
    folder_out = "/mnt/eo990pipeline"
else: # local setup - no cluster charges!!
    dbfs = ""
    folder_out = "."

# Start each run from an empty draft folder so no stale charts remain.
draft_dir = f"{dbfs}{folder_out}/draft"
if os.path.isdir(draft_dir):
    shutil.rmtree(draft_dir)
os.makedirs(draft_dir)

appName = "eo990-hydrate-parallel-charting"
spark = SparkSession.builder.appName(appName).getOrCreate()

chart_df = spark.sql("""
    SELECT EIN, NTEE3, ntee1.SECTOR AS SECTOR1, eo990.SECTOR,
        ReturnTypeCd, TaxPeriodEndDt, BusinessName, CITY, STATE,
        PYContributionsGrantsAmt, CYContributionsGrantsAmt,
        PYGrantsAndSimilarPaidAmt, CYGrantsAndSimilarPaidAmt,
        PYTotalProfFndrsngExpnsAmt, CYTotalProfFndrsngExpnsAmt,
        CYTotalFundraisingExpenseAmt,
        TotalEmployeeCnt, TotalVolunteersCnt
    FROM eo990
    JOIN gcst AS ntee1 ON LEFT(eo990.NTEE3,1) = ntee1.code
    JOIN gcst AS ntee3 ON LEFT(eo990.NTEE3,3) = ntee3.code
    WHERE CYContributionsGrantsAmt > 0
    --AND NTEE3 IN ('K31','B43')
""")

# Collect all rows of each NTEE3 code into one group so one task charts one
# code. Cached because the grouped RDD is used twice (count, then charting);
# without it the groupByKey shuffle would be computed twice.
grouped_rdd = chart_df.rdd.keyBy(lambda row: row.NTEE3).groupByKey().cache()
print("Number of NTEE3: ", grouped_rdd.count())
COLUMNS = spark.sparkContext.broadcast(chart_df.columns)

def process_group(iterator):
    """Render the treemap for one NTEE3 group and write it as an HTML file.

    Called through ``mapValues`` on the grouped RDD, so it runs in Spark tasks.

    Args:
        iterator: The Row objects grouped under a single NTEE3 code.

    Returns:
        Path of the HTML file written in the draft folder.
    """
    pandas_df = pd.DataFrame(list(iterator), columns=COLUMNS.value)
    ntee3 = pandas_df['NTEE3'].iloc[0]

    fig = giving_treemap_nonprofits(pandas_df) # create chart figure
    path = f'{dbfs}{folder_out}/draft/sector_treemap_{ntee3}.html'
    # Render before opening the file so a failure leaves no empty file behind.
    src = fig.to_html(full_html=True, include_plotlyjs='cdn')
    with open(path, 'w', encoding='utf_8') as f:
        f.write(treemap_replaces(src))

    return path

result_rdd = grouped_rdd.mapValues(process_group)
collected_results = result_rdd.collect()  # list of (ntee3, path) pairs
grouped_rdd.unpersist()

# Bottom