In [None]:
from dataclasses import dataclass
import logging

@dataclass
class Config:
    """Central settings for the pipeline: API endpoint, storage locations and tuning values.

    Every field has a default, so ``Config()`` yields a usable configuration.
    """

    # Prefix that relative API paths are appended to.
    base_url: str = "https://api.hoppe-sts.com/"

    # Folder locations for raw files, transformed files and metadata files.
    raw_path: str = "./data/raw_data"
    transformed_path: str = "./data/transformed_data"
    metadata_path: str = "./data/latest"

    # Base locations under which tables are written.
    silver_tables: str = "./"
    metadata_tables: str = "./"

    batch_size: int = 1000
    max_workers: int = 8  # raised worker count for better parallelisation
    days_to_keep: int = 90  # data is retained for 90 days
    history_days: int = 5  # load the last 5 days for the history

    # Defaults to the root logger.
    logger: logging.Logger = logging.getLogger()

In [None]:
import logging
import requests
import time

class API_Client:
    """Minimal HTTP client that issues authenticated GET requests and retries transient failures."""

    def __init__(self, base_url: str, api_key: str, config):
        """Store the connection settings.

        Args:
            base_url: Prefix that every relative URL is appended to.
            api_key: Key sent as ``Authorization: ApiKey <key>``.
            config: Settings object whose ``logger`` attribute receives all log output.
        """
        # The third parameter used to be misspelled, so this assignment either
        # raised NameError or silently captured a global named `config`.
        self.config = config
        self.base_url = base_url
        self.api_key = api_key

    def get_data(self, relative_url, max_retries=3, backoff_factor=2, timeout=30):
        """GET ``base_url + relative_url`` and decode the JSON body.

        Timeouts and connection errors are retried until ``max_retries`` attempts
        have been made, sleeping ``backoff_factor ** attempt`` seconds in between.
        SSL errors and all other request errors are not retried.

        Args:
            relative_url: Path appended to ``base_url``.
            max_retries: Total number of attempts.
            backoff_factor: Base of the exponential wait between attempts.
            timeout: Seconds passed to ``requests`` as the request timeout; without
                one a stalled connection would block forever.

        Returns:
            ``(response, payload)`` when a response was received and decoded,
            ``(response, None)`` when the body is not valid JSON,
            ``(e.response, None)`` for non-retryable request errors, and
            ``(None, None)`` for SSL errors, exhausted retries or ``max_retries <= 0``.
        """
        request_url = f"{self.base_url}{relative_url}"
        headers = {"Authorization": f"ApiKey {self.api_key}"}

        for attempt in range(max_retries):
            try:
                response = requests.request("GET", request_url, headers=headers, timeout=timeout)
            except requests.exceptions.SSLError as e:
                # Must be handled before ConnectionError, which it subclasses.
                self.config.logger.error(f"ERROR - SSL-Zertifikatsfehler: {str(e)}")
                return None, None
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                # Both failure kinds share the same retry policy; only the label differs.
                if isinstance(e, requests.exceptions.Timeout):
                    label = "Timeout bei API-Anfrage"
                else:
                    label = "Verbindungsfehler"
                if attempt < max_retries - 1:
                    wait_time = backoff_factor ** attempt
                    self.config.logger.warning(f"{label}: {str(e)}. Retry {attempt+1}/{max_retries} nach {wait_time}s")
                    time.sleep(wait_time)
                    continue
                self.config.logger.error(f"ERROR - {label} nach {max_retries} Versuchen: {str(e)}")
                return None, None
            except requests.exceptions.RequestException as e:
                self.config.logger.error(f"ERROR - API request failed: {str(e)}")
                return getattr(e, 'response', None), None

            if response.ok:
                self.config.logger.info(f"INFO - Request for {relative_url} successful")
            else:
                # The body is still decoded below so callers can inspect error payloads.
                self.config.logger.warning(f"WARNING - Request for {relative_url} returned HTTP {response.status_code}")

            try:
                return response, response.json()
            except ValueError as e:
                self.config.logger.error(f"ERROR - Response for {relative_url} is not valid JSON: {str(e)}")
                return response, None

        # Reached only when no attempt was made (max_retries <= 0); keep the tuple shape.
        return None, None

In [None]:
import polars as pl
from typing import List, Dict, Tuple


