In [1]:
# File: report_lookups.py

import csv

# ---------------------------------------------------------------------------
# ADJUST THESE IMPORTS TO MATCH YOUR OWN FOLDER/FILE STRUCTURE
# ---------------------------------------------------------------------------
# Example (assuming you have your code in packages named exactly "DHW", "Elec", etc.):
# from DHW.dhw_lookup import dhw_lookup
# from Elec.lighting_lookup import lighting_lookup
# from ventilation.ventilation_lookup import ventilation_lookup
# from setzone.zone_sizing_lookup import zone_sizing_lookup
# from tempground.groundtemp_lookup import groundtemp_lookup

# If your code is not set up as Python packages, you might do:
# from .dhw_lookup import dhw_lookup
# or
# import sys
# sys.path.append("D:/Documents/E_Plus_2026_py/DHW")
# from dhw_lookup import dhw_lookup
#
# For now, let's assume you can just import them directly:
try:
    from DHW.dhw_lookup import dhw_lookup
    from Elec.lighting_lookup import lighting_lookup
    from ventilation.ventilation_lookup import ventilation_lookup
    from setzone.zone_sizing_lookup import zone_sizing_lookup
    from tempground.groundtemp_lookup import groundtemp_lookup
except ImportError:
    # If you can't import them like this, comment these out and adapt as needed
    print("[WARNING] Could not import from your modules. Please adjust imports!")
    dhw_lookup = {}
    lighting_lookup = {}
    ventilation_lookup = {}
    zone_sizing_lookup = {}
    groundtemp_lookup = {}

# ---------------------------------------------------------------------------
# REPORTING FUNCTIONS - each function focuses on one dictionary structure
# ---------------------------------------------------------------------------

def report_dhw_lookup(writer):
    """
    Example for dhw_lookup, which might have the structure:
      dhw_lookup = {
        "pre_calibration": {
            "Residential_SingleFamily_Small": {
                "occupant_density_m2_per_person_range": (None, None),
                "liters_per_person_per_day_range": (45.0, 55.0),
                ...
            },
            "Office": { ... }
        },
        "post_calibration": { ... }
      }

    We'll write rows with columns:
      [lookup_name, stage, bldg_key, param_name, min_val, max_val]
    """
    lookup_name = "dhw_lookup"
    for stage, stage_dict in dhw_lookup.items():
        for bldg_key, param_dict in stage_dict.items():
            for param_name, val_range in param_dict.items():
                # Some param ranges may be (None, None), or a single number/schedule
                if isinstance(val_range, tuple) and len(val_range) == 2:
                    min_val, max_val = val_range
                    writer.writerow([lookup_name, stage, bldg_key, param_name, min_val, max_val])
                else:
                    # If it's not a tuple or is a single fixed value
                    writer.writerow([lookup_name, stage, bldg_key, param_name, val_range, val_range])


def report_lighting_lookup(writer):
    """
    Example for lighting_lookup, with structure:
      lighting_lookup = {
        "pre_calibration": {
          "residential": {
            "LIGHTS_WM2_range": (0.0, 0.0),
            "PARASITIC_WM2_range": (0.0, 0.0),
            ...
          },
          "Meeting Function": {...}
        },
        "post_calibration": {...}
      }
    """
    lookup_name = "lighting_lookup"
    for stage, stage_dict in lighting_lookup.items():
        for bldg_subkey, param_dict in stage_dict.items():
            for param_name, val_range in param_dict.items():
                if isinstance(val_range, tuple) and len(val_range) == 2:
                    writer.writerow([lookup_name, stage, bldg_subkey, param_name, val_range[0], val_range[1]])
                else:
                    writer.writerow([lookup_name, stage, bldg_subkey, param_name, val_range, val_range])


def report_ventilation_lookup(writer):
    """
    ventilation_lookup often has a deeper nested structure, e.g.:
      ventilation_lookup = {
        "pre_calibration": {
          "residential_infiltration_range": { "A_corner": (1.0,1.2), ... },
          "non_res_infiltration_range": {...},
          "year_factor_range": {...},
          "system_control_range_res": {
              "A": {"f_ctrl_range": (0.9,1.0)}, ...
          },
          ...
        },
        "post_calibration": {...}
      }
    We'll walk each sub-dict carefully.
    """
    lookup_name = "ventilation_lookup"
    for stage, stage_dict in ventilation_lookup.items():
        # top-level keys (e.g. "residential_infiltration_range", "year_factor_range", etc.)
        for cat_name, cat_data in stage_dict.items():
            # cat_data might be a dict of subkeys or a direct tuple
            if isinstance(cat_data, dict):
                for subkey, val in cat_data.items():
                    # if 'val' is another dict, e.g. "system_control_range_res": { "A": {"f_ctrl_range": (0.9,1.0)}}
                    if isinstance(val, dict):
                        for param_name, param_val in val.items():
                            if isinstance(param_val, tuple) and len(param_val) == 2:
                                writer.writerow([lookup_name, stage, f"{cat_name}:{subkey}",
                                                 param_name, param_val[0], param_val[1]])
                            else:
                                writer.writerow([lookup_name, stage, f"{cat_name}:{subkey}",
                                                 param_name, param_val, param_val])
                    else:
                        # If val is a tuple for infiltration range, etc.
                        if isinstance(val, tuple) and len(val) == 2:
                            writer.writerow([lookup_name, stage, cat_name, subkey, val[0], val[1]])
                        else:
                            writer.writerow([lookup_name, stage, cat_name, subkey, val, val])
            else:
                # cat_data is not a dict => might be a single tuple or number
                if isinstance(cat_data, tuple) and len(cat_data) == 2:
                    writer.writerow([lookup_name, stage, cat_name, "", cat_data[0], cat_data[1]])
                else:
                    writer.writerow([lookup_name, stage, cat_name, "", cat_data, cat_data])


def report_zone_sizing_lookup(writer):
    """
    zone_sizing_lookup = {
      "pre_calibration": {
        "residential": {
          "cooling_supply_air_temp_range": (13.5,14.5),
          "heating_supply_air_temp_range": (48.0,52.0),
          ...
        },
        "non_residential": {...}
      },
      "post_calibration": {...}
    }
    """
    lookup_name = "zone_sizing_lookup"
    for stage, stage_dict in zone_sizing_lookup.items():
        for bldg_func, param_dict in stage_dict.items():
            for param_name, val_range in param_dict.items():
                if isinstance(val_range, tuple) and len(val_range) == 2:
                    writer.writerow([lookup_name, stage, bldg_func, param_name,
                                     val_range[0], val_range[1]])
                else:
                    writer.writerow([lookup_name, stage, bldg_func, param_name,
                                     val_range, val_range])


def report_groundtemp_lookup(writer):
    """
    groundtemp_lookup = {
      "pre_calibration": {
        "January": (2.0,3.0),
        "February": (3.5,5.0),
        ...
      },
      "post_calibration": {...}
    }
    """
    lookup_name = "groundtemp_lookup"
    for stage, month_dict in groundtemp_lookup.items():
        for month_name, val_range in month_dict.items():
            # usually a (min,max) pair
            if isinstance(val_range, tuple) and len(val_range) == 2:
                writer.writerow([lookup_name, stage, month_name, "", val_range[0], val_range[1]])
            else:
                writer.writerow([lookup_name, stage, month_name, "", val_range, val_range])


# ---------------------------------------------------------------------------
# MAIN REPORT FUNCTION
# ---------------------------------------------------------------------------
def main():
    """
    Creates a CSV file `lookup_report.csv` that merges info from multiple lookups.
    Adjust or add more 'report_*' calls as needed.
    """
    with open("lookup_report.csv", "w", newline="") as f:
        writer = csv.writer(f)
        # Write header row
        writer.writerow(["LookupName", "CalibrationStage", "KeyOrCategory", "ParamName", "MinValue", "MaxValue"])

        # Call each specialized reporting function
        report_dhw_lookup(writer)
        report_lighting_lookup(writer)
        report_ventilation_lookup(writer)
        report_zone_sizing_lookup(writer)
        report_groundtemp_lookup(writer)
        # ... Add more if you have additional lookups

    print("[INFO] Finished creating 'lookup_report.csv' with min/max (or fixed) values from all lookups.")


if __name__ == "__main__":
    main()


AttributeError: 'float' object has no attribute 'items'

In [3]:
import os

def export_structure_and_content(base_directory, output_file, extensions=( '.py', '.csv')):   # extensions=('.py', '.csv')
    """
    Recursively walks through base_directory, finds all files with the given extensions,
    and writes directory structure, filenames, and contents to output_file.
    
    :param base_directory: The path of the directory to walk through.
    :param output_file: The path of the text file where results will be written.
    :param extensions: A tuple of file extensions to include.
    """
    with open(output_file, 'w', encoding='utf-8') as out:
        # Walk through the directory structure
        for root, dirs, files in os.walk(base_directory):
            # Write out the current folder name
            out.write(f"Folder: {root}\n")
            
            # Filter out files by the desired extensions
            for file_name in files:
                if file_name.lower().endswith(extensions):
                    file_path = os.path.join(root, file_name)
                    
                    # Write out the file name
                    out.write(f"\n  File: {file_name}\n")
                    
                    # Read and write the file contents
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # If there's an encoding issue, try ignoring errors or switch to a different encoding
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                    
                    out.write("  --- File Contents Start ---\n")
                    out.write(content)
                    out.write("\n  --- File Contents End ---\n")
            
            out.write("\n" + "="*80 + "\n\n")


if __name__ == "__main__":
    # Example usage:
    # Change 'your_directory_path' to the path you want to scan
    # Change 'output.txt' to the desired output file name
    base_dir = r"D:\Documents\E_Plus_2030_py/ventilation"
    output_txt = "output.txt"
    
    export_structure_and_content(base_dir, output_txt)
    print(f"All .py and .csv files from '{base_dir}' have been exported to '{output_txt}'.")


