In [0]:
pip install networkx

In [0]:
from pyspark.sql.functions import lit, to_date

csv_file_path = "/Volumes/studentproject_01_exploration/studentproject_01_schema/files/Job History 1.csv"

# Snapshot date used to split employees into active / non-active sets (kept in one place).
REPORTING_SNAPSHOT_DATE = "2024-04-01"

# The "dd-MMM-yy" pattern below is parsed with the legacy time parser.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Read the job-history CSV with a header row; no schema inference is requested, so columns arrive as text.
df_spark = spark.read.option("header", "true").csv(csv_file_path)

# Convert "Reporting Date" from dd-MMM-yy text to a DateType column.
df_spark = df_spark.withColumn("Reporting Date", to_date(df_spark["Reporting Date"], "dd-MMM-yy"))

# All rows on the snapshot date, compared against an explicit date literal rather than a bare string.
snapshot_df_spark = df_spark.filter(df_spark["Reporting Date"] == to_date(lit(REPORTING_SNAPSHOT_DATE)))

# Employees whose "Employee Status" is "Active" on the snapshot date.
active_df_spark = snapshot_df_spark.filter(snapshot_df_spark["Employee Status"] == "Active")

# Employees whose "Employee Status" is not "Active" on the snapshot date.
# `!=` evaluates to NULL for a NULL status, so rows with a missing status land in neither frame.
inactive_df_spark = snapshot_df_spark.filter(snapshot_df_spark["Employee Status"] != "Active")

In [0]:
# Count the non-active employee rows and show that number as the cell's output.
(row_count := inactive_df_spark.count())

In [0]:
csv_file_path = "/Volumes/studentproject_01_exploration/studentproject_01_schema/files/Job History Manager Change.csv"

# Load the manager-change history; the first CSV row supplies the column names.
df_manager_spark = (
    spark.read
    .option("header", "true")
    .csv(csv_file_path)
)

display(df_manager_spark)

In [0]:
# Keep manager-change records whose status label is not "Active"
# (a NULL status makes the comparison NULL, so those rows are dropped too).
df_manager_spark = df_manager_spark.where(~(df_manager_spark['Employee Status (Picklist Label)'] == "Active"))
display(df_manager_spark)

In [0]:
# One row per distinct "Supervisor ID" referenced by the manager-change records.
managers_df = df_manager_spark.select("Supervisor ID").dropDuplicates()

display(managers_df)

In [0]:
# Exclude the "Workforce" employee class from the non-active set (NULL classes are excluded as well).
inactive_df_spark = inactive_df_spark.where(~(inactive_df_spark['Employee Class'] == "Workforce"))
display(inactive_df_spark)

In [0]:
# Exclude the "Workforce" employee class from the active set (NULL classes are excluded as well).
active_df_spark = active_df_spark.where(~(active_df_spark['Employee Class'] == "Workforce"))
display(active_df_spark)

In [0]:
# Re-count the non-active rows now that the Workforce class has been removed.
(row_count := inactive_df_spark.count())

In [0]:
from pyspark.sql.functions import broadcast

# Remove supervisors who are themselves non-active on the snapshot date. A left-anti join keeps
# only the managers_df rows that have no matching "Person ID" in inactive_df_spark.
# Only the key column is broadcast, because a left-anti join never returns right-side columns.
inactive_person_ids = inactive_df_spark.select("Person ID")
managers_df = managers_df.join(
    broadcast(inactive_person_ids),
    managers_df["Supervisor ID"] == inactive_person_ids["Person ID"],
    "left_anti",
)

display(managers_df)

In [0]:
# Drop placeholder ("NO_MANAGER") and missing supervisor IDs.
# In linear execution order managers_df holds only a "Supervisor ID" column here, so the former
# withColumnRenamed("Active_Supervisor_ID", "Person_ID") call never matched anything and was removed.
managers_df = managers_df.filter(
    managers_df["Supervisor ID"].isNotNull() & (managers_df["Supervisor ID"] != "NO_MANAGER")
)
display(managers_df)

In [0]:
# Keep supervisors that also appear as an active employee, carrying that employee's columns along.
managers_df = managers_df.join(
    active_df_spark,
    on=managers_df["Supervisor ID"] == active_df_spark["Person ID"],
    how="inner",
)
display(managers_df)

