In [None]:
# Pr√©requis : charger un fichier csv dans hdfs via l'API WEBHDFS

In [1]:
import requests
import os

def upload_with_webhdfs(local_path, hdfs_path):
    """Upload a file to HDFS using WebHDFS API"""
    namenode = 'http://namenode:9870'

    # Verify the local file exists
    if not os.path.exists(local_path):
        raise FileNotFoundError(f"Local file not found: {local_path}")

    # Initiate CREATE operation
    create_url = f"{namenode}/webhdfs/v1{hdfs_path}"
    params = {'op': 'CREATE', 'overwrite': 'true'}
    init = requests.put(create_url, params=params, allow_redirects=False, timeout=10)

    if 300 <= init.status_code < 400 and 'Location' in init.headers:
        # Perform the actual file upload
        with open(local_path, 'rb') as fh:
            put_resp = requests.put(init.headers['Location'], data=fh, timeout=60)

        if put_resp.status_code in (200, 201):
            print(f"Successfully uploaded {local_path} to {hdfs_path}")
        else:
            print(f"Upload failed: {put_resp.status_code}")
    else:
        print(f"CREATE failed: {init.status_code}")

# Example usage
local_path = '/app/hadoop_data/datanode/donnees_ventes_fictives.csv'
hdfs_path = '/tmp/donnees_ventes_fictives.csv'

upload_with_webhdfs(local_path, hdfs_path)

Successfully uploaded /app/hadoop_data/datanode/donnees_ventes_fictives.csv to /tmp/donnees_ventes_fictives.csv


In [2]:
from pyspark.sql import SparkSession

# Create Spark session with HDFS configuration
spark = SparkSession.builder \
    .appName("Read from HDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .config("spark.hadoop.dfs.datanode.use.datanode.hostname", "true") \
    .getOrCreate()

# Path to your file in HDFS
hdfs_path = "hdfs://namenode:9000/tmp/donnees_ventes_fictives.csv"

try:
    # Read the CSV file from HDFS
    # Adjust the parameters based on your CSV format
    df = spark.read.csv(hdfs_path)

    # Show the DataFrame schema
    df.printSchema()

    # Show the first few rows
    df.show(5)

except Exception as e:
    print(f"Error reading from HDFS: {str(e)}")
    import traceback
    traceback.print_exc()
finally:
    # Stop the Spark session
    spark.stop()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)

+----------+-----------+--------+----------------+--------------+----------+
|       _c0|        _c1|     _c2|             _c3|           _c4|       _c5|
+----------+-----------+--------+----------------+--------------+----------+
|magasin_id|magasin_nom|   ville|chiffre_affaires|nombre_clients|      date|
|    Mag104|  Magazin 1|    Lyon|           15627|           130|2025-01-13|
|    Mag105|  Magazin 2|Bordeaux|           13792|            99|2025-01-09|
|    Mag103|  Magazin 3|Toulouse|           15555|           153|2025-01-15|
|    Mag105|  Magazin 4|   Paris|           15253|           181|2025-01-13|
+----------+-----------+--------+----------------+--------------+----------+
only showing top 5 rows



In [None]:
# TP PYSPARK

# Import the necessary libraries
from pyspark.sql import SparkSession

# Create a Spark session with HDFS configuration
spark = SparkSession.builder \
    .appName("TP PYSPARK") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

# Print the Spark version
print("Spark version:", spark.version)

# Example: Read a CSV file from HDFS
# df = spark.read.csv("hdfs://localhost:9000/path/to/your/file.csv")

# Example: Write a DataFrame to HDFS
# df.write.csv("hdfs://localhost:9000/path/to/your/output")

# Stop the Spark session
spark.stop()