In [None]:
# Install mrjob
!pip install mrjob

Import needed libraries

In [None]:
import pandas as pd
from mrjob.job import MRJob
from mrjob.step import MRStep


Read data

In [None]:
# Turn the Google Drive sharing link into a direct-download URL: the file id
# is the second-to-last path segment of the sharing link.
share_link = 'https://drive.google.com/file/d/16UTzifXtnQWoStzn22AyfEqGDOAGbZW_/view?usp=sharing'
file_id = share_link.split('/')[-2]
url = 'https://drive.google.com/uc?id=' + file_id

# Load the CSV and discard the 'Unnamed: 0' column.
df = pd.read_csv(url).drop(['Unnamed: 0'], axis=1)
df

In [None]:
# Export with pandas' defaults: a header line is written and the index becomes
# the first column, so every DataFrame column sits one position further right
# in the file than it does in `df`.
df.to_csv('Weather_Data_Clean.csv')

# Objective #1

Find the most frequent weather condition in each city by counting how often each (city, weather condition) pair occurs.

In [None]:
%%file MapReduce_1.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class FrequentWeather(MRJob):
    """Two-step MapReduce job reporting the most frequent weather per city.

    Step 1 counts how often each (city, weather) pair occurs; step 2 regroups
    those counts by city and keeps the pair with the highest count.
    """

    # Field positions in the input CSV (field 0 holds the row index).
    CITY_COL = 1
    WEATHER_COL = 9

    def steps(self):
        """Return the two chained steps: pair counting, then per-city maximum."""
        # Summing is associative, so reducer1 can also act as the combiner and
        # pre-aggregate counts before they are shuffled to the reducer.
        return [MRStep(mapper=self.mapper1, combiner=self.reducer1,
                       reducer=self.reducer1),
                MRStep(mapper=self.mapper2, reducer=self.reducer2)]

    def mapper1(self, _, row):
        """Emit ((city, weather), 1) for every data line of the CSV.

        Args:
            _: Input key, ignored.
            row: One raw text line of the input file.

        Yields:
            ((city, weather), 1) for data lines; nothing for the header line
            or for blank/truncated lines.
        """
        import csv  # local import keeps the script runnable without touching its import block

        # csv.reader honours quoting, so a quoted field that contains a comma
        # is not split into extra columns the way str.split(',') would do.
        fields = next(csv.reader([row]), [])
        # Skip blank or truncated lines instead of raising IndexError.
        if len(fields) <= max(self.CITY_COL, self.WEATHER_COL):
            return
        # Data lines start with a numeric row index; the header line does not.
        # Skipping it keeps the column titles from being counted as a pair.
        if not fields[0].strip().isdigit():
            return
        yield ((fields[self.CITY_COL], fields[self.WEATHER_COL]), 1)

    def reducer1(self, city_weather, count):
        """Sum the partial counts of one (city, weather) pair."""
        yield (city_weather, sum(count))

    def mapper2(self, city_weather, count):
        """Re-key a counted pair by city so all weathers of a city meet in one reducer."""
        city, weather = city_weather
        yield (city, (weather, count))

    def reducer2(self, city, weather_count):
        """Emit, for one city, the (weather, count) entry with the largest count."""
        yield (city, max(weather_count, key=lambda x: x[1]))

# Launch the job only when this file is executed as a script.
if __name__ == "__main__":
    FrequentWeather.run()


In [None]:
%%time
# Run the MapReduce_1.py job script on the exported CSV; %%time reports how long the cell takes.
!python  MapReduce_1.py  Weather_Data_Clean.csv

# Objective #2

Find the most frequent wind speed in each city by counting occurrences of each city with each wind speed value.

In [None]:
%%file MapReduce_2.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class FrequentWind(MRJob):
    """Two-step MapReduce job reporting the most frequent wind speed per city.

    Step 1 counts how often each (city, wind) pair occurs; step 2 regroups
    those counts by city and keeps the pair with the highest count.
    """

    # Field positions in the input CSV (field 0 holds the row index).
    CITY_COL = 1
    WIND_COL = 11

    def steps(self):
        """Return the two chained steps: pair counting, then per-city maximum."""
        # Summing is associative, so reducer1 can also act as the combiner and
        # pre-aggregate counts before they are shuffled to the reducer.
        return [MRStep(mapper=self.mapper1, combiner=self.reducer1,
                       reducer=self.reducer1),
                MRStep(mapper=self.mapper2, reducer=self.reducer2)]

    def mapper1(self, _, row):
        """Emit ((city, wind), 1) for every data line of the CSV.

        Args:
            _: Input key, ignored.
            row: One raw text line of the input file.

        Yields:
            ((city, wind), 1) for data lines; nothing for the header line or
            for blank/truncated lines.
        """
        import csv  # local import keeps the script runnable without touching its import block

        # csv.reader honours quoting, so a quoted field that contains a comma
        # is not split into extra columns the way str.split(',') would do.
        fields = next(csv.reader([row]), [])
        # Skip blank or truncated lines instead of raising IndexError.
        if len(fields) <= max(self.CITY_COL, self.WIND_COL):
            return
        # Data lines start with a numeric row index; the header line does not.
        # Skipping it keeps the column titles from being counted as a pair.
        if not fields[0].strip().isdigit():
            return
        yield ((fields[self.CITY_COL], fields[self.WIND_COL]), 1)

    def reducer1(self, city_wind, count):
        """Sum the partial counts of one (city, wind) pair."""
        yield (city_wind, sum(count))

    def mapper2(self, city_wind, count):
        """Re-key a counted pair by city so all wind values of a city meet in one reducer."""
        city, wind = city_wind
        yield (city, (wind, count))

    def reducer2(self, city, wind_count):
        """Emit, for one city, the (wind, count) entry with the largest count."""
        yield (city, max(wind_count, key=lambda x: x[1]))

