In [8]:
import numpy as np
import pandas as pd
import findspark
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [9]:
# Initialisation de la session Spark
spark = SparkSession.builder.appName("example").getOrCreate()

### génération de 3 tables

In [10]:
# Génération des données pour transaction_table
np.random.seed(42)  # Pour reproduire les mêmes résultats
levels = np.random.randint(1, 4, size=100)
source_nodes = np.random.choice(['x', 'B', '8', 'D'], size=100)
cible_nodes = np.random.choice(['B', 'C', 'D', 'E', 'F', 'Q'], size=100)
volumes = np.random.randint(1, 31, size=100)
way_none = ['X>B>D>F' for _ in range(100)]
way_volume = ['070401' for _ in range(100)]
result = np.random.rand(100)
actions = np.random.choice(['Continue', 'Stop'], size=100)

# Création du DataFrame Pandas
transaction_table = pd.DataFrame({
    'level': levels,
    'source_node': source_nodes,
    'cible_node': cible_nodes,
    'volume': volumes,
    'way_none': way_none,
    'way_volume': way_volume,
    'result': result,
    'action': actions
})

# Affichage du DataFrame
transaction_table


Unnamed: 0,level,source_node,cible_node,volume,way_none,way_volume,result,action
0,3,D,E,17,X>B>D>F,070401,0.914864,Continue
1,1,D,B,2,X>B>D>F,070401,0.370159,Stop
2,3,D,Q,1,X>B>D>F,070401,0.015457,Stop
3,3,8,B,16,X>B>D>F,070401,0.928319,Continue
4,1,8,B,30,X>B>D>F,070401,0.428184,Continue
...,...,...,...,...,...,...,...,...
95,1,B,E,12,X>B>D>F,070401,0.703658,Continue
96,1,8,C,28,X>B>D>F,070401,0.474174,Continue
97,3,D,B,30,X>B>D>F,070401,0.097834,Continue
98,1,B,E,29,X>B>D>F,070401,0.491616,Stop


In [11]:
# Génération des données pour node_table avec 100 lignes
np.random.seed(42)  # Pour reproduire les mêmes résultats
cible_nodes_unique_100 = np.unique(cible_nodes)
names_100 = [f'Acteur {i}' for i in range(1, len(cible_nodes_unique_100) + 1)]
notes_100 = np.round(np.random.uniform(0.7, 1.0, len(cible_nodes_unique_100)), 2)

