In [87]:
!pip install pyspark
import pyspark
print(pyspark.__version__)

3.5.4


In [88]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark_Tutorial").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f0f07801ed0>


In [89]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample data
data = [
    (1, 2.0, 3.0, 5.0), (2, 3.0, 4.0, 7.0), (3, 4.0, 5.0, 9.0), (4, 5.0, 6.0, 11.0), (5, 6.0, 7.0, 13.0),
    (6, 7.0, 8.0, 15.0), (7, 8.0, 9.0, 17.0), (8, 9.0, 10.0, 19.0), (9, 10.0, 11.0, 21.0), (10, 11.0, 12.0, 23.0),
    (11, 12.0, 13.0, 25.0), (12, 13.0, 14.0, 27.0), (13, 14.0, 15.0, 29.0), (14, 15.0, 16.0, 31.0), (15, 16.0, 17.0, 33.0),
    (16, 17.0, 18.0, 35.0), (17, 18.0, 19.0, 37.0), (18, 19.0, 20.0, 39.0), (19, 20.0, 21.0, 41.0), (20, 21.0, 22.0, 43.0)
]
df = spark.createDataFrame(data, ["ID", "Feature1", "Feature2", "Label"])

# Feature engineering
assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="Features")
df = assembler.transform(df).select("Features", "Label")

# Train-Test Split
train_df, test_df = df.randomSplit([0.8, 0.2])

# Define and train the model
lr = LinearRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(train_df)

# Make predictions
predictions = model.transform(test_df)
predictions.show()

+-----------+-----+------------------+
|   Features|Label|        prediction|
+-----------+-----+------------------+
| [9.0,10.0]| 19.0|19.000000000000004|
|[15.0,16.0]| 31.0| 30.99999999999999|
|[16.0,17.0]| 33.0| 32.99999999999999|
+-----------+-----+------------------+



In [90]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, count, avg, trim
import os

In [91]:
DATA_FOLDER = "word-count/data"
# source https://www.statista.com/statistics/242030/marital-status-of-the-us-population-by-sex/
# the first value is male and the second is for female
MARITAL_STATUS_BY_GENDER = [
    ["Never-married", 47.35, 41.81],
    ["Married-AF-spouse", 67.54, 68.33],
    ["Widowed", 3.58, 11.61],
    ["Divorced", 10.82, 15.09]
]
MARITAL_STATUS_BY_GENDER_COLUMNS = ["marital_status_statistics", "male", "female"]

In [92]:
def read_data(spark: SparkSession) -> DataFrame:
    """
    read data based on the given schema; this is much faster than spark determining the schema
    """

    # Define the schema for the dataset
    schema = StructType([
        StructField("age", IntegerType(), True),
        StructField("workclass", StringType(), True),
        StructField("fnlwgt", FloatType(), True),
        StructField("education", StringType(), True),
        StructField("education_num", FloatType(), True),
        StructField("marital_status", StringType(), True),
        StructField("occupation", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("race", StringType(), True),
        StructField("sex", StringType(), True),
        StructField("capital_gain", FloatType(), True),
        StructField("capital_loss", FloatType(), True),
        StructField("hours_per_week", FloatType(), True),
        StructField("native_country", StringType(), True),
        StructField("income", StringType(), True)
    ])

    # Read the dataset
    data = spark.read \
        .schema(schema) \
        .option("header", "false") \
        .option("inferSchema", "false") \
        .csv(os.path.join(DATA_FOLDER,"*.csv"))

    data = data.repartition(8)

    float_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, FloatType)]
    for v in float_columns:
        data = data.withColumn(v, data[v].cast(IntegerType()))

    # Get the names of all StringType columns
    string_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, StringType)]

    # Remove leading and trailing spaces in all string columns
    for column in string_columns:
        data = data.withColumn(column, trim(data[column]))

    # Show the first 5 rows of the dataset
    data.show(5)

    return data