# Launch the job only when this file is executed as a script.
if __name__ == "__main__":
    FrequentWind.run()


In [None]:
%%time
# Run the MapReduce_2.py job script on the exported CSV; %%time reports how long the cell takes.
!python  MapReduce_2.py  Weather_Data_Clean.csv

# Objective #3

Find the most frequent temperature in each city by counting occurrences of each city with each temperature degree value.

In [None]:
%%file MapReduce_3.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class FrequentTemp(MRJob):
    """Two-step MapReduce job reporting the most frequent temperature per city.

    Step 1 counts how often each (city, temp) pair occurs; step 2 regroups
    those counts by city and keeps the pair with the highest count.
    """

    # Field positions in the input CSV (field 0 holds the row index).
    CITY_COL = 1
    TEMP_COL = 10

    def steps(self):
        """Return the two chained steps: pair counting, then per-city maximum."""
        # Summing is associative, so reducer1 can also act as the combiner and
        # pre-aggregate counts before they are shuffled to the reducer.
        return [MRStep(mapper=self.mapper1, combiner=self.reducer1,
                       reducer=self.reducer1),
                MRStep(mapper=self.mapper2, reducer=self.reducer2)]

    def mapper1(self, _, row):
        """Emit ((city, temp), 1) for every data line of the CSV.

        Args:
            _: Input key, ignored.
            row: One raw text line of the input file.

        Yields:
            ((city, temp), 1) for data lines; nothing for the header line or
            for blank/truncated lines.
        """
        import csv  # local import keeps the script runnable without touching its import block

        # csv.reader honours quoting, so a quoted field that contains a comma
        # is not split into extra columns the way str.split(',') would do.
        fields = next(csv.reader([row]), [])
        # Skip blank or truncated lines instead of raising IndexError.
        if len(fields) <= max(self.CITY_COL, self.TEMP_COL):
            return
        # Data lines start with a numeric row index; the header line does not.
        # Skipping it keeps the column titles from being counted as a pair.
        if not fields[0].strip().isdigit():
            return
        yield ((fields[self.CITY_COL], fields[self.TEMP_COL]), 1)

    def reducer1(self, city_temp, values):
        """Sum the partial counts of one (city, temp) pair."""
        yield (city_temp, sum(values))

    def mapper2(self, city_temp, values):
        """Re-key a counted pair by city so all temperatures of a city meet in one reducer."""
        city, temp = city_temp
        yield (city, (temp, values))

    def reducer2(self, city, temp_values):
        """Emit, for one city, the (temp, count) entry with the largest count."""
        yield (city, max(temp_values, key=lambda x: x[1]))

# Launch the job only when this file is executed as a script.
if __name__ == "__main__":
    FrequentTemp.run()

In [None]:
%%time
# Run the MapReduce_3.py job script on the exported CSV; %%time reports how long the cell takes.
!python  MapReduce_3.py  Weather_Data_Clean.csv

# Objective #4

Find the most frequent weather condition in each month by counting how often each (month, weather condition) pair occurs.

In [None]:
%%file MapReduce_4.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class FrequentWeatherMonth(MRJob):
    """Two-step MapReduce job reporting the most frequent weather per month.

    Step 1 counts how often each (month, weather) pair occurs; step 2 regroups
    those counts by month and keeps the pair with the highest count.
    """

    # Field positions in the input CSV (field 0 holds the row index).
    MONTH_COL = 5
    WEATHER_COL = 9

    def steps(self):
        """Return the two chained steps: pair counting, then per-month maximum."""
        # Summing is associative, so reducer1 can also act as the combiner and
        # pre-aggregate counts before they are shuffled to the reducer.
        return [MRStep(mapper=self.mapper1, combiner=self.reducer1,
                       reducer=self.reducer1),
                MRStep(mapper=self.mapper2, reducer=self.reducer2)]

    def mapper1(self, _, row):
        """Emit ((month, weather), 1) for every data line of the CSV.

        Args:
            _: Input key, ignored.
            row: One raw text line of the input file.

        Yields:
            ((month, weather), 1) for data lines; nothing for the header line
            or for blank/truncated lines.
        """
        import csv  # local import keeps the script runnable without touching its import block

        # csv.reader honours quoting, so a quoted field that contains a comma
        # is not split into extra columns the way str.split(',') would do.
        fields = next(csv.reader([row]), [])
        # Skip blank or truncated lines instead of raising IndexError.
        if len(fields) <= max(self.MONTH_COL, self.WEATHER_COL):
            return
        # Data lines start with a numeric row index; the header line does not.
        # Skipping it keeps the column titles from being counted as a pair.
        if not fields[0].strip().isdigit():
            return
        yield ((fields[self.MONTH_COL], fields[self.WEATHER_COL]), 1)

    def reducer1(self, month_weather, count):
        """Sum the partial counts of one (month, weather) pair."""
        yield (month_weather, sum(count))

    def mapper2(self, month_weather, count):
        """Re-key a counted pair by month so all weathers of a month meet in one reducer."""
        month, weather = month_weather
        yield (month, (weather, count))

    def reducer2(self, month, weather_count):
        """Emit, for one month, the (weather, count) entry with the largest count."""
        yield (month, max(weather_count, key=lambda x: x[1]))

# Launch the job only when this file is executed as a script.
if __name__ == "__main__":
    FrequentWeatherMonth.run()


In [None]:
%%time
# Run the MapReduce_4.py job script on the exported CSV; %%time reports how long the cell takes.
!python  MapReduce_4.py  Weather_Data_Clean.csv