# Tutorial: Taming Big Data With Apache Spark and Python - Hands On!
## Exercise 3.2 - Filtering Maximum Temperatures

### Setup

FindSpark

`findspark` adds your Spark installation to Python's path, which avoids many common problems with Python failing to find Spark.

In [1]:
import findspark
findspark.init('c:/users/andy/spark')

Load Libraries

In [2]:
from pyspark import SparkConf, SparkContext

Set the file path

In [3]:
data_folder = "C:/Users/Andy/Dropbox/FactoryFloor/Repositories/Tutorial_Udemy_SparkPython/Course_Resources/"

Create the Spark Context

In [4]:
# configure your Spark context; master node is local machine
conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")

# create a spark context object
sc = SparkContext(conf=conf)

### Load the Data

In [5]:
# path to file of interest
file_to_open = data_folder + "1800.csv"

# load the file; textFile returns an RDD in which each line of the file is one element
lines = sc.textFile(file_to_open)

Inspect the RDD

In [6]:
lines.top(5)

['ITE00100554,18001231,TMIN,25,,,E,',
 'ITE00100554,18001231,TMAX,50,,,E,',
 'ITE00100554,18001230,TMIN,31,,,E,',
 'ITE00100554,18001230,TMAX,50,,,E,',
 'ITE00100554,18001229,TMIN,16,,,E,']
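`RDD.top(n)` returns the `n` largest elements in descending order rather than the first `n` lines of the file. That is why the rows above count backwards from 31 December and list TMIN before TMAX for the same day. The plain-Python sketch below imitates `top` and `take` on a short list that stands in for the RDD. The helper names are hypothetical, and `heapq.nlargest` is only an analogy for what Spark computes. The sample lines are copied from the output above and listed in ascending date order purely for illustration.

```python
import heapq

# Sample lines copied from the output above; the ordering is chosen for illustration.
sample_lines = [
    "ITE00100554,18001229,TMIN,16,,,E,",
    "ITE00100554,18001230,TMAX,50,,,E,",
    "ITE00100554,18001230,TMIN,31,,,E,",
    "ITE00100554,18001231,TMAX,50,,,E,",
    "ITE00100554,18001231,TMIN,25,,,E,",
]

def top(elements, n):
    """Hypothetical helper: the n largest elements, largest first (like RDD.top with no key)."""
    return heapq.nlargest(n, elements)

def take(elements, n):
    """Hypothetical helper: the first n elements in their original order (like RDD.take)."""
    return elements[:n]

print(top(sample_lines, 2))
# ['ITE00100554,18001231,TMIN,25,,,E,', 'ITE00100554,18001231,TMAX,50,,,E,']
print(take(sample_lines, 2))
# ['ITE00100554,18001229,TMIN,16,,,E,', 'ITE00100554,18001230,TMAX,50,,,E,']
```

In the notebook itself, `lines.take(5)` is the Spark call that returns the first five elements of the RDD instead of the largest five.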

### Define a Parse Line Function

In [7]:
def parseLine(line):
    fields = line.split(',') # split on commas
    stationID = fields[0] # first element
    entryType = fields[2] # third element
    temperature = float(fields[3]) * 0.1 * (9.0/5.0) + 32.0 # fourth element, in tenths of a degree C; convert to F, 'cause we ain't scientists
    return (stationID, entryType, temperature)
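The `* 0.1` factor indicates that the raw field holds tenths of a degree Celsius. On that basis, `parseLine` computes F = (raw × 0.1) × 9/5 + 32. The arithmetic can be checked on its own in plain Python. `tenths_c_to_f` is a hypothetical helper that isolates the formula from `parseLine`, and the raw values `50` and `25` come from the sample rows printed earlier.

```python
def tenths_c_to_f(raw):
    """Hypothetical helper: convert a raw reading in tenths of a degree Celsius to Fahrenheit."""
    celsius = float(raw) * 0.1          # e.g. "50" -> 5.0 degrees C
    return celsius * (9.0 / 5.0) + 32.0  # standard Celsius-to-Fahrenheit conversion

for raw in ["50", "25", "323"]:
    print(raw, "->", round(tenths_c_to_f(raw), 2), "F")
# 50 -> 41.0 F
# 25 -> 36.5 F
# 323 -> 90.14 F
```

The last value shows that a maximum of 90.14F corresponds to a raw reading of 323, i.e. 32.3 °C.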

### Transformations

Parse each line into a (stationID, entryType, temperature) tuple

In [8]:
parsedLines = lines.map(parseLine)

parsedLines.top(5)

[('ITE00100554', 'TMIN', 75.38),
 ('ITE00100554', 'TMIN', 74.84),
 ('ITE00100554', 'TMIN', 74.84),
 ('ITE00100554', 'TMIN', 74.30000000000001),
 ('ITE00100554', 'TMIN', 74.30000000000001)]

Filter to keep only entries where entryType = TMAX

In [9]:
maxTemps = parsedLines.filter(lambda x: x[1] == "TMAX")

Keep only the stationID and temperature from the filtered set, dropping the entryType column (which is now always 'TMAX').

In [10]:
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))

stationTemps.top(5)

[('ITE00100554', 90.14000000000001),
 ('ITE00100554', 89.42),
 ('ITE00100554', 88.34),
 ('ITE00100554', 87.80000000000001),
 ('ITE00100554', 87.62)]
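RDD `filter` and `map` are lazy, distributed transformations, but element by element they behave like ordinary Python filtering and mapping. The sketch below applies the same two steps to a short list in plain Python, so the effect of each step is visible without Spark. The records follow the `(stationID, entryType, temperatureF)` shape that `parseLine` produces, but the values are made up for illustration.

```python
# Hypothetical parsed records in the (stationID, entryType, temperatureF) shape.
parsed = [
    ("ITE00100554", "TMIN", 75.38),
    ("ITE00100554", "TMAX", 90.14),
    ("EZE00100082", "TMAX", 88.34),
    ("EZE00100082", "TMIN", 60.80),
]

# Same idea as parsedLines.filter(...): keep only the TMAX rows.
max_rows = [rec for rec in parsed if rec[1] == "TMAX"]

# Same idea as maxTemps.map(lambda x: (x[0], x[2])): drop the entryType column.
station_temps = [(rec[0], rec[2]) for rec in max_rows]

print(station_temps)
# [('ITE00100554', 90.14), ('EZE00100082', 88.34)]
```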

Aggregate by stationID, keeping the maximum temperature for each station

In [11]:
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))

maxTemps.top(5)

[('ITE00100554', 90.14000000000001), ('EZE00100082', 90.14000000000001)]
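`reduceByKey` groups values that share a key and folds each group with a two-argument function. Spark combines partial results within and across partitions, so the function should be associative and commutative, as `max` is. The in-memory sketch below shows why passing `max` leaves exactly one maximum per station. It assumes the data fits in a Python list, and both the helper name `reduce_by_key` and the sample pairs are hypothetical.

```python
def reduce_by_key(pairs, func):
    """Hypothetical in-memory analogue of RDD.reduceByKey: fold each key's values with func."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)  # combine with the running value
        else:
            result[key] = value  # first value seen for this key
    return list(result.items())

# Hypothetical (stationID, temperatureF) pairs.
station_temps = [
    ("ITE00100554", 87.62),
    ("EZE00100082", 71.06),
    ("ITE00100554", 90.14),
    ("EZE00100082", 84.20),
]

print(reduce_by_key(station_temps, max))
# [('ITE00100554', 90.14), ('EZE00100082', 84.2)]
```

Because the built-in `max` already accepts two arguments, `stationTemps.reduceByKey(max)` should behave the same as the lambda used above. Note that Spark does not guarantee the order of keys in the result.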

### Actions

Print out the results

In [12]:
results = maxTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

ITE00100554	90.14F
EZE00100082	90.14F
