TRANSMITTING PROGRAM

Import all python libraries required for the communication :

In [1]:
import paho.mqtt.client as mqtt
import base64
import time
import uuid
import json
import os

Connect to the MQTT_Broker :

In [2]:
# Online broker for communication with the same network connections.
MQTT_BROKER = "#IP_Adress_of_computer" # Put here the IP Adress of the MQTT_Broker
MQTT_PORT = 1883 # Port for MQTT communication.
USE_TLS = False # Port 8883 is used for secure communication (TLS/SSL).

Define the Topics (one to send the data, one to receive feedback of the receiver) :

In [3]:
TOPIC_IMAGE = "pi/photo"
TOPIC_ACK = "pc/ack"

Global variable :

In [4]:
CHUNK_SIZE = 10_000_000  # Used to decompose large files into smaller pieces of 10 Mo
ack_recu = False # Variable to know if the feedback has been received.
current_message_id = "" # Variable to store the ID of the current message.

Main function that connects to the MQTT_Broker, scans all the data to send and sends each one of them (calling the function send_file()) :

In [5]:
def main():
    # Initialisation of the MQTT connexion :
    client = mqtt.Client()
    client.on_message = on_message # Callback function to handle incoming messages (feedback).

    # Connexion to the MQTT_Broker
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.subscribe(TOPIC_ACK) # Subscribe to the feedback topic.
    client.loop_start()

    # Scan of the folder which contains all the data to send.
    repository = "files_to_send"
    filesnames = next(os.walk(repository), (None, None, []))[2] # [] if no file
    
    for file in filesnames:
        path_file = os.path.join(repository, file)
        if os.path.exists(path_file):
            send_file(path_file, client) # Call the function send_file() to send each file to the receiver.
        else:
            print(f"❌ This data has not been found : {file}")

    time.sleep(2) # Wait some time between each file to be sent to avoid too much traffic.
    client.loop_stop()
    client.disconnect()
    print("[TRANSMITTER] End of the transmission.")

Function that get feedback of the receiver to inform if the data has been successfully received or not.

In [6]:
# Callback function to handle incoming messages (feedback).
def on_message(client, userdata, msg):
        global ack_recu
        ack = json.loads(msg.payload.decode('utf-8'))
        if ack.get("id") == message_id_en_cours:
            ack_recu = True
            print(f"[RECEIVER] '{ack.get('status')}' : the data was succesfully received by the receiver !")

Function that decompose larger files into smaller pieces and then sends each of those pieces to the receiver.

In [7]:
# Function that sends each file to the receiver in chunks.
def send_file(file, client):

    # Get all global variables to know which data is being send / decompose
    global message_id_en_cours, ack_recu
    ack_recu = False
    message_id = str(uuid.uuid4())
    message_id_en_cours = message_id

    # Read the current data to transmit
    with open(file, "rb") as f:
        file_data = f.read()

    # Calculate the number of chunks needed to send the file.
    total_chunks = (len(file_data) + CHUNK_SIZE - 1) // CHUNK_SIZE 
    filename = os.path.basename(file)

    # Inform the sender of how many pieces the data will be send.
    print(f"\n📤 Sending '{filename}' ({len(file_data)} octets) through {total_chunks} chunks...")

    # Decomposition of the data.
    for i in range(total_chunks):
        chunk = file_data[i * CHUNK_SIZE : (i + 1) * CHUNK_SIZE]
        payload = {
            "message_id": message_id,
            "filename": filename,
            "chunk_id": i,
            "total_chunks": total_chunks,
            "data": base64.b64encode(chunk).decode("utf-8") # Small 'chunks' of data.
        }
        # Maximum number of tries to send the chunk (in case of a ponctual error).
        max_try = 3
        for Try in range(1, max_try + 1):
            
            print(f"[TRANSMITTER] Sending {filename} - Chunk {i+1}/{total_chunks} - Try n°{Try}")
            
            client.publish(TOPIC_IMAGE, json.dumps(payload)) # Transmission of each 'chunks' of data.
            time.sleep(0.1)

            # little timeout to wait for the feedback.
            for _ in range(10):  
                if ack_recu:
                    break
                time.sleep(0.5)

            # If the feedback is received, we can stop sending the chunk.
            if ack_recu: 
                print(f"[RECEIVER] {filename} → was successfully sent.")
                break
            else:
                print(f"[RECEIVER] No feedback for {filename}, new try...")

    print(f"File '{filename}' send with success.")
    time.sleep(1.5)

In [8]:
if __name__ == "__main__":
    main()

  client = mqtt.Client()


TimeoutError: timed out