In [0]:
# Reduce to the supervisor IDs only. The preceding inner join repeats a supervisor once per
# matching active row, so de-duplicate to get back to one row per supervisor.
managers_df = managers_df.select("Supervisor ID").distinct()
display(managers_df)

In [0]:

# Display managers_df again; this cell applies no transformation, so the output matches the previous cell's.
display(managers_df)

In [0]:
from pyspark.sql.functions import to_date, col, row_number
from pyspark.sql.window import Window

# Reduce filtered_df_spark to the most recent record per "Person ID".
# NOTE(review): filtered_df_spark is not assigned in any earlier cell of this notebook, so this cell
# raises NameError on a fresh kernel; it must come from a cell that is missing here. It is also
# expected to hold "Reporting Date" as dd/MM/yyyy text, unlike df_spark (dd-MMM-yy) - confirm.

# Newest "Reporting Date" first within each person.
window = Window.partitionBy('Person ID').orderBy(col('Reporting Date').desc())

filtered_df_spark = (
    filtered_df_spark
    .withColumn("Reporting Date", to_date(col("Reporting Date"), "dd/MM/yyyy"))
    # Rank rows within each person; rank 1 is the latest (rows sharing a date have no guaranteed order).
    .withColumn('row_number', row_number().over(window))
    .where(col('row_number') == 1)
    .drop('row_number')
)

display(filtered_df_spark)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc, col

# Parse "Event Date" (dd/MM/yyyy text) into a DateType column; df_manager_spark itself is updated.
df_manager_spark = df_manager_spark.withColumn("Event Date", to_date(col("Event Date"), "dd/MM/yyyy"))

# Latest event first within each person.
window = Window.partitionBy('Person ID').orderBy(desc('Event Date'))

filtered_df_manager_spark = (
    df_manager_spark
    # Rows whose date is missing or failed to parse are dropped before ranking.
    .where(col("Event Date").isNotNull())
    .withColumn('row_number', row_number().over(window))
    .where(col('row_number') == 1)
    .drop('row_number')
    # Discard records pointing at the 'NO_MANAGER' placeholder (NULL supervisors are dropped too).
    .where(col('Supervisor ID') != 'NO_MANAGER')
)

display(filtered_df_manager_spark)

In [0]:
# Drop the "Active_Supervisor_ID" column if it is present (this removes it; nothing is renamed).
# Spark's DataFrame.drop ignores a column name that does not exist.
filtered_df_manager_spark = filtered_df_manager_spark.drop("Active_Supervisor_ID")

# Display the updated Spark DataFrame
display(filtered_df_manager_spark)

In [0]:
from pyspark.sql.functions import lit

# Manager-change records whose "Event Date" is 2024-04-01.
# NOTE(review): this starts again from df_manager_spark, so the latest-per-person and NO_MANAGER
# filtering held in filtered_df_manager_spark by the preceding cells is discarded - confirm
# whether filtered_df_manager_spark should be filtered instead.
filtered_df_manager_spark = df_manager_spark.where("`Event Date` = '2024-04-01'")

display(filtered_df_manager_spark)

In [0]:
# Remove the 'Workforce' employee class; a NULL class also fails the test and is removed.
filtered_df_manager_spark = filtered_df_manager_spark.where(
    ~(filtered_df_manager_spark['Employee Class (Picklist Label)'] == 'Workforce')
)

display(filtered_df_manager_spark)

#### Keep only manager records whose Person ID and Supervisor ID both exist among the employees

In [0]:
# Keep only manager records whose "Person ID" and "Supervisor ID" both appear as a
# "Person ID" in filtered_df_spark.
from pyspark.sql.functions import col

# Distinct employee IDs, kept as a distributed DataFrame. Collecting them to the driver and
# rebuilding with spark.createDataFrame was slower, and it fails on an empty result because no
# schema can be inferred from an empty list.
all_person_ids_df = filtered_df_spark.select('Person ID').distinct()

# The same ID set, keyed as 'Supervisor ID' for the second join.
all_supervisor_ids_df = all_person_ids_df.withColumnRenamed('Person ID', 'Supervisor ID')

filtered_df_manager_spark = (
    filtered_df_manager_spark
    .join(all_person_ids_df, on='Person ID', how='inner')
    .join(all_supervisor_ids_df, on='Supervisor ID', how='inner')
)

