<a href="https://colab.research.google.com/github/AmoghArakere/pyspark_practice_qns/blob/main/skill_lab_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

   
# **HR_Analytics**



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, udf, expr
from pyspark.sql.types import StringType

# Clean the HR analytics extract: impute gaps, de-duplicate employees, derive
# categorical features, and export the result as CSV.
spark = SparkSession.builder.appName("HRDatasetCleaning").getOrCreate()

data = spark.read.csv("HR_Analytics.csv", header=True, inferSchema=True)

# Keep only named columns; Spark labels header-less CSV fields "_c<N>".
required_columns = [col_name for col_name in data.columns if not col_name.startswith("_c")]
data = data.select(*required_columns)

# Missing travel information gets an explicit placeholder label.
data = data.withColumn("BusinessTravel", when(col("BusinessTravel").isNull(), "Unknown").otherwise(col("BusinessTravel")))

# Missing distances are imputed with the column mean. when/otherwise is kept
# (rather than fillna) so the fractional mean is not cast to the column type.
mean_distance = data.select(mean(col("DistanceFromHome"))).collect()[0][0]
data = data.withColumn("DistanceFromHome", when(col("DistanceFromHome").isNull(), mean_distance).otherwise(col("DistanceFromHome")))

data = data.withColumn("EducationField", when(col("EducationField").isNull(), "Other").otherwise(col("EducationField")))

# One row per employee.
data = data.dropDuplicates(["EmpID"])

# Distances above 50 are treated as outliers and removed.
data = data.filter(col("DistanceFromHome") <= 50)

# Bucket the salary hike percentage: <=10 Low, 11-20 Moderate, otherwise High.
data = data.withColumn(
    "PercentSalaryHike_Category",
    when(col("PercentSalaryHike") <= 10, "Low")
    .when((col("PercentSalaryHike") > 10) & (col("PercentSalaryHike") <= 20), "Moderate")
    .otherwise("High")
)

# Translate the numeric satisfaction score into a label with a native column
# expression instead of a Python UDF: the result is the same (unmapped or
# null scores stay null) but rows are not shipped to a Python worker.
job_sat_map = {1: "Poor", 2: "Below Average", 3: "Average", 4: "Excellent"}
job_sat_label = None
for score, label in job_sat_map.items():
    matches_score = col("JobSatisfaction") == score
    if job_sat_label is None:
        job_sat_label = when(matches_score, label)
    else:
        job_sat_label = job_sat_label.when(matches_score, label)
data = data.withColumn("JobSatisfaction", job_sat_label)

# Strip the last character of SalarySlab, cast the rest to INT and scale by 1000.
data = data.withColumn("SalarySlab", expr("CAST(SUBSTRING(SalarySlab, 1, LENGTH(SalarySlab) - 1) AS INT) * 1000"))

data.show()
# Overwrite so re-running the cell does not fail on an existing output path.
data.write.csv("cleaned_hr_analytics12.csv", header=True, mode="overwrite")


+-----+---+--------+-----------------+--------------------+----------------+---------+--------------+------+--------------------+---------------+-------------+----------+--------+-----------------+--------------------------+
|EmpID|Age|AgeGroup|   BusinessTravel|          Department|DistanceFromHome|Education|EducationField|Gender|             JobRole|JobSatisfaction|MaritalStatus|SalarySlab|OverTime|PercentSalaryHike|PercentSalaryHike_Category|
+-----+---+--------+-----------------+--------------------+----------------+---------+--------------+------+--------------------+---------------+-------------+----------+--------+-----------------+--------------------------+
|RM004| 33|   26-35|Travel_Frequently|Research & Develo...|             3.0|        4| Life Sciences|Female|  Research Scientist|        Average|      Married|     19000|     Yes|               11|                  Moderate|
|RM005| 27|   26-35|    Travel_Rarely|Research & Develo...|             2.0|        1|       Medical




# **Real Estate**


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Clean the real-estate sales extract and export it as CSV.
spark = SparkSession.builder.appName("RealEstateDataCleaning").getOrCreate()
data = spark.read.csv("Real_estate.csv", header=True, inferSchema=True)

