In [7]:
import requests
from kafka import KafkaProducer
import json
import logging

# Configurer le logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def send_tan_to_kafka(topic, api_url, fields={}):
    # Kafka configuration
    kafka_config = {
        'bootstrap_servers': 'kafka1:9092',  # Update with your Kafka broker
    }

    # Initialize Kafka Producer
    producer = KafkaProducer(
        bootstrap_servers=kafka_config['bootstrap_servers'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    try:
        # Fetch data from TAN API
        response = requests.get(api_url)

        if response.status_code == 200:
            data = response.json()

            # For each entry in the data, process and send it to Kafka
            for entry in data:
                # Process fields based on the provided mapping
                for field in fields:
                    entry[fields[field]] = entry.pop(field, None)

                # Send the data to Kafka
                producer.send(topic, value=entry)
                logger.info(f"Sent: {entry}")

            # Ensure all messages are sent
            producer.flush()
            logger.info(f"Sent {len(data)} records.")
        else:
            logger.error(f"Failed to fetch data: {response.status_code}, {response.text}")

    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
    except Exception as e:
        logger.error(f"Kafka sending failed: {e}")
    finally:
        producer.close()

# API URL for the stop data (using f-string formatting)
latitude = "47.264"
longitude = "-1.585"
api_url = f"https://open.tan.fr/ewp/arrets.json/{latitude}/{longitude}"

# Field mappings from API response to Kafka data schema
fields = {
    "codeLieu": "stop_code",
    "libelle": "stop_name",
    "distance": "stop_distance",
}

# Example of sending the data to Kafka
send_tan_to_kafka("tan_stops", api_url, fields)


INFO:kafka.conn:<BrokerConnection client_id=kafka-python-producer-1, node_id=bootstrap-0 host=kafka1:9092 <connecting> [IPv4 ('172.18.0.8', 9092)]>: connecting to kafka1:9092 [('172.18.0.8', 9092) IPv4]
INFO:kafka.conn:Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-producer-1, node_id=bootstrap-0 host=kafka1:9092 <checking_api_versions_recv> [IPv4 ('172.18.0.8', 9092)]>: Connection complete.
INFO:__main__:Sent: {'ligne': [{'numLigne': '109'}, {'numLigne': '116'}, {'numLigne': '2'}, {'numLigne': '2B'}, {'numLigne': '50'}, {'numLigne': '59'}, {'numLigne': '89'}, {'numLigne': 'C2'}], 'stop_code': 'LCAR', 'stop_name': 'Le Cardo', 'stop_distance': '256 m'}
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-producer-1, node_id=1 host=efbb4ef93242:9092 <connecting> [IPv4 ('172.18.0.8', 9092)]>: connecting to efbb4ef93242:9092 [('172.18.0.8', 9092) IPv4]
INFO:__main__:Sent: {'ligne': [{'numLigne': '59'}], 'stop_code': 'AURR', 'stop_name': 'Auror

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Assurez-vous que SparkSession est initialisé
spark = SparkSession.builder.appName("KafkaTanData").getOrCreate()

# Schéma des données reçues de l'API TAN
schema = StructType([
    StructField("stop_code", StringType(), True),
    StructField("stop_name", StringType(), True),
    StructField("stop_distance", FloatType(), True),
])

# Connexion à Kafka et lecture des données
raw_stream = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092") \
    .option("subscribe", "tan_stops") \
    .load()

# Traitez les données reçues depuis Kafka
parsed_stream = raw_stream.selectExpr("CAST(value AS STRING) AS message") \
    .select(from_json(col("message"), schema).alias("data")) \
    .select("data.stop_code", "data.stop_name", "data.stop_distance")

# Affichez les résultats pour vérifier
parsed_stream.show()


25/03/24 00:10:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

+---------+---------------+-------------+
|stop_code|      stop_name|stop_distance|
+---------+---------------+-------------+
|     AURR|         Aurore|         NULL|
|     LRHE|     Les Roches|         NULL|
|     BDLA|Bout des Landes|         NULL|
|     AURR|         Aurore|         NULL|
|     LRHE|     Les Roches|         NULL|
|     CORA|        Conraie|         NULL|
|     BDLA|Bout des Landes|         NULL|
|     LRHE|     Les Roches|         NULL|
|     LCAR|       Le Cardo|         NULL|
|     AURR|         Aurore|         NULL|
|     LRHE|     Les Roches|         NULL|
|     BDLA|Bout des Landes|         NULL|
|     LCAR|       Le Cardo|         NULL|
|     LRHE|     Les Roches|         NULL|
|     CORA|        Conraie|         NULL|
|     LCAR|       Le Cardo|         NULL|
|     CORA|        Conraie|         NULL|
|     LCAR|       Le Cardo|         NULL|
|     LCAR|       Le Cardo|         NULL|
|     AURR|         Aurore|         NULL|
+---------+---------------+-------