# preparing data

In [1]:
!pip install mrjob


Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.7/439.6 kB[0m [31m2.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mrjob
Successfully installed mrjob-0.7.4


In [3]:
import zipfile

# Specify the path to your zip file
zip_file_path = '/content/test_incomes.zip'

# Specify the directory where you want to extract the contents
extracted_dir = '/content'

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_dir)

# Print the path to the extracted directory
print("Files extracted to:", extracted_dir)


Files extracted to: /content


# file 1: total incomes

In [5]:
# total_incomes.py
total_incomes_code = """
from mrjob.job import MRJob

class TotalIncomes(MRJob):

    def mapper(self, _, line):
        income = float(line.strip())
        yield "total", income

    def reducer(self, key, values):
        total = sum(values)
        yield key, total

if __name__ == '__main__':
    TotalIncomes.run()
"""


In [6]:
# Write code to files
with open("total_incomes.py", "w") as f:
    f.write(total_incomes_code)

## train

In [None]:
!python total_incomes.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/total_incomes.root.20231212.174216.009447
Running step 1 of 1...
job output is in /tmp/total_incomes.root.20231212.174216.009447/output
Streaming final output from /tmp/total_incomes.root.20231212.174216.009447/output...
"total"	63168.0
Removing temp directory /tmp/total_incomes.root.20231212.174216.009447...


## test

In [None]:
!python total_incomes.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/total_incomes.root.20231212.163855.875691
Running step 1 of 1...
job output is in /tmp/total_incomes.root.20231212.163855.875691/output
Streaming final output from /tmp/total_incomes.root.20231212.163855.875691/output...
"total"	210015551664.0
Removing temp directory /tmp/total_incomes.root.20231212.163855.875691...



*   Total Incomes (Trial): 63168

*   Total Incomes (Test): 210015551664




# file 2: mean

In [7]:
mean_code = """
from mrjob.job import MRJob

class Mean(MRJob):

    def mapper(self, _, line):
        # Check if the line is a digit and yield key-value pairs
        if line.isdigit():
            yield "mean", int(line)

    def reducer(self, key, values):
        # Calculate the mean
        total = 0
        count = 0
        for value in values:
            total += value
            count += 1
        mean = total / count if count > 0 else 0
        yield key, mean

if __name__ == '__main__':
    Mean.run()

"""

In [8]:
with open("mean.py", "w") as f:
    f.write(mean_code)


## train

In [None]:
!python mean.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mean.root.20231212.165520.991522
Running step 1 of 1...
job output is in /tmp/mean.root.20231212.165520.991522/output
Streaming final output from /tmp/mean.root.20231212.165520.991522/output...
"mean"	63.168
Removing temp directory /tmp/mean.root.20231212.165520.991522...


## test

In [None]:
!python mean.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mean.root.20231212.165855.892467
Running step 1 of 1...
job output is in /tmp/mean.root.20231212.165855.892467/output
Streaming final output from /tmp/mean.root.20231212.165855.892467/output...
"mean"	21001.5551664
Removing temp directory /tmp/mean.root.20231212.165855.892467...



*   Mean (Trial): 63.168

*   Mean (Test): 21001.5551664




# file 3: generalized_mean

In [14]:
# generalized_mean.py
generalized_mean_code = """
from mrjob.job import MRJob
from mrjob.step import MRStep

class GeneralizedMeanJob(MRJob):

    def mapper(self, _, line):
        income = float(line)
        yield None, income

    def reducer(self, _, incomes):
        # Replace 'p' with the desired order of the generalized mean
        p = 2  # You can change this value to calculate for different orders

        # Calculate the mean
        total = 0
        count = 0
        for value in incomes:
            total += value**p
            count += 1
        mean=(total/count)**(1/p)

        yield None, mean

if __name__ == '__main__':
    GeneralizedMeanJob.run()

"""


In [15]:
with open("generalized_mean.py", "w") as f:
    f.write(generalized_mean_code)


## train

In [None]:
import pandas as pd

def calculate_generalized_mean(data, p):
    n = len(data)
    mean = (sum(x**p for x in data) / n)**(1/p)
    return mean

def main():
    # Load the data
    # Replace 'trial_incomes.csv' with 'test_incomes.csv' for the larger dataset
    data = pd.read_csv('trial_incomes.csv', header=None, names=['income'])

    # Extract the income values from the DataFrame
    incomes = data['income'].tolist()

    # Replace 'p' with the desired order of the generalized mean
    p = 2  # You can change this value to calculate for different orders

    # Calculate the generalized mean
    generalized_mean = calculate_generalized_mean(incomes, p)

    # Print the result
    print(f"Generalized Mean (p={p}): {generalized_mean}")

if __name__ == "__main__":
    main()


Generalized Mean (p=2): 665.8561901792308


In [16]:
!python generalized_mean.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/generalized_mean.root.20231213.104122.596869
Running step 1 of 1...
job output is in /tmp/generalized_mean.root.20231213.104122.596869/output
Streaming final output from /tmp/generalized_mean.root.20231213.104122.596869/output...
null	665.8561901792308
Removing temp directory /tmp/generalized_mean.root.20231213.104122.596869...


# testing result for evaluation (using pandas and using mapreduce)

## test

In [None]:
import pandas as pd

def calculate_generalized_mean(data, p):
    n = len(data)
    mean = (sum(x**p for x in data) / n)**(1/p)
    return mean

def main():
    # Load the data
    # Replace 'trial_incomes.csv' with 'test_incomes.csv' for the larger dataset
    data = pd.read_csv('test_incomes.csv', header=None, names=['income'])

    # Extract the income values from the DataFrame
    incomes = data['income'].tolist()

    # Replace 'p' with the desired order of the generalized mean
    p = 2  # You can change this value to calculate for different orders

    # Calculate the generalized mean
    generalized_mean = calculate_generalized_mean(incomes, p)

    # Print the result
    print(f"Generalized Mean (p={p}): {generalized_mean}")

if __name__ == "__main__":
    main()


Generalized Mean (p=2): 52883028.3630099


In [None]:
!python generalized_mean.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/generalized_mean.root.20231212.173044.081014
Running step 1 of 1...
job output is in /tmp/generalized_mean.root.20231212.173044.081014/output
Streaming final output from /tmp/generalized_mean.root.20231212.173044.081014/output...
null	52883028.36300636
Removing temp directory /tmp/generalized_mean.root.20231212.173044.081014...


# file 4:  Max

In [42]:
# maximum.py
maximum_code = """
from mrjob.job import MRJob

class Maximum(MRJob):

    def mapper(self, _, line):
        income = float(line.strip())
        yield "max", income

    def reducer(self, key, values):
        max=0
        for value in values:
          if value>max:
            max=value
        max_income=max
        #method 2
        #max_income = max(values)
        yield key, max_income

if __name__ == '__main__':
    Maximum.run()
"""

In [43]:
with open("maximum.py", "w") as f:
    f.write(maximum_code)

## train

In [44]:
!python maximum.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/maximum.root.20231213.112012.325416
Running step 1 of 1...
job output is in /tmp/maximum.root.20231213.112012.325416/output
Streaming final output from /tmp/maximum.root.20231213.112012.325416/output...
"max"	13473.0
Removing temp directory /tmp/maximum.root.20231213.112012.325416...


## test

In [45]:
!python maximum.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/maximum.root.20231213.112014.784078
Running step 1 of 1...
job output is in /tmp/maximum.root.20231213.112014.784078/output
Streaming final output from /tmp/maximum.root.20231213.112014.784078/output...
"max"	164016448792.0
Removing temp directory /tmp/maximum.root.20231213.112014.784078...





*   Maximum (Trial): 13473
*   Maximum (Test): 164016448792



# file 5: Min

In [46]:
# minimum.py
minimum_code = """
from mrjob.job import MRJob

class Minimum(MRJob):

    def mapper(self, _, line):
        income = float(line.strip())
        yield "min", income

    def reducer(self, key, values):
        min=1e20
        for value in values:
          if value<min:
            min=value
        min_income=min
        # method 2
        #min_income = min(values)
        yield key, min_income

if __name__ == '__main__':
    Minimum.run()
"""

In [47]:
with open("minimum.py", "w") as f:
    f.write(minimum_code)

## train

In [None]:
!python minimum.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/minimum.root.20231212.164611.063312
Running step 1 of 1...
job output is in /tmp/minimum.root.20231212.164611.063312/output
Streaming final output from /tmp/minimum.root.20231212.164611.063312/output...
"min"	1.0
Removing temp directory /tmp/minimum.root.20231212.164611.063312...


## test

In [None]:
!python minimum.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/minimum.root.20231212.164613.687775
Running step 1 of 1...
job output is in /tmp/minimum.root.20231212.164613.687775/output
Streaming final output from /tmp/minimum.root.20231212.164613.687775/output...
"min"	1.0
Removing temp directory /tmp/minimum.root.20231212.164613.687775...





*   Minimum (Trial): 1
*   Minimum (Test): 1



# file 6: sd

In [56]:
# standard_deviation.py
standard_deviation_code = """
from mrjob.job import MRJob
from mrjob.step import MRStep
import math

class StandardDeviation(MRJob):

    def mapper_first(self, key, line):
        # Split the line and extract the values
        a = line.split(',')
        val = float(a[0])
        yield None, val

    def reducer_first(self, _, values):
        arr = list(values)
        n = len(arr)

        # Calculate the mean
        mean = sum(arr) / n

        # Calculate the sum of squared differences
        sum_squared_diff = sum((v - mean) ** 2 for v in arr)

        # Calculate the standard deviation
        std_dev = math.sqrt(sum_squared_diff / n)

        yield None, std_dev

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper_first,
                reducer=self.reducer_first
            )
        ]

if __name__ == '__main__':
    StandardDeviation.run()

"""


In [66]:
# std_deviation.py
std_deviation_code = """
# std_deviation.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class StandardDeviationJob(MRJob):

    def mapper(self, _, line):
        income = float(line)
        yield None, income

    def reducer(self, _, incomes):
        # Calculate mean
        total = 0
        count = 0
        for value in incomes:
            total += value
            count += 1
        mean = total / count

        # Calculate the sum of squared differences from the mean
        sum_squared_diff = sum((x - mean) ** 2 for x in incomes)

        # Calculate the standard deviation
        std_deviation = (sum_squared_diff / count) ** 0.5

        yield None, std_deviation

if __name__ == '__main__':
    StandardDeviationJob.run()
"""

In [67]:
with open("standard_deviation.py", "w") as f:
    f.write(standard_deviation_code)

## train

In [65]:
!python standard_deviation.py trial_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/standard_deviation.root.20231213.113642.222033
Running step 1 of 1...
job output is in /tmp/standard_deviation.root.20231213.113642.222033/output
Streaming final output from /tmp/standard_deviation.root.20231213.113642.222033/output...
null	662.853128359518
Removing temp directory /tmp/standard_deviation.root.20231213.113642.222033...


## test

In [None]:
!python standard_deviation.py test_incomes.csv


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/standard_deviation.root.20231212.172048.941504
Running step 1 of 1...
job output is in /tmp/standard_deviation.root.20231212.172048.941504/output
Streaming final output from /tmp/standard_deviation.root.20231212.172048.941504/output...
null	52883024.19256389
Removing temp directory /tmp/standard_deviation.root.20231212.172048.941504...




*   Standard Deviation (Trial): 663.1848037009443

*   Standard Deviation (Test): 52883026.8369639



# solving without using mapreduce to evaluate

In [None]:
import pandas as pd

# Load the data
trial_df = pd.read_csv("trial_incomes.csv", header=None, names=['Income'])
test_df = pd.read_csv("test_incomes.csv", header=None, names=['Income'])

# Function to calculate generalized mean (you need to replace this with the actual formula)
def generalized_mean(data):
    # Replace this with the actual formula for generalized mean
    return data.mean()

# 1. Total Incomes
total_incomes_trial = trial_df['Income'].sum()
total_incomes_test = test_df['Income'].sum()

# 2. Mean
mean_trial = trial_df['Income'].mean()
mean_test = test_df['Income'].mean()

# 3. Generalized Mean
generalized_mean_trial = generalized_mean(trial_df['Income'])
generalized_mean_test = generalized_mean(test_df['Income'])

# 4. Maximum
max_income_trial = trial_df['Income'].max()
max_income_test = test_df['Income'].max()

# 5. Minimum
min_income_trial = trial_df['Income'].min()
min_income_test = test_df['Income'].min()

# 6. Standard Deviation
std_dev_trial = trial_df['Income'].std()
std_dev_test = test_df['Income'].std()

# Print or use the results as needed
print("Total Incomes (Trial):", total_incomes_trial)
print("Total Incomes (Test):", total_incomes_test)

print("Mean (Trial):", mean_trial)
print("Mean (Test):", mean_test)

print("Generalized Mean (Trial):", generalized_mean_trial)
print("Generalized Mean (Test):", generalized_mean_test)

print("Maximum (Trial):", max_income_trial)
print("Maximum (Test):", max_income_test)

print("Minimum (Trial):", min_income_trial)
print("Minimum (Test):", min_income_test)

print("Standard Deviation (Trial):", std_dev_trial)
print("Standard Deviation (Test):", std_dev_test)


Total Incomes (Trial): 63168
Total Incomes (Test): 210015551664
Mean (Trial): 63.168
Mean (Test): 21001.5551664
Generalized Mean (Trial): 63.168
Generalized Mean (Test): 21001.5551664
Maximum (Trial): 13473
Maximum (Test): 164016448792
Minimum (Trial): 1
Minimum (Test): 1
Standard Deviation (Trial): 663.1848037009443
Standard Deviation (Test): 52883026.8369639