class Data_Processor:
    #logger = logging.getLogger("Data_Processor") ???

    @staticmethod
    def get_imo_numbers(data: List[dict]) -> List[str]:
        """Collect the ``imo`` value of every ship that is not flagged inactive.

        A ship without an ``active`` key counts as active.
        """
        imo_numbers = []
        for entry in data:
            if entry.get('active', True):
                imo_numbers.append(entry['imo'])
        return imo_numbers
    
    @staticmethod
    def transform_shipdata(shipdata: pl.DataFrame, run_timestamp: str) -> Tuple[pl.DataFrame, Dict[str, pl.DataFrame]]:
        """Flatten the ship frame and split every list column into its own table.

        Args:
            shipdata: Frame with a struct column ``data`` that is unnested first.
            run_timestamp: Value written to the ``loaddate`` column of every output.

        Returns:
            ``(ships, tables)``: ``ships`` is the unnested frame without its list
            columns; ``tables`` maps each list column name to a frame holding
            ``imo`` plus the exploded (and, for lists of structs, unnested) values.
        """
        loaddate = pl.lit(run_timestamp).alias("loaddate")
        flat = shipdata.unnest("data")

        nested_tables = {}
        list_columns = []
        for name, dtype in flat.collect_schema().items():
            is_list = dtype == pl.List
            if not is_list:
                continue
            list_columns.append(name)

            exploded = flat.select("imo", name).explode(name)
            # Lists of structs are additionally spread into one column per field.
            if dtype == pl.List(pl.Struct):
                exploded = exploded.unnest(name)
            nested_tables[name] = exploded.with_columns(loaddate)

        # Ship data without the nested columns.
        ships = flat.select(pl.exclude(list_columns)).with_columns(loaddate)

        return ships, nested_tables

    
    
    @staticmethod
    def transform_signals(signals: pl.DataFrame, run_timestamp: str) -> pl.DataFrame:
        """Reshape the signals frame into one row per (imo, signal) with flat columns.

        Args:
            signals: Frame with an ``imo`` column and a struct column ``signals``.
                An empty frame is returned unchanged.
            run_timestamp: Value written to the ``loaddate`` column.

        Returns:
            The unpivoted frame with all struct columns unnested, Null-typed
            columns cast to String, and a ``loaddate`` column appended.
        """
        if len(signals) == 0:
            return signals

        # Initial transformation: one row per signal, its struct spread into columns.
        signals = (
            signals.unnest("signals")
            .unpivot(index="imo", variable_name="signal")
            .unnest("value")
        )

        # Flatten the struct columns that remain after the first unnest.
        for column, dtype in signals.collect_schema().items():
            if dtype == pl.Struct:
                signals = signals.unnest(column)

        # Give all-null columns a concrete dtype. This must reference the column
        # itself (pl.col); pl.lit(column) would instead add a new column named
        # "literal" that contains the column *name* and leave the Null dtype in place.
        null_columns = [
            column
            for column, dtype in signals.collect_schema().items()
            if dtype == pl.Null
        ]
        if null_columns:
            signals = signals.with_columns(
                [pl.col(column).cast(pl.String) for column in null_columns]
            )

        # Append the load date.
        return signals.with_columns(
            pl.lit(run_timestamp).alias("loaddate")
        )
    

    @staticmethod
    def transform_timeseries(timeseries: pl.DataFrame, imo: str, run_timestamp: str) -> Tuple[pl.DataFrame, pl.DataFrame]:
        """Reshape one ship's timeseries into long format and separate out null readings.

        Args:
            timeseries: Frame with a ``timestamp`` column (dropped) and one column per signal.
            imo: Written to the ``imo`` column of every row.
            run_timestamp: Written to the ``loaddate`` column of every row.

        Returns:
            ``(data, gaps)``. ``data`` holds the rows whose ``signal_value`` is not
            null. ``gaps`` holds ``imo``, ``signal``, ``signal_timestamp``,
            ``loaddate`` and ``gap_start`` (a copy of ``signal_timestamp``) for the
            rows whose value is null. For empty input both are empty frames with
            the columns imo, signal, signal_timestamp, signal_value, loaddate.
        """
        if timeseries.height == 0:
            column_names = ("imo", "signal", "signal_timestamp", "signal_value", "loaddate")
            placeholder = pl.DataFrame({name: [] for name in column_names})
            return placeholder, placeholder.clone()

        # Step 1: one row per signal, carrying that signal's struct of readings.
        per_signal = timeseries.drop("timestamp").unpivot(index=[], variable_name="signal")

        # Step 2: spread the struct and turn its fields into (timestamp, value) rows.
        long_format = per_signal.unnest("value").unpivot(
            index=["signal"],
            variable_name="signal_timestamp",
            value_name="signal_value",
        )

        # Step 3: tag every row with the ship and the load date.
        long_format = long_format.with_columns(
            pl.lit(imo).alias("imo"),
            pl.lit(run_timestamp).alias("loaddate"),
        )

        value_missing = pl.col("signal_value").is_null()

        # Null readings become gap candidates ...
        gaps = (
            long_format.filter(value_missing)
            .select("imo", "signal", "signal_timestamp", "loaddate")
            .with_columns(pl.col("signal_timestamp").alias("gap_start"))
        )

        # ... and are removed from the main data set.
        data = long_format.filter(~value_missing)

        return data, gaps
    
    @staticmethod
    def process_gaps(gaps_df: pl.DataFrame) -> pl.DataFrame:
        """Merge per-timestamp gap rows into intervals for each (imo, signal) pair.

        Within a pair the rows are ordered by ``gap_start``; a new interval begins
        whenever two consecutive timestamps are more than five minutes apart.

        Args:
            gaps_df: Frame with ``imo``, ``signal``, ``gap_start`` and ``loaddate``.
                ``gap_start`` is cast to Datetime when it has another dtype.

        Returns:
            Frame with ``imo``, ``signal``, ``gap_start``, ``gap_end`` and
            ``loaddate``; an empty frame without columns when the input is empty.
        """
        if gaps_df.height == 0:
            return pl.DataFrame()

        # The time arithmetic below needs datetime values, not strings.
        if gaps_df["gap_start"].dtype != pl.Datetime:
            gaps_df = gaps_df.with_columns(pl.col("gap_start").cast(pl.Datetime))

        max_sec = 5 * 60
        intervals = []

        for (imo, signal), group in gaps_df.sort("gap_start").group_by(["imo", "signal"]):
            ordered = group.sort("gap_start")
            timestamps = ordered["gap_start"].to_list()
            loaddates = ordered["loaddate"].to_list()

            interval_start = timestamps[0]
            previous = interval_start

            # A single-row group skips this loop and yields one zero-length interval.
            for current, loaddate in zip(timestamps[1:], loaddates[1:]):
                if (current - previous).total_seconds() > max_sec:
                    # Close the running interval; it carries the load date of the
                    # row that opens the next one.
                    intervals.append({
                        "imo": imo,
                        "signal": signal,
                        "gap_start": interval_start,
                        "gap_end": previous,
                        "loaddate": loaddate
                    })
                    interval_start = current
                previous = current

            # Close the last (or only) interval of the group.
            intervals.append({
                "imo": imo,
                "signal": signal,
                "gap_start": interval_start,
                "gap_end": previous,
                "loaddate": loaddates[-1]
            })

        return pl.DataFrame(intervals)
    
    @staticmethod
    def enrich_timeseries_with_friendly_names(timeseries_df: pl.DataFrame, signals_df: pl.DataFrame, imo: pl.String) -> pl.DataFrame:
        """Attach the ``friendly_name`` of each signal to the timeseries rows of one ship.

        Args:
            timeseries_df: Frame with ``imo``, ``signal``, ``signal_timestamp``,
                ``signal_value`` and ``loaddate``.
            signals_df: Frame with ``imo``, ``signal`` and ``friendly_name``.
            imo: Only mapping rows whose ``imo`` equals this value are used.

        Returns:
            ``timeseries_df`` unchanged when either input is empty; otherwise the
            left-joined frame with the columns imo, signal, friendly_name,
            signal_timestamp, signal_value, loaddate.
        """
        if timeseries_df.height == 0 or signals_df.height == 0:
            return timeseries_df

        # Lookup table signal -> friendly_name for this ship, without unnamed signals.
        has_name = pl.col("friendly_name").is_not_null()
        same_ship = pl.col("imo") == imo
        lookup = signals_df.filter(has_name, same_ship).select(["signal", "friendly_name"]).unique()

        enriched = timeseries_df.join(lookup, on="signal", how="left")
        output_columns = ["imo", "signal", "friendly_name", "signal_timestamp", "signal_value", "loaddate"]
        return enriched.select(output_columns)
    
    @staticmethod
    def update_daily_timeseries_summary(hist_df: pl.DataFrame, daily_df: pl.DataFrame, current_df: pl.DataFrame) -> Tuple[pl.DataFrame, pl.DataFrame]:
        """Combine history, daily and current rows and drop duplicate readings.

        The three frames are concatenated in the order given and deduplicated on
        (imo, signal_timestamp, friendly_name), keeping the first occurrence.

        Args:
            hist_df: Historical rows.
            daily_df: Rows of the current day.
            current_df: Rows of the current run.
            All three need a ``tag`` column in addition to the key columns.

        Returns:
            ``(summary_df, new_df)``: the deduplicated rows tagged "new" or
            "today", and the deduplicated rows tagged "new".
        """
        combined_df = pl.concat([hist_df, daily_df, current_df])

        # Deduplicate once so both outputs are derived from the same surviving rows.
        deduplicated = combined_df.unique(
            subset=["imo", "signal_timestamp", "friendly_name"], keep="first"
        )

        new_df = deduplicated.filter(pl.col("tag").is_in(["new"]))
        summary_df = deduplicated.filter(pl.col("tag").is_in(["new", "today"]))

        return summary_df, new_df


