In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("AdultDataset").getOrCreate()

# Location of the Adult dataset. "path/to/adult.data" is only a placeholder:
# set the ADULT_DATA_PATH environment variable (or edit this default) so it
# points at the real file, otherwise Spark raises
# "Input path does not exist" once the RDD is evaluated.
DATA_PATH = os.environ.get("ADULT_DATA_PATH", "path/to/adult.data")

# Load the dataset as an RDD of raw text lines
rdd = spark.sparkContext.textFile(DATA_PATH)

# Define column names based on the dataset's structure
columns = ["age", "workclass", "fnlwgt", "education", "education_num", "marital_status", 
           "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss", 
           "hours_per_week", "native_country", "income"]

# Convert the RDD into a DataFrame
def parse_line(line):
    """Parse one comma-separated record of the Adult dataset into a Row.

    Args:
        line: A single text line holding the 15 comma-separated fields
            (age, workclass, fnlwgt, education, education_num,
            marital_status, occupation, relationship, race, sex,
            capital_gain, capital_loss, hours_per_week, native_country,
            income).

    Returns:
        A Row whose numeric fields (age, fnlwgt, education_num, capital_gain,
        capital_loss, hours_per_week) are ints and whose other fields are
        whitespace-stripped strings.

    Raises:
        ValueError: If the line does not contain exactly 15 fields (for
            example a blank line) or a numeric field is not an integer.
    """
    # Split on the bare comma and strip every field, so parsing does not
    # depend on the separator being exactly ", ".
    fields = [field.strip() for field in line.split(",")]

    # Fail early with a readable message instead of an IndexError or an
    # int('') error somewhere in the middle of the Row construction.
    if len(fields) != 15:
        raise ValueError(
            f"Expected 15 comma-separated fields, got {len(fields)}: {line!r}"
        )

    return Row(
        age=int(fields[0]),
        workclass=fields[1],
        fnlwgt=int(fields[2]),
        education=fields[3],
        education_num=int(fields[4]),
        marital_status=fields[5],
        occupation=fields[6],
        relationship=fields[7],
        race=fields[8],
        sex=fields[9],
        capital_gain=int(fields[10]),
        capital_loss=int(fields[11]),
        hours_per_week=int(fields[12]),
        native_country=fields[13],
        income=fields[14]
    )

# Parse the RDD and create a DataFrame.
# Blank lines (for example an empty line at the end of the file) are dropped
# first, because parse_line cannot turn them into a record and a single
# failure would abort the whole job.
parsed_rdd = rdd.filter(lambda line: line.strip() != "").map(parse_line)
df = spark.createDataFrame(parsed_rdd)

# Show the first few rows of the dataframe
df.show(5)

Py4JJavaError: An error occurred while calling o33.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/zhengbinheng/Desktop/MMD/path/to/adult.data
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Input path does not exist: file:/Users/zhengbinheng/Desktop/MMD/path/to/adult.data
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [None]:
from pyspark.sql.functions import col

# Compute the ratio of males for each marital_status.
# F.sum / F.count are the Spark SQL aggregates: the Python builtin sum()
# cannot aggregate a Column, and count() is not defined in this notebook.
is_male = (col("sex") == "Male").cast("int")  # 1 for male rows, 0 otherwise
marital_status_gender_ratio = df.groupBy("marital_status").agg(
    (F.sum(is_male) / F.count("*")).alias("male_ratio")
)

marital_status_gender_ratio.show()

In [None]:
# Filter the dataframe for females with income > 50K
females_high_income = df.filter((df.sex == "Female") & (df.income == ">50K"))

# Compute the average hours_per_week for each native_country.
# F.avg is the Spark SQL aggregate; a bare avg() is never imported here.
average_hours_females_50K = females_high_income.groupBy("native_country").agg(
    F.avg("hours_per_week").alias("avg_hours_per_week")
)

average_hours_females_50K.show()

In [None]:
# education_num -> education name, following the coding used by the UCI
# Adult dataset (codes 1 through 16, including the two "Assoc-*" levels).
education_dict = {
    1: "Preschool", 2: "1st-4th", 3: "5th-6th", 4: "7th-8th", 5: "9th",
    6: "10th", 7: "11th", 8: "12th", 9: "HS-grad", 10: "Some-college",
    11: "Assoc-voc", 12: "Assoc-acdm", 13: "Bachelors", 14: "Masters",
    15: "Prof-school", 16: "Doctorate"
}

# Define a UDF to map education_num to education level
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def education_level(education_num):
    """Return the education name for an education_num code, or "Unknown"."""
    return education_dict.get(education_num, "Unknown")

education_udf = udf(education_level, StringType())

# Add a new column for education level
df_with_education = df.withColumn("education_level", education_udf(df.education_num))

# Get the highest and lowest education level for each income group.
# F.max / F.min are the Spark SQL aggregates; the Python builtins max()/min()
# would operate on the characters of the string "education_num" instead of
# on the column.
education_per_income = df_with_education.groupBy("income").agg(
    # For highest education, we get the max of education_num
    F.max("education_num").alias("highest_education_num"),
    # For lowest education, we get the min of education_num
    F.min("education_num").alias("lowest_education_num")
)

# Map the education_num back to education names, reusing the UDF defined
# above rather than building two more identical lambda UDFs.
education_per_income = education_per_income.withColumn(
    "highest_education", education_udf(F.col("highest_education_num"))
).withColumn(
    "lowest_education", education_udf(F.col("lowest_education_num"))
)

education_per_income.select("income", "highest_education", "lowest_education").show()