In [1]:
"""Module to ingest parquet data into the database."""
import glob
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import pandas as pd
from prisma import Prisma

from backend.paths import DATA_DIR, ROOT_DIR, MLB_DATA_DIR




In [2]:

def get_parquet_files() -> List[str]:
    """Get the list of parquet files in the data directory."""
    pattern = f"{MLB_DATA_DIR}/**/*.parquet"
    return list(glob.iglob(pattern, recursive=True))

# Sanity check: collect the discovered parquet file paths and print them.
parquet_file_list = get_parquet_files()
print(parquet_file_list)


['/home/jbox/coding/ballerz/src/backend/data/mlb/teams.parquet', '/home/jbox/coding/ballerz/src/backend/data/mlb/roster.parquet', '/home/jbox/coding/ballerz/src/backend/data/mlb/people.parquet']


In [None]:
def create_prisma_db_tables() -> None:
    """Placeholder for the table-creation step; not implemented yet.

    The original cell contained only the bare signature (no colon, no body),
    which is a syntax error. This stub keeps the name importable and fails
    loudly if it is called before the real logic is written.

    NOTE(review): judging by the name, this is meant to create the database
    tables the parquet data is ingested into -- confirm the intended behavior.

    Raises:
        NotImplementedError: Always, until the function is implemented.
    """
    raise NotImplementedError("create_prisma_db_tables is not implemented yet")

In [4]:
def get_parquet_files(data_dir: Optional[str] = None) -> List[str]:
    """Return the paths of all files ending in ``.parquet`` under a directory tree.

    NOTE(review): this notebook defines ``get_parquet_files`` in an earlier
    cell as well; once this cell runs, this definition is the one in effect.

    Args:
        data_dir: Root directory to walk. When omitted (``None``),
            ``MLB_DATA_DIR`` is walked, which is what the original
            zero-argument call did.

    Returns:
        List[str]: One ``os.path.join(dirpath, filename)`` entry per matching
        file, in the order ``os.walk`` visits them; the list is not sorted.
    """
    walk_root = MLB_DATA_DIR if data_dir is None else data_dir
    return [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(walk_root)
        for filename in filenames
        if filename.endswith(".parquet")
    ]


def get_table_name(file_path: str) -> str:
    """Derive a table name from a parquet file path.

    The table name is the file's base name with a trailing ``.parquet``
    extension removed. Only the trailing extension is stripped; the original
    ``str.replace`` call removed every ``.parquet`` occurrence in the name.

    Args:
        file_path: Path to a parquet file.

    Returns:
        str: The base name without the trailing ``.parquet``; the base name is
        returned unchanged when it does not end with ``.parquet``.
    """
    suffix = ".parquet"
    base_name = os.path.basename(file_path)
    if base_name.endswith(suffix):
        return base_name[: -len(suffix)]
    return base_name


# Smoke test: table name of the first discovered file (IndexError if no files were found).
get_table_name(get_parquet_files()[0])

'teams'

In [None]:

def ingest_parquet_data(file_name: str) -> Tuple[Dict[str, Any], List[Any]]:
    """Load one parquet file and store each of its rows as a ``document`` record.

    Args:
        file_name (str): The parquet file to ingest, joined onto ``DATA_DIR``
            with ``os.path.join`` (so an absolute path is used as-is).

    Returns:
        Tuple[Dict[str, Any], List[Any]]: The file-level metadata dict and the
        objects returned by ``db.document.create``, one per row in row order.

    Raises:
        Exception: Errors from ``pd.read_parquet`` or the Prisma client
            propagate unchanged; the database connection is closed first.
    """
    # The return annotation used to name `Document`, which is not imported
    # anywhere in the visible notebook and raised NameError when this cell was
    # defined. NOTE(review): presumably the Prisma-generated model -- restore
    # the precise type once its import is confirmed.

    # Read the file before connecting so a bad path never opens a connection.
    file_path = os.path.join(DATA_DIR, file_name)
    df = pd.read_parquet(file_path)

    # NOTE(review): 'created_at' is a datetime object; confirm that the
    # `metadata` field of the document model accepts it as-is.
    metadata = {
        'file_name': file_name,
        'file_path': file_path,
        'num_rows': df.shape[0],
        'num_columns': df.shape[1],
        'columns': df.columns.tolist(),
        'created_at': datetime.now(),
    }

    # NOTE(review): connect()/create()/disconnect() are called without await,
    # which assumes the synchronous Prisma client was generated -- confirm.
    db = Prisma()
    db.connect()
    try:
        # to_dict(orient="records") yields one dict per row without the
        # per-row dtype coercion that DataFrame.iterrows applies.
        documents = [
            db.document.create(
                {
                    'data': record,
                    'metadata': metadata,
                }
            )
            for record in df.to_dict(orient="records")
        ]
    finally:
        # Always release the connection, even when an insert fails part-way.
        db.disconnect()

    return metadata, documents

