1. Өгөгдлийн багцад хэдэн мөр багтсан бэ? 2021 оны өгөгдлийг ялган nyc нэртэй фреймворк үүсгэ.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NYC Payroll Analysis").getOrCreate()

file_path = '/content/Payroll_Data__Fiscal_Year.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

total_rows = df.count()
print(f"Total rows in dataset: {total_rows}")

nyc = df.filter(df["Fiscal Year"] == 2021)
nyc_rows = nyc.count()
print(f"Rows in 2021 data: {nyc_rows}")


Total rows in dataset: 6225611
Rows in 2021 data: 573477


2.     Зөвхөн 2021 оны санхүүгийн жилийн мэдээллийг ашиглан хичнээн хүн төлөөлдөг вэ?

In [None]:
unique_people = nyc.select("Payroll Number").distinct().count()
print(f"Unique people in 2021 fiscal year: {unique_people}")

Unique people in 2021 fiscal year: 156


3    2021 онд хэн хамгийн өндөр үндсэн цалинтай байсан бэ? Түүний үндсэн цалин, ямар агентлагт ажилладаг, түүний албан тушаалыг харуул.

In [None]:
from pyspark.sql.functions import col, concat, lit

highest_salary = nyc.orderBy(col("Base Salary").desc()) \
    .select(concat("First Name", lit(" "), "Last Name").alias("Full Name"), "Base Salary", "Agency Name", "Title Description") \
    .first()

print(f"Employee Name: {highest_salary['Full Name']}")
print(f"Base Salary: {highest_salary['Base Salary']}")
print(f"Agency: {highest_salary['Agency Name']}")
print(f"Title: {highest_salary['Title Description']}")

Employee Name: GUAJIRA THOMAS
Base Salary: 99.62
Agency: DEPT OF HEALTH/MENTAL HYGIENE
Title: CITY MEDICAL SPECIALIST


4.     2021 онд хамгийн өндөр цалинтай 5 ажилтныг харуулах

In [None]:
top_5_employees = nyc.orderBy(col("Base Salary").desc()) \
    .select(concat("First Name", lit(" "), "Last Name").alias("Employee Name"), "Base Salary", "Agency Name", "Title Description") \
    .limit(5)

top_5_employees.show()

+------------------+-----------+--------------------+--------------------+
|     Employee Name|Base Salary|         Agency Name|   Title Description|
+------------------+-----------+--------------------+--------------------+
|    GUAJIRA THOMAS|      99.62|DEPT OF HEALTH/ME...|CITY MEDICAL SPEC...|
|   HERBERT KWASNIK|      99.37|DEPARTMENT OF COR...|CITY MEDICAL SPEC...|
|   MITHLESH MATHUR|      99.37|DEPARTMENT OF COR...|CITY MEDICAL SPEC...|
|    JITENDRA TOLIA|      99.37|DEPARTMENT OF COR...|CITY MEDICAL SPEC...|
|CLAUDETTE ANDERSON|      99.37|DEPARTMENT OF COR...|CITY MEDICAL SPEC...|
+------------------+-----------+--------------------+--------------------+



5.    2021 онд аль ажилтан илүү цагаар хамгийн их цалин авсан бэ?

In [None]:
highest_overtime = nyc.orderBy(col("Total OT Paid").desc()) \
    .select(concat("First Name", lit(" "), "Last Name").alias("Employee Name"), "Total OT Paid", "Agency Name", "Title Description") \
    .first()

print(f"Employee Name: {highest_overtime['Employee Name']}")
print(f"Overtime: {highest_overtime['Total OT Paid']}")
print(f"Agency: {highest_overtime['Agency Name']}")
print(f"Title: {highest_overtime['Title Description']}")

Employee Name: SEAN PROWELL
Overtime: 999.80
Agency: DEPARTMENT OF EDUCATION ADMIN
Title: SUPERVISING THERAPIST


6.     Ажилтны цалин нь тэдний тогтмол цалин, илүү цагийн цалин болон бусад нийт цалингийн нийлбэртэй тэнцүү байна. (regular gross pay, total overtime pay, and total other pay) nyc DataFrame-д Total Gross Paid (нийт төлсөн  дүн )хэмээх баганыг үүсгэж мэдээллийг нэгтгэ.

