In [None]:
# =============================================================================
# RDF TO MEILISEARCH PIPELINE
# =============================================================================
# This notebook processes RDF/Turtle data through the following pipeline:
# 1. Load RDF data from Turtle files
# 2. Upload to Apache Jena Fuseki triplestore
# 3. Query data from Fuseki as JSON-LD
# 4. Transform data for Meilisearch indexing
# 5. Upload to Meilisearch for full-text search

# =============================================================================
# SECTION 1: SETUP AND DEPENDENCIES
# =============================================================================

In [None]:
!pip3 install rdflib meilisearch

In [1]:
try:
    from rdflib import Graph
    import requests
    import sys
    import os
    import json
    import uuid
    import re
    import time
    import meilisearch

    print("‚úÖ Dependencies loaded successfully!")

except:
    print("‚ùå Dependencies failed to load!")

‚úÖ Dependencies loaded successfully!


In [None]:
# =============================================================================
# SECTION 2: CONFIGURATION
# =============================================================================

In [None]:
# Jena Fuseki Configuration
FUSEKI_URL = ""  
DATASET_NAME = ""  
FUSEKI_USERNAME = "admin"  
FUSEKI_PASSWORD = "" 

# Meilisearch Configuration  
MEILISEARCH_URL = ""  
MEILISEARCH_API_KEY = ""  
INDEX_NAME = ""

# File Configuration
TURTLE_FILE = "20251030_112437.ttl"

print("‚öôÔ∏è Configuration variables set")

‚öôÔ∏è Configuration variables set


In [None]:
# =============================================================================
# SECTION 3: LOAD AND EXPLORE RDF DATA
# =============================================================================

In [None]:
# Initialize RDF graph
print("üìä Loading RDF data...")
graph = Graph()

In [None]:
# Parse the Turtle file
graph.parse(TURTLE_FILE, format="turtle")
print(f"‚úÖ Loaded {len(graph)} triples from {TURTLE_FILE}")

In [None]:
# Explore the data structure
print("\nüîç Sample triples:")
print("=" * 50)
for i, (subject, predicate, obj) in enumerate(graph):
    if i >= 5:
        break
    print(f"Subject: {subject}")
    print(f"Predicate: {predicate}")
    print(f"Object: {obj}")
    print("-" * 40)

In [None]:
# =============================================================================
# SECTION 4: UPLOAD DATA TO JENA FUSEKI
# =============================================================================

In [None]:
# Prepare upload URL and data
upload_url = f"{FUSEKI_URL}/{DATASET_NAME}/data"
turtle_data = graph.serialize(format="turtle")

In [None]:
# Set up headers for turtle data
headers = {
    "Content-Type": "text/turtle"
}

In [3]:
# Set up authentication if needed
auth = None
if FUSEKI_USERNAME and FUSEKI_PASSWORD:
    auth = (FUSEKI_USERNAME, FUSEKI_PASSWORD)

In [None]:
# Upload the data
response = requests.post(
    upload_url,
    data=turtle_data,
    headers=headers,
    auth=auth
)

In [None]:
# Check upload result
print(f"üìã Upload status: {response.status_code}")
if response.status_code == 200:
    print("‚úÖ RDF data successfully uploaded to Jena Fuseki!")
else:
    print(f"‚ùå Upload failed: {response.text}")

In [None]:
# =============================================================================
# SECTION 5: QUERY DATA FROM FUSEKI AS JSON-LD
# =============================================================================

In [4]:
# Construct SPARQL query to get all data
sparql_query = """
CONSTRUCT { ?s ?p ?o }
WHERE { ?s ?p ?o }
"""

In [5]:
# Set up query parameters
params = {
    'query': sparql_query,
    'format': 'application/ld+json'  
}

In [6]:
# Build query URL
query_url = f"{FUSEKI_URL}/{DATASET_NAME}/sparql"

In [7]:
# Execute the query
response = requests.get(query_url, params=params, auth=auth)

print(f"üìã Query status: {response.status_code}")

üìã Query status: 200


