#### Find the lowest temperature for each station, filtered by entry type

In [1]:
from pyspark import SparkConf, SparkContext
import os

# Run Spark locally: "local[*]" uses every available core, "local" uses a
# single core, and "local[N]" uses exactly N cores.
# The application name is what shows up in the Spark UI.
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Temperature")
sc = SparkContext(conf=conf)

# Locate the input CSV: the "data" directory lives under the parent of the
# current working directory. The parent path is assembled with os.path.join
# (not string concatenation) so separator handling is left to os.path.
datadir = os.path.join(os.path.normpath(os.path.join(os.getcwd(), os.pardir)), 'data')
datafile = os.path.join(datadir, '1800.csv')

In [2]:
def parseLine(line):
    """Split one CSV record into a (stationID, entryType, temperature) tuple.

    The station ID is taken from column 0, the entry type from column 2 and
    the temperature from column 3 (converted to ``int``). Column 1 and any
    columns after column 3 are ignored.
    """
    columns = line.split(',')
    return (columns[0], columns[2], int(columns[3]))

In [13]:
# Read the CSV file as an RDD of raw text lines.
lines = sc.textFile(datafile)
# Turn every line into a (stationID, entryType, temperature) tuple.
parsedLines = lines.map(parseLine)
# Peek at the first 10 parsed records to sanity-check the parsing.
parsedLines.take(10)

[('ITE00100554', 'TMAX', -75),
 ('ITE00100554', 'TMIN', -148),
 ('GM000010962', 'PRCP', 0),
 ('EZE00100082', 'TMAX', -86),
 ('EZE00100082', 'TMIN', -135),
 ('ITE00100554', 'TMAX', -60),
 ('ITE00100554', 'TMIN', -125),
 ('GM000010962', 'PRCP', 0),
 ('EZE00100082', 'TMAX', -44),
 ('EZE00100082', 'TMIN', -130)]

In [9]:
# Keep only the daily-minimum readings. Compare with == rather than `in`:
# `in` is a substring test on the entry-type string, so it would also accept
# any other entry type whose name merely contains "TMIN".
minTemps = parsedLines.filter(lambda x: x[1] == 'TMIN')

In [15]:
# Preview the first three filtered records to confirm only TMIN rows remain.
minTemps.take(3)

[('ITE00100554', 'TMIN', -148),
 ('EZE00100082', 'TMIN', -135),
 ('ITE00100554', 'TMIN', -125)]

In [14]:
# Drop the entry type, leaving (stationID, temperature) pairs keyed by station.
stationTemps = minTemps.map(lambda record: (record[0], record[2]))
stationTemps.take(3)

[('ITE00100554', -148), ('EZE00100082', -135), ('ITE00100554', -125)]

In [7]:
# Collapse to one value per station: the smallest temperature seen.
# NOTE: this rebinds minTemps (previously the filtered 3-tuples) to an RDD of
# (stationID, temperature) pairs, so the filter cell must be re-run before the
# stationTemps cell can be executed again.
minTemps = stationTemps.reduceByKey(min)

results = minTemps.collect()

# Put a tab between the station ID and the value; without a separator the
# reading runs straight into the ID and its minus sign reads like a dash.
for stationID, temperature in results:
    print('{}\t{:.2f}'.format(stationID, temperature))

ITE00100554	-148.00
EZE00100082	-135.00
