In [None]:
#%pip install gradio

Collecting gradio
  Downloading gradio-5.23.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.8.0 (from gradio)
  Downloading gradio_client-1.8.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting httpx>=0.24.1 (from gradio)
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting huggingface-hub>=0.28.1 (from gradio)
  Downloading huggingface_hub-0.29.3-py3-none-any.whl.metadata (13 kB)
Collecting orjson~=3.0 (from gradio)
  Downloading orjson-3.10.16-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (41 kB)
[2K     [90m━━━━━━━━━━━━━━

In [None]:
import gradio as gr
import os
from nClient import (
    create_socket, 
    connect_socket, 
    send_request,
    handle_response_content,
    save_response_content
)

def client_request(host, port, method, path, save_file, custom_headers, body_type, manual_body, uploaded_file):
    """Handle HTTP request via Gradio interface"""
    try:
        # Process host and port
        host = host.strip() if host.strip() else "localhost"
        try:
            port = int(port)
        except:
            port = 8080

        # Clean path
        if not path.startswith("/"):
            path = "/" + path

        # Process custom headers
        headers_list = []
        if custom_headers:
            headers_list = [h.strip() for h in custom_headers.splitlines() if h.strip()]

        # Detect if we expect binary response (for GET)
        is_binary = False
        if method == "GET":
            ext = path.split('.')[-1].lower() if '.' in path else ''
            is_binary = ext in ['png', 'jpg', 'jpeg', 'gif', 'mp3', 'wav', 'mp4', 'avi']

        # Handle body for POST/PUT
        body = None
        content_type = 'text/plain'

        if method in ["POST", "PUT"] and body_type == "Upload file":
            if not uploaded_file:
                return "Error: No file uploaded"
                
            # 'uploaded_file' es la ruta del fichero subido (gr.File con type="filepath")
            ext = os.path.splitext(uploaded_file)[1][1:].lower()
            is_binary = ext in ['png', 'jpg', 'jpeg', 'gif', 'mp3', 'wav', 'mp4', 'avi']
            
            if is_binary:
                content_type = {
                    'jpg': 'image/jpeg',
                    'jpeg': 'image/jpeg',
                    'png': 'image/png',
                    'gif': 'image/gif',
                    'mp3': 'audio/mpeg',
                    'wav': 'audio/wav'
                }.get(ext, 'application/octet-stream')
            
            mode = 'rb' if is_binary else 'r'
            with open(uploaded_file, mode) as f:
                body = f.read()
        elif method in ["POST", "PUT"]:
            body = manual_body
            content_type = 'text/plain'
            is_binary = False

        # Construir la petición: armamos las cabeceras y convertimos a bytes de forma consistente
        request_parts = []
        request_parts.append(f"{method} {path} HTTP/1.1")
        request_parts.append(f"Host: {host}")
        
        if body is not None:
            request_parts.append(f"Content-Type: {content_type}")
            # Calcular el Content-Length en bytes
            if isinstance(body, bytes):
                request_parts.append(f"Content-Length: {len(body)}")
            else:
                body_bytes = body.encode('utf-8')
                request_parts.append(f"Content-Length: {len(body_bytes)}")
        
        request_parts.extend(headers_list)
        request_parts.append("")  # Línea en blanco para separar cabeceras de cuerpo
        
        # Convertimos las cabeceras a bytes
        headers_bytes = "\r\n".join(request_parts).encode('utf-8')
        
        # Construir la petición completa (cabeceras + CRLF + cuerpo)
        if body is not None:
            if isinstance(body, bytes):
                request = headers_bytes + b"\r\n" + body
            else:
                request = headers_bytes + b"\r\n" + body.encode('utf-8')
        else:
            request = headers_bytes + b"\r\n"

        # Enviar la petición
        sock = create_socket()
        connect_socket(sock, host, port)
        response = send_request(sock, request, is_binary=is_binary)
        sock.close()

        if response:
            # Para GET, si se selecciona guardar el archivo
            if method == "GET" and save_file:
                headers, content_type, content = handle_response_content(response, is_binary)
                if headers and content:
                    is_binary_content = content_type and any(t in content_type.lower() 
                        for t in ['image/', 'audio/', 'video/', 'application/octet-stream'])
                    save_filename = os.path.basename(path)
                    if save_response_content(content, save_filename, is_binary_content):
                        return f"=== Response Headers ===\n{headers}\n\nFile saved as: {save_filename}"
            
            # Para otros métodos, se retorna la respuesta
            if isinstance(response, bytes):
                return f"=== Binary Response ===\nLength: {len(response)} bytes"
            return f"=== Response ===\n{response}"
        
        return "No response received"

    except Exception as e:
        return f"Error: {str(e)}"



iface = gr.Interface(
    fn=client_request,
    inputs=[
        gr.Textbox(label="Host", value="localhost"),
        gr.Textbox(label="Port", value="8080"),
        gr.Radio(
            choices=["GET", "POST", "PUT", "DELETE", "HEAD"],
            label="HTTP Method",
            value="GET"
        ),
        gr.Textbox(label="Path", value="/"),
        gr.Checkbox(label="Save file (for GET requests)"),
        gr.Textbox(
            label="Custom Headers (one per line)",
            lines=3,
            placeholder="X-Custom: value"
        ),
        gr.Radio(
            choices=["Manual Body", "Upload file"],
            label="Body Type",
            value="Manual Body"
        ),
        gr.Textbox(
            label="Manual Body",
            lines=5,
            placeholder="Enter request body here..."
        ),
        gr.File(label="Upload File", type="filepath")
    ],
    outputs=gr.Textbox(label="Response", lines=10),
    title="HTTP Client Interface",
    description="Send HTTP requests using nClient functionality"
)

if __name__ == "__main__":
    iface.launch()


* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


Connected (plain) to localhost:8080
Connected (plain) to localhost:8080
Connected (plain) to localhost:8080
Connected (plain) to localhost:8080
Connected (plain) to localhost:8080
Connected (plain) to localhost:8080


In [1]:
import os
import socket
tempfile = None
import pytest
from nServer import SimpleHTTPServer


def make_server(tmp_path):
    # Instantiate server and point its directory to a temp folder
    server = SimpleHTTPServer(host='localhost', port=0)
    server.server_dir = str(tmp_path / "Server")
    # Create structure including private subfolder
    os.makedirs(os.path.join(server.server_dir, "private"), exist_ok=True)
    return server


def send_request(server, request_bytes):
    # Use socketpair to simulate client-server connection
    client_sock, server_sock = socket.socketpair()
    try:
        # Write request into server end
        client_sock.sendall(request_bytes)
        # Process single request
        server.handle_request(server_sock)
        # Read all response bytes
        response = b""
        while True:
            chunk = client_sock.recv(4096)
            if not chunk:
                break
            response += chunk
        return response
    finally:
        client_sock.close()
        server_sock.close()


def test_get_static_file(tmp_path):
    server = make_server(tmp_path)
    # Create a text file under Server directory
    file_path = os.path.join(server.server_dir, "hello.txt")
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as f:
        f.write("Hello, Test!")

    req = b"GET /hello.txt HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp = send_request(server, req)
    assert b"200 OK" in resp
    assert b"Hello, Test!" in resp


def test_403_on_private(tmp_path):
    server = make_server(tmp_path)
    # Create a private file
    priv = os.path.join(server.server_dir, "private", "secret.txt")
    with open(priv, "w") as f:
        f.write("Top Secret")

    req = b"GET /private/secret.txt HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp = send_request(server, req)
    assert b"403 Forbidden" in resp


def test_put_then_get_and_delete(tmp_path):
    server = make_server(tmp_path)
    # PUT to create new file
    body = b"Data123"
    headers = (
        b"PUT /data.txt HTTP/1.1\r\n"
        b"Host: localhost\r\n"
        b"Content-Type: text/plain\r\n"
        b"Content-Length: 7\r\n\r\n"
    )
    req_put = headers + body
    resp_put = send_request(server, req_put)
    assert b"201 Created" in resp_put

    # GET the created file
    req_get = b"GET /data.txt HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp_get = send_request(server, req_get)
    assert b"200 OK" in resp_get
    assert b"Data123" in resp_get

    # DELETE the file
    req_del = b"DELETE /data.txt HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp_del = send_request(server, req_del)
    assert b"200 OK" in resp_del

    # GET after delete -> 404
    resp_404 = send_request(server, req_get)
    assert b"404 Not Found" in resp_404


def test_resources_crud(tmp_path):
    server = make_server(tmp_path)
    # POST a new resource in category 'items'
    new_obj = b'{"name":"ItemA"}'
    headers = (
        b"POST /resources/items HTTP/1.1\r\n"
        b"Host: localhost\r\n"
        b"Content-Type: application/json\r\n"
        b"Content-Length: " + str(len(new_obj)).encode() + b"\r\n\r\n"
    )
    req_post = headers + new_obj
    resp_post = send_request(server, req_post)
    assert b"201 Created" in resp_post

    # GET the category list
    req_get_list = b"GET /resources/items HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp_list = send_request(server, req_get_list)
    assert b"\"name\": \"ItemA\"" in resp_list

    # GET single object by id=1
    req_get_one = b"GET /resources/items/1 HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp_one = send_request(server, req_get_one)
    assert b"ItemA" in resp_one

    # DELETE the object
    req_del = b"DELETE /resources/items/1 HTTP/1.1\r\nHost: localhost\r\n\r\n"
    resp_del = send_request(server, req_del)
    assert b"200 OK" in resp_del

    # GET list again -> empty array
    resp_empty = send_request(server, req_get_list)
    assert b"[]" in resp_empty


In [None]:
%pytest -q

UsageError: Line magic function `%pytest` not found.
