# Mining Massive Datasets: Problem Set 1

## Exercise 4

### a)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("adults").getOrCreate()
sc = spark.sparkContext

In [3]:
adultsRDD = sc.textFile("adult.data") \
    .map(lambda line: line.split(",")) \
    .filter(lambda p: len(p) == 15 ) \
    .map(lambda p: (
            p[3].strip(),
            p[4].strip(),
            p[5].strip(),
            p[9].strip(),
            p[12].strip(),
            p[13].strip(),
            p[14].strip()
    ))

schema = StructType([
    StructField("education", StringType(), True),
    StructField("education_num", StringType(), True),
    StructField("marital_status", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("hours_per_week", StringType(), True),
    StructField("native_country", StringType(), True),
    StructField("income", StringType(), True)
])
dfAdults = spark.createDataFrame(adultsRDD, schema)
dfAdults.show(5)

+---------+-------------+------------------+------+--------------+--------------+------+
|education|education_num|    marital_status|   sex|hours_per_week|native_country|income|
+---------+-------------+------------------+------+--------------+--------------+------+
|Bachelors|           13|     Never-married|  Male|            40| United-States| <=50K|
|Bachelors|           13|Married-civ-spouse|  Male|            13| United-States| <=50K|
|  HS-grad|            9|          Divorced|  Male|            40| United-States| <=50K|
|     11th|            7|Married-civ-spouse|  Male|            40| United-States| <=50K|
|Bachelors|           13|Married-civ-spouse|Female|            40|          Cuba| <=50K|
+---------+-------------+------------------+------+--------------+--------------+------+
only showing top 5 rows



### b)

In [4]:
male_count = dfAdults.filter(dfAdults["sex"]=="Male").count()
df = dfAdults.filter(dfAdults["sex"]=="Male").groupBy("marital_status").count()
df.withColumn("ratio", df["count"]/male_count).show()

+--------------------+-----+--------------------+
|      marital_status|count|               ratio|
+--------------------+-----+--------------------+
|           Separated|  394|0.018081688848095457|
|       Never-married| 5916| 0.27150068838916935|
|Married-spouse-ab...|  213|0.009775126204681047|
|            Divorced| 1771| 0.08127581459385039|
|             Widowed|  168| 0.00770995869664984|
|   Married-AF-spouse|    9|4.130335016062414E-4|
|  Married-civ-spouse|13319|  0.6112436897659477|
+--------------------+-----+--------------------+



### c)

In [5]:
df = dfAdults.filter(dfAdults["income"]==">50K").filter(dfAdults["sex"]=="Female").withColumn("hours_per_week", dfAdults["hours_per_week"].cast(DoubleType())).groupBy("native_country").avg("hours_per_week")
df.show()

+--------------+-------------------+
|native_country|avg(hours_per_week)|
+--------------+-------------------+
|   Philippines| 40.083333333333336|
|       Germany|  36.57142857142857|
|        France|               45.0|
|        Greece|               65.0|
|        Taiwan|              36.75|
|     Nicaragua|               35.0|
|          Hong|               40.0|
|         India|               38.5|
|         China|               33.6|
|         Italy|               42.0|
|          Cuba|               23.0|
|         South| 56.666666666666664|
|          Iran|               40.0|
|       Ireland|               40.0|
|      Thailand|               50.0|
|          Laos|               40.0|
|   El-Salvador|               40.0|
|        Mexico|               35.0|
|      Honduras|               60.0|
|    Yugoslavia|               40.0|
+--------------+-------------------+
only showing top 20 rows



### d)

In [14]:
education_dict = {
    1.0 : "Preschool",
    2.0 : "1st-4th",
    3.0 : "5th-6th",
    4.0 : "7th-8th",
    5.0 : "9th",
    6.0 : "10th",
    7.0 : "11th",
    8.0 : "12th",
    9.0 : "HS-grad",
    10.0 : "-",
    11.0 : "-",
    12.0 : "-",
    13.0 : "-",
    14.0 : "-",
    15.0 : "-",
    16.0 : "Doctorate"
}
education_transform = udf(lambda x : education_dict[x], StringType())
df = dfAdults.withColumn("education_num", dfAdults["education_num"].cast(DoubleType())).groupBy("income").max("education_num")
df = df.withColumn("max(education_num)", education_transform(df["max(education_num)"]))
df.show()

+------+------------------+
|income|max(education_num)|
+------+------------------+
| <=50K|         Doctorate|
|  >50K|         Doctorate|
+------+------------------+



In [15]:
df2 = dfAdults.withColumn("education_num", dfAdults["education_num"].cast(DoubleType())).groupBy("income").min("education_num")
df2 = df2.withColumn("min(education_num)", education_transform(df2["min(education_num)"]))
df2.show()

+------+------------------+
|income|min(education_num)|
+------+------------------+
| <=50K|         Preschool|
|  >50K|           1st-4th|
+------+------------------+



In [17]:
df3 = df.join(df2, "income", "outer")
df3.show()

+------+------------------+------------------+
|income|max(education_num)|min(education_num)|
+------+------------------+------------------+
| <=50K|         Doctorate|         Preschool|
|  >50K|         Doctorate|           1st-4th|
+------+------------------+------------------+

