In [4]:
import pyTigerGraph as tg
from pydantic import BaseModel, ValidationError, model_validator
from typing import Literal, Dict, Optional
import importlib.util
import pyTigerGraph as tg
import os
import json
from templates.create_local_graph_schema import create_graph_schema_template
from templates.create_abs_data_source import create_abs_data_source_template
from templates.create_postgre_data_source import create_postgre_data_source_template
from templates.create_loading_jobs import create_loading_jobs_template
from templates.create_loading_jobs_for_local_files import create_loading_jobs_template_local_files
from templates.abs_data_source import abs_file_path_template


hostName = "https://2cc2f8bde8df444bb60c6fb83491bb8c.i.tgcloud.io"
graphName = "VWG"
secret = "m2p8nba0uab7dtthbn4o1b30r8tqgg9a"
userName = "user_1"
password = "A1z2e3r4*"
conn = tg.TigerGraphConnection(host=hostName, graphname=graphName, username=userName, password=password)
conn.getToken(secret)

('tb2rs484mpjf17il45hkcf46dapon1qv', 1733413267, '2024-12-05 15:41:07')

In [5]:
data_source_dict = {
    "vertex_data": r"C:\Users\JulienRigot\OneDrive - LIS Data Solutions\Escritorio\code_GORDIAS\base de datos graph\Tigergraph\data\Europe\biggest_europe_cities.csv",  # Chemin vers le fichier de données pour les sommets
    "edge_data": r"C:\Users\JulienRigot\OneDrive - LIS Data Solutions\Escritorio\code_GORDIAS\base de datos graph\Tigergraph\data\Europe\conexiones_biggest_europe_cities.csv"       # Chemin vers le fichier de données pour les arêtes
}


In [8]:
def load_module(module_name: str, file_path: str):
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module



def load_queries(folder_name: str = 'queries'):
    """This function loads all templates called "query" inside the each python file on folder folder_name.
    NOTE! the name of the .py file should match the query name

    Args:
        folder_name (str, optional): Folder to retrieve queries. Defaults to 'queries'.

    Returns:
        A dictionary with key = query name, value = actual query template
    """
    queries = {}
    for filename in os.listdir(folder_name):
        if filename.endswith('.py'):
            file_path = os.path.join(folder_name, filename)
            module_name = filename[:-3]
            module = load_module(module_name, file_path)
            if hasattr(module, 'query'):
                queries[module_name] = getattr(module, 'query')
    return queries

In [7]:

class ABSConfig(BaseModel):
    """Required attributes for azure blob storage data source."""
    storageaccount: str
    container: str
    connection_key: str



class PostgreConfig(BaseModel):
    host: str
    port: int
    db_user: str
    db_password: str
    db_name: str



class PostgreDataSourceConfig(BaseModel):
    """Validation model for when data source is postgre SQL database
    Raises:
        ValueError: when there are missing keys
    """
    data_source_type: str
    postgre_config: Optional[PostgreConfig] = None
    @model_validator(mode='before')
    def check_postgre_config(cls, values):
        data_source_type = values.get('data_source_type')
        postgre_config = values.get('postgre_config')
        if data_source_type == 'postgre' and not postgre_config:
            raise ValueError('If data_source_type is postgre, you must provide the required keys in postgre_config.')
        return values



class ABSDataSourceConfig(BaseModel):
    """Validation model for when data source is azure blob storage
    Raises:
        ValueError: when there are missing keys
    """
    data_source_type: str
    abs_config: Optional[ABSConfig] = None
    @model_validator(mode='before')
    def check_abs_config(cls, values):
        data_source_type = values.get('data_source_type')
        abs_config = values.get('abs_config')
        if data_source_type == "abs" and not abs_config:
            raise ValueError('If data_source_type is abs, you must provide the required keys in abs_config.')
        return values

