Code to map project structure

In [9]:
import os
import re

def _top_level_modules(source_text):
    """Return the top-level package names that ``source_text`` imports absolutely.

    Relative imports (``from . import x``, ``from .pkg import y``) are skipped
    because they refer to the project itself, and dotted names are reduced to
    their first component (``os.path`` -> ``os``).
    """
    dotted_names = re.findall(r'^\s*(?:from|import)\s+([\w\.]+)', source_text, re.MULTILINE)
    return {
        dotted.split('.', 1)[0]
        for dotted in dotted_names
        if not dotted.startswith('.')
    }


def get_project_info(folder_path, ignore_extensions=None, code_subfolders=None, output_modules=False):
    """Describe a project folder as text: its tree, selected file contents and imports.

    Args:
        folder_path: Root directory to walk. ``.git`` and ``__pycache__``
            folders and ``.pyc`` files are left out of the listing.
        ignore_extensions: File extensions whose contents must not be dumped.
            Entries may be given with or without the leading dot and in any
            case. Ignored files still appear in the structure listing.
        code_subfolders: Paths relative to ``folder_path``. Only files located
            under one of them (or equal to one of them) get their contents
            included in the "File Details" section.
        output_modules: When True, a "Required Libraries" section is appended
            listing the top-level modules imported by the dumped files.

    Returns:
        A single string made of the "Project Structure" section, the
        "File Details" section and, optionally, "Required Libraries".
    """
    if ignore_extensions is None:
        ignore_extensions = []
    else:
        # Ensure all extensions start with a dot and are lowercase
        ignore_extensions = [
            ext.lower() if ext.startswith('.') else f'.{ext.lower()}'
            for ext in ignore_extensions
        ]

    if code_subfolders is None:
        code_subfolders = []

    project_structure = []
    file_details = []
    libraries = set()  # To collect unique library imports

    # Resolve to absolute paths so the prefix test below works whichever way
    # folder_path is spelled ("./", ".", "proj/..", ...); abspath also strips
    # trailing separators.
    code_subfolders_abs = [
        os.path.abspath(os.path.join(folder_path, subfolder))
        for subfolder in code_subfolders
    ]
    # Directory prefixes end in exactly one separator, which prevents partial
    # matches such as 'src' vs 'src_utils' and still works for a filesystem root.
    code_prefixes = [path.rstrip(os.sep) + os.sep for path in code_subfolders_abs]

    for root, dirs, files in os.walk(folder_path):
        # Prune in place so os.walk never descends into these folders
        dirs[:] = [d for d in dirs if d not in ('.git', '__pycache__')]

        # Path of the current directory relative to folder_path ("" for the root itself)
        relative_root = os.path.relpath(root, folder_path)
        if relative_root == ".":
            relative_root = ""

        for dir_name in dirs:
            if '__pycache__' not in dir_name:
                dir_path = os.path.join(relative_root, dir_name)
                # Joining with "" appends a trailing separator to mark directories
                project_structure.append(os.path.join(dir_path, ""))

        for file_name in files:
            # Compiled artefacts are never listed
            if '__pycache__' in file_name or file_name.endswith('.pyc'):
                continue

            file_path = os.path.join(relative_root, file_name)
            project_structure.append(file_path)

            _, file_ext = os.path.splitext(file_name.lower())

            full_file_path = os.path.abspath(os.path.join(root, file_name))
            include_content = (
                full_file_path in code_subfolders_abs
                or any(full_file_path.startswith(prefix) for prefix in code_prefixes)
            )

            # Include file details only if it's in specified subfolders and not ignored
            if include_content and file_ext not in ignore_extensions:
                try:
                    with open(full_file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                except (UnicodeDecodeError, OSError):
                    # Binary data, missing permissions, broken links, ...: keep
                    # going and flag the file instead of aborting the whole walk
                    content = "<Unable to read file contents>"

                relative_file_path = os.path.relpath(full_file_path, folder_path)
                file_info = f"""File_Name: {file_name}
File_Path: {relative_file_path}
Code_contents:
{content}
----------------
"""
                file_details.append(file_info)

                if output_modules:
                    libraries.update(_top_level_modules(content))

    # Create the project structure string
    structure_str = "Project Structure:\n" + "\n".join(project_structure) + "\n\n"

    # Create the file details string
    if file_details:
        details_str = "File Details:\n" + "\n".join(file_details)
    else:
        details_str = "File Details:\n<No files to display>\n----------------"

    final_output = structure_str + details_str

    # One module name per line, sorted, so the list can seed a requirements file
    if output_modules:
        modules_list = "\n".join(sorted(libraries))
        final_output += f"\n\nRequired Libraries:\n{modules_list}\n"

    return final_output

# Example usage: dump the current directory, including source contents for two subfolders
if __name__ == "__main__":
    folder = "./"  # Replace with your project path
    # Extensions whose contents are skipped (entries with and without the dot are both accepted)
    ignore = ['.png', '.jpg', '.md', '.pic', '.pyc', 'pyc', '.json', 'json']
    # Subfolders (relative to `folder`) whose file contents are included
    subfolders = ['backend/', 'utils/']
    project_info = get_project_info(
        folder,
        ignore_extensions=ignore,
        code_subfolders=subfolders,
        output_modules=True,
    )
    print(project_info)


Project Structure:
backend\
frontend\
jre\
utils\
.gitignore
.gitlab-ci.yml
coordinates.py
docker-compose.yml
ISSUES_UPLOAD_FILE.xlsx
LICENCE.txt
map.ipynb
producer.ipynb
README.md
spark.ipynb
test.ipynb
wait-for-it.sh
backend\api\
backend\config\
backend\data\
backend\models\
backend\simulator-data\
backend\tests\
backend\app.py
backend\Dockerfile
backend\requirements.txt
backend\api\get_historical_stats.py
backend\api\get_live_stats.py
backend\api\kafka_topics.py
backend\api\register_device.py
backend\api\send_stream.py
backend\config\config.py
backend\data\device_schemas\
backend\data\historical\
backend\data\device_schemas\gps_1.json
backend\data\device_schemas\gps_10.json
backend\data\device_schemas\gps_11.json
backend\data\device_schemas\gps_2.json
backend\data\device_schemas\gps_3.json
backend\data\device_schemas\gps_4.json
backend\data\device_schemas\gps_5.json
backend\data\device_schemas\gps_6.json
backend\data\device_schemas\gps_7.json
backend\data\device_schemas\gps_8.json
b

Code to register device

In [1]:
import requests

# FastAPI endpoint that receives one registration payload per device
url = "http://127.0.0.1:8000/register-device"

# Field name -> type name for every sensor, keyed by device name
device_schemas = {
    "gps_2": {
        "latitude": "float",
        "longitude": "float",
        "timestamp": "string",
    },
    "accel_2": {
        "acceleration_x": "float",
        "acceleration_y": "float",
        "acceleration_z": "float",
        "timestamp": "string",
    },
    "wind_2": {
        "speed": "float",
        "direction": "float",
        "timestamp": "string",
    },
    "player_temperature_2": {
        "temperature": "float",
        "timestamp": "string",
    },
    "player_heart_rate_2": {
        "heart_rate": "int",
        "timestamp": "string",
    },
}

# Registration payloads, in the same order as the mapping above
devices = [
    {"device_name": name, "schema": schema}
    for name, schema in device_schemas.items()
]

# Register each device and report what the server answered
for device in devices:
    response = requests.post(url, json=device)

    print(f"Registering device: {device['device_name']}")
    print(f"Status Code: {response.status_code}")
    print(f"Response JSON: {response.json()}\n")

# Read back a single device (gps_1) from the /device/{name} endpoint
url = "http://127.0.0.1:8000/device/gps_1"
response = requests.get(url)

print(f"Status Code: {response.status_code}")
print(f"Response JSON: {response.json()}")



Registering device: gps_2
Status Code: 200
Response JSON: {'message': "Device 'gps_2' already exists with the same schema."}

Registering device: accel_2
Status Code: 200
Response JSON: {'message': 'Schema for accel_2 saved successfully!'}

Registering device: wind_2
Status Code: 200
Response JSON: {'message': 'Schema for wind_2 saved successfully!'}

Registering device: player_temperature_2
Status Code: 200
Response JSON: {'message': "Device 'player_temperature_2' already exists with the same schema."}

Registering device: player_heart_rate_2
Status Code: 200
Response JSON: {'message': "Device 'player_heart_rate_2' already exists with the same schema."}

Status Code: 200
Response JSON: {'device_name': 'gps_1', 'schema': {'device_name': 'gps_1', 'schema': {'latitude': 'float', 'longitude': 'float', 'timestamp': 'string'}}}


Start Zookeeper:

bash

zookeeper-server-start.bat C:\kafka\kafka_2.12-3.8.0\config\zookeeper.properties


Start the Kafka broker:

bash

kafka-server-start.bat C:\kafka\kafka_2.12-3.8.0\config\server.properties

Code to run LIVE aggregation

In [5]:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# Parameters: which sensor types to query and how many devices of each type
USE_SENSORS = {"heart_rate": True, "temperature": False}  # Set to True to enable a sensor type
NUM_DEVICES = 1 # Devices per enabled sensor type; device ids are numbered 1..NUM_DEVICES
RUNS = ["run_001"]  # Run ids requested for every device
AGGREGATION_TYPE = "average"  # Sent as the agg_type query parameter of every request

# URL template of the FastAPI get-stats endpoint; the placeholders are filled in per request
url_template = "http://localhost:8000/get-stats/{device_id}/{run_id}"

# Function to make the GET request and process the response
def check_run(device_id, run_id, agg_type):
    """Request the aggregated stats for one device/run and summarise the outcome.

    Args:
        device_id: Device identifier inserted into the URL path.
        run_id: Run identifier inserted into the URL path.
        agg_type: Value appended as the ``agg_type`` query parameter.

    Returns:
        A one-line message: the decoded JSON on HTTP 200, the status code and
        body text on any other status, or the exception text if the request
        or the JSON decoding raised.
    """
    endpoint = url_template.format(device_id=device_id, run_id=run_id)
    url = endpoint + f"?agg_type={agg_type}"
    try:
        response = requests.get(url)

        # Anything other than 200 is reported together with the raw body
        if response.status_code != 200:
            return f"Failed for {device_id}/{run_id} with {agg_type}, status code: {response.status_code}, {response.text}"

        return f"Success for {device_id}/{run_id} with {agg_type}: {response.json()}"
    except Exception as e:
        return f"An error occurred for {device_id}/{run_id} with {agg_type}: {e}"

# Main function to handle the request
def main():
    """Query every enabled device for every run concurrently and print each result."""
    # Device-id prefix for each sensor type, in the order they are processed
    id_prefixes = (
        ("heart_rate", "player_heart_rate"),
        ("temperature", "player_temperature"),
    )

    devices = {}
    for sensor_type, prefix in id_prefixes:
        if USE_SENSORS[sensor_type]:
            devices[sensor_type] = [f"{prefix}_{i}" for i in range(1, NUM_DEVICES + 1)]

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Remember which (device, run) each future belongs to for error reporting
        futures = {}
        for device_ids in devices.values():
            for device in device_ids:
                for run in RUNS:
                    future = executor.submit(check_run, device, run, AGGREGATION_TYPE)
                    futures[future] = (device, run)

        # Print results in completion order rather than submission order
        for future in as_completed(futures):
            try:
                print(future.result())
            except Exception as exc:
                device, run = futures[future]
                print(f"An error occurred for {device}/{run} with {AGGREGATION_TYPE}: {exc}")

# Run the main function
if __name__ == "__main__":
    main()


Success for player_heart_rate_1/run_001 with average: {'device_id': 'player_heart_rate_1', 'aggregation': 'average', 'average_heart_rate': 105.98305084745763}


Code to get instantaneous latest Kafka read

In [2]:
import requests
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

# Device and runs to query, and how many messages to ask for per run
sensor = "player_heart_rate_1"
runs = ["run_001"]
limit = 1 # Sent as the `limit` query parameter of every request
# URL template of the get-topic-messages endpoint; placeholders are filled in per request
base_url = "http://127.0.0.1:8000/get-topic-messages/{device_id}/{run_id}?limit={limit}"

# Function to make the request and process the response
def fetch_kafka_messages(device_id, run_id, limit):
    """Fetch messages for one device/run and print them.

    Prints the whole JSON response, then the first entry of its ``messages``
    list (or a notice when the list is empty or missing). Non-200 statuses and
    exceptions are printed instead of raised. Returns ``None``.
    """
    url = base_url.format(device_id=device_id, run_id=run_id, limit=limit)
    try:
        response = requests.get(url)

        if response.status_code != 200:
            print(f"Error for {device_id}/{run_id}: {response.status_code}, {response.text}")
            return

        response_json = response.json()
        print(f"Response for {device_id}/{run_id}:", json.dumps(response_json, indent=4))

        messages = response_json.get('messages', [])
        if not messages:
            print(f"No messages returned for {device_id}/{run_id}.")
            return

        # Only the first returned message is shown as the "latest" one
        print(f"Latest Message for {device_id}/{run_id}:", json.dumps(messages[0], indent=4))
    except Exception as e:
        print(f"An error occurred for {device_id}/{run_id}: {e}")

# Main function to run the requests concurrently
def main():
    """Fetch the messages of the configured sensor for every run concurrently."""
    with ThreadPoolExecutor(max_workers=5) as executor:
        # One task per run, all for the same sensor
        futures = []
        for run in runs:
            futures.append(executor.submit(fetch_kafka_messages, sensor, run, limit))

        # Calling result() re-raises anything that escaped fetch_kafka_messages
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                print(f"An error occurred: {exc}")

# Run the main function
if __name__ == "__main__":
    main()

Response for player_heart_rate_1/run_001: {
    "device_id": "player_heart_rate_1",
    "run_id": "run_001",
    "messages": [
        {
            "heart_rate": 88,
            "timestamp": "2024-12-06T17:50:29.327413"
        }
    ]
}
Latest Message for player_heart_rate_1/run_001: {
    "heart_rate": 88,
    "timestamp": "2024-12-06T17:50:29.327413"
}


Code to start a stream and subscribe to alarms

In [None]:
import aiohttp
import asyncio
from typing import Union

# Parameters: which sensor types to stream and how many devices of each type
USE_SENSORS = {"heart_rate": True, "temperature": False}  # Set to True to enable a sensor type
NUM_DEVICES = 1  # Devices per enabled sensor type; device ids are numbered 1..NUM_DEVICES

# URL templates of the FastAPI streaming endpoints; placeholders are filled in per request
start_stream_url_template = "http://localhost:8000/start-stream/{device_id}/{run_id}"
get_notification_url_template = "http://localhost:8000/get-notification/{device_id}/{run_id}"

# Alarm thresholds per sensor type, given as [min, max]
triggers = {
    "heart_rate": [70, 140],  # Min and max threshold for heart rate
    "temperature": [36.5, 38.0],  # Min and max threshold for temperature
}

# Asynchronous function to start the stream for a device and run
async def start_stream(device_id, run_id, triggers, table_preappend=None):
    """POST a start-stream request for one device/run and print the outcome.

    Args:
        device_id: Device identifier inserted into the URL path.
        run_id: Run identifier inserted into the URL path.
        triggers: Sent unchanged as the ``triggers`` field of the JSON payload.
        table_preappend: Sent as ``table_preappend``; falls back to
            ``"threshold"`` when empty or ``None``.

    Errors are printed, never raised. Returns ``None``.
    """
    url = start_stream_url_template.format(device_id=device_id, run_id=run_id)
    payload = {
        "triggers": triggers,
        "window_seconds": 1,
        "table_preappend": table_preappend or "threshold",
        "exclude_normal": True
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, json=payload) as response:
                if response.status != 200:
                    text = await response.text()
                    print(f"Failed to start stream for {device_id}/{run_id} with status code: {response.status}, {text}")
                    return
                print(f"Stream started successfully for {device_id}/{run_id}")
        except Exception as e:
            print(f"An error occurred while starting the stream for {device_id}/{run_id}: {e}")

# Asynchronous function to subscribe to the notification endpoint for a device and run
async def subscribe_to_notifications(device_id, run_id, table_preappend: Union[str, None] = None):
    """Open the get-notification stream for one device/run and print its updates.

    The response body is read line by line until it ends; only lines that
    start with ``data:`` are printed. ``table_preappend`` is sent as a query
    parameter when it is non-empty. Errors are printed, never raised.
    """
    url = get_notification_url_template.format(device_id=device_id, run_id=run_id)
    async with aiohttp.ClientSession() as session:
        try:
            if table_preappend:
                params = {"table_preappend": table_preappend}
            else:
                params = {}

            async with session.get(url, params=params) as response:
                if response.status != 200:
                    text = await response.text()
                    print(f"Failed to subscribe to notifications for {device_id}/{run_id} with status code: {response.status}, {text}")
                    return

                print(f"Subscribed successfully to notifications for {device_id}/{run_id}")
                async for line in response.content:
                    decoded_line = line.decode('utf-8').strip()
                    # Skip every line that is not a data line
                    if not decoded_line.startswith("data:"):
                        continue
                    print(f"Update for {device_id}/{run_id}: {decoded_line}")
        except Exception as e:
            print(f"An error occurred while subscribing to notifications for {device_id}/{run_id}: {e}")


# Main asynchronous function to handle the test request for streaming notifications
async def main_streaming_test():
    # Define the run_id to be used for all tests
    run_id = "run_001"
    
    # Step 1: Start the streams for enabled sensor types
    start_tasks = []
    if USE_SENSORS["heart_rate"]:
        start_tasks.extend(
            start_stream(f"player_heart_rate_{i}", run_id, {"heart_rate": triggers["heart_rate"]}, table_preappend="threshold")
            for i in range(1, NUM_DEVICES + 1)
        )
    if USE_SENSORS["temperature"]:
        start_tasks.extend(
            start_stream(f"player_temperature_{i}", run_id, {"temperature": triggers["temperature"]}, table_preappend="threshold")
            for i in range(1, NUM_DEVICES + 1)
        )
    await asyncio.gather(*start_tasks)

    # Step 2: Wait for 5 seconds before subscribing to notifications
    await asyncio.sleep(5)

    # Step 3: Subscribe to notifications for enabled sensor types
    subscribe_tasks = []
    if USE_SENSORS["heart_rate"]:
        subscribe_tasks.extend(
            subscribe_to_notifications(f"player_heart_rate_{i}", run_id, table_preappend="threshold")
            for i in range(1, NUM_DEVICES + 1)
        )
    if USE_SENSORS["temperature"]:
        subscribe_tasks.extend(
            subscribe_to_notifications(f"player_temperature_{i}", run_id, table_preappend="threshold")
            for i in range(1, NUM_DEVICES + 1)
        )
    await asyncio.gather(*subscribe_tasks)

# Run the main_streaming_test function
await main_streaming_test()


Stream started successfully for player_heart_rate_1/run_001
Subscribed successfully to notifications for player_heart_rate_1/run_001
Update for player_heart_rate_1/run_001: data: {'device_id': 'player_heart_rate_1_run_001', 'updates': [{'heart_rate': 175, 'timestamp': datetime.datetime(2024, 12, 6, 18, 40, 31, 610168), 'heart_rate_status': 'high'}]}
Update for player_heart_rate_1/run_001: data: {'device_id': 'player_heart_rate_1_run_001', 'updates': [{'heart_rate': 184, 'timestamp': datetime.datetime(2024, 12, 6, 18, 41, 22, 284917), 'heart_rate_status': 'high'}]}


CancelledError: 

Code to stop the stream and get current speed

In [13]:
import requests

def stop_stream(device_id, run_id):
    """POST to the stop-stream endpoint for one device/run and print the outcome.

    Non-200 statuses and exceptions are printed instead of raised.
    """
    url = f"http://localhost:8000/stop-stream/{device_id}/{run_id}"
    try:
        response = requests.post(url)
        if response.status_code != 200:
            text = response.text
            print(f"Failed to stop stream for {device_id}/{run_id} with status code: {response.status_code}, {text}")
            return
        print(f"Stream stopped successfully for {device_id}/{run_id}")
    except Exception as e:
        print(f"An error occurred while stopping the stream for {device_id}/{run_id}: {e}")

stop_stream("player_heart_rate_1", "run_001")

Stream stopped successfully for player_heart_rate_1/run_001


In [5]:
import aiohttp
import asyncio

# URL template of the get-speed endpoint; `type` selects which metric is returned
get_speed_url_template = "http://localhost:8000/get-speed/{device_id}/{run_id}?type={type}"


async def test_get_speed(device_id, run_id, metric_type):
    """Call the get-speed endpoint for one device/run and print the outcome.

    A 200 response counts as a success only if its JSON contains the keys
    ``device_id``, ``run_id`` and the requested ``metric_type``; otherwise the
    payload is reported as having an unexpected structure. Other statuses and
    exceptions are printed, never raised.
    """
    url = get_speed_url_template.format(device_id=device_id, run_id=run_id, type=metric_type)
    expected_keys = ("device_id", "run_id", metric_type)
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    text = await response.text()
                    print(f"Failed for {device_id}/{run_id} with status code: {response.status}, {text}")
                    return

                data = await response.json()
                if all(key in data for key in expected_keys):
                    print(f"Success for {device_id}/{run_id}: {data}")
                else:
                    print(f"Unexpected response structure for {device_id}/{run_id}: {data}")
        except Exception as e:
            print(f"An error occurred for {device_id}/{run_id}: {e}")

# Main asynchronous function to handle the test requests for multiple device and run combinations
async def main_get_speed_test():
    # Define a single run_id and generate test cases for 10 devices
    run_id = "run_001"
    metric_type = "acceleration"  # Or "acceleration"
    test_cases = [{"device_id": f"gps_{i}", "run_id": run_id} for i in range(1, 2)]

    # Create tasks to test each device_id and run_id pair concurrently
    test_tasks = [test_get_speed(case["device_id"], case["run_id"], metric_type) for case in test_cases]
    
    # Run all tasks concurrently
    await asyncio.gather(*test_tasks)

# Run the main_get_speed_test function
await main_get_speed_test()


Success for gps_1/run_001: {'device_id': 'gps_1', 'run_id': 'run_001', 'acceleration': 2.4514464937988407}


Code to Clean up Topics

In [None]:
import requests

BASE_URL = "http://127.0.0.1:8000"


def _response_body(response):
    """Return the decoded JSON body, or the raw text when the body is not valid JSON."""
    try:
        return response.json()
    except ValueError:
        # Error responses are not guaranteed to carry JSON; show the raw body
        # rather than masking the real failure with a decoding error
        return response.text


def _report(label, response):
    """Print whether the call named ``label`` succeeded (HTTP 200), followed by its body."""
    if response.status_code == 200:
        print(f"{label}: Success")
        print("Response:", _response_body(response))
    else:
        print(f"{label}: Failed")
        print("Error:", _response_body(response))


def test_list_topics():
    """
    Call the /list-topics endpoint and print its outcome and response body.
    """
    _report("List Topics", requests.get(f"{BASE_URL}/list-topics"))

def test_delete_topic_by_device_name(device_id, run_id):
    """
    Call the /delete-topic/{device_id}/{run_id} endpoint and print its outcome and response body.
    """
    response = requests.delete(f"{BASE_URL}/delete-topic/{device_id}/{run_id}")
    _report(f"Delete Topic {device_id}.{run_id}", response)

def test_delete_all_topics():
    """
    Call the /delete-all-topics endpoint and print its outcome and response body.
    """
    _report("Delete All Topics", requests.delete(f"{BASE_URL}/delete-all-topics"))

if __name__ == "__main__":
    # Only the read-only listing runs by default; the delete calls below are
    # left commented out and must be enabled deliberately.
    print("Testing /list-topics endpoint:")
    test_list_topics()
    
    # print("\nTesting /delete-topic/{device_id}/{run_id} endpoint:")
    # # Substitute 'test_device' and 'test_run' with actual values
    # test_delete_topic_by_device_name("test_device", "test_run")
    
    # print("\nTesting /delete-all-topics endpoint:")
    # test_delete_all_topics()


Testing /list-topics endpoint:
List Topics: Success
Response: {'message': 'Topics retrieved successfully', 'topics': []}


Code to delete all historical data

In [25]:
import requests

BASE_URL = "http://127.0.0.1:8000"

def test_delete_all_historical_data():
    """
    Call the /delete-all endpoint and print its outcome and response body.

    The body is shown as decoded JSON when possible and as raw text otherwise,
    so a non-JSON error response is reported instead of raising.
    """
    response = requests.delete(f"{BASE_URL}/delete-all")

    try:
        body = response.json()
    except ValueError:
        # Error responses are not guaranteed to carry JSON
        body = response.text

    if response.status_code == 200:
        print("Delete All Historical Data: Success")
        print("Response:", body)
    else:
        print("Delete All Historical Data: Failed")
        print("Error:", body)

if __name__ == "__main__":
    print("Testing /delete-all endpoint:")
    test_delete_all_historical_data()


Testing /delete-all endpoint:
Delete All Historical Data: Success
Response: {'message': 'All historical data files and folders deleted successfully', 'deleted_items_count': 2}


In [3]:
import requests

# Endpoint that returns the team statistics
url = "http://localhost:8000/team-statistics"

try:
    response = requests.get(url)

    if response.status_code != 200:
        # Report the status together with the raw body
        print(f"Failed to fetch data. Status code: {response.status_code}")
        print("Error:", response.text)
    else:
        # Success: show the decoded statistics
        print("Team Statistics Data:", response.json())

except requests.exceptions.RequestException as e:
    # Raised by requests for failures of the request itself (e.g., connection errors)
    print(f"An error occurred: {e}")


Team Statistics Data: {'Total Distance Per Game': {'run_001': 47.0, 'run_002': 44.37, 'run_003': 37.76, 'run_004': 46.11, 'run_005': 46.29, 'run_006': 41.02, 'run_007': 34.35, 'run_008': 41.91, 'run_009': 37.81, 'run_010': 47.76, 'run_011': 40.45, 'run_012': 39.09, 'run_013': 36.76, 'run_014': 52.88, 'run_015': 45.11}, 'Average Speed Per Game': 9.8, 'Max Speed Per Game': 13.88}


In [3]:
from pyspark.sql import SparkSession

# Stop the session currently active in this kernel, if there is one, before
# building a new one.
spark = SparkSession.getActiveSession()
if spark is not None:
    spark.stop()

# Build (or fetch) the session used by the cells that follow.
spark = (
    SparkSession.builder
    .appName("TestApp")
    .master("spark://spark-master:7077")
    # SQL warehouse / catalog settings.
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
    .config("spark.sql.catalogImplementation", "in-memory")
    # Scratch directory setting.
    .config("spark.local.dir", "/tmp/spark-temp")
    # Driver networking.
    # NOTE(review): Spark's configuration reference describes
    # spark.driver.host as the address executors and the standalone master use
    # to reach the driver; "0.0.0.0" is a wildcard bind address -- confirm this
    # value is intended.
    .config("spark.driver.host", "0.0.0.0")
    .config("spark.driver.bindAddress", "0.0.0.0")
    # Off-heap memory settings.
    .config("spark.memory.offHeap.enabled","true")
    .config("spark.memory.offHeap.size","10g")
    .getOrCreate()
)


In [7]:
# Report the master URL that the current SparkContext is attached to.
print("Master:", spark.sparkContext.master)

Master: spark://spark-master:7077


In [8]:
# Bare expression: the notebook displays the SparkContext object as this
# cell's output.
spark.sparkContext

In [6]:
# Small in-memory sample: one (name, age) tuple per row, with the matching
# column labels, turned into a Spark DataFrame.
columns = ["Name", "Age"]
data = [
    ("John", 30),
    ("Jane", 25),
    ("Mike", 35),
]
df = spark.createDataFrame(data, schema=columns)

Py4JJavaError: An error occurred while calling o82.sessionState.
: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:115)
	at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [4]:
# Print the DataFrame's rows as a text table; this is an action, so it runs a
# Spark job.
df.show()

Py4JJavaError: An error occurred while calling o55.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (DESKTOP-5UBV34D executor driver): java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:271)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:271)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