All .py and .csv files from 'D:\Documents\E_Plus_2030_py/ventilation' have been exported to 'output.txt'.


In [3]:
import os

def export_structure_and_content(
    base_directory,
    output_file,
    extensions=('.py', '.csv'),
    excluded_dirs=None
):
    """
    Recursively walks through base_directory, finds all files with the given
    extensions, and writes directory structure, filenames, and contents
    to output_file, excluding any directories specified in excluded_dirs.
    
    :param base_directory: The path of the directory to walk through.
    :param output_file: The path of the text file where results will be written.
    :param extensions: A tuple of file extensions to include.
    :param excluded_dirs: A list of folder names to exclude from the walk.
    """
    if excluded_dirs is None:
        excluded_dirs = []  # Default to empty list if not provided

    with open(output_file, 'w', encoding='utf-8') as out:
        # Walk through the directory structure
        for root, dirs, files in os.walk(base_directory):
            # In-place removal of excluded directories
            dirs[:] = [d for d in dirs if d not in excluded_dirs]

            # Write out the current folder name
            out.write(f"Folder: {root}\n")
            
            # Filter out files by the desired extensions
            for file_name in files:
                if file_name.lower().endswith(extensions):
                    file_path = os.path.join(root, file_name)
                    
                    # Write out the file name
                    out.write(f"\n  File: {file_name}\n")
                    
                    # Read and write the file contents
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # If there's an encoding issue, try ignoring errors or switch to a different encoding
                        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                    
                    out.write("  --- File Contents Start ---\n")
                    out.write(content)
                    out.write("\n  --- File Contents End ---\n")
            
            out.write("\n" + "="*80 + "\n\n")


if __name__ == "__main__":
    # Example usage:
    base_dir = r"D:\Documents\E_Plus_2030_py\idf_objects\ventilation"  # idf_objects\
    output_txt = "output.txt"
    # Add any directories you'd like to exclude here
    exclude_list = ["_pycache_", "aiaia", "calibration", "ZZZz", "shading", "output"]
    
    export_structure_and_content(
        base_dir,
        output_txt,
        extensions=('.py', '.csv', '.json'),
        excluded_dirs=exclude_list
    )
    print(f"All .py and .csv files from '{base_dir}' have been exported to '{output_txt}', excluding folders: {exclude_list}.")


All .py and .csv files from 'D:\Documents\E_Plus_2030_py\idf_objects\ventilation' have been exported to 'output.txt', excluding folders: ['_pycache_', 'aiaia', 'calibration', 'ZZZz', 'shading', 'output'].


i have the following code that i have, i first made it as piot project with test data. now i want to refine the lookup tables and  input dta. so together step by step we will woork on that. my projectis mainly based on archtetype of buildings. it can be different from object to object thatwe will work on it together.

i firasst provide how my input data will be, and te code i have. you first have ann understanding and a view of what we have and will go to next step later. 



these columns from my data can be :


for functions and types and age range it can be:
1.
building_function	residential_type	non_residential_type
residential	Corner House	null
non_residential	null	Meeting Function
residential	Apartment	null
residential	Terrace or Semi-detached House
non_residential	null	Healthcare Function
non_residential	null	Sport Function
non_residential	null	Cell Function
non_residential	null	Retail Function
residential	Detached House	null
non_residential	null	Industrial Function
residential	Two-and-a-half-story House
non_residential	null	Accommodation Function
non_residential	null	Office Function
non_residential	null	Education Function
non_residential	null	Other Use Function
2.
age_range
2015 and later
1992 - 2005
1945 - 1964
1975 - 1991
1965 - 1974
2006 - 2014
< 1945




ok, first lets fo gor geomz. as yyou can see, we have:
1. default values in geometry_lookup
2. an excel file and overriding that cantotally change this lookup table at beginnging after imports. 
3. a partially temporary overridings that can happen by user config


so, if py works as i want, then lets work on Lookup expansions. 

so,, how the look up table should be. 

also, how i need to provide my excel file. 

and also how the user configs can be 







Loads defaults from dictionaries,
Optionally overrides from an Excel file once at startup,
Optionally overrides on a per-building basis via user_config_XXX lists,
Then uses that final set of parameters to build an EnergyPlus model.

## Structuring

In [2]:
import os

def generate_tree_report(root_dir, indent=""):
    items = os.listdir(root_dir)
    # Sort to ensure consistent order (folders first, then files)
    items.sort(key=lambda x: (not os.path.isdir(os.path.join(root_dir, x)), x.lower()))

    for i, item in enumerate(items):
        path = os.path.join(root_dir, item)
        is_last_item = (i == len(items) - 1)

        # Choose prefix depending on whether this is the last item in the list
        tree_prefix = "└── " if is_last_item else "├── "
        # Print current item
        print(indent + tree_prefix + item)

        if os.path.isdir(path):
            # For child items, increase indentation
            new_indent = indent + ("    " if is_last_item else "│   ")
            generate_tree_report(path, new_indent)

if __name__ == "__main__":
    root_directory = r"D:\Documents\E_Plus_2030_py\aiaia"
    print(root_directory)
    generate_tree_report(root_directory)


D:\Documents\E_Plus_2030_py
├── .git
│   ├── hooks
│   │   ├── applypatch-msg.sample
│   │   ├── commit-msg.sample
│   │   ├── fsmonitor-watchman.sample
│   │   ├── post-update.sample
│   │   ├── pre-applypatch.sample
│   │   ├── pre-commit.sample
│   │   ├── pre-merge-commit.sample
│   │   ├── pre-push.sample
│   │   ├── pre-rebase.sample
│   │   ├── pre-receive.sample
│   │   ├── prepare-commit-msg.sample
│   │   ├── push-to-checkout.sample
│   │   ├── sendemail-validate.sample
│   │   └── update.sample
│   ├── info
│   │   └── exclude
│   ├── logs
│   │   ├── refs
│   │   │   ├── heads
│   │   │   │   └── main
│   │   │   └── remotes
│   │   │       └── origin
│   │   │           └── main
│   │   └── HEAD
│   ├── objects
│   │   ├── 01
│   │   │   └── 7a694b51a029189d21272cc10432ba641f2fad
│   │   ├── 02
│   │   │   └── 3f0df67ac93b04c6e91a5613757ba46545faec
│   │   ├── 03
│   │   │   ├── a9e515917f82b88a5d4dfdb7857740bccc2745
│   │   │   └── e59397c65382f5cef44eb637c61433bd9b7676
│

In [3]:
import os
import ast

def parse_python_file(file_path):
    """
    Parse a Python file using ast and return a dictionary of:
      - imports
      - from_imports
      - function names
      - class names
    """
    info = {
        "imports": [],
        "from_imports": [],
        "functions": [],
        "classes": []
    }
    
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            code = f.read()
        tree = ast.parse(code, file_path)
    except (SyntaxError, UnicodeDecodeError) as e:
        # In case there's a problematic file or encoding issue, skip gracefully
        info["error"] = str(e)
        return info

    # Walk the AST nodes to find imports, functions, and classes
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # e.g. import os, import sys
            for alias in node.names:
                info["imports"].append(alias.name)
        elif isinstance(node, ast.ImportFrom):
            # e.g. from sys import argv
            module_name = node.module
            for alias in node.names:
                info["from_imports"].append((module_name, alias.name))
        elif isinstance(node, ast.FunctionDef):
            info["functions"].append(node.name)
        elif isinstance(node, ast.ClassDef):
            info["classes"].append(node.name)

    return info


def generate_tree_and_analysis(root_dir, indent=""):
    """
    Recursively walk through the folder structure starting at root_dir.
    Print a tree structure, and for each .py file, parse and report its contents.
    """
    items = sorted(os.listdir(root_dir), key=lambda x: (not os.path.isdir(os.path.join(root_dir, x)), x.lower()))
    for i, item in enumerate(items):
        path = os.path.join(root_dir, item)
        is_last_item = (i == len(items) - 1)

        # Tree icon prefix
        tree_prefix = "└── " if is_last_item else "├── "
        print(indent + tree_prefix + item)

        if os.path.isdir(path):
            # Recursively explore subfolders
            new_indent = indent + ("    " if is_last_item else "│   ")
            generate_tree_and_analysis(path, new_indent)
        else:
            # If it's a Python file, parse and show metadata
            if item.endswith(".py"):
                parse_info = parse_python_file(path)
                show_file_analysis(parse_info, indent + ("    " if is_last_item else "│   "))


def show_file_analysis(parse_info, indent):
    """
    Print the parse results (imports, from_imports, functions, classes)
    with indentation for a tree-like display.
    """
    # If there was an error parsing, report it
    if "error" in parse_info:
        print(f"{indent}└── [Error parsing file: {parse_info['error']}]")
        return

    # Show imports
    if parse_info["imports"]:
        print(f"{indent}└── Imports:")
        for idx, imp in enumerate(parse_info["imports"]):
            prefix = "    " if idx == len(parse_info["imports"]) - 1 else "│   "
            print(f"{indent}    {prefix}{imp}")

    # Show from-imports
    if parse_info["from_imports"]:
        print(f"{indent}└── From Imports:")
        for idx, (mod, name) in enumerate(parse_info["from_imports"]):
            prefix = "    " if idx == len(parse_info["from_imports"]) - 1 else "│   "
            print(f"{indent}    {prefix}from {mod} import {name}")

    # Show functions
    if parse_info["functions"]:
        print(f"{indent}└── Functions:")
        for idx, func in enumerate(parse_info["functions"]):
            prefix = "    " if idx == len(parse_info["functions"]) - 1 else "│   "
            print(f"{indent}    {prefix}{func}")

    # Show classes
    if parse_info["classes"]:
        print(f"{indent}└── Classes:")
        for idx, cls in enumerate(parse_info["classes"]):
            prefix = "    " if idx == len(parse_info["classes"]) - 1 else "│   "
            print(f"{indent}    {prefix}{cls}")