# Impute the categorical gaps BEFORE any dropna(). Previously dropna() ran
# first, so the rows these imputations target were already gone and both
# steps were no-ops.
data = data.fillna({"Residential Type": "Unknown"})

# Restrict to the supported list years.
data = data.filter((col("List Year") >= 2001) & (col("List Year") <= 2022))

# Most frequent non-null town; nulls are excluded so the "mode" can never be
# the missing value itself. first() returns None when no rows remain.
town_counts = data.filter(col("Town").isNotNull()).groupBy("Town").count()
most_common_town = town_counts.orderBy(col("count").desc()).first()
mode_town = most_common_town[0] if most_common_town is not None else None
if mode_town is not None:
    data = data.fillna({"Town": mode_town})

# Parse the recording date; values that do not match the pattern become null
# and are removed by the dropna() below together with any other incomplete row.
data = data.withColumn("Date Recorded", to_date(col("Date Recorded"), "MM/dd/yyyy"))
data = data.dropna()

# Assessed value as a fraction of the sale amount.
data = data.withColumn("Sales Ratio", col("Assessed Value") / col("Sale Amount"))

data.show()
# Overwrite so re-running the cell does not fail on an existing output path.
data.write.csv("cleaned_real_estate21.csv", header=True, mode="overwrite")

+-------------+---------+-------------+------------+--------------------+--------------+-----------+-------------+----------------+-------------------+
|Serial Number|List Year|Date Recorded|        Town|             Address|Assessed Value|Sale Amount|Property Type|Residential Type|        Sales Ratio|
+-------------+---------+-------------+------------+--------------------+--------------+-----------+-------------+----------------+-------------------+
|       220008|     2022|   2023-01-30|     Andover|         618 ROUTE 6|        139020|     232000|  Residential|   Single Family| 0.5992241379310345|
|       200243|     2020|   2021-04-13|        Avon|111 NORTHINGTON D...|        619290|     890000|  Residential|   Single Family| 0.6958314606741574|
|        22043|     2022|   2023-03-15|Beacon Falls|   41 EDGEWOOD DRIVE|        164170|     285000|  Residential|   Single Family| 0.5760350877192982|
|       220440|     2022|   2023-02-16|    Branford|          69 MONTOYA|         84300|



# **Student Info**




In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Load the student survey and expand its two-valued abbreviation columns.
spark = SparkSession.builder.appName("StudentInfoCleaning").getOrCreate()
df = spark.read.csv("student-info.csv", header=True, inferSchema=True)

# Per column: the two abbreviations and the readable labels that replace them.
# Any other value in the column is passed through unchanged.
code_labels = {
    "school": (("GP", "Gabriel Pereira"), ("MS", "Mousinho da Silveira")),
    "address": (("U", "Urban"), ("R", "Rural")),
    "family_size": (("LE3", "Less or Equal to 3"), ("GT3", "Greater than 3")),
}
for column_name, ((first_code, first_label), (second_code, second_label)) in code_labels.items():
    source = col(column_name)
    expanded = (
        when(source == first_code, first_label)
        .when(source == second_code, second_label)
        .otherwise(source)
    )
    df = df.withColumn(column_name, expanded)
def replace_education(column):
    """Return a column expression mapping education codes 0-4 to labels.

    Values of ``column`` other than 0-4 fall through to the original column.
    """
    labels = [
        (0, "None"),
        (1, "Primary Education (4th Grade)"),
        (2, "5th to 9th Grade"),
        (3, "Secondary Education"),
        (4, "Higher Education"),
    ]
    first_code, first_label = labels[0]
    mapped = when(column == first_code, first_label)
    for code, label in labels[1:]:
        mapped = mapped.when(column == code, label)
    return mapped.otherwise(column)
# Replace the parents' education codes with descriptive labels.
df = df.withColumn("mother_edu", replace_education(col("mother_edu"))) \
       .withColumn("father_edu", replace_education(col("father_edu")))

# Missing mother's job is recorded as "other".
df = df.fillna("other", subset=["mother_job"])