display(filtered_df_manager_spark)

In [0]:
# Display filtered_df_manager_spark again (no transformation since the previous cell's display).
display(filtered_df_manager_spark)

In [0]:
# Show every record in filtered_df_spark for one employee.
# NOTE(review): an earlier comment named Person ID 63402 while the filter used 61382 - confirm
# which ID is wanted. Keeping it in one named value stops the comment and the code from drifting.
PERSON_ID_TO_INSPECT = 61382
person_details = filtered_df_spark.filter(filtered_df_spark['Person ID'] == PERSON_ID_TO_INSPECT)
display(person_details)

In [0]:
from pyspark.sql.functions import col

# Remove manager records whose status label is "Terminated"; as with `!=`, NULL statuses go too.
filtered_df_manager_spark = filtered_df_manager_spark.where(
    ~(col("Employee Status (Picklist Label)") == "Terminated")
)

display(filtered_df_manager_spark)

In [0]:
from pyspark.sql.functions import broadcast

# Person -> supervisor lookup from the filtered manager records. distinct() removes repeated
# identical pairs, which would otherwise duplicate the employee row (and later its graph node).
# NOTE(review): a person with several *different* supervisors still produces several rows.
supervisor_lookup = filtered_df_manager_spark.select("Person ID", "Supervisor ID").distinct()

# Attach "Supervisor ID" to each employee; the left join keeps employees with no match (NULL supervisor).
# Any existing "Supervisor ID" (e.g. from a previous run of this cell) is dropped first so the
# result never carries two ambiguous "Supervisor ID" columns; drop() ignores a missing column.
filtered_df_spark = (
    filtered_df_spark
    .drop("Supervisor ID")
    .join(broadcast(supervisor_lookup), on="Person ID", how="left")
)

display(filtered_df_spark)

# Construct the Graph

In [0]:
import networkx as nx
import matplotlib.pyplot as plt

# Undirected graph with one node per employee row; edges are added in a later cell.
# A freshly constructed nx.Graph() is already empty, so no clear() call is needed.
G = nx.Graph()

# Bring the Spark DataFrame to the driver as pandas for networkx.
pd_filtered_df_spark = filtered_df_spark.toPandas()

# Business units that are collapsed into a single 'Corporate' bucket.
values_to_replace = ['Customer & Corp Affairs', 'Chief Operations Office', 'Commercial', 'Executive Management', 'Finance & Group Services', 'HSE', 'ICT', 'People', 'Strategy', 'WA Region']

# Relabel those business units in the pandas DataFrame's 'BU' column.
pd_filtered_df_spark['BU'] = pd_filtered_df_spark['BU'].replace(values_to_replace, 'Corporate')

# Node colour per business unit; BUs not listed here get a fallback colour in the plotting cells.
COLOR_MAP = {
    'Major Projects': 'salmon',
    'Infrastructure': 'slateblue',
    'Rail': 'lightgreen',
    'Building': 'khaki',
    'Corporate': 'chocolate',
    'TEK': 'lightblue'
}

# One node per row, keyed by the pandas row index. All column values are stored under a single
# 'attr_dict' node attribute, which later cells read via G.nodes[n].get('attr_dict').
# Building record dicts once avoids the per-row Series that DataFrame.iterrows() creates.
G.add_nodes_from(
    (row_index, {'attr_dict': record})
    for row_index, record in zip(pd_filtered_df_spark.index, pd_filtered_df_spark.to_dict('records'))
)

# Print the number of nodes in the graph
print(f"Number of nodes: {G.number_of_nodes()}")

In [0]:
# Rows per business unit after the 'Corporate' relabelling, most frequent first, printed as plain text.
value_counts = pd_filtered_df_spark.loc[:, 'BU'].value_counts()
print(value_counts)

In [0]:
# Colour nodes by business unit using the palette defined with the graph (COLOR_MAP) rather than
# a duplicated literal; BUs missing from the map are drawn 'lightpink'.
color_map = COLOR_MAP
node_colors = [color_map.get(G.nodes[node].get('attr_dict').get('BU', 'NULL'), 'lightpink') for node in G.nodes]

# Label each node with its employee "Person ID" ('N/A' when absent).
node_labels = {node: G.nodes[node].get('attr_dict').get('Person ID', 'N/A') for node in G.nodes}

