In [None]:
from dataclasses import dataclass
from pathlib import Path
import pandas as pd


In [168]:
# Answer-level lookup table. `indicator_id` is read with pandas' nullable
# integer dtype so that blank cells do not force the column to float.
answer_key = pd.read_excel(
    Path.cwd().joinpath("conf", "nvi_answer_key.xlsx"),
    dtype={"indicator_id": pd.Int64Dtype()},
)

# Recoded survey responses. low_memory=False has pandas infer each column's
# dtype from the whole file instead of chunk by chunk.
recoded = pd.read_csv(
    Path.cwd().joinpath("output", "nvi_2024_analysis_source.csv"),
    low_memory=False,
)


In [220]:
@dataclass
class DocumentedDataFrame:
    """A DataFrame paired with a second DataFrame that documents it.

    Attribute lookups that fail on the wrapper itself are forwarded to `df`,
    so the wrapper can be used like the underlying frame for attribute-style
    access (e.g. `wrapper.columns`, `wrapper.head()`).
    """

    # The data being wrapped; target of all forwarded attribute lookups.
    df: pd.DataFrame
    # Companion frame carried alongside `df` (not consulted by this class).
    dictionary: pd.DataFrame

    def __getattr__(self, name):
        """Forward unknown attribute lookups to the wrapped `df`.

        Python only calls this after normal lookup on the wrapper has failed.

        Raises:
            AttributeError: if `name` is a copy/pickle protocol hook, if `df`
                has not been set on this instance yet, or if `df` itself has
                no such attribute.
        """
        # Copy/pickle hooks must describe the wrapper, not the wrapped frame:
        # forwarding `__deepcopy__` would make copy.deepcopy() return a bare
        # DataFrame instead of a DocumentedDataFrame.
        if name in (
            "__copy__",
            "__deepcopy__",
            "__getstate__",
            "__setstate__",
            "__getnewargs__",
            "__getnewargs_ex__",
        ):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        # Read `df` straight from the instance dict. Going through `self.df`
        # would re-enter __getattr__ when `df` is missing (e.g. on the blank
        # instance copy/pickle build before restoring state) and recurse
        # until RecursionError.
        try:
            wrapped = self.__dict__["df"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

        return getattr(wrapped, name)


In [None]:
@dataclass
class Survey:
    """Tabulate recoded survey responses with the help of an answer key.

    The answer key is expected to carry at least the columns this class
    reads: "group", "question", "answer", "survey_code", "full_column" and
    "response_type".
    """

    # The survey data is recoded and geocoded with the geographies of interest included
    survey_data: pd.DataFrame

    # The answer key is an answer-level table, so multiple rows for each question
    # unless that question is a numeric response or a free-text field
    answer_key: pd.DataFrame

    def _answer_rows(self, question_group, question=None):
        """Return the answer-key rows of a group, optionally narrowed to one question."""
        mask = self.answer_key["group"] == question_group
        if question is not None:
            mask = mask & (self.answer_key["question"] == question)
        return self.answer_key[mask]

    @property
    def questions(self):
        """Distinct (group, question) pairs present in the answer key."""
        return self.answer_key[["group", "question"]].drop_duplicates()

    def top_of_q(self, question, question_group=None):
        """Return the first answer-key row for a question.

        Args:
            question: value to match in the "question" column.
            question_group: value to match in the "group" column. When
                omitted, the group is taken to be the question itself, the
                same default `tabulate_single_question` applies.

        Raises:
            IndexError: if no answer-key row matches.
        """
        if question_group is None:
            question_group = question

        matches = self._answer_rows(question_group, question)
        if matches.empty:
            raise IndexError(
                f"No rows with question '{question}' and question group "
                f"'{question_group}' found in the answer key."
            )

        return matches.iloc[0]

    def make_renamer(self, question, question_group, ignore=()):
        """Build a function mapping survey codes to answer labels.

        Codes listed in `ignore` map to themselves. Any value that is not a
        survey code of this question maps to "Not answered".
        """
        rows = self._answer_rows(question_group, question)

        column_rename = {
            code: (code if code in ignore else answer)
            for code, answer in zip(rows["survey_code"], rows["answer"])
        }

        return lambda v: column_rename.get(v, "Not answered")

    def make_col_to_answer_renamer(self, question_group, question, ignore=()):
        """Build a function mapping survey-data column names to answer labels.

        Names listed in `ignore` map to themselves. Any other unknown name
        maps to "No alias for this column".

        Raises:
            KeyError: if the answer key has no rows for the group/question.
        """
        answers = self._answer_rows(question_group, question)

        if answers.empty:
            raise KeyError(
                f"No rows for renamer columns found for group '{question_group}', and question '{question}'"
            )

        mapper = dict(zip(answers["full_column"], answers["answer"]))
        mapper.update({name: name for name in ignore})

        return lambda v: mapper.get(v, "No alias for this column")

    def tabulate_question(self, question, question_group=None, group_var=None):
        """
        This will look up the question type in the answer key and decide the best
        strategy for aggregation.

        Args:
            question: value of the "question" column to tabulate.
            question_group: value of the "group" column; defaults to `question`.
            group_var: survey-data column to group by. It defaults to None in
                the signature, but every tabulator below selects or groups on
                it, so a real column name is needed in practice.

        Raises:
            IndexError: if the question is not in the answer key.
            ValueError: if the question's response type has no tabulator.
        """
        if question_group is None:
            question_group = question

        response_type = self.top_of_q(question, question_group)["response_type"]

        if response_type in ("SINGLE", "GROUPED-SINGLE"):
            return self.tabulate_single_question(group_var, question, question_group)

        # Multi-select and grid tabulators work on the whole group, not on a
        # single question, so only the group is passed on.
        if response_type == "MULTI-SELECT":
            return self.tabulate_multiselect(group_var, question_group)

        if response_type == "GRID-OF-DEATH":
            return self.tabulate_grid_of_death(group_var, question_group)

        raise ValueError(f"{response_type} not a valid response-type for tabulation.")

    def tabulate_single_question(self, group_var, question, question_group=None):
        """Cross-tabulate one single-select question against a grouping column.

        Returns a frame with one row per `group_var` value and one column per
        answer, plus a "Total Responses" column; combinations that never
        occur are filled with 0.
        """
        if not question_group:
            question_group = question

        question_meta = self.top_of_q(question, question_group)
        index_renamer = self.make_renamer(group_var, group_var) # Right now only single-selects can be index renamers
        column_renamer = self.make_renamer(question, question_group)

        return (
            self.survey_data[[group_var, question_meta["full_column"]]]
            .astype({
                group_var: pd.Int64Dtype(),
                question_meta["full_column"]: pd.Int64Dtype(),
            })
            .value_counts(dropna=False)
            # Relies on value_counts naming its result "count".
            .reset_index()
            .pivot(
                columns=question_meta["full_column"],
                index=group_var,
                values="count"
            )
            .rename(
                columns=column_renamer,
                index=index_renamer
            )
            .assign(**{"Total Responses": lambda df: df.sum(axis=1)})
            .fillna(0)
        )

    def tabulate_multiselect(self, group_var, question_group):
        """Count non-missing responses per option of a multi-select group.

        Each answer-key row of the group contributes one column (named after
        its "question" value) holding the count of non-missing values in its
        "full_column", by `group_var`. A "Total Responses" column with the
        group sizes is appended. Options whose column is absent from the
        survey data are reported with a printed message and skipped.
        """
        questions = self._answer_rows(question_group)
        index_rename = self.make_renamer(group_var, group_var)

        # dropna=False keeps respondents whose group_var is missing.
        grouped = self.survey_data.groupby(group_var, dropna=False)

        aggregations = []
        for _, question in questions.iterrows():
            try:
                aggregations.append(
                    grouped[question["full_column"]]
                    .count()
                    .rename(question["question"])
                )
            except KeyError:
                print(f"{question['full_column']} missing")

        aggregations.append(grouped.size().rename("Total Responses"))

        return (
            pd.concat(aggregations, axis=1)
            .rename(index_rename)
        )

    def tabulate_grid_of_death(self, group_var, question_group):
        """Tabulate a grid question group: one stacked block per question.

        For every question in the group, counts non-missing values of each
        answer column by `group_var`, adds "Total Responses" (group sizes)
        and a leading "Question" column, and stacks the blocks vertically.

        Raises:
            ValueError: if the answer key has no rows for `question_group`.
            KeyError: if a question has no rows for building column labels.
        """
        grid_rows = self._answer_rows(question_group)
        if grid_rows.empty:
            raise ValueError(
                f"No rows with question group '{question_group}' found in the answer key."
            )

        index_renamer = self.make_renamer(group_var, group_var)

        # Group sizes do not depend on the question, so compute them once.
        totals = self.survey_data.groupby(group_var).size()

        frames = []
        for q, q_group in grid_rows.groupby("question"):

            # "Question" and "Total Responses" are added below and must keep
            # their names when the answer columns are relabelled.
            column_renamer = self.make_col_to_answer_renamer(
                question_group, q, ignore=["Question", "Total Responses"]
            )

            rolled = (
                self.survey_data[[group_var] + list(q_group["full_column"])]
                .groupby(group_var)
                .count()
            )

            rolled["Total Responses"] = totals
            rolled.insert(0, "Question", q)

            frames.append(rolled.rename(index=index_renamer, columns=column_renamer))

        return pd.concat(frames)


In [217]:
# Bind the recoded responses and the answer key into one Survey for tabulation.
survey = Survey(survey_data=recoded, answer_key=answer_key)

In [228]:
# NOTE(review): `table` is not assigned in any cell above this one; the only
# assignment visible in this notebook is the loop variable of the Excel-export
# cell at the bottom. On a fresh kernel (Restart & Run All) this cell would
# therefore raise NameError. TODO: build the table explicitly before displaying
# it. Judging by the displayed output, it presumably came from something like
# survey.tabulate_single_question("Household_Annual_Income_Before_Taxes", "Age")
# -- confirm before relying on that.
table

Age,Not answered,18-24 years old,25-34 years old,35-44 years old,45-54 years old,55-64 years old,65 years or older,Prefer not to answer,Total Responses
Household_Annual_Income_Before_Taxes,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
Not answered,22.0,2.0,2.0,3.0,4.0,7.0,17.0,2.0,59.0
"Less than $10,000",0.0,73.0,127.0,127.0,101.0,84.0,69.0,5.0,586.0
"$10,000 to $14,999",0.0,17.0,34.0,41.0,51.0,54.0,78.0,4.0,279.0
"$15,000 to $19,999",0.0,7.0,23.0,23.0,30.0,41.0,53.0,2.0,179.0
"$20,000 to $24,999",0.0,4.0,26.0,24.0,36.0,41.0,48.0,0.0,179.0
"$25,000 to $29,999",2.0,3.0,13.0,30.0,25.0,25.0,47.0,0.0,145.0
"$30,000 to $34,999",0.0,3.0,15.0,12.0,26.0,24.0,43.0,0.0,123.0
"$35,000 to $39,999",0.0,22.0,69.0,62.0,71.0,56.0,68.0,7.0,355.0
"$40,000 to $44,999",0.0,6.0,46.0,48.0,34.0,35.0,63.0,2.0,234.0
"$45,000 to $49,999",0.0,5.0,19.0,33.0,24.0,33.0,49.0,1.0,164.0


In [None]:
# Survey-data columns to break every table down by; results are stacked in
# this order.
GEOGRAPHIES = ("citywide", "district", "zone")


def tabulate_all_geographies(tabulate, *args):
    """Run one Survey tabulator per geography column and stack the results.

    Args:
        tabulate: a bound Survey tabulation method whose first parameter is
            the grouping column.
        *args: remaining positional arguments passed to `tabulate` unchanged.

    Returns:
        The per-geography tables concatenated vertically, in GEOGRAPHIES order.
    """
    # One shared helper replaces three copies of the citywide/district/zone
    # sequence. The copies had drifted: two of them assigned `district`/`zone`
    # but concatenated `districts`/`zones`, i.e. tables from an earlier group.
    return pd.concat([tabulate(geography, *args) for geography in GEOGRAPHIES])


# Answer-key rows flagged for inclusion, grouped by question group and
# response type so each group can be sent to the matching tabulator.
answer_groups = (
    answer_key.loc[
        answer_key["included_in_arpa"],
        ["group", "question", "answer", "response_type"],
    ]
    .groupby(["group", "response_type"])
)

# (label, table) pairs, in the order they are produced.
tables = []
for (group, response_type), group_rows in answer_groups:
    print(group, response_type)

    if response_type == "GROUPED-SINGLE":
        # One table per question inside the group.
        for q, question_rows in group_rows.groupby("question"):
            print("    ", q)

            all_geos = tabulate_all_geographies(
                survey.tabulate_single_question, q, group
            )
            tables.append((q, all_geos))

    elif response_type == "MULTI-SELECT":
        print("    ", "\n     ".join(group_rows["answer"]))

        all_geos = tabulate_all_geographies(survey.tabulate_multiselect, group)
        tables.append((group, all_geos))

    elif response_type == "GRID-OF-DEATH":
        print("    ", "\n     ".join(group_rows["question"] + ":" + group_rows["answer"]))

        all_geos = tabulate_all_geographies(survey.tabulate_grid_of_death, group)
        tables.append((group, all_geos))

    # NOTE(review): any other response type (including plain "SINGLE") is
    # skipped without a table -- confirm that is intended.


Access_Device GROUPED-SINGLE
     Cell_phone
     Computer
     Tablet
Comfortability_Using_Devices_Task MULTI-SELECT
     I do not feel comfortable doing any of these tasks
     Passing a basic computer skills test
     Printing and scanning documents
     Submitting an online job application
     Uploading documents
     Using a website builder
     Using communication and collaboration tools (Slack, Teams)
     Using email applications (Microsoft Outlook, Gmail)
     Using Microsoft Office, or Google applications (Word, Excel, PowerPoint, Docs, Sheets, or Slides)
     Using photo or video editing software
     Using software to analyze data
     Using video conferencing software (Zoom, Google Meet)
     Writing code
Concerns_Using_Devices MULTI-SELECT
     Concerns about being able to determine if online content is real
     Concerns about my ability to protect personal information online
     Concerns about protecting my financial information
     Fear of being hacked or getting a 

In [261]:
# Write every tabulated table to its own worksheet of a single workbook.
# NOTE(review): the destination is an absolute path on one user's desktop;
# consider a path relative to the project (as the input cells use) so the
# notebook runs elsewhere.
with pd.ExcelWriter(
    "C:\\Users\\mike\\Desktop\\digital_equity_questions_20250512.xlsx",
    engine="xlsxwriter",
) as workbook:
    for i, (label, table) in enumerate(tables):
        # The position prefix keeps sheet names distinct; the slice caps each
        # name at 30 characters.
        sheet_name = f"{i}-{label}"[:30]
        table.to_excel(workbook, sheet_name=sheet_name)