Cleaning Data Removing Duplicates and Removing which are not having EMPID and Updating Testscore if not present according to status and department updating the average

Employee

In [None]:
import csv
from collections import Counter

def is_valid_email(email):
    return '@' in email and '.' in email

employees = []
# Read employees from the CSV file
with open('../FAKE DATA/employees.csv', mode='r', newline='') as file:
    reader = csv.DictReader(file)
    
    for row in reader:
        if is_valid_email(row['employeeEmail']):  # Check if email is valid
            employees.append(row)

# Find the mode of the gender column
genders = [emp['gender'] for emp in employees if emp['gender']]  # Collect genders ignoring empty
if genders:  
    mode_gender = Counter(genders).most_common(1)[0][0]  # Find the mode
else:
    mode_gender = None

# Replace missing gender values with the mode
for emp in employees:
    if not emp['gender']:  
        emp['gender'] = mode_gender

# Remove duplicates by using employee ID as the key
unique_employees = {emp['id']: emp for emp in employees}.values()

# Convert unique_employees back to a list
final_employees = list(unique_employees)

# Print the unique employees
for emp in final_employees:
    print(emp)

print(f"\nTotal Unique Employees: {len(final_employees)}")

# Define the output file path
output_file_path = 'employees.csv'  # Change this to your desired output CSV path

# Write unique employees to a new CSV file
with open(output_file_path, mode='w', newline='') as file:
    fieldnames = final_employees[0].keys()  # Get the field names from the first employee
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    
    writer.writeheader()  # Write the header
    for emp in final_employees:
        writer.writerow(emp)  # Write each employee's data

print(f"\nUnique employees saved to {output_file_path}")



Certification

In [None]:
import csv
from collections import Counter

def is_valid_url(url):
    return url.startswith('http://') or url.startswith('https://')

# List to store valid certifications
certifications = []

# Read the CSV file
with open('../FAKE DATA/certifications.csv', mode='r', newline='') as file:
    reader = csv.DictReader(file)
    
    for row in reader:
        if is_valid_url(row['certificationLink']):  # Check if certification link is valid
            certifications.append(row)

# Remove duplicates using certification ID as the key
unique_certifications = {cert['id']: cert for cert in certifications}.values()

# Convert unique certifications back to a list
final_certifications = list(unique_certifications)

# Specify the output file path
output_file_path = 'certifications.csv'

# Define the fieldnames
fieldnames = ['id', 'employeeId', 'courseName', 'skills', 'courseDepartment', 'certificationLink','status']

