In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, count, avg, trim
import pandas as pd
import os

In [2]:
DATA_FOLDER = "data"
# source https://www.statista.com/statistics/242030/marital-status-of-the-us-population-by-sex/
# the first value is male and the second is for female
MARITAL_STATUS_BY_GENDER = [
    ["Never-married", 47.35, 41.81],
    ["Married-AF-spouse", 67.54, 68.33],
    ["Widowed", 3.58, 11.61],
    ["Divorced", 10.82, 15.09]
]
MARITAL_STATUS_BY_GENDER_COLUMNS = ["marital_status_statistics", "male", "female"]

In [3]:




def read_data(spark: SparkSession) -> DataFrame:
    """
    read data based on the given schema; this is much faster than spark determining the schema
    """
    
    # Define the schema for the dataset
    schema = StructType([
        StructField("age", IntegerType(), True),
        StructField("workclass", StringType(), True),
        StructField("fnlwgt", FloatType(), True),
        StructField("education", StringType(), True),
        StructField("education_num", FloatType(), True),
        StructField("marital_status", StringType(), True),
        StructField("occupation", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("race", StringType(), True),
        StructField("sex", StringType(), True),
        StructField("capital_gain", FloatType(), True),
        StructField("capital_loss", FloatType(), True),
        StructField("hours_per_week", FloatType(), True),
        StructField("native_country", StringType(), True),
        StructField("income", StringType(), True)
    ])

    # Read the dataset
    data = spark.read \
        .schema(schema) \
        .option("header", "false") \
        .option("inferSchema", "false") \
        .csv(os.path.join(DATA_FOLDER,"*.csv")) 

    data = data.repartition(8)

    float_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, FloatType)]
    for v in float_columns:
        data = data.withColumn(v, data[v].cast(IntegerType()))

    # Get the names of all StringType columns
    string_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, StringType)]

    # Remove leading and trailing spaces in all string columns
    for column in string_columns:
        data = data.withColumn(column, trim(data[column]))

    # Show the first 5 rows of the dataset
    data.show(5)

    return data


In [4]:

def missing_values(data: DataFrame) -> DataFrame:
    """
    count the number of samples with missing values for each row
    remove such samples
    """

    missing_values = data.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in data.columns])

    # Show the missing values count per column
    missing_values.show()

    # Get the number of samples in the DataFrame
    num_samples = data.count()

    # Print the number of samples
    print("Number of samples:", num_samples)  

    data = data.dropna()      
    
    return data



In [5]:
def feature_engineering(data: DataFrame) -> DataFrame:
    """
    calculate the product of each pair of integer features
    """

    # Create columns consisting of all products of columns of type IntegerType
    integer_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, IntegerType)]
    for i, col1 in enumerate(integer_columns):
        for col2 in integer_columns[i:]:
            product_col_name = f"{col1}_x_{col2}"
            data = data.withColumn(product_col_name, col(col1) * col(col2))

    data.show(5)

    return data

def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)

def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')

def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)

    
    
    data.show(5)
    data.write.format('csv').option('header', 'true').mode('overwrite').save('saved.csv')
    #spark.stop()
    #return data


In [6]:
def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)



In [7]:
def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')



In [18]:
def age(data: DataFrame):
    selected_df = data.filter((data['age'] >= 30) & (data['age'] <= 50))

    # Convert the selected PySpark DataFrame to a Pandas DataFrame
    pandas_df = selected_df.toPandas()

    # Print out summary statistics using .describe()
    print(pandas_df.describe())
    return pandas_df

In [9]:
spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

