In [0]:
jdbcHostname = "server4traffic.database.windows.net"
jdbcDatabase = "datawarehouse04"
jdbcPort = 1433
jdbcUsername = "admin2traffic"
jdbcPassword = "abcd1234@"

In [0]:
# URL JDBC
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

# Paramètres de connexion
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType


In [0]:
dbutils.widgets.text("date_param", "")
date_a_traiter = dbutils.widgets.get("date_param")

In [0]:
# construire la requête SQL avec filtre
query = f"(SELECT * FROM raw.transactions WHERE CAST(timestamp_insertion AS DATE) = '{date_a_traiter}') AS transactions_filtrés"

# lire uniquement les données de la veille
df_transactions = spark.read.jdbc(
    url=jdbcUrl,
    table=query,
    properties=connectionProperties
)

In [0]:
# # Lire une table SQL dans un DataFrame Spark
# df_transactions = spark.read.jdbc(url=jdbcUrl, table="raw.transactions", properties=connectionProperties)

In [0]:
display(df_transactions.limit(10))

store_id,date,heure,transactions,chiffre_affaires
1,2023-04-27,00:00:00,0.0,0.0
1,2023-04-27,01:00:00,0.0,0.0
1,2023-04-27,02:00:00,0.0,0.0
1,2023-04-27,03:00:00,0.0,0.0
1,2023-04-27,04:00:00,0.0,0.0
1,2023-04-27,05:00:00,0.0,0.0
1,2023-04-27,06:00:00,0.0,0.0
1,2023-04-27,07:00:00,0.0,0.0
1,2023-04-27,08:00:00,0.0,0.0
1,2023-04-27,09:00:00,56.0,3478.81


In [0]:
# Compter les lignes où AU MOINS UNE colonne est null
nb_lignes_null = df_transactions.filter(
    F.col("store_id").isNull() |
    F.col("date").isNull() |
    F.col("heure").isNull() |
    F.col("transactions").isNull() |
    F.col("chiffre_affaires").isNull()
).count()

In [0]:
print(f"Nombre de lignes avec au moins un null : {nb_lignes_null}")

Nombre de lignes avec au moins un null : 1209


In [0]:
df_transactions = df_transactions.dropna()


In [0]:
clean_transactions = df_transactions.withColumn(
    "date", F.coalesce(
        F.to_date("date", "yyyy-MM-dd"),
        F.to_date("date", "yyyy/MM/dd"),
        F.to_date("date", "dd/MM/yyyy")
    )
).withColumn(
    "store_id", F.col("store_id").cast("int")
).withColumn(
    "heure", F.hour(F.col("heure").cast(TimestampType()))
).withColumn(
    "nb_transactions", F.col("transactions").cast("int")
).withColumn(
    "chiffre_affaires", F.round(F.col("chiffre_affaires").cast("double"), 2)
).select(
    "store_id", "date", "heure", "nb_transactions", "chiffre_affaires"
)
display(clean_transactions)

store_id,date,heure,nb_transactions,chiffre_affaires
1,2023-04-27,0.0,0,0.0
1,2023-04-27,1.0,0,0.0
1,2023-04-27,2.0,0,0.0
1,2023-04-27,3.0,0,0.0
1,2023-04-27,4.0,0,0.0
1,2023-04-27,5.0,0,0.0
1,2023-04-27,6.0,0,0.0
1,2023-04-27,7.0,0,0.0
1,2023-04-27,8.0,0,0.0
1,2023-04-27,9.0,56,3478.81


In [0]:
clean_transactions.createOrReplaceTempView("clean_transactions")

In [0]:
spark.sql("""
    SELECT 
        *
    FROM clean_transactions
    where nb_transactions> 1000
    ORDER BY nb_transactions
""").show(10)

+--------+----------+-----+---------------+----------------+
|store_id|      date|heure|nb_transactions|chiffre_affaires|
+--------+----------+-----+---------------+----------------+
|       5|2023-05-11|    2|           9999|             0.0|
|       8|2023-05-05|   20|           9999|             0.0|
|       1|2023-05-01|    5|           9999|             0.0|
|       7|2023-05-03|    6|           9999|             0.0|
|      10|2023-05-09|   15|           9999|         1726.42|
|       4|2023-05-01|   10|           9999|         2505.48|
|      10|2023-05-15|   10|           9999|         1002.91|
|       4|2023-05-15|   22|           9999|             0.0|
|       3|2023-05-11|   11|           9999|         3941.03|
|       8|2023-05-04|   18|           9999|         5099.83|
+--------+----------+-----+---------------+----------------+
only showing top 10 rows



In [0]:
# Filtrage des données
clean_transactions = clean_transactions.filter(
    (F.col("store_id") <= 10) &
    (F.col("heure").between(9, 19)) &
    (F.col("date").isNotNull()) &
    (F.col("nb_transactions").between(1, 1000)) &
    (F.col("chiffre_affaires").between(1, 50000))
)
clean_transactions.createOrReplaceTempView("clean_transactions")

display(clean_transactions)