In [8]:
# Process the response
if response.status_code == 200:
    jsonld_data = json.loads(response.text)
    
    print("‚úÖ Successfully retrieved JSON-LD data from Jena!")
    print(f"üìä Data type: {type(jsonld_data)}")
    
else:
    print(f"‚ùå Query failed: {response.status_code}")
    print(response.text)

‚úÖ Successfully retrieved JSON-LD data from Jena!
üìä Data type: <class 'dict'>


In [None]:
# =============================================================================
# SECTION 6: SPARQL QUERIES FOR ENTITY EXTRACTION
# =============================================================================

In [9]:
# Query for Higher Education Institutions
hei_sparql = """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?institution ?type
WHERE {
    ?institution rdf:type ?type .
    FILTER(CONTAINS(LCASE(STR(?type)), "institution") || 
           CONTAINS(LCASE(STR(?type)), "provider") ||
           CONTAINS(LCASE(STR(?type)), "organization"))
}
LIMIT 100
"""

In [10]:
# Query for Learning Opportunity Specifications (Courses)  
los_sparql = """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?course ?type
WHERE {
    ?course rdf:type ?type .
    FILTER(CONTAINS(LCASE(STR(?type)), "course") || 
           CONTAINS(LCASE(STR(?type)), "learning") ||
           CONTAINS(LCASE(STR(?type)), "opportunity") ||
           CONTAINS(LCASE(STR(?type)), "specification"))
}
LIMIT 100
"""

In [11]:
# Query for Learning Opportunity Instances
loi_sparql = """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?instance ?type
WHERE {
    ?instance rdf:type ?type .
    FILTER(CONTAINS(LCASE(STR(?type)), "instance") || 
           CONTAINS(LCASE(STR(?type)), "offering"))
}
LIMIT 100
"""

In [12]:
# Execute HEI query
try:
    hei_response = requests.get(query_url, params={'query': hei_sparql, 'format': 'application/sparql-results+json'}, auth=auth, timeout=15)
    hei_results = hei_response.json()['results']['bindings'] if hei_response.status_code == 200 else []
    print(f"‚úÖ HEI query: {len(hei_results)} results")
except Exception as e:
    print(f"‚ùå HEI query failed: {e}")
    hei_results = []

# Execute LOS query  
try:
    los_response = requests.get(query_url, params={'query': los_sparql, 'format': 'application/sparql-results+json'}, auth=auth, timeout=15)
    los_results = los_response.json()['results']['bindings'] if los_response.status_code == 200 else []
    print(f"‚úÖ LOS query: {len(los_results)} results")
except Exception as e:
    print(f"‚ùå LOS query failed: {e}")
    los_results = []

# Execute LOI query
try:
    loi_response = requests.get(query_url, params={'query': loi_sparql, 'format': 'application/sparql-results+json'}, auth=auth, timeout=15)
    loi_results = loi_response.json()['results']['bindings'] if loi_response.status_code == 200 else []
    print(f"‚úÖ LOI query: {len(loi_results)} results")
except Exception as e:
    print(f"‚ùå LOI query failed: {e}")
    loi_results = []


print(f"üìä Query results: {len(hei_results)} institutions, {len(los_results)} courses, {len(loi_results)} instances")

‚úÖ HEI query: 1 results
‚úÖ LOS query: 7 results
‚úÖ LOI query: 1 results
üìä Query results: 1 institutions, 7 courses, 1 instances


In [None]:
# =============================================================================
# SECTION 7: DATA TRANSFORMATION FOR MEILISEARCH
# =============================================================================

In [15]:
# Helper functions for data processing
def clean_id(uri):
    """Clean URI to make it Meilisearch-compatible ID"""
    clean = re.sub(r'[^a-zA-Z0-9\-_]', '_', uri)
    return re.sub(r'_+', '_', clean).strip('_')

def extract_value(binding, key):
    """Extract value from SPARQL binding result"""
    if key in binding:
        return binding[key]['value']
    return None

