In [5]:
import pandas as pd
import sqlite3
import json
import os
import select
from sqlite3 import connect
from pandas import read_sql_query
from json import load 
from pandas import DataFrame
from pandas import read_csv
from rdflib import Graph, URIRef, Literal, RDF 
from rdflib.plugins.stores.sparqlstore import SPARQLUpdateStore

In [6]:
class Handler(object): #this is the first class, all the others derive from this one 

    #creating the class 
    def __init__(self, dbPathOrUrl : str):
        self.dbPathOrUrl = dbPathOrUrl

    #creating the methods 
    def getDbPathOrUrl(self): 
        return self.dbPathOrUrl 

    def setDbPathOrUrl(self, pathOrUrl : str): #: boolean 
        self.dbPathOrUrl = pathOrUrl
        return True

class UploadHandler(Handler):

    def pushDataToDb(self, path: str):  #self implied 
        if path.lower().endswith(".csv"): 
            handler = JournalUploadHandler(self.dbPathOrUrl)
            return handler.journalUpload(path) #calling the method after I called the subclass
        elif path.lower().endswith(".json"): 
            handler = CategoryUploadHandler(self.dbPathOrUrl)
            return handler.categoryUpload(path)
        else: 
            return False 

#first case: the path is of the relational database the json file

In [7]:

class CategoryUploadHandler(UploadHandler):



    def categoryUpload(self, path: str):  
        # Load JSON data
        try:
            with open(path, "r", encoding="utf-8") as f:
                json_data = json.load(f)
        except FileNotFoundError:
            print(f"Error: The file {path} was not found.")
            return
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from {path}: {e}")
            return # Exit if JSON is invalid
        except Exception as e:
            print(f"An unexpected error occurred while opening or reading {path}: {e}")
            return

        # Ensure json_data is a list
        if not isinstance(json_data, list):
            print(f"Error: JSON data from {path} is not a list. Found type: {type(json_data)}")
            self._create_empty_tables()
            return

        item_data_rows = []
        category_details_map = {} 
        area_details_map = {}     

        # Changed to map (original_id, quartile) tuple to internal_id
        category_composite_to_internal_id = {} 
        area_name_to_internal_id = {}         
        
        next_internal_category_idx = 0
        next_internal_area_idx = 0
        
        # Set to keep track of processed item identifiers to ensure PK uniqueness
        processed_item_identifiers = set()

        for item_idx, item in enumerate(json_data):
            if not isinstance(item, dict):
                print(f"Warning: Skipping item at index {item_idx} as it is not a dictionary.")
                continue

            # --- Determine Item Main Identifier (Primary Key for item_data) ---
            # This identifier will be taken from the first entry in the "identifiers" list in the JSON.
            identifiers_from_json = item.get("identifiers") # Use .get() for safer access
            item_main_id = None

            if not identifiers_from_json or not isinstance(identifiers_from_json, list) or len(identifiers_from_json) == 0:
                print(f"Warning: Item at index {item_idx} has no 'identifiers' list, it's not a list, or it's empty. Skipping item.")
                continue 

            potential_main_id = identifiers_from_json[0]

            if not isinstance(potential_main_id, str) or not potential_main_id.strip():
                print(f"Warning: Item at index {item_idx} has an invalid or empty first identifier ('{potential_main_id}'). Skipping item.")
                continue 
            
            # Check for duplicate identifiers (PK constraint)
            if potential_main_id in processed_item_identifiers:
                print(f"Warning: Duplicate identifier '{potential_main_id}' found for item at index {item_idx}. Skipping item to maintain PK integrity.")
                continue 
            
            item_main_id = potential_main_id
            processed_item_identifiers.add(item_main_id)
            # 'item_main_id' is now the actual identifier from the file (e.g., "ABCD-1234")

        
            item_associated_category_internal_ids = [] 
            for cat_info in item.get("categories", []): 
                if not isinstance(cat_info, dict) or "id" not in cat_info: 
                    print(f"Warning: Skipping invalid category data for item '{item_main_id}': {cat_info}") 
                    continue 
                  
                original_cat_json_id = cat_info["id"] 
                quartile = cat_info.get("quartile") # Capture quartile, which can be None

                # Create a composite key for the category (original_id, quartile)
                category_key = (original_cat_json_id, quartile)

                # If this composite category (id + quartile) is not yet in our map, create a new internal ID
                if category_key not in category_composite_to_internal_id:
                    internal_cat_id = f"cat-{next_internal_category_idx}"
                    category_composite_to_internal_id[category_key] = internal_cat_id
                    
                    category_details_map[internal_cat_id] = {
                        "original_id": original_cat_json_id,
                        "quartile": quartile
                    }
                    next_internal_category_idx += 1
                else:
                    # If it exists, use the previously assigned internal ID
                    internal_cat_id = category_composite_to_internal_id[category_key]
                  
                item_associated_category_internal_ids.append(internal_cat_id) 

            
            item_associated_internal_area_id = None 
            areas_list_from_json = item.get("areas", [])
            if areas_list_from_json:
                if isinstance(areas_list_from_json, list) and len(areas_list_from_json) > 0:
                    first_area_name = areas_list_from_json[0]
                    if not isinstance(first_area_name, str):
                        print(f"Warning: Skipping non-string area data for item '{item_main_id}': {first_area_name}")
                    else:
                        if first_area_name not in area_name_to_internal_id:
                            internal_area_id = f"area-{next_internal_area_idx}"
                            area_name_to_internal_id[first_area_name] = internal_area_id
                            
                            area_details_map[internal_area_id] = {"name": first_area_name}
                            next_internal_area_idx += 1
                        else:
                            internal_area_id = area_name_to_internal_id[first_area_name]
                        item_associated_internal_area_id = internal_area_id
                else:
                    print(f"Warning: 'areas' field for item '{item_main_id}' is not a non-empty list: {areas_list_from_json}")

            
            
            # 'identifier' (e.g., "ABCD-1234" from the file) is the Primary Key for this table.
            current_item_row = {"identifier": item_main_id} 
            
            for i in range(8): 
                col_name = f"category_{i+1}"
                if i < len(item_associated_category_internal_ids):
                    current_item_row[col_name] = item_associated_category_internal_ids[i]
                else:
                    current_item_row[col_name] = None 
            
            current_item_row["area"] = item_associated_internal_area_id 
            item_data_rows.append(current_item_row)

        # Convert collected data to Pandas DataFrames
        # Only create DataFrame if there are rows to avoid error with empty list
        if item_data_rows:
            item_data_df = pd.DataFrame(item_data_rows)
        else:
            # Define columns for empty DataFrame to match schema if no valid items were processed
            item_data_cols = ["identifier"] + [f"category_{i+1}" for i in range(8)] + ["area"]
            item_data_df = pd.DataFrame(columns=item_data_cols)
        
        categories_table_rows = []
        for internal_id, details in category_details_map.items():
            categories_table_rows.append({
                "id": internal_id, 
                "original_id": details["original_id"],
                "quartile": details["quartile"]
            })
        if categories_table_rows:
            category_details_df = pd.DataFrame(categories_table_rows)
        else:
            category_details_df = pd.DataFrame(columns=["id", "original_id", "quartile"])


        areas_table_rows = []
        for internal_id, details in area_details_map.items():
            areas_table_rows.append({
                "id": internal_id, 
                "name": details["name"]
            })
        if areas_table_rows:
            area_details_df = pd.DataFrame(areas_table_rows)
        else:
            area_details_df = pd.DataFrame(columns=["id", "name"])


        # Write DataFrames to SQLite database
        try:
            with sqlite3.connect(self.dbPathOrUrl) as con:
                item_data_df.to_sql("item_data", con, if_exists="replace", index=False)
                category_details_df.to_sql("categories", con, if_exists="replace", index=False)
                area_details_df.to_sql("areas", con, if_exists="replace", index=False)
            print(f"Data successfully uploaded to {self.dbPathOrUrl}")

        except sqlite3.Error as e:
            print(f"Database error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred during database operation: {e}")

    def _create_empty_tables(self):
        """Helper method to create empty tables with the defined schema if input is problematic."""
        try:
            with sqlite3.connect(self.dbPathOrUrl) as con:
                item_data_cols = ["identifier"] + [f"category_{i+1}" for i in range(8)] + ["area"]
                pd.DataFrame(columns=item_data_cols)\
                  .to_sql("item_data", con, if_exists="replace", index=False)
                
                pd.DataFrame(columns=["id", "original_id", "quartile"])\
                  .to_sql("categories", con, if_exists="replace", index=False)
                
                pd.DataFrame(columns=["id", "name"])\
                  .to_sql("areas", con, if_exists="replace", index=False)
                print(f"Created empty tables in {self.dbPathOrUrl} due to invalid or empty JSON input.")
        except sqlite3.Error as e:
            print(f"Database error while creating empty tables: {e}")
        except Exception as e:
            print(f"An unexpected error occurred while creating empty tables: {e}")

