In [None]:
import json
import csv
import xml.etree.ElementTree as ET
import pandas as pd
import os
import math
import streamlit as st

class SchemaAnalyzer:
    """
    Universal schema analyzer that detects and analyzes JSON, CSV, Excel and XML files.

    Arrays are analyzed recursively so nested objects get a detailed schema.
    A schema is expressed with plain Python values:

    * a type name string ("string", "integer", "number", "boolean", "null",
      "unknown", plus "empty" / "array" for empty and nested arrays),
    * a dict mapping keys to schemas (an object),
    * a list starting with "array of" followed by the merged element types,
    * a plain list of alternatives when several types were observed for a key.
    """

    def __init__(self, file_path, rows_per_sample_section=20):
        """
        Args:
            file_path: Path of the file to analyze; its extension selects the parser.
            rows_per_sample_section: Number of rows taken from each of the
                head, middle and tail sections when sampling tabular data.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Die Datei wurde nicht gefunden: {file_path}")
        self.file_path = file_path
        self.sample_chunk_size = rows_per_sample_section
        self.extension = os.path.splitext(file_path)[1].lower()

    def analyze(self):
        """
        Pick the parser from the file extension and return the inferred schema.

        Errors are not raised: unsupported extensions and parser failures are
        reported as a dict with an "error" key (and a "traceback" key for
        exceptions).
        """
        try:
            if self.extension == ".json":
                return self._analyze_json()
            elif self.extension == ".csv":
                return self._analyze_csv()
            elif self.extension in [".xls", ".xlsx"]:
                return self._analyze_excel()
            elif self.extension == ".xml":
                return self._analyze_xml()
            else:
                return {"error": f"Unsupported file type: {self.extension}"}
        except Exception as e:
            import traceback
            return {"error": f"An error occurred: {e}", "traceback": traceback.format_exc()}

    def _determine_type(self, value):
        """Return the schema of a Python value, recursing into lists and dicts."""
        if isinstance(value, dict):
            return self._analyze_schema(value)
        if isinstance(value, list):
            if not value:
                return ["array of", "empty"]
            # Type every element, then collapse the results into one merged list.
            element_types = [self._determine_type(item) for item in value]
            return ["array of"] + self._merge_types(element_types)
        # bool is a subclass of int, so it has to be tested before int;
        # otherwise True/False would be reported as "integer".
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, str):
            return "string"
        if isinstance(value, int):
            return "integer"
        if isinstance(value, float):
            return "number"
        if value is None:
            return "null"
        return "unknown"

    def _infer_value_type(self, value: str):
        """
        Infer a type name from a cell/text string (used for CSV, Excel, XML).

        Non-string values and blank strings are delegated to
        ``_determine_type``; a blank string is therefore reported as "string".
        """
        if not isinstance(value, str) or value.strip() == '':
            return self._determine_type(value)
        try:
            int(value)
            return "integer"
        except (ValueError, TypeError):
            pass
        try:
            float(value)
            return "number"
        except (ValueError, TypeError):
            pass
        if value.lower() in ['true', 'false']:
            return "boolean"
        return "string"

    def _merge_schemas(self, list_of_schemas):
        """
        Merge a list of schema dicts into a single schema dict.

        Keys keep the order in which they are first seen, so the result is
        deterministic. A key with exactly one merged type maps to that type
        directly; otherwise it maps to the list of alternatives.
        Non-dict entries in ``list_of_schemas`` are ignored.
        """
        if not list_of_schemas:
            return {}

        # Collect, per key, every type definition seen across all schemas.
        types_by_key = {}
        for schema in list_of_schemas:
            if isinstance(schema, dict):
                for key, type_def in schema.items():
                    types_by_key.setdefault(key, []).append(type_def)

        merged_schema = {}
        for key, types_for_key in types_by_key.items():
            # Merge once per key: repeating this recursive call multiplies the
            # work at every nesting level.
            merged = self._merge_types(types_for_key)
            merged_schema[key] = merged[0] if len(merged) == 1 else merged
        return merged_schema

    def _merge_types(self, types):
        """
        Merge a list of type definitions into a list of distinct alternatives.

        Handles simple type names, object schemas (dicts) and flat lists of
        alternatives. The result is the sorted simple type names, followed by
        one merged object schema if any dict was seen; ``["unknown"]`` when
        nothing usable was found.
        """
        simple_types = set()
        schemas = []

        for t in types:
            if isinstance(t, dict):
                schemas.append(t)
            elif isinstance(t, list) and t and t[0] == "array of":
                # Arrays nested in arrays/objects are deliberately collapsed to
                # a plain "array" marker to keep the merged schema bounded.
                simple_types.add("array")
            elif isinstance(t, list):
                # Flat list of alternatives produced by an earlier merge.
                for inner_t in t:
                    if isinstance(inner_t, dict):
                        schemas.append(inner_t)
                    else:
                        simple_types.add(inner_t)
            elif t is not None:
                simple_types.add(t)

        final_types = sorted(simple_types)

        if schemas:
            final_types.append(self._merge_schemas(schemas))

        if not final_types:
            return ["unknown"]

        return final_types

    def _analyze_schema(self, data):
        """Return the schema of a single dict (non-dicts fall back to ``_determine_type``)."""
        if not isinstance(data, dict):
            return self._determine_type(data)
        return {key: self._determine_type(value) for key, value in data.items()}

    def get_file_snippets(self, n):
        """
        Read the first n, the middle n and the last n lines of the file.

        The file is decoded as UTF-8, falling back to Latin-1 on a decode error.

        Args:
            n (int): Number of lines to take from each section. Values below
                zero are treated as zero, which yields three empty strings.

        Returns:
            dict: Keys 'head', 'middle' and 'tail' holding the sections as
                strings. For small files the sections may overlap. On failure
                a dict with the keys 'error' and 'traceback' is returned
                instead.
        """
        try:
            try:
                with open(self.file_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
            except UnicodeDecodeError:
                with open(self.file_path, 'r', encoding='latin1') as f:
                    lines = f.readlines()

            n = max(0, n)
            total_lines = len(lines)

            head_lines = lines[:n]
            # lines[-0:] would be the whole file, so n == 0 needs its own case.
            tail_lines = lines[-n:] if n else []

            if total_lines <= n:
                # The file is no longer than one section: the middle is everything.
                middle_lines = lines
            else:
                middle_start_index = (total_lines - n) // 2
                middle_lines = lines[middle_start_index:middle_start_index + n]

            return {
                "head": "".join(head_lines),
                "middle": "".join(middle_lines),
                "tail": "".join(tail_lines)
            }
        except Exception as e:
            import traceback
            return {
                "error": f"Fehler beim Lesen der Datei: {e}",
                "traceback": traceback.format_exc()
            }

    # --- Format-specific parsers and helpers ---

    def _get_strategic_samples(self, all_rows):
        """
        Sample rows from the head, middle and tail of ``all_rows``.

        Inputs of at most two chunks are returned unchanged. Otherwise the
        three chunks are concatenated and rows with an identical
        representation are dropped, keeping the first occurrence.
        """
        total_rows = len(all_rows)
        chunk_size = self.sample_chunk_size
        if total_rows <= chunk_size * 2:
            return all_rows
        head = all_rows[:chunk_size]
        middle_start = max(chunk_size, (total_rows - chunk_size) // 2)
        middle = all_rows[middle_start:middle_start + chunk_size]
        tail = all_rows[-chunk_size:]

        seen_representations = set()
        unique_samples = []
        for row in head + middle + tail:
            try:
                representation = json.dumps(row, sort_keys=True)
            except TypeError:
                # Row is not JSON-serializable; fall back to str().
                representation = str(row)
            if representation not in seen_representations:
                seen_representations.add(representation)
                unique_samples.append(row)
        return unique_samples

    def _analyze_json(self):
        """Load the JSON file and return the schema of its top-level value."""
        with open(self.file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return self._determine_type(data)

    def _analyze_combined_samples(self, samples, use_infer_type=False):
        """
        Analyze a list of row dicts (from CSV/Excel) as an array of objects.

        Args:
            samples: Rows mapping column name to raw cell value.
            use_infer_type: When true, string cells are classified with
                ``_infer_value_type`` (so "12" counts as "integer"); when
                false, every cell is typed by its Python type only.

        Returns:
            An "array of" schema, or an info dict when ``samples`` is empty.
        """
        if not samples:
            return {"info": "No data samples found to analyze."}

        if not use_infer_type:
            return self._determine_type(samples)

        # Build one schema of type names per row and merge them directly.
        # Passing type names through _determine_type again would classify
        # every name as "string" and discard the inference.
        row_schemas = [
            {column: self._infer_value_type(cell) for column, cell in row.items()}
            for row in samples
        ]
        return ["array of"] + self._merge_types(row_schemas)

    def _analyze_csv(self):
        """Read the CSV (UTF-8, then Latin-1) and infer a schema from sampled rows."""
        try:
            df = pd.read_csv(self.file_path, low_memory=False, keep_default_na=False)
        except UnicodeDecodeError:
            # Only an encoding problem justifies a retry with another encoding.
            df = pd.read_csv(self.file_path, low_memory=False, keep_default_na=False, encoding='latin1')
        if df.empty:
            return {"info": "CSV is empty or header-only"}
        rows = df.to_dict(orient='records')
        samples = self._get_strategic_samples(rows)
        return self._analyze_combined_samples(samples, use_infer_type=True)

    def _analyze_excel(self):
        """Read the Excel sheet and infer a schema from sampled rows."""
        df = pd.read_excel(self.file_path, keep_default_na=False)
        if df.empty:
            return {"info": "Excel sheet is empty"}
        # Every cell is turned into text so that all values go through the
        # same string-based inference.
        df = df.astype(str)
        rows = df.to_dict(orient='records')
        samples = self._get_strategic_samples(rows)
        return self._analyze_combined_samples(samples, use_infer_type=True)

    def _analyze_xml(self):
        """
        Parse the XML file and return ``{root_tag: schema}``.

        Leaf elements are typed from their text ("null" when there is none);
        a leaf without text but with attributes becomes an object of its
        attributes. Child tags that occur more than once under the same
        parent become an "array of" their merged schemas.
        """
        tree = ET.parse(self.file_path)
        root = tree.getroot()

        def element_schema(elem):
            children = list(elem)
            if not children:
                if elem.text is None and elem.attrib:
                    return self._analyze_schema(elem.attrib)
                return self._infer_value_type(elem.text) if elem.text else "null"

            # Group child schemas by tag, keeping first-seen tag order.
            schemas_by_tag = {}
            for child in children:
                schemas_by_tag.setdefault(child.tag, []).append(element_schema(child))

            schema = {}
            for tag, child_schemas in schemas_by_tag.items():
                if len(child_schemas) == 1:
                    schema[tag] = child_schemas[0]
                else:
                    schema[tag] = ["array of"] + self._merge_types(child_schemas)
            return schema

        # The values are already schemas; analyzing them a second time would
        # turn every inferred type name into "string".
        return {root.tag: element_schema(root)}

In [27]:
# Analyze the Chinook sample data set; the path is relative to the working directory.
analyzer = SchemaAnalyzer("../knowledge_base/ChinookData.json")
# analyze() reports failures as a dict with an "error" key instead of raising.
result = analyzer.analyze()

# Number of lines taken from each of the head, middle and tail sections.
number_of_lines = 10
snippets = analyzer.get_file_snippets(number_of_lines)


In [28]:
import pprint

# pp() is pprint() with sort_dicts=False, so keys print in insertion order.
pprint.pp(result)

{'Genre': ['array of', {'Name': 'string', 'GenreId': 'integer'}],
 'MediaType': ['array of', {'Name': 'string', 'MediaTypeId': 'integer'}],
 'Artist': ['array of', {'Name': 'string', 'ArtistId': 'integer'}],
 'Album': ['array of',
           {'ArtistId': 'integer', 'AlbumId': 'integer', 'Title': 'string'}],
 'Track': ['array of',
           {'AlbumId': 'integer',
            'Bytes': 'integer',
            'UnitPrice': 'number',
            'TrackId': 'integer',
            'MediaTypeId': 'integer',
            'Name': 'string',
            'Milliseconds': 'integer',
            'GenreId': 'integer',
            'Composer': 'string'}],
 'Employee': ['array of',
              {'City': 'string',
               'Phone': 'string',
               'Email': 'string',
               'ReportsTo': ['integer', 'null'],
               'HireDate': 'string',
               'Title': 'string',
               'FirstName': 'string',
               'Address': 'string',
               'PostalCode': 'strin

In [31]:
# 2. Fetch the data snippets (n=10 means ten lines per section).
snippets = analyzer.get_file_snippets(n=10)
# A section is None when its key is missing, e.g. when an error dict came back.
head_str, middle_str, tail_str = (
    snippets.get(section) for section in ("head", "middle", "tail")
)

In [32]:
import json
import tiktoken

# 1. Serialize the schema to a JSON string
schema_str = json.dumps(result, indent=2)

# 2. Select the tokenizer for your model
encoding = tiktoken.encoding_for_model("gpt-4")  # or "gpt-3.5-turbo", etc.

# 3. Tokenize and count
tokens = encoding.encode(schema_str)
print("Token-Anzahl:", len(tokens))

Token-Anzahl: 671


In [33]:
import getpass
import os

# Ask for the key interactively only when the environment variable is unset or empty;
# getpass keeps the typed value out of the notebook output.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

In [34]:
from langchain_openai import ChatOpenAI

# Chat model client used to generate the LinkML proposal.
# The model name matches the tokenizer chosen for the token count ("gpt-4").
# NOTE(review): max_tokens=None and timeout=None presumably defer to the library defaults — confirm.
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

In [None]:
# Prompt as (role, content) pairs: the system message states the task, the
# human message carries the inferred schema plus raw excerpts of the file.
messages = [
    (
        "system",
        "Your task is to analyze the data and understand its schema, then create a corresponding LinkML schema. This should serve as a transformation proposal for a relational database.",
    ),
    (
        "human",
        # The excerpts are listed in file order (head, middle, tail) and the
        # schema is introduced without the stray word "result".
        f"Can you generate a LinkML schema that reflects the data structure, its format, and relationships? Please provide it in a code block. The schema is: {schema_str}. The file is a JSON. \n Head of the file: {head_str} \n Middle: {middle_str} \n Tail: {tail_str}"
    ),
]

ai_msg = llm.invoke(messages)

In [43]:
from IPython.display import display, Markdown
# Render the model's reply as Markdown; the f-string converts the content to text first.
display(Markdown(f"{ai_msg.content}"))

Hier ist ein LinkML-Schema, das auf den bereitgestellten Daten basiert:

```yaml
id: http://example.org/schema/
name: music_store

types:
  id:
    base: int
    uri: xsd:int
  string:
    base: str
    uri: xsd:string
  number:
    base: float
    uri: xsd:float

classes:
  Genre:
    slots:
      - id
      - name
  MediaType:
    slots:
      - id
      - name
  Artist:
    slots:
      - id
      - name
  Album:
    slots:
      - id
      - artist_id
      - title
  Track:
    slots:
      - id
      - album_id
      - bytes
      - unit_price
      - media_type_id
      - name
      - milliseconds
      - genre_id
      - composer
  Employee:
    slots:
      - id
      - city
      - phone
      - email
      - reports_to
      - hire_date
      - title
      - first_name
      - address
      - postal_code
      - fax
      - birth_date
      - state
      - country
      - last_name
  Customer:
    slots:
      - id
      - city
      - phone
      - email
      - last_name
      - first_name
      - fax
      - address
      - postal_code
      - state
      - country
      - company
      - support_rep_id
  Invoice:
    slots:
      - id
      - billing_address
      - total
      - invoice_date
      - billing_state
      - customer_id
      - billing_city
      - billing_postal_code
      - billing_country
  InvoiceLine:
    slots:
      - id
      - track_id
      - quantity
      - unit_price
      - invoice_id
  Playlist:
    slots:
      - id
      - name
  PlaylistTrack:
    slots:
      - playlist_id
      - track_id

slots:
  id:
    description: The unique identifier
    range: id
  name:
    description: The name
    range: string
  artist_id:
    description: The unique identifier of the artist
    range: id
  album_id:
    description: The unique identifier of the album
    range: id
  track_id:
    description: The unique identifier of the track
    range: id
  media_type_id:
    description: The unique identifier of the media type
    range: id
  genre_id:
    description: The unique identifier of the genre
    range: id
  bytes:
    description: The size of the track in bytes
    range: id
  unit_price:
    description: The price of the track
    range: number
  milliseconds:
    description: The length of the track in milliseconds
    range: id
  composer:
    description: The composer of the track
    range: string
  # ... repeat for all other slots ...
```

Bitte beachten Sie, dass Sie die Beschreibungen und Bereiche für jeden Slot an Ihre spezifischen Anforderungen anpassen müssen. Dieses Schema ist nur ein Ausgangspunkt und kann weiter verfeinert und erweitert werden, um Ihre Daten genauer zu modellieren.