In [None]:
from pyspark.sql.functions import col, coalesce, lit, sum as pyspark_sum

nyc = nyc.withColumn("Total Gross Paid",
                     coalesce(col("Regular Gross Paid").cast("double"), lit(0.0)) +
                     coalesce(col("Total OT Paid").cast("double"), lit(0.0)) +
                     coalesce(col("Total Other Pay").cast("double"), lit(0.0)))

aggregated_data = nyc.groupBy("Agency Name").agg(
    pyspark_sum(col("Total Gross Paid")).alias("Total Gross Paid by Agency")
)

aggregated_data.show()

+--------------------+--------------------------+
|         Agency Name|Total Gross Paid by Agency|
+--------------------+--------------------------+
|LANDMARKS PRESERV...|         8310.920000000002|
|DISTRICT ATTORNEY...|                  20565.88|
|QUEENS COMMUNITY ...|                     72.73|
|OFFICE OF COLLECT...|                       0.0|
|BRONX DISTRICT AT...|         60613.72999999998|
|QUEENS COMMUNITY ...|        2.7199999999999998|
|BRONX COMMUNITY B...|                      83.5|
|QUEENS COMMUNITY ...|                    762.73|
|MANHATTAN COMMUNI...|                    694.67|
|     FIRE DEPARTMENT|         812603.4300000006|
|BROOKLYN COMMUNIT...|                    140.25|
|BROOKLYN COMMUNIT...|                       0.0|
|ADMIN FOR CHILDRE...|        1317813.2499999993|
|QUEENS COMMUNITY ...|                       0.0|
|QUEENS COMMUNITY ...|                    272.75|
|DEPT OF ED PER SE...|      2.3136645300000712E7|
|DEPT OF ED PER DI...|          538600.499999994|


7.     2021 онд бүх ажилчдын цалингийн дундаж болон медиан хэд вэ?

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, mean

nyc = nyc.withColumn("Base Salary", col("Base Salary").cast(DoubleType()))
mean_salary = nyc.agg(mean("Base Salary").alias("Mean Salary")).first()["Mean Salary"]
median_salary = nyc.approxQuantile("Base Salary", [0.5], 0.01)[0]
print(f"Mean Salary: {mean_salary}")
print(f"Median Salary: {median_salary}")

Mean Salary: 53.05191369745888
Median Salary: 33.18


8.     2021 оны мэдээлэлд ажилчид хэдэн  агентлагт ажиллаж байна вэ ?

In [None]:
distinct_agencies = nyc.select("Agency Name").distinct().count()
print(f"Number of agencies in 2021: {distinct_agencies}")


Number of agencies in 2021: 156


9.     2021 онд ямар агентлагууд хамгийн их, хамгийн бага дундаж цалинтай байсан бэ?

In [None]:
from pyspark.sql.functions import when, isnan


nyc = nyc.withColumn("Base Salary", col("Base Salary").cast("double"))

agency_avg_salaries = nyc.groupBy("Agency Name").agg(mean("Base Salary").alias("Average Salary"))

agency_avg_salaries = agency_avg_salaries.withColumn(
    "Average Salary",
    when(isnan("Average Salary"), 0).otherwise(col("Average Salary"))
)

highest_avg_salary = agency_avg_salaries.orderBy(col("Average Salary").desc()).first()

lowest_avg_salary = agency_avg_salaries.filter(col("Average Salary").isNotNull()).orderBy(col("Average Salary").asc()).first()

print(f"Highest Average Salary: {highest_avg_salary['Agency Name']} - ${highest_avg_salary['Average Salary']:.2f}")
if lowest_avg_salary:
    print(f"Lowest Average Salary: {lowest_avg_salary['Agency Name']} - ${lowest_avg_salary['Average Salary']:.2f}")
else:
    print("No agency found with a non-null lowest average salary.")

Highest Average Salary: CIVIL SERVICE COMMISSION - $465.85
Lowest Average Salary: DISTRICT ATTORNEY-SPECIAL NARC - $1.00