def extract_language_value(binding, key):
    """Extract value with language info from SPARQL binding"""
    if key in binding:
        value = binding[key]['value']
        lang = binding[key].get('xml:lang', 'en')
        return {'value': value, 'language': lang}
    return None

In [16]:
# Initialize document collections
hei_documents = []
los_documents = []
loi_documents = []

In [17]:
# Build Higher Education Institution documents
print("üèõÔ∏è Processing Higher Education Institutions...")
for result in hei_results:
    institution_uri = extract_value(result, 'institution')
    
    doc = {
        'id': clean_id(institution_uri),
        'original_uri': institution_uri,
        'type': 'HigherEducationInstitution',
        'entity_type': 'institution',
        'identifier_type': extract_value(result, 'identifierType'),
        'identifier_scheme': extract_value(result, 'identifierScheme'),
        'identifier_notation': extract_value(result, 'identifierNotation'),
    }
    hei_documents.append(doc)

print(f"üìä Built {len(hei_documents)} Higher Education Institution documents")

# Build Learning Opportunity Specification documents  
print("üìö Processing Learning Opportunity Specifications...")
for result in los_results:
    course_uri = extract_value(result, 'course')
    
    # Handle learning outcomes (concatenated with |)
    outcomes = []
    outcomes_str = extract_value(result, 'learningOutcomes')
    if outcomes_str:
        outcomes = [outcome.strip() for outcome in outcomes_str.split('|') if outcome.strip()]
    
    doc = {
        'id': clean_id(course_uri),
        'original_uri': course_uri,
        'type': 'LearningOpportunitySpecification',
        'entity_type': 'course',
        'title': extract_value(result, 'title'),
        'description': extract_value(result, 'description'),
        'language': extract_value(result, 'language'),
        'version': extract_value(result, 'version'),
        'is_active': extract_value(result, 'isActive'),
        'publisher': extract_value(result, 'publisher'),
        'isced_code': extract_value(result, 'iscedCode'),
        'learning_outcomes': outcomes,
        'learning_outcomes_count': len(outcomes)
    }
    los_documents.append(doc)

print(f"üìä Built {len(los_documents)} Learning Opportunity Specification documents")

# Build Learning Opportunity Instance documents
print("üéì Processing Learning Opportunity Instances...")
for result in loi_results:
    instance_uri = extract_value(result, 'instance')
    
    doc = {
        'id': clean_id(instance_uri),
        'original_uri': instance_uri,
        'type': 'LearningOpportunityInstance',
        'entity_type': 'course_instance',
        'title': extract_value(result, 'title'),
        'achievement_specification': extract_value(result, 'achievementSpec'),
        'default_language': extract_value(result, 'defaultLang'),
        'homepage': extract_value(result, 'homepage'),
        'provider': extract_value(result, 'provider'),
        'period_label': extract_value(result, 'periodLabel')
    }
    loi_documents.append(doc)

print(f"üìä Built {len(loi_documents)} Learning Opportunity Instance documents")

üèõÔ∏è Processing Higher Education Institutions...
üìä Built 1 Higher Education Institution documents
üìö Processing Learning Opportunity Specifications...
üìä Built 7 Learning Opportunity Specification documents
üéì Processing Learning Opportunity Instances...
üìä Built 1 Learning Opportunity Instance documents


In [18]:
# Combine all documents
all_documents = hei_documents + los_documents + loi_documents
print(f"\nüìã Total documents for Meilisearch: {len(all_documents)}")

# Display sample documents if available
if hei_documents:
    print(f"\nüìÑ Sample Higher Education Institution:")
    print(json.dumps(hei_documents[0], indent=2))

if los_documents:
    print(f"\nüìÑ Sample Learning Opportunity Specification:")
    print(json.dumps(los_documents[0], indent=2))

if loi_documents:
    print(f"\nüìÑ Sample Learning Opportunity Instance:")
    print(json.dumps(loi_documents[0], indent=2))


üìã Total documents for Meilisearch: 9

