In [1]:
# !pip install mysql-connector-python
# !pip install pyspark

In [2]:
import mysql.connector
from pyspark.sql import SparkSession
import csv
import random
from pyspark.sql.functions import lit, col, when

In [3]:
# mydb = mysql.connector.connect(
#   host="localhost",
#   user="root",
#   password="",
#   database="spark_train_one"
# )


In [4]:
class ToDictMixin:
    def to_dict(self):
        return {k: v for k, v in vars(self).items() if not k.startswith("__")}



In [5]:
class Armor(ToDictMixin):
    
    id_counter = 1
    
    def __init__(self, name: str, armor: int = 1, armor_magik: int = 0, mana: int = 0,
                 durability: int = 5, max_durability: int = None):
        self.id = Armor.id_counter
        Armor.id_counter += 1
        self.name = name
        self.armor = armor
        self.armor_magik = armor_magik
        self.mana = mana
        self.durability = durability
        self.max_durability = max_durability if max_durability is not None else durability
        self.broken = False

In [6]:
class Weapon(ToDictMixin):
    
    id_counter = 1
    
    def __init__(self, name: str, atk: int = 1, atk_magik: int = 0, mana: int = 0,
                 mana_cost: int = 1, durability: int = 5, max_durability: int = None):
        self.id = Weapon.id_counter
        Weapon.id_counter += 1
        self.name = name
        self.atk = atk
        self.atk_magik = atk_magik
        self.mana = mana
        self.mana_cost = mana_cost
        self.durability = durability
        self.max_durability = max_durability if max_durability is not None else durability
        self.broken = False

In [7]:
class Hero(ToDictMixin):
    
    id_counter = 1
    
    
    
    def __init__(self, name: str, pv: int, max_pv: int = None, atk: int = None, atk_magik: int = None, 
                 mana: int = None, max_mana: int = None, initiative: int = None, experience: int = None, 
                 experience_to_level_up: int = None, level: int = None, speed: int = None, origine_speed: int = None, weapon_id: int = None, armor_id: int = None):
        self.id = Hero.id_counter
        Hero.id_counter += 1
        self.name = name
        self.pv = pv
        self.max_pv = max_pv if max_pv is not None else pv
        self.atk = atk if atk is not None else 0
        self.atk_magik = atk_magik if atk_magik is not None else 0
        self.mana = mana if mana is not None else 0
        self.max_mana = max_mana if max_mana is not None else mana
        self.initiative = initiative if initiative is not None else 0
        self.experience = experience if experience is not None else 0
        self.experience_to_level_up = experience_to_level_up if experience_to_level_up is not None else 100
        self.level = level if level is not None else 1
        self.speed = speed if speed is not None else 0
        self.origine_speed = origine_speed if origine_speed is not None else 0
        self.items = []
        self.weapon_id : Weapon | None = weapon_id
        self.armor_id : Armor | None = armor_id
    
    
    def equip_weapon(self, weapon):
        if isinstance(weapon, Weapon) :
            self.weapon_id = weapon.id
            return True
        return False
        
    def equip_armor(self, armor):
        if isinstance(armor, Armor) :
            self.armor_id = armor.id
            return True
        return False
    
    # def add_item(self, item):
    #     if isinstance(item, Item) :
    #         self.items.append(item.id)
    #         return True
    #     return False

