In [1]:
# 1. Uninstall broken packages
!pip uninstall numpy pandas scipy tensorflow -y

# 2. Clear pip cache
!pip cache purge

# 3. Reinstall with compatible versions
!pip install numpy==1.26.4 pandas==2.1.4 scipy==1.11.4 tensorflow==2.15.0


Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
  Successfully uninstalled numpy-1.26.4
Found existing installation: pandas 2.1.4
Uninstalling pandas-2.1.4:
  Successfully uninstalled pandas-2.1.4
Found existing installation: SciPy 1.11.4
Uninstalling SciPy-1.11.4:
  Successfully uninstalled SciPy-1.11.4
Found existing installation: tensorflow 2.20.0
Uninstalling tensorflow-2.20.0:
  Successfully uninstalled tensorflow-2.20.0
Files removed: 222
Collecting numpy==1.26.4
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m316.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting pandas==2.1.4
  Downloading pandas-2.1.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting scipy==1.11.4
  Downloading scipy-1.11.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)


In [17]:
from hdfs import InsecureClient
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras import losses, metrics
import pandas as pd


In [3]:
# Establish connection to HDFS
connection_url = "http://hdfs-namenode:9870"
hdfs_client = InsecureClient(connection_url, user='root')
hdfs_client.status('/')

{'accessTime': 0,
 'blockSize': 0,
 'childrenNum': 1,
 'fileId': 16385,
 'group': 'supergroup',
 'length': 0,
 'modificationTime': 1770299451979,
 'owner': 'root',
 'pathSuffix': '',
 'permission': '755',
 'replication': 0,
 'snapshotEnabled': True,
 'storagePolicy': 0,
 'type': 'DIRECTORY'}

In [23]:
# Load Model
model_path = "../models/weather_lstm_model_fixed.h5"

custom_objects = {
    'mse': losses.MeanSquaredError(),
    'mae': metrics.MeanAbsoluteError()
}

model = keras.models.load_model(
    model_path,
    custom_objects=custom_objects,
    compile=False  # Don't compile during load
)

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=losses.MeanSquaredError(),
    metrics=[metrics.MeanAbsoluteError()]
)

TypeError: Error when deserializing class 'InputLayer' using config={'batch_shape': [None, 24, 9], 'dtype': 'float32', 'sparse': False, 'ragged': False, 'name': 'input_layer'}.

Exception encountered: Unrecognized keyword arguments: ['batch_shape']

In [24]:
raw_path = '/weather/raw'

locations = []
for item in hdfs_client.list(raw_path):
    if item.startswith('location='):
        location = item.replace('location=', '')
        locations.append(location)

locations

['Albuquerque',
 'Atlanta',
 'Beersheba',
 'Boston',
 'Charlotte',
 'Chicago',
 'Dallas',
 'Denver',
 'Detroit',
 'Eilat',
 'Haifa',
 'Houston',
 'Indianapolis',
 'Jacksonville',
 'Jerusalem',
 'Kansas City',
 'Las Vegas',
 'Los Angeles',
 'Miami',
 'Minneapolis',
 'Montreal',
 'Nahariyya',
 'Nashville',
 'New York',
 'Philadelphia',
 'Phoenix',
 'Pittsburgh',
 'Portland',
 'Saint Louis',
 'San Antonio',
 'San Diego',
 'San Francisco',
 'Seattle',
 'Tel Aviv District',
 'Toronto',
 'Vancouver']

In [25]:
def fetch_recent_data(location, hours=48):
    # Path to location partition
    partition_path = f'/weather/raw/location={location}'
    
    # Read parquet files
    parquet_files = []
    for file in hdfs_client.list(partition_path):
        if file.endswith('.parquet'):
            file_path = f'{partition_path}/{file}'
            parquet_files.append(file_path)

    # Problem:
    # with hdfs_client.read(parquet_files[0]) as f:
    #     print(f)

    # return parquet_files    # For debugging purposes -> works fine
    
    # # Read all parquet files
    dfs = []
    for file_path in parquet_files:
        with hdfs_client.read(file_path) as reader:
            df = pd.read_parquet(reader)
            dfs.append(df)

    return dfs    # For debugging purposes -> problematic
    
    # # Combine all dataframes
    # combined_df = pd.concat(dfs, ignore_index=True)
    
    # # Convert timestamp to datetime
    # combined_df['timestamp'] = pd.to_datetime(combined_df['timestamp'])
    
    # # Sort by timestamp
    # combined_df = combined_df.sort_values('timestamp')
    
    # # Filter to recent hours
    # cutoff_time = datetime.now() - timedelta(hours=hours)
    # recent_df = combined_df[combined_df['timestamp'] >= cutoff_time]
    
    # logger.info(f"Fetched {len(recent_df)} records for {location}")
    # return recent_df

In [26]:
fetch_recent_data('Albuquerque')

UnsupportedOperation: seek

In [27]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Create Spark configuration
conf = SparkConf()
conf.setAppName("HDFS Explorer")
conf.setMaster("local[*]")  # Use all available cores, adjust if needed
conf.set("spark.hadoop.fs.defaultFS", "hdfs://hdfs-namenode:9000")
conf.set("spark.sql.warehouse.dir", "/user/hive/warehouse")

# Create Spark session
spark = SparkSession.builder \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print("Spark session created successfully!")


Spark Version: 3.5.0
Spark session created successfully!


In [31]:
weather_df = spark.read.parquet("hdfs://hdfs-namenode:9000/weather/raw/location=Albuquerque/part-00000-882bebbd-6110-4ef2-aabe-be8519df2aa4.c000.snappy.parquet")

Py4JJavaError: An error occurred while calling o42.parquet.
: java.net.ConnectException: Call From 90a2a689bb38/172.18.0.9 to hdfs-namenode:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:913)
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:828)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1616)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
	at jdk.proxy2/jdk.proxy2.$Proxy33.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:965)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at jdk.proxy2/jdk.proxy2.$Proxy34.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1739)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1753)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:756)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:205)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:711)
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:833)
	at org.apache.hadoop.ipc.Client$Connection.access$3800(Client.java:414)
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1677)
	at org.apache.hadoop.ipc.Client.call(Client.java:1502)
	... 37 more


In [32]:
print(f"Default FS: {spark.sparkContext._jsc.hadoopConfiguration().get('fs.defaultFS')}")

Default FS: hdfs://hdfs-namenode:9000


In [33]:
df = spark.read.parquet("webhdfs://hdfs-namenode:9870/weather/raw/location=Albuquerque")
df.show(5)

+----------------+-----------+-------------+---------+-----------+-----------+--------+----------+--------------------+-------------------+
|       timestamp|       city|      country| latitude|  longitude|temperature|humidity|wind_speed|         ingested_at|         event_time|
+----------------+-----------+-------------+---------+-----------+-----------+--------+----------+--------------------+-------------------+
|2026-02-11T23:00|Albuquerque|United States|35.084492|-106.651138|        9.4|    52.0|       6.5|2026-02-05T13:54:...|2026-02-11 23:00:00|
|2026-02-11T23:00|Albuquerque|United States|35.084492|-106.651138|        9.4|    52.0|       6.5|2026-02-05T13:57:...|2026-02-11 23:00:00|
+----------------+-----------+-------------+---------+-----------+-----------+--------+----------+--------------------+-------------------+

