In [1]:
import os
import sys
import json
import pandas as pd
import numpy as np
import yaml
from zipfile import ZipFile
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Enum, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
import enum
import subprocess

# Überprüfen und installieren Sie pymysql und cryptography, falls sie nicht vorhanden sind
def ensure_dependencies_installed():
    try:
        import pymysql
        print("pymysql ist bereits installiert.")
    except ImportError:
        print("pymysql ist nicht installiert. Installation wird durchgeführt...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "pymysql"])
        print("pymysql wurde erfolgreich installiert.")
    
    try:
        import cryptography
        print("cryptography ist bereits installiert.")
    except ImportError:
        print("cryptography ist nicht installiert. Installation wird durchgeführt...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "cryptography"])
        print("cryptography wurde erfolgreich installiert.")

# Lade die Konfigurationsdatei
def load_config(config_file_path):
    with open(config_file_path, 'r') as file:
        config = json.load(file)
    return config

# Verbindungsschema für MySQL
def create_db_engine(config):
    ensure_dependencies_installed()  # Sicherstellen, dass pymysql und cryptography installiert sind
    db_type = "mysql+pymysql"
    url = f"{db_type}://{config['db_user']}:{config['db_password']}@{config['db_host']}:{config['db_port']}/{config['db_name']}"
    return create_engine(url, echo=False)

# Sitzung und Basis erstellen
Base = declarative_base()

# Enum für den Markt
class Market(enum.Enum):
    crypto = 'crypto'
    stock = 'stock'
    forex = 'forex'
    futures = 'futures'

# Tabellendefinitionen
class Symbol(Base):
    __tablename__ = 'symbol'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ticker = Column(String(50), nullable=False)
    name = Column(String(200), nullable=False)
    market = Column(Enum(Market), nullable=False)
    active = Column(Boolean, nullable=False)

class MinuteBar(Base):
    __tablename__ = 'minute_bar'
    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(DateTime, nullable=False)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)
    symbol_id = Column(Integer, ForeignKey('symbol.id', ondelete="CASCADE"), nullable=False)
    symbol = relationship('Symbol', backref='minute_bars')

def extract_and_save_csv(zip_file_path, output_csv_path):
    with ZipFile(zip_file_path) as zf:
        cols = ['time', 'open', 'high', 'low', 'close', 'volume']
        dfs = pd.concat({text_file.filename.split('.')[0]: pd.read_csv(zf.open(text_file.filename), usecols=cols)
            for text_file in zf.infolist() if text_file.filename.endswith('.csv')})
    
    df = dfs.droplevel(1).reset_index().rename(columns={'index': 'ticker'})
    df = df[df['ticker'].str.contains('usd')]
    df['date'] = pd.to_datetime(df['time'], unit='ms')
    df = df.sort_values(by=['date', 'ticker']).drop(columns='time').set_index(['date', 'ticker'])
    
    df.to_csv(output_csv_path)
    print(f"CSV-Datei wurde erfolgreich unter {output_csv_path} gespeichert.")

def load_data_from_csv(csv_file_path):
    # Laden Sie die CSV-Datei und geben Sie die ersten Zeilen aus, um den Inhalt zu überprüfen
    bars1m = pd.read_csv(csv_file_path)


    # Überprüfen Sie, ob die erforderlichen Spalten vorhanden sind
    if 'date' not in bars1m.columns or 'ticker' not in bars1m.columns:
        print("CSV-Datei geladen. Erste Zeilen:")
        print(bars1m.head())
        raise ValueError("Die CSV-Datei muss die Spalten 'date' und 'ticker' enthalten.")

    # Setzen des Index und Konvertieren des Datums
    bars1m = bars1m.set_index(['date', 'ticker'])
    bars1m.index = bars1m.index.set_levels([pd.to_datetime(bars1m.index.levels[0]), bars1m.index.levels[1]], level=['date', 'ticker'])
    bars1m = bars1m.sort_index()
    return bars1m

def process_and_insert_data(session, bars1m, symbol_filter=None):
    if symbol_filter:
        bars1m = bars1m.query(f'ticker == "{symbol_filter}"')
    
    # Resample auf 1-Minuten-Intervalle
    bars1m = bars1m.reset_index().set_index('date').groupby('ticker').resample('1min').last().droplevel(0)
    bars1m.loc[:, bars1m.columns[:-1]] = bars1m[bars1m.columns[:-1]].ffill()
    bars1m.loc[:, 'volume'] = bars1m['volume'].fillna(value=0.0)
    bars1m = bars1m.reset_index().sort_values(by=['date', 'ticker']).set_index(['date', 'ticker'])
    
    tickers = bars1m.index.get_level_values(1).unique()
    latest_date = bars1m.index.get_level_values('date').max()
    active_tickers = bars1m.loc[latest_date].index.get_level_values('ticker').unique()
    
    symbols = pd.DataFrame(tickers, columns=['ticker'])
    symbols['name'] = symbols['ticker']
    symbols['market'] = 'crypto'
    symbols['active'] = np.where(symbols['ticker'].isin(active_tickers), True, False)
    symbols = symbols.sort_values(by='ticker')
    
    total_symbols = len(symbols)
    for i, r in symbols.iterrows():
        try:
            print(f"Uploading symbol {i+1}/{total_symbols}: {r['ticker']}")
            
            symbol = Symbol(ticker=r['ticker'], name=r['name'], market=Market[r['market']], active=r['active'])
            session.add(symbol)
            session.commit()
            
            # Überprüfen, ob der Index existiert
            if r['ticker'] in bars1m.index.get_level_values('ticker'):
                bars = bars1m.xs(r['ticker'], level='ticker').reset_index()
                bars['symbol_id'] = symbol.id
                
                session.bulk_insert_mappings(MinuteBar, bars.to_dict(orient='records'))
                session.commit()
            else:
                print(f"Ticker {r['ticker']} nicht im Index gefunden.")
        except Exception as e:
            print(f"An error occurred while uploading symbol {r['ticker']}: {e}")
            session.rollback()

In [2]:
def main_extract_and_save_csv():
    try:
        # Pfad zur config.json Datei
        config_path = 'config.json'
        
        # Einlesen der Konfigurationsdatei
        config = load_config(config_path)
        
        # Pfad zur Registry-Datei
        registry_file_path = '/etc/airflow/airflow_dag_registry.yaml'
        source_path = '/home/ageq/Git_Projects/MLdatalake/source'
        
        
        # Erstellen des vollständigen Pfades zur ZIP-Datei
        zip_file_name = config['zip_file_name']
        
        zip_file_path = os.path.join(source_path, f"{zip_file_name}.zip")
        output_csv_path = os.path.join(source_path, f"{zip_file_name}.csv")
        
        # Entpacken und Speichern der CSV-Datei
        extract_and_save_csv(zip_file_path, output_csv_path)
    except Exception as e:
        print(f"An error occurred: {e}")
        
main_extract_and_save_csv()

An error occurred: Usecols do not match columns, columns expected but not found: ['time']


In [2]:
def main():
    try:
        # Pfad zur config.json Datei
        config_path = 'config.json'
        
        # Einlesen der Konfigurationsdatei
        config = load_config(config_path)
        
        # Pfad zur Registry-Datei
        registry_file_path = '/etc/airflow/airflow_dag_registry.yaml'
        source_path = '/home/ageq/Git_Projects/MLdatalake/source'
        
        
        # Erstellen des vollständigen Pfades zur ZIP-Datei
        zip_file_name = config['zip_file_name']
        
        output_csv_path = os.path.join(source_path, f"{zip_file_name}.csv")

        
        # Erstellen der Datenbank-Engine
        engine = create_db_engine(config)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        session = Session()

        # Laden der Daten aus der CSV-Datei
        bars1m = load_data_from_csv(output_csv_path)
        
        #print(bars1m.head(5))
        
        # Optional: Filter für ein bestimmtes Symbol setzen
        symbol_filter = None  # Beispiel: Nur Daten für BTCUSD importieren, setze symbol_filter = "btcusd"
        process_and_insert_data(session, bars1m, symbol_filter)

        print("Data imported successfully")
    except Exception as e:
        print(f"An error occurred: {e}")


main()

pymysql ist bereits installiert.
cryptography ist bereits installiert.
Uploading symbol 6/113: adausd
Uploading symbol 52/113: algusd
Uploading symbol 39/113: ampusd
Uploading symbol 57/113: antusd
Uploading symbol 81/113: apeusd
Uploading symbol 44/113: aptusd
Uploading symbol 29/113: arbusd
Uploading symbol 7/113: atousd
Uploading symbol 76/113: axsusd
Uploading symbol 107/113: b2musd
Uploading symbol 68/113: balusd
Uploading symbol 64/113: batusd
Uploading symbol 30/113: bgbusd
Uploading symbol 111/113: bmnusd
Uploading symbol 78/113: bntusd
Uploading symbol 8/113: btcusd
Uploading symbol 9/113: bttusd
Uploading symbol 22/113: ccdusd
Uploading symbol 86/113: chzusd
Uploading symbol 98/113: clousd
Uploading symbol 69/113: crvusd
Uploading symbol 93/113: daiusd
Uploading symbol 31/113: dgbusd
Uploading symbol 10/113: dotusd


In [None]:
##alternativer Zähler:
def process_and_insert_data(session, bars1m, symbol_filter=None):
    if symbol_filter:
        bars1m = bars1m.query(f'ticker == "{symbol_filter}"')
    
    # Resample auf 1-Minuten-Intervalle
    bars1m = bars1m.reset_index().set_index('date').groupby('ticker').resample('1min').last().droplevel(0)
    bars1m.loc[:, bars1m.columns[:-1]] = bars1m[bars1m.columns[:-1]].ffill()
    bars1m.loc[:, 'volume'] = bars1m['volume'].fillna(value=0.0)
    bars1m = bars1m.reset_index().sort_values(by=['date', 'ticker']).set_index(['date', 'ticker'])
    
    tickers = bars1m.index.get_level_values(1).unique()
    latest_date = bars1m.index.get_level_values('date').max()
    active_tickers = bars1m.loc[latest_date].index.get_level_values('ticker').unique()
    
    symbols = pd.DataFrame(tickers, columns=['ticker'])
    symbols['name'] = symbols['ticker']
    symbols['market'] = 'crypto'
    symbols['active'] = np.where(symbols['ticker'].isin(active_tickers), True, False)
    symbols = symbols.sort_values(by='ticker')
    
    total_symbols = len(symbols)
    for i, r in enumerate(symbols.itertuples(), 1):
        try:
            print(f"Uploading symbol {i}/{total_symbols}: {r.ticker}")
            
            symbol = Symbol(ticker=r.ticker, name=r.name, market=Market[r.market], active=r.active)
            session.add(symbol)
            session.commit()
            
            # Überprüfen, ob der Index existiert
            if r.ticker in bars1m.index.get_level_values('ticker'):
                bars = bars1m.xs(r.ticker, level='ticker').reset_index()
                bars['symbol_id'] = symbol.id
                
                session.bulk_insert_mappings(MinuteBar, bars.to_dict(orient='records'))
                session.commit()
            else:
                print(f"Ticker {r.ticker} nicht im Index gefunden.")
        except Exception as e:
            print(f"An error occurred while uploading symbol {r.ticker}: {e}")
            session.rollback()

In [2]:
from sqlalchemy import create_engine, inspect, Column, Integer, String, Float, DateTime, Enum, ForeignKey, Boolean
from sqlalchemy.orm import sessionmaker

url = "mysql+pymysql://mldatalake_user:userpassword@localhost:3308/mldatalake"
engine = create_engine(url, echo=True)

inspector = inspect(engine)
table_names = inspector.get_table_names()
print(table_names)

Session = sessionmaker(bind=engine)
session = Session()

2024-10-03 18:59:54,872 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2024-10-03 18:59:54,873 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-03 18:59:54,876 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2024-10-03 18:59:54,877 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-03 18:59:54,879 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2024-10-03 18:59:54,880 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-10-03 18:59:54,884 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-10-03 18:59:54,885 INFO sqlalchemy.engine.Engine SHOW FULL TABLES FROM `mldatalake`
2024-10-03 18:59:54,886 INFO sqlalchemy.engine.Engine [raw sql] {}


2024-10-03 18:59:54,890 INFO sqlalchemy.engine.Engine ROLLBACK
['five_minute_bar', 'minute_bar', 'symbol', 'thirty_minute_bar']


In [1]:
from sqlalchemy import create_engine, inspect, Column, Integer, String, Float, DateTime, Enum, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
import enum

# Datenbankverbindung herstellen
url = "mysql+pymysql://mldatalake_user:userpassword@localhost:3308/mldatalake"
engine = create_engine(url, echo=False)

# Inspektor verwenden, um Tabellen zu überprüfen
inspector = inspect(engine)
table_names = inspector.get_table_names()
print("Tabellen in der Datenbank:", table_names)

# Basis für SQLAlchemy-Modelle definieren
Base = declarative_base()

# Enum für den Markt
class Market(enum.Enum):
    crypto = 'crypto'
    stock = 'stock'
    forex = 'forex'
    futures = 'futures'

# Modell für die Tabelle 'symbol' definieren
class Symbol(Base):
    __tablename__ = 'symbol'
    id = Column(Integer, primary_key=True, autoincrement=True)
    ticker = Column(String(50), nullable=False)
    name = Column(String(200), nullable=False)
    market = Column(Enum(Market), nullable=False)
    active = Column(Boolean, nullable=False)

# Modell für die Tabelle 'minute_bar' definieren
class MinuteBar(Base):
    __tablename__ = 'minute_bar'
    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(DateTime, nullable=False)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)
    symbol_id = Column(Integer, ForeignKey('symbol.id', ondelete="CASCADE"), nullable=False)
    symbol = relationship('Symbol', backref='minute_bars')