if __name__ == "__main__":
    root_directory = r"D:\Documents\E_Plus_2030_py"
    print(f"Mapping folder structure and Python file contents for:\n{root_directory}\n")
    generate_tree_and_analysis(root_directory)


Mapping folder structure and Python file contents for:
D:\Documents\E_Plus_2030_py

├── .git
│   ├── hooks
│   │   ├── applypatch-msg.sample
│   │   ├── commit-msg.sample
│   │   ├── fsmonitor-watchman.sample
│   │   ├── post-update.sample
│   │   ├── pre-applypatch.sample
│   │   ├── pre-commit.sample
│   │   ├── pre-merge-commit.sample
│   │   ├── pre-push.sample
│   │   ├── pre-rebase.sample
│   │   ├── pre-receive.sample
│   │   ├── prepare-commit-msg.sample
│   │   ├── push-to-checkout.sample
│   │   ├── sendemail-validate.sample
│   │   └── update.sample
│   ├── info
│   │   └── exclude
│   ├── logs
│   │   ├── refs
│   │   │   ├── heads
│   │   │   │   └── main
│   │   │   └── remotes
│   │   │       └── origin
│   │   │           └── main
│   │   └── HEAD
│   ├── objects
│   │   ├── 01
│   │   │   └── 7a694b51a029189d21272cc10432ba641f2fad
│   │   ├── 02
│   │   │   └── 3f0df67ac93b04c6e91a5613757ba46545faec
│   │   ├── 03
│   │   │   ├── a9e515917f82b88a5d4dfdb7857740bccc2745


In [9]:
import os
import ast

def parse_python_file(file_path):
    """
    Parse a Python file using ast and return a dictionary of:
      - imports
      - from_imports
      - function names
      - class names
      - csv_inputs  (detected .csv files read in)
      - csv_outputs (detected .csv files written to)
      - excel_inputs  (detected .xls/.xlsx files read in)
      - excel_outputs (detected .xls/.xlsx files written to)
    """
    info = {
        "imports": [],
        "from_imports": [],
        "functions": [],
        "classes": [],
        "csv_inputs": [],
        "csv_outputs": [],
        "excel_inputs": [],
        "excel_outputs": []
    }

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            code = f.read()
        tree = ast.parse(code, file_path)
    except (SyntaxError, UnicodeDecodeError) as e:
        # In case there's a problematic file or encoding issue, skip gracefully
        info["error"] = f"Parsing error: {e}"
        return info

    # 1. Collect imports, from-imports, function names, and class names
    for node in ast.walk(tree):
        # Imports
        if isinstance(node, ast.Import):
            for alias in node.names:
                info["imports"].append(alias.name)
        elif isinstance(node, ast.ImportFrom):
            module_name = node.module
            for alias in node.names:
                info["from_imports"].append((module_name, alias.name))
        elif isinstance(node, ast.FunctionDef):
            info["functions"].append(node.name)
        elif isinstance(node, ast.ClassDef):
            info["classes"].append(node.name)

    # 2. Attempt to detect CSV/Excel reading & writing
    #    We’ll look for function calls to open/read_csv/read_excel/to_csv/to_excel
    #    and capture string arguments that look like *.csv, *.xls, *.xlsx
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            # The function being called could be open(), or an attribute (e.g., pd.read_csv)
            func = node.func

            # Check if it's "open(...)"
            if isinstance(func, ast.Name) and func.id == "open":
                # e.g., open("mydata.csv", "r")
                filename = _get_first_str_arg(node)
                if filename:
                    if filename.lower().endswith(".csv"):
                        # Check if 'w' or 'a' in the mode to guess it's output
                        mode = _get_mode_arg(node)
                        if mode and any(m in mode for m in ("w", "a", "+")):
                            info["csv_outputs"].append(filename)
                        else:
                            info["csv_inputs"].append(filename)

            # Check if it's an attribute call, e.g. pd.read_csv or df.to_csv
            elif isinstance(func, ast.Attribute):
                attr_name = func.attr.lower()  # e.g. "read_csv", "to_csv", "read_excel", etc.

                # read_csv / read_excel
                if attr_name in ("read_csv", "read_excel"):
                    filename = _get_first_str_arg(node)
                    if filename:
                        if filename.lower().endswith(".csv"):
                            info["csv_inputs"].append(filename)
                        elif filename.lower().endswith(".xls") or filename.lower().endswith(".xlsx"):
                            info["excel_inputs"].append(filename)

                # to_csv / to_excel
                elif attr_name in ("to_csv", "to_excel"):
                    # Usually the filename is the first argument
                    filename = _get_first_str_arg(node)
                    if filename:
                        if filename.lower().endswith(".csv"):
                            info["csv_outputs"].append(filename)
                        elif filename.lower().endswith(".xls") or filename.lower().endswith(".xlsx"):
                            info["excel_outputs"].append(filename)

    return info


def _get_first_str_arg(call_node):
    """
    Helper function to return the first argument of a function call if it’s a string literal.
    E.g. open("data.csv") => "data.csv"
    """
    if call_node.args:
        first_arg = call_node.args[0]
        if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
            return first_arg.value
        elif isinstance(first_arg, ast.Str):  # For older Python versions (<3.8)
            return first_arg.s
    return None


def _get_mode_arg(call_node):
    """
    If the function call has a second positional argument that might be the mode (e.g., 'r', 'w'),
    return that. This is specific to open(...) usage.
    """
    if len(call_node.args) > 1:
        second_arg = call_node.args[1]
        if isinstance(second_arg, ast.Constant) and isinstance(second_arg.value, str):
            return second_arg.value
        elif isinstance(second_arg, ast.Str):  # Python <3.8
            return second_arg.s
    return None


def generate_tree_and_analysis(root_dir, indent=""):
    """
    Recursively walk through the folder structure starting at root_dir.
    Print a tree structure, and for each .py file, parse and show metadata
    about imports, functions, classes, and CSV/Excel usage.
    """
    items = sorted(
        os.listdir(root_dir),
        key=lambda x: (not os.path.isdir(os.path.join(root_dir, x)), x.lower())
    )

    for i, item in enumerate(items):
        path = os.path.join(root_dir, item)
        is_last_item = (i == len(items) - 1)

        tree_prefix = "└── " if is_last_item else "├── "
        print(indent + tree_prefix + item)

        if os.path.isdir(path):
            new_indent = indent + ("    " if is_last_item else "│   ")
            generate_tree_and_analysis(path, new_indent)
        else:
            if item.endswith(".py"):
                parse_info = parse_python_file(path)
                show_file_analysis(parse_info, indent + ("    " if is_last_item else "│   "))


def show_file_analysis(parse_info, indent):
    """
    Print the parse results in a tree-like display.
    """

    if "error" in parse_info:
        print(f"{indent}└── [Error parsing file: {parse_info['error']}]")
        return

    # Show standard info
    _print_list(parse_info["imports"], "Imports", indent)
    _print_list(
        [f"from {mod} import {name}" for (mod, name) in parse_info["from_imports"]],
        "From Imports",
        indent
    )
    _print_list(parse_info["functions"], "Functions", indent)
    _print_list(parse_info["classes"], "Classes", indent)

    # Show CSV/Excel usage
    _print_list(parse_info["csv_inputs"], "CSV Inputs", indent)
    _print_list(parse_info["csv_outputs"], "CSV Outputs", indent)
    _print_list(parse_info["excel_inputs"], "Excel Inputs", indent)
    _print_list(parse_info["excel_outputs"], "Excel Outputs", indent)


def _print_list(items, label, indent):
    """Helper to print a label and list of items in a tree-like manner."""
    if not items:
        return
    print(f"{indent}└── {label}:")
    for i, val in enumerate(items):
        is_last = (i == len(items) - 1)
        prefix = "    " if is_last else "│   "
        print(f"{indent}    {prefix}{val}")


if __name__ == "__main__":
    root_directory = r"D:\Documents\E_Plus_2030_py"
    print(f"Mapping folder structure and Python file contents for:\n{root_directory}\n")
    generate_tree_and_analysis(root_directory)


Mapping folder structure and Python file contents for:
D:\Documents\E_Plus_2030_py

├── .git
│   ├── hooks
│   │   ├── applypatch-msg.sample
│   │   ├── commit-msg.sample
│   │   ├── fsmonitor-watchman.sample
│   │   ├── post-update.sample
│   │   ├── pre-applypatch.sample
│   │   ├── pre-commit.sample
│   │   ├── pre-merge-commit.sample
│   │   ├── pre-push.sample
│   │   ├── pre-rebase.sample
│   │   ├── pre-receive.sample
│   │   ├── prepare-commit-msg.sample
│   │   ├── push-to-checkout.sample
│   │   ├── sendemail-validate.sample
│   │   └── update.sample
│   ├── info
│   │   └── exclude
│   ├── logs
│   │   ├── refs
│   │   │   ├── heads
│   │   │   │   └── main
│   │   │   └── remotes
│   │   │       └── origin
│   │   │           └── main
│   │   └── HEAD
│   ├── objects
│   │   ├── 01
│   │   │   └── 7a694b51a029189d21272cc10432ba641f2fad
│   │   ├── 02
│   │   │   └── 3f0df67ac93b04c6e91a5613757ba46545faec
│   │   ├── 03
│   │   │   ├── a9e515917f82b88a5d4dfdb7857740bccc2745


In [3]:
import os
import ast
import csv

