In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, explode, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType
from pymongo import MongoClient
 
#configuration de mongodb
mongo_uri = "mongodb+srv://liticiahammadou99:admin@cluster0.tze2e.mongodb.net"
db_name = "Blablacar"
collection_name = "fares"
 
#connexion a mongodb
client = MongoClient(mongo_uri)
db = client[db_name]
collection = db[collection_name]
 
# Récupération des données de MongoDB
data = list(collection.find())
print(data)
quit()
 
# Initialiser la session Spark
spark = SparkSession.builder \
    .appName("MongoDBtoHDFS") \
    .master("spark://spark-master:7077") \
    .getOrCreate()
 
# Définir le schéma des données pour Spark
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("origin_id", IntegerType(), True),
    StructField("price_cents", IntegerType(), True),
    StructField("price_currency", StringType(), True),
    StructField("schedule", StringType(), True),
    StructField("updated_at", StringType(), True),
    StructField("timestamp_ingestion", StringType(), True),
    StructField("legs", ArrayType(
        StructType([
            StructField("schedule", StringType(), True),
            StructField("service_type", StringType(), True),
            StructField("arrival", StringType(), True),
            StructField("service_name", StringType(), True),
            StructField("one_luggage", StringType(), True),
            StructField("destination_id", StringType(), True),
            StructField("origin_id", StringType(), True),
            StructField("departure", StringType(), True),
            StructField("via_stations", StringType(), True),
            StructField("bus_number", StringType(), True),
        ])
    ), True)
])
 
# Créer le DataFrame Spark à partir des données MongoDB avec le schéma défini
df = spark.createDataFrame(data, schema=schema)
 
# Exploser les 'legs' pour travailler sur chaque trajet individuellement
df = df.withColumn("leg", explode("legs"))
 
# Transformer les dates en format Timestamp et convertir le prix de centimes en euros
df = df.withColumn("schedule", to_timestamp(col("schedule"))) \
       .withColumn("updated_at", to_timestamp(col("updated_at"))) \
       .withColumn("timestamp_ingestion", to_timestamp(col("timestamp_ingestion"))) \
       .withColumn("leg.schedule", to_timestamp(col("leg.schedule"))) \
       .withColumn("leg.arrival", to_timestamp(col("leg.arrival"))) \
       .withColumn("leg.departure", to_timestamp(col("leg.departure"))) \
       .withColumn("price_euros", col("price_cents") / 100) \
       .withColumn("leg.one_luggage", when(col("leg.one_luggage") == "false", False).otherwise(True)) \
       .withColumn("destination_id", col("leg.destination_id").cast(IntegerType())) \
       .withColumn("origin_id", col("leg.origin_id").cast(IntegerType()))
 
# Sélectionner et renommer les colonnes nécessaires
transformed_df = df.select(
    col("id"),
    col("origin_id"),
    col("destination_id"),
    col("price_euros"),
    col("price_currency"),
    col("schedule"),
    col("updated_at"),
    col("timestamp_ingestion"),
    col("leg.schedule").alias("leg_schedule"),
    col("leg.service_type").alias("service_type"),
    col("leg.arrival").alias("arrival_time"),
    col("leg.departure").alias("departure_time"),
    col("leg.service_name").alias("service_name"),
    col("leg.one_luggage").alias("one_luggage"),
    col("leg.bus_number").alias("bus_number"),
    col("leg.via_stations").alias("via_stations")
)
 
# Sauvegarder les données transformées dans HDFS en format Parquet
transformed_df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/data_transformed")
 
# Arrêter la session Spark
spark.stop()

ServerSelectionTimeoutError: No replica set members found yet, Timeout: 30s, Topology Description: <TopologyDescription id: 66dad4c92db415b196f4ded0, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('cluster0-shard-00-00.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>, <ServerDescription ('cluster0-shard-00-01.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>, <ServerDescription ('cluster0-shard-00-02.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>]>

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, explode, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType
from pymongo import MongoClient
import pandas as pd
 
# Configuration de MongoDB
mongo_uri = "mongodb+srv://liticiahammadou99:admin@cluster0.tze2e.mongodb.net/Blablacar?retryWrites=true&w=majority&serverSelectionTimeoutMS=50000"
db_name = "Blablacar"
collection_name = "fares"
 
# Connexion à MongoDB pour récupérer les données
client = MongoClient(mongo_uri)
db = client[db_name]
collection = db[collection_name]
 
# Récupération des données de MongoDB
data = list(collection.find())
 
# Convertir les données en DataFrame Pandas pour faciliter la manipulation avec Spark
df_pandas = pd.DataFrame(data)
 
# Initialiser la session Spark
spark = SparkSession.builder \
    .appName("MongoDBtoHDFS") \
    .master("spark://spark-master:7077") \
    .getOrCreate()
 
# Convertir le DataFrame Pandas en DataFrame Spark
df = spark.createDataFrame(df_pandas)
 
# Définir le schéma des données pour Spark
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("origin_id", IntegerType(), True),
    StructField("price_cents", IntegerType(), True),
    StructField("price_currency", StringType(), True),
    StructField("schedule", StringType(), True),
    StructField("updated_at", StringType(), True),
    StructField("timestamp_ingestion", StringType(), True),
    StructField("legs", ArrayType(
        StructType([
            StructField("schedule", StringType(), True),
            StructField("service_type", StringType(), True),
            StructField("arrival", StringType(), True),
            StructField("service_name", StringType(), True),
            StructField("one_luggage", StringType(), True),
            StructField("destination_id", StringType(), True),
            StructField("origin_id", StringType(), True),
            StructField("departure", StringType(), True),
            StructField("via_stations", StringType(), True),
            StructField("bus_number", StringType(), True),
        ])
    ), True)
])
 