# Travel-time codes 1-4 become durations; other values pass through unchanged.
df = df.withColumn("traveltime",
    when(col("traveltime") == 1, "15 min")
    .when(col("traveltime") == 2, "30 min")
    .when(col("traveltime") == 3, "45 min")
    .when(col("traveltime") == 4, "60 min")
    .otherwise(col("traveltime"))
)

# Missing internet flag is recorded as "unknown".
df = df.fillna("unknown", subset=["internet"])

# Keep students aged 15 to 22 inclusive.
df = df.filter((col("age") >= 15) & (col("age") <= 22))

df.show()
df.printSchema()
# Overwrite so re-running the cell does not fail on an existing output path.
df.write.csv("cleaned_student_info1.csv", header=True, mode="overwrite")

+---------------+---+---+-------+------------------+--------------------+--------------------+----------+----------+----------+--------+
|         school|sex|age|address|       family_size|          mother_edu|          father_edu|Mother_job|Father_job|traveltime|internet|
+---------------+---+---+-------+------------------+--------------------+--------------------+----------+----------+----------+--------+
|Gabriel Pereira|  F| 18|  Urban|    Greater than 3|    Higher Education|    Higher Education|   at_home|   teacher|    30 min|      no|
|Gabriel Pereira|  F| 17|  Urban|    Greater than 3|Primary Education...|Primary Education...|   at_home|     other|    15 min|     yes|
|Gabriel Pereira|  F| 15|  Urban|Less or Equal to 3|Primary Education...|Primary Education...|   at_home|     other|    15 min|     yes|
|Gabriel Pereira|  F| 15|  Urban|    Greater than 3|    Higher Education|    5th to 9th Grade|    health|  services|    15 min|     yes|
|Gabriel Pereira|  F| 16|  Urban|    Grea



# **Phone Usage**




In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Clean the phone-usage survey, derive usage features and export it as CSV.
spark = SparkSession.builder.appName("PhoneDatasetCleaning").getOrCreate()
data = spark.read.csv("phone_usage_india.csv", header=True, inferSchema=True)

# Keep respondents aged 18 to 90 inclusive.
data = data.filter((col("Age") >= 18) & (col("Age") <= 90))

# Age bands: <=35 Young, 36-60 Middle-aged, otherwise Senior.
data = data.withColumn(
    "Age_Category",
    when(col("Age") <= 35, "Young")
    .when((col("Age") > 35) & (col("Age") <= 60), "Middle-aged")
    .otherwise("Senior")
)

data = data.withColumn("Location", when(col("Location").isNull(), "Unknown").otherwise(col("Location")))

# Add a "(Minutes)" companion for every "(hrs/day)" column.
time_columns = ["Screen Time (hrs/day)", "Social Media Time (hrs/day)", "Streaming Time (hrs/day)", "Gaming Time (hrs/day)"]
for col_name in time_columns:
    minutes_col = col_name.replace("hrs/day", "Minutes")
    data = data.withColumn(minutes_col, col(col_name) * 60)

# NOTE: this replaces the numeric "Screen Time (Minutes)" column created above
# with a usage band: <=180 Low, 181-360 Moderate, otherwise High.
data = data.withColumn(
    "Screen Time (Minutes)",
    when(col("Screen Time (Minutes)") <= 180, "Low Usage")
    .when((col("Screen Time (Minutes)") > 180) & (col("Screen Time (Minutes)") <= 360), "Moderate Usage")
    .otherwise("High Usage")
)

# Total monthly recharge cost per primary use.
agg_data = data.groupBy("Primary Use").sum("Monthly Recharge Cost (INR)").withColumnRenamed("sum(Monthly Recharge Cost (INR))", "Total Monthly Recharge Cost")

data.show()
agg_data.show()
# Overwrite so re-running the cell does not fail on an existing output path.
data.write.csv("cleaned_phone_usage1.csv", header=True, mode="overwrite")

