In [1]:
# Star import of the shared notebook helpers. The cells below use names such as
# pd, widgets, text, sqlalchemy, os, time, date, timedelta, display, clear_output
# and Javascript without importing them, so they presumably come from here
# -- TODO confirm against common_imports.
from common_imports import *
# Renders the navigation button row shown in this cell's output
# (NOTE(review): presumably defined in common_imports -- confirm).
show_home_button()
from db_connection import get_engine
# Database engine that is handed to PivotUIManager at the bottom of the notebook.
engine = get_engine()

▶ Python executable: c:\Users\StanvanBon\Miniconda3\envs\energymonitor_env\python.exe
▶ dotenv module: c:\Users\StanvanBon\Miniconda3\envs\energymonitor_env\Lib\site-packages\dotenv\__init__.py


HBox(children=(Button(button_style='info', description='Terug naar Startscherm', icon='home', layout=Layout(he…

Output()

In [2]:
# Configure root logging at INFO level and create the module-level logger
# used by export_to_csv / export_to_excel below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
                                                                              
class DataRetriever:
    """
    Looks up registrators for an EAN and runs the dynamic pivot query
    for a selected registrator.

    The pivot statement has one output column per register, so its column
    list cannot be expressed with bind parameters and is generated as SQL
    text. Every value that ends up in that text is either coerced to ``int``
    or escaped (see ``build_pivot_sql``).
    """
    def __init__(self, engine: sqlalchemy.engine.base.Engine):
        self.engine = engine

    @staticmethod
    def _quote_identifier_part(value) -> str:
        """Escape ``]`` so the value is safe inside a ``[...]`` delimited identifier."""
        return str(value).replace("]", "]]")

    @staticmethod
    def _quote_literal_part(value) -> str:
        """Escape ``'`` so the value is safe inside a ``'...'`` string literal."""
        return str(value).replace("'", "''")

    def get_registrators_by_ean(self, ean: str) -> pd.DataFrame:
        """
        Return the registrators linked to a connection point EAN.

        Args:
            ean: value matched against ``TBL_ConnectionPoint.EAN_ConnectionPoint``.

        Returns:
            DataFrame with columns ``RegistratorID`` and ``RegistratorDescription``
            (one row per registrator; empty when nothing matches).
        """
        query_text = text("""
            SELECT 
                rg.Id AS RegistratorID,
                rg.Description AS RegistratorDescription
            FROM TBL_Registrator rg
            INNER JOIN TBL_Register re ON re.RegistratorId = rg.Id
            INNER JOIN TBL_ConnectionPoint cp ON cp.ID = re.ConnectionPointId
            WHERE cp.EAN_ConnectionPoint = :ean
            GROUP BY rg.Id, rg.Description
        """)
        with self.engine.connect() as conn:
            df = pd.read_sql(query_text, conn, params={"ean": ean})
        return df

    def get_distinct_registers(self, registrator_id: int, periode_begin: str, periode_end: str) -> pd.DataFrame:
        """
        Return the registers of a registrator that have data in the period.

        Args:
            registrator_id: id of the registrator (coerced with ``int``).
            periode_begin: lower bound passed to ``BETWEEN`` (inclusive).
            periode_end: upper bound passed to ``BETWEEN`` (inclusive).

        Returns:
            DataFrame with columns ``RegisterID`` and ``RegisterDescription``.
        """
        query_text = text("""
            SELECT DISTINCT
                re.ID AS RegisterID,
                re.[Description] AS RegisterDescription
            FROM TBL_Data da
            JOIN TBL_Register re  ON re.ID = da.RegisterID
            JOIN TBL_ConnectionPoint cp ON cp.ID = re.ConnectionPointId
            JOIN TBL_Registrator rg ON rg.Id = re.RegistratorId
            JOIN TBL_Dialerlist dl ON dl.RegistratorID = rg.Id
            JOIN TBL_CrossProjectIdObjectId cpo ON cpo.ObjectId = cp.ObjectId
            JOIN TBL_EnergyMonitorProject emp ON emp.ID = cpo.ProjectID
            JOIN TBL_Object ob ON ob.Id = cp.ObjectId
            WHERE rg.Id = :registrator_id
              AND re.Typeid IN (1005, 1007, 1016, 1022)
              AND da.[period] BETWEEN :pbegin AND :pend
        """)
        params = {
            "registrator_id": int(registrator_id),
            "pbegin": periode_begin,
            "pend":   periode_end
        }
        with self.engine.connect() as conn:
            df = pd.read_sql_query(query_text, conn, params=params)
        return df

    def build_pivot_sql(self, distinct_df: pd.DataFrame,
                        registrator_id: int,
                        periode_begin: str,
                        periode_end: str,
                        include_status_t: bool) -> str:
        """
        Build the PIVOT statement with one column per register in ``distinct_df``.

        Args:
            distinct_df: frame with ``RegisterID`` and ``RegisterDescription``
                columns (as returned by ``get_distinct_registers``).
            registrator_id: id of the registrator (coerced with ``int``).
            periode_begin: lower bound of the period, embedded as a string literal.
            periode_end: upper bound of the period, embedded as a string literal.
            include_status_t: when False, rows whose merged group contains a
                status ``'T'`` record are filtered out (``WHERE HasT = 0``).

        Returns:
            The SQL text, or ``""`` when ``distinct_df`` is empty.
        """
        if distinct_df.empty:
            return ""
        pivot_in_list = []
        select_col_list = []
        for raw_id, descr in zip(distinct_df["RegisterID"], distinct_df["RegisterDescription"]):
            # int() guarantees the id can only ever render as digits.
            reg_id = int(raw_id)
            pivot_in_list.append(f"[{reg_id}]")
            alias_text = f"{descr} - {reg_id}".replace(" ", "_")
            # A "]" in the description would otherwise terminate the alias early.
            alias_text = self._quote_identifier_part(alias_text)
            select_col_list.append(f"[{reg_id}] AS [{alias_text}]")
        pivot_in_part    = ",".join(pivot_in_list)
        select_cols_part = ",".join(select_col_list)
        cte_filter_condition = "" if include_status_t else "WHERE HasT = 0"
        # The bounds are embedded as literals; doubling quotes keeps them inside the literal.
        begin_literal = self._quote_literal_part(periode_begin)
        end_literal   = self._quote_literal_part(periode_end)
        pivot_sql = f"""
        WITH cteMerged AS
        (
            SELECT
                cp.EAN_ConnectionPoint       AS EAN,
                rg.[Description]             AS Opnemer,
                da.[period]                  AS period,
                re.ID                        AS RegisterID,
                MAX(CASE WHEN da.statusid = 'T' THEN 1 ELSE 0 END) AS HasT,
                MAX(da.consumption)          AS consumption
            FROM TBL_Data da
            JOIN TBL_Register re  ON re.ID = da.RegisterID
            JOIN TBL_ConnectionPoint cp ON cp.ID = re.ConnectionPointId
            JOIN TBL_Registrator rg ON rg.Id = re.RegistratorId
            JOIN TBL_Dialerlist dl ON dl.RegistratorID = rg.Id
            JOIN TBL_CrossProjectIdObjectId cpo ON cpo.ObjectId = cp.ObjectId
            JOIN TBL_EnergyMonitorProject emp ON emp.ID = cpo.ProjectID
            JOIN TBL_Object ob ON ob.Id = cp.ObjectId
            WHERE rg.Id = {int(registrator_id)}
              AND re.Typeid IN (1005, 1007, 1016, 1022)
              AND da.[period] BETWEEN '{begin_literal}' AND '{end_literal}'
            GROUP BY
                cp.EAN_ConnectionPoint,
                rg.[Description],
                da.[period],
                re.ID
        ),
        cteFiltered AS
        (
            SELECT *
            FROM cteMerged
            {cte_filter_condition}
        )
        SELECT
            pvt.EAN AS EAN_ConnectionPoint,
            pvt.Opnemer,
            pvt.period,
            CASE WHEN agg.HasT = 1 THEN 'T' ELSE 'N/A' END AS statusid,
            {select_cols_part}
        FROM
        (
            SELECT EAN, Opnemer, period, RegisterID, consumption
            FROM cteFiltered
        ) AS src
        PIVOT
        (
            MAX(consumption)
            FOR RegisterID IN ({pivot_in_part})
        ) AS pvt
        JOIN
        (
            SELECT EAN, Opnemer, period, MAX(HasT) AS HasT
            FROM cteFiltered
            GROUP BY EAN, Opnemer, period
        ) AS agg
          ON pvt.EAN = agg.EAN AND pvt.Opnemer = agg.Opnemer AND pvt.period = agg.period
        ORDER BY
            pvt.EAN, pvt.period;
        """
        return pivot_sql

    def get_pivoted_data(self, registrator_id: int,
                         periode_begin: str,
                         periode_end: str,
                         include_status_t: bool = False) -> pd.DataFrame:
        """
        Fetch the pivoted consumption data for a registrator and period.

        First determines which registers have data, then builds and runs the
        matching pivot statement.

        Returns:
            The pivoted DataFrame, or an empty DataFrame when no register has
            data in the period.
        """
        distinct_df = self.get_distinct_registers(registrator_id, periode_begin, periode_end)
        if distinct_df.empty:
            return pd.DataFrame()
        pivot_sql = self.build_pivot_sql(distinct_df, registrator_id, periode_begin, periode_end, include_status_t)
        if not pivot_sql.strip():
            return pd.DataFrame()
        with self.engine.connect() as conn:
            df_pivot = pd.read_sql(pivot_sql, conn)
        return df_pivot
                                                                              
def export_to_csv(df: pd.DataFrame, filename: str) -> bool:
    """
    Write ``df`` to ``filename`` as UTF-8 CSV without the index.

    Args:
        df: data to export.
        filename: target path of the CSV file.

    Returns:
        True when the file was written; False when ``df`` is empty or the
        file could not be written (the reason is logged).
    """
    if df.empty:
        logger.warning("Lege DataFrame - geen CSV-export.")
        return False
    try:
        df.to_csv(filename, index=False, encoding='utf-8')
    except OSError as e:
        # Same contract as export_to_excel: report the failure through the
        # return value instead of raising into the widget callback.
        logger.error(f"Fout bij CSV-export: {e}")
        return False
    logger.info(f"CSV geëxporteerd: {filename}")
    return True

def export_to_excel(df: pd.DataFrame, filename: str) -> bool:
    """
    Write ``df`` to a formatted XLSX workbook (sheet ``PivotData``).

    The header row is rewritten with a bold, wrapped, shaded format; every
    column gets a bordered, centred default format and a width derived from
    its header text (minimum 15). The first row and first column are frozen.

    Args:
        df: data to export.
        filename: target path of the workbook.

    Returns:
        True when the workbook was written; False when ``df`` is empty or
        any exception occurred during the export (the reason is logged).
    """
    if df.empty:
        logger.warning("Lege DataFrame - geen XLSX-export.")
        return False

    # Properties shared by header cells and data columns.
    shared_cell_style = {
        'border': 1,
        'align': 'center',
        'valign': 'middle',
        'font_name': 'Arial',
        'font_size': 10,
    }
    header_style = {'bold': True, 'text_wrap': True, 'fg_color': '#F2F2F2', **shared_cell_style}
    sheet_name = 'PivotData'

    try:
        with pd.ExcelWriter(filename, engine='xlsxwriter', datetime_format='yyyy-mm-dd HH:MM:SS') as xlsx:
            df.to_excel(xlsx, sheet_name=sheet_name, index=False)
            book = xlsx.book
            sheet = xlsx.sheets[sheet_name]
            header_format = book.add_format(header_style)
            data_format = book.add_format(shared_cell_style)
            for position, heading in enumerate(df.columns.tolist()):
                # Overwrite the header cell pandas wrote so it picks up the header format.
                sheet.write(0, position, heading, header_format)
                width = max(15, len(str(heading)) + 2)
                sheet.set_column(position, position, width, data_format)
            sheet.set_row(0, 38)
            sheet.freeze_panes(1, 1)
        logger.info(f"Excel geëxporteerd: {filename}")
        return True
    except Exception as e:
        logger.error(f"Fout bij Excel-export: {e}")
        return False

                                                               
class PivotUIManager:
    """
    ipywidgets front end around DataRetriever.

    Workflow: enter an EAN and look up its registrators, pick a period and a
    registrator, fetch the pivoted data, then optionally export it to
    CSV/XLSX in the user's Downloads folder or copy a generated standard
    message to the clipboard.

    State kept between callbacks:
        current_df       -- result of the last successful fetch (empty otherwise)
        default_message  -- standard message of the last fetch ("" when none)
        lookup_ean       -- EAN the registrator options were loaded for
        data_ean         -- EAN that ``current_df`` belongs to
    """
    def __init__(self, engine: sqlalchemy.engine.base.Engine):
        self.engine = engine
        self.data_retriever = DataRetriever(engine)
        self.current_df = pd.DataFrame()
        # Initialised here so the copy button never sees a missing or stale message.
        self.default_message = ""
        # The EAN text box can be edited at any time; remember which EAN the
        # registrators and the fetched data actually belong to.
        self.lookup_ean = ""
        self.data_ean = ""

        # Output areas: status text, data preview, standard message.
        self.output_widget = widgets.Output()
        self.data_output = widgets.Output(
            layout={'border': '1px solid #ccc', 'max_height': '300px', 'overflow': 'auto'}
        )
        self.message_output = widgets.Output(
            layout={'border': '1px solid #ccc', 'padding': '10px', 'margin': '10px 0px'}
        )

        # EAN input.
        self.ean_input = widgets.Text(
            value="", description='EAN:',
            layout=widgets.Layout(width='200px'),
            placeholder="Vul een EAN in..."
        )

        # Period pickers, defaulting to today .. tomorrow.
        today    = date.today()
        tomorrow = today + timedelta(days=1)
        self.start_picker = widgets.DatePicker(
            description='Start:',
            value=today,
            layout=widgets.Layout(width='250px')
        )
        self.end_picker = widgets.DatePicker(
            description='End:',
            value=tomorrow,
            layout=widgets.Layout(width='250px')
        )

        # Registrator selection; filled by on_find_registrators_clicked.
        self.registrator_dropdown = widgets.Dropdown(
            description="Registrator:",
            options=[],
            layout=widgets.Layout(width='300px'),
            disabled=True
        )

        # Optional filters.
        self.include_status_t_checkbox = widgets.Checkbox(
            value=False,
            description="Neem records met Statusid 'T' op",
            indent=False
        )
        self.generate_default_message_checkbox = widgets.Checkbox(
            value=True,
            description="Genereer standaardbericht",
            indent=False
        )
        self.optional_filters_container = widgets.VBox(
            [self.include_status_t_checkbox, self.generate_default_message_checkbox],
            layout=widgets.Layout(border='1px solid #ccc', padding='10px', margin='10px 0px')
        )

        # Action buttons.
        self.find_registrators_button = widgets.Button(
            description="Zoek registrators",
            button_style='info',
            icon='search',
        )
        self.find_registrators_button.on_click(self.on_find_registrators_clicked)

        self.fetch_pivot_button = widgets.Button(
            description="Haal data op",
            button_style='info',
            icon='database',
            disabled=True
        )
        self.fetch_pivot_button.on_click(self.on_fetch_pivot_clicked)

        self.download_csv_button = widgets.Button(
            description="Download CSV",
            button_style='primary',
            icon='download',
            disabled=True
        )
        self.download_xlsx_button = widgets.Button(
            description="Download XLSX",
            button_style='primary',
            icon='file-excel-o',
            disabled=True
        )
        self.download_csv_button.on_click(self.on_download_csv)
        self.download_xlsx_button.on_click(self.on_download_xlsx)

        # Copies the standard message to the clipboard.
        self.copy_message_button = widgets.Button(
            description="Kopieer Bericht",
            button_style="primary",
            icon="copy",
            layout=widgets.Layout(width='150px', height='40px')
        )
        self.copy_message_button.on_click(self.on_copy_message_clicked)

        # Progress bar with status and estimated-time-remaining labels (hidden until a fetch starts).
        self.progress_bar = widgets.IntProgress(
            value=0,
            min=0,
            max=100,
            step=1,
            description='Voortgang:',
            bar_style='info',
            layout=widgets.Layout(width='220px')
        )
        self.status_label = widgets.Label(value="", layout=widgets.Layout(width='auto', margin="0 0 0 10px"))
        self.etr_label = widgets.Label(value="", layout=widgets.Layout(width='auto', margin="0 0 0 10px"))
        self.progress_container = widgets.HBox(
            [self.progress_bar, self.status_label, self.etr_label],
            layout=widgets.Layout(visibility='hidden', align_items='center', justify_content='center', margin="10px 0px")
        )
        self.progress_start_time = None

        # Layout rows.
        self.row_top = widgets.HBox([
            self.ean_input,
            self.find_registrators_button,
            self.download_csv_button,
            self.download_xlsx_button
        ])
        self.row_top.layout = widgets.Layout(
            display='flex',
            flex_flow='row',
            align_items='center'
        )

        self.row_bottom = widgets.HBox([
            self.start_picker,
            self.end_picker,
            self.registrator_dropdown,
            self.fetch_pivot_button
        ])
        self.row_bottom.layout = widgets.Layout(
            display='flex',
            flex_flow='row',
            align_items='center'
        )

        # Complete UI, top to bottom.
        self.main_box = widgets.VBox([
            self.output_widget,
            self.row_top,
            self.row_bottom,
            self.progress_container,
            self.optional_filters_container,
            widgets.HBox([widgets.HTML("<b>Standaard bericht:</b>"), self.copy_message_button]),
            self.message_output,
            widgets.HTML("<b>Pivot-result (table):</b>"),
            self.data_output
        ])

    def update_progress(self, progress: int, status: str = "", error: bool = False):
        """
        Update the progress bar, status label and estimated remaining time.

        Args:
            progress: percentage (0-100) shown in the bar.
            status: text shown next to the bar (the percentage is appended).
            error: when True the bar is coloured as an error.
        """
        if self.progress_start_time is None:
            self.progress_start_time = time.time()
        elapsed = time.time() - self.progress_start_time
        self.progress_bar.value = progress
        self.status_label.value = f"{status} ({progress}%)"
        if 0 < progress < 100:
            # Linear extrapolation of the total duration from the elapsed time.
            fraction_done = progress / 100.0
            estimated_total = elapsed / fraction_done
            remaining = estimated_total - elapsed
            m, s = divmod(remaining, 60)
            h, m = divmod(m, 60)
            if h >= 1:
                etr_str = f"Resterende tijd: {int(h)}u {int(m)}m {int(s)}s"
            else:
                etr_str = f"Resterende tijd: {int(m)}m {int(s)}s"
            self.etr_label.value = etr_str
        else:
            self.etr_label.value = ""
        if error:
            self.progress_bar.bar_style = "danger"
        elif progress >= 100:
            self.progress_bar.bar_style = "success"
        else:
            self.progress_bar.bar_style = "info"

    def finish_progress(self):
        """Keep the final state visible for one second, then hide and reset the progress widgets."""
        time.sleep(1)
        self.progress_container.layout.visibility = 'hidden'
        self.progress_bar.value = 0
        self.status_label.value = ""
        self.etr_label.value = ""
        self.progress_bar.bar_style = "info"
        self.progress_start_time = None

    def on_copy_message_clicked(self, b):
        """Copy the current standard message to the browser clipboard."""
        import json  # local import: only needed to encode the JavaScript string literal

        text_to_copy = getattr(self, 'default_message', '')
        # Everything is emitted inside the Output widget so the front end
        # receives the JavaScript; output produced in a widget callback outside
        # such a context may not be rendered (NOTE(review): front-end dependent).
        with self.output_widget:
            clear_output(wait=True)
            if text_to_copy:
                # json.dumps yields a valid JavaScript string literal; Python's repr() does not in general.
                js_code = f"navigator.clipboard.writeText({json.dumps(text_to_copy)});"
                display(Javascript(js_code))
                print("Standaard bericht is gekopieerd naar het klembord.")
            else:
                print("Er is geen standaard bericht beschikbaar om te kopiëren.")

    def display_ui(self):
        """Render the complete UI in the current cell."""
        display(self.main_box)

    def on_find_registrators_clicked(self, _):
        """Look up the registrators for the entered EAN and fill the dropdown."""
        with self.output_widget:
            clear_output()
            print("Bezig met zoeken van registrators voor de opgegeven EAN...")
        self.registrator_dropdown.options = []
        self.registrator_dropdown.disabled = True
        self.fetch_pivot_button.disabled = True
        self.lookup_ean = ""
        ean_value = self.ean_input.value.strip()
        if not ean_value:
            with self.output_widget:
                print("Fout: EAN is niet ingevuld.")
            return
        try:
            registrators_df = self.data_retriever.get_registrators_by_ean(ean_value)
        except Exception as exc:
            with self.output_widget:
                print(f"Fout tijdens ophalen registrators: {exc}")
            return
        if registrators_df.empty:
            with self.output_widget:
                print(f"Geen registrators gevonden voor EAN: {ean_value}")
            return

        options_list = [
            (f"{rdesc} (ID={rid})", rid)
            for rid, rdesc in zip(registrators_df['RegistratorID'],
                                  registrators_df['RegistratorDescription'])
        ]
        self.registrator_dropdown.options = options_list
        self.lookup_ean = ean_value
        self.fetch_pivot_button.disabled = False
        if len(options_list) == 1:
            single_id = options_list[0][1]
            single_desc = registrators_df.iloc[0]['RegistratorDescription']
            self.registrator_dropdown.index = 0
            # Nothing to choose from, so the dropdown stays locked.
            self.registrator_dropdown.disabled = True
            with self.output_widget:
                print(f"1 registrator gevonden: ID={single_id}, '{single_desc}'. Automatisch geselecteerd.")
        else:
            self.registrator_dropdown.disabled = False
            with self.output_widget:
                print(f"Gevonden registrators voor EAN {ean_value}: {len(registrators_df)} stuks.")
                print("Selecteer hieronder de gewenste registrator en klik op 'Haal data op'.")

    def _fail_fetch(self, message: str, status: str):
        """Report a fetch failure in the status area and close the progress display."""
        with self.output_widget:
            print(message)
        self.update_progress(100, status, error=True)
        self.finish_progress()

    def on_fetch_pivot_clicked(self, _):
        """Fetch the pivoted data for the selected registrator and period and show a preview."""
        for area in (self.output_widget, self.data_output, self.message_output):
            with area:
                clear_output()
        # The message box was just cleared, so the copy button must not keep
        # offering the message of a previous fetch.
        self.default_message = ""
        self.progress_container.layout.visibility = 'visible'
        self.update_progress(0, "Start ophalen pivot data")
        if not self.registrator_dropdown.options or self.registrator_dropdown.value is None:
            self._fail_fetch("Fout: geen registrator geselecteerd.", "Fout: Geen registrator")
            return
        start_date = self.start_picker.value
        end_date = self.end_picker.value
        if not start_date or not end_date:
            self._fail_fetch("Fout: start- of einddatum niet ingevuld.", "Fout: Datum ontbreekt")
            return
        pbegin = f"{start_date.strftime('%Y-%m-%d')} 00:00:00"
        pend   = f"{end_date.strftime('%Y-%m-%d')} 00:00:00"
        self.update_progress(20, "Datumwaarden opgebouwd")
        registrator_id = int(self.registrator_dropdown.value)
        include_status_t = self.include_status_t_checkbox.value
        # Prefer the EAN the registrators were looked up for; the text box may have been edited since.
        ean_value = self.lookup_ean or self.ean_input.value.strip()
        if self.generate_default_message_checkbox.value:
            bericht = (
                "Beste meneer/mevrouw,\n"
                "Zoals verzocht, stuur ik u hierbij de meetdata van de aansluiting.\n"
                f"•\tEAN: {ean_value}\n"
                f"•\tVanaf: {start_date.strftime('%d-%m-%Y')} tot {end_date.strftime('%d-%m-%Y')}\n\n"
                "Bijlage toegevoegd in het bestand.\n\n"
                "Met vriendelijke groet,"
            )
            self.default_message = bericht
            with self.message_output:
                print(bericht)
        self.update_progress(30, "Pivot data ophalen...")
        try:
            df = self.data_retriever.get_pivoted_data(registrator_id, pbegin, pend, include_status_t)
            self.current_df = df.copy()
            self.data_ean = ean_value
            self.update_progress(70, "Data opgehaald, verwerken...")
        except Exception as e:
            # Drop the previous result and lock the export buttons so a stale
            # dataset cannot be exported after a failed fetch.
            self.current_df = pd.DataFrame()
            self.data_ean = ""
            self.download_csv_button.disabled = True
            self.download_xlsx_button.disabled = True
            self._fail_fetch(f"Fout tijdens pivot-opvraag: {e}", "Fout tijdens ophalen")
            return
        with self.data_output:
            clear_output()
            if self.current_df.empty:
                print("Geen data gevonden of fout opgetreden.")
            else:
                print(f"Aantal rijen: {len(self.current_df)}")
                display(self.current_df.head(50))
        has_data = not self.current_df.empty
        self.download_csv_button.disabled = not has_data
        self.download_xlsx_button.disabled = not has_data
        self.update_progress(100, "Klaar!")
        self.finish_progress()

    @staticmethod
    def _safe_filename_part(value: str) -> str:
        """Replace every character that is not alphanumeric, '-' or '_' so the value cannot alter the path."""
        cleaned = "".join(ch if (ch.isalnum() or ch in "-_") else "_" for ch in value)
        return cleaned or "onbekend"

    def _export_current_df(self, exporter, extension: str, label: str):
        """Export ``current_df`` to ~/Downloads/Dataset_<EAN>.<extension> and report the outcome."""
        if self.current_df.empty:
            with self.output_widget:
                clear_output()
                print("Geen data om te exporteren.")
            return
        # Name the file after the EAN the data was fetched for, not whatever is in the text box now.
        ean_value = self.data_ean or self.ean_input.value.strip()
        downloads_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        filename = os.path.join(
            downloads_dir, f"Dataset_{self._safe_filename_part(ean_value)}.{extension}"
        )
        failure = None
        try:
            os.makedirs(downloads_dir, exist_ok=True)
            saved = exporter(self.current_df, filename)
        except Exception as exc:
            saved, failure = False, exc
        with self.output_widget:
            clear_output()
            if saved:
                print(f"{label} opgeslagen: {filename}")
            elif failure is not None:
                print(f"Fout bij {label}-export: {failure}")
            else:
                print(f"{label}-export mislukt: {filename}")

    def on_download_csv(self, _):
        """Export the fetched data as CSV to the Downloads folder."""
        self._export_current_df(export_to_csv, "csv", "CSV")

    def on_download_xlsx(self, _):
        """Export the fetched data as XLSX to the Downloads folder."""
        self._export_current_df(export_to_excel, "xlsx", "XLSX")
# Build the UI on top of the engine created in the first cell and render it
# as this cell's output.
ui = PivotUIManager(engine)
ui.display_ui()

VBox(children=(Output(), HBox(children=(Text(value='', description='EAN:', layout=Layout(width='200px'), place…