data = read_data(spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/15 00:27:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|   education|education_num|    marital_status|     occupation|relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
| 45|  Private|191098|   Bachelors|           13|Married-civ-spouse| Prof-specialty|     Husband|Asian-Pac-Islander|  Male|           0|           0|            40|         China| <=50K|
| 46|  Private|250821| Prof-school|           15|          Divorced|Farming-fishing|   Unmarried|             White|  Male|           0|           0|            48| United-States| <=50K|
| 53|  Private|242859|Some-college|           10|         Separat

                                                                                

In [10]:
data = read_data(spark)



+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|   education|education_num|    marital_status|     occupation|relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
| 45|  Private|191098|   Bachelors|           13|Married-civ-spouse| Prof-specialty|     Husband|Asian-Pac-Islander|  Male|           0|           0|            40|         China| <=50K|
| 46|  Private|250821| Prof-school|           15|          Divorced|Farming-fishing|   Unmarried|             White|  Male|           0|           0|            48| United-States| <=50K|
| 53|  Private|242859|Some-college|           10|         Separat

                                                                                

In [11]:
data = missing_values(data)

                                                                                

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  1|        0|     1|        0|            1|             0|         0|           0|   0|  0|           1|           1|             1|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+

Number of samples: 48952


In [12]:
data = feature_engineering(data)

24/05/15 00:27:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------------------------+---------------------------+-----------------------------+---------------------------+-----------------------------+-------------------------------+
|age|workclass|fnlwgt|   education|education_num|    marital_status|     occupation|relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|age_x_age|age_x_fnlwgt|age_x_education_num|age_x_capital_gain|age_x_capital_loss|age_x_hours_per_week|fnlwgt_x_fnlwgt|fnlwg

In [13]:
bias_marital_status(data)

+--------------------+--------------------+
|      marital_status|average_capital_gain|
+--------------------+--------------------+
|           Separated|   591.5923303834808|
|       Never-married|  450.06171494953895|
|Married-spouse-ab...|   743.4766917293233|
|            Divorced|   703.0880898275632|
|             Widowed|   534.2326732673267|
|   Married-AF-spouse|   507.3235294117647|
|  Married-civ-spouse|   1773.063758249969|
+--------------------+--------------------+

+---+---------+------+------------+-------------+--------------+---------------+--------------+-----+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+----------------

In [14]:
data = join_with_US_gender(spark, data)

In [16]:
def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate() 

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)
    data = age(data)

    
    
    spark.stop()


In [17]:
main()

+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|   education|education_num|    marital_status|     occupation|relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+------+
| 45|  Private|191098|   Bachelors|           13|Married-civ-spouse| Prof-specialty|     Husband|Asian-Pac-Islander|  Male|           0|           0|            40|         China| <=50K|
| 46|  Private|250821| Prof-school|           15|          Divorced|Farming-fishing|   Unmarried|             White|  Male|           0|           0|            48| United-States| <=50K|
| 53|  Private|242859|Some-college|           10|         Separat

                                                                                

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  1|        0|     1|        0|            1|             0|         0|           0|   0|  0|           1|           1|             1|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+

Number of samples: 48952
+---+---------+------+------------+-------------+------------------+---------------+------------+------------------+------+------------+------------+--------------+--------------+-----

                                                                                

                age        fnlwgt  education_num  capital_gain  capital_loss  \
count  32780.000000  3.278000e+04   32780.000000  32780.000000  32780.000000   
mean      39.298109  1.895854e+05      10.434655   1296.275595    102.861928   
std        5.894915  1.068734e+05       2.522198   8181.468310    432.955776   
min       30.000000  1.487800e+04       1.000000      0.000000      0.000000   
25%       34.000000  1.169270e+05       9.000000      0.000000      0.000000   
50%       39.000000  1.778765e+05      10.000000      0.000000      0.000000   
75%       44.000000  2.364150e+05      13.000000      0.000000      0.000000   
max       50.000000  1.455435e+06      16.000000  99999.000000   3900.000000   

       hours_per_week     age_x_age  age_x_fnlwgt  age_x_education_num  \
count    32780.000000  32780.000000  3.278000e+04          32780.00000   
mean        43.261440   1579.090299  7.420700e+06            410.66504   
std         10.884961    469.090380  4.274318e+06        

AttributeError: 'NoneType' object has no attribute 'show'