In [None]:
import os
import json
import logging
from pathlib import Path
from typing import List, Dict, Union, defaultdict
import polars as pl

class Data_Storage:
    
    
    def __init__(self, config):
        """Keep the configuration; the other methods log through ``config.logger``."""
        # Disabled alternative kept for reference:
        # storage_options = {"bearer_token": notebookutils.credentials.getToken('storage'), "use_fabric_endpoint": "true"}
        self.config = config
        
    # Schreiben von Files in lokale Ordner    
    def write_file(self, data: Union[List, Dict, pl.DataFrame], filename: str, path: str, postfix: str) -> None:
        """Write ``data`` to ``{path}/{filename}.{postfix}`` as JSON or Parquet.

        The directory is created through ``notebookutils.fs.mkdirs`` first.

        Args:
            data: Object to serialise. For 'parquet' a list or dict is converted
                to a DataFrame before writing.
            filename: File name without extension.
            path: Target directory.
            postfix: 'json' or 'parquet'.

        Raises:
            ValueError: For an unsupported ``postfix`` or for parquet data that is
                neither DataFrame, list nor dict.
            Exception: Any write error is logged and re-raised.
        """
        notebookutils.fs.mkdirs(path)
        full_path = f"{path}/{filename}.{postfix}"

        try:
            if postfix not in ('json', 'parquet'):
                raise ValueError(f"Unsupported format: {postfix}")

            if postfix == 'json':
                with open(full_path, 'w') as f:
                    json.dump(data, f)
                self.config.logger.info(f"INFO - Writting to {filename}.json file successful")
                return

            # Parquet: make sure a DataFrame is written.
            frame = data
            if not isinstance(frame, pl.DataFrame):
                if not isinstance(frame, (list, dict)):
                    raise ValueError("Data must be DataFrame, List, or Dict for parquet format")
                frame = pl.DataFrame(frame)

            frame.write_parquet(full_path, compression="snappy")
            self.config.logger.info(f"INFO - Writting to {filename}.parquet file successful")

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to write file {full_path}: {str(e)}")
            raise


    def read_file(self, filename: str, path: str, postfix: str) -> pl.DataFrame:
        """Load ``{path}/{filename}.{postfix}`` (JSON or Parquet) into a DataFrame.

        Args:
            filename: File name without extension.
            path: Directory containing the file.
            postfix: 'json' or 'parquet'.

        Returns:
            The loaded frame, or an empty frame when ``notebookutils.fs.exists``
            reports that the file is missing.

        Raises:
            ValueError: For an unsupported ``postfix`` (only checked when the file exists).
            Exception: Any read error is logged and re-raised.
        """
        full_path = f"{path}/{filename}.{postfix}"

        try:
            # A missing file is not an error: report it and hand back an empty frame.
            if not notebookutils.fs.exists(full_path):
                self.config.logger.info(f"INFO - File {full_path} does not exist")
                return pl.DataFrame()

            if postfix == 'json':
                with open(full_path, 'r') as f:
                    frame = pl.DataFrame(json.load(f))
                self.config.logger.info(f"INFO - Reading from {filename}.json file successfully")
                return frame

            if postfix == 'parquet':
                frame = pl.read_parquet(full_path)
                self.config.logger.info(f"INFO - Reading from {filename}.parquet file successfully")
                return frame

            raise ValueError(f"Unsupported format: {postfix}")

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to read file {full_path}: {str(e)}")
            raise

    def write_table(self, data: pl.DataFrame, tablename: str, path: str, method: str = 'append', key: str = None, key2: str = None, key3: str = None) -> None:
        """Write ``data`` to the Delta table at ``{path}/{tablename}``.

        Args:
            data: Frame to write.
            tablename: Table name, appended to ``path``.
            path: Base location of the table.
            method: Passed to ``write_delta`` as ``mode``. 'merge' performs an
                upsert (update all on match, insert all otherwise); any other
                value is written directly with that mode.
            key, key2, key3: Column names that must be equal in source and target
                for rows to match in a merge. Every key that is not None is used.

        Raises:
            ValueError: When ``method`` is 'merge' and no key is given.
            Exception: Any write error is logged and re-raised.
        """
        table_path = f'{path}/{tablename}'

        try:
            if method != 'merge':
                data.write_delta(table_path, mode=method)
                self.config.logger.info(f"INFO - Writting to table {tablename} successful")
                print(f"INFO - Writting to table {tablename} successful")
                return

            merge_keys = [column for column in (key, key2, key3) if column is not None]
            if not merge_keys:
                # Without a key the predicate would compare a column literally named "None".
                raise ValueError("method='merge' requires at least one key column")

            predicate = " AND ".join(
                f"source.{column} = target.{column}" for column in merge_keys
            )

            (
                data.write_delta(
                    table_path,
                    mode=method,
                    delta_merge_options={
                        'predicate': predicate,
                        'source_alias': 'source',
                        'target_alias': 'target',
                    },
                )
                .when_matched_update_all()
                .when_not_matched_insert_all()
                .execute()
            )
            self.config.logger.info(f"INFO - Merge to table {tablename} successful")

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to {method} to table {table_path}: {str(e)}")
            raise

        

    @staticmethod
    def read_table(tablename: str, path: str) -> pl.DataFrame:
        """Load the Delta table at ``{path}/{tablename}``.

        The signature has no ``self``, so the method is declared static: it can be
        called on the class as before and, unlike before, also on an instance.
        Because no instance is available, failures are logged to the root logger.

        Args:
            tablename: Table name, appended to ``path``.
            path: Base location of the table.

        Returns:
            The table contents, or an empty DataFrame when loading fails.
        """
        table_path = f'{path}/{tablename}'
        try:
            df = pl.read_delta(table_path)
        except Exception as e:
            logging.getLogger().error(f"ERROR - Failed to load {table_path}: {str(e)}")
            # Return an empty frame instance (not the DataFrame class itself).
            return pl.DataFrame()

        print(f"INFO - Loading table {tablename} successful")
        return df
        
    def find_timeseries_files(self, base_path: str, max_days: int = None, pattern: str = "Timeseries_*.parquet") -> defaultdict:
        """Collect timeseries files from a ``<year>/<month>/<day>`` directory tree, grouped by IMO.

        Directories are visited in descending name order on every level, so with
        ``max_days`` set only the day folders that sort last by name are scanned.

        Args:
            base_path: Root directory that contains the year folders.
            max_days: Maximum number of day folders to scan; None scans all.
            pattern: Glob pattern applied inside each day folder. The IMO is the
                second ``_``-separated part of the file stem.

        Returns:
            ``defaultdict(list)`` mapping IMO to the list of matching paths; empty
            when ``base_path`` is not an existing directory.

        Raises:
            Exception: Any error during the scan is logged and re-raised.
        """
        base_dir = Path(base_path)
        if not base_dir.is_dir():
            self.config.logger.error(f"ERROR - Directory {base_path} does not exist or is no directory")
            return defaultdict(list)

        files_by_imo = defaultdict(list)

        # Bookkeeping for the summary log line.
        days_found = set()
        months_found = set()
        years_found = set()

        def newest_first(directory):
            """Sub-directories of ``directory`` in descending name order."""
            return sorted(
                (entry for entry in directory.iterdir() if entry.is_dir()),
                key=lambda entry: entry.name,
                reverse=True,
            )

        def limit_reached():
            """True once ``max_days`` day folders have been scanned."""
            return max_days is not None and len(days_found) >= max_days

        try:
            for year_dir in newest_first(base_dir):
                if limit_reached():
                    break
                years_found.add(year_dir.name)

                for month_dir in newest_first(year_dir):
                    if limit_reached():
                        break
                    months_found.add(f"{year_dir.name}/{month_dir.name}")

                    for day_dir in newest_first(month_dir):
                        if limit_reached():
                            break
                        days_found.add(f"{year_dir.name}/{month_dir.name}/{day_dir.name}")

                        for file in day_dir.glob(pattern):
                            imo = file.stem.split("_")[1]  # "<prefix>_<imo>..." -> "<imo>"
                            files_by_imo[imo].append(file)

            self.config.logger.info(f"INFO - {sum(len(files) for files in files_by_imo.values())} files found: "
                            f"{len(files_by_imo)} different ships, {len(days_found)} days, "
                            f"{len(months_found)} months, {len(years_found)} years")

            return files_by_imo

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to get historical Data: {str(e)}")
            raise
    
    def find_timeseries_summaries(self, base_path: str,  pattern:str = "*.parquet") -> list:
        """Recursively list all files below ``base_path`` that match ``pattern``.

        The pattern can narrow the selection, e.g. ``2025*.parquet`` matches only
        files whose name starts with 2025.

        Args:
            base_path: Directory to search.
            pattern: Glob pattern passed to ``Path.rglob``.

        Returns:
            List of matching paths; an empty list when ``base_path`` is not an
            existing directory.

        Raises:
            Exception: Any error during the search is logged and re-raised.
        """
        base_dir = Path(base_path)
        if not base_dir.is_dir():
            self.config.logger.error(f"ERROR - Directory {base_path} does not exist or is no directory")
            # Same type as the success path, so callers always receive a list.
            return []

        try:
            files = list(base_dir.rglob(pattern))
        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to get historical Data: {str(e)}")
            raise

        self.config.logger.info(f"INFO - {len(files)} summary files found")

        return files
        


