# Notebook with Code to Clean the Data Scraped off the Census of India [Website](https://censusindia.gov.in/census.website/data/census-tables)

By way of "cleaning" this notebook does the following:

1. Fix column header structure of each data file according to the [Metadata file](https://github.com/SocratusCollective/gender-polyconflict/blob/main/Metadata_2011.xlsx).
1. Strip leading spaces and replace "-" with " to " in Age Group columns, for e.g., replace " 5-12" by "5 to 12", to prevent spreadsheet software from misinterpreting the age range "5-12" as the date, the 5th of December, for example.
1. Standardise hyphenation in the Area Name column,
1. Remove rows where any primary key columns are empty,
1. Perform meta-data checks, which consist of:
```
a) Checking for primary key uniqueness
b) Data type match
c) Checking if columns expected to have 1:1 mappings do
d) Checking levels of categorical variables (if any)
```
1. In special cases, the following processing steps will additionally be run _before_ meta-data checks:
```
a) If Census file type is "H-01" or "HH-01-Total", delete all rows where
"Tehsil Code" <> "00000" or "Town Code" <> "000000", or,
b) If Census file type is "HL-14-Total" or "HL-14-SC-ST", delete all rows where
"Tehsil Code" <> "00000" or "Town Code/Village code" <> "000000" or
"Ward No <> "0000", and ensure values across proportion columns sum up
to 100 in each row.
c) If "F-04", "F-08" or "F-12", remove leading whitespaces from "Economic Activity" column.
```

1. Collate similar datasets (e.g., for the "B-01" type of files, there is one file for each state with caste granularity "Total", "SC" and "ST"; so the task is to obtain one collated file for each caste category, "Total", "SC" and "ST") and output to [this](https://drive.google.com/drive/folders/1N7hefMlzIodrufTz7xHGM1boKv0Wvozq) folder.
  - Note, however, that if the collated dataset size exceeds Google Sheets's limit of 10 million cells per file, each dataset will be stored as a separate file.

In [None]:
!pip install --quiet gspread google-auth google-api-python-client \
  pandas==2.2.2 google-auth==2.27.0 openpyxl xlrd

In [None]:
from google.colab import auth
from google.auth import default
from googleapiclient.discovery import build
import gspread
import pandas as pd
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
import io
from google.colab import drive

## Util Function: Get ID of a folder (a unique identifier for the folder in Google Drive) given its path

In [None]:
def get_folder_id(path):
    """
    Retrieves the Google Drive folder ID for a given path.

    Args:
    -----
        path (str): Path in Google Drive (e.g., "RootFolder/SubFolder/SubSubFolder").

    Returns:
    --------
        str: Folder ID if found, else None.
    """
    path = path.strip('/')  # Normalize path
    folder_id = 'root'  # Start from the root folder

    for folder_name in path.split('/'):
        query = (f"'{folder_id}' in parents and mimeType='application/"
                 f"vnd.google-apps.folder' and name='{folder_name}'")
        results = drive_service.files().list(q=query, fields="files(id)").execute()
        folders = results.get('files', [])

        if not folders:
            return None  # Return None if the folder is not found

        folder_id = folders[0]['id']  # Assume unique folder names

    return folder_id

# Function to cache Metadata for a given dataset from the Metadata Google Sheet

In [None]:
def cache_metadata_from_sheet(sheet, dataset_name, start_cell):
    """
    Extract metadata for a given dataset name from a Google Sheet.

    Parameters:
    - sheet (gspread.Spreadsheet): The Google Sheet object.
    - dataset_name (str): Dataset name, which is also the name of the tab
                          containing the dataset's metadata.
    - start_cell (str): Starting cell of the metadata (e.g., 'F1').

    Returns:
    - dict: Metadata dictionary with column names, primary keys, types, and levels.
    """
    if dataset_name in metadata_cache:
        print(f"Metadata for {dataset_name} already cached.")
        return

    try:
        # Access the specific tab
        tab = sheet.worksheet(dataset_name)

        # Fetch all tab content
        all_values = tab.get_all_values()

        # Calculate row and column indices for start_cell;
        # e.g., D3 corresponds to row 3 and column 4
        start_row, start_col = gspread.utils.a1_to_rowcol(start_cell)

        # Filter all tab content to get content from the given start_cell
        filtered_values = [row[start_col - 1 :] \
                           for row in all_values[start_row - 1 :] if any(row)]

        # Determine the last column and row dynamically
        last_col_index = max(len(row) for row in filtered_values)
        metadata = [row[:last_col_index] for row in filtered_values]

        # Convert metadata into a dictionary
        metadata_dict = {}
        for row in metadata[1:]:  # Skip header row
            column_name = row[1].strip() if len(row) > 1 else None
            if column_name:  # Only process rows with a valid column name
                primary_key = row[2] == "Y" if len(row) > 2 else False
                map_1_1 = row[3] if len(row) > 3 else None
                data_type = row[4] if len(row) > 4 else None
                levels = [level for level in row[5:] if level]

                if data_type == "constant string":
                  assert len(levels) == 1, (f"Encountered Constant String"
                                            f" variable {column_name} with"
                                            f"multiple levels {levels}")

                metadata_dict[column_name] = {
                    "Primary Key": primary_key,
                    "Map 1:1": map_1_1,
                    "Type": data_type,
                    "Levels": levels,
                }

        # Cache the metadata for this dataset name
        metadata_cache[dataset_name] = metadata_dict
        print(f"Metadata for {dataset_name} cached successfully.")

    except Exception as e:
        print(f"Error retrieving metadata for dataset name {dataset_name}: {e}")

## Function to get IDs and names of relevant files
This function fetches the IDs and full names of all files in a given a google drive `folder_id` and beggining with the `input_prefix` (e.g., "B-01"). If `caste` is specified, relevant files will be deemed to be those corresponding to the specified caste (i.e., files which end with `f"_{caste.lower()}.xlsx"` or `f"_{caste.lower()}.xls"`).

Notes:
- File ID is a unique identifier for a given file in Google Drive
- Function excludes those files which begin with `f"{input_prefix}_India"`

In [None]:
def get_file_ids_and_names(folder_id, input_prefix, caste=None):
    """
    Retrieves ID and names of files starting with the specified prefix, excluding
    files starting with f"{input_prefix}_India". If caste is specified, filters
    further to files ending with f"_{caste.lower()}.xlsx" or f"_{caste.lower()}.xls".

    Parameters:
    - folder_id (str): ID of Google Drive folder containing the files.
    - input_prefix (str): Prefix of file names to process.
    - caste (str, optional): Specific caste filter for file names.

    Returns:
    - List[dict]: A list of ID/name dictionaries for relevant files.
    """
    query = f"'{folder_id}' in parents"
    files = []
    page_token = None

    while True:
        response = drive_service.files().list(
            q=query,
            spaces='drive',
            fields="nextPageToken, files(id, name)",
            pageToken=page_token
        ).execute()

        files.extend(response.get('files', []))
        page_token = response.get('nextPageToken', None)

        if not page_token:
            break

    # Filter files by prefix and exclude f"{input_prefix}_India"
    relevant_files = [
        file for file in files
        if file['name'].startswith(input_prefix) and not \
        file['name'].startswith(f"{input_prefix}_India")
    ]

    # Apply caste filter if specified
    if caste:
        caste_lower = caste.lower()
        relevant_files = [
            file for file in relevant_files if file['name'].lower().endswith(
                (f"_{caste_lower}.xlsx", f"_{caste_lower}.xls"))
        ]

        print((f"{len(relevant_files)} files found starting with the prefix "
              f"'{input_prefix}' (excluding '{input_prefix}_India') "
              f"and matching caste '{caste}'."))
    else:

        print((f"{len(relevant_files)} files found starting with the prefix "
              f"'{input_prefix}' (excluding '{input_prefix}_India')."))
    return relevant_files

# Functions to download a given single Census data file and return a processed `pandas` dataframe containing the Census data:

In [None]:
def normalise_dataframe(df, file_name):
    """
    Normalises values in predefined sets of columns so that they sum to 100.
    """
    df_normalised = df.copy()

    try:

      # Normalizing set 1
      columns_set_1 = [
          "Census House Condition - Usetype: Residence: Good",
          "Census House Condition - Usetype: Residence: Livable",
          "Census House Condition - Usetype: Residence: Dilapidated",
          "Census House Condition - Usetype: Residence-cum-other use: Good",
          "Census House Condition - Usetype: Residence-cum-other use: Livable",
          "Census House Condition - Usetype: Residence-cum-other use: Dilapidated"
          ]
      row_sums_1 = df[columns_set_1].sum(axis=1, skipna=True)
      row_sums_1[row_sums_1 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_1] = \
       (df[columns_set_1].div(row_sums_1, axis=0)) * 100

      # Normalizing set 2
      columns_set_2 = ["Material of Roof: Grass/ Thatch/ Bamboo/ Wood/Mud etc.",
                      "Material of Roof: Plastic/ Polythene",
                      "Material of Roof: Hand made Tiles",
                      "Material of Roof: Machine made Tiles",
                      "Material of Roof: Burnt Brick",
                      "Material of Roof: Stone/ Slate",
                      "Material of Roof: G.I./Metal/ Asbestos sheets",
                      "Material of Roof: Concrete",
                      "Material of Roof: Any other material"]
      row_sums_2 = df[columns_set_2].sum(axis=1, skipna=True)
      row_sums_2[row_sums_2 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_2] = \
       (df[columns_set_2].div(row_sums_2, axis=0)) * 100

      # Normalizing set 3
      columns_set_3 = ["Material of Wall: Grass/ Thatch/ Bamboo etc.",
                      "Material of Wall: Plastic/ Polythene",
                      "Material of Wall: Mud/Unburnt brick",
                      "Material of Wall: Wood",
                      "Material of Wall: Stone not packed with mortar",
                      "Material of Wall: Stone packed with mortar",
                      "Material of Wall: G.I./ Metal/ Asbestos sheets",
                      "Material of Wall: Burnt brick",
                      "Material of Wall: Concrete",
                      "Material of Wall: Any other material"]
      row_sums_3 = df[columns_set_3].sum(axis=1, skipna=True)
      row_sums_3[row_sums_3 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_3] = \
       (df[columns_set_3].div(row_sums_3, axis=0)) * 100

      # Normalizing set 4
      columns_set_4 = ["Material of Floor: Mud",
                      "Material of Floor: Wood/ Bamboo",
                      "Material of Floor: Burnt Brick",
                      "Material of Floor: Stone",
                      "Material of Floor: Cement",
                      "Material of Floor: Mosaic/ Floor tiles",
                      "Material of Floor: Any other material"]
      row_sums_4 = df[columns_set_4].sum(axis=1, skipna=True)
      row_sums_4[row_sums_4 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_4] = \
       (df[columns_set_4].div(row_sums_4, axis=0)) * 100

      # Normalizing set 5
      columns_set_5 = ["Number of Dwelling Rooms: No exclusive room",
                      "Number of Dwelling Rooms: One room",
                      "Number of Dwelling Rooms: Two rooms",
                      "Number of Dwelling Rooms: Three rooms",
                      "Number of Dwelling Rooms: Four rooms",
                      "Number of Dwelling Rooms: Five rooms",
                      "Number of Dwelling Rooms: Six rooms and above"]
      row_sums_5 = df[columns_set_5].sum(axis=1, skipna=True)
      row_sums_5[row_sums_5 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_5] = \
       (df[columns_set_5].div(row_sums_5, axis=0)) * 100

      # Normalizing set 6
      columns_set_6 = ["Household size: 1", "Household size: 2",
                      "Household size: 3", "Household size: 4",
                      "Household size: 5", "Household size: 6-8",
                      "Household size: 9+"]
      row_sums_6 = df[columns_set_6].sum(axis=1, skipna=True)
      row_sums_6[row_sums_6 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_6] = \
       (df[columns_set_6].div(row_sums_6, axis=0)) * 100

      # Normalizing set 7
      columns_set_7 = ["Ownership status: Owned", "Ownership status: Rented",
                      "Ownership status: Any others"]
      row_sums_7 = df[columns_set_7].sum(axis=1, skipna=True)
      row_sums_7[row_sums_7 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_7] = \
       (df[columns_set_7].div(row_sums_7, axis=0)) * 100

      # Normalizing set 8
      columns_set_8 = ["Married couple: None", "Married couple: 1",
                      "Married couple: 2", "Married couple: 3",
                      "Married couple: 4", "Married couple: 5+"]
      row_sums_8 = df[columns_set_8].sum(axis=1, skipna=True)
      row_sums_8[row_sums_8 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_8] = \
       (df[columns_set_8].div(row_sums_8, axis=0)) * 100

      # Normalizing set 9
      columns_set_9 = [
          "Main Source of Drinking Water: Tapwater from treated source",
          "Main Source of Drinking Water: Tapwater from un-treated source",
          "Main Source of Drinking Water: Covered well",
          "Main Source of Drinking Water: Un-covered well",
          "Main Source of Drinking Water: Handpump",
          "Main Source of Drinking Water: Tubewell/Borehole",
          "Main Source of Drinking Water: Spring",
          "Main Source of Drinking Water: River/Canal",
          "Main Source of Drinking Water: Tank/Pond/Lake",
          "Main Source of Drinking Water: Other sources"]
      row_sums_9 = df[columns_set_9].sum(axis=1, skipna=True)
      row_sums_9[row_sums_9 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_9] = \
       (df[columns_set_9].div(row_sums_9, axis=0)) * 100

      # Normalizing set 10
      columns_set_10 = ["Location of drinking water source: Within premises",
                        "Location of drinking water source: Near premises",
                        "Location of drinking water source: Away"]
      row_sums_10 = df[columns_set_10].sum(axis=1, skipna=True)
      row_sums_10[row_sums_10 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_10] = \
       (df[columns_set_10].div(row_sums_10, axis=0)) * 100

      # Normalizing set 11
      columns_set_11 = ["Main Source of lighting: Electricity",
                        "Main Source of lighting: Kerosene",
                        "Main Source of lighting: Solar energy",
                        "Main Source of lighting: Other oil",
                        "Main Source of lighting: Any other",
                        "Main Source of lighting: No lighting"]
      row_sums_11 = df[columns_set_11].sum(axis=1, skipna=True)
      row_sums_11[row_sums_11 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_11] = \
       (df[columns_set_11].div(row_sums_11, axis=0)) * 100

      # Normalizing set 12
      columns_set_12 = [
        "Latrine in-premises: Flush/pour flush latrine connected to: Piped sewer system",
        "Latrine in-premises: Flush/pour flush latrine connected to: Septic tank",
        "Latrine in-premises: Flush/pour flush latrine connected to: Other system",
        "Latrine in-premises: Pit latrine: With slab/ventilated improved pit",
        "Latrine in-premises: Pit latrine: Without slab/ open pit",
        "Latrine in-premises: Night soil disposed into open drain",
        "Latrine in-premises: Service Latrine: Night soil removed by human",
        "Latrine in-premises: Service Latrine: Night soil serviced by animal",
        "No latrine in-premises: Public latrine", "No latrine in-premises: Open"]
      row_sums_12 = df[columns_set_12].sum(axis=1, skipna=True)
      row_sums_12[row_sums_12 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_12] = \
       (df[columns_set_12].div(row_sums_12, axis=0)) * 100

      # Normalizing set 13
      columns_set_13 = ["Bathing facility in premises: Yes: Bathroom",
                        "Bathing facility in premises: Yes: Enclosure without roof",
                        "Bathing facility in premises: No"]
      row_sums_13 = df[columns_set_13].sum(axis=1, skipna=True)
      row_sums_13[row_sums_13 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_13] = \
       (df[columns_set_13].div(row_sums_13, axis=0)) * 100

      # Normalizing set 14
      columns_set_14 = ["Waste water outlet connected to: Closed drainage",
                        "Waste water outlet connected to: Open drainage",
                        "Waste water outlet connected to: No drainage"]
      row_sums_14 = df[columns_set_14].sum(axis=1, skipna=True)
      row_sums_14[row_sums_14 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_14] = \
       (df[columns_set_14].div(row_sums_14, axis=0)) * 100

      # Normalizing set 15
      columns_set_15 = ["Type of Fuel used for Cooking: Fire-wood",
                        "Type of Fuel used for Cooking: Crop residue",
                        "Type of Fuel used for Cooking: Cowdung cake",
                        "Type of Fuel used for Cooking: Coal,Lignite,Charcoal",
                        "Type of Fuel used for Cooking: Kerosene",
                        "Type of Fuel used for Cooking: LPG/PNG",
                        "Type of Fuel used for Cooking: Electricity",
                        "Type of Fuel used for Cooking: Biogas",
                        "Type of Fuel used for Cooking: Any other",
                        "Type of Fuel used for Cooking: No cooking"]
      row_sums_15 = df[columns_set_15].sum(axis=1, skipna=True)
      row_sums_15[row_sums_15 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_15] = \
       (df[columns_set_15].div(row_sums_15, axis=0)) * 100

      # Normalizing set 16
      columns_set_16 = ["Cooking inside house: Has Kitchen",
                        "Cooking inside house: Does not have kitchen",
                        "Cooking outside house: Has Kitchen",
                        "Cooking outside house: Does not have kitchen",
                        "No Cooking"]
      row_sums_16 = df[columns_set_16].sum(axis=1, skipna=True)
      row_sums_16[row_sums_16 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_16] = \
       (df[columns_set_16].div(row_sums_16, axis=0)) * 100

      # Normalizing set 17
      columns_set_17 = ["Type of Census Houses: Permanent",
                        "Type of Census Houses: Semi-Permanent",
                        "Type of Census Houses: Total Temporary",
                        "Type of Census Houses: Temporary - Serviceable",
                        "Type of Census Houses: Temporary - Non-Serviceable",
                        "Type of Census Houses: Unclassifiable"]
      row_sums_17 = df[columns_set_17].sum(axis=1, skipna=True)
      row_sums_17[row_sums_17 == 0] = 1  # Avoid division by zero
      df_normalised[columns_set_17] = \
       (df[columns_set_17].div(row_sums_17, axis=0)) * 100

    except Exception as e:
        print(f"Error normalizing dataframe from file {file_name}: {str(e)}")
        raise

    return df_normalised

In [None]:
def process_single_df(df, file_prefix, file_name):
    """
    Process an input `pandas` dataframe to:
    - strip leading spaces and replace "-" with " to " in Age Group columns,
    - standardise hyphenation in the Area Name column,
    - remove rows where any primary key columns are empty,
    - perform meta-data checks, which consist of:
          a) Checking for primary key uniqueness
          b) Data type match
          c) Checking if columns expected to have 1:1 mappings do
          d) Checking levels of categorical variables (if any)

    In special cases, the following processing steps will additionally be run
    before meta-data checks:
          a) If "H-01" or "HH-01-Total", delete all rows where "Tehsil Code" <>
          "00000" or "Town Code" <> "000000".
          b) If "HL-14-Total" or "HL-14-SC-ST", delete all rows where
          "Tehsil Code" <> "00000" or "Town Code/Village code" <> "000000" or
          "Ward No <> "0000", and ensure values across proportion columns sum up
          to 100 in each row.
          c) If "F-04", "F-08" or "F-12", remove leading whitespaces from "Economic Activity"
          column.

    Failure to satisfy any meta-data check will result in an error.

    Parameters:
    - df (str): Input `pandas` dataframe.
    - file_prefix (str): Excel file name prefix.

    Returns:
    - DataFrame: Processed DataFrame.
    """
    # Replace "-" with " to " in column with names "Age Group", "Age-Group" etc.
    age_col_names = ["Age Group", "Age group", "Age-Group", "Age-group",
                     "Age_Group", "Age_group"]
    age_cols = [col for col in df.columns if col in age_col_names]
    df[age_cols] = df[age_cols].apply(
        lambda x: x.astype(str).str.replace("-", " to ").str.lstrip())

    # Standardise Hyphenation in the Area Name column
    df['Area Name'] = df['Area Name'].str.replace(
        r'(?<=\S)-(?=\S)', ' - ', regex=True)
    df['Area Name'] = df['Area Name'].str.replace(
        r'(?<=\S)- (?=\S)', ' - ', regex=True)
    df['Area Name'] = df['Area Name'].str.replace(
        r'(?<=\S) -(?=\S)', ' - ', regex=True)

    metadata = metadata_cache[file_prefix]
    # --- Step 1: Special Case Processing ---
    if file_prefix in ["H-01", "HH-01-Total"]:
        df = df[(df["Tehsil Code"] == "00000") & (df["Town Code"] == "000000")]

    if file_prefix in ["HL-14-Total", "HL-14-SC-ST"]:
        df = df[
            (df["Tehsil Code"] == "00000") &
            (df["Town Code/Village code"] == "000000") &
            (df["Ward No"] == "0000")
        ]

        # Ensure proportion columns sum to 100
        df = normalise_dataframe(df, file_name)

    if file_prefix in ["F-04", "F-08", "F-12"]:
        df["Economic Activity"] = df["Economic Activity"].str.lstrip()

    # --- Step 2: Remove rows where any primary key columns are empty ---
    primary_keys = [col for col, meta in metadata.items()
                    if meta.get("Primary Key")]
    df = df.dropna(subset=primary_keys)

    # --- Step 3: Metadata Checks ---
    # (a) Primary Key Uniqueness
    if df.duplicated(subset=primary_keys).any():
        raise ValueError(f"File {file_name}: Primary key constraint violated.")

    # Loop for Data Type Check, 1:1 Mapping Check, Categorical Levels Check
    map_1to1_groups = {}  # Dict to store columns grouped by mapping symbol

    for col, meta in metadata.items():
        expected_type = meta.get("Type")
        expected_levels = meta.get("Levels")
        mapping_symbol = meta.get("Map 1:1")

        # --- (b) Data Type Validation ---
        if expected_type == "numerical":
            if not pd.api.types.is_numeric_dtype(df[col]):
                raise ValueError((f"File {file_name}: Column '{col}' expected "
                            f"to be numerical, but found non-numeric values."))

        elif expected_type == "constant string":
            actual_levels = df[col].dropna().unique()
            if set(actual_levels) != set(expected_levels):
                raise ValueError((f"File {file_name}: Column '{col}' expected "
                  f"to have level{expected_levels}, but found {actual_levels}."))

        elif expected_type == "constant string within":
            if df[col].nunique() > 1:
                raise ValueError((f"File {file_name}: Column '{col}' expected "
                                  f"to have the same value across all rows."))

        # --- (c) Collect 1:1 Mapping Groups ---
        if mapping_symbol:
            if mapping_symbol in map_1to1_groups:
                map_1to1_groups[mapping_symbol].append(col)
            else:
                map_1to1_groups[mapping_symbol] = [col]

        # --- (d) Categorical Levels Check ---
        if expected_levels and expected_type not in ["constant string",
                                                     "constant string within"]:
            actual_levels = df[col].dropna().unique()
            unexpected_levels = set(actual_levels) - set(expected_levels)
            if unexpected_levels:
                raise ValueError((f"File {file_name}: Unexpected levels found "
                                  f"in column '{col}':{unexpected_levels}"))

    # --- Step 3(c) Validate 1:1 Mappings ---
    for mapping_symbol, columns in map_1to1_groups.items():
        if len(columns) != 2:
            raise ValueError((f"File {file_name}: Mapping symbol '{mapping_symbol}' "
              f"should be assigned to exactly two columns, but found {columns}."))

        col1, col2 = columns
        mapping_pairs = df[[col1, col2]].dropna().drop_duplicates()

        if mapping_pairs[col1].duplicated().any() or \
          mapping_pairs[col2].duplicated().any():
            raise ValueError((f"File {file_name}: Columns '{col1}' and '{col2}'"
                              f" do not have a 1:1 mapping."))

    return df

In [None]:
def download_and_process_file(file_info, start_row, file_prefix,
                              direct_file_name=None, direct_folder_id=None):
    """
    Downloads and processes a single file.

    Parameters:
    - file_info (dict): Metadata of the file to download.
    - start_row (int): Row in file where the data table starts (1-based index).
    - file_prefix (str): Prefix of file names to process.
    - direct_file_name (str): Name of the file (with extension, e.g., ".xlsx") to
        be downloaded and processed. If provided, will be directly downloaded
        and processed. No search for filename based on `file_prefix` will be done.
    - direct_folder_id (str): ID of the Google Drive folder where the "direct"
        file can be found.

    Returns:
    - pd.DataFrame: The processed DataFrame, or None if an error occurred.
    """
    column_names = metadata_cache[file_prefix].keys()
    str_cols_dict = {k:'str' for k, v in metadata_cache[file_prefix].items() \
        if v['Type'] in ['string', 'constant string within', 'constant string']}

    # Determine file ID
    if direct_file_name and direct_folder_id:
        try:
            query = f"'{direct_folder_id}' in parents and name = '{direct_file_name}'"
            response = drive_service.files().list(
                q=query, spaces='drive', fields="files(id)").execute()
            files = response.get("files", [])
            if not files:
                print(f"Error finding file {direct_file_name} in given folder.")
                return None
            file_id = files[0]["id"]
        except Exception as e:
            print(f"Error finding file {direct_file_name}: {e}")
            return None
    else:
        file_id = file_info['id']
        file_name = file_info['name']

    # Download the file
    try:
        request = drive_service.files().get_media(fileId=file_id)
        fh = io.BytesIO()
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
        fh.seek(0)  # Reset the pointer in BytesIO
        df = pd.read_excel(fh, skiprows=start_row - 1, names=column_names,
                        dtype=str_cols_dict, usecols = range(len(column_names)))
    except Exception as e:
        print(f"Error downloading file {direct_file_name or file_name}: {e}")
        return None

    # Apply processing logic
    processed_df = process_single_df(df, file_prefix, direct_file_name or file_name)
    return processed_df

## Function to upload single processed dataframe as a spreadsheet in a specified Google Drive folder

In [None]:
def upload_as_gsheet(df, op_sheet_name, op_folder_id):
    """
    Uploads input dataframe to a specified Google Drive folder as a Google Sheet

    Parameters:
    - df (pd.DataFrame): Dataframe to Upload
    - op_sheet_name (str): Name of the output Google Sheet
    - op_folder_id (str): ID of the Google Drive folder where the sheet will be saved
    """
    try:
        # Save to a temporary CSV
        temp_csv = "temp_combined_data.csv"
        df.to_csv(temp_csv, index=False)

        # Prepare file metadata with folder ID
        file_metadata = {
            "name": op_sheet_name,
            "mimeType": "application/vnd.google-apps.spreadsheet",
            "parents": [op_folder_id]
        }

        # Use resumable upload by setting the resumable flag to True
        media = MediaFileUpload(temp_csv, mimetype="text/csv", resumable=True)
        request = drive_service.files().create(body=file_metadata, media_body=media, fields="id")

        response = None
        while response is None:
            status, response = request.next_chunk()
            if status:
                print("Uploaded {}%".format(int(status.progress() * 100)))
        print("Data uploaded to Google Sheet: {} , ID: {}".format(op_sheet_name, response.get("id")))
    except Exception as e:
        print("Error during data upload: {}".format(e))

## Function to combine (SQL `union` style) a list of pandas dataframes, checking uniqueness of records in combined dataframe and upload the result as a spreadsheet in a specified Google Drive folder

- Since each individual dataframe to be combined has already been checked for uniqueness of records, the only reason there could be duplicates in the combined dataframe is if there are duplicate individual dataframes. In this case, an error will be thrown.

In [None]:
def union_and_upload_to_gsheet(df_list, op_sheet_name, op_folder_id,
                               primary_keys, individual_df_file_name_list):
    """
    Combines DataFrames provided in a list and uploads the result to a specified
    Google Drive folder as a single Google Sheet. If, however, the combined
    DataFrame exceeds Google Sheets' size limit (10 million cells), each
    individual DataFrame is uploaded separately as an individual Google Sheet.

    Parameters:
    - df_list (List[pd.DataFrame]): List of DataFrames to combine.
    - op_sheet_name (str): Name of the output Google Sheet, in case the combined
                           dataframe can be uploaded as a single sheet.
    - op_folder_id (str): ID of the Google Drive folder where the sheet will be
                          saved.
    - primary_keys (list): List of names of columns comprising the primary key.
    - individual_df_file_name_list(list): List of filenames corresponding to each
                                         DataFrame in `df_list` that will be used
                                         in case individual DataFrames are to
                                         be uploaded separately.
    """
    try:
        # Combine all DataFrames into one
        combined_df = pd.concat(df_list, ignore_index=True)

        # Check for duplicates - just to ensure there are no duplicate dfs in themselves
        if combined_df.duplicated(subset=primary_keys).any():
            raise ValueError(f"Primary key constraint violated in the combined DataFrame.")
    except Exception as e:
        print(f"Error during data union: {e}")

    if combined_df.shape[0]*combined_df.shape[1] < 1e7: # 10 million cells limit
      upload_as_gsheet(combined_df, op_sheet_name, op_folder_id)
    else:
      for df_idx, df in enumerate(df_list):
        upload_as_gsheet(df, individual_df_file_name_list[df_idx], op_folder_id)

# Main Workflow

## Setup

In [None]:
# Authenticate connect to Drive / Sheets
auth.authenticate_user()
creds, _ = default()
drive_service = build('drive', 'v3', credentials=creds)
drive.mount('/content/drive')
gc = gspread.authorize(creds)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Set global variables
metadata_cache = {}
metadata_sheet_id = "1E3GAnfUaiAhIlU-F2v-9EGlXFymngRmQ-K1pMsawX9s" # replace with your metadata sheet ID
metadata_sheet = gc.open_by_key(metadata_sheet_id)
input_folder_path = "Socratus/Census Data/2011 Data" # replace with your Google Drive input data folder path
input_folder_id = get_folder_id(input_folder_path)
output_folder_path = "Socratus/Gender/Data" # replace with your Google Drive output folder path
output_folder_id = get_folder_id(output_folder_path)

## Handle Different Types of Data Files

| Filetype / Code | Explanation                                                                                   |
|------------------|-----------------------------------------------------------------------------------------------|
| Exclude          | Not used for analysis                                                                        |
| 1                | Instances - 1 per state - and are available with caste granularity - Total, SC, ST           |
| 2                | Singletons                                                                                   |
| 3                | Instances available with caste granularity - Total, SC, ST - but are aggregated spatially    |
| 4                | Instances - 1 per state - but caste granularity not available                                |
| 5                | Instances - 1 per district - but aggregated over caste granularity                          |
| 6                | Instances - 1 per state - with caste granularity - SC and ST alone                          |


In [None]:
castes = ['Total', 'SC', 'ST']

# File Type 1
type1_files_start_with = ['B-01', 'B-03', 'B-07', 'C-02', 'C-20', 'D-02',
                          'F-01', 'F-05', 'F-09']
type1_files_data_start_row = [9, 8, 9, 8, 6, 6, 11, 11, 11]
type1_files_metadata_start_cell = ['F3']*len(type1_files_start_with)

# File Type 2
type2_files_start_with = ['B-02', 'B-05', 'B-17', 'C-03A', 'C-23']
type2_file_names = ['DDW-0000B-02.xlsx', 'DDW-B05-0000.xlsx', 'DDW-B17-0000.xlsx',
  'DDW-0000C-03A.xlsx', 'DDW-0000C-23.xlsx']
type2_files_data_start_row = [8, 9, 8, 8, 8]
type2_files_metadata_start_cell = ['A1']*len(type2_files_start_with)

# File Type 3
type3_files_start_with = ['B-08', 'HH-04']
type3_files_data_start_row = [9, 8]
type3_file_names = [['DDW-B08-0000.xlsx', 'DDW-B08SC-0000.xlsx',
                    'DDW-B08ST-0000.xlsx'], ['DDW-HH04-0000-2011.XLSx',
                    'DDW-HH04SC-0000-2011.XLSx', 'DDW-HH04ST-0000-2011.XLSx']]
type3_files_metadata_start_cell = ['A1']*len(type3_files_start_with)

# File Type 4
type4_files_start_with = ['B-04-Total', 'B-06-Total', 'B-09', 'B-16', 'B-28',
  'C-03', 'D-04', 'D-05', 'F-02', 'F-03', 'F-04', 'F-06', 'F-07', 'F-08',
  'F-10', 'F-11', 'F-12', 'H-01', 'HH-01-Total', 'HH-02']
type4_files_data_start_row = [9, 9, 8, 8, 7, 8, 7, 6, 10, 10, 8, 10, 10, 8, 10,
                              10, 8, 8, 7, 7]
type4_files_metadata_start_cell = ['D3']*len(type4_files_start_with)

# File Type 5
type5_files_start_with = ['HL-14-Total']
type5_files_data_start_row = [8]
type5_files_metadata_start_cell = ['E3']*len(type5_files_start_with)

# File Type 6
type6_files_start_with = ['B-04-SC-ST', 'B-06-SC-ST', 'HH-01-SC-ST',
                          'HL-14-SC-ST']
type6_files_data_start_row = [9, 9, 7, 8]
type6_files_metadata_start_cell = ['E3']*len(type6_files_start_with)

## Cleaning Workflow for each Census data dataset:

1. Get and Cache Metadata
2. Get Dataset Primary Keys (if needed)
3. Get Names and IDs of all relevant files (if needed)
4. Process the relevant files
5. Merge similar datasets (if needed) and output as single google sheet

In [None]:
# File Type 1
for file_prefix_idx, file_prefix in enumerate(type1_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type1_files_metadata_start_cell[file_prefix_idx])
  # Get Dataset Primary Keys
  primary_keys = [col for col, meta in metadata_cache[file_prefix].items()
                    if meta.get("Primary Key")]
  for caste in castes:
    print("---\n")
    # Get Names and IDs of all relevant files
    files = get_file_ids_and_names(input_folder_id, file_prefix, caste)
    # Process relevant files
    processed_dfs = [download_and_process_file(
        file, type1_files_data_start_row[file_prefix_idx], file_prefix)
        for file in files]
    # Output as single google sheet
    output_file_name = f"{file_prefix}_caste_{caste.lower()}"
    individual_df_file_name_list = [
        file['name'].rsplit('.', 1)[0] if file['name'].lower().endswith(
            ('.xls', '.xlsx')) else file['name'] for file in files]
    union_and_upload_to_gsheet(processed_dfs, output_file_name, output_folder_id,
                               primary_keys, individual_df_file_name_list)

---

Metadata for B-01 cached successfully.
---

35 files found starting with the prefix 'B-01' (excluding 'B-01_India') and matching caste 'Total'.
Data uploaded to Google Sheet: B-01_caste_total , ID: 1a2avF8CvT3bE3ITmz_jOIkW-ox1Ssncfi0nF1BV22qs
---

31 files found starting with the prefix 'B-01' (excluding 'B-01_India') and matching caste 'SC'.
Data uploaded to Google Sheet: B-01_caste_sc , ID: 1rJ0JGpUYHei2VPOG_O4qyWw3zQ0JSRw7K5TT388DurQ
---

30 files found starting with the prefix 'B-01' (excluding 'B-01_India') and matching caste 'ST'.
Data uploaded to Google Sheet: B-01_caste_st , ID: 1Sf9PjgNpR3Ry1TOns1CtlNcdBaZN4pTalGQ0pSyrLgE
---

Metadata for B-03 cached successfully.
---

35 files found starting with the prefix 'B-03' (excluding 'B-03_India') and matching caste 'Total'.
Data uploaded to Google Sheet: B-03_caste_total , ID: 1cjOjOm-A88nygEKlrE1e90ewzuOjvQCc2EWv6VK3KDY
---

31 files found starting with the prefix 'B-03' (excluding 'B-03_India') and matching caste 'SC'.
Data u

In [None]:
# File Type 2
for file_prefix_idx, file_prefix in enumerate(type2_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type2_files_metadata_start_cell[file_prefix_idx])
  # Process relevant files
  processed_df = download_and_process_file(
      None, type2_files_data_start_row[file_prefix_idx], file_prefix,
      type2_file_names[file_prefix_idx], input_folder_id)
  # Output as single google sheet
  output_file_name = f"{file_prefix}_caste_total"
  upload_as_gsheet(processed_df, output_file_name, output_folder_id)

---

Metadata for B-02 cached successfully.
Data uploaded to Google Sheet: B-02_caste_total , ID: 1E9N_K9Rswh_G9CUk3QDdmgnX5P0aUBoq33NZaWWixyA
---

Metadata for B-05 cached successfully.
Data uploaded to Google Sheet: B-05_caste_total , ID: 1Vjfo1ixyirwW7UkB0Zuiyr_1FPYqWhJzL-1xfvIJF_s
---

Metadata for B-17 cached successfully.
Data uploaded to Google Sheet: B-17_caste_total , ID: 1jaqKtInQ6MX9Ft9TOAf8w_T9d0SpN8zdHlnDf_kHLfU
---

Metadata for C-03A cached successfully.
Data uploaded to Google Sheet: C-03A_caste_total , ID: 1H8kJPlZFK_7E2atOMdPpbAhFZ-lDJCM44Hlo_z7jvtM
---

Metadata for C-23 cached successfully.
Data uploaded to Google Sheet: C-23_caste_total , ID: 1D_wmQqWD2XzT9JlrEU790Su2QBv7WIhl_uhRXUecZzk


In [None]:
# File Type 3
for file_prefix_idx, file_prefix in enumerate(type3_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type3_files_metadata_start_cell[file_prefix_idx])
  for caste_idx, caste in enumerate(castes):
    print("---\n")
    # Process relevant files
    processed_df = download_and_process_file(
        None, type3_files_data_start_row[file_prefix_idx], file_prefix,
        type3_file_names[file_prefix_idx][caste_idx], input_folder_id)
    # Output as single google sheet
    output_file_name = f"{file_prefix}_caste_{caste}"
    upload_as_gsheet(processed_df, output_file_name, output_folder_id)

---

Metadata for B-08 cached successfully.
---

Data uploaded to Google Sheet: B-08_caste_Total , ID: 1fAF8zAzcs_9nYWQmqU8TtP33EwOD3na8KDVT22HSK9Q
---

Data uploaded to Google Sheet: B-08_caste_SC , ID: 1exnuJ8ZIf77Iea9S6GHW9B_qWo1BHUZ1l4S7fchrL54
---

Data uploaded to Google Sheet: B-08_caste_ST , ID: 1fc8xqA7UfOTZ4_8F_1YC8pFqIxq41L5jMVabkj-UM0k
---

Metadata for HH-04 cached successfully.
---

Data uploaded to Google Sheet: HH-04_caste_Total , ID: 1L1eKdOFmqeVu2eGLWf-7FA5C9hAWp2ZrRz2vY18hC90
---

Data uploaded to Google Sheet: HH-04_caste_SC , ID: 18_2iiEbV50HWF6WFpBhQ6fFeRBmnNj74jb5JX2cHgtM
---

Data uploaded to Google Sheet: HH-04_caste_ST , ID: 166xJ-m2ZBbURaYsP8ReFrR4onJRCK-x95DqGXz-Dlsc


In [None]:
# File Type 4
for file_prefix_idx, file_prefix in enumerate(type4_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type4_files_metadata_start_cell[file_prefix_idx])
  # Get Dataset Primary Keys
  primary_keys = [col for col, meta in metadata_cache[file_prefix].items()
                    if meta.get("Primary Key")]
  # Get Names and IDs of all relevant files
  files = get_file_ids_and_names(input_folder_id, file_prefix)
  # Process relevant files
  processed_dfs = [download_and_process_file(
      file, type4_files_data_start_row[file_prefix_idx], file_prefix)
      for file in files]
  # Output as single google sheet
  output_file_name = f"{file_prefix}_caste_total"
  individual_df_file_name_list = [
    file['name'].rsplit('.', 1)[0] if file['name'].lower().endswith(
        ('.xls', '.xlsx')) else file['name'] for file in files]
  union_and_upload_to_gsheet(processed_dfs, output_file_name, output_folder_id,
                              primary_keys, individual_df_file_name_list)

---

Metadata for HH-01-Total cached successfully.
35 files found starting with the prefix 'HH-01-Total' (excluding 'HH-01-Total_India').
Data uploaded to Google Sheet: HH-01-Total_caste_total , ID: 1rKThtCVfugQJO1SwUMu-O3msllGNrWsOHRvwNfu4D6w
---

Metadata for HH-02 cached successfully.
34 files found starting with the prefix 'HH-02' (excluding 'HH-02_India').
Data uploaded to Google Sheet: HH-02_caste_total , ID: 1KID9TFXcE3Uw9icjhaigFw7JddgPXCF2_2Gr9mm-6n8


In [None]:
# File Type 5
for file_prefix_idx, file_prefix in enumerate(type5_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type5_files_metadata_start_cell[file_prefix_idx])
  # Get Dataset Primary Keys
  primary_keys = [col for col, meta in metadata_cache[file_prefix].items()
                    if meta.get("Primary Key")]
  # Get Names and IDs of all relevant files
  files = get_file_ids_and_names(input_folder_id, file_prefix)
  # Process relevant files
  processed_dfs = [download_and_process_file(
      file, type5_files_data_start_row[file_prefix_idx], file_prefix)
      for file in files]
  # Output as single google sheet
  output_file_name = f"{file_prefix}_caste_total"
  individual_df_file_name_list = [
    file['name'].rsplit('.', 1)[0] if file['name'].lower().endswith(
        ('.xls', '.xlsx')) else file['name'] for file in files]
  union_and_upload_to_gsheet(processed_dfs, output_file_name, output_folder_id,
                              primary_keys, individual_df_file_name_list)

---

Metadata for HL-14-Total cached successfully.
640 files found starting with the prefix 'HL-14-Total' (excluding 'HL-14-Total_India').
Data uploaded to Google Sheet: HL-14-Total_caste_total , ID: 1JR6NvAuI0fz3T7gjzurdGTnN0vmJTnfUCZ8LiY2hrro


In [None]:
# File Type 6
for file_prefix_idx, file_prefix in enumerate(type6_files_start_with):
  print("---\n")
  # Get and Cache Metadata
  cache_metadata_from_sheet(metadata_sheet, file_prefix,
                            type6_files_metadata_start_cell[file_prefix_idx])
  # Get Dataset Primary Keys
  primary_keys = [col for col, meta in metadata_cache[file_prefix].items()
                    if meta.get("Primary Key")]
  for caste in ["sc", "st"]:
    print("---\n")
    # Get Names and IDs of all relevant files
    files = get_file_ids_and_names(input_folder_id, file_prefix, caste)
    # Process relevant files
    processed_dfs = [download_and_process_file(
        file, type6_files_data_start_row[file_prefix_idx], file_prefix)
        for file in files]
    # Output as single google sheet
    output_file_name = f"{file_prefix}_caste_{caste.lower()}"
    individual_df_file_name_list = [
      file['name'].rsplit('.', 1)[0] if file['name'].lower().endswith(
          ('.xls', '.xlsx')) else file['name'] for file in files]
    union_and_upload_to_gsheet(processed_dfs, output_file_name, output_folder_id,
                                primary_keys, individual_df_file_name_list)

---

Metadata for B-04-SC-ST cached successfully.
---

31 files found starting with the prefix 'B-04-SC-ST' (excluding 'B-04-SC-ST_India') and matching caste 'sc'.
Data uploaded to Google Sheet: B-04-SC-ST_caste_sc , ID: 1eApyTG_-IPIpm8enF2CGfojv5W_VEA_ORoyeYV_27_Q
---

30 files found starting with the prefix 'B-04-SC-ST' (excluding 'B-04-SC-ST_India') and matching caste 'st'.
Data uploaded to Google Sheet: B-04-SC-ST_caste_st , ID: 1jrtXTohr9T8SE_pAjX4iIh3H8Xm6qhWTpe73aHz9cXw
---

Metadata for B-06-SC-ST cached successfully.
---

31 files found starting with the prefix 'B-06-SC-ST' (excluding 'B-06-SC-ST_India') and matching caste 'sc'.
Data uploaded to Google Sheet: B-06-SC-ST_caste_sc , ID: 1Hy7oFSBFDd96-5F3Yn0UeWGPZ-mRIosPdoqLZucVDso
---

30 files found starting with the prefix 'B-06-SC-ST' (excluding 'B-06-SC-ST_India') and matching caste 'st'.
Data uploaded to Google Sheet: B-06-SC-ST_caste_st , ID: 1TZ9Cb39rV_jF7x4R_-L-WsrFbKdPG-si8qsKiHUryFw
---

Metadata for HH-01-SC-ST cached