In [58]:
import sys

# Put the parent directory at the front of the module search path so that
# modules located one level above this notebook can be imported.
sys.path.insert(0, "..")

In [59]:
import findspark
# Runs before the `from pyspark...` imports in the following cells.
# NOTE(review): findspark.init() is expected to locate the local Spark
# installation and make `pyspark` importable — confirm for this environment.
findspark.init()

In [60]:
from pyspark.sql import SparkSession

# Build the Spark session used by every cell below; getOrCreate() hands back
# an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Nyc-Jobs-data-exploration")
    .config("spark.sql.shuffle.partitions", 200)
    .config("spark.sql.parquet.mergeSchema", "true")
    .getOrCreate()
)

In [61]:
import json
from pyspark.sql.types import StructType
from math import floor

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

### Read data

In [62]:
"""
Input dataframe schema is defined in JSON file 
"""

# Load the raw JSON text of the schema definition from disk.
with open("/dataset/input_schema/nyc_jobs.json") as schema_file:
    schema = schema_file.read()

# Parse the JSON text and turn it into the StructType used when reading the CSV.
schema_definition = json.loads(schema)
nyc_jobs_json_schema = StructType.fromJson(schema_definition)

In [90]:
# Read the jobs CSV with the explicit schema loaded above. The quote and
# escape characters are both a double quote, and timestamps are parsed with
# the given pattern.
nyc_jobs_df = (
    spark.read
    .schema(nyc_jobs_json_schema)
    .option("quote", "\"")
    .option("escape", "\"")
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
    .csv("/dataset/nyc-jobs.csv", header=True)
)

# Show the resulting schema and the plain list of column names.
nyc_jobs_df.printSchema()
print(nyc_jobs_df.columns)