In [93]:
def missing_values(data: DataFrame) -> DataFrame:
    """
    count the number of samples with missing values for each row
    remove such samples
    """

    missing_values = data.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in data.columns])

    # Show the missing values count per column
    missing_values.show()

    # Get the number of samples in the DataFrame
    num_samples = data.count()

    # Print the number of samples
    print("Number of samples:", num_samples)

    data = data.dropna()

    return data

In [94]:
def feature_engineering(data: DataFrame) -> DataFrame:
    """
    calculate the product of each pair of integer features
    """

    # Create columns consisting of all products of columns of type IntegerType
    integer_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, IntegerType)]
    for i, col1 in enumerate(integer_columns):
        for col2 in integer_columns[i:]:
            product_col_name = f"{col1}_x_{col2}"
            data = data.withColumn(product_col_name, col(col1) * col(col2))

    data.show(5)

    return data

def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)

def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')

def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)



    data.show(5)
    data.write.format('csv').option('header', 'true').mode('overwrite').save('saved.csv')
    #spark.stop()
    #return data

In [95]:
def bias_marital_status(data: DataFrame):
    """
    is there bias in capital gain by marital status
    """

    # Calculate the average capital_gain by marital_status
    average_capital_gain = data.groupBy("marital_status").agg(avg("capital_gain").alias("average_capital_gain"))

    # Show the average capital_gain by marital_status
    average_capital_gain.show()

    # Filter data based on marital_status = Divorced
    divorced_data = data.filter(data.marital_status == "Divorced")

    # Show the first 5 rows of the filtered DataFrame
    divorced_data.show(5)

In [96]:
def join_with_US_gender(spark: SparkSession, data: DataFrame):
    """
    join with respect to the marital_status
    """

    # create a data frame from new data
    columns = ["dept_name","dept_id"]
    us_df = spark.createDataFrame(MARITAL_STATUS_BY_GENDER, MARITAL_STATUS_BY_GENDER_COLUMNS)

    return data.join(us_df, data.marital_status == us_df.marital_status_statistics, 'outer')

In [97]:
spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

data = read_data(spark)

+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 72|Self-emp-not-inc| 52138|   Doctorate|           16|Married-civ-spouse|Prof-specialty|      Husband|White|  Male|           0|        2392|            25| United-States|  >50K|
| 45|         Private|140581|Some-college|           10|           Widowed| Other-service|    Unmarried|Black|Female|           0|           0|            40| United-States| <=50K|
| 55|       Local-gov| 84564| Prof-school|           15|          Divorced|Prof-specialty|Not-i

In [98]:
data = missing_values(data)

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  1|        0|     1|        0|            1|             0|         0|           0|   0|  0|           1|           1|             1|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+

Number of samples: 32562


In [99]:
data = feature_engineering(data)

+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------------------------+---------------------------+-----------------------------+---------------------------+-----------------------------+-------------------------------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|age_x_age|age_x_fnlwgt|age_x_education_num|age_x_capital_gain|age_x_capital_loss|age_x_hours_per_week|fnlwgt_x_fnlwgt|fnlwgt_x_educatio

In [100]:
bias_marital_status(data)

+--------------------+--------------------+
|      marital_status|average_capital_gain|
+--------------------+--------------------+
|           Separated|   535.5687804878049|
|       Never-married|  376.58831788823363|
|Married-spouse-ab...|   653.9832535885167|
|            Divorced|   728.4148098131893|
|             Widowed|   571.0715005035247|
|   Married-AF-spouse|   432.6521739130435|
|  Married-civ-spouse|  1764.8595085470085|
+--------------------+--------------------+

+---+----------------+------+------------+-------------+--------------+----------------+-------------+-----+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------

In [102]:
data = join_with_US_gender(spark, data)

In [103]:
def main():
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

    data = read_data(spark)
    # perform basic EDA - count missing values
    data = missing_values(data)
    data = feature_engineering(data)
    bias_marital_status(data)
    data = join_with_US_gender(spark, data)



    data.show(5)
    #spark.stop()

In [104]:
main()