# Répétition des données pour atteindre 100 lignes
cible_nodes_100 = np.repeat(cible_nodes_unique_100, 100 // len(cible_nodes_unique_100))
names_100 = np.repeat(names_100, 100 // len(cible_nodes_unique_100))
notes_100 = np.repeat(notes_100, 100 // len(cible_nodes_unique_100))

# Création du DataFrame Pandas
node_table_100 = pd.DataFrame({
    'cible_node': cible_nodes_100,
    'name': names_100,
    'note': notes_100
})

# Sélection des 30 premières lignes pour créer node_table_30
node_table = node_table_100.head(100)

# Affichage du DataFrame node_table_30
node_table


Unnamed: 0,cible_node,name,note
0,B,Acteur 1,0.81
1,B,Acteur 1,0.81
2,B,Acteur 1,0.81
3,B,Acteur 1,0.81
4,B,Acteur 1,0.81
...,...,...,...
91,Q,Acteur 6,0.75
92,Q,Acteur 6,0.75
93,Q,Acteur 6,0.75
94,Q,Acteur 6,0.75


In [12]:
# Génération des données pour liste_product_table avec 100 lignes
np.random.seed(42)  # Pour reproduire les mêmes résultats
products = np.unique(cible_nodes)  # Utilisation des mêmes produits que cible_nodes
cible_nodes_products_100 = np.repeat(cible_nodes_unique_100, 100 // len(cible_nodes_unique_100))
note_products_100 = np.round(np.random.uniform(0.0, 1.0, len(cible_nodes_unique_100)), 2)

# Répétition des données pour atteindre 100 lignes
products_100 = np.repeat(products, np.ceil(100 / len(products)).astype(int))
cible_nodes_products_100 = np.repeat(cible_nodes_products_100, np.ceil(100 / len(cible_nodes_unique_100)).astype(int))
note_products_100 = np.repeat(note_products_100, np.ceil(100 / len(cible_nodes_unique_100)).astype(int))

# Tronquer les données pour atteindre exactement 100 lignes
products_100 = products_100[:100]
cible_nodes_products_100 = cible_nodes_products_100[:100]
note_products_100 = note_products_100[:100]

# Création du DataFrame Pandas
liste_product_table_100 = pd.DataFrame({
    'product': products_100,
    'cible_node': cible_nodes_products_100,
    'note_product': note_products_100,
    'poids': np.nan  # À remplir plus tard
})

# Sélection des 30 premières lignes pour créer liste_product_table_30
liste_product_table = liste_product_table_100.head(100)

# Affichage du DataFrame liste_product_table_30
liste_product_table


Unnamed: 0,product,cible_node,note_product,poids
0,B,B,0.37,
1,B,B,0.37,
2,B,B,0.37,
3,B,B,0.37,
4,B,B,0.37,
...,...,...,...,...
95,Q,B,0.16,
96,Q,B,0.16,
97,Q,B,0.16,
98,Q,B,0.16,


### Transformation en spark DataFrame

In [13]:
# Définition du schéma pour liste_product_table_100
schema = StructType([
    StructField("product", StringType(), True),
    StructField("cible_node", StringType(), True),
    StructField("note_product", FloatType(), True),
    StructField("poids", FloatType(), True)
])

In [15]:
# Echantillon de 10 lignes pour transaction_table
transaction_table_sample = transaction_table.head(10)

# Echantillon de 10 lignes pour node_table
node_table_sample = node_table.head(10)

# Echantillon de 10 lignes pour liste_product_table
liste_product_table_sample = liste_product_table.head(10)

In [16]:
# Convertir en Spark DataFrame
transaction_table = spark.createDataFrame(transaction_table_sample)
node_table= spark.createDataFrame(node_table_sample)
liste_product_table = spark.createDataFrame(liste_product_table_sample)

In [None]:
# Afficher l'échantillon de transaction_table Spark
transaction_table.show(truncate=False)

*Besoin : Nous souhaitons calculer le poids associé à chaque produit de la chaine d’approvisionnement de l’entreprise X.*

Le poids est calculé en multipliant la valeur de la colonne "result" (de la table "transaction_table") par la valeur de la colonne "note" (de la table "node_table") et par la valeur de la colonne "note_product" (de la table "liste_product_table"). Cette multiplication est conditionnelle sur le fait que la valeur de la colonne "action" est "stop" dans la table "transaction_table".
Formule : 
poids= result(si action="stop" - transaction_table )*Note(node_table)*note_product(liste_product_table)

Question
	Pouvez-vous proposer une modélisation permettant de répondre au besoin ?

In [18]:
# Jointure avec node_table
merged_table_1 = liste_product_table.join(node_table, on='cible_node', how='left')

In [19]:
# Jointure avec transaction_table avec condition sur l'action "Stop"
filtered_table = merged_table_1.join(transaction_table.filter(F.col('action') == 'Stop')
                                .select('cible_node', 'result', 'action'), on='cible_node', how='left')

In [22]:
# Calcul du poids
final_table_spark = filtered_table.withColumn('poids', F.col('result') * F.col('note_product') * F.col('note'))

In [23]:
resultats_entreprise_X = final_table_spark.filter(F.col("cible_node") == "X")

# Ou, de manière équivalente, en utilisant une expression directement dans le DataFrame
#resultats_entreprise_X = final_table_spark.filter("cible_node = 'X'")