In [16]:
import requests as rq
import os
import uuid
import json
import random
from datetime import datetime, date, time
from random_word import RandomWords


In [17]:
DEFAULT_FIELD_TYPES = [
    {"label": "Simple Text", "value": "simple-text"},
    {"label": "Long Text", "value": "long-text"},
    {"label": "Email", "value": "email"},
    {"label": "Uuid", "value": "uuid"},
    {"label": "Number", "value": "number"},
    {"label": "Integer", "value": "integer"},
    {"label": "Float", "value": "float"},
    {"label": "Date", "value": "date"},
    {"label": "Time", "value": "time"},
    {"label": "Datetime", "value": "datetime"},
    {"label": "Boolean", "value": "boolean"},
    {"label": "Select", "value": "select"},
]

# USERNAME = "contemptible-keeping"
# PASSWORD = "y2ebP9TCisagk3CvCe8FILfczoAIBUUB"
# USERNAME = "said-clipping"
# PASSWORD = "aJ1EESCHFC3i2lUIXAES2gnVm4FqYjtb"
USERNAME = "possessive-league"
PASSWORD = "HyDyFLRzN0a2DyFI1p0x5lgTc2QNej1J"

BASE_URL = "http://localhost:2525"
# BASE_URL = "https://network-core-system-production.up.railway.app"
# BASE_URL = "http://45.136.19.222:2525"

CREATE_INDIVIDUALS = True

In [18]:
def create_timestamped_folder(suffix="folder", base_path="data"):
    """
    Crée un dossier avec un nom contenant un préfixe et un horodatage au format YYYYMMDDHHMMSS.

    Args:
        prefix (str): Le préfixe du dossier.
        base_path (str): Le chemin de base où créer le dossier. Par défaut, le dossier actuel.

    Returns:
        str: Le chemin complet du dossier créé.
    """
    # Générer l'horodatage au format YYYYMMDDHHMMSS
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # Construire le nom du dossier
    folder_name = f"{timestamp}_{suffix}"
    folder_path = os.path.join(base_path, folder_name)
    # Créer le dossier
    os.makedirs(folder_path, exist_ok=True)
    return folder_path

def save_json(data, file_name):
    with open(file_name, 'w') as f:
        json.dump(data, f, indent=4)


def load_json(file_name):
    with open(file_name, 'r') as f:
        return json.load(f)


def get_random_word():
    r = RandomWords()
    return r.get_random_word()


def generate_random_value(field_type):
    # print('field_type', field_type)
    if field_type == "simple-text":
        return get_random_word()
    elif field_type == "long-text":
        return ' '.join(get_random_word() for _ in range(10))
    elif field_type == "email":
        return f"{get_random_word()}@{get_random_word()}.com"
    elif field_type == "uuid":
        return str(uuid.uuid4())
    elif field_type == "number":
        return random.uniform(1, 100)
    elif field_type == "integer":
        return random.randint(1, 100)
    elif field_type == "float":
        return round(random.uniform(1.0, 100.0), 2)
    elif field_type == "date":
        return date.today().isoformat()
    elif field_type == "time":
        return time(random.randint(0, 23), random.randint(0, 59), random.randint(0, 59)).isoformat()
    elif field_type == "datetime":
        return datetime.now().isoformat()
    elif field_type == "boolean":
        return random.choice([True, False])
    elif field_type == "select":
        return ';'.join(get_random_word() for _ in range(random.randint(1, 5)))
    else:
        raise ValueError(f"Unsupported field type: {field_type}")


def make_get_request(url, jwt_token=None):
    """Make a get request to the given url with the given jwt token."""
    try:
        if jwt_token:
            headers = {'Authorization': 'Bearer ' + jwt_token}
            response = rq.get(url, headers=headers)
        else:
            response = rq.get(url)
        return response
    except Exception as e:
        print(e)
        return None


def make_post_request(url, data, jwt_token=None):
    """Make a post request to the given url with the given jwt token."""
    try:
        if jwt_token:
            headers = {'Authorization': 'Bearer ' + jwt_token}
            response = rq.post(url, headers=headers, json=data)
        else:
            response = rq.post(url, json=data)
        return response
    except Exception as e:
        print(e)
        return None