In [8]:
noms_personnages = [
    "Aria", "Balthazar", "Cassandra", "Darian", "Elena", "Felix", "Gabrielle", "Hadrien",
    "Isadora", "Jasper", "Kaïra", "Landon", "Maïlys", "Nolan", "Olivia", "Philemon",
    "Quintus", "Raphael", "Saoirse", "Thaddeus", "Ursula", "Valentin", "Wilhelm", "Xander",
    "Yara", "Zander", "Aidan", "Brianna", "Caleb", "Dahlia", "Evelyn", "Finnegan", "Gwendolyn",
    "Harvey", "Imogen", "Jaden", "Kendra", "Liam", "Matilda", "Niamh", "Ophelia", "Percival",
    "Qiana", "Riley", "Sebastian", "Tristan", "Uriel", "Violet", "Wyatt", "Xiomara", "Yvette", "Zelda"
]
noms_armors = [
    "Armure de cuir", "Armure de fer", "Armure de plaque", "Armure de mailles", "Armure de glace",
    "Armure d'os", "Armure d'écailles", "Armure de dragon", "Armure de lumière", "Armure de ténèbres",
    "Armure de cristal", "Armure de mithril", "Armure de diamant", "Armure de feu", "Armure de foudre",
    "Armure de glace", "Armure de vent", "Armure de l'eau", "Armure de terre", "Armure de roche",
    "Armure de bois", "Armure de métal", "Armure de magma", "Armure de néant", "Armure de luminescence",
    "Armure de brume", "Armure de tempête", "Armure de mort", "Armure de vie", "Armure de guérison",
    "Armure de force", "Armure de sagesse", "Armure de chance", "Armure de la nature", "Armure des anges",
    "Armure des démons", "Armure des héros", "Armure des rois", "Armure des elfes", "Armure des nains",
    "Armure des gnomes", "Armure des orcs", "Armure des gobelins", "Armure des trolls", "Armure des géants",
    "Armure de lave", "Armure de lumière divine", "Armure de l'ombre", "Armure de l'océan", "Armure des vents",
    "Armure des montagnes"
]
noms_weapons = [
    "Épée de cristal", "Hache de guerre", "Bâton de feu", "Dague empoisonnée", "Arc long",
    "Masse d'armes", "Lance d'argent", "Baguette magique", "Gourdin", "Katana",
    "Fouet de combat", "Fleuret du duelliste", "Marteau de guerre", "Glaive", "Rapière",
    "Arbalète", "Hallebarde", "Gantelet de force", "Canne de mage", "Livre de sortilèges",
    "Fléau", "Arc court", "Cimeterre", "Couteau de lancer", "Poignard",
    "Épée bâtarde", "Hache double", "Bâton de glace", "Dague runique", "Arc de chasse",
    "Marteau de Thor", "Lance de la foudre", "Baguette d'illusion", "Gourdin épineux", "Wakizashi",
    "Fouet barbelé", "Épée courte", "Marteau-piqueur", "Glaive du serpent", "Rapière en argent",
    "Arbalète à répétition", "Hallebarde du jugement", "Gantelet du titan", "Canne à feu", "Livre de connaissances",
    "Fléau des ténèbres", "Arc de la lune noire", "Cimeterre démoniaque", "Couteau de jet empoisonné", "Poignard de l'ombre",
]

In [9]:
# Génération de 50 personnages, armors et weapons aléatoires
hero_list = []
armor_list = []
weapon_list = []
for i in range(50):
    hero = Hero(name=noms_personnages[i], pv=random.randint(50, 200), atk=random.randint(1, 50), atk_magik=random.randint(1, 50),
                mana=random.randint(1, 100), initiative=random.randint(1, 100), experience=random.randint(1, 1000),
                experience_to_level_up=random.randint(100, 1000), level=random.randint(1, 10), speed=random.randint(1, 100),
                origine_speed=random.randint(1, 100), weapon_id=random.randint(1, 50), armor_id=random.randint(1, 50))
    hero_list.append(hero)
    
    armor = Armor(name=noms_armors[i], armor=random.randint(1, 10), armor_magik=random.randint(1, 5),
                  mana=random.randint(1, 5), durability=random.randint(1, 10),
                  max_durability=random.randint(10, 20))
    armor_list.append(armor)

    weapon = Weapon(name=noms_weapons[i], atk=random.randint(1, 10), atk_magik=random.randint(1, 5),
                    mana=random.randint(1, 5), mana_cost=random.randint(1, 5),
                    durability=random.randint(1, 10), max_durability=random.randint(10, 20))
    weapon_list.append(weapon)
print(hero_list[25].to_dict())

hero_attribute_names = list(hero_list[0].to_dict().keys())
armor_attribute_names = list(armor_list[0].to_dict().keys())
weapon_attribute_names = list(weapon_list[0].to_dict().keys())
hero_list[25].equip_weapon(weapon_list[25])
print(hero_attribute_names)
print(armor_attribute_names)
print(weapon_attribute_names)

