In [2]:
import findspark
# Called before the pyspark import in the next cell; findspark is expected to
# make the local Spark installation importable (confirm against findspark docs).
findspark.init()

In [3]:
from pyspark import SparkConf, SparkContext
# Run Spark in local mode under a descriptive application name.
conf = SparkConf().setMaster("local").setAppName("MinTemperaturesFilter")
# getOrCreate reuses an already-running context, so re-executing this cell in a
# live kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
def parseLine(line):
    """Split one comma-separated record into (stationID, entryType, temperature).

    The station ID is column 0, the entry type column 2 and the raw reading
    column 3. The reading is scaled by 0.1 and then run through the
    Celsius-to-Fahrenheit formula, so the returned temperature is a float
    in degrees Fahrenheit.

    Raises IndexError if the line has fewer than four columns and ValueError
    if column 3 is not numeric.
    """
    columns = line.split(',')
    station, kind, raw_reading = columns[0], columns[2], columns[3]
    # Scale the raw reading first, then apply F = C * 9/5 + 32.
    scaled = float(raw_reading) * 0.1
    fahrenheit = scaled * (9.0 / 5.0) + 32.0
    return (station, kind, fahrenheit)

In [4]:
# Load the raw observations as text, one element per line. Despite the .xls
# extension, the sample output below shows plain comma-separated records.
lines = sc.textFile('../../SparkData/1800_temperatures.xls')

In [5]:
# Peek at the raw data. Note that top(5) returns the five largest lines in
# descending order (see the output below), not the first five lines of the file.
lines.top(5)

['ITE00100554,18001231,TMIN,25,,,E,',
 'ITE00100554,18001231,TMAX,50,,,E,',
 'ITE00100554,18001230,TMIN,31,,,E,',
 'ITE00100554,18001230,TMAX,50,,,E,',
 'ITE00100554,18001229,TMIN,16,,,E,']

In [7]:
# Turn each raw line into a (stationID, entryType, temperature) tuple via parseLine.
parsedLines = lines.map(parseLine)

In [8]:
# Inspect the parsed tuples. top(5) returns the five largest tuples in
# descending order (see the output below), not the first five records.
parsedLines.top(5)

[('ITE00100554', 'TMIN', 75.38),
 ('ITE00100554', 'TMIN', 74.84),
 ('ITE00100554', 'TMIN', 74.84),
 ('ITE00100554', 'TMIN', 74.30000000000001),
 ('ITE00100554', 'TMIN', 74.30000000000001)]

In [15]:
# Total number of parsed records, across all entry types.
parsedLines.count()

1825

In [12]:
# Keep only the minimum-temperature observations. The entry type is compared
# for equality: a substring test ("TMIN" in ...) would also accept any longer
# entry type that merely contains "TMIN".
minTemps = parsedLines.filter(lambda record: record[1] == "TMIN")

In [14]:
# Number of records left after keeping only the TMIN entries.
minTemps.count()

730

In [16]:
# Drop the entry type: from here on only the station ID and its minimum
# temperature reading are needed, as (stationID, temperature) pairs.
stationTemps = minTemps.map(lambda record: (record[0], record[2]))

In [17]:
# Collapse all readings that share a station ID down to the smallest one:
# the built-in min is applied pairwise to the values of each key.
minTempsByStation = stationTemps.reduceByKey(min)

In [18]:
# Materialize the per-station minima as (stationID, temperature) pairs for printing.
results = minTempsByStation.collect()

In [23]:
# Report one line per station with its lowest temperature, to two decimals.
for station_id, lowest_temp in results:
    print("Station: {} -> Min Temperature: {:.2f}".format(station_id, lowest_temp))

Station: ITE00100554 -> Min Temperature: 5.36
Station: EZE00100082 -> Min Temperature: 7.70
