In [1]:
from pyspark import SparkContext, SparkConf

## Min, Max Temperature by weather station Program
### Steps followed:
1. Read the weather station data file for 1800.
2. Parse each line of the dataset.
3. Filter the rows whose entry type is "TMIN" or "TMAX", i.e. the minimum and maximum temperatures recorded.
4. Create an RDD of (K, V) pairs with K = weather station ID and V = recorded temperature.
5. Reduce each RDD by key to keep only the minimum and the maximum temperature per weather station, respectively.

In [2]:
def min_temperature(data_path=None):
    """Print the minimum and the maximum temperature recorded per weather station.

    Despite its name, this function reports both extremes: it first prints a
    "Station Id, Minimum Temperature" table built from the "TMIN" rows, then a
    "Station Id, Maximum Temperature" table built from the "TMAX" rows.

    Each input line is turned into a (station_id, temp_type, temperature)
    tuple by ``parse``; rows are then selected by ``temp_type``, mapped to
    (station_id, temperature) pairs and reduced per station.

    Args:
        data_path: Location of the weather CSV passed to ``sc.textFile``.
            When omitted, the original hard-coded local file is used, so
            existing zero-argument calls behave as before.

    Returns:
        None. Results are written to standard output.
    """
    if data_path is None:
        data_path = "file:////Users/amoghmishra/Desktop/AmoghM/ApacheSpark/dataset/weather_data.csv"

    conf = SparkConf().setMaster("local").setAppName("MinTemperature")
    sc = SparkContext(conf=conf)
    try:
        # Columns used by parse(): station id, (date), weather-type, temperature
        weather_data = sc.textFile(data_path)

        # Both passes below start from the same parsed rows; cache them so the
        # file is read and parsed only once.
        parsed_rdd = weather_data.map(parse).cache()

        # Finding minimum temperature by station.
        # Exact match on the type field (x[1]) - same predicate style as TMAX.
        min_temp_rdd = (parsed_rdd
                        .filter(lambda x: x[1] == "TMIN")
                        .map(lambda x: (x[0], x[2]))
                        .reduceByKey(lambda x, y: min(x, y)))  # lowest reading per station
        min_result = min_temp_rdd.collect()

        # Single-argument print(...) and %-formatting give the same text on
        # Python 2 and Python 3.
        print("Station Id, Minimum Temperature")
        for res in min_result:
            print("%s %s" % (res[0], res[1]))
        print("\n")

        # Finding maximum temperature by station.
        max_temp_rdd = (parsed_rdd
                        .filter(lambda x: x[1] == "TMAX")
                        .map(lambda x: (x[0], x[2]))
                        .reduceByKey(lambda x, y: max(x, y)))  # highest reading per station
        max_result = max_temp_rdd.collect()

        print("Station Id, Maximum Temperature")
        for res in max_result:
            print("%s %s" % (res[0], res[1]))
    finally:
        # Always release the context: a second SparkContext cannot be created
        # while this one is active, which would break re-running the cell.
        sc.stop()

In [3]:
def parse(weather_entry):
    """Convert one comma-separated weather line into a tuple.

    The line is split on "," and the 1st, 3rd and 4th fields are used as the
    station id, the temperature type and the raw temperature respectively.

    Args:
        weather_entry: A single comma-separated line of the dataset.

    Returns:
        A tuple (station_id, temp_type, temperature) where ``temperature`` is
        the raw 4th field converted to float and divided by 10.

    Raises:
        IndexError: If the line has fewer than four comma-separated fields.
        ValueError: If the 4th field cannot be converted to float.
    """
    fields = weather_entry.split(",")
    # actual temp = temperature in the dataset / 10
    scaled_temperature = float(fields[3]) / 10
    return (fields[0], fields[2], scaled_temperature)

In [4]:
# Entry point: run the min/max temperature report when this code is executed
# as the main module.
if __name__=="__main__":
    min_temperature()

Station Id, Minimum Temperature
ITE00100554 -14.8
EZE00100082 -13.5


Station Id, Maximum Temperature
ITE00100554 32.3
EZE00100082 32.3
