In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max as max_, count, when, round, desc, rank, corr
from pyspark.sql.window import Window
import pandas as pd
import os


In [0]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used throughout this notebook.
builder = SparkSession.builder.appName("SmartEmployees")
spark = builder.getOrCreate()
spark

In [0]:
# Load the employee table registered in the workspace catalog.
# NOTE: do not call spark.stop() here. Spark evaluates `df` lazily, so every
# later cell still needs a live session; stopping it makes them all fail.
df = spark.table("workspace.default.smart_employees1")

In [0]:
df.show(n=20, truncate=True)  # Spark's defaults: first 20 rows, long values truncated

In [0]:
df.printSchema()  # column names, types and nullability of the loaded table

In [0]:
df.show(n=5, truncate=False)  # five rows with full, untruncated cell values

In [0]:
# A notebook cell only renders its last expression, so print each summary
# explicitly. describe() is lazy and must be .show()n to produce output.
print(df.columns)
print(df.count())
df.describe().show()

In [0]:
df.count()  # total number of employee rows

**Convert String Dates to `DateType`**

In [0]:
from pyspark.sql.functions import to_date

# Parse both ISO-formatted string columns into Spark dates, in place.
for date_column in ("join_date", "last_promotion_date"):
    df = df.withColumn(date_column, to_date(date_column, "yyyy-MM-dd"))

**Create tenure_months**

In [0]:
from pyspark.sql.functions import months_between, current_date, round

# Months elapsed since joining, rounded to one decimal place.
months_since_join = months_between(current_date(), df["join_date"])
df = df.withColumn("tenure_months", round(months_since_join, 1))

**Create years_since_last_promotion**

In [0]:
# Fractional years since the last promotion, rounded to one decimal place.
months_since_promotion = months_between(current_date(), df["last_promotion_date"])
df = df.withColumn("years_since_last_promotion", round(months_since_promotion / 12, 1))


**Create is_high_performer**

In [0]:
from pyspark.sql.functions import when

# Employees scoring at or above this threshold are flagged 1, everyone else 0.
HIGH_PERFORMER_THRESHOLD = 4.5
high_performer_flag = when(df["performance_score"] >= HIGH_PERFORMER_THRESHOLD, 1).otherwise(0)
df = df.withColumn("is_high_performer", high_performer_flag)

In [0]:
df.printSchema()

# Spot-check the three engineered columns next to the employee id.
derived_columns = ["employee_id", "tenure_months", "years_since_last_promotion", "is_high_performer"]
df.select(*derived_columns).show(5)


## Step 4: Exploratory Data Analysis (EDA) in PySpark
This step helps us understand trends, patterns, and problems in the employee data before building ML models.

📌 Goal:
Answer business questions like:

Which departments have the most resignations?

What’s the average salary and performance?

Who are the top employees by salary and tenure?

**Total Employees per Department**

In [0]:
# Headcount per department, largest first.
headcount_by_dept = df.groupBy("department").count()
headcount_by_dept.orderBy(desc("count")).show()

**Resignation Rate by Department**

In [0]:
from pyspark.sql.functions import avg

# Averaging resignation_flag gives the share of employees who resigned
# (assumes the flag is encoded as 0/1; confirm against the source table).
resignation_by_dept = (
    df.groupBy("department")
    .agg(avg("resignation_flag").alias("resignation_rate"))
    .orderBy(desc("resignation_rate"))
)
resignation_by_dept.show()

**Average Salary and Performance by Department**

In [0]:
# Department pay and performance, highest-paid departments first.
dept_pay_perf = df.groupBy("department").agg(
    round(avg("salary"), 0).alias("avg_salary"),
    round(avg("performance_score"), 2).alias("avg_performance"),
)
dept_pay_perf.orderBy(desc("avg_salary")).show()


**Top 5 Longest-Serving Employees**

Helps identify:

- Loyalty
- Promotion opportunities
- Valuable mentors

In [0]:
# The five employees with the longest tenure.
longest_serving = (
    df.select("name", "department", "tenure_months", "salary")
    .orderBy(desc("tenure_months"))
)
longest_serving.show(5, truncate=False)


**High Performers Who Haven’t Been Promoted Recently** (performance score ≥ 4.5 and more than 3 years since their last promotion)

In [0]:
# High performers whose last promotion was more than three years ago.
is_overdue = (col("is_high_performer") == 1) & (col("years_since_last_promotion") > 3)
(
    df.filter(is_overdue)
    .select("name", "department", "years_since_last_promotion", "performance_score")
    .orderBy(desc("years_since_last_promotion"))
    .show(5, truncate=False)
)