In [None]:
import os
import logging
from datetime import datetime, timedelta
from typing import List
from concurrent.futures import ThreadPoolExecutor
# from env.api_client import API_Client
# from env.data_storage import Data_Storage
# from env.data_processor import Data_Processor
# from env.config import Config
import polars as pl

class Pipeline:
    def __init__(self, config: Config, api_key: str, verify_ssl: bool = True):
        """Create the API client, processor and storage helpers from one configuration.

        Args:
            config: Shared settings; also handed to the API client and the storage.
            api_key: Key forwarded to the API client.
            verify_ssl: Accepted but not used by this constructor.
        """
        self.config = config
        self.storage = Data_Storage(config)
        self.processor = Data_Processor()
        self.api_client = API_Client(config.base_url, api_key, config)
    
    def process_shipdata(self, run_timestamp: str):
        """Fetch the fleet data, store it raw, and write the transformed ship tables.

        Order of work: validate the response, store the raw JSON, stop on an API
        error payload (a single ``detail`` field), store the active IMO numbers,
        then write the flattened ship table and one table per nested list column.

        Args:
            run_timestamp: Used as sub-folder of the raw/transformed paths and as
                ``loaddate`` of the transformed rows.

        Errors are logged and swallowed; the method returns None in every case.
        """
        try:

            self.config.logger.info("INFO - Sart processing of ship data")

            # Get Shipdata
            response, shipdata = self.api_client.get_data("fleet")

            # Validate before anything touches the payload; extracting IMO numbers
            # from a missing payload would fail with an unrelated TypeError.
            if not shipdata:
                self.config.logger.error(f"ERROR - No ship data received - {response}")
                raise ValueError(f"No ship data received - {response}")

            raw_dir = f"{self.config.raw_path}/{run_timestamp}"
            transformed_dir = f"{self.config.transformed_path}/{run_timestamp}"

            # Store raw ship data first so an error payload is available for inspection.
            self.storage.write_file(shipdata, 'ShipData', raw_dir, 'json')

            ships_df = pl.DataFrame(shipdata)
            if ships_df.columns == ['detail']:
                self.config.logger.info("Request Error: see json file for details")
                return

            # Get & Store IMO Numbers
            imo_numbers = self.processor.get_imo_numbers(shipdata)
            self.storage.write_file(
                imo_numbers,
                'imos',
                f"{self.config.metadata_path}",
                'parquet'
            )
            self.storage.write_table(
                data = pl.DataFrame({"imo": imo_numbers}),
                tablename = 'imos',
                path = f"{self.config.metadata_tables}",
                method = 'overwrite'
            )

            self.config.logger.info(f"INFO - Found {len(imo_numbers)} active ships")

            # Transform and store ship data
            ships_transformed, tables = self.processor.transform_shipdata(ships_df, run_timestamp)

            self.storage.write_file(ships_transformed, 'ShipData', transformed_dir, 'parquet')
            self.storage.write_table(
                data = ships_transformed,
                tablename = 'ShipData',
                path = f"{self.config.silver_tables}",
                method = 'overwrite'
            )

            # Process nested tables
            for name, table in tables.items():
                self.storage.write_file(table, f"ShipData_{name}", transformed_dir, 'parquet')
                # The table name must be an f-string; without the prefix every
                # nested table overwrote one table literally named "ShipData_{name}".
                self.storage.write_table(
                    table,
                    f'ShipData_{name}',
                    f"{self.config.silver_tables}",
                    'overwrite'
                )

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to process ship data: {str(e)}")


    def process_signals_old(self, run_timestamp: str, imo_numbers:List[str], current_signals_df: pl.DataFrame):
        """Sequentially fetch, store and merge the signal definitions of every ship.

        For each IMO the raw JSON and the transformed Parquet file are written,
        the rows are merged into the ``Signals`` table, and the distinct
        (imo, signal, friendly_name) combinations are merged into ``signal_mapping``.
        Ships with no payload or with an API error payload are skipped.

        Args:
            run_timestamp: Sub-folder of the raw/transformed paths and ``loaddate`` value.
            imo_numbers: Ships to process.
            current_signals_df: Not used by this method.

        Errors are logged and swallowed; the first exception ends the loop.
        """
        try:

            for imo in imo_numbers:

                self.config.logger.info(f"INFO - Processing signals data for {imo}")

                response, signals = self.api_client.get_data(f"fleet/{imo}/signals")

                if not signals:
                    # Skip this ship instead of pushing an empty payload through
                    # the writers (the exception here used to be built but never raised).
                    self.config.logger.error(f"ERROR - No signals data received - {response}")
                    continue

                # Store raw signals data
                self.storage.write_file(
                    signals,
                    f'Signals_{imo}',
                    f"{self.config.raw_path}/{run_timestamp}",
                    'json'
                )

                # Transform and store signals data
                signals_df = pl.DataFrame(signals)
                if signals_df.columns == ['detail']:
                    self.config.logger.info("Request Error: see json file for details")
                    continue

                signals_transformed = self.processor.transform_signals(signals_df, run_timestamp)
                self.storage.write_file(
                    signals_transformed,
                    f'Signals_{imo}',
                    f"{self.config.transformed_path}/{run_timestamp}",
                    'parquet'
                )
                # NOTE(review): matching on `signal` alone lets equally named signals
                # of different ships update each other — confirm whether `imo`
                # should be part of the key.
                self.storage.write_table(
                    data = signals_transformed,
                    tablename = 'Signals',
                    path = f"{self.config.silver_tables}",
                    method = 'merge',
                    key = 'signal'
                )
                self.config.logger.info(f"INFO - Signals data processed for {imo}")

                # Update Signals Mapping: write the reduced mapping frame (not the
                # full signal rows) and match on all three mapping columns.
                new_signal_mapping = signals_transformed.select(["imo", "signal", "friendly_name"]).unique()
                self.storage.write_table(
                    data = new_signal_mapping,
                    tablename = 'signal_mapping',
                    path = f"{self.config.metadata_tables}",
                    method = 'merge',
                    key = 'imo',
                    key2 = 'signal',
                    key3 = 'friendly_name'
                )
                self.config.logger.info(f"INFO - Signal Mapping updated")

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to process signals data: {str(e)}")

    def process_timeseries(self, run_timestamp: str, imo_numbers: List[str], signal_mapping: pl.DataFrame, current_df: pl.DataFrame, run_start) -> pl.DataFrame:
        """Fetch and transform the timeseries of all ships in parallel.

        For each IMO the raw JSON, the transformed and name-enriched Parquet file
        and the Parquet file of gap intervals are written. The non-empty
        per-ship frames are appended to ``current_df``.

        Args:
            run_timestamp: Sub-folder of the raw/transformed paths and ``loaddate`` value.
            imo_numbers: Ships to process.
            signal_mapping: Frame used to look up the ``friendly_name`` of each signal.
            current_df: Existing rows the new rows are appended to; may be None or empty.
            run_start: Not used by this method.

        Returns:
            ``current_df`` followed by all non-empty per-ship frames;
            ``current_df`` unchanged when nothing was produced or on error.
        """
        try:
            self.config.logger.info(f"INFO - Starting parallel processing of timeseries data for {len(imo_numbers)} ships")

            def process_per_imo(imo):
                """Process one ship; returns its transformed frame (empty on error)."""
                try:
                    self.config.logger.info(f"INFO - Processing timeseries data for {imo}")
                    response, timeseries = self.api_client.get_data(f"fleet/{imo}/timeseries")

                    # Store raw timeseries data
                    self.storage.write_file(
                        timeseries,
                        f'Timeseries_{imo}',
                        f"{self.config.raw_path}/{run_timestamp}",
                        'json'
                    )

                    # Transform and store timeseries data
                    timeseries_df = pl.DataFrame(timeseries)

                    if timeseries_df.columns == ['detail']:
                        self.config.logger.info(f"INFO - Request Error for {imo}: see json file for details")
                        return pl.DataFrame()

                    if not timeseries:
                        self.config.logger.error(f"ERROR - No timeseries data received for {imo} - {response}")
                        # Placeholder with the same column names as a regular result
                        # (the column is "signal", not "signals").
                        timeseries_transformed = pl.DataFrame({
                            "imo": [], "signal": [], "friendly_name": [], "signal_timestamp": [], "signal_value": [], "loaddate": []
                        })
                        gaps = pl.DataFrame()
                    else:
                        timeseries_transformed, gaps = self.processor.transform_timeseries(timeseries_df, imo, run_timestamp)

                    # Enrich with friendly names
                    timeseries_transformed = self.processor.enrich_timeseries_with_friendly_names(
                        timeseries_transformed, signal_mapping, imo
                    )

                    self.storage.write_file(
                        timeseries_transformed,
                        f"Timeseries_{imo}",
                        f"{self.config.transformed_path}/{run_timestamp}",
                        'parquet'
                    )

                    # Process gaps
                    gaps_df = self.processor.process_gaps(gaps)
                    self.storage.write_file(
                        gaps_df,
                        f"Gaps_{imo}",
                        f"{self.config.transformed_path}/gaps/{run_timestamp}",
                        'parquet'
                    )

                    return timeseries_transformed

                except Exception as e:
                    self.config.logger.error(f"ERROR - Failed to process timeseries for {imo}: {str(e)}")
                    return pl.DataFrame()

            # Process parallelized with ThreadPoolExecutor
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                results = list(executor.map(process_per_imo, imo_numbers))

            # Combine all results
            valid_results = [df for df in results if not df.is_empty()]
            if not valid_results:
                return current_df

            # A missing or empty current frame carries no rows and may have a
            # different schema; including it would make the concat fail and
            # discard every new result.
            if current_df is not None and not current_df.is_empty():
                valid_results.insert(0, current_df)

            return pl.concat(valid_results)

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to process timeseries data: {str(e)}")
            return current_df
        
    def process_signals(self, run_timestamp: str, imo_numbers: List[str], current_signals_df: pl.DataFrame):
        """Fetch, store and transform the signal list of every ship and refresh the signal mapping.

        For each IMO number the ``fleet/{imo}/signals`` endpoint is queried in a
        worker thread. The raw payload is written as JSON below ``raw_path`` and
        the transformed frame as Parquet below ``transformed_path``. Afterwards
        the distinct ``(imo, signal, friendly_name)`` rows of all ships are merged
        into the existing mapping, which is written back as ``signal_mapping``
        (Parquet file plus table, method ``overwrite``).

        Args:
            run_timestamp: Sub-directory of this run below the raw/transformed paths.
            imo_numbers: IMO numbers of the ships to process.
            current_signals_df: Mapping loaded before the run; may be empty.

        Returns:
            None. Failures are logged and not re-raised.
        """
        mapping_columns = ["imo", "signal", "friendly_name"]

        try:
            self.config.logger.info(f"INFO - Starting parallel processing of signal data for {len(imo_numbers)} ships")
            result_df = current_signals_df

            def process_per_imo(imo: str) -> pl.DataFrame:
                """Process one ship; returns an empty frame when nothing usable was received."""
                try:
                    self.config.logger.info(f"INFO - Processing signals data for {imo}")

                    response, signals = self.api_client.get_data(f"fleet/{imo}/signals")

                    if not signals:
                        self.config.logger.error(f"ERROR - No signals data received for {imo} - {response}")
                        return pl.DataFrame()

                    # Store raw signals data
                    self.storage.write_file(
                        signals,
                        f'Signals_{imo}',
                        f"{self.config.raw_path}/{run_timestamp}",
                        'json'
                    )

                    # A payload that only yields a 'detail' column is treated as an
                    # API error response; the raw JSON written above keeps the details.
                    signals_df = pl.DataFrame(signals)
                    if signals_df.columns == ['detail']:
                        self.config.logger.info(f"INFO - Request Error for {imo}: see json file for details")
                        return pl.DataFrame()

                    # Transform and store signals data
                    signals_transformed = self.processor.transform_signals(signals_df, run_timestamp)

                    self.storage.write_file(
                        signals_transformed,
                        f'Signals_{imo}',
                        f"{self.config.transformed_path}/{run_timestamp}",
                        'parquet'
                    )
                    self.config.logger.info(f"INFO - Signals data processed for {imo}")
                    return signals_transformed

                except Exception as e:
                    self.config.logger.error(f"ERROR - Failed to process signals for {imo}: {str(e)}")
                    return pl.DataFrame()

            # Process parallelized with ThreadPoolExecutor
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                results = list(executor.map(process_per_imo, imo_numbers))

            self.config.logger.info("All Signals processed and written")

            # Combine all results: keep only the mapping columns of every non-empty frame
            valid_subset = [df.select(mapping_columns).unique() for df in results if not df.is_empty()]

            if valid_subset:
                # Leave an empty existing mapping out of the concat: an empty frame
                # may carry no columns (or differently typed ones), which a vertical
                # concat can reject, and it contributes no rows anyway.
                existing = [] if result_df.is_empty() else [result_df]
                updated_signals = pl.concat(existing + valid_subset).unique(
                    subset=mapping_columns,
                    keep="first"
                )
            else:
                updated_signals = result_df

            # Persist the updated signal mapping
            if not updated_signals.is_empty():
                self.storage.write_file(
                    updated_signals,
                    "signal_mapping",
                    f"{self.config.metadata_path}",
                    "parquet"
                )

                self.storage.write_table(
                    data = updated_signals,
                    tablename = 'signal_mapping',
                    path = f"{self.config.metadata_tables}",
                    method = 'overwrite'
                )

                self.config.logger.info(f"INFO - Signal Mapping updated with {len(updated_signals)} records")

        except Exception as e:
            self.config.logger.error(f"ERROR - Failed to process signals: {str(e)}")
            


    def run(self, mode: str = "all"):
        """Execute one pipeline run and report when it started and ended.

        Steps, in order:
          1. create the run directories,
          2. load the persisted signal mapping, today's daily summary and the
             reference history (``ref_data``),
          3. if no summary exists for today yet, append yesterday's summary to the
             history and persist it as ``ref_data`` (file and table),
          4. mode "all"/"fleet": process ship data and signals,
          5. mode "all"/"timeseries": process timeseries, then write the daily
             summary file and the ``Timeseries_today`` table.

        Args:
            mode: "all", "fleet" or "timeseries"; any other value skips steps 4 and 5.

        Returns:
            Tuple ``(run_start, run_end)``.

        Raises:
            Exception: whatever a step raised, re-raised after it has been logged.
        """
        ts_columns = ["imo", "signal", "friendly_name", "signal_timestamp", "signal_value", "loaddate"]

        try:
            run_start = datetime.now()
            run_end = None
            run_timestamp = run_start.strftime('%Y/%m/%d/%H/%M')
            summary_filename = run_start.strftime('%Y%m%d')
            self.config.logger.info(f"INFO - Starting pipeline run at {run_start}")
            cutoff_date_str = (run_start - timedelta(days=self.config.history_days)).strftime('%Y/%m/%d')

            metadata_dir = f"{self.config.metadata_path}"
            summary_dir = f"{self.config.transformed_path}/daily_summary"

            # Initialize directories
            for directory in (
                f"{self.config.raw_path}/{run_timestamp}",
                f"{self.config.transformed_path}/{run_timestamp}",
                f"{self.config.transformed_path}/gaps/{run_timestamp}",
                summary_dir,
            ):
                notebookutils.fs.mkdirs(directory)

            empty_ts_schema = {
                "imo": pl.String,
                "signal": pl.String,
                "friendly_name": pl.String,
                "signal_timestamp": pl.String,
                "signal_value": pl.Float64,
                "loaddate": pl.String,
            }
            # Same layout plus the trailing "tag" column.
            empty_ts_schema_tags = {**empty_ts_schema, "tag": pl.String}

            current_signals_df = self.storage.read_file("signal_mapping", metadata_dir, "parquet")

            # Read daily and historical data
            daily_df = self.storage.read_file(summary_filename, summary_dir, "parquet")
            hist_df = self.storage.read_file("ref_data", metadata_dir, "parquet")

            if not hist_df.is_empty():
                # Keep rows whose first 10 loaddate characters (YYYY/MM/DD) sort after the cutoff.
                hist_df = (
                    hist_df
                    .filter(pl.col("loaddate").str.slice(0, 10) > cutoff_date_str)
                    .with_columns(pl.lit("hist").alias("tag"))
                )
            else:
                hist_df = pl.DataFrame(schema=empty_ts_schema_tags)

            if not daily_df.is_empty():
                daily_df = daily_df.select(ts_columns).with_columns(pl.lit("today").alias("tag"))
            else:
                # No summary for today yet: treated as the first run of a new day.
                # NOTE(review): today's summary is only written in the timeseries step
                # below, so a later run on the same day without one in between appends
                # yesterday's rows to ref_data again - confirm whether that matters.
                daily_df = pl.DataFrame(schema=empty_ts_schema_tags)

                last_day = (run_start - timedelta(days=1)).strftime('%Y%m%d')
                last_day_df = self.storage.read_file(last_day, summary_dir, "parquet")

                if last_day_df.is_empty():
                    last_day_df = pl.DataFrame(schema=empty_ts_schema_tags)
                else:
                    last_day_df = last_day_df.with_columns(pl.lit("hist").alias("tag"))

                hist_df = pl.concat([hist_df, last_day_df])

                self.storage.write_file(hist_df, "ref_data", metadata_dir, "parquet")
                self.storage.write_table(
                    data=hist_df,
                    tablename="ref_data",
                    path=f"{self.config.metadata_tables}",
                    method="overwrite",
                )

            current_df = pl.DataFrame(schema=empty_ts_schema)

            self.config.logger.info(f"INFO - Loaded {hist_df.shape[0]} historical records and {daily_df.shape[0]} daily records")
            self.config.logger.info(f"INFO - Processing data for mode: {mode}")

            def load_imo_numbers():
                # First column of the persisted 'imos' file as a plain list.
                return self.storage.read_file('imos', metadata_dir, 'parquet').to_series(0).to_list()

            if mode in ("all", "fleet"):
                # Ship data first, then the IMO list is read for the signal step.
                self.process_shipdata(run_timestamp)
                imo_numbers = load_imo_numbers()
                self.process_signals(run_timestamp, imo_numbers, current_signals_df)

            if mode in ("all", "timeseries"):
                self.config.logger.info("Start to Process Timeseries Data")

                # IMO list and signal mapping are re-read from the metadata files.
                imo_numbers = load_imo_numbers()
                signal_mapping = self.storage.read_file('signal_mapping', metadata_dir, 'parquet')

                # Process Timeseries
                current_df = self.process_timeseries(run_timestamp, imo_numbers, signal_mapping, current_df, run_start)
                current_df = current_df.with_columns(pl.lit("new").alias("tag"))

                # Save Delta; the second element of the result is not used here.
                summary_df, _unused_new_df = self.processor.update_daily_timeseries_summary(hist_df, daily_df, current_df)

                self.storage.write_file(
                    summary_df.select(ts_columns),
                    summary_filename,
                    summary_dir,
                    "parquet",
                )
                self.storage.write_table(
                    data=summary_df.select(ts_columns),
                    tablename='Timeseries_today',
                    path=f"{self.config.silver_tables}",
                    method="overwrite",
                )

            run_end = datetime.now()

        except Exception as e:
            run_end = datetime.now()
            self.config.logger.error(f"ERROR - Pipeline run failed at {run_end}: {str(e)}") 
            raise

        return run_start, run_end