{'id': 26, 'name': 'Zander', 'pv': 114, 'max_pv': 114, 'atk': 50, 'atk_magik': 1, 'mana': 93, 'max_mana': 93, 'initiative': 99, 'experience': 93, 'experience_to_level_up': 532, 'level': 2, 'speed': 13, 'origine_speed': 49, 'items': [], 'weapon_id': 7, 'armor_id': 8}
['id', 'name', 'pv', 'max_pv', 'atk', 'atk_magik', 'mana', 'max_mana', 'initiative', 'experience', 'experience_to_level_up', 'level', 'speed', 'origine_speed', 'items', 'weapon_id', 'armor_id']
['id', 'name', 'armor', 'armor_magik', 'mana', 'durability', 'max_durability', 'broken']
['id', 'name', 'atk', 'atk_magik', 'mana', 'mana_cost', 'durability', 'max_durability', 'broken']


In [10]:
# Écriture de la table dans un fichier CSV
def csv_geenerator(path, items_list):
    with open(path, mode="w", newline='') as file:
        fieldnames = list(items_list[0].to_dict().keys())
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        for item in items_list:
            writer.writerow({k: v.encode('utf-8').decode('ascii', 'ignore') if isinstance(v, str) else v for k, v in item.to_dict().items()})



In [11]:
csv_geenerator("assets/csv/hero.csv", hero_list)
csv_geenerator("assets/csv/armor.csv", armor_list)
csv_geenerator("assets/csv/weapon.csv", weapon_list)

In [12]:
# mycursor = mydb.cursor()

# mycursor.execute("SELECT * FROM yourtable")

# myresult = mycursor.fetchall()

# for x in myresult:
#   print(x)


spark = SparkSession.builder.appName('Create_Hero').getOrCreate()

df_hero = spark.read.csv("assets/csv/hero.csv", header=True, inferSchema=True)
df_armor = spark.read.csv("assets/csv/armor.csv", header=True, inferSchema=True)
df_weapon = spark.read.csv("assets/csv/weapon.csv", header=True, inferSchema=True)



In [13]:
# Affichage des données du DataFrame
df_hero.show()
df_armor.show()
df_weapon.show()
# Création d'une vue temporaire à partir du DataFrame
df_hero.createOrReplaceTempView("hero")
df_armor.createOrReplaceTempView("armor")
df_weapon.createOrReplaceTempView("weapon")


+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      12|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|      27|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|      28|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [14]:
print(spark.catalog.listTables())