10.  Хотын дарга Билл де Блазио 2021 онд хэр их орлого олсон (нийт цалин) бэ? Захирагчийн ажлын албанд хамгийн өндөр цалинг хэн авсан бэ? Хотын дарга Билл де Блазио хотын захирагчийн албанд хэзээ ажиллаж эхэлсэн бэ?

In [None]:
print(nyc.columns)


['Fiscal Year', 'Payroll Number', 'Agency Name', 'Last Name', 'First Name', 'Mid Init', 'Agency Start Date', 'Work Location Borough', 'Title Description', 'Leave Status as of June 30', 'Base Salary', 'Pay Basis', 'Regular Hours', 'Regular Gross Paid', 'OT Hours', 'Total OT Paid', 'Total Other Pay', 'Total Gross Paid']


In [None]:
nyc.filter(
    (col("Last Name") == "DE BLASIO") & (col("First Name") == "BILL") & (col("Fiscal Year") == 2021)
).show(truncate=False)


+-----------+--------------+-------------------+---------+----------+--------+-----------------+---------------------+-----------------+--------------------------+-----------+---------+-------------+------------------+--------+-------------+---------------+----------------+
|Fiscal Year|Payroll Number|Agency Name        |Last Name|First Name|Mid Init|Agency Start Date|Work Location Borough|Title Description|Leave Status as of June 30|Base Salary|Pay Basis|Regular Hours|Regular Gross Paid|OT Hours|Total OT Paid|Total Other Pay|Total Gross Paid|
+-----------+--------------+-------------------+---------+----------+--------+-----------------+---------------------+-----------------+--------------------------+-----------+---------+-------------+------------------+--------+-------------+---------------+----------------+
|2021       |2             |OFFICE OF THE MAYOR|DE BLASIO|BILL      |NULL    |01/01/2014       |MANHATTAN            |MAYOR            |ACTIVE                    |NULL       |

11.  2021 онд нэг ажлын байранд ногдох үндсэн цалин хэд байсан бэ?

In [None]:
nyc = nyc.withColumn("Base Salary", col("Base Salary").cast("double"))
nyc_2021 = nyc.filter(col("Base Salary").isNotNull() & (col("Fiscal Year") == 2021))

avg_salary_per_job = nyc_2021.groupBy("Title Description").agg(
    mean("Base Salary").alias("Average Base Salary")
)

avg_salary_per_job_sorted = avg_salary_per_job.orderBy(col("Average Base Salary").desc())
avg_salary_per_job_sorted.show(truncate=False)


+-------------------------------------------+-------------------+
|Title Description                          |Average Base Salary|
+-------------------------------------------+-------------------+
|PRINCIPAL                                  |870.8126666666667  |
|ASSISTANT PRINCIPAL                        |658.2873239436618  |
|SUPERVISOR                                 |654.3277777777777  |
|SENIOR STATIONARY ENGINEER                 |589.8342857142856  |
|WELDER                                     |548.0              |
|CRANE OPERATOR AMPES                       |543.94             |
|SUPERVISOR OF MECHANICS                    |497.6884210526315  |
|CHAIRMAN                                   |496.31             |
|STATIONARY ENGINEER                        |487.289512670566   |
|OILER                                      |477.6961572052402  |
|SENIOR SUPERVISOR COMMUNICATION ELECTRICIAN|470.96             |
|COMPOSITOR                                 |468.8066666666667  |
|SUPERVISO

12.  Ажлын байр бүрийн хувьд хамгийн өндөр үндсэн цалинтай ажжилтныг харуул.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, when

nyc_cleaned = nyc.filter(col("Base Salary").isNotNull())
nyc_cleaned = nyc_cleaned.withColumn("Base Salary", col("Base Salary").cast("double"))
nyc_cleaned = nyc_cleaned.withColumn(
    "Annualized Base Salary",
    when(col("Pay Basis") == "Hourly", col("Base Salary") * 2080)
    .when(col("Pay Basis") == "Daily", col("Base Salary") * 260)
    .otherwise(col("Base Salary"))
)

window_spec = Window.partitionBy("Title Description").orderBy(col("Annualized Base Salary").desc())

