In [13]:
import requests
import json
import time

In [14]:
livy_url = "http://livy-server:8998"

In [15]:
jars = [
    "https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.2/mongo-spark-connector_2.12-3.0.2.jar",
    "https://repo1.maven.org/maven2/org/mongodb/bson/4.6.1/bson-4.6.1.jar",
    "https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-core/4.6.1/mongodb-driver-core-4.6.1.jar",
    "https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.6.1/mongodb-driver-sync-4.6.1.jar"
]

In [16]:
# Creating a PySpark session with all required JARs
session_data = {
    "kind": "pyspark",  # Use "pyspark" for Python sessions
    "jars": jars,  # List all JAR URLs
    "conf": {
        "spark.app.name": "MongoDB-PySpark-Session",
        "spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2"
    }
}

In [17]:
# Creating the session
response = requests.post(
    f"{livy_url}/sessions",
    headers={"Content-Type": "application/json"},
    data=json.dumps(session_data)
)

In [18]:
if response.status_code == 201:
    session_info = response.json()
    session_id = session_info["id"]
    print(f"‚úÖ Session created with ID: {session_id}")
    
    # Poll until session is ready
    max_attempts = 30
    for attempt in range(max_attempts):
        time.sleep(2)
        status_response = requests.get(f"{livy_url}/sessions/{session_id}")
        
        if status_response.status_code == 200:
            session_state = status_response.json()["state"]
            print(f"Attempt {attempt+1}: Session state = {session_state}")
            
            if session_state in ["idle", "busy"]:
                print(f"\nüéâ Session {session_id} is ready!")
                print(f"Use this in your notebook: %%spark -s {session_id}")
                break
            elif session_state == "error":
                print("‚ùå Session failed to start")
                print("Session logs:", status_response.json().get("log", []))
                break
        else:
            print(f"‚ùå Failed to get session status: {status_response.status_code}")
    
    if attempt == max_attempts - 1:
        print("‚ö†Ô∏è Session creation timeout")
else:
    print(f"‚ùå Error creating session: {response.status_code}")
    print("Response:", response.text)

‚úÖ Session created with ID: 1
Attempt 1: Session state = starting
Attempt 2: Session state = starting
Attempt 3: Session state = starting
Attempt 4: Session state = starting
Attempt 5: Session state = starting
Attempt 6: Session state = starting
Attempt 7: Session state = starting
Attempt 8: Session state = starting
Attempt 9: Session state = starting
Attempt 10: Session state = idle

üéâ Session 1 is ready!
Use this in your notebook: %%spark -s 1


In [22]:
# Test MongoDB connection
code = '''
from pyspark.sql import SparkSession

try:
    df = spark.read.format("mongo") \\
        .option("uri", "mongodb://admin:password123@mongodb:27017/stroke_prediction_test_connection.patients?authSource=admin") \\
        .load()
    
    print("‚úÖ SUCCESS: Connected to MongoDB!")
    print(f"DataFrame schema: {df.schema}")
    print(f"Row count: {df.count()}")
    
    # Show data
    df.show(5, truncate=False)
    
    # Return first 5 rows as JSON for debugging
    result = df.limit(5).toJSON().collect()
    for row in result:
        print(row)
        
except Exception as e:
    print(f"‚ùå ERROR: {str(e)}")
    import traceback
    traceback.print_exc()
'''

# Submit code to Livy
response = requests.post(
    f"{livy_url}/sessions/{session_id}/statements",
    headers={"Content-Type": "application/json"},
    data=json.dumps({
        "code": code,
        "kind": "pyspark"
    })
)


In [23]:
# Submit code to Livy
response = requests.post(
    f"{livy_url}/sessions/{session_id}/statements",
    headers={"Content-Type": "application/json"},
    data=json.dumps({
        "code": code,
        "kind": "pyspark"
    })
)

In [24]:
if response.status_code == 201:
    statement = response.json()
    statement_id = statement['id']
    print(f"‚úÖ Code submitted. Statement ID: {statement_id}")
    
    # Wait for completion and get output
    for _ in range(10):
        time.sleep(2)
        result_resp = requests.get(f"{livy_url}/sessions/{session_id}/statements/{statement_id}")
        if result_resp.status_code == 200:
            result = result_resp.json()
            state = result.get('state')
            print(f"Statement state: {state}")
            
            if state in ['available', 'finished']:
                if 'output' in result and result['output']:
                    output = result['output'].get('data', {})
                    if 'text/plain' in output:
                        print("\n" + "="*50)
                        print("OUTPUT:")
                        print("="*50)
                        print(output['text/plain'])
                break
            elif state in ['error', 'cancelled']:
                print(f"‚ùå Statement failed: {state}")
                break
else:
    print(f"‚ùå Failed to submit code: {response.status_code}")
    print(response.text)

‚úÖ Code submitted. Statement ID: 3
Statement state: available
