# Ćwiczenie: Budowa pipeline dla danych ustrukturyzowanych i nieustrukturyzowanych

**Czas trwania:** 60 minut

**Cel ćwiczenia:** Stworzymy kompleksowy pipeline danych, który będzie pobierał dane z różnych źródeł, zarówno ustrukturyzowanych jak i nieustrukturyzowanych, i przygotowywał je do dalszego przetwarzania przez DataBota.

## 1. Wprowadzenie do pipeline'u danych dla RAG

Retrieval Augmented Generation (RAG) to podejście, które łączy możliwości generatywne modeli językowych z dostępem do zewnętrznych źródeł danych. Aby zbudować efektywny DataBot, musimy stworzyć pipeline danych, który pozwoli na:

1. Pobieranie danych z różnych źródeł organizacyjnych
2. Przetwarzanie tych danych do formatu zrozumiałego dla modelu LLM
3. Przechowywanie i indeksowanie danych w sposób umożliwiający szybkie wyszukiwanie

W tym ćwiczeniu skupimy się na trzech typach danych:
- **Dane nieustrukturyzowane**: dokumenty tekstowe (.docx, .pdf) przechowywane w SharePoint/OneDrive
- **Dane ustrukturyzowane**: dane tabelaryczne przechowywane w bazie danych SQL
- **Dane półstrukturalne**: pliki CSV i JSON przechowywane w Azure Data Lake Storage

Referencyjną architekturę RAG dla Microsoft Azure można znaleźć pod adresem: [Microsoft RAG Solution Design and Evaluation Guide](https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/rag/rag-solution-design-and-evaluation-guide)

In [None]:
!pip install pyodbc
!pip install azure-storage-file-datalake
!pip install fitz

## 2. Połączenie z bazą danych SQL

Pierwszym krokiem jest uzyskanie dostępu do danych strukturalnych przechowywanych w bazie danych SQL.

In [None]:
# Połączenie z bazą danych SQL
import pyodbc
import pandas as pd

# Parametry połączenia - powinny być przechowywane w bezpiecznym miejscu (np. Key Vault)
server = 'your-server.database.windows.net'
database = 'your-database'
username = 'your-username'
password = 'your-password'
driver = '{ODBC Driver 17 for SQL Server}'

# String połączenia
conn_str = f'DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}'

try:
    # Nawiązanie połączenia
    conn = pyodbc.connect(conn_str)
    print("Połączono z bazą danych SQL")
    
    # Przykładowe zapytanie
    query = "SELECT TOP 10 * FROM Customers"  # Dostosuj do swojej struktury bazy danych
    
    # Wykonanie zapytania i pobranie wyników jako DataFrame
    df = pd.read_sql(query, conn)
    print(f"Pobrano {len(df)} wierszy")
    display(df.head())
    
    # Zamknięcie połączenia
    conn.close()
    
except Exception as e:
    print(f"Błąd podczas łączenia z bazą danych: {str(e)}")

## 4. Praca z danymi nieustrukturyzowanymi w Azure Data Lake

Teraz zajmiemy się pobieraniem i przetwarzaniem danych przechowywanych w Azure Data Lake Storage.

In [None]:
# Praca z plikami w Azure Data Lake Storage
from azure.storage.filedatalake import DataLakeServiceClient
import pandas as pd
import json
import io
import fitz

# Parametry połączenia
storage_account_name = ""
storage_account_key = ""  # Powinien być przechowywany w bezpiecznym miejscu
file_system_name = ""  # Nazwa kontenera ADLS

# Inicjalizacja klienta usługi
service_client = DataLakeServiceClient(
    account_url=f"https://{storage_account_name}.dfs.core.windows.net",
    credential=storage_account_key
)

# Pobierz referencję do systemu plików
file_system_client = service_client.get_file_system_client(file_system_name)

