# Coursework 1

## Import Libraries

In [1]:
import pandas as pd
import numpy as np
from functools import reduce

## Verification Data

Process the data using library functions so that the results can be used to verify the output of the custom Hadoop implementation.

### Data Load

In [2]:
# source file and the positional columns to read from it; the displayed frame
# shows them as WbanNumber, YearMonthDay, Time and DryBulbTemp
data_path = 'data/raw/200707hourly.txt'
data_cols = [0, 1, 2, 8]

# load only the selected columns and drop the spaces embedded in the header names
df_weather = pd.read_csv(data_path, usecols=data_cols).rename(
    columns=lambda name: name.replace(' ', ''))

df_weather.shape

(785806, 4)

In [3]:
# order the observations by day and renumber the rows from zero
df_weather = df_weather.sort_values(by='YearMonthDay').reset_index(drop=True)
df_weather.head()

Unnamed: 0,WbanNumber,YearMonthDay,Time,DryBulbTemp
0,3011,20070701,50,57
1,63846,20070701,159,-
2,63846,20070701,240,-
3,63846,20070701,359,-
4,63846,20070701,439,-


In [4]:
# work on a copy so the frame loaded above stays untouched
df_weather_clean = df_weather.copy()

# strip the padding from the temperature text and map values that were
# all spaces onto the '-' marker used for missing readings
df_weather_clean['DryBulbTemp'] = df_weather_clean['DryBulbTemp'].str.strip().replace('', '-')

# replace the '-' marker with NaN
# (np.nan is used because the np.NaN alias was removed in NumPy 2.0)
df_weather_clean = df_weather_clean.replace('-', np.nan)

# show the observations that have no temperature reading
df_weather_clean[df_weather_clean['DryBulbTemp'].isna()]

Unnamed: 0,WbanNumber,YearMonthDay,Time,DryBulbTemp
1,63846,20070701,159,
2,63846,20070701,240,
3,63846,20070701,359,
4,63846,20070701,439,
5,63846,20070701,540,
...,...,...,...,...
785799,26546,20070719,249,
785800,26546,20070719,47,
785801,41414,20070719,1344,
785802,41414,20070719,1325,


#### Remove Null Values

In [5]:
# drop null values
# discard the observations with missing values, then give the day a text type
# (it is the grouping key) and the temperature an integer type
df_weather_clean = (
    df_weather_clean
    .dropna()
    .astype({'YearMonthDay': 'str', 'DryBulbTemp': 'int'})
)
df_weather_clean.shape

(609806, 4)

In [6]:
# summary statistics of the cleaned frame, used as a sanity check of the value ranges
df_weather_clean.describe()

Unnamed: 0,WbanNumber,Time,DryBulbTemp
count,609806.0,609806.0,609806.0
mean,37239.640819,1197.046356,73.879857
std,33913.47708,688.94317,11.203061
min,3011.0,10.0,3.0
25%,12897.0,556.0,66.0
50%,23187.0,1156.0,74.0
75%,54831.0,1756.0,81.0
max,94999.0,2359.0,133.0


#### Group By Day

In [7]:
# spot check: the lowest temperature recorded on a single day
# NOTE(review): the file loaded above is 200707hourly.txt and every day displayed from it is
#   200707xx, so "20070401" matches no rows and min() evaluates to nan - presumably a July
#   date was intended here; confirm before relying on this check
df_weather_clean.query('YearMonthDay == "20070401"').DryBulbTemp.min()

nan

In [8]:
# collect the temperature readings per day
df_weather_grouped = df_weather_clean[['YearMonthDay', 'DryBulbTemp']].groupby(by='YearMonthDay')

# reference statistics per day; note that pandas' `var` is the sample variance (ddof=1) by default
df_weather_grouped.agg(['max', 'min', 'mean', 'median', 'var']).reset_index()

Unnamed: 0_level_0,YearMonthDay,DryBulbTemp,DryBulbTemp,DryBulbTemp,DryBulbTemp,DryBulbTemp
Unnamed: 0_level_1,Unnamed: 1_level_1,max,min,mean,median,var
0,20070701,115,32,70.718628,72.0,136.160669
1,20070702,115,32,70.939578,72.0,130.522957
2,20070703,115,33,72.774852,73.0,116.107109
3,20070704,120,29,73.916241,74.0,113.224549
4,20070705,120,33,74.60091,74.0,117.130201
5,20070706,133,38,75.045506,75.0,125.278083
6,20070707,116,32,75.892737,77.0,132.609187
7,20070708,111,36,76.51774,77.0,124.487372
8,20070709,110,36,75.992811,77.0,120.985716
9,20070710,115,35,74.959891,75.0,118.159122


