# -------- Installing Dependencies --------

In [None]:
# Explicit %pip magic: installs into the running kernel's environment and does not rely on automagic.
%pip install psycopg2

In [None]:
# python-dotenv and requests are imported by the imports cell below, so they are installed here too.
%pip install streamlit streamlit-apexjs streamlit-card streamlit-elements streamlit-option-menu streamlit-pandas-profiling Markdown markdown-it-py MarkupSafe matplotlib numerize scikit-learn seaborn pandas-profiling python-dotenv requests

In [None]:
# Explicit %pip magic: installs into the running kernel's environment and does not rely on automagic.
%pip install pandas

# ----------- Imports ----------------

In [None]:
import csv
import json
import os
import requests
from requests.auth import HTTPBasicAuth
import json
from dotenv import load_dotenv
from datetime import datetime
import pandas as pd
import psycopg2
import streamlit as st
from streamlit_option_menu import option_menu
from numerize.numerize import numerize
from datetime import timedelta
import subprocess
import time
import concurrent.futures

# Read the .env file, then pull the five database settings out of the environment.
# Each name is None when the corresponding variable is not set.
load_dotenv()
db_name, db_user, db_pwd, db_host, db_port = map(
    os.getenv,
    ("DB", "DB_USER", "DB_PASSWORD", "DB_HOST", "DB_PORT"),
)

## --------- ETL PROCESS ---------

In [None]:
# Execute the ETL process
import subprocess

def execute_batch_file(batch_file_path):
    """Run a Windows batch file through ``cmd /c`` and print what it wrote.

    The captured standard output is always printed; standard error is printed
    only when it is not empty. A final message reports success or the
    non-zero exit code.

    Parameters
    ----------
    batch_file_path : str
        Path of the batch file to run.

    Returns
    -------
    int or None
        The exit code of the process, or ``None`` when the process could not
        be started or another error occurred (the error is printed, not raised).
    """
    try:
        # An argument list (instead of one command string with shell=True)
        # keeps a path that contains spaces as a single argument and avoids
        # wrapping the command in a second shell.
        batch_command = ["cmd", "/c", batch_file_path]

        process = subprocess.Popen(batch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()

        # The batch output is not guaranteed to be valid UTF-8 (Windows console
        # code pages); replace undecodable bytes rather than failing the run.
        print("Sortie standard:")
        print(stdout.decode(errors="replace"))

        if stderr:
            print("Erreurs:")
            print(stderr.decode(errors="replace"))

        # Check the exit code
        if process.returncode == 0:
            print("Le fichier batch a été exécuté avec succès.")
        else:
            print(f"Erreur lors de l'exécution du fichier batch. Code de sortie : {process.returncode}")

        return process.returncode

    except Exception as e:
        print(f"Une erreur s'est produite : {e}")
        return None

# Absolute path of the batch file to launch.
# NOTE(review): machine-specific Windows path; presumably the exported ETL job
# launcher — confirm, and consider reading it from configuration.
batch_file_path = r"D:\workspace\BI_PROJECT\build\jAlimentationDB\jAlimentationDB_run.bat"
# Runs the batch file as soon as this cell executes and prints its output.
execute_batch_file(batch_file_path)


# --------- Agent Work ---------

In [None]:
# Database connection

# Bind `conn` before connecting so that later cells can test it (`if conn`)
# even when the connection attempt fails.
conn = None
try:
    # Connect to the PostgreSQL database. Keyword arguments are used instead of
    # a hand-built DSN string, which breaks on values containing spaces or quotes.
    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        dbname=db_name,
        user=db_user,
        password=db_pwd,
    )
    print('> Database connection established ... ')

except Exception as e:
    # psycopg2.connect raises on failure (it never returns None), so the
    # "no connection" message is reported here together with the error.
    print("> There is no  connection ! Check your connection prameters! ")
    print(e)

In [None]:
#WORKER JOB

