# Spark ML Assignment: Fill-in-the-Blanks

Follow the instructions and complete the TODOs below. Solutions are provided for verification.

## Data Cleaning & Type Casting
**Instructions:** Load a dataset, remove unwanted characters, and cast column types.

In [2]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
     434.1/434.1 MB 7.3 MB/s eta 0:00:00
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting py4j==0.10.9.9 (from pyspark)
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml) ... done
  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741299 sha256=12df986b338b65f0069f1d2a9658889e2d95ac06646d676270a02140d61a65c0
  Stored in directory: /home/aymuos/.cache/pip/wheels/62/69/eb/eef3014e40bbcff88f1d6dd762baebf6bf5d0266ba57be1ef8
Successfully built pyspark
Installing collected packages: py4j, py

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder.appName("SparkML_Cleaning").getOrCreate()

# TODO: Load the dataset from 'data.csv'

file = "data.csv"
df = spark.read.csv(file, header=True, inferSchema=True)

df.show()



+-----------+-----------+------------------+-----------+
|employee_id| department|               age|     salary|
+-----------+-----------+------------------+-----------+
|          1|      Sales|26.036648168706982| $87,242.29|
|          2|  Marketing| 33.88012179493058| $82,075.61|
|          3|Engineering|49.689412854323926| $70,717.66|
|          4|  Marketing|23.761016654599633| $90,979.39|
|          5|  Marketing|  44.5000539550718| $87,809.97|
|          6|    Finance| 52.26516468575364|$101,828.91|
|          7|Engineering| 39.57885077000015| $83,503.41|
|          8|Engineering|18.157126216341187| $72,532.49|
|          9|Engineering| 38.26845223970014| $72,855.14|
|         10|  Marketing| 34.18881045841313| $45,981.45|
|         11|      Sales| 39.67794753263152| $77,259.89|
|         12|Engineering|42.361223506692575| $74,764.28|
|         13|  Marketing|27.202981193401214| $55,048.89|
|         14|    Finance|26.561036396560745| $74,620.69|
|         15|      Sales| 33.49

In [None]:
# TODO: Strip the '$' sign and ',' thousands separators from 'salary', then cast it to DoubleType
df = df.withColumn("salary", regexp_replace(col("salary"), "\\$", ""))
df = df.withColumn("salary", regexp_replace(col("salary"), ",", ""))


df.show()

+-----------+-----------+------------------+---------+
|employee_id| department|               age|   salary|
+-----------+-----------+------------------+---------+
|          1|      Sales|26.036648168706982| 87242.29|
|          2|  Marketing| 33.88012179493058| 82075.61|
|          3|Engineering|49.689412854323926| 70717.66|
|          4|  Marketing|23.761016654599633| 90979.39|
|          5|  Marketing|  44.5000539550718| 87809.97|
|          6|    Finance| 52.26516468575364|101828.91|
|          7|Engineering| 39.57885077000015| 83503.41|
|          8|Engineering|18.157126216341187| 72532.49|
|          9|Engineering| 38.26845223970014| 72855.14|
|         10|  Marketing| 34.18881045841313| 45981.45|
|         11|      Sales| 39.67794753263152| 77259.89|
|         12|Engineering|42.361223506692575| 74764.28|
|         13|  Marketing|27.202981193401214| 55048.89|
|         14|    Finance|26.561036396560745| 74620.69|
|         15|      Sales| 33.49466141740217| 95147.52|
|         

In [10]:
df = df.withColumn("salary", col("salary").cast(DoubleType()))
df.show()

+-----------+-----------+------------------+---------+
|employee_id| department|               age|   salary|
+-----------+-----------+------------------+---------+
|          1|      Sales|26.036648168706982| 87242.29|
|          2|  Marketing| 33.88012179493058| 82075.61|
|          3|Engineering|49.689412854323926| 70717.66|
|          4|  Marketing|23.761016654599633| 90979.39|
|          5|  Marketing|  44.5000539550718| 87809.97|
|          6|    Finance| 52.26516468575364|101828.91|
|          7|Engineering| 39.57885077000015| 83503.41|
|          8|Engineering|18.157126216341187| 72532.49|
|          9|Engineering| 38.26845223970014| 72855.14|
|         10|  Marketing| 34.18881045841313| 45981.45|
|         11|      Sales| 39.67794753263152| 77259.89|
|         12|Engineering|42.361223506692575| 74764.28|
|         13|  Marketing|27.202981193401214| 55048.89|
|         14|    Finance|26.561036396560745| 74620.69|
|         15|      Sales| 33.49466141740217| 95147.52|
|         

In [13]:
print(df.schema["salary"].dataType)

DoubleType()


## Feature Engineering
**Instructions:** Handle missing values, index categorical columns, and one-hot encode.

In [None]:
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder

# TODO: Fill missing values in 'age' column using mean strategy

# TODO: Index 'department' column

# TODO: One-hot encode 'department_index'


## Feature Scaling and Vector Assembler
**Instructions:** Combine features into a vector and apply standard scaling.

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# TODO: Assemble 'age_imputed' and 'salary' into a single feature vector

# TODO: Scale the feature vector


## Exploratory Data Analysis (EDA)
**Instructions:** Perform basic data analysis using Spark and convert to Pandas for visualizations.

In [None]:
# TODO: Display summary statistics for numerical columns


In [None]:
# TODO: Check for null values in each column


In [None]:
# TODO: Analyze salary distribution by department


In [None]:
# TODO: Visualize distributions using Pandas and Matplotlib


In [None]:
# TODO: Visualize age distribution per department


In [None]:
# TODO: Compute correlation between numeric features


## Feature Transformations and Scaling
**Instructions:** Experiment with scaling techniques ([MinMaxScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html)), bucketing, and feature creation ([VectorAssembler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)).


In [None]:
# TODO: Apply MinMaxScaler to 'salary' and 'age'
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Assemble features before scaling


In [None]:
# TODO: Categorize 'age' into bins (e.g., Young, Mid, Senior)
from pyspark.sql.functions import when



In [None]:
# TODO: Create a new column 'salary_per_year_of_age'


In [None]:
# TODO: Compute average salary per age group
# (df_binned is the DataFrame with the 'age_group' column created in the binning cell above)
df_binned.groupBy('age_group').agg({'salary': 'avg'}).show()

In [None]:
# TODO: Visualize salary per age group


# Check the libraries and experiment with different parameter values: [OneHotEncoder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html), [Imputer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html), [LinearRegression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html)