In [1]:
import requests
import pandas as pd
import time
import json
import importlib, wikidata_queries
importlib.reload(wikidata_queries)
from wikidata_queries import WIKIDATA_QUERIES  # erneut ausführen
import requests
import pandas as pd
from io import StringIO
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)

In [2]:
class Retryable(Exception):
    pass

class Abort(Exception):
    pass
class WikiDataClient():
    def __init__(self):
        self.url = "https://query.wikidata.org/sparql"
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": "PortfolioBot/1.0 (contact: your-email@example.com)", "Accept": "text/csv",})
        self.max_retries = 3
        self.timeout = 30
        self.retry_statuses = (429, 500, 502, 503, 504)
    
    def request(self, query):
        return self.session.get(self.url, params={"query": query}, timeout=self.timeout)

    def validate_format(self, response) -> None:
        ctype = (response.headers.get("Content-Type") or "").lower()
        text = response.text or ""

        is_csv_header = "text/csv" in ctype
        looks_like_csv = text.lstrip().startswith(('"', '?'))  # heuristik

        if not is_csv_header and not looks_like_csv:
            raise ValueError(f"Non-CSV response. ctype={ctype}. head={text[:200]}")

    def parse_df(self, response) -> pd.DataFrame:
        text = response.text or ""
        return pd.read_csv(StringIO(text), sep=",", engine="python")
    
    def _check_status(self, response):
        retryable = {429, 500, 502, 503, 504}
        status = response.status_code

        if status in (400, 404):
            raise Abort(f"Non-retryable HTTP {status}: {response.text[:200]}")

        if 400 <= status < 500 and status != 429:
            raise Abort(f"Non-retryable client error HTTP {status}: {response.text[:200]}")

        if status in retryable:
            raise Retryable(f"Retryable HTTP {status}: {response.text[:200]}")

        if 500 <= status < 600:
            raise Retryable(f"Server error HTTP {status}: {response.text[:200]}")

    def _attempt(self, query):
        response = self.request(query)              # kann Timeout/ConnectionError werfen
        self._check_status(response)                # kann Retryable/Abort werfen

        try:
            self.validate_format(response)          # kann ValueError werfen
            return self.parse_df(response)          # kann ParserError werfen
        except ValueError as e:
            raise Retryable(f"Format invalid: {e}") from e
        except pd.errors.ParserError as e:
            raise Retryable(f"CSV ParserError: {e}") from e
    
    def invoke_retry(self, query):
        for attempt in range(self.max_retries):
            try:
                return self._attempt(query)
        
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                print(f"Attempt {attempt+1} network failed:", e)
                time.sleep(min(30, 2 ** attempt))
                continue
        
            except Retryable as e:
                print(f"Attempt {attempt+1} retryable:", e)
                time.sleep(5)
                continue

            except Abort as e:
                print(f"Abort:", e)
                return None

        return None


In [3]:
class WikiDataQueryBuilder:
   def __init__(self):
      self.templates = WIKIDATA_QUERIES
   def formatting_request_id(self, tickers):
      return " ".join(f'"{i}"' for i in tickers)
   def formatting_company_qid(self, qids):
      return " ".join(f'wd:{i}' for i in qids)
   def build_company_by_isin(self, isin):
      isin_formatted = " ".join(f'"{i}"' for i in isin)
      query = f"""SELECT ?reqId ?company WHERE {{VALUES ?reqId {{ {isin_formatted} }} OPTIONAL {{ ?company wdt:P946 ?reqId . }}}}"""
      return query
   def build_company_by_ticker(self, exchange_qid, tickers):
      tickers_formatted = self.formatting_request_id(tickers)
      query = f"""SELECT ?reqId ?company WHERE {{VALUES ?reqId {{ {tickers_formatted} }} OPTIONAL {{ ?company p:P414 [ ps:P414 wd:{exchange_qid} ; pq:P249 ?reqId ] . }}}}"""
      return query
   def build_features_by_qid(self, qids, feature_name):
      formatted_qids = self.formatting_company_qid(qids)
      feature_template = self.templates[feature_name]
      query = feature_template.replace("__VALUES__", formatted_qids)
      return query


