In [1]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, when, sum, round, month, year, first
import logging

In [2]:
from utils.read import read_csv_files

In [3]:
# Define the path variable
CLEANED_DATA_PATH = "/spark-data/capstone_crm/data/cleaned/"

# Sales Team Performance Assessment Analysis

In [4]:
try:  
    logging.info("loading csv files")
    # read customers data
    sales_team_df = read_csv_files(f"{CLEANED_DATA_PATH}sales_team.csv")
    # read transactions data
    transactions_df = read_csv_files(f"{CLEANED_DATA_PATH}transactions.csv")
    # read products data
    products_df = read_csv_files(f"{CLEANED_DATA_PATH}products.csv")
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)     

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/07 13:44:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/07 13:44:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/07 13:44:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
try:    
    # Calculate percentage of target achieved (calculate top performer and under performer )
    sales_team_df = sales_team_df.withColumn(
        "Target_Achieved_Percentage",
        round((col("Sales_Achieved") / col("Sales_Target")) * 100, 2))

    # Categorize performance
    sales_team_df = sales_team_df.withColumn(
        "Performance_Category",
        when(col("Target_Achieved_Percentage") >= 100, "Top Performer")
        .when(col("Target_Achieved_Percentage") >= 80, "Average Performer")
        .otherwise("Under Performer")
    )

except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)

### Analyze top performer 

In [6]:
try:
    logging.info("Top 10 Performer in Sales Team")   
    #Calculate top performer 
    top_performer_sales_team_df = sales_team_df.orderBy(col("Target_Achieved_Percentage").desc())
    top_performer_sales_team_df = top_performer_sales_team_df.select("Name","Target_Achieved_Percentage","Performance_Category").limit(10)
    top_performer_sales_team_df.show()
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)    

                                                                                

+--------------------+--------------------------+--------------------+
|                Name|Target_Achieved_Percentage|Performance_Category|
+--------------------+--------------------------+--------------------+
|       Angela Miller|                    255.61|       Top Performer|
|Dr. Wayne Spencer...|                    241.47|       Top Performer|
|          John Terry|                     211.0|       Top Performer|
|        Melissa Ruiz|                    204.62|       Top Performer|
|    Brian Wallace Md|                    192.62|       Top Performer|
|  Edward Delacruz Ii|                    190.76|       Top Performer|
|     Samantha Watson|                     183.9|       Top Performer|
|         Andrew Rice|                    175.15|       Top Performer|
|        Kelly Ibarra|                    174.66|       Top Performer|
|    Cameron Peterson|                    154.29|       Top Performer|
+--------------------+--------------------------+--------------------+



#### Analyze under performer

In [7]:
try:
    logging.info("Under Performer in Sales Team")   
    # calculate under performer
    least_performer_sales_team_df = sales_team_df.orderBy(col("Target_Achieved_Percentage"))
    least_performer_sales_team_df = least_performer_sales_team_df.select("Name","Target_Achieved_Percentage","Performance_Category") \
                                .orderBy(col("Target_Achieved_Percentage")).limit(10)
    least_performer_sales_team_df.show()
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)    

+-----------------+--------------------------+--------------------+
|             Name|Target_Achieved_Percentage|Performance_Category|
+-----------------+--------------------------+--------------------+
|     David Miller|                     15.97|     Under Performer|
|       Erin Ramos|                     26.74|     Under Performer|
| Jasmine Cross Md|                     27.46|     Under Performer|
|  James Nicholson|                     28.92|     Under Performer|
|Pamela Pennington|                     30.13|     Under Performer|
|  Brittany Taylor|                     34.12|     Under Performer|
|      William Lee|                     35.98|     Under Performer|
|    Heather Henry|                      37.4|     Under Performer|
|     Teresa Evans|                     39.74|     Under Performer|
|    Jasmine Davis|                     48.96|     Under Performer|
+-----------------+--------------------------+--------------------+



#### Top Region On Sales Achieved

In [8]:
try:
    logging.info("Top Region On Sales Achieved")    
    # calculate Top regions on Sales
    top_regions_df  = sales_team_df.groupBy("Region") \
                    .agg(sum("Sales_Achieved").alias("Total_Sales_Achieved"),first("Sales_Target").alias("Sales_Target"))
    
    top_regions_df = top_regions_df.orderBy(col("Total_Sales_Achieved").desc()).limit(10)
    top_regions_df.show()          
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)

+-------------+--------------------+------------+
|       Region|Total_Sales_Achieved|Sales_Target|
+-------------+--------------------+------------+
|         Utah|            112756.0|     18095.0|
|       Oregon|             91110.0|     19974.0|
|    Louisiana|             61886.0|     31635.0|
|West Virginia|             54771.0|     17436.0|
|      Arizona|  46357.425531914894|     14202.0|
|        Idaho|             46224.0|     15272.0|
|         Ohio|  44029.425531914894|     40598.0|
|     Kentucky|             41932.0|     18428.0|
|     Illinois|             41695.0|     15699.0|
|      Montana|             40145.0|     27915.0|
+-------------+--------------------+------------+



