In [2]:
import os
import json
import random
import psycopg2
import mysql.connector
from faker import Faker
from datetime import datetime
from PIL import Image, ImageDraw
from kafka import KafkaProducer

# üìå Initialiser Faker pour g√©n√©rer des donn√©es r√©alistes
fake = Faker()

# üìÅ Dossiers pour stocker les fichiers
os.makedirs("data_lake/raw", exist_ok=True)
os.makedirs("data_lake/raw/images", exist_ok=True)
os.makedirs("data_lake/raw/logs", exist_ok=True)

# ======================================
# üìå 1. G√©n√©ration de Donn√©es Relationnelles (PostgreSQL & MySQL)
# ======================================
def generate_sql_data():
    try:
        # üìå Connexion PostgreSQL
        pg_conn = psycopg2.connect(database="ecommerce_pg", user="postgres", password="azerty143rose", host="localhost", port="5432")
        pg_cursor = pg_conn.cursor()
        pg_cursor.execute("CREATE TABLE IF NOT EXISTS clients (id SERIAL PRIMARY KEY, nom VARCHAR(255), email VARCHAR(255), pays VARCHAR(100));")

        # üìå Connexion MySQL
        my_conn = mysql.connector.connect(user='logco', password='logCo@143', host='localhost', database='ecommerce_mysql')
        my_cursor = my_conn.cursor()
        my_cursor.execute("CREATE TABLE IF NOT EXISTS transactions (id INT AUTO_INCREMENT PRIMARY KEY, client_id INT, montant DECIMAL(10,2), date_achat DATE);")

        # üìå Ins√©rer des donn√©es
        for _ in range(101):
            name, email, country = fake.name(), fake.email(), fake.country()
            pg_cursor.execute("INSERT INTO clients (nom, email, pays) VALUES (%s, %s, %s);", (name, email, country))

            client_id, amount, date = random.randint(1, 10), round(random.uniform(10, 500), 2), fake.date()
            my_cursor.execute("INSERT INTO transactions (client_id, montant, date_achat) VALUES (%s, %s, %s);", (client_id, amount, date))

        pg_conn.commit()
        my_conn.commit()
        print("‚úÖ Donn√©es SQL g√©n√©r√©es avec succ√®s !")

    except Exception as e:
        print(f"‚ö† Erreur SQL : {e}")

    finally:
        pg_cursor.close()
        pg_conn.close()
        my_cursor.close()
        my_conn.close()

# ======================================
# üìå 2. G√©n√©ration de Donn√©es Semi-Structur√©es (CSV & JSON)
# ======================================
def generate_csv_json():
    # üìå G√©n√©rer un fichier CSV (Produits)
    with open("data_lake/raw/produits.csv", "w") as f:
        f.write("id,nom,prix,categorie\n")
        for i in range(1, 101):
            f.write(f"{i},{fake.word()},{round(random.uniform(5, 200), 2)},{fake.word()}\n")
    print("‚úÖ Fichier produits.csv g√©n√©r√© !")

    # üìå G√©n√©rer un fichier JSON (Commandes)
    commandes = [{"id": i, "produit": fake.word(), "quantit√©": random.randint(1, 5), "date": fake.date()} for i in range(10)]
    with open("data_lake/raw/commandes.json", "w") as f:
        json.dump(commandes, f, indent=4)
    print("‚úÖ Fichier commandes.json g√©n√©r√© !")

# ======================================
# üìå 3. G√©n√©ration de Donn√©es Non Structur√©es (Texte & Logs)
# ======================================
def generate_text_logs():
    # üìå G√©n√©rer un fichier texte (Avis clients)
    with open("data_lake/raw/avis_clients.txt", "w") as f:
        for _ in range(101):
            f.write(fake.text() + "\n")
    print("‚úÖ Fichier avis_clients.txt g√©n√©r√© !")

    # üìå G√©n√©rer un fichier log (Logs serveur)
    with open("data_lake/raw/logs/serveur.log", "w") as f:
        for _ in range(101):
            log = f"{datetime.now()} - INFO - {fake.sentence()}"
            f.write(log + "\n")
    print("‚úÖ Fichier serveur.log g√©n√©r√© !")

# ======================================
# üìå 4. G√©n√©ration d'Images (Non Structur√©es)
# ======================================
def generate_images():
    for i in range(50):
        img = Image.new("RGB", (100, 100), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
        draw = ImageDraw.Draw(img)
        draw.text((10, 40), f"Prod {i+1}", fill=(255, 255, 255))
        img.save(f"data_lake/raw/images/produit_{i+1}.png")
    print("‚úÖ Images produits g√©n√©r√©es !")

# ======================================
# üìå 5. G√©n√©ration de Donn√©es en Streaming (Kafka)
# ======================================
def generate_kafka_data():
    try:
        producer = KafkaProducer(bootstrap_servers="localhost:9092")
        for _ in range(10):
            transaction = {
                "client_id": random.randint(1, 10),
                "montant": round(random.uniform(10, 500), 2),
                "date": fake.date()
            }
            producer.send("transactions", json.dumps(transaction).encode("utf-8"))
        print("‚úÖ Messages envoy√©s √† Kafka !")
    except Exception as e:
        print(f"‚ö† Erreur Kafka : {e}")

# ======================================
# üìå Ex√©cuter toutes les fonctions
# ======================================
if __name__ == "__main__":
    generate_sql_data()
    generate_csv_json()
    generate_text_logs()
    generate_images()
    generate_kafka_data()


‚úÖ Donn√©es SQL g√©n√©r√©es avec succ√®s !
‚úÖ Fichier produits.csv g√©n√©r√© !
‚úÖ Fichier commandes.json g√©n√©r√© !
‚úÖ Fichier avis_clients.txt g√©n√©r√© !
‚úÖ Fichier serveur.log g√©n√©r√© !
‚úÖ Images produits g√©n√©r√©es !
‚ö† Erreur Kafka : NoBrokersAvailable