In [9]:

class CategoryQueryHandler:

    def __init__(self, dbPathOrUrl):
        """
        Initializes the CategoryQueryHandler with the path to the SQLite database.

        Args:
            dbPathOrUrl (str): The path or URL to the SQLite database file.
        """
        self.dbPathOrUrl = dbPathOrUrl

    def _execute_query(self, query: str) -> pd.DataFrame:
        """
        Helper method to execute a SQL query and return the result as a Pandas DataFrame.
        Handles database connection and common errors.

        Args:
            query (str): The SQL query string to execute.

        Returns:
            pd.DataFrame: A DataFrame containing the query results, or an empty DataFrame on error.
        """
        try:
            with sqlite3.connect(self.dbPathOrUrl) as con:
                df = pd.read_sql_query(query, con)
                return df
        except sqlite3.Error as e:
            print(f"Database error during query execution: {e}")
            return pd.DataFrame()  # Return empty DataFrame on error
        except Exception as e:
            print(f"An unexpected error occurred during query execution: {e}")
            return pd.DataFrame()

    def getAllCategories(self):
        
        query = "SELECT id, original_id, quartile FROM categories"
        df = self._execute_query(query)
        return df["original_id"].drop_duplicates()

    def getAllAreas(self):
        
        query = "SELECT id, name FROM areas"
        df = self._execute_query(query)
        return df["name"].drop_duplicates()

    def getCategoriesWithQuartile(self, quartiles: list) -> pd.DataFrame:
        """
        Returns a DataFrame containing all the categories having specified quartiles, with no repetitions.
        If the input collection of quartiles is empty, it returns all categories.

        Args:
            quartiles (list): A list of quartile values (e.g., [1, 2, "Q3"]).

        Returns:
            pd.DataFrame: A DataFrame with columns 'id', 'original_id', 'quartile' for matching categories.
        """
        if not quartiles:
            return self.getAllCategories()
        
        # Ensure quartile values are properly formatted for the SQL IN clause (strings quoted, numbers not)
        formatted_quartiles = []
        for q in quartiles:
            if isinstance(q, str):
                formatted_quartiles.append(f"'{q}'")
            elif pd.isna(q): # Handle NaN values in quartiles if they exist
                formatted_quartiles.append('NULL')
            else:
                formatted_quartiles.append(str(q))

        quartiles_str = ",".join(formatted_quartiles)
        
        query = f"SELECT id, original_id, quartile FROM categories WHERE quartile IN ({quartiles_str})"
        # Add handling for NULL quartiles if 'NULL' is in the input list
        if 'NULL' in formatted_quartiles:
            query += " OR quartile IS NULL"
        
        return self._execute_query(query)

    def getCategoriesAssignedToAreas(self, areas: list) -> pd.DataFrame:
        """
        Returns a DataFrame containing all the categories assigned to particular areas specified as input,
        with no repetitions. If the input collection of areas is empty, it considers all areas.

        Args:
            areas (list): A list of area names (e.g., ["Area A", "Area B"]).

        Returns:
            pd.DataFrame: A DataFrame with columns 'id', 'original_id', 'quartile' for relevant categories.
        """
        item_data_df = self._execute_query("SELECT * FROM item_data")
        categories_df = self._execute_query("SELECT id, original_id, quartile FROM categories")
        areas_df = self._execute_query("SELECT id, name FROM areas")

        if not areas:
            # If no areas specified, get all categories that are linked to any item in item_data
            all_cat_ids_in_items = set()
            for i in range(1, 9):
                col_name = f"category_{i}"
                if col_name in item_data_df.columns:
                    all_cat_ids_in_items.update(item_data_df[col_name].dropna().unique())
            
            if not all_cat_ids_in_items:
                return pd.DataFrame(columns=["id", "original_id", "quartile"])

            return categories_df[categories_df['id'].isin(all_cat_ids_in_items)]
        
        else:
            # Get internal area IDs for the given area names
            target_area_ids = areas_df[areas_df['name'].isin(areas)]['id'].tolist()

            if not target_area_ids:
                return pd.DataFrame(columns=["id", "original_id", "quartile"])

            # Filter item_data to include only items assigned to the target areas
            filtered_item_data_df = item_data_df[item_data_df['area'].isin(target_area_ids)]

            if filtered_item_data_df.empty:
                return pd.DataFrame(columns=["id", "original_id", "quartile"])
            
            # Collect all unique category IDs from the filtered item_data
            assigned_category_ids = set()
            for i in range(1, 9):
                col_name = f"category_{i}"
                if col_name in filtered_item_data_df.columns:
                    assigned_category_ids.update(filtered_item_data_df[col_name].dropna().unique())
            
            if not assigned_category_ids:
                return pd.DataFrame(columns=["id", "original_id", "quartile"])

            # Filter the categories_df to get the details of these categories
            return categories_df[categories_df['id'].isin(assigned_category_ids)]

    def getAreasAssignedToCategories(self, categories: list) -> pd.DataFrame:
        """
        Returns a DataFrame containing all the areas assigned to particular categories specified as input,
        with no repetitions. If the input collection of categories is empty, it considers all categories.

        Args:
            categories (list): A list of original category IDs (e.g., ["cat_123", "cat_456"]).

        Returns:
            pd.DataFrame: A DataFrame with columns 'id', 'name' for relevant areas.
        """
        item_data_df = self._execute_query("SELECT * FROM item_data")
        categories_df = self._execute_query("SELECT id, original_id, quartile FROM categories")
        areas_df = self._execute_query("SELECT id, name FROM areas")

        if not categories:
            # If no categories specified, get all areas that are linked to any item in item_data
            all_area_ids_in_items = item_data_df['area'].dropna().unique().tolist()
            
            if not all_area_ids_in_items:
                return pd.DataFrame(columns=["id", "name"])

            return areas_df[areas_df['id'].isin(all_area_ids_in_items)]
        
        else:
            # Get internal category IDs for the given original category IDs
            target_category_ids = categories_df[categories_df['original_id'].isin(categories)]['id'].tolist()

            if not target_category_ids:
                return pd.DataFrame(columns=["id", "name"])

            # Filter item_data to include only items assigned to the target categories
            # This requires checking across all 8 category columns
            category_mask = pd.Series([False] * len(item_data_df), index=item_data_df.index)
            for i in range(1, 9):
                col_name = f"category_{i}"
                if col_name in item_data_df.columns:
                    # Ensure that non-string IDs are handled (e.g., if internal IDs are numeric)
                    # Convert target_category_ids to the same type as column or vice-versa if needed
                    # For now, assuming they are consistent.
                    category_mask = category_mask | item_data_df[col_name].isin(target_category_ids)
            
            filtered_item_data_df = item_data_df[category_mask]

            if filtered_item_data_df.empty:
                return pd.DataFrame(columns=["id", "name"])

            # Collect all unique area IDs from the filtered item_data
            assigned_area_ids = filtered_item_data_df['area'].dropna().unique().tolist()
            
            if not assigned_area_ids:
                return pd.DataFrame(columns=["id", "name"])

            # Filter the areas_df to get the details of these areas
            return areas_df[areas_df['id'].isin(assigned_area_ids)]

In [10]:
handler = CategoryUploadHandler("categories_database.db")
handler.categoryUpload("scimago.json")


Data successfully uploaded to categories_database.db


In [14]:
query = CategoryQueryHandler("categories_database.db")
query.getCategoriesWithQuartile("Q3")


Unnamed: 0,id,original_id,quartile
