In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, StructField, StructType

In [2]:
# Create (or reuse) a Spark session in local mode, so no cluster is required
# (works on Windows)
spark = (
    SparkSession.builder
    .appName("HDFS Reader - Windows")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
from io import BytesIO
import requests
from urllib.parse import urlparse, urlunparse

In [4]:
class HDFSClient:
    """Small WebHDFS (HTTP) client for reading files and listing directories.

    Requests are sent to ``http://{host}:{namenode_port}/webhdfs/v1``. When a
    read is answered with a 307 redirect, the host and port of the redirect
    target are replaced with ``host``/``datanode_port`` before it is followed
    (the redirect may name a container hostname that does not resolve locally).
    """

    def __init__(self, host='localhost', namenode_port=9870, datanode_port=9865, timeout=None):
        """Store the connection settings and build the WebHDFS base URL.

        Args:
            host: Hostname used for the initial request and substituted into
                redirect URLs.
            namenode_port: Port that receives the initial WebHDFS request.
            datanode_port: Port substituted into redirect URLs.
            timeout: Optional timeout in seconds handed to every HTTP request.
                ``None`` (the default) applies no timeout.
        """
        self.host = host
        self.namenode_port = namenode_port
        self.datanode_port = datanode_port
        self.timeout = timeout
        self.base_url = f'http://{host}:{namenode_port}/webhdfs/v1'

    @staticmethod
    def _raise_unless_ok(response):
        """Raise ``Exception`` unless ``response`` has HTTP status 200."""
        if response.status_code != 200:
            raise Exception(f"Error: {response.status_code} - {response.text}")

    def _fix_datanode_url(self, url):
        """Return ``url`` with its host:port replaced by ``host:datanode_port``.

        Scheme, path, params, query string and fragment are kept unchanged.
        """
        parsed = urlparse(url)
        new_netloc = f'{self.host}:{self.datanode_port}'

        return urlunparse((
            parsed.scheme, new_netloc, parsed.path,
            parsed.params, parsed.query, parsed.fragment
        ))

    def read_file(self, hdfs_path):
        """Read a file from HDFS and return its raw bytes.

        The first request is sent with redirects disabled so that a 307
        redirect can be rewritten with ``_fix_datanode_url`` before the data
        request is made.

        Args:
            hdfs_path: Absolute HDFS path, appended verbatim to the base URL.

        Returns:
            The response body as ``bytes``.

        Raises:
            Exception: If either request ends with a status other than 200, or
                if a 307 response carries no ``Location`` header.
        """
        url = f'{self.base_url}{hdfs_path}?op=OPEN'
        response = requests.get(url, allow_redirects=False, timeout=self.timeout)

        if response.status_code == 307:
            redirect_url = response.headers.get('Location')
            if not redirect_url:
                raise Exception("Error: 307 - redirect response has no Location header")
            data_response = requests.get(
                self._fix_datanode_url(redirect_url), timeout=self.timeout
            )
            # Without this check an HTTP error body would be returned as file content.
            self._raise_unless_ok(data_response)
            return data_response.content

        self._raise_unless_ok(response)
        return response.content

    def list_directory(self, path='/'):
        """List the contents of an HDFS directory.

        Args:
            path: Absolute HDFS directory path, appended verbatim to the base URL.

        Returns:
            The decoded JSON body of the ``LISTSTATUS`` response.

        Raises:
            Exception: If the response status is not 200.
        """
        url = f'{self.base_url}{path}?op=LISTSTATUS'
        response = requests.get(url, timeout=self.timeout)
        self._raise_unless_ok(response)
        return response.json()

In [5]:
# Client for the local WebHDFS endpoints (ports 9870 and 9865 on localhost)
hdfs = HDFSClient(
    host='localhost',
    namenode_port=9870,
    datanode_port=9865,
)

In [6]:
# Imports used to parse the downloaded CSV text row by row
from io import StringIO
import csv

In [7]:
def read_hdfs_streaming(hdfs_path, nrows=None):
    """Read a CSV file from HDFS into a Spark DataFrame with string columns.

    Despite the name, the file is fetched in one piece via ``hdfs.read_file``
    and parsed in memory; nothing is streamed. The first CSV record supplies
    the column names and every value is kept as a string.

    Args:
        hdfs_path: Path of the CSV file, passed to ``hdfs.read_file``.
        nrows: Maximum number of data rows to load. ``None`` loads every row;
            ``0`` yields an empty DataFrame that still has the header columns.

    Returns:
        A Spark DataFrame with one nullable string column per header field.

    Raises:
        ValueError: If the file contains no header row.
    """
    content = hdfs.read_file(hdfs_path)

    # 'utf-8-sig' decodes plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise become part of the first column name.
    text_content = content.decode('utf-8-sig')
    csv_reader = csv.reader(StringIO(text_content))

    # Get headers; an empty file must not leak StopIteration to the caller
    headers = next(csv_reader, None)
    if headers is None:
        raise ValueError(f"CSV file has no header row: {hdfs_path}")

    width = len(headers)
    rows = []
    for record in csv_reader:
        # Compare against None explicitly so that nrows=0 means "no rows"
        # rather than "no limit".
        if nrows is not None and len(rows) >= nrows:
            break
        if not record:
            continue  # csv.reader yields [] for blank lines
        # Pad short records with None and drop surplus fields so every tuple
        # has exactly one value per header column.
        rows.append(tuple((record + [None] * width)[:width]))

    # An explicit schema keeps the header order and also works when there are
    # no data rows (Spark cannot infer a schema from an empty list).
    schema = StructType([StructField(name, StringType(), True) for name in headers])
    return spark.createDataFrame(rows, schema=schema)

In [8]:
# Load /user/salgrade.csv from HDFS into a DataFrame and preview it
df = read_hdfs_streaming(hdfs_path='/user/salgrade.csv')
df.show()

+-----+-----+-----+
|GRADE|LOSAL|HISAL|
+-----+-----+-----+
|    1|  700| 1200|
|    2| 1201| 1400|
|    3| 1401| 2000|
|    4| 2001| 3000|
|    5| 3001| 9999|
+-----+-----+-----+



In [9]:
# Summarise the DataFrame: row count, column names, then its schema
print(
    f"\nRow count: {df.count()}",
    f"Columns: {df.columns}",
    "\nSchema:",
    sep="\n",
)
df.printSchema()


Row count: 5
Columns: ['GRADE', 'LOSAL', 'HISAL']

Schema:
root
 |-- GRADE: string (nullable = true)
 |-- LOSAL: string (nullable = true)
 |-- HISAL: string (nullable = true)



In [10]:
# Keep only the rows whose GRADE exceeds 2.
# GRADE is a string column (see the schema printed above), so the comparison
# with an integer relies on Spark casting the values.
print("\nFiltered data (GRADE > 2):")
df.where(df["GRADE"] > 2).show()


Filtered data (GRADE > 2):
+-----+-----+-----+
|GRADE|LOSAL|HISAL|
+-----+-----+-----+
|    3| 1401| 2000|
|    4| 2001| 3000|
|    5| 3001| 9999|
+-----+-----+-----+



In [11]:
# SQL Query
# Register the DataFrame as a temporary view so it can be queried by name.
df.createOrReplaceTempView("salgrade")
# The query names the HISAL column in lowercase (`hisal`); the output below
# shows that it still resolves. HISAL is a string column (see the printed
# schema), so `> 2000` relies on Spark casting the values for the comparison.
result = spark.sql("SELECT * FROM salgrade WHERE hisal > 2000")
print("\nSQL Query Result:")
result.show()


SQL Query Result:
+-----+-----+-----+
|GRADE|LOSAL|HISAL|
+-----+-----+-----+
|    4| 2001| 3000|
|    5| 3001| 9999|
+-----+-----+-----+



In [12]:
# Count how many rows fall into each grade
print("\nAggregation:")
(
    df
    .groupBy("grade")
    .count()
    .show()
)


Aggregation:
+-----+-----+
|grade|count|
+-----+-----+
|    1|    1|
|    2|    1|
|    3|    1|
|    4|    1|
|    5|    1|
+-----+-----+