def surveiller_seuils_ventes(connection, seuil_inferieur, seuil_superieur):
    """Report how many sales rows fall outside the given thresholds.

    Counts the rows of VENTE_DWH.FAIT_VENTE whose NB_VENTE is strictly below
    ``seuil_inferieur`` or strictly above ``seuil_superieur`` and prints either
    an alert with that count or an "all clear" message.

    Parameters
    ----------
    connection : object exposing ``cursor()`` (a psycopg2 connection in this notebook)
    seuil_inferieur : lower bound, bound as the first query parameter
    seuil_superieur : upper bound, bound as the second query parameter

    Any exception is caught and printed; nothing is returned.
    """
    requete = """
                SELECT COUNT(*)
                FROM VENTE_DWH.FAIT_VENTE
                WHERE NB_VENTE < %s OR NB_VENTE > %s
            """
    bornes = (seuil_inferieur, seuil_superieur)
    try:
        with connection.cursor() as curseur:
            curseur.execute(requete, bornes)
            premiere_ligne = curseur.fetchone()
            nb_hors_seuils = premiere_ligne[0]

            if nb_hors_seuils <= 0:
                print("Aucune alerte : Les ventes sont dans les limites attendues.")
            else:
                print(f"Alerte : Il y a {nb_hors_seuils} enregistrements avec des ventes en dehors des seuils.")
    except Exception as erreur:
        print(f"Erreur lors de la surveillance des seuils de ventes : {erreur}")

def surveiller_performances_requetes(connection):
    """Print the five statements with the largest ``total_time``.

    Reads ``query`` and ``total_time`` from ``pg_stat_statements``, ordered by
    ``total_time`` descending and limited to five rows, then prints a header
    followed by one line per row.

    Parameters
    ----------
    connection : object exposing ``cursor()`` (a psycopg2 connection in this notebook)

    Any exception is caught and printed; nothing is returned.
    """
    # NOTE(review): pg_stat_statements renamed total_time to total_exec_time in
    # PostgreSQL 13 — confirm the server version this runs against.
    requete = """
                SELECT query, total_time
                FROM pg_stat_statements
                ORDER BY total_time DESC
                LIMIT 5
            """
    try:
        with connection.cursor() as curseur:
            curseur.execute(requete)
            lignes = curseur.fetchall()

            print("Top 5 des requêtes les plus lentes :")
            for ligne in lignes:
                texte_requete = ligne[0]
                temps_total = ligne[1]
                print(f"Requête : {texte_requete}, Temps total d'exécution : {temps_total} ms")
    except Exception as erreur:
        print(f"Erreur lors du suivi des performances des requêtes SQL : {erreur}")

def surveiller_modifications_schema(connection, schema="vente_dwh", table="dim_client"):
    """Print the columns (name and data type) of one table from information_schema.

    Parameters
    ----------
    connection : object exposing ``cursor()`` (a psycopg2 connection in this notebook)
    schema : str, optional
        Schema name as stored in the catalog. PostgreSQL folds unquoted
        identifiers to lower case, so the default is the lower-case form of
        the VENTE_DWH schema used by the other queries in this notebook.
    table : str, optional
        Table name as stored in the catalog (lower case for the same reason).

    Any exception is caught and printed; nothing is returned.
    """
    try:
        with connection.cursor() as cursor:
            # The schema and table names are string values here, so they are
            # bound as parameters. Written bare (table_schema = VENTE_DWH) they
            # are parsed as column references and the query fails.
            query = """
                SELECT table_name, column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
            """
            cursor.execute(query, (schema, table))
            results = cursor.fetchall()

            print("Modifications de schéma détectées :")
            for row in results:
                print(f"Table : {row[0]}, Colonne : {row[1]}, Type de données : {row[2]}")
    except Exception as e:
        print(f"Erreur lors de la surveillance des modifications de schéma : {e}")

def mettre_a_jour_prix_produits(connection):
    """Raise purchase and sale prices by 5% for active product rows.

    Multiplies PRIX_ACHAT_PRODUIT and PRIX_VENTE_PRODUIT by 1.05 on every row
    of VENTE_DWH.DIM_PRODUIT where BL_LIGNE_ACTIVE = 1, commits, and prints how
    many rows were updated.

    Parameters
    ----------
    connection : object exposing ``cursor()``, ``commit()`` and ``rollback()``
        (a psycopg2 connection in this notebook)

    Returns
    -------
    int or None
        Number of rows reported by the cursor, or ``None`` when the update
        failed (the error is printed, not raised).
    """
    try:
        with connection.cursor() as cur:
            query = """
                UPDATE VENTE_DWH.DIM_PRODUIT
                SET PRIX_ACHAT_PRODUIT = PRIX_ACHAT_PRODUIT * 1.05,
                    PRIX_VENTE_PRODUIT = PRIX_VENTE_PRODUIT * 1.05
                WHERE BL_LIGNE_ACTIVE = 1
            """
            cur.execute(query)
            # rowcount is the number of rows the UPDATE touched; formatting the
            # cursor object itself would only print its repr.
            nb_lignes = cur.rowcount
        connection.commit()
        print(f"{nb_lignes} prix de produits ont été mis à jour.")
        return nb_lignes
    except Exception as e:
        # Roll back so a failed UPDATE does not leave the connection in an
        # aborted transaction for whatever runs on it next.
        try:
            connection.rollback()
        except Exception as rollback_error:
            print(f"Erreur lors de l'annulation de la transaction : {rollback_error}")
        print(f"Erreur lors de la mise à jour des prix des produits : {e}")
        return None
