In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, collect_set, concat_ws, first, sort_array

In [2]:
# Create (or reuse) the Spark session, with HDFS on the namenode as the default filesystem
spark = SparkSession.builder.config(
    "spark.hadoop.fs.defaultFS", "hdfs://namenode:9000"
).getOrCreate()

In [3]:
def _read_csv_with_header(path):
    """Read one CSV file from `path`, using its first row as header and inferring column types."""
    return spark.read.csv(path, header=True, inferSchema=True)


# Load the four raw extracts from HDFS
activity = _read_csv_with_header("hdfs://namenode:9000/data/activity.csv")
code = _read_csv_with_header("hdfs://namenode:9000/data/code.csv")
denomination = _read_csv_with_header("hdfs://namenode:9000/data/denomination.csv")
address = _read_csv_with_header("hdfs://namenode:9000/data/address.csv")

# Preview the first rows of each frame, in load order
for preview_frame in (activity, code, denomination, address):
    preview_frame.show(5)

+------------+-------------+-----------+--------+--------------+
|EntityNumber|ActivityGroup|NaceVersion|NaceCode|Classification|
+------------+-------------+-----------+--------+--------------+
|0200.065.765|            6|       2008|   84130|          MAIN|
|0200.065.765|            1|       2008|   41101|          MAIN|
|0200.065.765|            1|       2003|   70111|          MAIN|
|0200.068.636|            6|       2008|   36000|          MAIN|
|0200.068.636|            1|       2008|   36000|          MAIN|
+------------+-------------+-----------+--------+--------------+
only showing top 5 rows

+-------------+----+--------+-----------------+
|     Category|Code|Language|      Description|
+-------------+----+--------+-----------------+
|ActivityGroup| 001|      FR|    Activités TVA|
|ActivityGroup| 001|      NL| BTW-activiteiten|
|ActivityGroup| 002|      FR|   Activités EDRL|
|ActivityGroup| 002|      NL|EDRL-activiteiten|
|ActivityGroup| 003|      FR|        Activités|
+-----

In [4]:
# NACE 2008 code labels, French only: keep just the code and its description
code2008fr = (
    code
    .filter((col("Category") == "Nace2008") & (col("Language") == "FR"))
    .select("Code", "Description")
)

# Keep only activities coded with the 2008 NACE version
activity2008 = activity.filter(col("NaceVersion") == 2008)

# Attach the French NACE description to each 2008 activity.
# Left join: activities whose NaceCode has no label keep a NULL Description.
# The lookup's Code column is dropped afterwards so only NaceCode remains.
# NOTE(review): Code is a string (leading zeros are visible in code.csv), while
# NaceCode may be inferred as a number, so this equality relies on Spark's
# implicit type coercion — confirm the key types.
activity_desc = (
    activity2008
    .join(code2008fr, activity2008.NaceCode == code2008fr.Code, how="left")
    .drop(code2008fr.Code)
)

activity_desc.show(5)

+------------+-------------+-----------+--------+--------------+--------------------+
|EntityNumber|ActivityGroup|NaceVersion|NaceCode|Classification|         Description|
+------------+-------------+-----------+--------+--------------+--------------------+
|0200.065.765|            6|       2008|   84130|          MAIN|Administration pu...|
|0200.065.765|            1|       2008|   41101|          MAIN|Promotion immobil...|
|0200.068.636|            6|       2008|   36000|          MAIN|Captage, traiteme...|
|0200.068.636|            1|       2008|   36000|          MAIN|Captage, traiteme...|
|0200.068.636|            1|       2008|   93110|          SECO|Gestion d'install...|
+------------+-------------+-----------+--------+--------------+--------------------+
only showing top 5 rows



In [5]:
# Enterprise name: denominations of type 1 in French (Language code 1)
is_name_type_1 = col("TypeOfDenomination") == 1
is_french = col("Language") == 1

denomination_fr = (
    denomination
    .where(is_name_type_1 & is_french)
    .select("EntityNumber", "Denomination")
)

denomination_fr.show(5)


+------------+--------------------+
|EntityNumber|        Denomination|
+------------+--------------------+
|0200.362.210|in BW Association...|
|0200.362.408|Intercommunale So...|
|0201.105.843|  """I.D.E.A. S.C"""|
|0201.107.526|Association Inter...|
|0201.107.922|INTERCOMMUNALE DU...|
+------------+--------------------+
only showing top 5 rows



In [6]:
# Attach the French enterprise name to each activity row.
# Left join: entities without a matching denomination keep a NULL Denomination.
activity_with_name = activity_desc.join(denomination_fr, on="EntityNumber", how="left")

activity_with_name.show(5)