store_id,date,heure,nb_transactions,chiffre_affaires
1,2023-04-27,9,56,3478.81
1,2023-04-27,10,119,7844.15
1,2023-04-27,11,96,6008.11
1,2023-04-27,12,120,7132.01
1,2023-04-27,13,133,9165.67
1,2023-04-27,14,103,6709.42
1,2023-04-27,15,132,8808.94
1,2023-04-27,16,110,7025.82
1,2023-04-27,17,209,13592.59
1,2023-04-27,18,189,12221.38


In [0]:
transactions_manquants = spark.sql("""
    -- table des dates attendues
    WITH heures_attendues AS (
        SELECT explode(sequence(9, 19)) AS heure
    ), 
    dates_attendues AS (
        select d.date from
        (SELECT explode(sequence(DATE '2023-04-27', DATE '2025-04-25', 
        INTERVAL 1 DAY)
        ) AS date) as d
        where dayofweek(d.date) BETWEEN 2 AND 6
    ),
    -- ids des magasins et des capteurs
    identifiants_uniques AS (
        SELECT DISTINCT store_id
        FROM clean_transactions
    ),
    toutes_combinaisons AS (
        SELECT i.store_id, d.date, h.heure
        FROM identifiants_uniques i
        CROSS JOIN dates_attendues d
        CROSS JOIN heures_attendues h
    )

    select t.store_id, t.date,t.heure
    from toutes_combinaisons t
    left join clean_transactions v
    on v.store_id = t.store_id
    and v.date = t.date
    and v.heure = t.heure
    where v.heure is null

""")
transactions_manquants.createOrReplaceTempView("transactions_manquants")
display(transactions_manquants)



store_id,date,heure
1,2023-08-29,15
2,2023-09-21,18
2,2023-10-23,12
1,2024-01-03,11
5,2024-01-09,9
10,2024-08-13,15
7,2025-01-29,15
7,2025-03-18,15
5,2025-03-14,15
10,2023-08-25,10


In [0]:
transactions_manquants = spark.sql("""
    with moyenne_transactions as (
    select v.store_id, 
    v.date, 
    cast(avg(v.nb_transactions) AS int) as nb_transactions,
    cast(avg(v.chiffre_affaires) AS decimal(10,2)) as chiffre_affaires
    from clean_transactions v
    group by v.store_id, v.date
    )
    
    select v.*, mt.nb_transactions, mt.chiffre_affaires
    from transactions_manquants v
    inner join moyenne_transactions mt
    on v.store_id = mt.store_id
    and v.date = mt.date
""")
transactions_manquants.createOrReplaceTempView("transactions_manquants")
display(transactions_manquants)

store_id,date,heure,nb_transactions,chiffre_affaires
5,2023-05-02,16,57,3664.82
8,2023-05-05,12,57,3753.53
10,2023-05-09,15,36,2403.27
4,2023-05-05,14,88,5702.91
7,2023-05-09,16,73,4791.31
1,2023-04-28,16,159,10307.53
8,2023-05-08,13,44,3008.23
2,2023-05-11,16,88,5699.04
2,2023-05-11,17,88,5699.04
6,2023-05-02,11,54,3538.69


In [0]:
clean_transactions = spark.sql("""
    SELECT store_id, date, heure, nb_transactions, chiffre_affaires
    FROM clean_transactions

    UNION ALL

    SELECT store_id, date, heure, nb_transactions, chiffre_affaires
    FROM transactions_manquants
""")
display(transactions_manquants)


store_id,date,heure,nb_transactions,chiffre_affaires
1,2023-08-29,15,127,8308.7
2,2023-09-21,18,83,5518.61
2,2023-10-23,12,68,4508.68
1,2024-01-03,11,156,10224.53
5,2024-01-09,9,67,4421.29
10,2024-08-13,15,37,2380.83
7,2025-01-29,15,90,5932.37
7,2025-03-18,15,67,4336.2
5,2025-03-14,15,72,4708.29
10,2023-08-25,10,40,2594.95


In [0]:
clean_transactions.groupBy("date", "heure", "store_id") \
    .agg(F.count("*").alias("nb")) \
    .filter(F.col("nb") > 1) \
    .show()

+----+-----+--------+---+
|date|heure|store_id| nb|
+----+-----+--------+---+
+----+-----+--------+---+



In [0]:
clean_transactions.count()

57420

In [0]:
clean_transactions = clean_transactions.dropDuplicates(
    ["date", "heure", "store_id"]
)

In [0]:
clean_transactions.count()

57420

In [0]:
clean_transactions.select(F.min(F.col("date")), F.max(F.col("date"))).show()

+----------+----------+
| min(date)| max(date)|
+----------+----------+
|2023-04-27|2025-04-25|
+----------+----------+



In [0]:
try:
    clean_transactions.write.jdbc(
        url=jdbcUrl,
        table="analytics.transactions",
        mode="append",  # ou "overwrite" selon usage
        properties=connectionProperties
    )
    print("Données insérées avec succès dans analytics.transactions.")
except Exception as e:
    print("Erreur lors de l'insertion dans la base SQL :")
    print(str(e))