if conn:
    # Each agent task as (function, positional arguments), in submission order.
    # NOTE(review): all four tasks share the single `conn` object and therefore
    # one transaction — confirm that is intended.
    taches = (
        (mettre_a_jour_prix_produits, (conn,)),
        (surveiller_seuils_ventes, (conn, 2, 10)),
        (surveiller_modifications_schema, (conn,)),
        (surveiller_performances_requetes, (conn,)),
    )

    # Run the tasks on a thread pool and block until every one has finished.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(fonction, *arguments) for fonction, arguments in taches]
        concurrent.futures.wait(futures)




In [None]:
import os

import pandas as pd
import psycopg2


# Connection settings for the PostgreSQL data warehouse.
# Values are read from the environment and fall back to the local defaults this
# cell used before. The earlier assignments ended with a trailing comma, which
# made each value a 1-tuple and produced an invalid connection string.
db_name = os.getenv("DB", "dwh")
db_user = os.getenv("DB_USER", "postgres")
db_pwd = os.getenv("DB_PASSWORD")  # no literal password in the notebook; set DB_PASSWORD
db_host = os.getenv("DB_HOST", "127.0.0.1")
db_port = os.getenv("DB_PORT", "5432")

# Sales per city / country with CUBE subtotals.
# - Output columns are double-quoted so their case survives (PostgreSQL folds
#   unquoted names to lower case) and matches the DataFrame lookups below.
# - GROUPING(col) is 1 on rows where `col` was aggregated away by CUBE, which
#   distinguishes subtotal rows from rows whose value is really NULL.
# - Tables are schema-qualified like the other VENTE_DWH queries in this notebook.
query = '''
    SELECT
        VILLE_CLIENT AS "City",
        PAYS_CLIENT AS "Pays",
        GROUPING(VILLE_CLIENT) AS "CityIsTotal",
        GROUPING(PAYS_CLIENT) AS "PaysIsTotal",
        SUM(NB_VENTE) AS "TotalSales"
    FROM
        VENTE_DWH.DIM_CLIENT
    JOIN
        VENTE_DWH.FAIT_VENTE ON DIM_CLIENT.ID_DIM_CLIENT = FAIT_VENTE.ID_DIM_CLIENT
    GROUP BY CUBE (VILLE_CLIENT, PAYS_CLIENT)
    ORDER BY VILLE_CLIENT, PAYS_CLIENT;
'''

try:
    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        dbname=db_name,
        user=db_user,
        password=db_pwd,
    )
    print("Connected to the database!")

    try:
        sales_data = pd.read_sql(query, conn)
    finally:
        # Close the database connection even when the query fails
        conn.close()

    city_is_total = sales_data["CityIsTotal"] == 1
    pays_is_total = sales_data["PaysIsTotal"] == 1

    # Ten countries with the lowest sales. Only the per-country subtotal rows
    # are used (city aggregated away, country kept); mixing them with the
    # detail rows would count every sale twice.
    lowest_10_pays = (
        sales_data[city_is_total & ~pays_is_total]
        .groupby("Pays")["TotalSales"]
        .sum()
        .nsmallest(10)
    )
    print("lowest 10 Selling Countries (Pays):")
    print(lowest_10_pays)
    print("add more ads")

    # Ten cities with the lowest sales, from the per-city subtotal rows.
    lowest_10_villes = (
        sales_data[pays_is_total & ~city_is_total]
        .groupby("City")["TotalSales"]
        .sum()
        .nsmallest(10)
    )
    print("\n lowest 10 Selling Cities (Villes):")
    print(lowest_10_villes)
    print("create promotions")

    # Ten best-selling products. The query above returns no product column, so
    # this only runs once a "ProductName" column is added to it.
    # TODO: join the product dimension to provide "ProductName".
    if "ProductName" in sales_data.columns:
        top_10_products = sales_data[sales_data['ProductName'] != 'Total'].groupby('ProductName')['TotalSales'].sum().nlargest(10)
        print("\nTop 10 Best-Selling Products:")
        print(top_10_products)
        print("stock more")
    else:
        print("\nTop 10 Best-Selling Products: unavailable (the query returns no product column)")

except Exception as e:
    print(f"Error: {e}")






In [None]:

# Close the database connection (nothing to do when no connection was established)
if conn is not None:
    conn.close()