In [None]:
pip install mysql-connector-python pandas gspread oauth2client sqlalchemy rich

In [28]:
import os
from typing import Optional
from urllib.parse import quote_plus

import pandas as pd
from rich import print
from rich.console import Console
from rich.markup import escape
from rich.table import Table
from sqlalchemy import create_engine


def show_dataframe(df, title=None):
    """Render a DataFrame as a rich table on the console.

    Args:
        df: DataFrame to display. ``None`` (what ``call_query`` returns when a
            query fails) is reported with a short notice instead of raising.
        title: Optional heading printed in bold green above the table. It is
            interpreted as rich console markup.

    Returns:
        None. The table is written to the console.
    """
    console = Console()

    if title:
        console.print(f"\n[bold green]{title}[/bold green]")

    if df is None:
        console.print("[yellow]No data to display.[/yellow]")
        return

    table = Table(show_header=True, header_style="bold white on green")
    for col in df.columns:
        # Column labels are not always strings; escape them so that square
        # brackets are shown literally instead of being parsed as markup.
        table.add_column(escape(str(col)))

    # itertuples keeps each column's own dtype. iterrows would build a Series
    # per row and upcast mixed numeric rows, rendering integers as floats.
    for row in df.itertuples(index=False, name=None):
        table.add_row(*(escape(str(val)) for val in row))

    console.print(table)


# Database connection settings.
# The password is read from the environment so that no secret is stored in the
# notebook. User and host default to the QA delivery database and can be
# overridden the same way.
# NOTE(review): a password used to be hardcoded here; it should be rotated.
DB_USER = os.environ.get("DELIVERY_DB_USER", "qa-general")
DB_PASSWORD = os.environ.get("DELIVERY_DB_PASSWORD")
DB_HOST = os.environ.get(
    "DELIVERY_DB_HOST",
    "qa-delivery.cluster-ctxzyzjrixle.us-east-2.rds.amazonaws.com",
)
DB_PORT = 3306
DB_NAME = "delivery"

if not DB_PASSWORD:
    raise RuntimeError(
        "Set the DELIVERY_DB_PASSWORD environment variable before running this notebook."
    )