**Department-wise Training Hours**

In [0]:
# Average training hours per department, highest first.
training_by_dept = df.groupBy("department").agg(
    round(avg("training_hours"), 1).alias("avg_training_hours")
)
training_by_dept.orderBy(desc("avg_training_hours")).show()


### ✅ Summary of EDA Insights

| Analysis | Insight |
|---|---|
| Resignation by department | Where people leave the most |
| Salary vs. performance | Efficiency of departments |
| Tenure vs. promotion | Who deserves a promotion |
| High-performer check | Retention and growth strategies |

In [0]:
df.select(col("resignation_flag")).show(20)  # spot-check the target label values

In [0]:
# Model inputs followed by the resignation_flag label.
# NOTE(review): nulls in any input column will break vector assembly later;
# confirm the source data has none or handle them explicitly.
model_columns = [
    "age", "salary", "tenure_months", "training_hours",
    "performance_score", "years_since_last_promotion", "is_high_performer",
    "resignation_flag",
]
selected_df = df.select(*model_columns)

**Combine All Features into a Single Vector**

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if one exists,
# so the app name here may not take effect.
spark = (
    SparkSession.builder
    .appName("WorkforceML")
    .getOrCreate()
)


In [0]:
from pyspark.ml.feature import VectorAssembler

# Numeric inputs packed into a single "features" vector column.
# Requires selected_df from the feature-selection cell.
feature_columns = [
    "age", "salary", "tenure_months", "training_hours",
    "performance_score", "years_since_last_promotion", "is_high_performer",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

assembled_df = assembler.transform(selected_df)
assembled_df.select("features", "resignation_flag").show(5, truncate=False)



In [0]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf

# Python-UDF alternative to VectorAssembler that always yields dense vectors.
# NOTE: a native VectorAssembler avoids Python serialization and is faster.


def to_features(age, salary, tenure, training, score, promotion_gap, high_perf):
    """Pack the seven model inputs into a dense ML feature vector.

    Returns None (a null vector) when any input is null. Calling float(None)
    would raise TypeError inside the Spark task and abort the whole job.
    Null inputs can occur, e.g. a null or unparseable date leaves
    tenure_months or years_since_last_promotion null.
    """
    values = (age, salary, tenure, training, score, promotion_gap, high_perf)
    if any(value is None for value in values):
        return None
    return Vectors.dense([float(value) for value in values])


# Spark UDF wrapper that returns an ML vector column.
to_features_udf = udf(to_features, VectorUDT())

# Build the (features, label) frame. Rows whose features could not be
# assembled are dropped, so downstream training never sees a null vector.
assembled_df = (
    selected_df
    .withColumn(
        "features",
        to_features_udf(
            "age", "salary", "tenure_months", "training_hours",
            "performance_score", "years_since_last_promotion", "is_high_performer",
        ),
    )
    .select("features", "resignation_flag")
    .filter(col("features").isNotNull())
)

assembled_df.show(5, truncate=False)


**Split the Data (Training & Testing)**

We split the dataset into two parts, using a fixed seed (42) so the split is repeatable:

- 70% training data → to teach the model
- 30% testing data → to measure how well the model performs on unseen data


In [0]:
# 70/30 train/test split; the fixed seed makes the split repeatable for the same input.
TRAIN_FRACTION, TEST_FRACTION = 0.7, 0.3
SPLIT_SEED = 42
train_data, test_data = assembled_df.randomSplit([TRAIN_FRACTION, TEST_FRACTION], seed=SPLIT_SEED)


In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Toy, one-row illustration of VectorAssembler.
# Uses example_* names so it does NOT overwrite the real `selected_df` /
# `assembled_df` built earlier, from which train_data/test_data were split.

# Initialize Spark session (if not already done)
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

# Simple example DataFrame
data = [(30, 60000, 24, 10, 4.5, 1.5, 1, 0)]
columns = ["age", "salary", "tenure_months", "training_hours",
           "performance_score", "years_since_last_promotion",
           "is_high_performer", "resignation_flag"]

example_df = spark.createDataFrame(data, columns)

# VectorAssembler (same configuration as the main pipeline)
assembler = VectorAssembler(
    inputCols=["age", "salary", "tenure_months", "training_hours",
               "performance_score", "years_since_last_promotion",
               "is_high_performer"],
    outputCol="features"
)

example_assembled_df = assembler.transform(example_df).select("features", "resignation_flag")
example_assembled_df.show()