# Write the unique certifications to a new CSV file
with open(output_file_path, mode='w', newline='') as output_file:
    writer = csv.DictWriter(output_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write unique certifications
    for cert in final_certifications:
        # Ensure to filter out any unwanted fields
        filtered_cert = {key: cert[key] for key in fieldnames if key in cert}
        writer.writerow(filtered_cert)

# Print unique certification data
for cert in final_certifications:
    print(cert)

# Print total unique certifications
print(f"\nTotal Unique Certifications: {len(final_certifications)}")

print(f"Unique certifications saved to {output_file_path}")


Skill Score

In [None]:
import csv
from collections import defaultdict

# Define departments (for reference)
departments = {
    'Development': ['Reactjs', 'Node', 'Next Js', 'Laravel', 'Angular', 'Flutter', 'React Native'],
    'Data Science': ['Python', 'Applied ML', 'Big Data'],
    'Data Engineering': ['Python', 'MySQL', 'Web Scraping', 'DBT', 'SnowFlake', 'Data Bricks'],
    'Cloud': ['AWS', 'Azure', 'GCP', 'Redis']
}

# Read skill scores from the CSV file
skill_scores = []
with open('../FAKE DATA/skill_scores.csv', mode='r', newline='') as file:
    reader = csv.DictReader(file)
    
    for row in reader:
        # Only keep rows where employeeId is present
        if row['employeeId']:
            # Convert testScore and noOfAttempts to integers (use 0 if conversion fails)
            try:
                row['testScore'] = int(row['testScore']) if row['testScore'] else 0
            except ValueError:
                row['testScore'] = 0

            try:
                row['noOfAttempts'] = int(row['noOfAttempts']) if row['noOfAttempts'] else 0
            except ValueError:
                row['noOfAttempts'] = 0

            skill_scores.append(row)

# Group test scores by department and status
department_scores = defaultdict(lambda: {'accepted': [], 'rejected': []})

for score in skill_scores:
    course_dept = score['courseDepartment']
    test_score = score['testScore']
    status = score['status']

    # Only process if department is valid
    if course_dept in departments:
        if status == 'Accept':
            department_scores[course_dept]['accepted'].append(test_score)
        elif status == 'Reject':
            department_scores[course_dept]['rejected'].append(test_score)

# Calculate average scores for each department
average_scores = {}
for dept, scores in department_scores.items():
    accepted_scores = scores['accepted']
    rejected_scores = scores['rejected']

    # Calculate averages if there are scores
    avg_accepted = sum(accepted_scores) / len(accepted_scores) if accepted_scores else None
    avg_rejected = sum(rejected_scores) / len(rejected_scores) if rejected_scores else None

    average_scores[dept] = {
        'avg_accepted': avg_accepted,
        'avg_rejected': avg_rejected
    }

# Update skill scores with average where needed
for score in skill_scores:
    course_dept = score['courseDepartment']
    
    # Check if department is valid and score is missing
    if course_dept in average_scores:
        if score['testScore'] == 0:  # Assuming a score of 0 means it's missing
            # Update with average based on the status
            if score['status'] == 'Accept':
                # Update with average accepted score if available
                if average_scores[course_dept]['avg_accepted'] is not None:
                    score['testScore'] = average_scores[course_dept]['avg_accepted']
            elif score['status'] == 'Reject':
                # Update with average rejected score if available
                if average_scores[course_dept]['avg_rejected'] is not None:
                    score['testScore'] = average_scores[course_dept]['avg_rejected']

# Specify the output file path for updated skill scores
output_file_path = 'skill_scores.csv'

# Write the updated skill scores to a new CSV file
with open(output_file_path, mode='w', newline='') as output_file:
    fieldnames = ['id', 'employeeId', 'assessmentId', 'courseName', 'skill', 'courseDepartment', 'testScore', 'status', 'noOfAttempts']
    writer = csv.DictWriter(output_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write updated skill scores
    for score in skill_scores:
        writer.writerow(score)

# Print the updated skill scores
for score in skill_scores:
    print(score)

# Print the total number of skill scores processed
print(f"\nTotal Processed Skill Scores: {len(skill_scores)}")
print(f"Updated skill scores saved to {output_file_path}")


In [None]:
#Skilset

In [None]:
import csv

# Define the input file path
input_file_path = '../FAKE DATA/skillsets.csv'  # Change this to your input CSV path
output_file_path = 'skillsets.csv'  # Change this to your desired output CSV path

# Read and filter the CSV data
filtered_data = []
with open(input_file_path, mode='r', newline='') as file:
    reader = csv.DictReader(file)
    
    for row in reader:
        # Check if employeeId is present
        if row['employeeId']:
            # Check if both skillSet and department are present
            if row['skillSet'] or row['department']:  # Retain if at least one is present
                filtered_data.append(row)

# Print the filtered rows
for row in filtered_data:
    print(row)

# Output the number of rows after filtering
print(f"\nTotal Rows After Filtering: {len(filtered_data)}")

# Optional: Save the filtered data to a new CSV file
with open(output_file_path, mode='w', newline='') as output_file:
    fieldnames = ['id', 'employeeId', 'skillSet', 'department']
    writer = csv.DictWriter(output_file, fieldnames=fieldnames)

    # Write header
    writer.writeheader()

    # Write filtered rows
    writer.writerows(filtered_data)

print(f"Filtered data saved to {output_file_path}")

