In [2]:
from http.server import BaseHTTPRequestHandler, HTTPServer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
from glob import glob
import json

class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Serve the country with the highest death-to-case ratio over HTTP.

    ``GET /most_affected_country`` reads every ``*.csv`` file found in
    ``data_directory`` with Spark, computes ``deaths / cases`` for each row
    and replies with ``{"most_affected_country": <country>}``.  Every other
    path is answered with a 404.
    """

    # Directory scanned for ``*.csv`` input files.  Kept as a class attribute
    # so a subclass can point the handler at different data.
    data_directory = "/Users/meghasingh/Desktop/Spark assignment/"

    def do_GET(self):
        """Route a GET request and write exactly one HTTP response."""
        # Route on the path only; a query string must not change the endpoint.
        route = self.path.split('?', 1)[0]
        if route != '/most_affected_country':
            self._send_body(404, 'text/plain', b'404 Not Found')
            return

        try:
            country = self._find_most_affected_country()
        except Exception as exc:
            # Request boundary: log the failure and tell the client, rather
            # than dropping the connection without any response.
            self.log_error("most_affected_country failed: %r", exc)
            self._send_json(
                500, {'error': 'failed to compute most affected country'})
            return

        if country is None:
            self._send_json(404, {'error': 'no usable data found'})
            return
        self._send_json(200, {'most_affected_country': country})

    def _find_most_affected_country(self):
        """Return the country with the highest deaths/cases ratio.

        Returns ``None`` when the directory holds no CSV files or when no row
        yields a non-null ratio.  Errors raised by Spark (unreadable file,
        missing ``country``/``deaths``/``cases`` column, ...) propagate to the
        caller.
        """
        file_paths = glob(os.path.join(self.data_directory, "*.csv"))
        if not file_paths:
            # Nothing to read; avoid starting Spark at all.
            return None

        spark = SparkSession.builder \
            .appName("Most Affected Country") \
            .getOrCreate()
        try:
            merged_df = None
            for file_path in file_paths:
                df = spark.read.csv(file_path, header=True, inferSchema=True)
                # Keep only the two columns that are needed, in a fixed
                # order.  DataFrame.union matches columns by position, so
                # this keeps the merge correct even when the files differ in
                # column order or carry extra columns.
                df = df.select(
                    col("country"),
                    (col("deaths") / col("cases")).alias("Death_Case_Ratio"),
                )
                merged_df = df if merged_df is None else merged_df.union(df)

            # A null ratio (missing values; presumably also a zero case
            # count under Spark's default non-ANSI division -- confirm for
            # the deployed Spark version) cannot be ranked, so drop it.
            top_row = merged_df \
                .filter(col("Death_Case_Ratio").isNotNull()) \
                .orderBy(col("Death_Case_Ratio").desc()) \
                .select("country") \
                .first()
            # first() yields None when no row survived the filter.
            return None if top_row is None else top_row[0]
        finally:
            # Always release the session, including when reading or querying
            # raised.
            spark.stop()

    def _send_json(self, status, payload):
        """Serialize ``payload`` as JSON and send it with ``status``."""
        self._send_body(
            status, 'application/json', json.dumps(payload).encode())

    def _send_body(self, status, content_type, body):
        """Send ``body`` (bytes) with the given status and content type."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        # Explicit length lets the client know where the body ends.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

def run_server(port=8000):
    """Serve ``HTTPRequestHandler`` on ``port`` until the server is stopped.

    Args:
        port: TCP port to listen on.  The host part of the address is the
            empty string, so the server is not restricted to one interface.

    The call blocks inside ``serve_forever()``.  Whatever ends that loop
    (for example a ``KeyboardInterrupt``) still propagates to the caller.
    """
    server_address = ('', port)
    # The context manager closes the listening socket on the way out, so the
    # port is released even when serve_forever() is interrupted and the
    # server can be started again without "Address already in use".
    with HTTPServer(server_address, HTTPRequestHandler) as httpd:
        print(f'Starting server on port {port}...')
        httpd.serve_forever()

# Entry point: start the HTTP server with run_server()'s default port when
# this code is executed directly rather than imported.
if __name__ == '__main__':
    run_server()

Starting server on port 8000...


24/03/29 11:50:12 WARN Utils: Your hostname, Megha-ka-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.29.171.44 instead (on interface en0)
24/03/29 11:50:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/29 11:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/29 11:50:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
127.0.0.1 - - [29/Mar/2024 11:50:18] "GET /most_affected_country HTTP/1.1" 200 -
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 60916)
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socketserver.py", line 316, in _handle_