# Appliquer le schéma à partir des données récupérées
df = spark.createDataFrame(df_pandas, schema=schema)
 
# Exploser les 'legs' pour travailler sur chaque trajet individuellement
df = df.withColumn("leg", explode("legs"))
 
# Transformer les dates en format Timestamp et convertir le prix de centimes en euros
df = df.withColumn("schedule", to_timestamp(col("schedule"))) \
       .withColumn("updated_at", to_timestamp(col("updated_at"))) \
       .withColumn("timestamp_ingestion", to_timestamp(col("timestamp_ingestion"))) \
       .withColumn("leg.schedule", to_timestamp(col("leg.schedule"))) \
       .withColumn("leg.arrival", to_timestamp(col("leg.arrival"))) \
       .withColumn("leg.departure", to_timestamp(col("leg.departure"))) \
       .withColumn("price_euros", col("price_cents") / 100) \
       .withColumn("leg.one_luggage", when(col("leg.one_luggage") == "false", False).otherwise(True)) \
       .withColumn("destination_id", col("leg.destination_id").cast(IntegerType())) \
       .withColumn("origin_id", col("leg.origin_id").cast(IntegerType()))
 
# Sélectionner et renommer les colonnes nécessaires
transformed_df = df.select(
    col("id"),
    col("origin_id"),
    col("destination_id"),
    col("price_euros"),
    col("price_currency"),
    col("schedule"),
    col("updated_at"),
    col("timestamp_ingestion"),
    col("leg.schedule").alias("leg_schedule"),
    col("leg.service_type").alias("service_type"),
    col("leg.arrival").alias("arrival_time"),
    col("leg.departure").alias("departure_time"),
    col("leg.service_name").alias("service_name"),
    col("leg.one_luggage").alias("one_luggage"),
    col("leg.bus_number").alias("bus_number"),
    col("leg.via_stations").alias("via_stations")
)
 
# Sauvegarder les données transformées dans HDFS en format Parquet
transformed_df.write.mode("overwrite").parquet("hdfs://namenode:9000/data/data_transformed")
 
# Arrêter la session Spark
spark.stop()

ServerSelectionTimeoutError: cluster0-shard-00-02.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms),cluster0-shard-00-01.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms),cluster0-shard-00-00.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 50.0s, Topology Description: <TopologyDescription id: 66dad6232db415b196f4ded2, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('cluster0-shard-00-00.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-00.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>, <ServerDescription ('cluster0-shard-00-01.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-01.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>, <ServerDescription ('cluster0-shard-00-02.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-02.tze2e.mongodb.net:27017: timed out (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>

In [7]:
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError, ConnectionFailure
import pandas as pd
 
# Configuration de MongoDB avec des timeouts prolongés et des paramètres TLS
mongo_uri = "mongodb+srv://liticiahammadou99:admin@cluster0.tze2e.mongodb.net/Blablacar?retryWrites=true&w=majority&readPreference=primaryPreferred&connectTimeoutMS=60000&socketTimeoutMS=60000&tls=true"
db_name = "Blablacar"
collection_name = "fares"
 
# Fonction pour se connecter à MongoDB avec des tentatives de reconnexion
def connect_to_mongo(retries=5):
    attempt = 0
    while attempt < retries:
        try:
            client = MongoClient(mongo_uri)
            db = client[db_name]
            collection = db[collection_name]
            # Vérifie la connexion au cluster
            client.admin.command('ping')
            print("Connexion réussie à MongoDB.")
            return collection
        except (ServerSelectionTimeoutError, ConnectionFailure) as err:
            attempt += 1
            print(f"Erreur de connexion à MongoDB (tentative {attempt}/{retries}) : {err}")
            if attempt >= retries:
                print("Impossible de se connecter à MongoDB après plusieurs tentatives.")
                return None
 
# Connexion à MongoDB
collection = connect_to_mongo()
if not collection:
    print("Impossible de se connecter à MongoDB, vérifiez votre configuration et l'état du cluster.")
    exit(1)
 
# Récupération des données depuis MongoDB
try:
    data = list(collection.find())
    if not data:
        print("Aucune donnée trouvée dans la collection.")
    else:
        print(f"Récupéré {len(data)} documents depuis MongoDB.")
except Exception as e:
    print(f"Erreur lors de la récupération des données : {e}")

Erreur de connexion à MongoDB : No replica set members found yet, Timeout: 30s, Topology Description: <TopologyDescription id: 66dad6682db415b196f4ded3, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('cluster0-shard-00-00.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>, <ServerDescription ('cluster0-shard-00-01.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>, <ServerDescription ('cluster0-shard-00-02.tze2e.mongodb.net', 27017) server_type: Unknown, rtt: None>]>
Impossible de se connecter à MongoDB, vérifiez votre configuration.
Erreur lors de la récupération des données : 'NoneType' object has no attribute 'find'
