In [16]:
import pyspark
print(pyspark.__version__)

3.5.4


In [18]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark_Tutorial").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x106279e90>


25/02/22 15:54:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [20]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample data
data = [
    (1, 2.0, 3.0, 5.0), (2, 3.0, 4.0, 7.0), (3, 4.0, 5.0, 9.0), (4, 5.0, 6.0, 11.0), (5, 6.0, 7.0, 13.0),
    (6, 7.0, 8.0, 15.0), (7, 8.0, 9.0, 17.0), (8, 9.0, 10.0, 19.0), (9, 10.0, 11.0, 21.0), (10, 11.0, 12.0, 23.0),
    (11, 12.0, 13.0, 25.0), (12, 13.0, 14.0, 27.0), (13, 14.0, 15.0, 29.0), (14, 15.0, 16.0, 31.0), (15, 16.0, 17.0, 33.0),
    (16, 17.0, 18.0, 35.0), (17, 18.0, 19.0, 37.0), (18, 19.0, 20.0, 39.0), (19, 20.0, 21.0, 41.0), (20, 21.0, 22.0, 43.0)
]
df = spark.createDataFrame(data, ["ID", "Feature1", "Feature2", "Label"])

# Feature engineering
assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="Features")
df = assembler.transform(df).select("Features", "Label")

# Train-Test Split
train_df, test_df = df.randomSplit([0.8, 0.2])

# Define and train the model
lr = LinearRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(train_df)

# Make predictions
predictions = model.transform(test_df)
predictions.show()


25/02/22 15:54:02 WARN Instrumentation: [12515e81] regParam is zero, which might cause numerical instability and overfitting.
25/02/22 15:54:02 WARN Instrumentation: [12515e81] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


+-----------+-----+------------------+
|   Features|Label|        prediction|
+-----------+-----+------------------+
|  [5.0,6.0]| 11.0|11.000000000000002|
| [9.0,10.0]| 19.0|              19.0|
|[16.0,17.0]| 33.0|              33.0|
|[17.0,18.0]| 35.0|              35.0|
+-----------+-----+------------------+



In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, count, avg, trim
import os

In [24]:
DATA_FOLDER = "data"
# source https://www.statista.com/statistics/242030/marital-status-of-the-us-population-by-sex/
# the first value is male and the second is for female
MARITAL_STATUS_BY_GENDER = [
    ["Never-married", 47.35, 41.81],
    ["Married-AF-spouse", 67.54, 68.33],
    ["Widowed", 3.58, 11.61],
    ["Divorced", 10.82, 15.09]
]
MARITAL_STATUS_BY_GENDER_COLUMNS = ["marital_status_statistics", "male", "female"]
def read_data(spark: SparkSession) -> DataFrame:
    """
    read data based on the given schema; this is much faster than spark determining the schema
    """
    
    # Define the schema for the dataset
    schema = StructType([
        StructField("age", IntegerType(), True),
        StructField("workclass", StringType(), True),
        StructField("fnlwgt", FloatType(), True),
        StructField("education", StringType(), True),
        StructField("education_num", FloatType(), True),
        StructField("marital_status", StringType(), True),
        StructField("occupation", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("race", StringType(), True),
        StructField("sex", StringType(), True),
        StructField("capital_gain", FloatType(), True),
        StructField("capital_loss", FloatType(), True),
        StructField("hours_per_week", FloatType(), True),
        StructField("native_country", StringType(), True),
        StructField("income", StringType(), True)
    ])

    # Read the dataset
    data = spark.read \
        .schema(schema) \
        .option("header", "false") \
        .option("inferSchema", "false") \
        .csv(os.path.join(DATA_FOLDER,"*.csv")) 

    data = data.repartition(8)

    float_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, FloatType)]
    for v in float_columns:
        data = data.withColumn(v, data[v].cast(IntegerType()))

    # Get the names of all StringType columns
    string_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, StringType)]

    # Remove leading and trailing spaces in all string columns
    for column in string_columns:
        data = data.withColumn(column, trim(data[column]))

    # Show the first 5 rows of the dataset
    data.show(5)

    return data
def missing_values(data: DataFrame) -> DataFrame:
    """
    count the number of samples with missing values for each row
    remove such samples
    """

    missing_values = data.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in data.columns])

    # Show the missing values count per column
    missing_values.show()

    # Get the number of samples in the DataFrame
    num_samples = data.count()

    # Print the number of samples
    print("Number of samples:", num_samples)  

    data = data.dropna()      
    
    return data