def parse_python_file(file_path, root_dir):
    """
    Parse a Python file using ast and return a dictionary of:
      - imports
      - from_imports
      - functions
      - classes
      - csv_inputs  (detected .csv files read in)
      - csv_outputs (detected .csv files written to)
      - excel_inputs  (detected .xls/.xlsx files read in)
      - excel_outputs (detected .xls/.xlsx files written to)
      - csv_columns  (columns for each input CSV we can actually open)
    """
    info = {
        "imports": [],
        "from_imports": [],
        "functions": [],
        "classes": [],
        "csv_inputs": [],
        "csv_outputs": [],
        "excel_inputs": [],
        "excel_outputs": [],
        "csv_columns": {}  # dict: filename -> [list_of_columns]
    }

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            code = f.read()
        tree = ast.parse(code, file_path)
    except (SyntaxError, UnicodeDecodeError) as e:
        info["error"] = f"Parsing error: {e}"
        return info

    # 1. Collect imports, from-imports, function names, and class names
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                info["imports"].append(alias.name)
        elif isinstance(node, ast.ImportFrom):
            module_name = node.module
            for alias in node.names:
                info["from_imports"].append((module_name, alias.name))
        elif isinstance(node, ast.FunctionDef):
            info["functions"].append(node.name)
        elif isinstance(node, ast.ClassDef):
            info["classes"].append(node.name)

    # 2. Attempt to detect CSV/Excel reading & writing
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            func = node.func

            # open(...)
            if isinstance(func, ast.Name) and func.id == "open":
                filename = _get_first_str_arg(node)
                if filename and filename.lower().endswith(".csv"):
                    mode = _get_mode_arg(node)
                    if mode and any(m in mode for m in ("w", "a", "+")):
                        info["csv_outputs"].append(filename)
                    else:
                        info["csv_inputs"].append(filename)

            # e.g., pd.read_csv, pd.read_excel, df.to_csv, df.to_excel
            elif isinstance(func, ast.Attribute):
                attr_name = func.attr.lower()
                # read_csv / read_excel
                if attr_name in ("read_csv", "read_excel"):
                    filename = _get_first_str_arg(node)
                    if filename:
                        if filename.lower().endswith(".csv"):
                            info["csv_inputs"].append(filename)
                        elif filename.lower().endswith(".xls") or filename.lower().endswith(".xlsx"):
                            info["excel_inputs"].append(filename)
                # to_csv / to_excel
                elif attr_name in ("to_csv", "to_excel"):
                    filename = _get_first_str_arg(node)
                    if filename:
                        if filename.lower().endswith(".csv"):
                            info["csv_outputs"].append(filename)
                        elif filename.lower().endswith(".xls") or filename.lower().endswith(".xlsx"):
                            info["excel_outputs"].append(filename)

    # 3. Try to read the columns from each CSV input
    for csv_file in info["csv_inputs"]:
        # Attempt to locate the CSV file on disk
        #  - If csv_file is absolute, we'll use that path.
        #  - If csv_file is relative, we'll join it with root_dir or with the folder of the .py file.
        possible_path = _resolve_path(csv_file, file_path, root_dir)
        try:
            if os.path.exists(possible_path):
                columns = _get_csv_columns(possible_path)
                if columns:
                    info["csv_columns"][csv_file] = columns
                else:
                    info["csv_columns"][csv_file] = ["[No columns found or file empty]"]
            else:
                info["csv_columns"][csv_file] = [f"[File not found: {possible_path}]"]
        except Exception as e:
            info["csv_columns"][csv_file] = [f"[Error reading file: {e}]"]

    return info

def _get_first_str_arg(call_node):
    """Return the first argument as a string if it’s a literal."""
    if call_node.args:
        first_arg = call_node.args[0]
        # For Python 3.8+:
        if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
            return first_arg.value
        # For older Python versions (< 3.8):
        elif isinstance(first_arg, ast.Str):
            return first_arg.s
    return None

def _get_mode_arg(call_node):
    """Return the second argument (mode) if it’s a string literal (for open(...))."""
    if len(call_node.args) > 1:
        second_arg = call_node.args[1]
        if isinstance(second_arg, ast.Constant) and isinstance(second_arg.value, str):
            return second_arg.value
        elif isinstance(second_arg, ast.Str):
            return second_arg.s
    return None

def _resolve_path(csv_file, py_file_path, root_dir):
    """
    Try to figure out a path to the CSV file.
    1) If csv_file is absolute, return it directly.
    2) Otherwise, try relative to the folder containing the Python script.
    3) If that doesn't exist, try relative to the overall root_dir.
    """
    if os.path.isabs(csv_file):
        return csv_file

    # Path of the folder containing the .py file
    py_folder = os.path.dirname(py_file_path)

    # Try relative to the .py file's folder
    candidate1 = os.path.join(py_folder, csv_file)
    if os.path.exists(candidate1):
        return candidate1

    # If that doesn't exist, try relative to the overall root_dir
    candidate2 = os.path.join(root_dir, csv_file)
    return candidate2

def _get_csv_columns(path_to_csv):
    """
    Read the header row of a CSV and return a list of column names.
    We only read the first line or so, to avoid big memory usage.
    """
    with open(path_to_csv, "r", newline="", encoding="utf-8") as f:
        # We'll try using csv.DictReader to interpret headers:
        reader = csv.DictReader(f)
        return reader.fieldnames  # returns a list of column names

def generate_tree_and_analysis(root_dir, indent=""):
    """
    Recursively walk through the folder structure starting at root_dir.
    Print a tree structure, and for each .py file, parse and show metadata
    about imports, functions, classes, CSV/Excel usage, and CSV columns.
    """
    items = sorted(
        os.listdir(root_dir),
        key=lambda x: (not os.path.isdir(os.path.join(root_dir, x)), x.lower())
    )

    for i, item in enumerate(items):
        path = os.path.join(root_dir, item)
        is_last_item = (i == len(items) - 1)
        tree_prefix = "└── " if is_last_item else "├── "
        print(indent + tree_prefix + item)

        if os.path.isdir(path):
            new_indent = indent + ("    " if is_last_item else "│   ")
            generate_tree_and_analysis(path, new_indent)
        else:
            if item.endswith(".py"):
                parse_info = parse_python_file(path, root_dir)
                show_file_analysis(parse_info, indent + ("    " if is_last_item else "│   "))

def show_file_analysis(parse_info, indent):
    """Print the parse results in a tree-like format."""
    if "error" in parse_info:
        print(f"{indent}└── [Error parsing file: {parse_info['error']}]")
        return

    _print_list(parse_info["imports"], "Imports", indent)
    _print_list(
        [f"from {mod} import {name}" for (mod, name) in parse_info["from_imports"]],
        "From Imports",
        indent
    )
    _print_list(parse_info["functions"], "Functions", indent)
    _print_list(parse_info["classes"], "Classes", indent)
    _print_list(parse_info["csv_inputs"], "CSV Inputs", indent)
    _print_list(parse_info["csv_outputs"], "CSV Outputs", indent)
    _print_list(parse_info["excel_inputs"], "Excel Inputs", indent)
    _print_list(parse_info["excel_outputs"], "Excel Outputs", indent)

    # Now, print out any discovered CSV columns
    if parse_info["csv_columns"]:
        print(f"{indent}└── CSV Columns:")
        for i, (csv_file, columns) in enumerate(parse_info["csv_columns"].items()):
            is_last = (i == len(parse_info["csv_columns"]) - 1)
            prefix = "    " if is_last else "│   "
            print(f"{indent}    {prefix}{csv_file}:")
            if not columns:
                print(f"{indent}    {prefix}    [No columns detected]")
            else:
                for j, col in enumerate(columns):
                    col_prefix = "       " if is_last and j == len(columns) - 1 else "│      "
                    # If there's only one item left, break the lines properly
                    # But let's keep it simple:
                    print(f"{indent}    {prefix}    - {col}")

def _print_list(items, label, indent):
    """Helper to print a label and list of items in a tree-like manner."""
    if not items:
        return
    print(f"{indent}└── {label}:")
    for i, val in enumerate(items):
        is_last = (i == len(items) - 1)
        prefix = "    " if is_last else "│   "
        print(f"{indent}    {prefix}{val}")


if __name__ == "__main__":
    # Adjust this path to your folder
    root_directory = r"D:\Documents\E_Plus_2030_py"
    print(f"Mapping folder structure and Python file contents for:\n{root_directory}\n")
    generate_tree_and_analysis(root_directory)


Mapping folder structure and Python file contents for:
D:\Documents\E_Plus_2030_py

├── .git
│   ├── hooks
│   │   ├── applypatch-msg.sample
│   │   ├── commit-msg.sample
│   │   ├── fsmonitor-watchman.sample
│   │   ├── post-update.sample
│   │   ├── pre-applypatch.sample
│   │   ├── pre-commit.sample
│   │   ├── pre-merge-commit.sample
│   │   ├── pre-push.sample
│   │   ├── pre-rebase.sample
│   │   ├── pre-receive.sample
│   │   ├── prepare-commit-msg.sample
│   │   ├── push-to-checkout.sample
│   │   ├── sendemail-validate.sample
│   │   └── update.sample
│   ├── info
│   │   └── exclude
│   ├── logs
│   │   ├── refs
│   │   │   ├── heads
│   │   │   │   └── main
│   │   │   └── remotes
│   │   │       └── origin
│   │   │           └── main
│   │   └── HEAD
│   ├── objects
│   │   ├── 00
│   │   │   ├── 01ee0f6b96a9f4545f61f74f2d4eff091ba50b
│   │   │   └── 23487c20a3f512631072d1a67402d454f6b774
│   │   ├── 01
│   │   │   ├── 57c36ba5e9b95dc93ace829037970455452f82
│   │   │   └──

In [2]:
import os
import glob
import pandas as pd