# quote_plus keeps special characters in the credentials from breaking the URL.
DATABASE_URL = (
    f"mysql+mysqlconnector://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
engine = create_engine(DATABASE_URL)


def call_query(sql_query) -> Optional[pd.DataFrame]:
    """Run a SQL statement and return its result set as a DataFrame.

    Args:
        sql_query: SQL to execute through the module-level ``engine``.

    Returns:
        The result as a DataFrame, or ``None`` if anything fails. The error is
        printed rather than raised, so callers must check for ``None``.
    """
    try:
        # The connection is released when the ``with`` block exits.
        with engine.connect() as connection:
            return pd.read_sql(sql_query, connection)
    except Exception as err:
        print(f"Error: {err}")
        return None


def execute_query_from_file(filepath) -> Optional[str]:
    """Read the SQL text stored in a file.

    Despite its name, this function does not run the query: it only returns
    the file's contents.

    Args:
        filepath: Path of the SQL file to read.

    Returns:
        The file contents as a string, or ``None`` if the file is missing or
        cannot be read (the problem is printed, not raised).
    """
    try:
        # Read as UTF-8 to match how get_all_views_schema writes its .sql
        # files, instead of depending on the platform's default encoding.
        with open(filepath, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        print(f"Error: File '{filepath}' not found.")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None


def _select_portion(create_statement):
    """Return ``create_statement`` from its first "select" onwards."""
    start = create_statement.lower().find("select")
    # find() returns -1 when there is no match; slicing from -1 would keep only
    # the last character, so fall back to the full statement.
    return create_statement if start == -1 else create_statement[start:]


def get_all_views_schema(views_dir="views", select_dir="views_test"):
    """
    Retrieves the schema of all views in the database and saves them as SQL files.

    For every view of the ``delivery`` schema two files are written: the full
    ``SHOW CREATE VIEW`` statement in ``views_dir`` and only its SELECT portion
    in ``select_dir``. Views whose definition cannot be fetched are skipped.

    Args:
        views_dir: Folder for the full view definitions. Created if missing.
        select_dir: Folder for the SELECT-only files. Created if missing.

    Returns:
        None. Progress and errors are printed; exceptions are not propagated.
    """
    try:
        # Query to fetch all view names
        query_views = """
        SELECT TABLE_NAME 
        FROM INFORMATION_SCHEMA.VIEWS 
        WHERE TABLE_SCHEMA = 'delivery';
        """
        views = call_query(query_views)

        if views is None or views.empty:
            print("No views found in the database.")
            return

        # Create the output folders up front; without them the first open()
        # raises and the whole export is aborted.
        os.makedirs(views_dir, exist_ok=True)
        os.makedirs(select_dir, exist_ok=True)

        for view_name in views["TABLE_NAME"]:
            # A backtick inside a quoted identifier must be doubled.
            quoted_name = view_name.replace("`", "``")
            view_schema = call_query(f"SHOW CREATE VIEW `{quoted_name}`;")

            if view_schema is None or view_schema.empty:
                continue

            create_statement = view_schema.iloc[0]["Create View"]

            # Save original view definition
            definition_path = f"{views_dir}/{view_name}.sql"
            with open(definition_path, "w", encoding="utf-8") as file:
                file.write(create_statement)

            # Extract and save just the SELECT portion
            select_path = f"{select_dir}/{view_name}.sql"
            with open(select_path, "w", encoding="utf-8") as file:
                file.write(_select_portion(create_statement))

            print(f"View '{view_name}' schema saved to {definition_path}")
            print(f"View '{view_name}' select saved to {select_path}")
    except Exception as e:
        print(f"Error retrieving views: {e}")


def export_excel(data_frame, excel_filename="DF.xlsx"):
    """Write a DataFrame to an Excel file and print where it was saved.

    Args:
        data_frame: Object exposing ``to_excel`` (a pandas DataFrame).
        excel_filename: Destination path of the workbook. Defaults to "DF.xlsx".

    Returns:
        None.
    """
    # index=False leaves the DataFrame index out of the exported sheet.
    data_frame.to_excel(excel_filename, index=False)
    confirmation = f"Datos exportados a {excel_filename}"
    print(confirmation)

In [None]:
# Export every view definition of the database to local .sql files.
get_all_views_schema()

In [None]:
# Load all rows of dw_base_financiera into a DataFrame and print it.
# NOTE(review): SELECT * without a LIMIT pulls the whole relation into memory;
# df_bf is None if the query fails, in which case "None" is printed.
query_dw_base_financiera = "SELECT * FROM dw_base_financiera"
df_bf = call_query(query_dw_base_financiera)
print(df_bf)

In [14]:
# Row counts of the main tables.
def _print_total(label, total_df):
    """Print the COUNT(*) value held in ``total_df`` under the given label.

    ``total_df`` is the one-cell DataFrame returned by ``call_query``. When the
    query failed (``None``) or returned nothing, a notice is printed instead of
    raising on ``.iloc``.
    """
    if total_df is None or total_df.empty:
        print(f"Total de {label}: no disponible")
        return
    print(f"Total de {label}: {total_df.iloc[0, 0]}")


query_dw_base_financiera = "SELECT COUNT(*) FROM dw_base_financiera"
total_base_financiera = call_query(query_dw_base_financiera)
_print_total("financiera", total_base_financiera)

query_pedidos = "SELECT COUNT(*) FROM pedidos"
total_pedidos = call_query(query_pedidos)
_print_total("pedidos", total_pedidos)

query_usuarios = "SELECT COUNT(*) FROM usuario"
total_usuarios = call_query(query_usuarios)
_print_total("usuarios", total_usuarios)

query_comercios = "SELECT COUNT(*) FROM comercio"
total_comercios = call_query(query_comercios)
_print_total("comercios", total_comercios)

Total de financiera: 129767
Total de pedidos: 281971
Total de usuarios: 238505
Total de comercios: 456


In [30]:
# Date overview for the analysis: the current server time, the start date of
# the filter window, and the earliest/latest FechaEstimadaEntrega among the
# pedido_entrega rows joined to pedidos.
# fecha_inicio_filtro is NOW() minus (WEEKDAY(NOW()) + 7) days, i.e. the Monday
# of the previous week (MySQL WEEKDAY() is 0 for Monday). The "+ 7) % 7" step
# leaves the 0-6 weekday value unchanged.
query_fechas = """
SELECT 
    NOW() as fecha_actual,
    date_format((now() - interval (((weekday(now()) + 7) % 7) + 7) day),'%Y-%m-%d') as fecha_inicio_filtro,
    MIN(PE.FechaEstimadaEntrega) as primera_fecha,
    MAX(PE.FechaEstimadaEntrega) as ultima_fecha
FROM pedidos P
JOIN pedido_entrega PE ON PE.IdPedido = P.IdPedido
"""

df_fechas = call_query(query_fechas)
# Show the dates DataFrame
show_dataframe(df_fechas, "Fechas de análisis:")

In [31]:
# Count the pedidos / pedido_entrega rows that belong to businesses of the
# AGIL_SOFT business centre, excluding business id 8.
# NOTE: the query_fechas / df_fechas names are reused from the dates cell and
# kept so that code relying on them keeps working, although this cell holds a
# count rather than dates.
query_fechas = """
SELECT COUNT(*) 
FROM pedidos P
JOIN pedido_entrega PE ON PE.IdPedido = P.IdPedido
JOIN sucursal S ON P.IdSucursal = S.IdSucursal
JOIN comercio C ON S.IdComercio = C.IdComercio
WHERE C.CodigoCentroNegocio = 'AGIL_SOFT'
AND C.IdComercio <> 8;
"""

df_fechas = call_query(query_fechas)
# The title describes the count; the previous one was copied from the dates cell.
show_dataframe(df_fechas, "Total de pedidos AGIL_SOFT (sin comercio 8):")

In [39]:
# Placeholder cell: the SQL below contains only a comment, so there is no
# statement to run yet. TODO: add the corrected query.
query_fechas = """
-- Sugerencia de corrección:
"""

df_fechas = call_query(query_fechas)
# call_query returns None when the query fails, which is expected for an empty
# statement. Skip rendering in that case instead of raising in show_dataframe.
if df_fechas is not None:
    show_dataframe(df_fechas, "Fechas de análisis:")