class GraphGenerator():
    def __init__(self, conn: tg.TigerGraphConnection, 
                graph_name: str, 
                data_source_dict: Dict,
                data_source_type: Literal['local_files', 'abs', 'postgre'] = 'local_files',
                config_data: Optional[dict] = None):
        
        self.graph_name = graph_name
        self.data_source_dict = data_source_dict
        self.conn = conn 
        self.data_source_type = data_source_type

        # if data source is distinct from local files, check if all required keys are present in the config_data dict
        if self.data_source_type == 'abs':
            self.data_source = self.graph_name+'_abs_data_source'
            try:
                self.config = ABSDataSourceConfig(
                    data_source_type=data_source_type,
                    abs_config=config_data
                )
            except ValidationError as e:
                print("Validation error:", e)
        elif self.data_source_type == 'postgre':
            self.data_source = self.graph_name+'_postgre_data_source'
            try:
                self.config = PostgreDataSourceConfig(
                    data_source_type = data_source_type,
                    postgre_config = config_data)
            except ValidationError as e:
                print("Validation error:", e)



    def create_local_graph(self):
        create_graph_statement = create_graph_schema_template.substitute({'graph_name': self.graph_name, 
                                                                        'schema_change_job': 'schema_change_job_'+str(self.graph_name)})
        try:
            print(self.conn.gsql(create_graph_statement))
            # update conn object to point to new graph
            self.conn.graphname = self.graph_name 
            self.conn.echo() 
            #create secret and save it as a file - theres prob a better way to do this 
            secret = self.conn.createSecret(alias=self.graph_name+'_secret')
            self.conn.gsqlSecret = secret
            info = {}
            info['graphname'] = self.graph_name
            info['secret'] = secret
            with open(f"credentials_{self.graph_name}.json", 'w') as f:
                json.dump(info, f)
        except Exception as e:
            print(f"Something went wrong while creating the graph: {str(e)}")



    def create_data_source(self):
        # if source is local files, we dont need to create a datasource
        if self.data_source_type == 'local_files':
            return 
        if self.data_source_type == 'abs':
            create_data_source_statement = create_abs_data_source_template.substitute({
                    'graph_name': self.graph_name, 
                    'abs_datasource': self.data_source,
                    'azure_key': self.config.abs_config.connection_key})
        elif self.data_source_type == 'postgre':
            create_data_source_statement = create_postgre_data_source_template.substitute({
                    'graph_name': self.graph_name, 
                    'postgre_datasource': self.data_source,
                    'host': self.config.postgre_config.host,
                    'port': self.config.postgre_config.port,
                    'db_user': self.config.postgre_config.db_user,
                    'db_password': self.config.postgre_config.db_password,
                    'db_name': self.config.postgre_config.db_name})
        try:
            current_ds = self.conn.gsql(f"USE GRAPH {self.graph_name} SHOW DATA_SOURCE *")
            if self.data_source not in current_ds:
                print(self.conn.gsql(create_data_source_statement))
            else:
                print(f"Data source {self.data_source} already exists for graph {self.graph_name}!")
        except Exception as e:
            print(f"Something went wrong while creating the datasource: {str(e)}")



    def create_loading_jobs(self):
        self.data_source_dict['graph_name'] = self.graph_name

        if self.data_source_type == 'abs':
            # if datasource is ABS we need to construct the absolute path to each file in ABS
            main_path = abs_file_path_template.substitute({'abs_datasource': '$'+self.data_source,
                                                    'storage_account': self.config.abs_config.storageaccount,
                                                    'container': self.config.abs_config.container})
            self.data_source_dict = {key: '\"'+main_path+value+'\"' for key, value in self.data_source_dict.items() if key !='graph_name'}
            create_loading_jobs_statement = create_loading_jobs_template.substitute(self.data_source_dict)

        elif self.data_source_type == 'postgre':
            # in case of postgre, each data_sources[key] = query so no changes are needed
            create_loading_jobs_statement = create_loading_jobs_template.substitute(self.data_source_dict)

        elif self.data_source_type == 'local_files':
            # for local files we don't need to define each filename = datasource, we do it from the command runLoadingJobFromFile
            create_loading_jobs_statement = create_loading_jobs_template_local_files.substitute({'graph_name': 
                                                                                                self.graph_name})

        # we'll use this to iterate through the loading jobs :-) so delete graphname so it doesn't explode
        del self.data_source_dict['graph_name']
        try:
            print(self.conn.gsql(create_loading_jobs_statement))
        except Exception as e:
            print(f"Something went wrong while creating the loading jobs: {str(e)}")


    def load_from_files(self):
        created_jobs = self.conn.gsql(f"USE GRAPH {self.graph_name} SHOW JOB *")
        jobs_to_rerun = []
        for job, file_name in self.data_source_dict.items():
            job_name = job.replace('_data', '')
            if job_name in created_jobs:
                try:
                    print(f"---- Uploading file {file_name} for job {job_name} ---- \n")
                    print(self.conn.runLoadingJobWithFile(filePath = file_name, 
                                                        fileTag = "MyDataSource" , 
                                                        jobName = f"load_{job_name}", timeout=32000))
                except Exception as e:
                    print(f"Job {job_name} went bad: {str(e)}")
                    jobs_to_rerun.append(job_name)
            else:
                print(f"Job {job_name} doesn't exist.")
        print(f"Jobs that failed: {jobs_to_rerun}")



    def load_from_abs(self):
        jobs = list(self.data_source_dict.keys())
        jobs = ['load_'+j.replace('_data', '') for j in jobs]
        created_jobs = self.conn.gsql(f"USE GRAPH {self.graph_name} SHOW JOB *")
        jobs_to_rerun = []
        for j in jobs:
            if j in created_jobs:
                execute_statement = f"USE GRAPH {self.graph_name} RUN LOADING JOB {j}"
                print(f"---- Running job {j} ---- \n")
                try:
                    self.conn.gsql(execute_statement)
                except Exception as e:
                    print(f"Job {j} went bad: {str(e)}")
                    jobs_to_rerun.append(j)
        print(f"Jobs that failed: {jobs_to_rerun}")



    def load_from_postgre(self):
        """I am not sure if we need another method for postgre stuff.
        I think we prob could use load_from_abs since the loading jobs are already created
        and pointing to the sql queries, so no further changes would be needed (?). 
        I'll leave it here just in case
        """
        pass


    def execute_loading_jobs(self):
        if self.data_source_type == 'abs':
            self.load_from_abs()
        elif self.data_source_type == 'local_files':
            self.load_from_files()


    def create_queries(self):
        queries_to_install = load_queries()
        for name, statement in queries_to_install.items():
            st = statement.substitute({'graph_name': self.graph_name})
            try:
                print(f"---- Creating query {name} ---- \n")
                conn.gsql(st)
                print(f"---- Installing query {name} ---- \n")
                conn.gsql(f"USE GRAPH {self.graph_name} INSTALL QUERY {name}")
            except Exception as e:
                print(f"Query {name} went bad: {str(e)}")



    def execute_pipeline(self):
        self.create_local_graph()
        self.create_data_source()
        self.create_loading_jobs()
        self.execute_loading_jobs()
        self.create_queries()


In [9]:
graph_generator = GraphGenerator(
    conn=conn,
    graph_name="VWG",
    data_source_dict=data_source_dict,
    data_source_type='local_files'  # Ou 'abs' ou 'postgre' selon votre source de données
)

In [10]:
graph_generator.create_loading_jobs()

Something went wrong while creating the loading jobs: Using graph 'VWG'
Semantic Check Fails: The vertex type country does not exist in the graph VWG
Failed to create loading jobs: [load_country].