## Mapper

In [9]:
class Mapper():
    """
    This class implements the logic for the mapping functionality.

    Every raw observation line is turned into a `day,temperature` record. Header lines, blank
    lines, truncated lines and observations without a temperature reading are dropped.
    """
    # zero-based position of the day and the temperature in a comma separated observation line
    DAY_COLUMN = 1
    TEMPERATURE_COLUMN = 8

    def map_input(self, input_stream) -> list:
        """
        Map each item in the input stream to the output. The output is written to stdout.

        Parameters
        ----------
        input_stream : iterable
            The input stream to process.

        Returns
        -------
        output : list
            The mapped records, in the order in which they were written to stdout.
        """
        output = []

        for item in input_stream:
            mapped_item = self.map(item)
            if mapped_item:
                print(mapped_item)
                # keep the record so that the returned list reflects what was emitted
                output.append(mapped_item)

        return output

    def map(self, item:str) -> str:
        """
        Map the input string to the day the observation is for (key), and the observation temperature (value).

        Parameters
        ----------
        item : str
            The observation data to perform the mapping for.

        Returns
        -------
        output : str or None
            A comma separated string containing the `day` in the format YYYYMMDD and the `temperature` as an integer.
            None is returned for lines that do not hold a usable observation.

        Raises
        ------
        ValueError
            If the temperature is present but is not an integer.
        """
        # ignore empty lines (including whitespace-only lines and '\r\n' line endings)
        if not item or not item.strip():
            return None

        # ignore the file headers
        if item.startswith('Wban Number'):
            return None

        # tokenize the input line
        tokens = item.split(',')

        # ignore truncated lines that do not reach the day / temperature columns
        if len(tokens) <= max(self.DAY_COLUMN, self.TEMPERATURE_COLUMN):
            return None

        # get the day value
        day_value = tokens[self.DAY_COLUMN].strip()

        # get the temperature
        temperature_value = tokens[self.TEMPERATURE_COLUMN].strip()

        # do not process empty temperature values ('-' is the marker for a missing reading)
        if temperature_value in ('', '-'):
            return None

        # return the key and value as a comma separated string
        return '%s,%s' % (day_value, int(temperature_value))

# smoke-test the mapper against the small sample file; the mapped records are printed
mapper = Mapper()
with open('data/raw/sample.txt') as input_file:
    mapper.map_input(input_file)

20070401,32
20070401,32
20070401,32
20070403,34
20070401,34
20070401,32
20070402,34
20070401,37
20070401,41
20070401,45
20070401,50
20070401,52
20070402,55
20070403,-54
20070402,54
20070402,54
20070402,52
20070402,50
20070402,46
20070402,45


In [10]:
# the mapper is run against the full data file here
# (the small sample file is 'data/raw/sample.txt')
sample_path = data_path
mapper = Mapper()
with open(sample_path) as input_file:
    # keep one entry per input line; rejected lines show up as None
    output = [mapper.map(line) for line in input_file]

output[:10]

[None,
 '20070701,57',
 '20070701,55',
 '20070701,54',
 '20070701,54',
 '20070701,54',
 '20070701,54',
 '20070701,61',
 '20070701,68',
 '20070701,72']

## Reducer

