In [1]:
import os
import sys

import dlt
from dlt.sources.sql_database import sql_database
from loguru import logger
from dotenv import load_dotenv

# Load settings from a local .env file (if present) into the process
# environment, so the os.getenv() calls in the following cells can read them.
load_dotenv()


True

In [2]:
# Only check that the connection string is configured. Its value embeds the
# database password, so it must never be echoed into the notebook output.
bool(os.getenv('PG_URL_SECRET'))

True

In [3]:

# Drop every handler currently registered (including loguru's default one),
# then log to stdout only, using a compact "LEVEL | module:function - message"
# layout at INFO level and above.
logger.remove()
logger.add(
    sys.stdout,
    level="INFO",
    format=(
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan> - "
        "<level>{message}</level>"
    ),
)



1

In [5]:

# Export configuration for the JIRA (PostgreSQL) -> BigQuery pipeline,
# read from environment variables.

# PostgreSQL connection string. No fallback value on purpose: a placeholder
# URL would make the "required" check below pass even when the variable is
# missing, and the export would then try to connect with dummy credentials.
pg_url_secret = os.getenv('PG_URL_SECRET')

# JIRA project key; passed as the single bound parameter of the SQL query.
jira_project_key = os.getenv('JIRA_PROJECT_KEY')

# BigQuery destination dataset and table (with non-sensitive defaults).
bq_dataset_id = os.getenv('BQ_DATASET_ID', 'jira_export')
bq_table_id = os.getenv('BQ_TABLE_ID', 'issues')

required_vars = [
    ('PG_URL_SECRET', pg_url_secret),
    ('JIRA_PROJECT_KEY', jira_project_key),
    ('BQ_DATASET_ID', bq_dataset_id),
    ('BQ_TABLE_ID', bq_table_id),
]

# Report every missing/empty setting before stopping, so they can all be
# fixed in one pass instead of one per run.
missing_vars = [var_name for var_name, var_value in required_vars if not var_value]
for var_name in missing_vars:
    logger.error(f"{var_name} n'est pas défini")
if missing_vars:
    sys.exit(1)

logger.info(f"Début de l'export JIRA vers BigQuery - Projet: {jira_project_key}")

# Read the SQL query text; a missing file is logged explicitly before exiting.
try:
    with open('request.sql', 'r', encoding='utf-8') as f:
        sql_query = f.read()
except FileNotFoundError as e:
    logger.error(f"Fichier request.sql non trouvé: {e}")
    sys.exit(1)
sql_query


