In [None]:
!pip install pyspark

# Spark Session Initialization

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Input survey CSV; relative path, assumes the notebook runs from the Colab
# working directory where Google Drive is mounted as `drive/`.
INPUT = ('drive/MyDrive/Google-Colab-Data/assignment1/'
         'survey_results_public.csv')

In [None]:
# Local Spark session using all available cores. The
# `spark.some.config.option` / "some-value" pair from the Spark docs example
# was removed: it is a placeholder, not a real Spark setting.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("HW1")
    .getOrCreate()
)

In [None]:
# No schema inference is requested, so every column arrives as a string;
# the salary filters further down therefore compare against string literals.
df = (
    spark.read.csv(INPUT, header='true')
    .select("Country", "Salary", "SalaryType")
)

# Task 1: count rows with a valid salary (not 'NA' or '0'), in total and per country

In [None]:
# Destination for the Task 1 result (written by the last Task 1 cell).
OUTPUT1 = ('drive/MyDrive/Google-Colab-Data/assignment1/'
           'task1.csv')

In [None]:
# Task 1 only needs the country and the raw salary string; switch to the RDD API.
df_task1 = df.select(*("Country", "Salary"))
rdd1 = df_task1.rdd

In [None]:
def getValidCountry(rdd):
    """Drop rows whose ``Salary`` is the string ``'NA'`` or ``'0'``.

    Despite the name, the test is on the salary column; every other row is
    kept unchanged, with all of its columns.

    Args:
        rdd: RDD of rows exposing a ``Salary`` attribute.

    Returns:
        The (lazily) filtered RDD.
    """
    excluded = ('NA', '0')
    return rdd.filter(lambda row: row.Salary not in excluded)
# Rows of rdd1 that carry a usable salary.
rdd_validCountry = getValidCountry(rdd=rdd1)

In [None]:
def getTotalCount(rdd):
    """Return the number of elements in ``rdd`` (triggers a Spark job)."""
    n_records = rdd.count()
    return n_records

total = getTotalCount(rdd=rdd_validCountry)

In [None]:
def getCountryWiseCount(rdd):
    """Count rows per country.

    Args:
        rdd: RDD of rows exposing a ``Country`` attribute.

    Returns:
        RDD of ``(country, count)`` pairs sorted ascending by country.
    """
    keyed = rdd.map(lambda record: (record.Country, 1))
    totals = keyed.reduceByKey(lambda left, right: left + right)
    return totals.sortByKey()

resultRDD1 = getCountryWiseCount(rdd=rdd_validCountry)

In [None]:
def toCSV1(L):
    """Render one record as a single CSV line (no trailing newline).

    Each field is converted with ``str`` and then quoted only when needed.
    A value that itself contains a comma or a double quote (for example a
    country name like ``"Korea, Republic of"``) therefore stays a single
    column instead of splitting the row.

    Args:
        L: Iterable of field values, e.g. a ``(country, count)`` pair.

    Returns:
        The CSV-formatted line.
    """
    # Imported locally because this runs inside `rdd.map` on the executors.
    import csv
    import io

    buffer = io.StringIO()
    # lineterminator='' so the caller adds line endings, like the old str.join.
    csv.writer(buffer, lineterminator='').writerow([str(l) for l in L])
    return buffer.getvalue()
# One CSV line per country, collected to the driver in sorted country order.
ans1 = resultRDD1.map(f=toCSV1).collect()

In [None]:
# Task 1 output: a "Total" line followed by one "country,count" line per country.
# NOTE(review): the Total line uses ", " while data rows use "," — kept as-is in
# case the expected output format requires it; confirm.
# Explicit UTF-8 so any non-ASCII characters are written the same way
# regardless of the platform's locale default.
with open(OUTPUT1, "w", encoding="utf-8") as f:
    f.write(f'Total, {total}\n')
    for ans in ans1:
        f.write(f'{ans}\n')

# Task 2 *: per-partition record counts of the Task 1 pipeline, before and after `repartition(2)`

In [None]:
def countItems(i, partition):
    """Count the elements of one partition.

    Meant for ``RDD.mapPartitionsWithIndex``. Spark iterates the returned
    2-tuple, so ``collect()`` yields a flat list
    ``[index0, count0, index1, count1, ...]``.

    Args:
        i: Partition index supplied by Spark.
        partition: Iterator over the partition's elements (consumed).

    Returns:
        ``(i, number_of_elements)``.
    """
    size = sum(1 for _ in partition)
    return i, size

In [None]:
# Same Task 1 input, reshuffled into exactly two partitions.
new_rdd = rdd1.repartition(numPartitions=2)