In [11]:
class ReducerValues():
    """
    Helper class to hold the values calculated by the reducer

    The max, min, count, sum and sum of squares are maintained while values are added.
    The individual values are kept as well, because the median needs all of them.
    """
    def __init__(self, day:str) -> None:
        self.values = []
        self.day = day
        self.max = None
        self.min = None
        self.sum = 0
        self.squared_sum = 0
        self.n = 0

    def add_value(self, value:int) -> None:
        """
        Add a value to the value collection

        Parameters
        ----------
        value : int
            The observation to add.
        """
        # add the value to the collection
        self.values.append(value)

        # update the max and min values
        if self.max is None:
            # initialize the max and min values
            self.max = value
            self.min = value
        else:
            if value > self.max: self.max = value
            if value < self.min: self.min = value

        # update the "running" values
        self.n += 1
        self.sum += value
        self.squared_sum += value * value

    def get_mean(self) -> float:
        """
        Get the mean of the values collection.

        Returns
        -------
        mean : float
            The mean, or nan when no values were added.
        """
        if self.n == 0:
            return float('nan')

        return self.sum / float(self.n)

    def get_median(self) -> float:
        """
        Get the median value in the values collection.

        Returns
        -------
        median : float
            The median, or nan when no values were added.
        """
        if self.n == 0:
            return float('nan')

        # sort a copy so that asking for the median does not reorder the stored values
        ordered = sorted(self.values)
        middle = self.n // 2

        if self.n % 2 == 0:
            # even count: average of the two central values
            return (ordered[middle - 1] + ordered[middle]) / 2.0
        else:
            return float(ordered[middle])

    def get_variance(self, population:bool=True) -> float:
        """
        Calculate and return the variance of the values collection.

        Parameters
        ----------
        population : bool
            Should the formula for the sample or population variance be used.

        Returns
        -------
        variance : float
            The variance, or nan when it is undefined (no values for the population variance,
            fewer than two values for the sample variance).
        """
        # please note that I personally think the sample variance should be calculated instead
        #    but the formula as per the course work specification is used (population=True).
        divisor = self.n if population else self.n - 1

        if divisor <= 0:
            return float('nan')

        # (sum(x^2) - n * mean^2) / divisor, rearranged to (n * sum(x^2) - sum(x)^2) / (n * divisor)
        #    so that the subtraction is done on the running sums before any division takes place
        return float(self.n * self.squared_sum - self.sum * self.sum) / (self.n * divisor)

    def print_output(self, population:bool=True) -> None:
        """
        Print the day values to stdout.

        The line has the format `day,max,min,mean,median,variance`. Nothing is printed when no
        values were added.

        Parameters
        ----------
        population : bool
            Should the formula for the sample or population variance be used.
        """
        # there is nothing to report for an empty collection
        if self.n == 0:
            return

        # print the output
        print('%s,%d,%d,%.6f,%.1f,%.6f' % (
            self.day,
            self.max,
            self.min,
            self.get_mean(),
            self.get_median(),
            self.get_variance(population)))

class Reducer():
    """
    This class contains the logic to summarize the temperature observations by day.
    """
    def reduce_input(self, input_stream, population:bool=True) -> None:
        """
        Reduce the output from the mapper to calculate the max, min, mean, median, and variance per day.

        The input must be ordered by day: a summary line is printed every time the day changes,
        and once more for the last day after the input is exhausted. Nothing is printed when the
        input holds no records.

        Parameters
        ----------
        input_stream : iterable
            The input stream to process. Each item is a `day,temperature` string; empty items
            (None, '' or whitespace-only lines) are skipped.
        population : bool
            Should the formula for the sample or population variance be used.
        """
        # no day is in progress until the first record has been read
        current_day = None

        for item in input_stream:
            # skip the records the mapper rejected
            if not item:
                continue

            # skip blank lines and drop the line ending of records read from a stream
            item = item.strip()
            if not item:
                continue

            # get the day and temperature value
            day_value, temperature_value = item.split(',')
            temperature_value = int(temperature_value)

            if current_day is None or current_day.day != day_value:
                # the day has changed: show the output of the finished day, if there is one
                if current_day is not None:
                    current_day.print_output(population)

                # create the new day
                current_day = ReducerValues(day_value)

            current_day.add_value(temperature_value)

        # print the last day processed
        if current_day is not None:
            current_day.print_output(population)

mapper = Mapper()
reducer = Reducer()

with open(data_path) as input_file:
    # map every line, drop the rejected (None) records and sort the rest so that
    # all records of a day sit next to each other, which is what the reducer relies on
    mapper_output = sorted(filter(None, map(mapper.map, input_file)))

# summarise per day using the sample variance (population=False)
reducer.reduce_input(mapper_output, population=False)

20070701,115,32,70.718628,72.0,136.160669
20070702,115,32,70.939578,72.0,130.522957
20070703,115,33,72.774852,73.0,116.107109
20070704,120,29,73.916241,74.0,113.224549
20070705,120,33,74.600910,74.0,117.130201
20070706,133,38,75.045506,75.0,125.278083
20070707,116,32,75.892737,77.0,132.609187
20070708,111,36,76.517740,77.0,124.487372
20070709,110,36,75.992811,77.0,120.985716
20070710,115,35,74.959891,75.0,118.159122
20070711,115,33,72.972403,73.0,119.794278
20070712,111,3,72.124993,73.0,127.922534
20070713,111,35,71.838316,72.0,127.548649
20070714,110,40,73.139704,73.0,120.412153
20070715,111,36,73.859110,73.0,124.772278
20070716,114,38,74.339734,74.0,125.147510
20070717,114,39,74.879950,75.0,116.844285
20070718,113,39,75.746743,75.0,107.504686
20070719,90,74,80.974026,81.0,16.709843
