In [1]:
import os

import pandas as pd

In [9]:
# Location of the input CSV. Defaults to the original author's local path; set the
# TEMP_CSV_PATH environment variable to point the notebook at another copy without
# editing code. The Spark reader later in the notebook reuses this same value, so it
# must be a path/URL understood by both pandas and Spark (e.g. a file:// URL).
file_path = os.environ.get("TEMP_CSV_PATH", "file:///home/adithan/temp.csv")

# Small pandas load, used to preview the raw columns before the Spark analysis.
tdf = pd.read_csv(file_path)

In [10]:
# Peek at the leading rows (head() shows five by default) to eyeball column names and values.
tdf.head()

Unnamed: 0,itemID,stationID,desc,temp
0,ITE00100554,18000101,TMAX,-75
1,ITE00100554,18000101,TMIN,-148
2,GM000010962,18000101,PRCP,0
3,EZE00100082,18000101,TMAX,-86
4,EZE00100082,18000101,TMIN,-135


In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize (or reuse an existing) Spark session for this notebook.
spark = SparkSession.builder.appName("Temperature Analysis").getOrCreate()

# Explicit schema for the CSV. With an explicit schema (and the default
# enforceSchema=True) Spark assigns columns by position and ignores the header
# names, so the field order here must match the column order in the file.
# NOTE(review): "stationID" values such as 18000101 look like YYYYMMDD dates and
# "itemID" values look like station codes — confirm these labels are not swapped.
schema = StructType([
    StructField("itemID", StringType(), True),
    StructField("stationID", StringType(), True),
    StructField("desc", StringType(), True),
    StructField("temp", IntegerType(), True),
])

# Load dataset into a Spark DataFrame; header=True only skips the first line.
temp_df = spark.read.csv(file_path, schema=schema, header=True)

# Convert to an RDD and cache it: every later action (min/max, reduceByKey +
# collect, ...) would otherwise re-read and re-parse the CSV from scratch.
temp_rdd = temp_df.rdd.cache()

# Part 1: Analysis on TMIN values

# Keep only TMIN rows that carry a parsed temperature. In Spark's default
# PERMISSIVE mode an unparseable "temp" field becomes null (None), and min()
# cannot compare None with int.
tmin_rdd = temp_rdd.filter(lambda row: row.desc == "TMIN" and row.temp is not None)

# a. Minimum temperature (overall). RDD.min() raises on an empty RDD, so report
# None instead when the file has no usable TMIN rows.
tmin_values = tmin_rdd.map(lambda row: row.temp)
overall_min_temp = None if tmin_values.isEmpty() else tmin_values.min()
print(f"Overall Minimum Temperature (TMIN): {overall_min_temp}")


[Stage 0:>                                                          (0 + 1) / 1]

Overall Minimum Temperature (TMIN): -148


                                                                                

In [12]:
# b. Minimum temperature for every ItemID
# Drop rows whose temp is null (Spark's default PERMISSIVE CSV mode nulls fields it
# cannot parse); reduceByKey(min) would otherwise try to compare None with int.
tmin_valid_rdd = tmin_rdd.filter(lambda row: row.temp is not None)

min_temp_by_itemID = tmin_valid_rdd.map(lambda row: (row.itemID, row.temp)).reduceByKey(min)
print("Minimum Temperature for each ItemID:")
# collect() on a reduceByKey result has no defined order; sort on the driver (one
# row per key, so this is cheap). str() keeps a null key from breaking the sort.
for itemID, min_temp in sorted(min_temp_by_itemID.collect(), key=lambda kv: str(kv[0])):
    print(f"ItemID: {itemID}, Min Temperature: {min_temp}")

# c. Minimum temperature for every StationID
min_temp_by_stationID = tmin_valid_rdd.map(lambda row: (row.stationID, row.temp)).reduceByKey(min)
print("Minimum Temperature for each StationID:")
for stationID, min_temp in sorted(min_temp_by_stationID.collect(), key=lambda kv: str(kv[0])):
    print(f"StationID: {stationID}, Min Temperature: {min_temp}")

# Part 2: Analysis on TMAX values

# Keep only TMAX rows that carry a parsed (non-null) temperature.
tmax_rdd = temp_rdd.filter(lambda row: row.desc == "TMAX" and row.temp is not None)

