<a href="https://colab.research.google.com/github/iRoseM/stackoverflow-spark-analysis/blob/main/BigData_Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# stackoverflow-spark-analysis Project

**Dataset:**
https://www.kaggle.com/datasets/stackoverflow/stack-overflow-2018-developer-survey

**Objectives:**

1.	Process: the large-scale dataset to handle multi-value fields, missing data, and categorical encoding.

2.	Analyze: correlations between diverse academic backgrounds and the adoption of specialized technical roles.

3.	Map: the transition patterns between currently used technologies and the tools developers aim to learn next.

4.	Develop: a classification model to predict a developer's role based on their technical profile.


## Installing the PySpark environment:

In [7]:
import os
import sys

spark_paths = [p for p in sys.path if 'spark' in p.lower()]
for p in spark_paths:
    if p in sys.path:
        sys.path.remove(p)

!pip install pyspark

# Create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("StackOverflow Analysis") \
    .master("local[*]") \
    .getOrCreate()


print("Spark version:", spark.version)

Spark version: 3.4.1


## Read Data


In [9]:
from google.colab import drive
drive.mount('/content/drive')

!cp "/content/drive/MyDrive/survey_results_public.csv" ./

# read the file
df0 = spark.read.csv("survey_results_public.csv", header=True, inferSchema=True)
df0.show(5)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

In [11]:
# 1) READ DATA
# Print basic info
print(f"df0 rows={df0.count()}, cols={len(df0.columns)}")

df0 rows=98855, cols=129


## **Data Preprocessing**

### Select Required Columns
We selected only the columns relevant to the analysis. Other columns, such as hobbies and open-source contributions, do not directly help in salary prediction; dropping them reduces memory usage, and removing irrelevant features can improve model performance.

In [12]:
from pyspark.sql import functions as F

# 2) SELECT REQUIRED COLUMNS
# Define columns we need for analysis
cols = [
    "Respondent", "Country", "DevType", "Employment", "YearsCoding", "Age", "Gender",
    "JobSatisfaction", "UndergradMajor", "EducationTypes", "LanguageWorkedWith", "ConvertedSalary"
]

# Select only columns that exist in the dataset
existing_cols = [c for c in cols if c in df0.columns]
df1 = df0.select(*existing_cols)

print(f"df1 rows={df1.count()}, cols={len(df1.columns)}")
print("Available columns:", df1.columns)

df1 rows=98855, cols=12
Available columns: ['Respondent', 'Country', 'DevType', 'Employment', 'YearsCoding', 'Age', 'Gender', 'JobSatisfaction', 'UndergradMajor', 'EducationTypes', 'LanguageWorkedWith', 'ConvertedSalary']


### Data Quality Check


#### 1. Check Duplicates

In [15]:

# Check for duplicate Respondent IDs
if "Respondent" in df1.columns:
    dup_respondent = df1.groupBy("Respondent").count().filter("count > 1").count()
    print(f"Duplicate Respondent IDs: {dup_respondent}")

    if dup_respondent > 0:
        print("Warning: Found duplicate respondents!")
        df1.groupBy("Respondent").count().filter("count > 1").show(5)
else:
    print("Warning: No 'Respondent' column found")

# Check for completely duplicate rows
dup_full = df1.count() - df1.distinct().count()
print(f"Complete duplicate rows: {dup_full}")

Duplicate Respondent IDs: 0
Complete duplicate rows: 0


#### 2. Check Missing Values

In [18]:

# Comprehensive list of missing value indicators
missing_indicators = [
    '', 'na', 'n/a', 'nan', 'null', 'NA', 'N/A', 'NaN',
    '-', '--', 'none', 'None', 'unknown', 'Unknown','rather not say', ' ', '0'
]

def is_missing_detailed(col):
    """Check if column value is missing (comprehensive check)"""
    return (F.col(col).isNull() |
            (F.trim(F.col(col)) == "") |
            F.lower(F.trim(F.col(col))).isin([x.lower() for x in missing_indicators]))

# Check each column for missing values
print("\nMISSING VALUES BY COLUMN:")
print("-" * 80)
print(f"{'Column Name':<30} {'Missing Count':>15} {'Percentage':>15} {'Sample Values':>20}")
print("-" * 80)

for col_name in df1.columns:
    missing_count = df1.filter(is_missing_detailed(col_name)).count()
    missing_pct = (missing_count / df1.count()) * 100

    # Get sample of missing values (first 3 distinct)
    sample_missing = df1.filter(is_missing_detailed(col_name)) \
                           .select(col_name) \
                           .distinct() \
                           .limit(3) \
                           .collect()
    sample_str = ', '.join([str(row[col_name])[:15] for row in sample_missing]) if sample_missing else 'None'

    print(f"{col_name:<30} {missing_count:>15,} {missing_pct:>14.2f}%    {sample_str:>20}")

print("-" * 80)


MISSING VALUES BY COLUMN:
--------------------------------------------------------------------------------
Column Name                      Missing Count      Percentage        Sample Values
--------------------------------------------------------------------------------
Respondent                                   0           0.00%                    None
Country                                    412           0.42%                      NA
DevType                                  6,757           6.84%                      NA
Employment                               3,534           3.57%                      NA
YearsCoding                              5,020           5.08%                      NA
Age                                     34,281          34.68%                      NA
Gender                                  34,386          34.78%                      NA
JobSatisfaction                         29,579          29.92%                      NA
UndergradMajor                 

### Handle Missing Values

- Drop Rows with Missing Critical Values

We drop every row that is missing a value in any of the critical columns (Country, DevType, Employment).

In [13]:
# 3) DROP ROWS WITH MISSING (Country/DevType/Employment)
# Helper function to check missing values
def is_missing(col):
    return (F.col(col).isNull() |
            (F.trim(F.col(col)) == "") |
            (F.lower(F.trim(F.col(col))) == "na"))

# Keep only rows where all 3 critical columns have values
df2 = df1.filter(
    ~is_missing("Country") &
    ~is_missing("DevType") &
    ~is_missing("Employment")
)

print(f"df2 rows after dropping key-missing = {df2.count()}")

df2 rows after dropping key-missing = 90782
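
The drop step above treats a value as missing only when it is null, blank, or "NA", which is narrower than the indicator list used in the data quality check. The difference can be sketched in plain Python, without Spark. The function names and sample values here are illustrative assumptions, not part of the notebook:

```python
# Lower-cased indicator set, mirroring the list used in the data quality check.
MISSING_INDICATORS = {
    "", "na", "n/a", "nan", "null", "-", "--",
    "none", "unknown", "rather not say", "0",
}

def is_missing_strict(value):
    """Null, blank, or 'NA' only (the rule used when dropping rows)."""
    return value is None or value.strip() == "" or value.strip().lower() == "na"

def is_missing_broad(value):
    """Null or any value in the broader indicator set."""
    return value is None or value.strip().lower() in MISSING_INDICATORS

# Hypothetical sample values
samples = ["NA", " ", None, "Unknown", "0", "Canada"]
strict_flags = [is_missing_strict(v) for v in samples]
broad_flags = [is_missing_broad(v) for v in samples]

for value, strict, broad in zip(samples, strict_flags, broad_flags):
    print(f"{value!r:>10}  strict={strict}  broad={broad}")
```

Values such as "Unknown" or "0" are flagged only by the broad rule, so the counts reported by the two checks can differ.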


- Handle DevType Column

We dropped rows with missing DevType values because DevType is an important categorical column with many distinct values, so it cannot be accurately filled by methods such as mode imputation or inference from similar rows.



In [19]:
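# Baseline: row count of the raw dataset
original_count = df0.count()

# df2 already excludes rows with missing DevType, so this filter acts as a safeguard
df_step1 = df2.filter(~is_missing("DevType"))
step1_count = df_step1.count()
# Counts all rows dropped since df0 (missing Country, DevType, or Employment)
deleted_devtype = original_count - step1_count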
print(f"Rows deleted (missing DevType): {deleted_devtype:,}")
print(f"Rows remaining: {step1_count:,} ({step1_count/original_count*100:.1f}%)")

Rows deleted (missing DevType): 8,073
Rows remaining: 90,782 (91.8%)


- Handle Salary Column

We handled missing and zero salary values by replacing them with the mean of the valid positive salaries. Mean imputation is a common choice for numerical data: it leaves the column's average unchanged and ensures no missing values remain for analysis, although it narrows the spread of the distribution.

In [20]:
# Calculate mean from valid positive salaries
valid_salaries = df_step1 \
    .filter(~is_missing("ConvertedSalary")) \
    .select(F.col("ConvertedSalary").cast("double").alias("salary")) \
    .filter(F.col("salary") > 0)

salary_mean = valid_salaries.agg(F.mean("salary")).collect()[0][0]
print(f"Valid salary mean: ${salary_mean:,.2f}")

# Fill missing or zero salaries with mean
df_step2 = df_step1.withColumn(
    "ConvertedSalary",
    F.when(
        is_missing("ConvertedSalary") |
        (F.col("ConvertedSalary").cast("double") <= 0),
        salary_mean
    ).otherwise(F.col("ConvertedSalary").cast("double"))
)

# Verify no zeros remain (nulls are checked in the final validation)
zeros_left = df_step2.filter(F.col("ConvertedSalary") == 0).count()
print(f"Zeros after filling: {zeros_left}")

Valid salary mean: $97,323.10
Zeros after filling: 0
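
As a small illustration of what a mean fill does to a numeric column, here is a plain-Python sketch on made-up salaries. The values are hypothetical, and the code mirrors the logic of the Spark step rather than its API:

```python
from statistics import mean, pstdev

# Hypothetical salaries; None and 0.0 stand for missing or invalid entries.
salaries = [40000.0, None, 60000.0, 0.0, 80000.0, 100000.0]

# Mean of the valid positive salaries only.
valid = [s for s in salaries if s is not None and s > 0]
fill_value = mean(valid)

# Replace missing or non-positive entries with that mean.
filled = [s if s is not None and s > 0 else fill_value for s in salaries]

print("fill value:", fill_value)
print("mean after fill:", mean(filled))
print("std before fill:", pstdev(valid))
print("std after fill:", pstdev(filled))
```

The average stays the same after the fill, while the standard deviation gets smaller because every filled row sits exactly at the mean.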


- Handle EducationTypes, UndergradMajor, Gender Columns

We replaced missing values in EducationTypes, UndergradMajor, and Gender with "Unknown" because these are categorical columns with distinct categories that cannot be accurately inferred or predicted from other rows, and filling with "Unknown" preserves the data structure while maintaining transparency about which values were originally missing.

In [21]:
df_step3 = df_step2.withColumn(
    "EducationTypes",
    F.when(is_missing("EducationTypes"), "Unknown")
     .otherwise(F.col("EducationTypes"))
).withColumn(
    "UndergradMajor",
    F.when(is_missing("UndergradMajor"), "Unknown")
     .otherwise(F.col("UndergradMajor"))
).withColumn(
    "Gender",
    F.when(is_missing("Gender"), "Unknown")
     .otherwise(F.col("Gender"))
)

- Handle YearsCoding Column

We filled missing YearsCoding values with "0-2 years", on the assumption that respondents who leave this field blank are likely new to programming with little or no experience. This is a simplifying choice: it assigns every missing value to the lowest experience range, which can shift the distribution toward beginners.

In [22]:

df_step4 = df_step3.withColumn(
    "YearsCoding",
    F.when(is_missing("YearsCoding"), "0-2 years")
     .otherwise(F.col("YearsCoding"))
)

- Handle JobSatisfaction Column

We filled missing JobSatisfaction values with the mode (the most common answer), on the assumption that the typical satisfaction level is a reasonable default when no specific information is provided. Using the mode keeps the most frequent satisfaction level dominant.

In [23]:

# Find most common JobSatisfaction value
mode_job_satisfaction = df_step4 \
    .filter(~is_missing("JobSatisfaction")) \
    .groupBy("JobSatisfaction") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .first()[0]

print(f"Most common JobSatisfaction: '{mode_job_satisfaction}'")

df_step5 = df_step4.withColumn(
    "JobSatisfaction",
    F.when(is_missing("JobSatisfaction"), mode_job_satisfaction)
     .otherwise(F.col("JobSatisfaction"))
)

Most common JobSatisfaction: 'Moderately satisfied'
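
The effect of a mode fill on a categorical column can be sketched in plain Python. The answers below are hypothetical and only illustrate the mechanism:

```python
from collections import Counter

# Hypothetical answers; None marks a missing response.
answers = [
    "Moderately satisfied", None, "Extremely satisfied",
    "Moderately satisfied", None, "Slightly dissatisfied",
]

# Find the most common observed answer and use it for the missing entries.
observed = [a for a in answers if a is not None]
mode_value, mode_count = Counter(observed).most_common(1)[0]
filled = [a if a is not None else mode_value for a in answers]

# Share of the mode before and after the fill.
share_before = mode_count / len(observed)
share_after = filled.count(mode_value) / len(filled)
print(mode_value, share_before, share_after)
```

In this toy sample the mode's share grows after the fill, so a mode-filled column overstates the most common category in proportion to how many values were missing.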


- Handle Age Column

We filled missing Age values with the most common age range, since responses in developer surveys typically concentrate in one dominant range.

In [24]:
# Find most common Age
mode_age = df_step5 \
    .filter(~is_missing("Age")) \
    .groupBy("Age") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .first()[0]

print(f"Most common Age: '{mode_age}'")

df_step6 = df_step5.withColumn(
    "Age",
    F.when(is_missing("Age"), mode_age)
     .otherwise(F.col("Age"))
)


Most common Age: '25 - 34 years old'


- Handle Employment Column

We filled missing Employment values with the most common employment status, because the majority of survey respondents are typically employed full-time and the mode reflects that dominant pattern. Rows with missing Employment were already dropped in the first step, so this fill acts as a safeguard.

In [25]:
# Find most common Employment
mode_employment = df_step6 \
    .filter(~is_missing("Employment")) \
    .groupBy("Employment") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .first()[0]

print(f"Most common Employment: '{mode_employment}'")

df_step7 = df_step6.withColumn(
    "Employment",
    F.when(is_missing("Employment"), mode_employment)
     .otherwise(F.col("Employment"))
)


Most common Employment: 'Employed full-time'


- Handle Country column

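We filled missing Country values with the most common country, because the United States typically represents the largest respondent group in Stack Overflow surveys. Rows with missing Country were already dropped in the first step, so this fill acts as a safeguard.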

In [26]:

# Find most common Country
mode_country = df_step7 \
    .filter(~is_missing("Country")) \
    .groupBy("Country") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .first()[0]

print(f"Most common Country: '{mode_country}'")

df_step8 = df_step7.withColumn(
    "Country",
    F.when(is_missing("Country"), mode_country)
     .otherwise(F.col("Country"))
)

Most common Country: 'United States'


- Handle LanguageWorkedWith column

We filled missing LanguageWorkedWith values by inferring the most common language combination among respondents with the same characteristics (country, dev type, employment, years coding, and age). We then deleted the remaining rows marked "Unknown", since language data is critical for analyzing technology patterns and cannot be reliably filled in every case.
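
The idea of a group-wise fill with an "Unknown" fallback can be sketched in plain Python before looking at the Spark version. The rows, the two grouping keys, and the `impute` helper are hypothetical simplifications of the five-key grouping used in the notebook:

```python
from collections import Counter, defaultdict

# Hypothetical rows: (country, dev_type, languages); None marks a missing value.
rows = [
    ("Canada", "Back-end developer", "Java"),
    ("Canada", "Back-end developer", "Java"),
    ("Canada", "Back-end developer", "C#"),
    ("Canada", "Back-end developer", None),
    ("Brazil", "Designer", None),
]

# Count language values per group, ignoring missing ones.
counts = defaultdict(Counter)
for country, dev_type, langs in rows:
    if langs is not None:
        counts[(country, dev_type)][langs] += 1

def impute(row):
    """Fill a missing language with the group's most common value, else 'Unknown'."""
    country, dev_type, langs = row
    if langs is not None:
        return row
    group = counts.get((country, dev_type))
    inferred = group.most_common(1)[0][0] if group else "Unknown"
    return (country, dev_type, inferred)

imputed = [impute(r) for r in rows]

# Rows whose group had no valid language stay "Unknown" and are removed.
kept = [r for r in imputed if r[2] != "Unknown"]
print(imputed)
print(len(kept), "rows kept")
```

A missing value is filled only when at least one other respondent in the same group reported a language; otherwise the row falls through to "Unknown" and is removed.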

In [30]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Calculate the most common language value for each group

# Get rows with valid language
valid_langs = df_step8.filter(
    ~is_missing("LanguageWorkedWith")
).select(
    "Country", "DevType", "Employment", "YearsCoding", "Age", "LanguageWorkedWith"
)

# Count languages per group
lang_counts = valid_langs.groupBy(
    "Country", "DevType", "Employment", "YearsCoding", "Age", "LanguageWorkedWith"
).count()

# Get the most common language per group (the one with highest count)
window_spec_rank = Window.partitionBy("Country", "DevType", "Employment", "YearsCoding", "Age") \
                         .orderBy(F.col("count").desc())

most_common_langs = lang_counts.withColumn("rank", F.row_number().over(window_spec_rank)) \
                               .filter(F.col("rank") == 1) \
                               .drop("rank", "count") \
                               .withColumnRenamed("LanguageWorkedWith", "inferred_language")

# Join back with original data
df_step9 = df_step8.join(
    most_common_langs,
    on=["Country", "DevType", "Employment", "YearsCoding", "Age"],
    how="left"
)

# Fill missing languages
df_step9 = df_step9.withColumn(
    "LanguageWorkedWith",
    F.when(
        is_missing("LanguageWorkedWith"),
        F.coalesce(F.col("inferred_language"), F.lit("Unknown"))
    ).otherwise(F.col("LanguageWorkedWith"))
).drop("inferred_language")

# DELETE ROWS WHERE LanguageWorkedWith IS "Unknown"
print(f"Rows with 'Unknown' language: {df_step9.filter(F.col('LanguageWorkedWith') == 'Unknown').count()}")

df_step10 = df_step9.filter(F.col("LanguageWorkedWith") != "Unknown")

print(f"Rows after deletion: {df_step10.count()}")

# Check results
print("\nLanguage column after imputation and deletion:")
df_step10.groupBy("LanguageWorkedWith").count().orderBy(F.col("count").desc()).show(10)

# Update final dataframe
df_step9 = df_step10

Rows with 'Unknown' language: 7782
Rows after deletion: 83000

Language column after imputation and deletion:
+--------------------+-----+
|  LanguageWorkedWith|count|
+--------------------+-----+
|                Java| 1746|
|C#;JavaScript;SQL...| 1605|
|JavaScript;PHP;SQ...| 1601|
| JavaScript;HTML;CSS| 1223|
|C#;JavaScript;SQL...|  968|
|JavaScript;PHP;HT...|  842|
|JavaScript;PHP;SQ...|  819|
|Java;JavaScript;S...|  749|
|                  C#|  562|
|   Objective-C;Swift|  493|
+--------------------+-----+
only showing top 10 rows
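
The counts above are per combination, because LanguageWorkedWith stores several languages in one semicolon-separated string. A minimal plain-Python sketch of counting individual languages instead, using hypothetical values:

```python
from collections import Counter

# Hypothetical LanguageWorkedWith values in semicolon-separated form.
values = ["Java", "C#;JavaScript;SQL", "JavaScript;PHP;SQL", "JavaScript;HTML;CSS"]

# Split each value on ';' and count every language separately.
language_counts = Counter(
    lang.strip()
    for value in values
    for lang in value.split(";")
    if lang.strip()
)

for lang, count in language_counts.most_common():
    print(f"{lang:<12} {count}")
```

In PySpark the same idea would typically combine `F.split` and `F.explode` before grouping. This is a suggestion for handling the multi-value field, not a step the notebook performs here.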



In [33]:
print("\n" + "="*60)
print("FINAL VALIDATION")
print("="*60)

final_count = df_step9.count()
print(f"Final rows: {final_count:,}")
print(f"Kept {final_count/original_count*100:.1f}% of original data")

# Check for any remaining missing values
print("\nRemaining missing values check:")
for col_name in ["Country", "DevType", "Employment", "YearsCoding", "Age", "Gender",
                 "JobSatisfaction", "UndergradMajor", "EducationTypes",
                 "LanguageWorkedWith", "ConvertedSalary"]:

    if col_name == "ConvertedSalary":
        # For salary, check for nulls only (zeros are now filled with mean)
        remaining = df_step9.filter(F.col(col_name).isNull()).count()
    else:
        remaining = df_step9.filter(is_missing(col_name)).count()

    pct = (remaining / final_count) * 100 if final_count > 0 else 0
    status = " CLEAN" if remaining == 0 else f" {remaining} missing"
    print(f"{col_name:20}: {status} ")


FINAL VALIDATION
Final rows: 83,000
Kept 84.0% of original data

Remaining missing values check:
Country             :  CLEAN 
DevType             :  CLEAN 
Employment          :  CLEAN 
YearsCoding         :  CLEAN 
Age                 :  CLEAN 
Gender              :  CLEAN 
JobSatisfaction     :  CLEAN 
UndergradMajor      :  CLEAN 
EducationTypes      :  CLEAN 
LanguageWorkedWith  :  CLEAN 
ConvertedSalary     :  CLEAN 


## Outlier Detection and Handling

We removed salary outliers using the Z-score method with a threshold of 3, which eliminates extreme values that deviate more than three standard deviations from the mean, ensuring the salary distribution is more representative of typical developer compensation and reducing the impact of anomalous data on subsequent analysis and modeling.


In [41]:
# OUTLIER REMOVAL FOR ConvertedSalary
print("\n" + "="*60)
print("OUTLIER REMOVAL FOR SALARY")
print("="*60)

# Calculate mean and standard deviation
salary_stats = df_step9.select(
    F.mean("ConvertedSalary").alias("mean"),
    F.stddev("ConvertedSalary").alias("stddev")
).collect()[0]

mean_sal = salary_stats['mean']
std_sal = salary_stats['stddev']

print(f"Before outlier removal:")
print(f"  - Mean salary: ${mean_sal:,.2f}")
print(f"  - Std deviation: ${std_sal:,.2f}")
print(f"  - Total rows: {df_step9.count():,}")

# Calculate Z-score and filter out outliers (|z| > 3)
df_with_zscore = df_step9.withColumn(
    "zscore",
    F.abs((F.col("ConvertedSalary") - mean_sal) / std_sal)
)

# Count outliers
outliers_count = df_with_zscore.filter(F.col("zscore") > 3).count()
print(f"\nOutliers detected (|z| > 3): {outliers_count:,}")

# Remove outliers
df_step10 = df_with_zscore.filter(F.col("zscore") <= 3).drop("zscore")

print(f"After outlier removal: {df_step10.count():,} rows")
print(f"Rows removed: {df_step9.count() - df_step10.count():,}")

# Show salary statistics after outlier removal
new_stats = df_step10.select(
    F.mean("ConvertedSalary").alias("mean"),
    F.stddev("ConvertedSalary").alias("stddev"),
    F.min("ConvertedSalary").alias("min"),
    F.max("ConvertedSalary").alias("max")
).collect()[0]

print(f"\nNew salary statistics:")
print(f"  - Mean: ${new_stats['mean']:,.2f}")
print(f"  - Std dev: ${new_stats['stddev']:,.2f}")
print(f"  - Min: ${new_stats['min']:,.2f}")
print(f"  - Max: ${new_stats['max']:,.2f}")

# Final dataset info
print(f"\nFinal dataset:")
print(f"  - Original rows: {original_count:,}")
print(f"  - Final rows: {df_step10.count():,}")
print(f"  - Retention rate: {df_step10.count()/original_count*100:.1f}%")

# ============================================
# SAVE FINAL CLEANED DATA
# ============================================
print("\n" + "="*60)
print("SAVING FINAL CLEANED DATA")
print("="*60)

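# Write the cleaned DataFrame to local CSV part files, then read them back as one DataFrame
df_step10.write.mode("overwrite").option("header", "true").csv("/content/stackoverflow_survey_afterCleaning")
df_final = spark.read.option("header", "true").csv("/content/stackoverflow_survey_afterCleaning/*.csv")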

# save the file as csv in drive
df_final.toPandas().to_csv("/content/drive/MyDrive/stackoverflow_survey_final.csv", index=False)
print(f" Data saved in: /content/drive/MyDrive/stackoverflow_survey_final.csv")


OUTLIER REMOVAL FOR SALARY
Before outlier removal:
  - Mean salary: $97,448.11
  - Std deviation: $150,885.59
  - Total rows: 83,000

Outliers detected (|z| > 3): 1,418
After outlier removal: 81,582 rows
Rows removed: 1,418

New salary statistics:
  - Mean: $80,315.76
  - Std dev: $47,649.90
  - Min: $1.00
  - Max: $550,056.00

Final dataset:
  - Original rows: 98,855
  - Final rows: 81,582
  - Retention rate: 82.5%

SAVING FINAL CLEANED DATA
 Data saved in: /content/drive/MyDrive/stackoverflow_survey_final.csv
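
The Z-score rule used above can be reproduced on a small made-up sample in plain Python. The salaries are hypothetical; `stdev` is the sample standard deviation, which matches what Spark's `F.stddev` computes:

```python
from statistics import mean, stdev

# Hypothetical salaries with one extreme value.
salaries = [50000.0] * 10 + [60000.0] * 10 + [2000000.0]

mu = mean(salaries)
sigma = stdev(salaries)  # sample standard deviation

# Keep values within three standard deviations of the mean.
kept = [s for s in salaries if abs(s - mu) / sigma <= 3]
removed = [s for s in salaries if abs(s - mu) / sigma > 3]

print("removed:", removed)
print("mean before:", round(mu, 2))
print("mean after:", round(mean(kept), 2))
```

Because the mean and standard deviation are computed with the extreme value included, a single large salary inflates both, and only values far beyond the bulk of the data cross the threshold.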


In [None]:
from google.colab import drive
drive.mount('/content/drive')

!cp "/content/drive/MyDrive/stackoverflow_survey_final.csv" ./

# read the file of cleaned data
df0 = spark.read.csv("stackoverflow_survey_final.csv", header=True, inferSchema=True)
df0.show(5)