# Sitzung erstellen
Session = sessionmaker(bind=engine)
session = Session()

# Überprüfen, ob die Tabelle 'minute_bar' existiert
if 'minute_bar' in table_names:
    # Anzahl der Einträge in der Tabelle 'minute_bar' überprüfen
    count = session.query(MinuteBar).count()
    print(f"Anzahl der Einträge in der Tabelle 'minute_bar': {count}")

    # Daten aus der Tabelle 'minute_bar' abrufen und die letzten 5 Einträge ausgeben
    minute_bars = session.query(MinuteBar).order_by(MinuteBar.date.desc()).limit(5).all()
    if minute_bars:
        for bar in reversed(minute_bars):  # reversed, um die Einträge in aufsteigender Reihenfolge anzuzeigen
            print(f"Date: {bar.date}, Symbol: {bar.symbol.ticker}, Open: {bar.open}, Close: {bar.close}, High: {bar.high}, Low: {bar.low}, Volume: {bar.volume}")
    else:
        print("Keine Einträge in der Tabelle 'minute_bar' gefunden.")
else:
    print("Die Tabelle 'minute_bar' existiert nicht in der Datenbank.")

Tabellen in der Datenbank: ['five_minute_bar', 'minute_bar', 'symbol', 'thirty_minute_bar']
Anzahl der Einträge in der Tabelle 'minute_bar': 17638421
Date: 2023-10-08 09:28:00, Symbol: btcusd, Open: 27912.0, Close: 27912.0, High: 27912.0, Low: 27912.0, Volume: 0.00044152
Date: 2023-10-08 09:28:00, Symbol: eosusd, Open: 0.5647, Close: 0.5647, High: 0.5647, Low: 0.5647, Volume: 125.433
Date: 2023-10-08 09:28:00, Symbol: filusd, Open: 3.4256, Close: 3.427, High: 3.427, Low: 3.4256, Volume: 28.618
Date: 2023-10-08 09:28:00, Symbol: iotusd, Open: 0.15376, Close: 0.15376, High: 0.15376, Low: 0.15376, Volume: 375.219
Date: 2023-10-08 09:28:00, Symbol: etcusd, Open: 15.51, Close: 15.51, High: 15.51, Low: 15.51, Volume: 0.52338