def read_files_in_folder(folder_path,
                         row_range=None,
                         col_range=None,
                         col_names=None):
    """
    Reads all .xlsx and .csv files in the given folder and
    prints out data slices based on row_range, col_range, and/or col_names.

    Parameters:
    -----------
    folder_path : str
        The path to the folder containing .xlsx or .csv files.

    row_range : tuple or None, default=None
        A tuple (start_row, end_row) for slicing rows by index.
        e.g. (0, 5) -> the first 5 rows.

    col_range : tuple or None, default=None
        A tuple (start_col, end_col) for slicing columns by index.
        e.g. (0, 3) -> columns at index 0, 1, 2.

    col_names : list or None, default=None
        A list of column names to select, e.g. ["Name", "Age"].

    Usage Examples:
    --------------
    read_files_in_folder(r"C:\MyData",
                         row_range=(0, 5),
                         col_range=(1, 4),
                         col_names=["ColumnA", "ColumnB"])
    """

    # 1. Grab all Excel files ending with ".xlsx"
    excel_files = glob.glob(os.path.join(folder_path, "*.xlsx"))
    # 2. Grab all CSV files ending with ".csv"
    csv_files = glob.glob(os.path.join(folder_path, "*.csv"))

    # Combine the two lists
    all_files = excel_files + csv_files

    # 3. Loop through each file found
    for file_path in all_files:
        # Determine if this is an Excel file or CSV file
        if file_path.lower().endswith(".xlsx"):
            df = pd.read_excel(file_path)
        elif file_path.lower().endswith(".csv"):
            df = pd.read_csv(file_path)
        else:
            # Skip any file that doesn't match .xlsx or .csv
            continue

        # Print file name for clarity
        print(f"\n=== Reading file: {os.path.basename(file_path)} ===")
        
        # ------------------------------------
        # A) Apply row slicing by index if row_range is given
        if row_range is not None:
            start_row, end_row = row_range
            df = df.iloc[start_row:end_row, :]  # Slicing the rows
        
        # B) Apply column slicing by index if col_range is given
        if col_range is not None:
            start_col, end_col = col_range
            df = df.iloc[:, start_col:end_col]  # Slicing the columns

        # C) If col_names is provided, select only those columns by name
        #    (making sure they exist in the DataFrame)
        if col_names is not None:
            # Filter out any col_names that might not be in df.columns
            existing_cols = [c for c in col_names if c in df.columns]
            if existing_cols:
                df = df[existing_cols]
            else:
                print("Warning: None of the requested columns found in this file.")

        # Finally, print the resulting slice
        print(df)

# --------------------------------------------------------------------------
# USAGE EXAMPLE
if __name__ == "__main__":
    folder_path = r"D:\Documents\E_Plus_2030_py\lookup_xlx"
    
    # Example 1: View first 5 rows, columns 0 to 3
    # read_files_in_folder(folder_path, row_range=(0, 5), col_range=(0, 3))

    # Example 2: View first 5 rows, but only columns named "ColumnA" and "ColumnB"
    # read_files_in_folder(folder_path, row_range=(0, 5), col_names=["ColumnA", "ColumnB"])

    # Example 3: Combine row index range and column index range and column names
    # *Typically you'd choose either col_range or col_names, but here's how you'd do both:
    # read_files_in_folder(folder_path,
    #                      row_range=(0, 10),
    #                      col_range=(2, 5),
    #                      col_names=["SomeColumn", "AnotherColumn"])

    # For a real run, un-comment one of the lines above or pass your own parameters:
    read_files_in_folder(folder_path, row_range=(0, 5), col_range=(0, 3))



=== Reading file: dhw_lookup.xlsx ===
            section_type             key_name subkey_name
0  TABLE_13_1_KWH_PER_M2     Meeting Function         NaN
1  TABLE_13_1_KWH_PER_M2      Office Function         NaN
2  TABLE_13_1_KWH_PER_M2      Retail Function         NaN
3  TABLE_13_1_KWH_PER_M2  Healthcare Function         NaN
4  TABLE_13_1_KWH_PER_M2   Education Function         NaN

=== Reading file: elec_schedules.xlsx ===
  building_category building_subtype day_type
0       Residential     Corner House  weekday
1       Residential     Corner House  weekday
2       Residential     Corner House  weekday
3       Residential     Corner House  weekday
4       Residential     Corner House  weekday

=== Reading file: epw_lookup.xlsx ===
                                           file_path  year    lat
0  C:/Users/aminj/OneDrive/Desktop/EnergyPlus/Wea...  2018  52.12
1  C:/Users/aminj/OneDrive/Desktop/EnergyPlus/Wea...  2020  52.15
2  C:/Users/aminj/OneDrive/Desktop/EnergyPlus/Wea...  205

## Excel CSV Files

In [2]:
import os
import glob
import pandas as pd

def create_file_report(directory: str, report_output: str = None):
    """
    Reads all CSV and Excel files from the specified directory,
    extracts the first 5 rows, and creates a pandas DataFrame with:
      1. File Path
      2. A string representation of the first 5 rows
    Optionally writes the result to a CSV file if `report_output` is provided.
    
    :param directory: The path to the directory to search for files.
    :param report_output: Path to an output CSV file. If None, 
                          the DataFrame is only returned (not saved).
    :return: pandas DataFrame containing the report.
    """
    
    # Define patterns for CSV and Excel files
    csv_pattern = os.path.join(directory, "*.csv")
    excel_pattern_xlsx = os.path.join(directory, "*.xlsx")
    excel_pattern_xls = os.path.join(directory, "*.xls")
    excel_pattern_xlsm = os.path.join(directory, "*.xlsm")

    # Gather all file paths
    file_paths = glob.glob(csv_pattern) \
                 + glob.glob(excel_pattern_xlsx) \
                 + glob.glob(excel_pattern_xls) \
                 + glob.glob(excel_pattern_xlsm)

    # (Optional) If you need subdirectories as well, you can do:
    # file_paths = glob.glob(os.path.join(directory, "**", "*.csv"), recursive=True)
    # ... and similarly for Excel patterns, all in one list.

    report_data = []

    for path in file_paths:
        try:
            # Read only first 5 rows
            if path.lower().endswith(".csv"):
                df = pd.read_csv(path, nrows=5)
            else:
                # For Excel files
                df = pd.read_excel(path, nrows=5)

            # Convert the first 5 rows to a string
            preview_str = df.to_string(index=False)

            # Append to our report
            report_data.append({
                "File Path": path,
                "Preview (first 5 rows)": preview_str
            })

        except Exception as e:
            # If there is a problem reading the file, you could
            # either skip or log an error message
            print(f"Could not read file: {path}, due to error: {e}")
            continue

    # Create a DataFrame from collected data
    report_df = pd.DataFrame(report_data)

    # If user wants to output to a CSV, save it
    if report_output:
        report_df.to_csv(report_output, index=False)

    return report_df


if __name__ == "__main__":
    # Example usage:
    directory_to_search = r"D:\Documents\E_Plus_2030_py\output\assigned"
    output_csv = r"D:\Documents\E_Plus_2030_py\report.csv"

    # Call the function (the result is also returned as a DataFrame)
    final_report_df = create_file_report(directory=directory_to_search, report_output=output_csv)

    # Optionally print the DataFrame to the console
    print(final_report_df)


                                            File Path  \
0   D:\Documents\E_Plus_2030_py\output\assigned\as...   
1   D:\Documents\E_Plus_2030_py\output\assigned\as...   
2   D:\Documents\E_Plus_2030_py\output\assigned\as...   
3   D:\Documents\E_Plus_2030_py\output\assigned\as...   
4   D:\Documents\E_Plus_2030_py\output\assigned\as...   
5   D:\Documents\E_Plus_2030_py\output\assigned\as...   
6   D:\Documents\E_Plus_2030_py\output\assigned\as...   
7   D:\Documents\E_Plus_2030_py\output\assigned\as...   
8   D:\Documents\E_Plus_2030_py\output\assigned\as...   
9   D:\Documents\E_Plus_2030_py\output\assigned\as...   
10  D:\Documents\E_Plus_2030_py\output\assigned\st...   
11  D:\Documents\E_Plus_2030_py\output\assigned\st...   

                               Preview (first 5 rows)  
0    ogc_fid                           param_name ...  
1    ogc_fid             param_name    assigned_va...  
2    ogc_fid            param_name     assigned_va...  
3    ogc_fid                   par

In [None]:
import os
import glob
import pandas as pd

def create_file_report_txt(directory: str, report_txt_output: str):
    """
    Recursively reads all CSV and Excel files from the specified directory
    (including subfolders), extracts the first 5 rows, and writes a text report.

    :param directory: The path to the directory to search for files.
    :param report_txt_output: Path to the output .txt file where the report will be written.
    """
    # Define file patterns (using '**' and recursive=True to include subfolders)
    csv_pattern = os.path.join(directory, "**", "*.csv")
    excel_pattern_xlsx = os.path.join(directory, "**", "*.xlsx")
    excel_pattern_xls = os.path.join(directory, "**", "*.xls")
    excel_pattern_xlsm = os.path.join(directory, "**", "*.xlsm")

    # Gather all file paths
    file_paths = []
    file_paths += glob.glob(csv_pattern, recursive=True)
    file_paths += glob.glob(excel_pattern_xlsx, recursive=True)
    file_paths += glob.glob(excel_pattern_xls, recursive=True)
    file_paths += glob.glob(excel_pattern_xlsm, recursive=True)

    # Sort file_paths if you need consistent order
    file_paths.sort()

    with open(report_txt_output, 'w', encoding='utf-8') as txt_file:
        for path in file_paths:
            try:
                # Read only first 5 rows
                if path.lower().endswith(".csv"):
                    df = pd.read_csv(path, nrows=5)
                else:
                    # For Excel files
                    df = pd.read_excel(path, nrows=5)

                # Convert the first 5 rows to a string
                preview_str = df.to_string(index=False)

                # Write to our text report
                txt_file.write(f"File Path: {path}\n")
                txt_file.write("Preview (first 5 rows):\n")
                txt_file.write(preview_str + "\n")
                txt_file.write("=" * 80 + "\n\n")  # Separator line

            except Exception as e:
                # Write an error message and continue
                txt_file.write(f"Could not read file: {path}\n")
                txt_file.write(f"Error: {str(e)}\n")
                txt_file.write("=" * 80 + "\n\n")
                continue