[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mDébut de l'export JIRA vers BigQuery - Projet: DATA[0m


"SELECT \n    i.id,\n    p.pname AS project,\n    p.pkey AS project_code,\n    i.issuenum AS numero,\n    pr.pname AS type_urgence,\n    it.pname AS type_tache,\n    it.pstyle sous_type_tache,\n    i.summary AS resume,\n    iss.pname AS status_tache,\n    u1.lower_user_name AS rapporteur,\n    u2.lower_user_name AS responsable,\n    i.description,\n    i.priority AS priorite,\n    i.created AS create_date,\n    i.updated AS update_date,\n    i.resolutiondate AS resolution_date,\n    i.duedate AS echeance_date,\n    e.etiquettes,\n    c.commentaires,\n    fields.custom_fields,\n    cl.changelog\nFROM \n    jiraissue i\nJOIN \n    project p ON p.id = i.project\nLEFT JOIN \n    issuetype it ON it.id = i.issuetype\nLEFT JOIN \n    issuestatus iss ON iss.id = i.issuestatus\nLEFT JOIN \n    priority pr ON pr.id = i.priority\nLEFT JOIN \n    app_user u1 ON i.reporter = u1.user_key\nLEFT JOIN \n    app_user u2 ON i.assignee = u2.user_key\nLEFT JOIN LATERAL (\n    SELECT jsonb_agg(jsonb_build_o

In [12]:

logger.info("Fichier request.sql chargé")

# Build the dlt pipeline targeting BigQuery in the EU location.
# dlt creates the dataset automatically if it does not exist yet.
logger.info("Initialisation de la pipeline dlt")
bigquery_destination = dlt.destinations.bigquery(location='EU')
pipeline = dlt.pipeline(
    pipeline_name='jira_to_bq',
    dataset_name=bq_dataset_id,
    destination=bigquery_destination,
)

# Define the dlt resource backed by the custom SQL query
@dlt.resource(
    table_name=bq_table_id,
    write_disposition='replace',
    max_table_nesting=0,
    # Explicit JSON column hints, currently disabled:
    # columns={
    #     'commentaires': {'data_type': 'json'},
    #     'changelog': {'data_type': 'json'},
    #     'custom_fields': {'data_type': 'json'},
    #     'etiquettes': {'data_type': 'json'}
    # }
)
def jira_issues():
    """Yield the rows returned by ``sql_query``, one dict per row.

    Connects to PostgreSQL with ``pg_url_secret``, executes ``sql_query`` with
    ``jira_project_key`` as its single bound parameter, and yields every result
    row as a ``{column_name: value}`` mapping. The connection is closed in a
    ``finally`` clause, so it is released even if the query or a consumer fails.

    Yields:
        dict: one mapping per result row, keyed by the cursor's column names.
    """
    import psycopg2  # local import, kept where the original cell had it

    logger.info(f"Connexion à la base de données pour le projet: {jira_project_key}")
    conn = psycopg2.connect(pg_url_secret)

    try:
        with conn.cursor() as cursor:
            logger.info(f"Exécution de la requête pour le projet: {jira_project_key}")
            cursor.execute(sql_query, (jira_project_key,))

            # Column names, in result order, taken from the cursor metadata
            columns = [desc[0] for desc in cursor.description]
            logger.info(f"Colonnes trouvées: {len(columns)}")

            # Iterate the cursor directly rather than calling fetchall(): the
            # same rows are produced without first copying the whole result
            # set into an intermediate list.
            row_count = 0
            for row in cursor:
                yield dict(zip(columns, row))
                row_count += 1

            logger.info(f"Nombre de lignes extraites: {row_count}")
    finally:
        conn.close()

# Run the pipeline. A failure is logged with the export's own error message
# and then re-raised, so the cell still stops with the full traceback and the
# success message below is only reached when the load went through.
logger.info("Lancement de la pipeline dlt")
try:
    load_info = pipeline.run(jira_issues())
except Exception as e:
    logger.error(f"Erreur lors de l'export: {e}")
    raise

logger.info("Export terminé avec succès")
 


[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mFichier request.sql chargé[0m
[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mInitialisation de la pipeline dlt[0m
[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mInitialisation de la pipeline dlt[0m
[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mLancement de la pipeline dlt[0m
[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mLancement de la pipeline dlt[0m




[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mConnexion à la base de données pour le projet: DATA[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mExécution de la requête pour le projet: DATA[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mExécution de la requête pour le projet: DATA[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mColonnes trouvées: 21[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mNombre de lignes extraites: 386[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mColonnes trouvées: 21[0m
[1mINFO    [0m | [36m__main__[0m:[36mjira_issues[0m - [1mNombre de lignes extraites: 386[0m




[1mINFO    [0m | [36m__main__[0m:[36m<module>[0m - [1mExport terminé avec succès[0m


In [None]:
   
# NOTE(review): leftover from a function-based version of this export.
# - These `except` clauses have no matching `try` in this cell (the only
#   `try:` in the notebook is commented out), so the cell cannot be parsed
#   as it stands.
# - `load_jira_data` only exists as a commented-out `def`, so the call at the
#   bottom of this cell has nothing to resolve to.
# Either restore the function/try wrapper around the export code or drop
# this cell.
except FileNotFoundError as e:
    logger.error(f"Fichier request.sql non trouvé: {e}")
    sys.exit(1)
except Exception as e:
    # NOTE(review): `exc_info=True` is the stdlib-logging idiom; with loguru
    # the usual way to attach the traceback is `logger.exception(...)` or
    # `logger.opt(exception=True)` — TODO confirm.
    logger.error(f"Erreur lors de l'export: {e}", exc_info=True)
    sys.exit(1)


if __name__ == "__main__":
    load_jira_data()