[Table(name='armor', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='hero', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='weapon', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [15]:

# Exécution d'une requête SQL pour sélectionner les héros avec plus de 100 points de vie
result = spark.sql("SELECT * FROM hero WHERE pv > 100")

# Affichage des résultats
result.show(n=500, truncate=False)

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|id |name     |pv |max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|4  |Darian   |127|127   |16 |27       |7   |7       |15        |505       |979                   |9    |91   |31           |[]   |1        |19      |
|5  |Elena    |200|200   |25 |6        |49  |49      |66        |484       |902                   |3    |20   |44           |[]   |32       |2       |
|6  |Felix    |150|150   |30 |47       |35  |35      |68        |347       |709                   |6    |94   |39           |[]   |48       |33      |
|9  |Isadora  |188|188   |30 |13       |59  |59      |40        |724       |462               

In [16]:
for table in spark.catalog.listTables():
    print(table)

Table(name='armor', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)
Table(name='hero', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)
Table(name='weapon', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)


In [17]:
weapon_column = spark.sql(f'''SELECT weapon_id FROM hero
                             WHERE weapon_id IS NOT NULL
                          ''').collect()

for row in weapon_column:
    print(row[0])

21
32
1
1
32
48
21
45
21
36
41
2
26
40
10
11
48
47
47
7
14
50
15
17
17
26
12
21
46
15
34
26
18
5
12
44
11
49
29
31
49
47
33
35
16
49
30
34
5
1


In [18]:

spark.sql(f'''SELECT * FROM weapon 
            WHERE weapon.name LIKE '%btarde%'
          ''').show()

spark.sql(f'''SELECT * FROM weapon 
            WHERE weapon.id = 26
          ''').show()

spark.sql(f'''SELECT * FROM hero 
            JOIN weapon on hero.weapon_id = weapon.id
          ''').show()

spark.sql(f'''SELECT weapon.* FROM hero 
            JOIN weapon on hero.weapon_id = weapon.id
          ''').show()

+---+---------+---+---------+----+---------+----------+--------------+------+
| id|     name|atk|atk_magik|mana|mana_cost|durability|max_durability|broken|
+---+---------+---+---------+----+---------+----------+--------------+------+
| 26|pe btarde| 10|        1|   1|        4|         3|            19| false|
+---+---------+---+---------+----+---------+----------+--------------+------+

+---+---------+---+---------+----+---------+----------+--------------+------+
| id|     name|atk|atk_magik|mana|mana_cost|durability|max_durability|broken|
+---+---------+---+---------+----+---------+----------+--------------+------+
| 26|pe btarde| 10|        1|   1|        4|         3|            19| false|
+---+---------+---+---------+----+---------+----------+--------------+------+

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+---+--------------------+---+---------+----+---------+----------+---

In [19]:
spark.sql(f'''SELECT * 
            FROM hero 
            JOIN weapon ON hero.weapon_id = weapon.id
            WHERE hero.weapon_id IS NOT NULL
          ''').show()

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+---+--------------------+---+---------+----+---------+----------+--------------+------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id| id|                name|atk|atk_magik|mana|mana_cost|durability|max_durability|broken|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+---+--------------------+---+---------+----+---------+----------+--------------+------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      12| 21|                Flau| 10|        2|   3|        1|         1|            18| false|
|  2|Balthazar| 62|    62| 36|       44|  92|   

In [20]:
weapon_cols = [f"weapon.{col} as weapon_{col}" for col in  spark.table("weapon").columns]
weapon_cols_str = ", ".join(weapon_cols)
weapon_cols_str



'weapon.id as weapon_id, weapon.name as weapon_name, weapon.atk as weapon_atk, weapon.atk_magik as weapon_atk_magik, weapon.mana as weapon_mana, weapon.mana_cost as weapon_mana_cost, weapon.durability as weapon_durability, weapon.max_durability as weapon_max_durability, weapon.broken as weapon_broken'

In [21]:

query = f"""
    SELECT hero.*, {weapon_cols_str}
    FROM hero
    JOIN weapon ON hero.weapon_id = weapon.id
    WHERE hero.weapon_id IS NOT NULL
"""

print(query)
spark.sql('''
    SELECT hero.*, weapon.id as weapon_id, weapon.name as weapon_name
    FROM hero
    JOIN weapon ON hero.weapon_id = weapon.id
    WHERE hero.weapon_id IS NOT NULL
''').show()


    SELECT hero.*, weapon.id as weapon_id, weapon.name as weapon_name, weapon.atk as weapon_atk, weapon.atk_magik as weapon_atk_magik, weapon.mana as weapon_mana, weapon.mana_cost as weapon_mana_cost, weapon.durability as weapon_durability, weapon.max_durability as weapon_max_durability, weapon.broken as weapon_broken
    FROM hero
    JOIN weapon ON hero.weapon_id = weapon.id
    WHERE hero.weapon_id IS NOT NULL

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+---------+--------------------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|weapon_id|         weapon_name|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+---------+--------------------+
|  1|     Aria| 75|    75| 30|     

In [22]:
#####################################
### On spark update doesn't work ###
###################################


# df_hero.createOrReplaceTempView("temp_hero")
# spark.sql(f'''SELECT * 
#             FROM temp_hero
#             WHERE armor IS NULL OR armor = 42
#           ''').show()


# # Perform the update using SQL
# spark.sql("""
#     UPDATE temp_hero
#     SET armor = 42
#     WHERE armor IS NULL
# """)

# # Overwrite the original table with the updated data
# spark.sql("""
#     INSERT OVERWRITE TABLE hero
#     SELECT * FROM temp_hero
# """)

In [23]:
def armor_update_table(updated_df):
    # Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated"
    updated_df = updated_df.withColumn('armor_id', col('armor_updated'))
    # Supprimer la colonne "armor_updated"
    updated_df = updated_df.drop('armor_updated')
    
    return updated_df



# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement
updated_df = df_hero.withColumn('armor_updated', when(col('id') == 18, 18).otherwise(col('armor_id')))


# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated" 
df_hero = armor_update_table(updated_df)

# Afficher le DataFrame mis à jour
df_hero.show()

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      12|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|      27|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|      28|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [24]:
# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement 
updated_df = df_hero.withColumn('armor_updated', when((col('id') < 15) | (col('id') > 20), 14).otherwise(col('armor_id')))

# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated" 
df_hero = armor_update_table(updated_df)

# Afficher le DataFrame mis à jour
df_hero.show(n=50)

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      14|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|      14|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|      14|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [25]:
# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement entre 10 et 15 remplacer par 11
updated_df = df_hero.withColumn('armor_updated', when(col('id').between(10, 15), 11).otherwise(col('armor_id')))


# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated" 
df_hero = armor_update_table(updated_df)

# Afficher le DataFrame mis à jour
df_hero.show()



+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      14|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|      14|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|      14|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [26]:
# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement si hero.armor = 14 et armor.id inferieur a 5
updated_df = df_hero.withColumn('armor_updated', when((col('armor_id') == 14) & (col('id') < 5), 7).otherwise(col('armor_id')))


# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated" 
df_hero = armor_update_table(updated_df)

# Afficher le DataFrame mis à jour
df_hero.show()

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|       7|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|       7|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|       7|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [27]:
# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement
updated_df = df_hero.withColumn('armor_updated', when((col('armor_id') == 18) , 'null' ).otherwise(col('armor_id')))


# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated" 
df_hero = armor_update_table(updated_df)

# Afficher le DataFrame mis à jour
df_hero.show(n=50)

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|       7|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|       7|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|       7|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [28]:
spark.sql(f'''SELECT * 
            FROM hero  
          ''').show()

df_hero.createOrReplaceTempView("hero")

spark.sql(f'''SELECT * 
            FROM hero  
          ''').show()

+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|     name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+---------+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
|  1|     Aria| 75|    75| 30|       32|  58|      58|        32|       700|                   742|    1|   39|           69|   []|       21|      12|
|  2|Balthazar| 62|    62| 36|       44|  92|      92|        97|       762|                   383|    3|   40|           55|   []|       32|      27|
|  3|Cassandra| 98|    98| 13|       14|  61|      61|        88|       896|                   442|    5|   34|           59|   []|        1|      28|
|  4|   Darian|127|   127| 16|       27|   7|       7|        15|       505|                  

In [29]:
spark.sql(f'''SELECT * 
            FROM hero
            WHERE armor_id IS NULL
          ''').show()

+---+----+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
| id|name| pv|max_pv|atk|atk_magik|mana|max_mana|initiative|experience|experience_to_level_up|level|speed|origine_speed|items|weapon_id|armor_id|
+---+----+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+
+---+----+---+------+---+---------+----+--------+----------+----------+----------------------+-----+-----+-------------+-----+---------+--------+



In [30]:
class Request_SQL:
    
    DFs = {}
    spark = SparkSession.builder.appName('Create_Hero').getOrCreate()

    def __init__(self, spark = "temp_spark", files = None, ):
        self.spark = SparkSession.builder.appName(spark).getOrCreate()
        self.read_files(files)
        
    def set_DFs(self, file_name):
        # print(self.get_name_file(file_name))
        if "csv" in file_name.split('.')[-1:]:
            self.DFs[self.get_name_file(file_name)] = spark.read.csv(f"assets/csv/{self.get_name_file(file_name)}.csv", header=True, inferSchema=True)
    
    def read_files(self, files): 
        
        if files is not None:            
            if isinstance(files, list):
                for key, val in enumerate(files):                 
                    self.set_DFs(val)
            else:                            
                self.set_DFs(files)                   
    
    
    #############################
    ########### UTILS ###########
    #############################
    def get_name_file(self, file):
        return file.split('.')[-2:-1:][0].split('/')[-1:][0]
    
    def print_DF(self, name):
        print(self.DFs[name], "\n")
                    
    def print_DFs(self):
        for key,  df in self.DFs.items():
            print(df, "\n")
    
    def print_names_DFs(self):
        for df in self.DFs:
            print(df, "\n")
            
    def print_table(self, name):
        print(self.spark.table(name), "\n")
                    
    def print_tables(self):
        for table in spark.catalog.listTables():
            print(table, "\n")
            

                    
    # def select(self, columns = '*', table, ):

In [31]:
# "assets/csv/hero.csv".split('.')[-2:-1:][0].split('/')[-1:][0]

In [32]:
request = Request_SQL(spark = "temp_hero_test", files = ["hero.csv", "armor.csv", "weapon.csv"])

request.print_DFs()
print("\n")
request.print_DF("hero")
request.print_tables()
request.print_table("hero")
request.print_names_DFs()

DataFrame[id: int, name: string, pv: int, max_pv: int, atk: int, atk_magik: int, mana: int, max_mana: int, initiative: int, experience: int, experience_to_level_up: int, level: int, speed: int, origine_speed: int, items: string, weapon_id: int, armor_id: int] 

DataFrame[id: int, name: string, armor: int, armor_magik: int, mana: int, durability: int, max_durability: int, broken: boolean] 

DataFrame[id: int, name: string, atk: int, atk_magik: int, mana: int, mana_cost: int, durability: int, max_durability: int, broken: boolean] 



DataFrame[id: int, name: string, pv: int, max_pv: int, atk: int, atk_magik: int, mana: int, max_mana: int, initiative: int, experience: int, experience_to_level_up: int, level: int, speed: int, origine_speed: int, items: string, weapon_id: int, armor_id: int] 

Table(name='armor', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True) 

Table(name='hero', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isT

In [33]:
request.print_names_DFs()

hero 

armor 

weapon 



In [34]:
request.spark.sql('''
                SELECT armor.id as armor_id, armor.name as armor_name, hero.id as hero_id, hero.name as hero_name, weapon.id as weapon_id, weapon.name as weapon_name
                FROM hero
                JOIN armor on hero.id = armor.id
                JOIN weapon on armor.id = weapon.id
''').show()

+--------+-----------------+-------+---------+---------+--------------------+
|armor_id|       armor_name|hero_id|hero_name|weapon_id|         weapon_name|
+--------+-----------------+-------+---------+---------+--------------------+
|       1|   Armure de cuir|      1|     Aria|        1|       pe de cristal|
|       2|    Armure de fer|      2|Balthazar|        2|     Hache de guerre|
|       3| Armure de plaque|      3|Cassandra|        3|         Bton de feu|
|       4|Armure de mailles|      4|   Darian|        4|    Dague empoisonne|
|       5|  Armure de glace|      5|    Elena|        5|            Arc long|
|       6|      Armure d'os|      6|    Felix|        6|       Masse d'armes|
|       7| Armure d'cailles|      7|Gabrielle|        7|      Lance d'argent|
|       8| Armure de dragon|      8|  Hadrien|        8|    Baguette magique|
|       9| Armure de lumire|      9|  Isadora|        9|             Gourdin|
|      10| Armure de tnbres|     10|   Jasper|       10|        

In [35]:
# Créer une nouvelle colonne avec la valeur mise à jour conditionnellement
updated_df = df_hero.withColumn('armor_updated', when((col('armor_id') == 6) , 'null' ).otherwise(col('armor_id')))


# Mettre à jour la colonne "armor" avec les valeurs de la colonne "armor_updated" et Supprimer la colonne "armor_updated"
df_hero = armor_update_table(updated_df)
df_hero.createOrReplaceTempView("hero")
request.spark.sql('''
                SELECT armor.id as armor_id, armor.name as armor_name, hero.id as hero_id, hero.name as hero_name
                FROM hero
                JOIN armor on hero.armor_id = armor.id
''').show()


+--------+-----------------+-------+---------+
|armor_id|       armor_name|hero_id|hero_name|
+--------+-----------------+-------+---------+
|       7| Armure d'cailles|      1|     Aria|
|       7| Armure d'cailles|      2|Balthazar|
|       7| Armure d'cailles|      3|Cassandra|
|       7| Armure d'cailles|      4|   Darian|
|      14|    Armure de feu|      5|    Elena|
|      14|    Armure de feu|      6|    Felix|
|      14|    Armure de feu|      7|Gabrielle|
|      14|    Armure de feu|      8|  Hadrien|
|      14|    Armure de feu|      9|  Isadora|
|      11|Armure de cristal|     10|   Jasper|
|      11|Armure de cristal|     11|     Kara|
|      11|Armure de cristal|     12|   Landon|
|      11|Armure de cristal|     13|    Malys|
|      11|Armure de cristal|     14|    Nolan|
|      11|Armure de cristal|     15|   Olivia|
|      45| Armure des gants|     16| Philemon|
|      26|  Armure de brume|     17|  Quintus|
|      44|Armure des trolls|     19|  Saoirse|
|      50| Ar

In [36]:
def generate_join_clause(column_names):
    table_aliases = set()
    join_clause = ''

    for column_name in column_names:
        if '.' in column_name:
            table_name, _ = column_name.split('.', 1)
            table_aliases.add(table_name)

    for table_alias in table_aliases:
        join_clause += f'JOIN {table_alias} ON hero.{table_alias}_id = {table_alias}.{table_alias}_id '

    return join_clause


In [37]:
column_names = ['armor_id', 'armor_name', 'armor_armor', 'armor_armor_magik', 'armor_mana', 'armor_durability', 'armor_max_durability', 'armor_broken', 'hero_id', 'hero_name', 'hero_pv', 'hero_max_pv', 'hero_atk', 'hero_atk_magik', 'hero_mana', 'hero_max_mana', 'hero_initiative', 'hero_experience', 'hero_experience_to_level_up', 'hero_level', 'hero_speed', 'hero_origine_speed', 'hero_items', 'hero_weapon_id', 'hero_armor_id', 'weapon_id', 'weapon_name', 'weapon_atk', 'weapon_atk_magik', 'weapon_mana', 'weapon_mana_cost', 'weapon_durability', 'weapon_max_durability', 'weapon_broken']

join_clause = generate_join_clause(column_names)
print(join_clause)





In [38]:
import requests

url = "https://raw.githubusercontent.com/MicrosoftLearning/dp-203-azure-data-engineer/master/Allfiles/labs/23/adventureworks/products.csv"
local_file_path = "products.csv"

# Télécharger le fichier CSV depuis l'URL
response = requests.get(url)
response.raise_for_status()

# Enregistrer le contenu dans un fichier local
with open(local_file_path, "wb") as file:
    file.write(response.content)

print("Le fichier CSV a été téléchargé et enregistré avec succès.")


Le fichier CSV a été téléchargé et enregistré avec succès.


In [39]:
request.spark.sql(f'''SELECT armor.id as armor_id, armor.name as armor_name, hero.id as hero_id, hero.name as hero_name, hero.weapon_id as hero_weapon_id
                FROM hero
                JOIN weapon on hero.weapon_id = weapon.id JOIN armor on hero.armor_id = armor.id ''').show()

+--------+-----------------+-------+---------+--------------+
|armor_id|       armor_name|hero_id|hero_name|hero_weapon_id|
+--------+-----------------+-------+---------+--------------+
|       7| Armure d'cailles|      1|     Aria|            21|
|       7| Armure d'cailles|      2|Balthazar|            32|
|       7| Armure d'cailles|      3|Cassandra|             1|
|       7| Armure d'cailles|      4|   Darian|             1|
|      14|    Armure de feu|      5|    Elena|            32|
|      14|    Armure de feu|      6|    Felix|            48|
|      14|    Armure de feu|      7|Gabrielle|            21|
|      14|    Armure de feu|      8|  Hadrien|            45|
|      14|    Armure de feu|      9|  Isadora|            21|
|      11|Armure de cristal|     10|   Jasper|            36|
|      11|Armure de cristal|     11|     Kara|            41|
|      11|Armure de cristal|     12|   Landon|             2|
|      11|Armure de cristal|     13|    Malys|            26|
|      1