# Random (not spring) layout, seeded so the figure is reproducible across runs.
pos = nx.random_layout(G, seed=42)

nx.draw(G, pos, with_labels=True, node_color=node_colors, labels=node_labels, font_size=5, node_size=10)
plt.show()

In [0]:
# Same business-unit colouring, drawn without labels and with smaller nodes.
color_map = COLOR_MAP
node_colors = [color_map.get(G.nodes[node].get('attr_dict').get('BU', 'NULL'), 'lightpink') for node in G.nodes]

# Random (not spring) layout, seeded for reproducibility. The undirected-edges cell reuses `pos`.
pos = nx.random_layout(G, seed=42)

nx.draw(G, pos, with_labels=False, node_color=node_colors, node_size=5)
plt.show()

### Undirected Graph

In [0]:
# Connect each employee node to the node(s) whose "Person ID" equals its "Supervisor ID"
# (undirected edge with is_manager=True).
# Index nodes by Person ID once so each link is a dict lookup instead of scanning every node
# for every node (the previous nested loop was O(n^2)).
nodes_by_person_id = {}
for node in G.nodes:
    person_id = G.nodes[node].get('attr_dict').get('Person ID')
    nodes_by_person_id.setdefault(person_id, []).append(node)

for node in G.nodes:
    supervisor_id = G.nodes[node].get('attr_dict').get('Supervisor ID')
    # A missing supervisor (None, or NaN, which is the only value unequal to itself) must not be
    # matched: the old `supervisor_id == person_id` test linked None supervisors to None Person IDs.
    if supervisor_id is None or supervisor_id != supervisor_id:
        continue
    for other_node in nodes_by_person_id.get(supervisor_id, []):
        G.add_edge(node, other_node, is_manager=True)

# Colour nodes by business unit; unknown BUs fall back to 'lightpink'.
color_map = COLOR_MAP
node_colors = [color_map.get(G.nodes[node].get('attr_dict').get('BU', 'NULL'), 'lightpink') for node in G.nodes]

# Redraw with edges, reusing the node positions from the previous plotting cell.
nx.draw(G, pos, node_color=node_colors, with_labels=False, node_size=5)
plt.show()

## Directed Graph

In [0]:
# Directed graph with one node per employee row; the next cell adds employee -> supervisor edges.
# A freshly constructed nx.DiGraph() is already empty, so no clear() call is needed.
Gr = nx.DiGraph()

# One node per row, keyed by the pandas row index, with all column values under 'attr_dict'
# (the same layout as graph G), built without the per-row Series created by iterrows().
Gr.add_nodes_from(
    (row_index, {'attr_dict': record})
    for row_index, record in zip(pd_filtered_df_spark.index, pd_filtered_df_spark.to_dict('records'))
)

# Print the number of nodes in the graph
print(f"Number of nodes: {Gr.number_of_nodes()}")

# Colour by business unit; unknown BUs use 'lightpink', matching the undirected-graph figures.
color_map = COLOR_MAP
node_colors = [color_map.get(Gr.nodes[node].get('attr_dict').get('BU', 'NULL'), 'lightpink') for node in Gr.nodes]

# Random (not spring) layout of the whole graph, seeded for reproducibility.
pos = nx.random_layout(Gr, seed=42)

nx.draw(Gr, pos, with_labels=False, node_color=node_colors, node_size=5)
plt.show()

In [0]:
# Add a directed edge employee -> supervisor (manager=True) for every node whose "Supervisor ID"
# matches another node's "Person ID".
# Index nodes by Person ID once so each link is a dict lookup instead of an O(n^2) double scan.
nodes_by_person_id = {}
for node in Gr.nodes:
    person_id = Gr.nodes[node].get('attr_dict').get('Person ID')
    nodes_by_person_id.setdefault(person_id, []).append(node)

for node in Gr.nodes:
    supervisor_id = Gr.nodes[node].get('attr_dict').get('Supervisor ID')
    # Skip missing supervisors (None, or NaN, which is unequal to itself); the old equality test
    # linked None supervisors to nodes whose Person ID was also None.
    if supervisor_id is None or supervisor_id != supervisor_id:
        continue
    for other_node in nodes_by_person_id.get(supervisor_id, []):
        Gr.add_edge(node, other_node, manager=True)