In [None]:
# Task 1's per-country count pipeline, run on the 2-partition RDD.
new_rdd_ans = (
    new_rdd
    .filter(lambda rec: rec.Salary not in ('NA', '0'))
    .map(lambda rec: (rec.Country, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortByKey()
)

In [None]:
# Baseline for the comparison: the same pipeline on the original, not
# repartitioned, rdd1. (`rdd` is only ever a function parameter in this
# notebook, so reading it here raised NameError on a fresh kernel.)
old_rdd_ans = (
    rdd1
    .filter(lambda row: row.Salary != 'NA' and row.Salary != '0')
    .map(lambda row: (row.Country, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)

In [None]:
# Flat list [partition_index, n_countries, ...] for the repartitioned pipeline.
new_rdd_ans.mapPartitionsWithIndex(f=countItems).collect()

[0, 76, 1, 84]

In [None]:
# Flat list [partition_index, n_countries, ...] for the original partitioning.
old_rdd_ans.mapPartitionsWithIndex(f=countItems).collect()

[0, 98, 1, 62]

# Task 3: per-country count and min / max / average annual salary

In [None]:
# Destination for the Task 3 result (written by the last Task 3 cell).
OUTPUT3 = ('drive/MyDrive/Google-Colab-Data/assignment1/'
           'task3.csv')

In [None]:
# Task 3 uses Country, Salary and SalaryType; rows whose Salary is 'NA' or
# '0' are dropped, the same rule as in Task 1.
df_task3 = df
rdd3 = df_task3.rdd.filter(lambda rec: rec.Salary not in ('NA', '0'))

In [None]:
def getCountryCount(rdd):
    """Count rows per country (same computation as ``getCountryWiseCount``).

    Returns:
        RDD of ``(country, count)`` pairs sorted ascending by country.
    """
    keyed_ones = rdd.map(lambda record: (record.Country, 1))
    per_country = keyed_ones.reduceByKey(lambda x, y: x + y)
    return per_country.sortByKey()

rdd_countryCount = getCountryCount(rdd=rdd3)

In [None]:
def getAnnualSalaryRDD(rdd):
    """Map each row to ``(country, annual_salary)``.

    ``Salary`` is parsed as a float after removing commas. ``'Monthly'``
    amounts are multiplied by 12 and ``'Weekly'`` amounts by 52. Any other
    ``SalaryType`` value leaves the amount unchanged (treated as annual).

    Args:
        rdd: RDD of rows with ``Country``, ``Salary`` (str) and ``SalaryType``.

    Returns:
        RDD of ``(country, float)`` pairs.
    """
    periods_per_year = {'Monthly': 12, 'Weekly': 52}

    def to_annual(record):
        amount = float(record.Salary.replace(',', ''))
        factor = periods_per_year.get(record.SalaryType)
        if factor is None:
            return (record.Country, amount)
        return (record.Country, amount * factor)

    return rdd.map(to_annual)
# (country, annual salary as float) for every valid Task 3 row.
rdd_annualSalary = getAnnualSalaryRDD(rdd=rdd3)

In [None]:
def getMinSalary(rdd):
    """Smallest value per key, truncated toward zero with ``int``.

    Args:
        rdd: RDD of ``(country, salary)`` pairs.

    Returns:
        RDD of ``(country, int)`` pairs.
    """
    lowest = rdd.reduceByKey(min)
    return lowest.mapValues(int)

rdd_minSalary = getMinSalary(rdd=rdd_annualSalary)

In [None]:
def getMaxSalary(rdd):
    """Largest value per key, truncated toward zero with ``int``.

    Args:
        rdd: RDD of ``(country, salary)`` pairs.

    Returns:
        RDD of ``(country, int)`` pairs.
    """
    highest = rdd.reduceByKey(max)
    return highest.mapValues(int)

rdd_maxSalary = getMaxSalary(rdd=rdd_annualSalary)

In [None]:
def getAvgSalary(rdd):
    """Mean value per key, formatted as a string with two decimals.

    Args:
        rdd: RDD of ``(country, salary)`` pairs.

    Returns:
        RDD of ``(country, str)`` pairs, e.g. ``('X', '1234.50')``.
    """
    def add_value(acc, value):
        # acc is a running (sum, count) pair within one partition.
        return (acc[0] + value, acc[1] + 1)

    def merge_accs(left, right):
        # Combine partial (sum, count) pairs coming from different partitions.
        return (left[0] + right[0], left[1] + right[1])

    def to_mean_text(acc):
        total_salary, n = acc
        return '{:.2f}'.format(total_salary / n)

    sums_and_counts = rdd.aggregateByKey((0, 0), add_value, merge_accs)
    return sums_and_counts.mapValues(to_mean_text)

rdd_avgSalary = getAvgSalary(rdd=rdd_annualSalary)

In [None]:
def getResult(rdd1, rdd2, rdd3, rdd4):
    """Inner-join four keyed RDDs into ``(key, (v1, v2, v3, v4))``, sorted by key.

    Chained ``join`` nests values as ``(((v1, v2), v3), v4)``; they are
    flattened here. Only keys present in all four inputs survive.
    Note: these parameter names shadow the notebook-level ``rdd1``/``rdd3``.
    """
    joined = rdd1.join(rdd2).join(rdd3).join(rdd4)

    def flatten(nested):
        ((first, second), third), fourth = nested
        return (first, second, third, fourth)

    return joined.mapValues(flatten).sortByKey()

# Per country: (count, min salary, max salary, average salary).
resultRDD3 = getResult(
    rdd1=rdd_countryCount,
    rdd2=rdd_minSalary,
    rdd3=rdd_maxSalary,
    rdd4=rdd_avgSalary,
)

In [None]:
def toCSV3(T):
    """Render ``(country, (count, min, max, avg))`` as one CSV line (no newline).

    Every field is converted with ``str`` and quoted only when needed. A
    country name containing a comma or double quote therefore stays in a
    single column instead of shifting the remaining fields.

    Args:
        T: ``(key, values)`` pair, e.g. as produced by ``getResult``.

    Returns:
        The CSV-formatted line.
    """
    # Imported locally because this runs inside `rdd.map` on the executors.
    import csv
    import io

    key = T[0]
    values = T[1]
    buffer = io.StringIO()
    # lineterminator='' so the caller adds line endings, like the old concatenation.
    csv.writer(buffer, lineterminator='').writerow([key] + [str(v) for v in values])
    return buffer.getvalue()
# One CSV line per country, collected to the driver in sorted country order.
ans3 = resultRDD3.map(f=toCSV3).collect()

In [None]:
# Task 3 output: one "country,count,min,max,avg" line per country, no header.
# Explicit UTF-8 so any non-ASCII characters are written the same way
# regardless of the platform's locale default.
with open(OUTPUT3, "w", encoding="utf-8") as f:
    for ans in ans3:
        f.write(f'{ans}\n')