üìÑ Sample Higher Education Institution:
{
  "id": "http_data_quality-link_eu_examples_provider_AT0005",
  "original_uri": "http://data.quality-link.eu/examples/provider/AT0005",
  "type": "HigherEducationInstitution",
  "entity_type": "institution",
  "identifier_type": null,
  "identifier_scheme": null,
  "identifier_notation": null
}

üìÑ Sample Learning Opportunity Specification:
{
  "id": "urn_schac_courseId_uni-lj_si_123",
  "original_uri": "urn:schac:courseId:uni-lj.si:123",
  "type": "LearningOpportunitySpecification",
  "entity_type": "course",
  "title": null,
  "description": null,
  "language": null,
  "version": null,
  "is_active": null,
  "publisher": null,
  "isced_code": null,
  "learning_outcomes": [],
  "learning_outcomes_count": 0
}

üìÑ Sample Learning Opportunity Instance:
{
  "id": "http_data_quality-link_eu_examples_learning-opportunity_4711_sose2025",
  "original_uri": "http://data.quality-link.eu/examples/learning-o

In [None]:
# =============================================================================
# SECTION 8: CREATE MEILISEARCH INDEX
# =============================================================================


In [20]:
# Set up headers for Meilisearch requests
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {MEILISEARCH_API_KEY}"
}

In [21]:
# Create the index
create_url = f"{MEILISEARCH_URL}/indexes"
index_data = {
    "uid": INDEX_NAME,
    "primaryKey": "id"
}

print(f"üèóÔ∏è Creating index...")

üèóÔ∏è Creating index...


In [22]:
create_response = requests.post(create_url, headers=headers, json=index_data)
print(f"üìã Create index response: {create_response.status_code}")

if create_response.status_code not in [200, 201, 202]:
    print(f"‚ö†Ô∏è  Index might already exist or there was an error: {create_response.text}")

üìã Create index response: 202


In [23]:
# =============================================================================
# SECTION 9: CONFIGURE SEARCH SETTINGS
# =============================================================================

In [24]:
# Define search configuration
settings_url = f"{MEILISEARCH_URL}/indexes/{INDEX_NAME}/settings"

search_settings = {
    "searchableAttributes": [
        "title",
        "description", 
        "learning_outcomes",
        "identifier_notation",
        "entity_type",
        "type"
    ],
    "displayedAttributes": [
        "id",
        "type", 
        "entity_type",
        "title",
        "description",
        "learning_outcomes_count",
        "is_active",
        "original_uri"
    ],
    "filterableAttributes": [
        "type",
        "entity_type", 
        "is_active",
        "language"
    ]
}

In [25]:
# Apply the settings
settings_response = requests.patch(
    settings_url,
    headers=headers,
    json=search_settings
)
print(f"üìã Settings response: {settings_response.status_code}")

üìã Settings response: 202


In [26]:
if settings_response.status_code == 202:
    print("‚úÖ Search settings configured successfully!")
else:
    print(f"‚ö†Ô∏è  Settings configuration issue: {settings_response.text}")

‚úÖ Search settings configured successfully!


In [27]:
# =============================================================================
# SECTION 10: UPLOAD DOCUMENTS TO MEILISEARCH
# =============================================================================

In [28]:
# Skip upload if no documents
if len(all_documents) == 0:
    print("‚ö†Ô∏è  No documents to upload. Make sure to implement the SPARQL queries in Section 7.")
else:
    # Upload the documents
    upload_url = f"{MEILISEARCH_URL}/indexes/{INDEX_NAME}/documents"
    
    upload_response = requests.post(
        upload_url,
        headers=headers,
        json=all_documents
    )
    
    print(f"üìã Upload response: {upload_response.status_code}")
    
    if upload_response.status_code == 202:
        task_info = upload_response.json()
        task_uid = task_info['taskUid']
        print(f"‚úÖ Documents uploaded! Task UID: {task_uid}")
        
        # Monitor the indexing task
        print("‚è≥ Monitoring indexing progress...")
        task_url = f"{MEILISEARCH_URL}/tasks/{task_uid}"
        
        for i in range(15):  # Check up to 15 times
            response = requests.get(task_url, headers={"Authorization": f"Bearer {MEILISEARCH_API_KEY}"})
            
            if response.status_code == 200:
                task_data = response.json()
                status = task_data['status']
                print(f"üìã Task status: {status}")
                
                if status == 'succeeded':
                    print("üéâ SUCCESS! Documents indexed successfully!")
                    break
                elif status == 'failed':
                    print("‚ùå Indexing failed!")
                    print(f"Error: {task_data.get('error', {})}")
                    break
                else:
                    time.sleep(2)  # Wait 2 seconds before checking again
            else:
                print(f"‚ùå Error checking task: {response.status_code}")
                break
    else:
        print(f"‚ùå Upload failed: {upload_response.text}")