In [0]:
# Map each node of Gr to its employee "Person ID" ('N/A' when the attribute is absent).
person_ids = {}
for graph_node in Gr.nodes:
    node_attributes = Gr.nodes[graph_node].get('attr_dict', {})
    person_ids[graph_node] = node_attributes.get('Person ID', 'N/A')

# One (node, Person ID) row per mapping entry, as a Spark DataFrame.
person_ids_df = spark.createDataFrame(list(person_ids.items()), ['node', 'Person ID'])

display(person_ids_df)

### Degree Centrality

In [0]:
# DISABLED: every line of this cell is commented out, so running it has no effect.
# When uncommented, it computes nx.degree_centrality(Gr) and tabulates each node's centrality
# together with selected node attributes in a Spark DataFrame (degree_centrality_df).
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.window import Window

# # Calculate degree centrality for all nodes in the graph Gr without the 'normalized' argument
# all_degree_centrality = nx.degree_centrality(Gr)

# # Create a List of Tuples including 'node', 'Person ID', 'Business Unit', and 'degree centrality'
# data = [
#     (
#         int(node), 
#         Gr.nodes[node].get('attr_dict').get('Person ID'),
#         Gr.nodes[node].get('attr_dict').get('Supervisor ID'), 
#         Gr.nodes[node].get('attr_dict').get('BU'), 
#         Gr.nodes[node].get('attr_dict').get('Gender'),
#         Gr.nodes[node].get('attr_dict').get('Employee Status'),
#         Gr.nodes[node].get('attr_dict').get('JH Position Title (externalCode)'),
#         Gr.nodes[node].get('attr_dict').get('External Title'),
#         Gr.nodes[node].get('attr_dict').get('Last Day of Service'),
#         Gr.nodes[node].get('attr_dict').get('Sub Termination Reason (externalCode)'),
#         Gr.nodes[node].get('attr_dict').get('Sub Termination Reason (Label)'),
#         centrality
#     )
#     for node, centrality in all_degree_centrality.items()
# ]

# # Initialize SparkSession if not already done
# spark = SparkSession.builder.appName("DegreeCentrality").getOrCreate()

# # Create a Spark DataFrame from the list of tuples
# degree_centrality_df = spark.createDataFrame(data, ['node', 'Person ID', 'Supervisor ID', 'BU', 'Gender', 'Employee Status', 'JH Position Title (externalCode)', 'External Title', 'Last Day of Service', 'Sub Termination Reason (externalCode)', 'Sub Termination Reason (Label)', 'degree_centrality'])

# # Display the DataFrame
# display(degree_centrality_df)

### Eigenvector Centrality

In [0]:
# pyspark's max is aliased so this cell no longer shadows Python's built-in max() for the
# rest of the notebook.
from pyspark.sql.functions import col, collect_list, count, max as spark_max

# Eigenvector centrality of every node in the directed graph Gr (iteration limit raised to 1000).
eigenvector_centrality = nx.eigenvector_centrality(Gr, max_iter=1000)

# One tuple per node: its supervisor, its status, its own ID, and its own centrality score.
supervisor_info = [
    (
        Gr.nodes[node].get('attr_dict').get('Supervisor ID'),
        Gr.nodes[node].get('attr_dict').get('Employee Status'),
        Gr.nodes[node].get('attr_dict').get('Person ID'),
        eigenvector_centrality[node]
    )
    for node in Gr.nodes
]

# Convert the list to a Spark DataFrame
supervisor_info_df = spark.createDataFrame(supervisor_info, ['Supervisor ID', 'Employee Status', 'Person ID', 'Eigen Vector Centrality'])

# Per (Supervisor ID, Employee Status): the employees with that supervisor and status, their
# count, and the highest centrality among them.
# NOTE(review): 'Eigen Vector Centrality' here is the maximum over the *employees'* scores, not
# the supervisor's own score, and one supervisor is split across groups by each employee's
# status - confirm this is the intended metric.
supervisor_metrics_df = supervisor_info_df.groupBy('Supervisor ID', 'Employee Status').agg(
    collect_list('Person ID').alias('List of Person IDs'),
    count('Person ID').alias('Number of nodes connected'),
    spark_max(col('Eigen Vector Centrality')).alias('Eigen Vector Centrality')
)

# Display the DataFrame
display(supervisor_metrics_df)