# a. Maximum temperature (overall). RDD.max() raises on an empty RDD, so report
# None instead when the file has no usable TMAX rows.
tmax_values = tmax_rdd.map(lambda row: row.temp)
overall_max_temp = None if tmax_values.isEmpty() else tmax_values.max()
print(f"Overall Maximum Temperature (TMAX): {overall_max_temp}")

Minimum Temperature for each ItemID:
ItemID: ITE00100554, Min Temperature: -148
ItemID: EZE00100082, Min Temperature: -135
Minimum Temperature for each StationID:
StationID: 18000101, Min Temperature: -148
StationID: 18000102, Min Temperature: -130
StationID: 18000103, Min Temperature: -73
StationID: 18000104, Min Temperature: -74
StationID: 18000105, Min Temperature: -58
StationID: 18000106, Min Temperature: -57
StationID: 18000107, Min Temperature: -50
StationID: 18000108, Min Temperature: -31
StationID: 18000109, Min Temperature: -46
StationID: 18000110, Min Temperature: -75
StationID: 18000111, Min Temperature: -62
StationID: 18000112, Min Temperature: -60
StationID: 18000113, Min Temperature: -60
StationID: 18000114, Min Temperature: -35
StationID: 18000115, Min Temperature: -23
StationID: 18000116, Min Temperature: -37
StationID: 18000117, Min Temperature: -35
StationID: 18000118, Min Temperature: 9
StationID: 18000119, Min Temperature: 34
StationID: 18000120, Min Temperature: 17

In [13]:
# b. Maximum temperature for every ItemID
# Drop rows whose temp is null (Spark's default PERMISSIVE CSV mode nulls fields it
# cannot parse); reduceByKey(max) would otherwise try to compare None with int.
tmax_valid_rdd = tmax_rdd.filter(lambda row: row.temp is not None)

max_temp_by_itemID = tmax_valid_rdd.map(lambda row: (row.itemID, row.temp)).reduceByKey(max)
print("Maximum Temperature for each ItemID:")
# collect() on a reduceByKey result has no defined order; sort on the driver (one
# row per key, so this is cheap). str() keeps a null key from breaking the sort.
for itemID, max_temp in sorted(max_temp_by_itemID.collect(), key=lambda kv: str(kv[0])):
    print(f"ItemID: {itemID}, Max Temperature: {max_temp}")

# c. Maximum temperature for every StationID
max_temp_by_stationID = tmax_valid_rdd.map(lambda row: (row.stationID, row.temp)).reduceByKey(max)
print("Maximum Temperature for each StationID:")
for stationID, max_temp in sorted(max_temp_by_stationID.collect(), key=lambda kv: str(kv[0])):
    print(f"StationID: {stationID}, Max Temperature: {max_temp}")


Maximum Temperature for each ItemID:
ItemID: ITE00100554, Max Temperature: 323
ItemID: EZE00100082, Max Temperature: 323
Maximum Temperature for each StationID:
StationID: 18000101, Max Temperature: -75
StationID: 18000102, Max Temperature: -44
StationID: 18000103, Max Temperature: -10
StationID: 18000104, Max Temperature: 0
StationID: 18000105, Max Temperature: 10
StationID: 18000106, Max Temperature: 13
StationID: 18000107, Max Temperature: 31
StationID: 18000108, Max Temperature: 29
StationID: 18000109, Max Temperature: 35
StationID: 18000110, Max Temperature: 46
StationID: 18000111, Max Temperature: 66
StationID: 18000112, Max Temperature: 41
StationID: 18000113, Max Temperature: 23
StationID: 18000114, Max Temperature: 41
StationID: 18000115, Max Temperature: 54
StationID: 18000116, Max Temperature: 56
StationID: 18000117, Max Temperature: 84
StationID: 18000118, Max Temperature: 59
StationID: 18000119, Max Temperature: 66
StationID: 18000120, Max Temperature: 60
StationID: 180001

In [14]:
# Stop the Spark session (this also stops its underlying SparkContext) and release
# its resources. DataFrames/RDDs built above (temp_df, temp_rdd, ...) cannot run new
# actions afterwards; re-run the session-creation cell before using them again.
spark.stop()