try:
    pdf_path = ""
    file_client = file_system_client.get_file_client(pdf_path)
    

    with fitz.open("plik.pdf") as doc:
        text = ""
        for page in doc:
            text += page.get_text()

    print(text[:1000])  # Wyświetl pierwsze 1000 znaków
    
except Exception as e:
    print(f"Wystąpił błąd: {str(e)}")

## 6. Walidacja połączenia i testowy odczyt danych

Zanim przejdziemy dalej, powinniśmy przetestować wszystkie nasze połączenia, aby upewnić się, że działają poprawnie.

In [None]:
# Walidacja połączeń - szablon
# Zastąp własnymi wartościami

def validate_connections():
    results = {}
    
   
    # Test 1: Połączenie z bazą danych SQL
    try:
        # Tutaj kod do testu połączenia z SQL
        # ...
        results["sql"] = "Sukces"
    except Exception as e:
        results["sql"] = f"Błąd: {str(e)}"
    
    # Test 2: Połączenie z Azure Data Lake
    try:
        # Tutaj kod do testu połączenia z ADLS
        # ...
        results["adls"] = "Sukces"
    except Exception as e:
        results["adls"] = f"Błąd: {str(e)}"
    
    return results

# Uruchom walidację
# validation_results = validate_connections()
# for source, status in validation_results.items():
#     print(f"Połączenie z {source}: {status}")

## 7. Integracja danych w jednym pipeline

Ostatnim krokiem jest integracja wszystkich źródeł danych w jednym spójnym pipeline'u, który będzie mógł być wykorzystany przez DataBota.

In [None]:
# Integracja wszystkich źródeł danych w jednym pipeline

class DataPipeline:
    def __init__(self):
        # Inicjalizacja połączeń
        self.init_connections()
    
    def init_connections(self):
        # Inicjalizacja połączeń do różnych źródeł danych
        # Kod inicjalizacji SQL, ADLS...
        pass
    
    
    def get_data_from_sql(self, query):
        # Pobieranie danych z SQL
        # ...
        sql_data = None
        return sql_data
    
    def get_data_from_adls(self, file_path):
        # Pobieranie danych z ADLS
        # ...
        adls_data = None
        return adls_data
    
    def process_documents(self, documents):
        # Przetwarzanie dokumentów, np. ekstrakcja tekstu
        # ...
        processed_docs = []
        return processed_docs
    
    def process_structured_data(self, data):
        # Przetwarzanie danych strukturalnych
        # ...
        processed_data = None
        return processed_data
    
    def run_pipeline(self, params):
        # Uruchomienie pełnego pipeline'u
        results = {}
        
        
        # Pobierz i przetwórz dane SQL
        if params.get("sql_enabled", True):
            sql_data = self.get_data_from_sql(params.get("sql_query"))
            results["sql_data"] = self.process_structured_data(sql_data)
        
        # Pobierz i przetwórz dane z ADLS
        if params.get("adls_enabled", True):
            adls_data = self.get_data_from_adls(params.get("adls_path"))
            results["adls_data"] = self.process_structured_data(adls_data)
        
        return results

# Przykład użycia
pipeline = DataPipeline()
pipeline_params = {
    "documents_enabled": True,
    "folder_path": "Documents/DataBotWorkshop",
    "sql_enabled": True,
    "sql_query": "SELECT * FROM Customers WHERE Region = 'Europe'",
    "adls_enabled": True,
    "adls_path": "data/sales_data.csv"
}

# Uruchomienie pipeline'u
# results = pipeline.run_pipeline(pipeline_params)
# print(f"Pipeline zakończony, zebrano dane z {len(results)} źródeł")

## 8. Zadania do wykonania

1. Uzupełnij brakujące fragmenty kodu w klasie `DataPipeline`
2. Zaimplementuj metodę `process_documents`, która będzie ekstrahować tekst z różnych typów dokumentów
3. Zaimplementuj metodę `process_structured_data`, która będzie przetwarzać dane analityczne
4. Uruchom pipeline na przykładowych danych i sprawdź, czy działa poprawnie