nyc_with_rank = nyc_cleaned.withColumn("rank", rank().over(window_spec))

highest_paid_per_job = nyc_with_rank.filter(col("rank") == 1).select(
    "Title Description", "Last Name", "First Name", "Annualized Base Salary", "Agency Name"
)
highest_paid_per_job.show(truncate=False)


+------------------------------------+----------+----------+----------------------+------------------------------+
|Title Description                   |Last Name |First Name|Annualized Base Salary|Agency Name                   |
+------------------------------------+----------+----------+----------------------+------------------------------+
|NULL                                |ORTIZ     |A         |16.52                 |DEPT OF ED HRLY SUPPORT STAFF |
|*ATTORNEY AT LAW                    |LEVY      |HELEN     |64.17                 |HOUSING PRESERVATION & DVLPMNT|
|ACCOUNTANT                          |RUVINOV   |VITALIY   |35.26                 |HOUSING PRESERVATION & DVLPMNT|
|ADJUNCT ASSISTANT PROFESSOR         |TEZAPSIDIS|JANE      |757.73                |COMMUNITY COLLEGE (MANHATTAN) |
|ADJUNCT ASSOCIATE PROFESSOR         |GROTH     |CAMILLA   |383.18                |COMMUNITY COLLEGE (BRONX)     |
|ADJUNCT COLLEGE LAB TECH            |MIYASHIRO |SAUNDRA   |55.6                

13.  2021 оны өгөгдлийг ажлын байршил, дараа нь агентлагийн нэрээр бүлэглээд, бүлэглэсэн өгөгдлөөс хамгийн их таван үндсэн цалинтай ажилчдын мэдээллийг харуул.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, desc

nyc = nyc.withColumn("Base Salary", col("Base Salary").cast("double"))
nyc_cleaned = nyc.filter(col("Base Salary").isNotNull())
window_spec = Window.partitionBy("Work Location Borough", "Agency Name").orderBy(desc("Base Salary"))
nyc_ranked = nyc_cleaned.withColumn("rank", rank().over(window_spec))
top_5_employees_per_group = nyc_ranked.filter(col("rank") <= 5).select(
    "Work Location Borough",
    "Agency Name",
    "Last Name",
    "First Name",
    "Base Salary",
    "Title Description",
    "rank"
)

top_5_employees_per_group.show(truncate=False)


+---------------------+------------------------------+------------+----------+-----------+-----------------------------------------------+----+
|Work Location Borough|Agency Name                   |Last Name   |First Name|Base Salary|Title Description                              |rank|
+---------------------+------------------------------+------------+----------+-----------+-----------------------------------------------+----+
|ALBANY               |DEPT OF ENVIRONMENT PROTECTION|EBERHARDT   |ANDREW    |618.24     |SENIOR STATIONARY ENGINEER                     |1   |
|ALBANY               |DEPT OF ENVIRONMENT PROTECTION|FAHD        |MARK      |501.92     |STATIONARY ENGINEER                            |2   |
|BRONX                |ADMIN FOR CHILDREN'S SVCS     |WASHINGTON  |LINDA     |25.95      |PRINCIPAL ADMINISTRATIVE ASSOCIATE -  NON SUPVR|1   |
|BRONX                |ADMIN FOR CHILDREN'S SVCS     |BLOUNT      |KATHY     |25.95      |PRINCIPAL ADMINISTRATIVE ASSOCIATE -  NON SUPV

14.  Цалингийн нийлбэоийг оноор харуул.

In [None]:
nyc = nyc.withColumn(
    "Total Salary",
    coalesce(col("Regular Gross Paid").cast("double"), lit(0.0)) +
    coalesce(col("Total OT Paid").cast("double"), lit(0.0)) +
    coalesce(col("Total Other Pay").cast("double"), lit(0.0))
)

In [None]:
null_total_salary_count = nyc.filter(col("Total Salary").isNull()).count()
print(f"Rows with NULL Total Salary after recalculation: {null_total_salary_count}")

Rows with NULL Total Salary after recalculation: 0


In [None]:
from pyspark.sql.functions import sum as _sum # Import sum function with an alias