+-------+---+------+---------+------------+-------+---------------------+---------------------+-------------------------+------------------------+---------------------------+----------------------------+------------------------+---------------------+---------------------------+-------------+------------+---------------------+---------------------------+------------------------+---------------------+
|User ID|Age|Gender| Location| Phone Brand|     OS|Screen Time (hrs/day)|Data Usage (GB/month)|Calls Duration (mins/day)|Number of Apps Installed|Social Media Time (hrs/day)|E-commerce Spend (INR/month)|Streaming Time (hrs/day)|Gaming Time (hrs/day)|Monthly Recharge Cost (INR)|  Primary Use|Age_Category|Screen Time (Minutes)|Social Media Time (Minutes)|Streaming Time (Minutes)|Gaming Time (Minutes)|
+-------+---+------+---------+------------+-------+---------------------+---------------------+-------------------------+------------------------+---------------------------+--------------------



# **Audible**




In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Load the Audible catalogue, fill missing credits, drop unrated or
# out-of-range rows, and strip the credit prefixes.
spark = SparkSession.builder.appName("AudibleDatasetCleaning").getOrCreate()
data = spark.read.csv("audible_uncleaned.csv", header=True, inferSchema=True)

# Credit columns mapped to the prefix pattern removed from each.
credit_prefixes = {"author": "Writtenby:\\s*", "narrator": "Narratedby:\\s*"}

# Missing author / narrator becomes "Unknown".
for credit_col in credit_prefixes:
    credit = col(credit_col)
    data = data.withColumn(credit_col, when(credit.isNull(), "Unknown").otherwise(credit))

# Rows without a star rating are discarded.
data = data.dropna(subset=["stars"])

# Keep prices between 100 and 2000 inclusive.
# NOTE(review): this compares the raw price column, before the digit-only
# clean-up applied later in the cell - confirm that ordering is intended.
data = data.filter(col("price").between(100, 2000))

for credit_col, prefix_pattern in credit_prefixes.items():
    data = data.withColumn(credit_col, regexp_replace(col(credit_col), prefix_pattern, ""))
def convert_time_to_minutes(time_str):
    """Convert a duration such as "2 hrs and 20 mins" to whole minutes.

    Every "<number> <unit>" pair in the text is added up, where the unit
    starts with "hr"/"hour" or "min". This covers the singular "1 hr", and
    minutes-only values such as "45 mins", which the previous
    split-on-" and " logic turned into 0 hours or 0 minutes respectively.
    Text like "Less than 1 minute" counts as 1.

    Returns None for empty/None input or when no number-unit pair is found.
    """
    if not time_str:
        return None
    tokens = time_str.split()
    total_minutes = 0
    found_any = False
    # Walk adjacent token pairs: a decimal number followed by its unit.
    for value, unit in zip(tokens, tokens[1:]):
        if not value.isdecimal():
            continue
        unit = unit.lower()
        if unit.startswith(("hr", "hour")):
            total_minutes += int(value) * 60
            found_any = True
        elif unit.startswith("min"):
            total_minutes += int(value)
            found_any = True
    return total_minutes if found_any else None
# Convert the free-text duration to minutes via the Python parser defined in this cell.
time_udf = udf(convert_time_to_minutes, IntegerType())
data = data.withColumn("time", time_udf(col("time")))

# Parse the release date. With "MM/dd/yyyy" alone the recorded output of this
# cell shows NULL for every row, so a second pattern is tried when the first
# does not match.
# NOTE(review): "d-M-yy" is presumably the raw format - TODO confirm against
# audible_uncleaned.csv.
release_text = col("releasedate")
data = data.withColumn(
    "releasedate",
    coalesce(to_date(release_text, "MM/dd/yyyy"), to_date(release_text, "d-M-yy")),
)

# Split the combined "stars" text into a numeric score and a ratings count.
data = data.withColumn("n_stars", regexp_extract(col("stars"), r"(\d+(\.\d+)?)\s*out\s*of", 1).cast("float"))
data = data.withColumn("ratings", regexp_extract(col("stars"), r"(\d+)\s*ratings", 1).cast("int"))
data=data.drop("stars")

# Keep only the digits of the price and store it as an integer.
data = data.withColumn("price", regexp_replace(col("price"), "[^0-9]", "").cast("int"))

# Number of audiobooks per language.
language_counts = data.groupBy("language").agg(count("*").alias("audiobook_count"))

data.write.csv("cleaned_audible_dataset342.csv", header=True,mode="overwrite")
data.show()
language_counts.show()