if __name__ == "__main__":
    directory_to_search = r"D:\Documents\E_Plus_2030_py"
    # Choose where you want to save the report.txt
    report_txt = r"D:\Documents\E_Plus_2030_py\report.txt"
    
    create_file_report_txt(directory=directory_to_search, report_txt_output=report_txt)
    print(f"Report generated at: {report_txt}")


In [None]:
import os
import glob
import pandas as pd

def create_file_report(directories, report_output=None, recursive=False):
    """
    Reads all CSV and Excel files from one or multiple directories,
    extracts the first 5 rows from each file, and returns a DataFrame report.
    The report contains:
      1. File Path
      2. A string representation of the first 5 rows

    Optionally writes the report to a CSV file if `report_output` is provided.

    :param directories: A list of directory paths (or a single string for one directory).
    :param report_output: Path to an output CSV file. If None, the DataFrame is not saved.
    :param recursive: If True, searches subdirectories as well.
    :return: pandas DataFrame containing the consolidated report.
    """
    
    # If a single directory is provided, convert it into a list
    if isinstance(directories, str):
        directories = [directories]

    file_paths = []

    # Collect file paths from each directory
    for directory in directories:
        # Ensure the directory actually exists
        if not os.path.isdir(directory):
            print(f"Warning: {directory} is not a valid directory.")
            continue

        if recursive:
            # Search directories (and subdirectories) for CSV & Excel files
            csv_pattern = os.path.join(directory, "**", "*.csv")
            xlsx_pattern = os.path.join(directory, "**", "*.xlsx")
            xls_pattern = os.path.join(directory, "**", "*.xls")
            xlsm_pattern = os.path.join(directory, "**", "*.xlsm")

            file_paths.extend(glob.glob(csv_pattern, recursive=True))
            file_paths.extend(glob.glob(xlsx_pattern, recursive=True))
            file_paths.extend(glob.glob(xls_pattern, recursive=True))
            file_paths.extend(glob.glob(xlsm_pattern, recursive=True))

        else:
            # Search only the top-level directory for CSV & Excel files
            csv_pattern = os.path.join(directory, "*.csv")
            xlsx_pattern = os.path.join(directory, "*.xlsx")
            xls_pattern = os.path.join(directory, "*.xls")
            xlsm_pattern = os.path.join(directory, "*.xlsm")

            file_paths.extend(glob.glob(csv_pattern))
            file_paths.extend(glob.glob(xlsx_pattern))
            file_paths.extend(glob.glob(xls_pattern))
            file_paths.extend(glob.glob(xlsm_pattern))

    # Remove duplicates if the same file is found multiple times
    file_paths = list(set(file_paths))

    report_data = []

    # Read each file and collect the first 5 rows
    for path in file_paths:
        try:
            if path.lower().endswith(".csv"):
                df = pd.read_csv(path, nrows=5)
            else:
                df = pd.read_excel(path, nrows=5)

            preview_str = df.to_string(index=False)

            report_data.append({
                "File Path": path,
                "Preview (first 5 rows)": preview_str
            })
        except Exception as e:
            print(f"Could not read file: {path}\n  Error: {e}")

    # Create a report DataFrame
    report_df = pd.DataFrame(report_data)

    # Optionally save to CSV
    if report_output:
        report_df.to_csv(report_output, index=False)
        print(f"Report saved to: {report_output}")

    return report_df


if __name__ == "__main__":
    # Example usage:

    # You can specify one or multiple folders:
    folders_to_search = [
        r"C:\Path\To\Folder1",
        r"C:\Path\To\Folder2"
    ]

    # If desired, include a path to save the final report:
    output_csv_path = r"C:\Path\To\output_report.csv"

    # Set `recursive=True` if you want to include subfolders
    df_report = create_file_report(directories=folders_to_search,
                                   report_output=output_csv_path,
                                   recursive=True)
    
    # Preview in console
    print(df_report)


In [4]:
import os
import ast
import json

def analyze_file(filepath):
    """
    Parse a Python file and extract its functions and classes.

    Args:
        filepath (str): Full path to the Python file.

    Returns:
        dict: Dictionary containing lists of functions and classes with their docstrings.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            file_content = file.read()
        tree = ast.parse(file_content, filename=filepath)
    except Exception as e:
        print(f"Failed to parse {filepath}: {e}")
        return None

    file_info = {"functions": [], "classes": []}

    # Walk through all nodes in the AST
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            # Capture top-level functions (and also functions within classes)
            file_info["functions"].append({
                "name": node.name,
                "doc": ast.get_docstring(node)
            })
        elif isinstance(node, ast.ClassDef):
            # For classes, also collect their methods
            class_info = {
                "name": node.name,
                "doc": ast.get_docstring(node),
                "methods": []
            }
            for child in node.body:
                if isinstance(child, ast.FunctionDef):
                    class_info["methods"].append({
                        "name": child.name,
                        "doc": ast.get_docstring(child)
                    })
            file_info["classes"].append(class_info)
    return file_info

def analyze_directory(root_path):
    """
    Walk through the directory and analyze every Python file.

    Args:
        root_path (str): Root directory to start analysis.

    Returns:
        dict: A mapping of file paths to their analysis info.
    """
    analysis = {}
    for dirpath, dirnames, filenames in os.walk(root_path):
        for filename in filenames:
            if filename.endswith(".py"):
                filepath = os.path.join(dirpath, filename)
                file_analysis = analyze_file(filepath)
                if file_analysis is not None:
                    analysis[filepath] = file_analysis
    return analysis

if __name__ == "__main__":
    # Set the target directory path
    target_path = r"D:\Documents\E_Plus_2030_py"
    
    # Analyze the directory
    analysis_result = analyze_directory(target_path)
    
    # Output the analysis to a JSON file
    output_file = "D:\Documents\E_Plus_2030_py\code_analysis.json"
    try:
        with open(output_file, "w", encoding="utf-8") as out_file:
            json.dump(analysis_result, out_file, indent=4)
        print(f"Analysis complete. Results saved to {output_file}")
    except Exception as e:
        print(f"Failed to write analysis to file: {e}")


Failed to parse D:\Documents\E_Plus_2030_py\__pycache__\data_materials_non_residential.py: unexpected EOF while parsing (data_materials_non_residential.py, line 72)
Failed to parse D:\Documents\E_Plus_2030_py\__pycache__\data_materials_residential.py: unexpected EOF while parsing (data_materials_residential.py, line 62)
Analysis complete. Results saved to D:\Documents\E_Plus_2030_py\code_analysis.json


In [6]:
import os
import ast
import json
import networkx as nx
import matplotlib.pyplot as plt

def analyze_file(filepath):
    """
    Parse a Python file to extract functions, classes, and import statements.
    
    Args:
        filepath (str): Full path to the Python file.
    
    Returns:
        dict: Contains lists of functions, classes, and imported modules.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            file_content = file.read()
        tree = ast.parse(file_content, filename=filepath)
    except Exception as e:
        print(f"Failed to parse {filepath}: {e}")
        return None

    file_info = {
        "functions": [],
        "classes": [],
        "imports": []
    }

    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            file_info["functions"].append({
                "name": node.name,
                "doc": ast.get_docstring(node)
            })
        elif isinstance(node, ast.ClassDef):
            class_info = {
                "name": node.name,
                "doc": ast.get_docstring(node),
                "methods": []
            }
            for child in node.body:
                if isinstance(child, ast.FunctionDef):
                    class_info["methods"].append({
                        "name": child.name,
                        "doc": ast.get_docstring(child)
                    })
            file_info["classes"].append(class_info)
        elif isinstance(node, ast.Import):
            for alias in node.names:
                file_info["imports"].append(alias.name)
        elif isinstance(node, ast.ImportFrom):
            # When level > 0, it means relative import.
            if node.module:
                module_name = "." * node.level + node.module
            else:
                module_name = "." * node.level
            file_info["imports"].append(module_name)
    
    return file_info

def analyze_directory(root_path):
    """
    Walk through the directory and analyze every Python file.
    
    Args:
        root_path (str): Root directory to start analysis.
    
    Returns:
        dict: A mapping of file paths to their analysis info.
    """
    analysis = {}
    for dirpath, dirnames, filenames in os.walk(root_path):
        for filename in filenames:
            if filename.endswith(".py"):
                filepath = os.path.join(dirpath, filename)
                file_analysis = analyze_file(filepath)
                if file_analysis is not None:
                    analysis[filepath] = file_analysis
    return analysis

def build_dependency_graph(analysis, root_path):
    """
    Build a dependency graph from the analysis info. The graph nodes are Python files (relative paths).
    An edge from A to B means that file A imports a module that might correspond to file B.
    
    Args:
        analysis (dict): Analysis information keyed by file paths.
        root_path (str): The root directory to consider for local modules.
    
    Returns:
        networkx.DiGraph: The dependency graph.
    """
    G = nx.DiGraph()

    # Create a mapping of module names to file paths for local files.
    module_to_file = {}
    for filepath in analysis.keys():
        # Create a module name relative to root_path, replacing os.sep with dot and removing .py extension.
        rel_path = os.path.relpath(filepath, root_path)
        module_name = os.path.splitext(rel_path)[0].replace(os.sep, ".")
        module_to_file[module_name] = filepath
        G.add_node(module_name)

    # For each file, try to match its imports to local modules.
    for filepath, info in analysis.items():
        rel_path = os.path.relpath(filepath, root_path)
        source_module = os.path.splitext(rel_path)[0].replace(os.sep, ".")
        for imp in info["imports"]:
            # Skip absolute imports that clearly refer to external libraries.
            # Here, we try a simple check: if the import module is in our mapping, consider it local.
            # Also support relative imports (starting with '.').
            if imp.startswith("."):
                # Handle relative import: count the dots and reconstruct module name.
                level = len(imp) - len(imp.lstrip("."))
                base_module_parts = source_module.split(".")[:-level] if level <= len(source_module.split(".")) else []
                remainder = imp.lstrip(".")
                if remainder:
                    target_module = ".".join(base_module_parts + [remainder])
                else:
                    target_module = ".".join(base_module_parts)
            else:
                target_module = imp
            
            if target_module in module_to_file:
                G.add_edge(source_module, target_module)
    
    return G