total_salary_by_year = nyc.groupBy("Fiscal Year").agg(
    _sum(col("Total Salary").cast("double")).alias("Total Salary") # Explicitly cast the column to double and use aliased sum
)

total_salary_by_year = total_salary_by_year.orderBy("Fiscal Year")
total_salary_by_year.show(truncate=False)

+-----------+------------------+
|Fiscal Year|Total Salary      |
+-----------+------------------+
|2021       |7.47281298899989E7|
+-----------+------------------+



15.  Ажилчдын тоог оноор харуул.

In [None]:
from pyspark.sql.functions import countDistinct

employees_by_year = nyc.groupBy("Fiscal Year").agg(
    countDistinct("Payroll Number").alias("Number of Employees")
)
employees_by_year = employees_by_year.orderBy("Fiscal Year")

employees_by_year.show(truncate=False)

+-----------+-------------------+
|Fiscal Year|Number of Employees|
+-----------+-------------------+
|2021       |156                |
+-----------+-------------------+



In [None]:
nyc.select("Fiscal Year").distinct().show()

+-----------+
|Fiscal Year|
+-----------+
|       2021|
+-----------+



16.  2021 оны нийт цалингийн медиан дээр үндэслэн ямар албан тушаалууд (titles -аар нь бүлэглэсэн) хамгийн их цалин авдаг вэ?

In [None]:
from pyspark.sql.functions import col, expr, percentile_approx

nyc = nyc.withColumn("Total Salary", col("Total Salary").cast("double"))

median_salary_per_title = nyc.groupBy("Title Description").agg(
    percentile_approx("Total Salary", 0.5).alias("Median Total Salary")
)

highest_median_salaries = median_salary_per_title.orderBy(col("Median Total Salary").desc())

highest_median_salaries.show(truncate=False)


+----------------------------------------------------------+-------------------+
|Title Description                                         |Median Total Salary|
+----------------------------------------------------------+-------------------+
|PRINCIPAL SCHOOL-NEIGHBORHOOD WORKER                      |1144.23            |
|HOME AIDE                                                 |907.09             |
|SCHOOL CROSSING GUARD                                     |844.77             |
|SENIOR SCHOOL LUNCH AIDE                                  |842.44             |
|DIRECTOR OF CONTRACT ADMINISTRATION                       |838.0              |
|FINANCE OFFICER                                           |802.55             |
|HUMAN RESOURCES TECHNICIAN                                |797.63             |
|SCHOOL EQUIPMENT MAINTAINER                               |797.63             |
|SCHOOL-NEIGHBORHOOD WORKER                                |797.63             |
|SENIOR SCHOOL-NEIGHBORHOOD 

17.  2021 онд хамгийн олон илүү цагаар ажилладаг 10 агентлаг юу вэ?

In [None]:
from pyspark.sql.functions import col, expr, percentile_approx, sum

nyc = nyc.withColumn("Total OT Paid", col("Total OT Paid").cast("double"))

overtime_by_agency = nyc.groupBy("Agency Name").agg(
    sum("Total OT Paid").alias("Total Overtime Paid")
)

top_overtime_agencies = overtime_by_agency.orderBy(col("Total Overtime Paid").desc())
top_10_overtime_agencies = top_overtime_agencies.limit(10)

top_10_overtime_agencies.show(truncate=False)


+------------------------------+-------------------+
|Agency Name                   |Total Overtime Paid|
+------------------------------+-------------------+
|POLICE DEPARTMENT             |2268962.629999985  |
|DEPT OF PARKS & RECREATION    |1049075.5500000017 |
|DEPARTMENT OF EDUCATION ADMIN |891089.8299999997  |
|DEPARTMENT OF CORRECTION      |691681.0000000002  |
|NYC HOUSING AUTHORITY         |658904.6399999994  |
|ADMIN FOR CHILDREN'S SVCS     |496740.7900000002  |
|FIRE DEPARTMENT               |461717.9500000002  |
|HRA/DEPT OF SOCIAL SERVICES   |453219.71000000025 |
|DEPT OF HEALTH/MENTAL HYGIENE |428375.4799999994  |
|DEPT OF ENVIRONMENT PROTECTION|307409.61          |
+------------------------------+-------------------+