In [None]:
# import_data_notebook.ipynb

import sys
import os
import json
from importlib import import_module

# Ermitteln des aktuellen Verzeichnisses des Notebooks
current_dir = os.path.dirname(os.path.abspath('__file__'))

# Füge das Verzeichnis hinzu, in dem sich die Module befinden
modules_dir = os.path.join(current_dir, 'modules')
sys.path.append(modules_dir)

# Importiere die Module
create_database = import_module('create_database')
extract_and_save_csv = import_module('extract_and_save_csv')
import_to_db = import_module('import_to_db')

# Lade die Konfigurationsdatei
def load_config(config_file_path):
    with open(config_file_path, 'r') as file:
        config = json.load(file)
    return config

source = '/home/ageq/Git_Projects/MLdatalake/source/'

# Beispiel für die Ausführung im Notebook
config_file_path = os.path.join(current_dir, 'config.json')
# Zielspeicherpfad
csv_file_path = os.path.join(source, 'gespeicherter_dataframe.csv')
symbol_filter = None  # Optional: Nur Daten für ein bestimmtes Symbol importieren, z.B. "btcusd"



In [None]:
# 1. Erstelle die Datenbank und Tabellen
create_database.main(config_file_path)

In [None]:
# 2. Extrahiere und speichere die CSV-Datei
zip_file_path = os.path.join(source, 'trimmed_file.zip')
extract_and_save_csv.extract_and_save_csv(zip_file_path, csv_file_path)

In [None]:
# 3. Importiere die Daten in die Datenbank
import_to_db.main(config_file_path, csv_file_path, symbol_filter)