üìã Upload response: 202
‚úÖ Documents uploaded! Task UID: 25
‚è≥ Monitoring indexing progress...
üìã Task status: processing
üìã Task status: succeeded
üéâ SUCCESS! Documents indexed successfully!


In [29]:
# =============================================================================
# SECTION 11: VERIFY INDEX AND TEST SEARCH
# =============================================================================

In [30]:
# Get index statistics
stats_url = f"{MEILISEARCH_URL}/indexes/{INDEX_NAME}/stats"
stats_response = requests.get(stats_url, headers={"Authorization": f"Bearer {MEILISEARCH_API_KEY}"})

if stats_response.status_code == 200:
    stats = stats_response.json()
    print(f"üìä Index statistics:")
    print(f"   - Documents: {stats.get('numberOfDocuments', 0)}")
    print(f"   - Index size: {stats.get('databaseSize', 0)} bytes")
    print(f"   - Last update: {stats.get('updatedAt', 'N/A')}")
    
    # Get sample documents
    docs_url = f"{MEILISEARCH_URL}/indexes/{INDEX_NAME}/documents?limit=5"
    docs_response = requests.get(docs_url, headers={"Authorization": f"Bearer {MEILISEARCH_API_KEY}"})
    
    if docs_response.status_code == 200:
        docs_data = docs_response.json()
        print(f"\nüìÑ Sample indexed documents:")
        
        # Show document type breakdown
        entity_counts = {}
        for doc in docs_data.get('results', []):
            entity_type = doc.get('entity_type', 'unknown')
            entity_counts[entity_type] = entity_counts.get(entity_type, 0) + 1
        
        print("üìà Document type breakdown:")
        for entity_type, count in entity_counts.items():
            print(f"   - {entity_type}: {count}")

üìä Index statistics:
   - Documents: 8
   - Index size: 0 bytes
   - Last update: N/A

üìÑ Sample indexed documents:
üìà Document type breakdown:
   - institution: 1
   - course: 4


In [31]:
# =============================================================================
# SECTION 12: CREATE SEARCH API KEY
# =============================================================================

In [None]:
# Create a read-only API key for search operations
client = meilisearch.Client(MEILISEARCH_URL, MEILISEARCH_API_KEY)

try:
    search_key = client.create_key({
        'description': 'Read-only key for search operations',
        'actions': [
            'search',               
            'documents.get',         
            'indexes.get',           
            'settings.get',          
            'stats.get',             
            'tasks.get'              
        ],
        'indexes': ['*'],
        'expiresAt': None
    })
    
    print(f"‚úÖ Search API Key created!")
    print(f"üîë Key: {search_key.key}")
    print(f"üÜî Key UID: {search_key.uid}")
    print("‚ö†Ô∏è  Store this key securely - you'll need it for search operations!")
    
except Exception as e:
    print(f"‚ö†Ô∏è  Could not create API key: {e}")

In [None]:
# =============================================================================
# SECTION 13: CREATE BACKUP SNAPSHOT
# =============================================================================

In [None]:
try:
    snapshot_response = client.create_snapshot()
    print("‚úÖ Backup snapshot created successfully!")
    print("üìÅ Snapshot saved to Meilisearch data directory")
except Exception as e:
    print(f"‚ö†Ô∏è  Could not create snapshot: {e}")