root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- num_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- ft_pt_indicator: string (nullable = true)
 |-- salary_range_from: double (nullable = true)
 |-- salary_range_to: double (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division_work_unit: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- min_qual_requirements: string (nullable = true)
 |-- preferred_skills: string (nullable = true)
 |-- additiona_information: string (nullable = true)
 |-- to_apply: string (nullable = true)
 |-- hours_shift: string (nullable = true)
 |-- work_location_1: string (nu

In [65]:
def calculate_mod(df: DataFrame, column: str):
    """Return the mode (most frequent value) of ``column`` in ``df``.

    Rows are grouped by ``column`` and each group is scored with
    ``F.count(column)``, which counts non-null values only, so a null
    group scores 0 and cannot outrank a real value.

    Args:
        df: DataFrame to inspect.
        column: Name of the column whose mode is wanted.

    Returns:
        The value of ``column`` in the highest-count group, or ``None``
        when ``df`` has no rows. When several values tie for the highest
        count, which one is returned is not specified.
    """
    # Use `F`, the alias this notebook imports pyspark.sql.functions under;
    # the previously used `sf` alias is never defined and raised NameError.
    grouped_data = df.groupBy(column).agg(F.count(column).alias("count"))
    sorted_grouped_data = grouped_data.sort(grouped_data["count"].desc())

    # first() returns None for an empty DataFrame; do not subscript it.
    top_row = sorted_grouped_data.first()
    if top_row is None:
        return None
    return top_row[column]
    
def calculate_median(df: DataFrame, column: str):
    """Return the median of ``column`` in ``df``, ignoring missing values.

    The non-null values are sorted ascending and the element at index
    ``n // 2`` is returned. For an even number of values this is the upper
    of the two middle elements, not their average.

    Args:
        df: DataFrame to inspect.
        column: Name of the column whose median is wanted.

    Returns:
        The middle value of ``column``, or ``None`` when the column has no
        non-null values (including when ``df`` is empty).
    """
    # An ascending sort places nulls first, so leaving them in would both
    # inflate the row count and shift the middle position towards them.
    non_null_df = df.dropna(subset=[column])

    num_rows = non_null_df.count()
    if num_rows == 0:
        # take(1)[-1] on an empty result would raise IndexError.
        return None

    median_index = num_rows // 2
    # Only the first median_index + 1 sorted rows are brought to the driver;
    # the last of them is the median row.
    return non_null_df.sort(column).take(median_index + 1)[-1][column]

### Numerical Data Exploration

In [92]:
# Data Exploration
# Provide a detailed analysis of source data: Column values


# Numeric columns summarised in this cell.
analytical_columns = ['num_of_positions', 'salary_range_from', 'salary_range_to']

try:
    # One single-row DataFrame per statistic, holding one aliased column
    # (avg_<col>, min_<col>, max_<col>) for each analysed column.
    avg_df = nyc_jobs_df.agg(*[F.avg(F.col(c)).alias('avg_{}'.format(c)) for c in analytical_columns])
    min_df = nyc_jobs_df.agg(*[F.min(F.col(c)).alias('min_{}'.format(c)) for c in analytical_columns])
    max_df = nyc_jobs_df.agg(*[F.max(F.col(c)).alias('max_{}'.format(c)) for c in analytical_columns])

    # Mode and median come from the helper functions defined earlier in the
    # notebook, one call per column, collected as {column: value}.
    mode_dict = {col: calculate_mod(nyc_jobs_df, col) for col in analytical_columns}
    median_dict = {col: calculate_median(nyc_jobs_df, col) for col in analytical_columns}

    # Wrap each dictionary in a single-row DataFrame so it can be shown like
    # the other statistics.
    mod_df = spark.createDataFrame([mode_dict])
    median_df = spark.createDataFrame([median_dict])
except Exception as e:
    # The failure can come from any of the statistics, not only the minimum.
    # Re-raise so the following cells never run against missing or stale
    # *_df variables left over from an earlier execution.
    print(f"Error calculating summary statistics: {e}")
    raise

In [93]:
# Display each single-row statistics DataFrame under its own heading.
# min_df, max_df, avg_df, mod_df and median_df are created by the previous
# cell, so that cell must have completed successfully before this one runs.
print("Minimum values for the selected numerical columns ->")
min_df.show()

print("Maximum values for the selected numerical columns ->")
max_df.show()

print("Average values for the selected numerical columns ->")
avg_df.show()

# "Mod" here is the mode (most frequent value) produced via calculate_mod.
print("Mod values for the selected numerical columns ->")
mod_df.show()

print("Median values for the selected numerical columns ->")
median_df.show()

Minimum values for the selected numerical columns ->
+--------------------+---------------------+-------------------+
|min_num_of_positions|min_salary_range_from|min_salary_range_to|
+--------------------+---------------------+-------------------+
|                   1|                  0.0|              10.36|
+--------------------+---------------------+-------------------+

Maximum values for the selected numerical columns ->
+--------------------+---------------------+-------------------+
|max_num_of_positions|max_salary_range_from|max_salary_range_to|
+--------------------+---------------------+-------------------+
|                 200|             218587.0|           234402.0|
+--------------------+---------------------+-------------------+

Average values for the selected numerical columns ->
+--------------------+---------------------+-------------------+
|avg_num_of_positions|avg_salary_range_from|avg_salary_range_to|
+--------------------+---------------------+---------------

### String Data Exploration

In [101]:
"""
For each string column, get the distinct values count
"""

# Map every string-typed column to its number of distinct values.
# `dtypes` yields (column name, type name) pairs in column order; each count
# launches its own Spark job.
string_columns_frequency = {
    name: nyc_jobs_df.select(name).distinct().count()
    for name, type_name in nyc_jobs_df.dtypes
    if type_name == "string"
}


print(string_columns_frequency)

{'agency': 52, 'posting_type': 2, 'business_title': 1244, 'civil_service_title': 312, 'title_code_no': 323, 'level': 14, 'job_category': 131, 'ft_pt_indicator': 3, 'salary_frequency': 3, 'work_location': 226, 'division_work_unit': 678, 'job_description': 1608, 'min_qual_requirements': 337, 'preferred_skills': 1283, 'additiona_information': 682, 'to_apply': 894, 'hours_shift': 182, 'work_location_1': 228, 'recruitment_contact': 1, 'residency_requirement': 51}


In [115]:
"""
For each string column, get the Most Common value and its count
"""

for column, type_name in nyc_jobs_df.dtypes:
    # Only string-typed columns are of interest here.
    if type_name != "string":
        continue

    print("Column -> {}, Most Common Value:".format(column))

    # Count occurrences per value, then keep only the top-ranked row.
    value_counts = nyc_jobs_df.groupBy(column).agg(F.count(column).alias("count"))
    df = value_counts.orderBy(F.desc("count")).limit(1)
    df.show()
    

Column -> agency, Most Common Value:
+--------------------+-----+
|              agency|count|
+--------------------+-----+
|DEPT OF ENVIRONME...|  655|
+--------------------+-----+

Column -> posting_type, Most Common Value:
+------------+-----+
|posting_type|count|
+------------+-----+
|    Internal| 1684|
+------------+-----+

Column -> business_title, Most Common Value:
+--------------------+-----+
|      business_title|count|
+--------------------+-----+
|Assistant Civil E...|   33|
+--------------------+-----+

Column -> civil_service_title, Most Common Value:
+--------------------+-----+
| civil_service_title|count|
+--------------------+-----+
|COMMUNITY COORDIN...|  182|
+--------------------+-----+

Column -> title_code_no, Most Common Value:
+-------------+-----+
|title_code_no|count|
+-------------+-----+
|        56058|  182|
+-------------+-----+

Column -> level, Most Common Value:
+-----+-----+
|level|count|
+-----+-----+
|    0| 1112|
+-----+-----+

Column -> job_categ

### Example of test function