+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|    occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+------------+-------------+------------------+--------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 72|Self-emp-not-inc| 52138|   Doctorate|           16|Married-civ-spouse|Prof-specialty|      Husband|White|  Male|           0|        2392|            25| United-States|  >50K|
| 45|         Private|140581|Some-college|           10|           Widowed| Other-service|    Unmarried|Black|Female|           0|           0|            40| United-States| <=50K|
| 55|       Local-gov| 84564| Prof-school|           15|          Divorced|Prof-specialty|Not-i

In [47]:
from pyspark import SparkContext, SparkConf
DATA = "./word-count/data/*.txt"
OUTPUT_DIR = "counts" # name of the folder

def word_count():
  sc = SparkContext("local","Word count example")
  textFile = sc.textFile(DATA)
  counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b: a + b).filter(lambda word_count: word_count[1] >= 3)
  counts.saveAsTextFile(OUTPUT_DIR)
  print("Number of partitions: ", textFile.getNumPartitions())

word_count()


Number of partitions:  2


In [85]:
sc = SparkContext.getOrCreate()
sc.stop()

In [108]:
import pandas as pd

spark = SparkSession.builder \
        .appName("Read Adult Dataset") \
        .getOrCreate()

def middle_age(spark: SparkSession, data: DataFrame):
  ret_data = data.where((data.age >= 30) & (data.age <= 50))
  ret_data.show()
  df = ret_data.toPandas()
  return df.describe()

middle_age(spark, data)

+---+----------------+------+------------+-------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+---------+------------+-------------------+------------------+------------------+--------------------+---------------+----------------------+---------------------+---------------------+-----------------------+-----------------------------+----------------------------+----------------------------+------------------------------+---------------------------+---------------------------+-----------------------------+---------------------------+-----------------------------+-------------------------------+-------------------------+-----+------+-------------------------+-----+------+
|age|       workclass|fnlwgt|   education|education_num|    marital_status|       occupation| relationship|              race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|age_x_age|age_x_fnlwgt|ag

Unnamed: 0,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,age_x_age,age_x_fnlwgt,age_x_education_num,age_x_capital_gain,...,capital_gain_x_capital_gain,capital_gain_x_capital_loss,capital_gain_x_hours_per_week,capital_loss_x_capital_loss,capital_loss_x_hours_per_week,hours_per_week_x_hours_per_week,male,female,male.1,female.1
count,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,...,16390.0,16390.0,16390.0,16390.0,16390.0,16390.0,6358.0,6358.0,6358.0,6358.0
mean,39.298109,189585.4,10.434655,1296.275595,102.861928,43.26144,1579.090299,7420700.0,410.66504,53336.41,...,16205110.0,0.0,63645.86,198025.6,4689.146797,1990.030933,28.4224,28.048213,28.4224,28.048213
std,5.895005,106875.0,2.522236,8181.59311,432.96238,10.885127,469.097536,4274383.0,120.548608,345858.1,...,117432600.0,0.0,435826.5,875066.5,20548.38327,1076.631521,18.604362,13.580131,18.604362,13.580131
min,30.0,14878.0,1.0,0.0,0.0,1.0,900.0,579060.0,30.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1.0,3.58,11.61,3.58,11.61
25%,34.0,116928.5,9.0,0.0,0.0,40.0,1156.0,4589637.0,330.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1600.0,10.82,15.09,10.82,15.09
50%,39.0,177876.5,10.0,0.0,0.0,40.0,1521.0,6840798.0,403.0,0.0,...,0.0,0.0,0.0,0.0,0.0,1600.0,10.82,15.09,10.82,15.09
75%,44.0,236415.0,13.0,0.0,0.0,48.0,1936.0,9360606.0,480.0,0.0,...,0.0,0.0,0.0,0.0,0.0,2304.0,47.35,41.81,47.35,41.81
max,50.0,1455435.0,16.0,99999.0,3900.0,99.0,2500.0,61475400.0,800.0,4999950.0,...,1409865000.0,0.0,7999920.0,15210000.0,239085.0,9801.0,67.54,68.33,67.54,68.33