In [None]:
import logging
import os
from datetime import datetime
#from env.config import Config
#from env.pipeline import Pipeline


# Credential for the Hoppe API. It must not live in the notebook source: supply it
# through the HOPPE_API_KEY environment variable (or replace this lookup with the
# workspace's secret store, e.g. notebookutils.credentials.getSecret).
# The key that used to be hard-coded here should be treated as exposed and rotated.
api_key = os.environ.get("HOPPE_API_KEY")
if not api_key:
    raise RuntimeError("HOPPE_API_KEY is not set - provide the Hoppe API key via the environment")

# OneLake roots of the three lakehouses; reused below for the mounts and later
# for the table paths.
metadata_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/9e6baa74-8d1f-4201-9a08-b6fd53c72180"
bronze_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/92bb8409-5f37-48a1-af16-827475a5000b"
silver_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/30df6f99-a299-4ca8-9d9d-ddc2a10b1943"

# Mount every lakehouse under a fixed mount point (same order as before).
notebookutils.fs.mount(metadata_base, "/lakehouse/lh_metadata")
notebookutils.fs.mount(silver_base, "/lakehouse/lh_silver")
notebookutils.fs.mount(bronze_base, "/lakehouse/lh_bronze")

# The first entry listed under /synfs/notebook is used as the local prefix of the mounts.
file_info = notebookutils.fs.ls("/synfs/notebook")
base_path = file_info[0].path