+--------------------+----------------+--------------+----+-----------+--------+-----+-------+-------+
|                name|          author|      narrator|time|releasedate|language|price|n_stars|ratings|
+--------------------+----------------+--------------+----+-----------+--------+-----+-------+-------+
|Geronimo Stilton ...| GeronimoStilton|    BillLobely| 140|       NULL| English|  468|    5.0|     34|
|    The Burning Maze|     RickRiordan| RobbieDaymond| 788|       NULL| English|  820|    4.5|     41|
|        The Deep End|      JeffKinney|    DanRussell| 123|       NULL| English|  410|    4.5|     38|
|Daughter of the Deep|     RickRiordan|SoneelaNankani| 676|       NULL| English|  615|    4.5|     12|
|The Lightning Thi...|     RickRiordan|JesseBernstein| 600|       NULL| English|  820|    4.5|    181|
|The Hunger Games:...|  SuzanneCollins|TatianaMaslany| 635|       NULL| English|  656|    5.0|     72|
|Quest for the Dia...|    WinterMorgan|   LukeDaniels| 143|       NULL| E



# **Crime**




In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Clean the crime report extract, derive date parts and export it as CSV.
spark = SparkSession.builder.appName("CrimeDatasetCleaning").getOrCreate()

data = (
    spark.read.csv("crime_data.csv", header=True, inferSchema=True)
    # Default the victim fields, then keep ages from 0 to 120 inclusive.
    .fillna({"Vict Sex": "Unknown", "Vict Age": 0})
    .filter(col("Vict Age").between(0, 120))
    # Left-pad the occurrence time to four characters and format it as HH:MM.
    .withColumn("TIME OCC", lpad(col("TIME OCC").cast(StringType()), 4, "0"))
    .withColumn(
        "TIME OCC",
        concat_ws(":", substring(col("TIME OCC"), 1, 2), substring(col("TIME OCC"), 3, 2)),
    )
    # Remove backslashes, pipes and slashes from the area name.
    .withColumn("AREA NAME", regexp_replace(col("AREA NAME"), r"[\\|/]+", ""))
    # Parse the report timestamp, then drop every row that has a null in any column.
    .withColumn("Date Rptd", to_timestamp(col("Date Rptd"), "MM/dd/yyyy hh:mm:ss a"))
    .dropna()
    .withColumn("Date Rptd", date_format(col("Date Rptd"), "yyyy-MM-dd"))
    # Keep only the digits of the crime code.
    .withColumn("Crime Code", regexp_replace(col("Crime Code"), r"[^\d]", ""))
    .withColumn("Year", year(col("Date Rptd")))
    .withColumn("Month", month(col("Date Rptd")))
)

# Number of reported crimes per area.
area_crime_counts = data.groupBy("AREA NAME").agg(count("*").alias("Total Crimes"))

data.write.csv("cleaned_crime_dataset111.csv", header=True, mode="overwrite")

data.show()
area_crime_counts.show()


+---------+----------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+-----------+------------+-------+---------+----+-----+
|    DR_NO| Date Rptd|            DATE OCC|TIME OCC|  AREA NAME|Part 1-2|Crime Code|     Crime Code Desc|Vict Age|Vict Sex|Premis Cd|Premis Desc| Status Desc|    LAT|      LON|Year|Month|
+---------+----------+--------------------+--------+-----------+--------+----------+--------------------+--------+--------+---------+-----------+------------+-------+---------+----+-----+
|201308739|2020-03-27|03/27/2020 12:00:...|   12:10|     Newton|       1|       510|    VEHICLE - STOLEN|       0| Unknown|      101|     STREET| Invest Cont| 34.017|-118.2643|2020|    3|
|201112065|2020-07-31|07/30/2020 12:00:...|   20:30|  Northeast|       1|       510|    VEHICLE - STOLEN|       0| Unknown|      101|     STREET|Adult Arrest|34.0953|-118.2974|2020|    7|
|201215394|2020-06-26|06/25/2020 12:00:...|   22:00|77th Str



# **Movie**