def draw_and_save_graph(G, output_image="dependency_graph.png"):
    """
    Draw the dependency graph using networkx and matplotlib and save it as an image.
    
    Args:
        G (networkx.DiGraph): The dependency graph.
        output_image (str): Filename for the saved graph image.
    """
    plt.figure(figsize=(12, 12))
    pos = nx.spring_layout(G, k=0.5, iterations=50)
    nx.draw(G, pos, with_labels=True, node_color="lightblue", edge_color="gray", node_size=2000, font_size=10, arrowsize=20)
    plt.title("Local Module Dependency Graph")
    plt.tight_layout()
    plt.savefig(output_image)
    plt.close()
    print(f"Dependency graph saved as {output_image}")

if __name__ == "__main__":
    # Set the target directory path
    target_path = r"D:\Documents\E_Plus_2030_py"
    
    # Analyze the directory
    analysis_result = analyze_directory(target_path)
    
    # Save the analysis output to a JSON file
    output_json = "D:\Documents\E_Plus_2030_py\code_analysis.json"
    try:
        with open(output_json, "w", encoding="utf-8") as out_file:
            json.dump(analysis_result, out_file, indent=4)
        print(f"Analysis complete. Results saved to {output_json}")
    except Exception as e:
        print(f"Failed to write analysis to file: {e}")
    
    # Build and save the dependency graph
    dep_graph = build_dependency_graph(analysis_result, target_path)
    draw_and_save_graph(dep_graph, output_image="D:\Documents\E_Plus_2030_py\dependency_graph.png")


Failed to parse D:\Documents\E_Plus_2030_py\__pycache__\data_materials_non_residential.py: unexpected EOF while parsing (data_materials_non_residential.py, line 72)
Failed to parse D:\Documents\E_Plus_2030_py\__pycache__\data_materials_residential.py: unexpected EOF while parsing (data_materials_residential.py, line 62)
Analysis complete. Results saved to D:\Documents\E_Plus_2030_py\code_analysis.json


  plt.tight_layout()


Dependency graph saved as D:\Documents\E_Plus_2030_py\dependency_graph.png


In [14]:
import os
import ast
import textwrap
from collections import defaultdict

class FunctionCallVisitor(ast.NodeVisitor):
    """
    Visits all nodes in a function's AST body to find Call nodes.
    We'll store a list of function or attribute names that get called.
    """
    def __init__(self):
        self.calls = []

    def visit_Call(self, node):
        """
        For a call node, check if it's a direct Name() call or an Attribute() call,
        e.g. function_name(...) or obj.method(...).
        """
        func = node.func
        call_name = None

        if isinstance(func, ast.Name):
            # Direct call: e.g. function_name(...)
            call_name = func.id
        elif isinstance(func, ast.Attribute):
            # For an attribute call: e.g. object.method(...)
            # We'll store it as 'object.method' for simplicity
            attr_chain = []
            curr = func
            while isinstance(curr, ast.Attribute):
                attr_chain.append(curr.attr)
                curr = curr.value
            if isinstance(curr, ast.Name):
                attr_chain.append(curr.id)
            attr_chain.reverse()
            call_name = ".".join(attr_chain)

        if call_name:
            self.calls.append(call_name)

        self.generic_visit(node)

