# In[2]:
import os

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Initialize Spark session (getOrCreate reuses an already-active session).
spark = (
    SparkSession.builder
    .appName("Employee Attrition Prediction")
    .getOrCreate()
)

# Load dataset. The CSV location can be overridden with the
# EMPLOYEE_ATTRITION_CSV environment variable so the notebook is not tied to
# one machine; the default is the original hard-coded local path.
file_path = os.environ.get(
    "EMPLOYEE_ATTRITION_CSV",
    r"C:\Users\admin\Desktop\2024\Data-Analyst\Assignments\Projects\last project 4\Project-4\Resources\employee_attrition.csv",
)
# header=True: first row holds column names.
# inferSchema=True: Spark infers numeric vs. string column types, which the
# dtype-based feature selection relies on.
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Display schema
data.printSchema()

# Drop columns excluded from modelling (presumably identifiers, constants and
# raw rate columns — confirm against the dataset's data dictionary).
columns_to_drop = ["EmployeeCount", "EmployeeNumber", "Over18", "StandardHours", "DailyRate", "HourlyRate", "MonthlyRate"]
data = data.drop(*columns_to_drop)

# Encode categorical variables and assemble features.
# Every string column except the target is treated as categorical. StringIndexer
# maps each category to an integer index; OneHotEncoder then turns that index
# into a sparse indicator vector. Feeding raw indices directly to logistic
# regression would impose an arbitrary numeric ordering on nominal categories.
categorical_cols = [field for (field, dtype) in data.dtypes if dtype == "string" and field != "Attrition"]
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_cols]

# Numeric columns are used as-is. Accept every Spark numeric type name (not
# only "int"/"double"), so bigint/float/decimal columns that inferSchema may
# produce are not silently left out of the feature vector.
_NUMERIC_DTYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}
numeric_cols = [
    field for (field, dtype) in data.dtypes
    if dtype in _NUMERIC_DTYPES or dtype.startswith("decimal")
]

assembler = VectorAssembler(
    inputCols=[column + "_vec" for column in categorical_cols] + numeric_cols,
    outputCol="features",
)

# StringIndexer's default order is frequencyDesc, so the most frequent
# Attrition value becomes label 0.0 (presumably "No", making "Yes" the positive
# class 1.0 — verify against the data).
label_indexer = StringIndexer(inputCol="Attrition", outputCol="label")

# Pipeline for feature transformation: indexers -> one-hot encoder -> assembler.
stages = indexers + [label_indexer]
if categorical_cols:
    # OneHotEncoder needs at least one input column.
    encoder = OneHotEncoder(
        inputCols=[column + "_index" for column in categorical_cols],
        outputCols=[column + "_vec" for column in categorical_cols],
    )
    stages.append(encoder)
stages.append(assembler)
pipeline = Pipeline(stages=stages)
data_transformed = pipeline.fit(data).transform(data)

# Keep only what the model needs
data_transformed = data_transformed.select("features", "label")

# Split data into training and test sets (80/20, fixed seed for reproducibility)
train_data, test_data = data_transformed.randomSplit([0.8, 0.2], seed=42)

# Initialize Logistic Regression model (reads the default "features"/"label" columns)
lr = LogisticRegression(maxIter=1000)

# Train model
lr_model = lr.fit(train_data)

# Make predictions on the held-out test set
predictions = lr_model.transform(test_data)

# Evaluate model. BinaryClassificationEvaluator measures area under the ROC
# curve, not accuracy, so report it under its real name.
evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC-ROC: {auc}")

# Accuracy = fraction of test rows whose predicted class equals the true label.
# Guard against an empty test split to avoid ZeroDivisionError.
total = predictions.count()
correct = predictions.filter(col("label") == col("prediction")).count()
accuracy = correct / total if total else float("nan")
print(f"Test Accuracy: {accuracy}")

# Display confusion matrix as counts per (label, prediction) pair
predictions.groupBy("label", "prediction").count().show()

# Save processed data (features + label) to Parquet. mode("overwrite") lets the
# cell be re-run; the default "errorifexists" mode fails once the output exists.
output_path = "employee_attrition_processed.parquet"
data_transformed.write.mode("overwrite").parquet(output_path)

# Load data back from Parquet to confirm the round trip
data_loaded = spark.read.parquet(output_path)
data_loaded.show()

# Collect predictions to the driver for plotting (test split only).
preds = predictions.select("label", "prediction").toPandas()

# Confusion matrix with row/column totals (pandas names the margins "All")
conf_matrix = pd.crosstab(preds['label'], preds['prediction'], rownames=['Actual'], colnames=['Predicted'], margins=True)

# Plot the confusion matrix as a heatmap. The "All" margin row/column is
# excluded from the plot: its totals would dominate the colour scale and wash
# out the per-cell counts.
heatmap_data = conf_matrix.drop(index="All", columns="All")
plt.figure(figsize=(8, 6))
sns.heatmap(heatmap_data, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.show()


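# Output of the last run: ModuleNotFoundError: No module named 'pyspark'
# (PySpark was not installed in the kernel's environment, e.g. `pip install pyspark`.)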