#### Analyze total_sales by Sales Rep and Category

In [9]:
try:
    logging.info("Total sales by Sales Rep and Category")    
    # Join transactions with products to categorize sales by category
    transactions_with_category = transactions_df.join(products_df, "Product_ID")
    # join transaction with category to sales_team_df
    transactions_with_category = transactions_with_category.join(sales_team_df,"Sales_Rep_ID","inner")
    # Calculate total sales by sales rep and category
    category_sales_per_rep = transactions_with_category.groupBy("Sales_Rep_ID", "Category") \
                                    .agg(first("Name").alias("Name"),
                                    sum("Amount").alias("Total_Sales")) \
                                    .orderBy(col("Total_Sales").desc())

    category_sales_per_rep = category_sales_per_rep.select("Name","Category","Total_Sales").orderBy(col("Total_Sales").desc()).limit(10)
    category_sales_per_rep.show()
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)    

+----------------+-----------+-----------------+
|            Name|   Category|      Total_Sales|
+----------------+-----------+-----------------+
|      Erin Ramos|       Home|           3885.0|
|      Erin Ramos|Electronics|3813.780163599182|
|Brian Wallace Md|     Beauty|3732.780163599182|
|     Andrew Rice|Electronics|           3390.0|
|      Bobby Long|Electronics|           3179.0|
|   Cory Cardenas|       Home|3123.780163599182|
|     William Lee|       Home|           2874.0|
|    Robert Payne|   Clothing|2779.780163599182|
|      Erin Ramos|     Beauty|           2754.0|
|     Donna Smith|   Clothing|           2703.0|
+----------------+-----------+-----------------+



#### Calculate Sales by sales Rep on year and month

In [10]:
try:
    logging.info("Sales By Sales Rep on Year and Month")    
    # Extract month from transaction date
    transactions_with_category = transactions_with_category.withColumn("Month", month(col("Date")))
    transactions_with_category = transactions_with_category.withColumn("Year", year(col("Date")))

    # Aggregate sales by month and sales rep
    monthly_sales_per_rep = transactions_with_category.groupBy("Sales_Rep_ID", "Year", "Month") \
                                .agg(first("Name").alias("Name"),
                                sum("Amount").alias("Total_Sales"))

    monthly_sales_per_rep = monthly_sales_per_rep.select("Name","Year","Month","Total_Sales").orderBy(col("Total_Sales").desc()).limit(10)
    monthly_sales_per_rep.show()
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)    


+-----------------+----+-----+-----------------+
|             Name|Year|Month|      Total_Sales|
+-----------------+----+-----+-----------------+
| Brian Wallace Md|2024|    5|           2616.0|
|       Bobby Long|2024|    1|2337.780163599182|
|       Erin Ramos|2024|    6|           2170.0|
|      Donna Smith|2024|    3|2060.780163599182|
|Mitchell Williams|2024|    6|           2014.0|
|   Mary Hernandez|2024|    2|           1984.0|
|     Amber Lawson|2024|    1|1970.780163599182|
|      Andrew Rice|2024|    6|1953.780163599182|
|       Erin Ramos|2024|    3|           1950.0|
|    Michael Cline|2024|    2|           1917.0|
+-----------------+----+-----+-----------------+



In [11]:
try:
    logging.info()    
    # Convert to Pandas for visualization
    final_performance_pd = final_performance_df.limit(20).toPandas()

    # Plot: Target Achieved Percentage
    plt.figure(figsize=(10, 6))
    plt.barh(final_performance_pd['Name'], final_performance_pd['Target_Achieved_Percentage'], color='skyblue')
    plt.xlabel('Target Achieved (%)')
    plt.title('Sales Target Achieved by Each Sales Representative')
    plt.grid(True)
    plt.show()

    # Plot: Revenue Percentage of Target
    plt.figure(figsize=(10, 6))
    plt.barh(final_performance_pd['Name'], final_performance_pd['Revenue_Percentage_Target'], color='orange')
    plt.xlabel('Revenue Percentage of Target (%)')
    plt.title('Revenue Generated as Percentage of Sales Target')
    plt.grid(True)
    plt.show()

    # Pie Chart: Performance Categories
    performance_categories = final_performance_pd['Performance_Category'].value_counts()
    plt.figure(figsize=(8, 8))
    plt.pie(performance_categories, labels=performance_categories.index, autopct='%1.1f%%', startangle=140, colors=['green', 'yellow', 'red'])
    plt.title('Distribution of Sales Performance Categories')
    plt.show()
except Exception as e:
    logging.error("invalid operation performed :{e}", exc_info=True)

ERROR:root:invalid operation performed :{e}
Traceback (most recent call last):
  File "/tmp/ipykernel_99335/2926174729.py", line 2, in <module>
    logging.info()
TypeError: info() missing 1 required positional argument: 'msg'