def parse_python_file(file_path):
    """
    Parse a single Python file with the ast module.
    Returns a dictionary with structure and basic relationship info:
      {
        'imports': [list of imported modules],
        'classes': [
            {
                'name': <class_name>,
                'docstring': <class_docstring or None>,
                'bases': [base classes],
                'methods': [
                    {
                        'name': <method_name>,
                        'docstring': <method_docstring or None>,
                        'calls': [list of function/attr calls found]
                    },
                    ...
                ]
            },
            ...
        ],
        'functions': [
            {
                'name': <function_name>,
                'docstring': <function_docstring or None>,
                'calls': [list of function/attr calls found]
            },
            ...
        ]
      }
    """
    summary = {
        'imports': [],
        'classes': [],
        'functions': []
    }

    # Read file contents
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            file_content = f.read()
    except (IOError, UnicodeDecodeError):
        # If we can't read the file (binary or encoding issue), just skip
        return summary

    # Parse the abstract syntax tree
    try:
        tree = ast.parse(file_content, filename=file_path)
    except SyntaxError:
        # Skip files that may not parse as valid Python
        return summary

    # Gather imports
    for node in ast.iter_child_nodes(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                summary['imports'].append(alias.name)
        elif isinstance(node, ast.ImportFrom):
            # e.g. from X import Y
            base_module = node.module if node.module else ""
            for alias in node.names:
                import_str = f"{base_module}.{alias.name}".strip(".")
                summary['imports'].append(import_str)

    # Traverse top-level nodes for classes & functions
    for node in ast.iter_child_nodes(tree):
        if isinstance(node, ast.ClassDef):
            class_name = node.name
            class_docstring = ast.get_docstring(node)

            # Capture base classes
            bases_list = []
            for base in node.bases:
                if isinstance(base, ast.Name):
                    bases_list.append(base.id)
                elif isinstance(base, ast.Attribute):
                    # Build a dotted string
                    parts = []
                    curr = base
                    while isinstance(curr, ast.Attribute):
                        parts.append(curr.attr)
                        curr = curr.value
                    if isinstance(curr, ast.Name):
                        parts.append(curr.id)
                    parts.reverse()
                    dotted_base = ".".join(parts)
                    bases_list.append(dotted_base)

            # Collect methods
            methods = []
            for body_item in node.body:
                if isinstance(body_item, ast.FunctionDef):
                    method_docstring = ast.get_docstring(body_item)
                    call_visitor = FunctionCallVisitor()
                    call_visitor.visit(body_item)
                    methods.append({
                        'name': body_item.name,
                        'docstring': method_docstring,
                        'calls': call_visitor.calls
                    })

            summary['classes'].append({
                'name': class_name,
                'docstring': class_docstring,
                'bases': bases_list,
                'methods': methods
            })

        elif isinstance(node, ast.FunctionDef):
            func_name = node.name
            func_docstring = ast.get_docstring(node)

            call_visitor = FunctionCallVisitor()
            call_visitor.visit(node)
            summary['functions'].append({
                'name': func_name,
                'docstring': func_docstring,
                'calls': call_visitor.calls
            })

    return summary

def scan_folder(root_dir):
    """
    Recursively scan the directory `root_dir` and parse all .py files.
    Returns a dict mapping from file path to the parsed summary.
    """
    code_summary = {}
    for current_path, dirs, files in os.walk(root_dir):
        for filename in files:
            if filename.endswith('.py'):
                full_path = os.path.join(current_path, filename)
                file_info = parse_python_file(full_path)

                if (file_info['imports'] or 
                    file_info['classes'] or 
                    file_info['functions']):
                    code_summary[full_path] = file_info
    return code_summary

def print_summary(code_summary):
    """
    Print the summary in a more readable, hierarchical format.
    """
    for file_path, details in code_summary.items():
        rel_path = os.path.relpath(file_path)
        print(f"\n=== File: {rel_path} ===")

        if details['imports']:
            print("  Imports:")
            for imp in details['imports']:
                print(f"    - {imp}")

        for cls in details['classes']:
            print(f"  Class: {cls['name']}")
            if cls['bases']:
                print(f"    Inherits from: {cls['bases']}")
            if cls['docstring']:
                doc_lines = textwrap.indent(textwrap.dedent(cls['docstring']), ' ' * 6)
                print(f"    Docstring:\n{doc_lines}")
            if cls['methods']:
                print(f"    Methods:")
                for m in cls['methods']:
                    print(f"      - {m['name']}")
                    if m['docstring']:
                        mdoc = textwrap.indent(textwrap.dedent(m['docstring']), ' ' * 8)
                        print(f"        Docstring:\n{mdoc}")
                    if m['calls']:
                        print(f"        Calls: {m['calls']}")

        for func in details['functions']:
            print(f"  Function: {func['name']}")
            if func['docstring']:
                fdoc = textwrap.indent(textwrap.dedent(func['docstring']), ' ' * 6)
                print(f"    Docstring:\n{fdoc}")
            if func['calls']:
                print(f"    Calls: {func['calls']}")

# ---------------- VISUALIZATION ----------------

def visualize_inheritance(code_summary, output_file='inheritance.dot'):
    """
    Create a Graphviz DOT file showing class inheritance across all files.
    Each class is a node labeled 'FileName:ClassName'.
    We draw edges: SubClass -> BaseClass (arrow upwards).
    """
    lines = [
        'digraph Inheritance {',
        '  rankdir="BT";  // draw arrow from bottom to top for inheritance',
        '  node [shape=record, style=filled, fillcolor=lightblue];'
    ]

    # We'll store a unique ID for each class as "FileRelativePath:ClassName"
    # Then create edges from subclass to each base.
    for file_path, details in code_summary.items():
        file_name = os.path.basename(file_path)
        for cls in details['classes']:
            subclass_id = f"\"{file_name}:{cls['name']}\""
            # Declare this node
            lines.append(f"  {subclass_id} [label=\"{cls['name']}\\n({file_name})\"]")
            # For each base, draw an edge
            for base in cls['bases']:
                # We'll label the base similarly, though it might not exist in this codebase.
                base_id = f"\"{base}\""
                # Draw an edge: subclass -> base
                lines.append(f"  {subclass_id} -> {base_id};")

    lines.append("}")
    dot_content = "\n".join(lines)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(dot_content)
    print(f"[+] Inheritance graph saved to {output_file}")

def visualize_function_calls(code_summary, output_file='function_calls.dot'):
    """
    Create a Graphviz DOT file for function/method calls within each file.
    We'll treat each top-level function or class-method as a node:
      'FileName:FunctionName' or 'FileName:ClassName.methodName'
    Then draw edges from caller -> callee.
    Note: We only do naive textual matching here (e.g., 'some_function' -> 'some_function').
    """
    lines = [
        'digraph FunctionCalls {',
        '  rankdir="LR";  // left-to-right layout',
        '  node [shape=ellipse, style=filled, fillcolor=lightgoldenrod];'
    ]

    # We'll build a dictionary so we know which "canonical name" belongs to which node
    # to match calls. For example, if we have "my_func" at top level, the node is "File:my_func".
    # If we have "MyClass.my_method", the node is "File:MyClass.my_method".
    defined_nodes = set()

    # 1) Collect definitions
    for file_path, details in code_summary.items():
        filename = os.path.basename(file_path)
        
        # Top-level functions
        for func in details['functions']:
            node_name = f"{filename}:{func['name']}"
            defined_nodes.add(func['name'])  # We'll match calls by pure function name
            # Add a node
            lines.append(f"  \"{node_name}\" [label=\"{func['name']}\\n({filename})\"];")

        # Class methods
        for cls in details['classes']:
            for m in cls['methods']:
                node_name = f"{filename}:{cls['name']}.{m['name']}"
                defined_nodes.add(m['name'])  # naive approach: just store method name
                # Add a node
                lines.append(f"  \"{node_name}\" [label=\"{cls['name']}.{m['name']}\\n({filename})\"];")

    # 2) Create edges based on calls
    # We'll do naive matching: if 'calls' list contains 'foo', we try to find a node whose name ends with :foo or .foo
    for file_path, details in code_summary.items():
        filename = os.path.basename(file_path)

        # Top-level functions
        for func in details['functions']:
            caller_node = f"{filename}:{func['name']}"
            for callee in func['calls']:
                # If callee is something we recognized as a function/method name
                # We'll connect the caller to a node that has that name in its label
                # Real resolution is more complicated in practice.
                callee_matches = []
                # For a naive approach, just check if callee is in defined_nodes
                if callee.split('.')[-1] in defined_nodes:
                    # We can't reliably know the *exact* node name across multiple files/classes
                    # so we'll just produce a best-effort edge label with the callee text.
                    # Alternatively, we can skip if we can't map it exactly.
                    callee_node_label = callee  # keep the text
                    lines.append(f"  \"{caller_node}\" -> \"{callee_node_label}\" [label=\"calls\"];")

        # Class methods
        for cls in details['classes']:
            for m in cls['methods']:
                caller_node = f"{filename}:{cls['name']}.{m['name']}"
                for callee in m['calls']:
                    if callee.split('.')[-1] in defined_nodes:
                        lines.append(f"  \"{caller_node}\" -> \"{callee}\" [label=\"calls\"];")

    lines.append("}")
    dot_content = "\n".join(lines)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(dot_content)
    print(f"[+] Function call graph saved to {output_file}")

def visualize_imports(code_summary, output_file='imports.dot'):
    """
    Create a Graphviz DOT file to show which files import which modules.
    We'll have a node for each file, and a node for each imported module name.
    Then draw edges file -> module.
    """
    lines = [
        'digraph Imports {',
        '  rankdir="LR";',
        '  node [shape=box, style=filled, fillcolor=lightcyan];'
    ]

    file_nodes = set()
    import_nodes = set()

    for file_path, details in code_summary.items():
        filename = os.path.basename(file_path)
        file_nodes.add(filename)
        for imp in details['imports']:
            import_nodes.add(imp)

    # Declare file nodes
    for fn in file_nodes:
        lines.append(f'  \"File:{fn}\" [label=\"{fn}\", shape=folder];')
    # Declare module nodes
    for mod in import_nodes:
        lines.append(f'  \"Module:{mod}\" [label=\"{mod}\", shape=note, fillcolor=lightgray];')

    # Edges
    for file_path, details in code_summary.items():
        filename = os.path.basename(file_path)
        for imp in details['imports']:
            file_node = f"\"File:{filename}\""
            mod_node = f"\"Module:{imp}\""
            lines.append(f"  {file_node} -> {mod_node};")

    lines.append("}")
    dot_content = "\n".join(lines)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(dot_content)
    print(f"[+] Imports graph saved to {output_file}")

if __name__ == "__main__":
    # UPDATE THIS PATH:
    ROOT_DIR = r"D:\Documents\E_Plus_2030_py"

    # 1) Scan the codebase
    summary_data = scan_folder(ROOT_DIR)

    # 2) Print summary to console (optional)
    print_summary(summary_data)

    # 3) Generate Graphviz DOT files
    visualize_inheritance(summary_data, 'D:\Documents\E_Plus_2030_py\inheritance.dot')
    visualize_function_calls(summary_data, 'D:\Documents\E_Plus_2030_py/function_calls.dot')
    visualize_imports(summary_data, 'D:\Documents\E_Plus_2030_py\imports.dot')



=== File: ..\database_handler.py ===
  Imports:
    - os
    - pandas
    - sqlalchemy.create_engine
    - sqlalchemy.text
  Function: load_buildings_from_db
    Docstring:
      Connect to the PostgreSQL database (credentials from environment variables),
      build a SQL query for building data, apply optional filters, and return a
      pandas DataFrame.

      filter_criteria (dict) may include:
      ------------------------------------------------------
      {
        "postcodes": ["1011AB", "1053PJ", ...],   # list of multiple postcodes
        "ids": [1001, 1002, 1003],               # list of ogc_fid
        "pand_ids": ["XYZ123", "XYZ456"],        # list of pand_id if needed
        "bbox_xy": [min_x, min_y, max_x, max_y], # bounding box in X/Y
        "bbox_latlon": [min_lat, min_lon, max_lat, max_lon] # bounding box in lat/lon
      }
      ------------------------------------------------------

      For example:
        "bbox_xy": [120000.0, 487000.0, 121000.0, 488000.0

In [2]:
import os
import json

def show_json_files(directory):
    # List all files in the given directory
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print("JSON file path:", file_path)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                print("Content:")
                print(json.dumps(data, indent=4))
            except json.JSONDecodeError as e:
                print("Error reading JSON from", file_path, ":", e)
            except Exception as e:
                print("An error occurred while processing", file_path, ":", e)
            print("-" * 40)

if __name__ == "__main__":
    # Specify the folder path here
    directory = r"D:\Documents\E_Plus_2030_py\user_configs"
    show_json_files(directory)


JSON file path: D:\Documents\E_Plus_2030_py\user_configs\dhw.json
Content:
{
    "dhw": [
        {
            "building_id": 4136730,
            "param_name": "occupant_density_m2_per_person",
            "min_val": 127.0,
            "max_val": 233.0
        },
        {
            "building_id": 4136730,
            "param_name": "liters_per_person_per_day",
            "fixed_value": 145.0
        },
        {
            "building_function": "residential",
            "age_range": "1992-2005",
            "param_name": "setpoint_c",
            "min_val": 58.0,
            "max_val": 60.0
        },
        {
            "building_function": "non_residential",
            "param_name": "occupant_density_m2_per_person",
            "min_val": 12.0,
            "max_val": 18.0
        }
    ]
}
----------------------------------------
JSON file path: D:\Documents\E_Plus_2030_py\user_configs\epw.json
Content:
{
    "epw": [
        {
            "building_id": 4136730,
           

In [43]:
def combine_files_from_list(list_file, output_file):
    # Read file paths from the given file (one path per line)
    with open(list_file, "r", encoding="utf-8") as file:
        file_paths = [line.strip() for line in file if line.strip()]
    
    with open(output_file, "w", encoding="utf-8") as out_file:
        for path in file_paths:
            # Write the file path as a header
            out_file.write(f"File: {path}\n")
            out_file.write("=" * 60 + "\n")
            try:
                # Open and read the content of the current file
                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
                    out_file.write(content)
            except Exception as e:
                # Write an error message if the file can't be read
                out_file.write(f"Error reading file: {e}\n")
            out_file.write("\n" + "-" * 60 + "\n\n")

if __name__ == "__main__":
    # File that contains the list of paths (one per line) in the given format
    list_file = "file_paths.txt"
    # Output file where combined content will be saved
    output_file = "D:\Documents\E_Plus_2030_py\ZZZz\combined_files.txt"
    
    combine_files_from_list(list_file, output_file)
    print(f"Combined file created: {output_file}")


Combined file created: D:\Documents\E_Plus_2030_py\ZZZz\combined_files.txt


In [1]:
import sys
import os

# Path to the directory containing 'check_files.py'
sys.path.append(r"D:\Documents\E_Plus_2030_py\ZZZz")

from check_files import check_number_of_columns_and_extract, check_number_of_rows_and_extract

# Check columns
check_number_of_columns_and_extract(
    file_list_path=r"D:\Documents\E_Plus_2030_py\ZZZz\file_list.txt", 
    expected_columns=5, 
    output_txt_path=r"D:\Documents\E_Plus_2030_py\ZZZz\c_columns_report.txt"
)

# Check rows
check_number_of_rows_and_extract(
    file_list_path=r"D:\Documents\E_Plus_2030_py\ZZZz\file_list.txt", 
    expected_rows=5, 
    output_txt_path=r"D:\Documents\E_Plus_2030_py\ZZZz\c_rows_report.txt"
)