def setup_logging(log_podtfix):
    """Configure root logging to a log file and the console.

    Args:
        log_podtfix: Suffix of the log file name (``runs_<suffix>.log``). The
            misspelled parameter name is kept so the signature stays unchanged;
            the body now actually uses it instead of reading a notebook global
            called ``log_postfix``.

    Returns:
        The logger named ``'run'``.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            # base_path is the notebook-level mount prefix defined alongside this function.
            logging.FileHandler(f"{base_path}/lakehouse/lh_metadata/Files/hoppe/logs/runs_{log_podtfix}.log"),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger('run')

In [None]:
# Configure pipeline: one log file per calendar day, then the run configuration.
log_postfix = datetime.now().strftime('%Y%m%d')
logger = setup_logging(log_postfix)
logger.setLevel('INFO')

# File locations go through the mounted lakehouses (base_path); table locations
# use the abfss roots directly.
config = Config(
    base_url="https://api.hoppe-sts.com/",
    raw_path=f"{base_path}/lakehouse/lh_bronze/Files/hoppe",
    transformed_path=f"{base_path}/lakehouse/lh_silver/Files/hoppe",
    metadata_path=f"{base_path}/lakehouse/lh_metadata/Files/hoppe",
    silver_tables=f"{silver_base}/Tables/hoppe",
    metadata_tables=f"{metadata_base}/Tables/hoppe",
    batch_size=1000,
    max_workers=4,
    days_to_keep=90,
    history_days=5,
    logger=logger,
)


In [None]:
# Create and run pipeline
mode = "all"  # "all" or "timeseries" or "fleet"
try:
    pipeline = Pipeline(config, api_key)
    run_start, run_end = pipeline.run(mode)
except Exception as e:
    # The failure is deliberately not re-raised here, so record the full
    # traceback; the message text alone is rarely enough to locate the cause.
    logger.exception(f"ERROR - Pipeline run failed: {str(e)}")
else:
    logger.info(f"INFO - Pipeline run completed at {run_end}: total runtime {run_end - run_start}")

# TESTING

In [None]:
# Manual check: print the shape of every ship's transformed signals file of one fixed run.
# `imo_numbers` only exists as a local inside Pipeline.run, so load the list here the
# same way that method does (first column of the persisted 'imos' file).
imo_numbers = pipeline.storage.read_file('imos', f"{config.metadata_path}", 'parquet').to_series(0).to_list()

for imo in imo_numbers:
    print(imo)
    full_path = f"{config.transformed_path}/2025/04/03/14/50/Signals_{imo}.parquet"
    if notebookutils.fs.exists(full_path):
        df = pl.read_parquet(full_path)
        #print(df.schema)
        print(df.shape)
    else:
        print("No Signals found")


In [None]:
import polars as pl

In [None]:
# Load the complete "Timeseries" Delta table from the silver lakehouse.
silver_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/30df6f99-a299-4ca8-9d9d-ddc2a10b1943"
path = silver_base + "/Tables/hoppe/Timeseries"
data = pl.read_delta(path)

In [None]:
# Keep only these five columns of the loaded table.
ts_df = data.select("imo", "signal", "signal_timestamp", "signal_value", "loaddate")

In [None]:
# Load the "signal_mapping" Delta table from the metadata lakehouse.
metadata_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/9e6baa74-8d1f-4201-9a08-b6fd53c72180"
path = metadata_base + "/Tables/hoppe/signal_mapping"
signal_mapping = pl.read_delta(path)

In [None]:
# Attach friendly_name to every reading. The pipeline stores the mapping as
# (imo, signal, friendly_name) rows, so the join must match on the ship as well:
# joining on "signal" alone pairs each reading with the mapping rows of every
# ship that has a signal of that name and multiplies the rows.
# NOTE(review): a ship/signal pair with several friendly names still yields one
# row per name - confirm whether the mapping can contain such pairs.
ts_df_proc = ts_df.join(
    signal_mapping,
    on=["imo", "signal"],
    how="left",
).select(["imo", "signal", "friendly_name", "signal_timestamp", "signal_value", "loaddate"])

In [None]:
# TODO: unfinished cast - name the column(s) and the target dtype before enabling.
# Disabled because the statement as written is a syntax error (no column, no
# dtype, unbalanced parentheses); ts_df_proc is left unchanged:
# ts_df_proc = ts_df_proc.with_columns(pl.col().cast

In [None]:
# Replace the silver "Timeseries" Delta table with the joined frame.
silver_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/30df6f99-a299-4ca8-9d9d-ddc2a10b1943"
table_path = silver_base + "/Tables/hoppe/Timeseries"

ts_df_proc.write_delta(table_path, mode="overwrite")

In [None]:
# TODO: unfinished filter - add the column name and the predicate before enabling.
# Disabled because the statement as written is a syntax error; ts_df_ref is
# therefore not defined by this cell:
# ts_df_ref = ts_df_proc.filter(pl.col().)

In [None]:
# Replace the silver "Timeseries_today" Delta table.
# NOTE(review): this writes the full joined frame (ts_df_proc), the same data that
# goes into "Timeseries" - confirm that a filtered frame was not intended here.
silver_base = "abfss://c5691838-bce4-4eff-9d3e-8f3f9ed6d2d3@onelake.dfs.fabric.microsoft.com/30df6f99-a299-4ca8-9d9d-ddc2a10b1943"
table_path = silver_base + "/Tables/hoppe/Timeseries_today"

ts_df_proc.write_delta(table_path, mode="overwrite")