+------------+-------------+-----------+--------+--------------+--------------------+------------+
|EntityNumber|ActivityGroup|NaceVersion|NaceCode|Classification|         Description|Denomination|
+------------+-------------+-----------+--------+--------------+--------------------+------------+
|0798.255.362|            1|       2008|   47713|          MAIN|Commerce de détai...|        NULL|
|0200.068.636|            6|       2008|   36000|          MAIN|Captage, traiteme...|        NULL|
|0200.068.636|            1|       2008|   36000|          MAIN|Captage, traiteme...|        NULL|
|0200.068.636|            1|       2008|   93110|          SECO|Gestion d'install...|        NULL|
|0200.068.636|            1|       2008|   93126|          SECO|Activités de club...|        NULL|
+------------+-------------+-----------+--------+--------------+--------------------+------------+
only showing top 5 rows



In [7]:
# Keep only addresses of type 'REGO' (presumably the registered office — confirm)
# and just the columns needed downstream
is_rego = col("TypeOfAddress") == "REGO"
address_rego = (
    address
    .where(is_rego)
    .select("EntityNumber", "MunicipalityFR", "Zipcode")
)

address_rego.show(5)

+------------+-------------------+-------+
|EntityNumber|     MunicipalityFR|Zipcode|
+------------+-------------------+-------+
|0200.065.765|       Destelbergen|   9070|
|0200.068.636|               Gent|   9000|
|0200.171.970|               Gent|   9000|
|0200.245.711|     Geraardsbergen|   9500|
|0200.305.493|Sint-Lievens-Houtem|   9520|
+------------+-------------------+-------+
only showing top 5 rows



In [8]:
# Attach the 'REGO' municipality and zipcode to each activity row.
# Left join: entities without such an address keep NULL MunicipalityFR / Zipcode.
df = (
    activity_with_name
    .join(address_rego, on="EntityNumber", how="left")
)

df.show(5)

+------------+-------------+-----------+--------+--------------+--------------------+------------+--------------+-------+
|EntityNumber|ActivityGroup|NaceVersion|NaceCode|Classification|         Description|Denomination|MunicipalityFR|Zipcode|
+------------+-------------+-----------+--------+--------------+--------------------+------------+--------------+-------+
|0798.255.362|            1|       2008|   47713|          MAIN|Commerce de détai...|        NULL|          NULL|   NULL|
|0200.068.636|            6|       2008|   36000|          MAIN|Captage, traiteme...|        NULL|          Gent|   9000|
|0200.068.636|            1|       2008|   36000|          MAIN|Captage, traiteme...|        NULL|          Gent|   9000|
|0200.068.636|            1|       2008|   93110|          SECO|Gestion d'install...|        NULL|          Gent|   9000|
|0200.068.636|            1|       2008|   93126|          SECO|Activités de club...|        NULL|          Gent|   9000|
+------------+----------

In [None]:
# One row per enterprise: name, its distinct activity descriptions, and location.
final_df = df.groupBy("EntityNumber").agg(
    # After the per-entity left joins these columns repeat on every activity row;
    # ignorenulls=True makes sure a non-NULL value is picked whenever one exists.
    first("Denomination", ignorenulls=True).alias("EnterpriseName"),
    # The same NACE code can occur under several activity groups (e.g. 36000 under
    # groups 1 and 6). collect_set removes those duplicate descriptions, and
    # sort_array gives a deterministic order, since collect_list order after a
    # shuffle is not guaranteed. NULL descriptions are skipped by collect_set.
    concat_ws(";", sort_array(collect_set("Description"))).alias("Activities"),
    first("MunicipalityFR", ignorenulls=True).alias("Municipality"),
    first("Zipcode", ignorenulls=True).alias("Zipcode"),
)

final_df.show(5)

# Collapse to a single partition so Spark writes one part file.
final_single = final_df.coalesce(1)

# Spark writes a *directory* named result.csv (on the default HDFS filesystem)
# containing the part file. mode="overwrite" keeps the notebook re-runnable;
# without it a second run fails because the path already exists.
final_single.write.csv("/data/output/result.csv", header=True, mode="overwrite")

+------------+--------------------+--------------------+------------+-------+
|EntityNumber|      EnterpriseName|          Activities|Municipality|Zipcode|
+------------+--------------------+--------------------+------------+-------+
|0201.105.843|  """I.D.E.A. S.C"""|Administration pu...|        Mons|   7000|
|0201.311.226|                NULL|Distribution d'él...|     Hasselt|   3500|
|0202.554.608|Association Inter...|Captage, traiteme...|      Mettet|   5640|
|0205.157.176|                NULL|Construction de r...|     Torhout|   8820|
|0205.797.475|Association Inter...|Administration pu...|       Arlon|   6700|
+------------+--------------------+--------------------+------------+-------+
only showing top 5 rows