In [4]:
class WikiDataTransformer:
    def __init__(self):
        pass
    import pandas as pd

    def wikidata_uri_to_qid_df(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()

        def convert_cell(x):
            # NaN/NaT unverändert
            if pd.isna(x):
                return x
            # nur Strings prüfen
            if not isinstance(x, str):
                return x

            s = x.strip()
            # nur echte Wikidata Entity-URIs konvertieren
            prefix = "http://www.wikidata.org/entity/"
            prefix2 = "https://www.wikidata.org/entity/"
            if s.startswith(prefix):
                return s[len(prefix):]
            if s.startswith(prefix2):
                return s[len(prefix2):]
            return x

        return df.applymap(convert_cell)

    def extract_qids(self, df):
        uris = df.copy()
        uris['Company_QID'] = uris['company'].astype(str).str.rsplit("/", n=1).str[-1]
        return uris.drop(columns=["company"])
    def transform_qid(self, qid_batch):
        no_nan_qid_batch = qid_batch.dropna()
        return self.extract_qids(no_nan_qid_batch)
    def stack_long_dfs(self, dfs, id_col="company"):
        out = []
        for df in dfs:
            value_cols = [c for c in df.columns if c != id_col]
            if len(value_cols) != 1:
                raise ValueError(f"Expected exactly 1 value col besides {id_col}, got {value_cols}")
            value_col = value_cols[0]
            tmp = df.rename(columns={value_col: "Value"}).copy()
            tmp["Item_Description"] = value_col
            out.append(tmp[[id_col, "Item_Description", "Value"]])
        return pd.concat(out, ignore_index=True)
    

In [5]:
class WikiDataIngestor:
    def __init__(self):
        self.client = WikiDataClient()
        self.builder = WikiDataQueryBuilder()
        self.transformer = WikiDataTransformer()
        self.features_to_fetch = WIKIDATA_QUERIES.keys()

    def batching_for_qid(self, tickerlist, batch_size=20):
        required_cols = ['OriginalTicker', 'YahooTicker', 'Exchange_QID', 'ISIN']
        missing = [c for c in required_cols if c not in tickerlist.columns]
        if missing:
            raise ValueError(f"tickerlist missing required columns: {missing}")
        batches = []
        df_all = tickerlist.copy()

        for exchange_qid, df in df_all.groupby("Exchange_QID", sort=True):
            isin_series = df["ISIN"].astype("string").str.strip()
            has_isin = isin_series.notna() & (isin_series != "")

            # --- ISIN batches ---
            isin_vals = isin_series.loc[has_isin].dropna().astype(str).str.strip()
            isin_vals = [v for v in isin_vals.tolist() if v]  # remove empty
            # stable dedup
            isin_vals = list(dict.fromkeys(isin_vals))

            for i in range(0, len(isin_vals), batch_size):
                batches.append(("ISIN", None, isin_vals[i:i + batch_size]))

            # --- Ticker batches ---
            tkr_series = df.loc[~has_isin, "OriginalTicker"].astype("string").str.strip()
            tkr_vals = [v for v in tkr_series.dropna().astype(str).tolist() if v]
            tkr_vals = list(dict.fromkeys(tkr_vals))

            for i in range(0, len(tkr_vals), batch_size):
                batches.append(("TICKER", str(exchange_qid), tkr_vals[i:i + batch_size]))

        return batches
    def batching_for_features(self, qids, batch_size = 10):
        batches = []
        for i in range(0,len(qids), batch_size):
            batches.append(qids[i:i+batch_size])
        return batches

    def get_qids(self, tickerlist):
        batches_for_qids = self.batching_for_qid(tickerlist)
        qids = []
        for batch in batches_for_qids:
            if batch[0] == "TICKER":
                query = self.builder.build_company_by_ticker(batch[1], batch[2])
            if batch[0] == "ISIN":
                query = self.builder.build_company_by_isin(batch[2])
            qid = self.client.invoke_retry(query)
            qid_batch = self.transformer.transform_qid(qid)
            qids.extend(qid_batch['Company_QID'])
        return qids
    
    def get_feature(self, batch, features_to_fetch):
        join_list = []
        for feature in features_to_fetch:
            query = self.builder.build_features_by_qid(batch, feature)
            feature_df = self.client.invoke_retry(query)
            feature_df = self.transformer.wikidata_uri_to_qid_df(feature_df)
            join_list.append(feature_df)
        return join_list

    def run(self, tickerlist):
        list_of_long_batches = []
        qids = self.get_qids(tickerlist)
        feature_batches = self.batching_for_features(qids)
        for batch in feature_batches:
            features = self.get_feature(batch, features_to_fetch=self.features_to_fetch)
            features_long = self.transformer.stack_long_dfs(features)
            list_of_long_batches.append(features_long)
        return pd.concat(list_of_long_batches)
                

test = pd.read_csv("C:\\Diversification\\data\\tickers.csv")
df = WikiDataIngestor().run(test)


Attempt 1 retryable: Retryable HTTP 429: 


In [10]:
len(df)

36638

In [7]:
from pathlib import Path
import duckdb
class DataHub:
    def __init__(self, db_name="yahoo_finance.db"):
        # Pfad-Management (funktioniert in Scripts & Notebooks)
        base_path = Path.cwd()
        db_path = base_path.parent.parent / "data" / "01_raw" / "yahoo" / db_name
        db_path.parent.mkdir(parents=True, exist_ok=True)
        
        # Verbindung herstellen
        self.con = duckdb.connect(str(db_path))
        self._initialize_tables()
        print(f"DuckDB verbunden: {db_path}")

    def _initialize_tables(self):
            """Erstellt die Tabellenstruktur, falls sie noch nicht existiert."""
            # Yahoo / Financials
            self.con.execute("""
                CREATE TABLE IF NOT EXISTS bronze_financials (
                    ticker VARCHAR,
                    date DATE,
                    affiliation VARCHAR,
                    item_description VARCHAR,
                    value DOUBLE,
                    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            self.con.execute(
                "CREATE INDEX IF NOT EXISTS idx_ticker_date ON bronze_financials (ticker, date);"
            )

            # Wikidata
            self.con.execute("""
                CREATE TABLE IF NOT EXISTS bronze_wikidata (
                    company_qid VARCHAR,
                    item_description VARCHAR,
                    value VARCHAR,
                    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)

            # Index(e) für schnellere Abfragen
            self.con.execute(
                "CREATE INDEX IF NOT EXISTS idx_wikidata_qid ON bronze_wikidata (company_qid);"
            )
            self.con.execute(
                "CREATE INDEX IF NOT EXISTS idx_wikidata_qid_item ON bronze_wikidata (company_qid, item_description);"
            )

    def insert_financials(self, df: pd.DataFrame):
        """Speichert ein einheitliches DataFrame in die Datenbank."""
        if df is None or df.empty:
            return

        # Wir nutzen den 'Appender' Mechanismus von DuckDB für maximale Speed
        # DuckDB erkennt das DataFrame 'df' im lokalen Python-Scope automatisch
        try:
            self.con.execute("INSERT INTO bronze_financials (ticker, date, affiliation, item_description, value) SELECT ticker, date, affiliation, item_description, value FROM df")
        except Exception as e:
            print(f"Fehler beim Insert: {e}")

    def insert_wikidata(self, df: pd.DataFrame):
        """
        Speichert Wikidata-DataFrame in die Datenbank.
        Erwartete Spalten: Company_QID, Item_Description, Value
        """
        if df is None or df.empty:
            return

        # Defensive: Spalten checken (leicht tolerant bzgl. Case)
        required = {"company", "Item_Description", "Value"}
        if not required.issubset(set(df.columns)):
            missing = required - set(df.columns)
            raise ValueError(f"Wikidata DF fehlt Spalten: {missing}. Vorhanden: {list(df.columns)}")

        # Optional: Leichte Normalisierung/Typ-Sicherheit (Value als string ist am robustesten)
        df = df.copy()
        df["Company_QID"] = df["company"].astype(str)
        df["Item_Description"] = df["Item_Description"].astype(str)
        df["Value"] = df["Value"].astype(str)

        # In DuckDB-Insert: aliasen auf die Spaltennamen der Tabelle
        try:
            self.con.execute("""
                INSERT INTO bronze_wikidata (company_qid, item_description, value)
                SELECT
                    Company_QID AS company_qid,
                    Item_Description AS item_description,
                    Value AS value
                FROM df
            """)
        except Exception as e:
            print(f"Fehler beim Wikidata-Insert: {e}")

    def preview_data(self, table="bronze_wikidata", limit=100):
        """Holt Einträge als DataFrame zur Kontrolle."""
        df = self.con.execute(f"SELECT * FROM {table} LIMIT {int(limit)}").df()
        return df


    def get_summary_stats(self):
        """Gibt eine kleine Statistik über den Füllstand der DB aus."""
        return self.con.execute("""
            SELECT 
                (SELECT COUNT(DISTINCT ticker) FROM bronze_financials) as count_tickers,
                (SELECT COUNT(*) FROM bronze_financials) as total_financial_rows,
                (SELECT COUNT(DISTINCT company_qid) FROM bronze_wikidata) as count_company_qids,
                (SELECT COUNT(*) FROM bronze_wikidata) as total_wikidata_rows
        """).df()

    def close(self):
        """Schließt die Verbindung sauber."""
        self.con.close()

    def clear_database(self):
        """
        Löscht alle Daten und Tabellen aus der DuckDB-Datenbank.
        """
        try:
            tables = self.con.execute("SHOW TABLES").fetchall()

            if not tables:
                print("Datenbank ist bereits leer.")
                return

            print(f"Lösche {len(tables)} Tabellen...")

            for (table_name,) in tables:
                self.con.execute(f"DROP TABLE IF EXISTS {table_name}")

            print("Datenbank wurde erfolgreich geleert.")

        except Exception as e:
            print(f"Fehler beim Leeren der Datenbank: {e}")

In [8]:
DataHub().insert_wikidata(df)

DuckDB verbunden: c:\Diversification\data\01_raw\yahoo\yahoo_finance.db


In [9]:
DataHub().preview_data()

DuckDB verbunden: c:\Diversification\data\01_raw\yahoo\yahoo_finance.db


Unnamed: 0,company_qid,item_description,value,ingested_at
0,Q753684,isin,FR0010478248,2026-02-27 12:14:20.443279
1,Q753684,isin,US4659401040,2026-02-27 12:14:20.443279
2,Q1052675,isin,FR0000053506,2026-02-27 12:14:20.443279
3,Q1375196,isin,FR0010490920,2026-02-27 12:14:20.443279
4,Q2943995,isin,FR0010425595,2026-02-27 12:14:20.443279
5,Q65158292,isin,FR0011648716,2026-02-27 12:14:20.443279
6,Q79189910,isin,FR0013341781,2026-02-27 12:14:20.443279
7,Q98778444,isin,FR0000060840,2026-02-27 12:14:20.443279
8,Q98931288,isin,FR0000054421,2026-02-27 12:14:20.443279
9,Q130387411,isin,FR0012819381,2026-02-27 12:14:20.443279