def make_patch_request(url, data, jwt_token=None):
    """Make a patch request to the given url with the given jwt token."""
    try:
        if jwt_token:
            headers = {'Authorization': 'Bearer ' + jwt_token}
            response = rq.patch(url, headers=headers, data=data)
        else:
            response = rq.patch(url, data=data)
        return response
    except Exception as e:
        print(e)
        return None

In [19]:
def create_entity(entity_name):
    """Create an entity with the given name."""
    url = f"{BASE_URL}/entity"
    data = {
        "label": entity_name,
        "description": f"Description of {entity_name}",
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print(f"Entity {entity_name} created successfully.")
    else:
        print(f"Failed to create entity {entity_name}.")
    return response.json()


def create_distribution_channel(channel_name, entity_id):
    """Create a distribution channel with the given name."""
    url = f"{BASE_URL}/distribution-channel"
    data = {
        "entityId": entity_id,
        "label": channel_name,
        "description": f"Description of {channel_name}",
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print(f"Distribution channel {channel_name} created successfully.")
    else:
        print(f"Failed to create distribution channel {channel_name}.")
    return response.json()


def create_node_type(node_type_name, distribution_channel_id, node_type_parent_id=0):
    """Create a node type with the given name."""
    url = f"{BASE_URL}/node-type"
    data = {
        "distributionChannelId": distribution_channel_id,
        "label": node_type_name,
        "description": f"Description of {node_type_name}",
        "nodeTypeParentId": node_type_parent_id,
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print(f"Node type {node_type_name} created successfully.")
    else:
        print(f"Failed to create node type {node_type_name}.")
    return response.json()

def get_distribution_channel(distribution_channel_id):
    """Get the distribution channel with the given id."""
    url = f"{BASE_URL}/distribution-channel/{distribution_channel_id}"
    response = make_get_request(url)
    if response.status_code == 200:
        print(f"Distribution channel {distribution_channel_id} retrieved successfully.")
    else:
        print(f"Failed to retrieve distribution channel {distribution_channel_id}.")
    return response.json()

def list_data_field_type():
    """List all data field types."""
    url = f"{BASE_URL}/data-field-type"
    response = make_get_request(url)
    if response.status_code == 200:
        print("Data field types listed successfully.")
    else:
        print("Failed to list data field types.")
    return response.json()

def create_data_field(data_field_name, node_type_id, data_field_type, is_primary=False):
    """Create a data field with the given name."""
    url = f"{BASE_URL}/data-field"
    data = {
        "nodeTypeId": node_type_id,
        "dataFieldTypeId": data_field_type.get("id"),
        "isPrimaryKey": is_primary,
        "label": data_field_name,
        "description": f"Description of {data_field_name}",
        "optionnal": random.choice([True, False]),
        "fillingType": random.choice(["manual", "automatic", "mixed"]),
        "defaultValue": "",
        "exampleValue": "",
        "selectValues": generate_random_value(data_field_type.get("value")) if data_field_type.get("value") == "select" else "",
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print(f"Data field {data_field_name} created successfully.")
    else:
        print(f"Failed to create data field {data_field_name}.")
    return response.json()

def get_node_type(node_type_id):
    """Get the node type with the given id."""
    url = f"{BASE_URL}/node-type/{node_type_id}"
    response = make_get_request(url)
    if response.status_code == 200:
        print(f"Node type {node_type_id} retrieved successfully.")
    else:
        print(f"Failed to retrieve node type {node_type_id}.")
    return response.json()


def create_node(node_name, node_type_id, node_parent_id=0):
    """Create a node with the given name."""
    url = f"{BASE_URL}/node"
    data = {
        "nodeTypeId": node_type_id,
        "label": node_name,
        "description": f"Description of {node_name}",
        "nodeParentId": node_parent_id,
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print(f"Node {node_name} created successfully.")
    else:
        print(f"Failed to create node {node_name}.")
    return response.json()

def make_auth_request(username, password):
    """Make an auth request with the given username and password."""
    url = f"{BASE_URL}/auth/signin"
    data = {
        "username": username,
        "password": password,
    }
    response = make_post_request(url, data)
    if response.status_code == 201:
        print("Auth request successful.")
    else:
        print("Auth request failed.")
    return response.json()

def create_individual(node_id, data, jwt_token):
    """Create an individual with the given data."""
    url = f"{BASE_URL}/individual"
    data = {
        "nodeId": node_id,
        "data": data,
    }
    response = make_post_request(url, data, jwt_token)
    if response.status_code == 201:
        print(f"Individual created successfully.")
    else:
        print(f"Failed to create individual.")
    return response.json()

In [20]:
NODE_TYPE_NUMBER = 3
DATA_FIELD_NUMBER = 3
INDIVIDUAL_NUMBER = 10

### CREATION D'UNE ENTITE

In [21]:
entity_name = get_random_word()

folder_path = create_timestamped_folder(suffix=entity_name)
print(f"Folder created: {folder_path}")

r_entity = create_entity(entity_name)
entity_data = r_entity.get("data")
save_json(r_entity, os.path.join(folder_path, "entity.json"))

Folder created: data/20250123211020_illogic
Entity illogic created successfully.


### CREATION D'UN RESEAU DE DISTRIBUTION

In [22]:
distribution_channel_name = f"Channel {entity_name} {get_random_word()}"

r_distribution_channel = create_distribution_channel(distribution_channel_name, entity_data.get("id"))

distribution_channel_data = r_distribution_channel.get("data")

save_json(r_distribution_channel, os.path.join(folder_path, "distribution_channel.json"))

Distribution channel Channel illogic suggestion created successfully.


### CREATION DES TYPES DE NOEUDS

In [23]:
node_type_datas = []

node_type_parent_id = 0
for i in range(NODE_TYPE_NUMBER):
    node_type_name = get_random_word()

    r_node_type = create_node_type(
        node_type_name,
        distribution_channel_data.get("id"),
        node_type_parent_id
    )
    node_type_data = r_node_type.get("data")
    node_type_datas.append(node_type_data)

    node_type_parent_id = node_type_data.get("id")

    # juste utile pour debuggage
    # save_json(r_node_type, os.path.join(folder_path, f"node_type_{i}.json"))

Node type pivotman created successfully.
Node type disforest created successfully.
Node type protuberance created successfully.


### RECHARGEMENT DES INFORMATIONS DU RESEAU DE DISTRIBUTION

In [24]:
r_distribution_channel = get_distribution_channel(distribution_channel_data.get("id"))
save_json(r_distribution_channel, os.path.join(folder_path, "distribution_channel.json"))

Distribution channel 4 retrieved successfully.


### LISTER LES TYPES DE CHAMPS DE DONNEES

In [25]:
r_data_field_types = list_data_field_type()

data_field_types = r_data_field_types.get("data")

save_json(r_data_field_types, os.path.join(folder_path, "data_field_types.json"))

Data field types listed successfully.


### CREATION DES CHAMPS DE DONNEES PAR TYPE DE NOEUD

In [26]:
for i, node_type_data in enumerate(node_type_datas):
    for j in range(DATA_FIELD_NUMBER):
        data_field_name = get_random_word()
        data_field_type = random.choice(data_field_types)
        r_data_field = create_data_field(data_field_name, node_type_data.get("id"), data_field_type, is_primary=j == 0)
        # juste utile pour debuggage
        # save_json(r_data_field, os.path.join(folder_path, f"data_field_{i}_{j}.json"))

    r_node_type = get_node_type(node_type_data.get("id"))
    save_json(r_node_type, os.path.join(folder_path, f"node_type_{i}.json"))
    print('-'*50)

Data field levelly created successfully.
Data field unsorry created successfully.
Data field phalluses created successfully.
Node type 10 retrieved successfully.
--------------------------------------------------
Data field airbag created successfully.
Data field lungwort created successfully.
Data field reconsiders created successfully.
Node type 11 retrieved successfully.
--------------------------------------------------
Data field overelaborately created successfully.
Data field islandology created successfully.
Data field kitties created successfully.
Node type 12 retrieved successfully.
--------------------------------------------------


### CREATION DES NOEUDS PAR NIVEAU (TYPE DE NOEUD)

In [27]:
all_node_parents = []

for i, node_type_data in enumerate(node_type_datas):
    node_number = i + 2
    node_parents = []

    for j in range(node_number):
        node_parent_id = 0
        node_name = "Node {} {}".format(
            node_type_data.get("label"),
            get_random_word()
        )

        if i > 0 and all_node_parents[i - 1]:
            node_parent = random.choice(all_node_parents[i - 1])
            node_parent_id = node_parent.get("id")

        r_node = create_node(
            node_name, node_type_data.get("id"), node_parent_id)
        node_data = r_node.get("data")
        # juste utile pour debuggage
        # save_json(r_node, os.path.join(folder_path, f"node_{i}_{j}.json"))

        node_parents.append(node_data)

    all_node_parents.append(node_parents)

    r_node_type = get_node_type(node_type_data.get("id"))
    save_json(r_node_type, os.path.join(folder_path, f"node_type_{i}.json"))
    print('-'*50)

Node Node pivotman arabinosic created successfully.
Node Node pivotman fishline created successfully.
Node type 10 retrieved successfully.
--------------------------------------------------
Node Node disforest anxiously created successfully.
Node Node disforest postparalytic created successfully.
Node Node disforest tablemount created successfully.
Node type 11 retrieved successfully.
--------------------------------------------------
Node Node protuberance oculina created successfully.
Node Node protuberance jargonisation created successfully.
Node Node protuberance gorgonin created successfully.
Node Node protuberance urethra created successfully.
Node type 12 retrieved successfully.
--------------------------------------------------


### AUTHENTIFICATION POUR INSERER LES DONNEES DES INDIVIDUS

In [28]:
r_auth = make_auth_request(USERNAME, PASSWORD)
save_json(r_auth, os.path.join(folder_path, "auth.json"))
jwt_token = r_auth.get("data").get("accessToken")

Auth request successful.


### CREATION DES INDIVIDUS POUR PEUPLER LES NOEUDS

In [29]:
if CREATE_INDIVIDUALS:
    for i in range(len(node_type_datas)):
        r_node_type = load_json(os.path.join(
            folder_path, f"node_type_{i}.json"))
        node_type_data = r_node_type.get("data")

        nodes = node_type_data.get("Node")
        data_fields = node_type_data.get("DataField")
        # recuperer que ceux dont le fillingType est automatic ou mixed
        data_fields = [data_field for data_field in data_fields if data_field.get(
            "fillingType") in ["automatic", "mixed"]]

        for j, node in enumerate(nodes):
            data = {}
            for data_field in data_fields:
                data[data_field.get("slug")] = generate_random_value(
                    data_field.get("dataFieldType").get("value"))

            r_individual = create_individual(node.get("id"), data, jwt_token)
            # juste utile pour debuggage
            # save_json(r_individual, os.path.join(
            #     folder_path, "individual_of_{}.json".format(node.get("value"))))
            # save_json(r_individual, os.path.join(
            #     folder_path, f"individual_{i}_{j}.json"))

        print('-'*50)

Individual created successfully.
Individual created successfully.
--------------------------------------------------
Individual created successfully.
Individual created successfully.
Individual created successfully.
--------------------------------------------------
Failed to create individual.
Failed to create individual.
Failed to create individual.
Failed to create individual.
--------------------------------------------------


In [30]:
# [
#     {
#         expanded: true,
#         type: 'person',
#         data: {
#             name: 'Amy Elsner',
#             title: 'CEO'
#         },
#         children: [
#             {
#                 expanded: true,
#                 type: 'person',
#                 data: {
#                     name: 'Anna Fali',
#                     title: 'CMO'
#                 }
#             },
#             {
#                 expanded: true,
#                 type: 'person',
#                 data: {
#                     name: 'Stephen Shaw',
#                     title: 'CTO'
#                 }
#             }
#         ]
#     }
# ]