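## Authenticate and request an access token

Reads `TENANT_ID`, `CLIENT_ID`, `WORKSPACE_ID`, `LAKEHOUSE_ID`, `REDIRECT_URL_PORT` and `API_VERSION` from a `.env` file, then signs in interactively with MSAL to obtain a Microsoft Fabric access token.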

In [1]:
from msal import PublicClientApplication
from dotenv import load_dotenv
import os
import requests
import time

# Load environment variables from .env file
load_dotenv()
tenant_id = os.getenv('TENANT_ID')
client_id = os.getenv('CLIENT_ID')
workspace_id = os.getenv('WORKSPACE_ID')
lakehouse_id = os.getenv('LAKEHOUSE_ID')
redirect_url_port = os.getenv('REDIRECT_URL_PORT')  # optional
api_version = os.getenv('API_VERSION')

# Fail fast on missing configuration. Later cells concatenate these values into
# request URLs, where a None would otherwise surface as a confusing TypeError.
missing_settings = [
    name
    for name, value in (
        ('TENANT_ID', tenant_id),
        ('CLIENT_ID', client_id),
        ('WORKSPACE_ID', workspace_id),
        ('LAKEHOUSE_ID', lakehouse_id),
        ('API_VERSION', api_version),
    )
    if not value
]
if missing_settings:
    raise RuntimeError("Missing required environment variables: " + ", ".join(missing_settings))

# Delegated Microsoft Fabric API permissions requested at sign-in.
FABRIC_RESOURCE = "https://api.fabric.microsoft.com"
FABRIC_SCOPES = [
    f"{FABRIC_RESOURCE}/{permission}"
    for permission in (
        "Lakehouse.Execute.All",
        "Lakehouse.Read.All",
        "Item.ReadWrite.All",
        "Workspace.ReadWrite.All",
        "Code.AccessStorage.All",
        "Code.AccessAzureKeyvault.All",
        "Code.AccessAzureDataExplorer.All",
        "Code.AccessAzureDataLake.All",
        "Code.AccessFabric.All",
    )
]

app = PublicClientApplication(
    client_id,
    authority=f"https://login.microsoftonline.com/{tenant_id}",
)

# No persistent token cache is passed to the app, so a freshly created app has
# nothing to acquire silently: sign in interactively in the browser.
result = app.acquire_token_interactive(
    scopes=FABRIC_SCOPES,
    # REDIRECT_URL_PORT is optional. None lets MSAL use a system-allocated port;
    # formatting a missing value into a string would hand MSAL the bogus port "None".
    port=int(redirect_url_port) if redirect_url_port else None,
)

# Surface sign-in failures here instead of leaving `access_token` undefined
# and failing with a NameError in a later cell.
if "access_token" not in result:
    raise RuntimeError(
        f"Token acquisition failed: {result.get('error')}: {result.get('error_description')}"
    )
access_token = result["access_token"]

In [None]:
# Confirm a token is available without echoing it: notebook outputs are saved
# with the file, and a printed bearer token can be reused by anyone who reads it.
print(f"Access token acquired ({len(access_token)} characters); value not displayed.")

## Request a Livy Session

In [3]:
# Fail clearly if authentication produced no token; otherwise `headers` and
# `livy_base_url` below would never be defined and the POST would hit a NameError.
if not access_token:
    raise RuntimeError("No access token available; run the authentication cell first.")

api_base_url_mist = 'https://api.fabric.microsoft.com/v1'
# Livy sessions endpoint scoped to one lakehouse in one workspace.
livy_base_url = (
    f"{api_base_url_mist}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
    f"/livyApi/versions/{api_version}/sessions"
)
headers = {"Authorization": "Bearer " + access_token}

# Session name and Spark resources requested for the session.
livy_session_request = {
    "name": "test pyspark session from python code",
    "archives": [],
    "conf": {},
    "tags": {},
    "driverMemory": "7g",
    "driverCores": 1,
    "executorMemory": "7g",
    "executorCores": 1,
    "numExecutors": 2,
}

# Create a Livy session. This only submits the request; the session is not
# usable until its state becomes "idle".
create_livy_session = requests.post(livy_base_url, headers=headers, json=livy_session_request)
create_livy_session.raise_for_status()
create_livy_session_body = create_livy_session.json()
print('The request to create the Livy session is submitted:' + str(create_livy_session_body))

livy_session_id = create_livy_session_body['id']
print(livy_session_id)


The request to create the Livy session is submitted:{'id': '462fe66a-9858-408c-813c-b848da3d9e60', 'artifactId': '0db8ae59-a739-4b50-844c-ee2be3519871'}
462fe66a-9858-408c-813c-b848da3d9e60


## List Livy Sessions

In [None]:
# List the Livy sessions under this lakehouse (the same endpoint used to create them).
get_sessions_response = requests.get(livy_base_url, headers=headers)
# Raise on an HTTP error instead of printing the error body as if it were a listing.
get_sessions_response.raise_for_status()
print(get_sessions_response.json())

## Get details of a Livy Session

In [None]:
livy_session_url = livy_base_url + "/" + livy_session_id

# Session states from which a session never becomes idle (Apache Livy REST API
# naming; NOTE(review): confirm Fabric reports the same state names).
LIVY_SESSION_END_STATES = ("shutting_down", "error", "dead", "killed", "success")
POLL_INTERVAL_SECONDS = 5
TIMEOUT_SECONDS = 15 * 60

# Poll until the session is ready to accept statements ("idle"). Stop on an
# end state or after the timeout instead of looping forever.
session_deadline = time.monotonic() + TIMEOUT_SECONDS
while True:
    get_session_response = requests.get(livy_session_url, headers=headers)
    get_session_response.raise_for_status()
    session_state = get_session_response.json()["state"]
    if session_state == "idle":
        break
    if session_state in LIVY_SESSION_END_STATES:
        raise RuntimeError(
            f"Livy session {livy_session_id} entered state {session_state!r} and will not become idle."
        )
    if time.monotonic() >= session_deadline:
        raise TimeoutError(
            f"Livy session {livy_session_id} still {session_state!r} after {TIMEOUT_SECONDS} s."
        )
    print(get_session_response.json())
    time.sleep(POLL_INTERVAL_SECONDS)

print(get_session_response.json())

## Execute a statement on a Spark session - Local dataframe

In [None]:

# Poll settings, defined here so this cell also runs on its own.
POLL_INTERVAL_SECONDS = 5
TIMEOUT_SECONDS = 15 * 60

# call get session API: statements only run on an idle session, so wait for it
# first. Bounded so a dead session or a stuck start raises instead of hanging.
livy_session_url = livy_base_url + "/" + livy_session_id
session_deadline = time.monotonic() + TIMEOUT_SECONDS
while True:
    get_session_response = requests.get(livy_session_url, headers=headers)
    get_session_response.raise_for_status()
    session_state = get_session_response.json()["state"]
    if session_state == "idle":
        break
    # Apache Livy end states; NOTE(review): confirm Fabric uses the same names.
    if session_state in ("shutting_down", "error", "dead", "killed", "success"):
        raise RuntimeError(
            f"Livy session {livy_session_id} is {session_state!r} and cannot run statements."
        )
    if time.monotonic() >= session_deadline:
        raise TimeoutError(
            f"Livy session {livy_session_id} still {session_state!r} after {TIMEOUT_SECONDS} s."
        )
    time.sleep(POLL_INTERVAL_SECONDS)
print(get_session_response.json())

# Submit the PySpark snippet as a Livy statement.
execute_statement = livy_session_url + "/statements"
code = """
df = spark.createDataFrame([{"id": 1, "name": "Mounir"}])
df.show()
"""
execute_statement_response = requests.post(execute_statement, headers=headers, json={
    "code": code,
    "kind": "pyspark",
})
execute_statement_response.raise_for_status()
print('the statement code is submitted as: ' + str(execute_statement_response.json()))

# Poll the statement until its result is "available". The states
# "error"/"cancelling"/"cancelled" never reach "available", so stop on them.
statement_id = str(execute_statement_response.json()['id'])
get_statement = livy_session_url + "/statements/" + statement_id
statement_deadline = time.monotonic() + TIMEOUT_SECONDS
while True:
    get_statement_response = requests.get(get_statement, headers=headers)
    get_statement_response.raise_for_status()
    statement = get_statement_response.json()
    if statement["state"] == "available":
        break
    if statement["state"] in ("error", "cancelling", "cancelled"):
        raise RuntimeError(f"Statement {statement_id} ended in state {statement['state']!r}: {statement}")
    if time.monotonic() >= statement_deadline:
        raise TimeoutError(f"Statement {statement_id} not finished after {TIMEOUT_SECONDS} s.")
    # Report the latest polled status, not the original submission response.
    print('the statement code is submitted and running : ' + str(statement))
    time.sleep(POLL_INTERVAL_SECONDS)

# An "available" statement can still have failed inside Spark. Livy then reports
# output.status == "error" with ename/evalue/traceback and no output.data.
statement_output = statement['output']
if statement_output.get('status') == 'error':
    raise RuntimeError(
        f"Statement {statement_id} failed: {statement_output.get('ename')}: {statement_output.get('evalue')}\n"
        + "".join(statement_output.get('traceback') or [])
    )
rst = statement_output['data']['text/plain']
print(rst)

## Execute a statement on a Spark session - Data in the Lakehouse

Runs a Spark SQL query against the `person` table, counting rows per `AGE`.

In [None]:
# Poll settings, defined here so this cell also runs on its own.
POLL_INTERVAL_SECONDS = 5
TIMEOUT_SECONDS = 15 * 60

# call get session API: statements only run on an idle session, so wait for it
# first. Bounded so a dead session or a stuck start raises instead of hanging.
livy_session_url = livy_base_url + "/" + livy_session_id
session_deadline = time.monotonic() + TIMEOUT_SECONDS
while True:
    get_session_response = requests.get(livy_session_url, headers=headers)
    get_session_response.raise_for_status()
    session_state = get_session_response.json()["state"]
    if session_state == "idle":
        break
    # Apache Livy end states; NOTE(review): confirm Fabric uses the same names.
    if session_state in ("shutting_down", "error", "dead", "killed", "success"):
        raise RuntimeError(
            f"Livy session {livy_session_id} is {session_state!r} and cannot run statements."
        )
    if time.monotonic() >= session_deadline:
        raise TimeoutError(
            f"Livy session {livy_session_id} still {session_state!r} after {TIMEOUT_SECONDS} s."
        )
    time.sleep(POLL_INTERVAL_SECONDS)
print(get_session_response.json())

# Submit the Spark SQL query as a PySpark statement. `DataFrame.show()` returns
# None, so the query result is kept in `df` and shown once.
execute_statement = livy_session_url + "/statements"
code = """
df = spark.sql("SELECT count(*) as Total, AGE FROM person GROUP BY AGE")
df.show()
"""
execute_statement_response = requests.post(execute_statement, headers=headers, json={
    "code": code,
    "kind": "pyspark",
})
execute_statement_response.raise_for_status()
print('the statement code is submitted as: ' + str(execute_statement_response.json()))

# Poll the statement until its result is "available". The states
# "error"/"cancelling"/"cancelled" never reach "available", so stop on them.
statement_id = str(execute_statement_response.json()['id'])
get_statement = livy_session_url + "/statements/" + statement_id
statement_deadline = time.monotonic() + TIMEOUT_SECONDS
while True:
    get_statement_response = requests.get(get_statement, headers=headers)
    get_statement_response.raise_for_status()
    statement = get_statement_response.json()
    if statement["state"] == "available":
        break
    if statement["state"] in ("error", "cancelling", "cancelled"):
        raise RuntimeError(f"Statement {statement_id} ended in state {statement['state']!r}: {statement}")
    if time.monotonic() >= statement_deadline:
        raise TimeoutError(f"Statement {statement_id} not finished after {TIMEOUT_SECONDS} s.")
    # Report the latest polled status, not the original submission response.
    print('the statement code is submitted and running : ' + str(statement))
    time.sleep(POLL_INTERVAL_SECONDS)

# An "available" statement can still have failed inside Spark. Livy then reports
# output.status == "error" with ename/evalue/traceback and no output.data.
statement_output = statement['output']
if statement_output.get('status') == 'error':
    raise RuntimeError(
        f"Statement {statement_id} failed: {statement_output.get('ename')}: {statement_output.get('evalue')}\n"
        + "".join(statement_output.get('traceback') or [])
    )
rst = statement_output['data']['text/plain']
print(rst)

## Stop and delete a Livy Session

In [9]:
# Stop and delete the Livy session now that the statements are done.
livy_session_url = livy_base_url + "/" + livy_session_id

delete_session_response = requests.delete(livy_session_url, headers=headers)
# Raise if the delete was rejected, so a still-running session is not silently left behind.
delete_session_response.raise_for_status()
print(delete_session_response)

<Response [200]>