In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Clean the movie/series extract, derive year range and primary genre, and
# export it as CSV.
spark = SparkSession.builder.appName("MovieDatasetCleaning").getOrCreate()
data = spark.read.csv("movie_dataset_uncleaned.csv" , header=True, inferSchema=True)

# Missing ratings take the mean rating; missing votes take the median.
mean_rating = data.select(mean(col("Rating"))).collect()[0][0]
data = data.withColumn("Rating", when(col("Rating").isNull(), mean_rating).otherwise(col("Rating")))
median_votes = data.approxQuantile("Votes", [0.5], 0)[0]
data = data.withColumn("Votes", when(col("Votes").isNull(), median_votes).otherwise(col("Votes")))
data = data.fillna({"RunTime": 0, "Genre": "Unknown", "Gross": "$0.00M"})

# Keep ratings from 1 to 10 inclusive.
data = data.filter((col("Rating") >= 1) & (col("Rating") <= 10))

# Read BOTH ends of the year range from the raw text before "Year" is
# overwritten. Previously "Year" was first reduced to its first four-digit
# number and only then split on the dash, so End_Year was never found and
# every row fell back to Start_Year + 5.
# regexp_extract yields "" when nothing matches, which casts to null.
raw_year = col("Year")
data = data.withColumn("Start_Year", regexp_extract(raw_year, r"(\d{4})", 1).cast(IntegerType()))
data = data.withColumn("End_Year", regexp_extract(raw_year, r"\d{4}\D+(\d{4})", 1).cast(IntegerType()))
data = data.withColumn("Year", col("Start_Year"))

# Entries without an explicit end year default to a five-year span. The
# reverse fallback (start = end - 5) is unnecessary: the End_Year pattern only
# matches after a start year has been seen.
data = data.withColumn("End_Year", when(col("End_Year").isNull() & col("Start_Year").isNotNull(), col("Start_Year") + 5)
                       .otherwise(col("End_Year")))
data = data.filter(col("Start_Year").isNotNull() & col("End_Year").isNotNull())

# Primary genre: the first "$"-separated entry, trimmed and lower-cased.
data = data.withColumn("Final_Genre", split(col("Genre"), "\\$").getItem(0))
data = data.withColumn("Final_Genre", when(col("Final_Genre").isNull() | (col("Final_Genre") == ""), "unknown")
                       .otherwise(lower(trim(col("Final_Genre")))))

data.write.csv("cleaned_movie_dataset.csv", header=True,mode="overwrite")
data.show()

+--------------------+----+--------------------+------+---------+-------+------+----------+--------+-----------+
|              MOVIES|Year|               GENRE|Rating|    Votes|RunTime| Gross|Start_Year|End_Year|Final_Genre|
+--------------------+----+--------------------+------+---------+-------+------+----------+--------+-----------+
|       Blood Red Sky|2021|Action$ Horror$ T...|   6.1|  21062.0|    121|$0.00M|      2021|    2026|     action|
|Masters of the Un...|2021|Animation$ Action...|   5.0|  17870.0|     25|$0.00M|      2021|    2026|  animation|
|    The Walking Dead|2010|Drama$ Horror$ Th...|   8.2| 885805.0|     44|$0.00M|      2010|    2015|      drama|
|      Rick and Morty|2013|Animation$ Advent...|   9.2| 414849.0|     23|$0.00M|      2013|    2018|  animation|
|         Outer Banks|2020|Action$ Crime$ Drama|   7.6|  25858.0|     50|$0.00M|      2020|    2025|     action|
|The Last Letter f...|2021|      Drama$ Romance|   6.8|   5283.0|    110|$0.00M|      2021|    2

# **Employee**

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Clean the employee extract, summarise titles per department and list the
# top-scoring employees of each department.
spark = SparkSession.builder.appName("Emp").getOrCreate()

emp = spark.read.csv("emp_data.csv", header=True, inferSchema=True)

# Per-column count of null or NaN values (built here but not displayed).
missing_counts = emp.select(
    [count(when(col(name).isNull() | isnan(col(name)), name)).alias(name) for name in emp.columns]
)

# Default missing last names, then require an employee id and a start date.
emps = emp.fillna({"LastName": "Unknown"})
emp = emps.dropna(subset=["EmpID", "StartDate"])