def feature_engineering(data: DataFrame) -> DataFrame:
    """
    calculate the product of each pair of integer features
    """

    # Create columns consisting of all products of columns of type IntegerType
    integer_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, IntegerType)]
    for i, col1 in enumerate(integer_columns):
        for col2 in integer_columns[i:]:
            product_col_name = f"{col1}_x_{col2}"
            data = data.withColumn(product_col_name, col(col1) * col(col2))

    data.show(5)

    return data

def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)

def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')

def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)

    
    
    data.show(5)
    data.write.format('csv').option('header', 'true').mode('overwrite').save('saved.csv')
    #spark.stop()
    #return data
def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)
def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')

In [26]:
spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

data = read_data(spark)


25/02/22 15:54:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 72|Self-emp-not-inc| 52138|   Doctorate|           16|Married-civ-spouse|Prof-specialty|      Husband|White|  Male|           0|        2392|            25| United-States|  >50K|
| 45|         Private|140581|Some-college|           10|           Widowed| Other-service|    Unmarried|Black|Female|           0|           0|            40| United-States| <=50K|
| 55|       Local-gov| 84564| Prof-school|           15|          Divorced|Prof-specialty|Not-i

In [28]:
data = missing_values(data)

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  1|        0|     1|        0|            1|             0|         0|           0|   0|  0|           1|           1|             1|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+

Number of samples: 32562


In [30]:
data = feature_engineering(data)

+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------------------------+---------------------------+-----------------------------+---------------------------+-----------------------------+-------------------------------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|age_x_age|age_x_fnlwgt|age_x_education_num|age_x_capital_gain|age_x_capital_loss|age_x_hours_per_week|fnlwgt_x_fnlwgt|fnlwgt_x_educatio

25/02/22 15:54:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [32]:
bias_marital_status(data)

+--------------------+--------------------+
|      marital_status|average_capital_gain|
+--------------------+--------------------+
|           Separated|   535.5687804878049|
|       Never-married|  376.58831788823363|
|Married-spouse-ab...|   653.9832535885167|
|            Divorced|   728.4148098131893|
|             Widowed|   571.0715005035247|
|   Married-AF-spouse|   432.6521739130435|
|  Married-civ-spouse|  1764.8595085470085|
+--------------------+--------------------+

+---+----------------+------+------------+-------------+--------------+----------------+-------------+-----+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------

In [34]:
data = join_with_US_gender(spark, data)

In [36]:
def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate() 

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)

    
    
    data.show(5)
    #spark.stop()
main()

+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 72|Self-emp-not-inc| 52138|   Doctorate|           16|Married-civ-spouse|Prof-specialty|      Husband|White|  Male|           0|        2392|            25| United-States|  >50K|
| 45|         Private|140581|Some-college|           10|           Widowed| Other-service|    Unmarried|Black|Female|           0|           0|            40| United-States| <=50K|
| 55|       Local-gov| 84564| Prof-school|           15|          Divorced|Prof-specialty|Not-i

In [43]:
# Spark-sql - Task 1

# Select rows where 'age' is between 30 and 50 (inclusive)
filtered_data = data.filter((data.age >= 30) & (data.age <= 50))

# Convert the filtered PySpark DataFrame to a Pandas DataFrame
pandas_df = filtered_data.toPandas()

# Print summary statistics
print(pandas_df.describe())

                age        fnlwgt  education_num  capital_gain  capital_loss  \
count  16390.000000  1.639000e+04   16390.000000  16390.000000  16390.000000   
mean      39.298109  1.895854e+05      10.434655   1296.275595    102.861928   
std        5.895005  1.068750e+05       2.522236   8181.593110    432.962380   
min       30.000000  1.487800e+04       1.000000      0.000000      0.000000   
25%       34.000000  1.169285e+05       9.000000      0.000000      0.000000   
50%       39.000000  1.778765e+05      10.000000      0.000000      0.000000   
75%       44.000000  2.364150e+05      13.000000      0.000000      0.000000   
max       50.000000  1.455435e+06      16.000000  99999.000000   3900.000000   

       hours_per_week     age_x_age  age_x_fnlwgt  age_x_education_num  \
count    16390.000000  16390.000000  1.639000e+04         16390.000000   
mean        43.261440   1579.090299  7.420700e+06           410.665040   
std         10.885127    469.097536  4.274383e+06        