# Clamp the current rating into the 1-5 range.
rating = col("Current Employee Rating")
clamped_rating = when(rating < 1, 1).when(rating > 5, 5).otherwise(rating)
emp = emp.withColumn("Current Employee Rating", clamped_rating)

emp.select("LocationCode").distinct().show()

# De-duplicated snapshot: used for the title summary and for the export below.
# NOTE(review): it is taken before the performance-score encoding, so the
# exported file keeps the textual scores - confirm that is intended.
df = emp.dropDuplicates()
df1 = df.groupBy("DepartmentType", "Title").count().orderBy("DepartmentType", "Title")
df1.show()

# Encode the textual performance score; unrecognised values become 0.
score = col("Performance Score")
encoded_score = (
    when(score == "Exceeds", 3)
    .when(score == "Fully Meets", 2)
    .when(score == "Needs Improvement", 1)
    .otherwise(0)
)
emp = emp.withColumn("Performance Score", encoded_score)

# Rank employees inside each department by descending score; rank 1 keeps
# every employee tied for the best score.
window_spec = Window.partitionBy("DepartmentType").orderBy(col("Performance Score").desc())
ranked = emp.withColumn("rank", rank().over(window_spec))
top_performers = ranked.where(col("rank") == 1).drop("rank")
top_performers.show()

df.write.csv("cleaned_employee_dataset00.csv", header=True, mode="overwrite")
df.show()

+------------+
|LocationCode|
+------------+
|        2122|
|       97413|
|       80424|
|        1460|
|       75321|
|       49449|
|       56687|
|       78046|
|       44553|
|        9454|
|       57754|
|        8779|
|       29811|
|       65321|
|       39859|
|       49914|
|       74388|
|       34011|
|       60107|
|       31207|
+------------+
only showing top 20 rows

+-----------------+--------------------+-----+
|   DepartmentType|               Title|count|
+-----------------+--------------------+-----+
| Executive Office|    Network Engineer|   18|
|            IT/IS|  Area Sales Manager|    7|
|            IT/IS|         BI Director|    1|
|            IT/IS|                 CIO|    2|
|            IT/IS|        Data Analyst|   10|
|            IT/IS|       Data Analyst |    1|
|            IT/IS|      Data Architect|    2|
|            IT/IS|Database Administ...|   15|
|            IT/IS|Enterprise Architect|    5|
|            IT/IS|          IT Support|   44|
|  

# **Sales**

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Clean the sales extract, total the sales per product and export it as CSV.
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
df = spark.read.csv("Sales Data.csv", header=True, inferSchema=True)

# Drop the auto-named "_c<N>" columns (the file's header-less index column is
# read as "_c0"). Left in place, that per-row index makes every row distinct,
# so the dropDuplicates() below could never remove anything.
df = df.select(*[name for name in df.columns if not name.startswith("_c")])

# Impute missing numeric values with the column mean; skip the column when no
# mean can be computed, because fillna() rejects a None replacement.
numerical_columns = ['Sales', 'Quantity Ordered']
for col_name in numerical_columns:
    mean_value = df.select(mean(col_name)).collect()[0][0]
    if mean_value is not None:
        df = df.fillna({col_name: mean_value})

# Remove rows that are still incomplete, then exact duplicates.
df = df.dropna()
df = df.dropDuplicates()

# Monetary columns use double precision: a 32-bit float cannot hold cent
# values exactly, which surfaced as noise such as 246651.92929840088 in the
# per-product totals.
df = df.withColumn("Sales", col("Sales").cast("double"))
df = df.withColumn("Quantity Ordered", col("Quantity Ordered").cast("integer"))
df = df.withColumn("Price Each", col("Price Each").cast("double"))
df.printSchema()

# Negative amounts or quantities are invalid.
columns_to_check = ['Sales', 'Price Each', 'Quantity Ordered']
for col_name in columns_to_check:
    df = df.filter(col(col_name) >= 0)

total_sales=df.groupBy("Product").sum("Sales").withColumnRenamed("sum(Sales)", "Total Sales")
total_sales.show()
df.write.csv("cleaned_sales_dataset12.csv", header=True,mode="overwrite")
df.show()

root
 |-- _c0: integer (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: float (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Sales: float (nullable = false)
 |-- City: string (nullable = true)
 |-- Hour: integer (nullable = true)

+--------------------+------------------+
|             Product|       Total Sales|
+--------------------+------------------+
|    Wired Headphones|246651.92929840088|
|  Macbook Pro Laptop|         8037600.0|
|Apple Airpods Hea...|         2349150.0|
|              iPhone|         4794300.0|
|Lightning Chargin...|347094.14547920227|
|Bose SoundSport H...|1345565.4012680054|
|USB-C Charging Cable| 286674.7890357971|
|AAA Batteries (4-...| 92740.83064889908|
|        20in Monitor| 454148.7011795044|
|    27in FHD Monitor|1132424.5414733887|
|   

# **Job**

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *

# Clean the job postings, derive salary figures and a company-size category,
# and report the average salary per category.
spark = SparkSession.builder.appName("jobs").getOrCreate()

jobs = spark.read.csv("Jobs.csv", header=True, inferSchema=True)

# Remove all whitespace so the range reads "<min>-<max>".
jobs = jobs.withColumn("Salary Estimate", regexp_replace(col("Salary Estimate"), r"\s+", ""))

# Lower and upper bound of the range; no match yields "" which casts to null.
jobs = jobs.withColumn("min_salary", regexp_extract(col("Salary Estimate"), r"(\d+)-", 1).cast("double"))
jobs = jobs.withColumn("max_salary", regexp_extract(col("Salary Estimate"), r"-(\d+)", 1).cast("double"))

jobs = jobs.withColumn("avg_sal", (col("min_salary") + col("max_salary")) / 2)

# Ratings of 0 or -1 are replaced with 1.
jobs = jobs.withColumn("Rating", when((col("Rating") == 0) | (col("Rating") == -1), 1).otherwise(col("Rating")))

# Find the columns containing nulls with a single aggregation instead of
# running one filter().count() job per column.
null_counts = jobs.select(
    [count(when(col(name).isNull(), name)).alias(name) for name in jobs.columns]
).first()
columns_with_nulls = [name for name, nulls in zip(jobs.columns, null_counts) if nulls > 0]
df_cleaned = jobs.fillna({col_name: -1 for col_name in columns_with_nulls})

# Size ranges are matched on word boundaries. A plain substring test for
# "1 to 50" also matches "201 to 500" and "1001 to 5000", which put those
# Medium and Large companies into the Small bucket.
size = col("Size")
is_small = size.rlike(r"\b(1 to 50|51 to 200)\b")
is_medium = size.rlike(r"\b(201 to 500|501 to 1000)\b")
is_large = size.rlike(r"\b(1001 to 5000|5001 to 10000)\b") | size.contains("10000+")
df_cleaned = df_cleaned.withColumn("company_size_category",
    when(is_small, "Small")
    .when(is_medium, "Medium")
    .when(is_large, "Large")
    .otherwise("Unknown"))

# NOTE(review): avg_sal was filled with -1 where the salary range could not be
# parsed, and those placeholders are included in this average - confirm intended.
salary_by_size = df_cleaned.groupBy("company_size_category").agg(avg("avg_sal").alias("average_salary")).orderBy("company_size_category")

salary_by_size.show()
df_cleaned.show()
df_cleaned.write.csv("jobies", header=True, mode="overwrite")

+---------------------+------------------+
|company_size_category|    average_salary|
+---------------------+------------------+
|                Large|124.48571428571428|
|               Medium|120.93506493506493|
|                Small|122.80325814536342|
|              Unknown| 135.6590909090909|
+---------------------+------------------+

+--------------------+---------------+------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+-----------+------+-----+-------+----------+----------+-------+---------------------+
|           Job Title|Salary Estimate|Rating|         Location|                Size|   Type of ownership|            Industry|              Sector|job_state|company_age|python|spark|tableau|min_salary|max_salary|avg_sal|company_size_category|
+--------------------+---------------+------+-----------------+--------------------+--------------------+--------------------+--------------------+---------+-----------+