In [130]:
pip install pyspark







Note: you may need to restart the kernel to use updated packages.


In [131]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType
from pyspark.sql.functions import col, udf, concat_ws, size
import pandas as pd
import re
import spacy

In [132]:
# Create (or reuse) a Spark session running in local mode with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
spark

In [133]:
def read_json_data_with_schema(spark, file_path, schema):
    """
    Load a JSON file into a DataFrame, applying an explicit schema.

    Args:
    - spark: SparkSession whose reader is used to load the file
    - file_path: Location of the JSON file
    - schema: StructType describing the layout of the JSON records

    Returns:
    - The DataFrame returned by spark.read.json for the given path and schema
    """
    return spark.read.json(file_path, schema=schema)

def flatten_dataframe(df):
    """
    Project the nested job-posting columns into a flat set of top-level columns.

    Args:
    - df: Input DataFrame containing the nested fields referenced below

    Returns:
    - DataFrame with one top-level column per selected (possibly nested) field,
      in the order listed in the projection table
    """
    # Each entry is (source path, output name); an output name of None keeps
    # the column under its existing name.
    projection = [
        ("_id.$oid", "id"),
        ("sourceCC", None),
        ("source", None),
        ("idInSource", None),
        ("locationID.$oid", "locationID"),
        ("companyID.$oid", "companyID"),
        ("text", None),
        ("html", None),
        ("json.schemaOrg.@context", "json_schemaOrg_context"),
        ("json.schemaOrg.@type", "json_schemaOrg_type"),
        ("json.schemaOrg.title", "json_schemaOrg_title"),
        ("json.schemaOrg.description", "json_schemaOrg_description"),
        ("json.schemaOrg.employmentType", "json_schemaOrg_employmentType"),
        ("json.schemaOrg.datePosted", "json_schemaOrg_datePosted"),
        ("json.schemaOrg.hiringOrganization.@type", "json_schemaOrg_hiringOrganization_type"),
        ("json.schemaOrg.hiringOrganization.name", "json_schemaOrg_hiringOrganization_name"),
        ("json.schemaOrg.hiringOrganization.logo", "json_schemaOrg_hiringOrganization_logo"),
        ("json.schemaOrg.jobLocation.@type", "json_schemaOrg_jobLocation_type"),
        ("json.schemaOrg.jobLocation.address.@type", "json_schemaOrg_jobLocation_address_type"),
        ("json.schemaOrg.jobLocation.address.addressLocality", "json_schemaOrg_jobLocation_address_addressLocality"),
        ("json.schemaOrg.jobLocation.address.addressRegion", "json_schemaOrg_jobLocation_address_addressRegion"),
        ("json.schemaOrg.jobLocation.address.addressCountry", "json_schemaOrg_jobLocation_address_addressCountry"),
        ("locale", None),
        ("position.name", "position_name"),
        ("position.workType", "position_workType"),
        ("position.department", "position_department"),
        ("position.careerLevel", "position_careerLevel"),
        ("orgAddress.companyName", "orgAddress_companyName"),
        ("orgAddress.addressLine", "orgAddress_addressLine"),
        ("orgAddress.formatted", "orgAddress_formatted"),
        ("orgAddress.level", "orgAddress_level"),
        ("orgAddress.countryCode", "orgAddress_countryCode"),
        ("orgAddress.country", "orgAddress_country"),
        ("orgAddress.state", "orgAddress_state"),
        ("orgAddress.county", "orgAddress_county"),
        ("orgAddress.city", "orgAddress_city"),
        ("orgAddress.district", "orgAddress_district"),
        ("orgAddress.quarter", "orgAddress_quarter"),
        ("orgAddress.houseNumber", "orgAddress_houseNumber"),
        ("orgCompany.sourceCC", "orgCompany_sourceCC"),
        ("orgCompany.source", "orgCompany_source"),
        ("orgCompany.idInSource", "orgCompany_idInSource"),
        ("orgCompany.mergedID", "orgCompany_mergedID"),
        ("orgCompany.nameOrg", "orgCompany_nameOrg"),
        ("orgCompany.description", "orgCompany_description"),
        ("orgCompany.registryID", "orgCompany_registryID"),
        ("orgCompany.urls.aarp_us", "orgCompany_aarp_us"),
        ("orgCompany.urls.homepage", "orgCompany_homepage"),
        ("orgCompany.imgLogo", "orgCompany_imgLogo"),
        ("orgCompany.imgCover", "orgCompany_imgCover"),
        ("orgCompany.name", "orgCompany_name"),
        ("orgCompany.url", "orgCompany_url"),
        ("name", None),
        ("url", None),
        ("dateScraped.$date", "dateScraped"),
        ("dateMerged.$date", "dateMerged"),
        ("dateUploaded.$date", "dateUploaded"),
        ("dateCreated.$date", "dateCreated"),
        ("orgTags.CATEGORIES", "orgTags_CATEGORIES"),
        ("orgTags.REQUIREMENTS", "orgTags_REQUIREMENTS"),
        ("orgTags.SKILLS", "orgTags_SKILLS"),
        ("salary.text", "salary_text"),
    ]

    selected = []
    for source_path, output_name in projection:
        column = col(source_path)
        if output_name is not None:
            column = column.alias(output_name)
        selected.append(column)

    return df.select(*selected)

def write_df_to_csv(df, file_path):
    """
    Save a Spark DataFrame as a single CSV file by way of pandas.

    Args:
    - df: Spark DataFrame to export
    - file_path: Destination path of the CSV file

    Returns:
    - None
    """
    # Convert to pandas via toPandas(), then let pandas serialise the rows
    # (the index column is left out of the file).
    df.toPandas().to_csv(file_path, index=False)

    # Confirm where the file was written
    print(f"DataFrame written to '{file_path}'")

    
def keep_columns(df, columns_to_keep):
    """
    Restrict a DataFrame to the given columns; every other column is dropped.

    Parameters:
    - df: Input DataFrame
    - columns_to_keep: List of the column names to retain

    Returns:
    - DataFrame produced by selecting exactly those columns
    """
    return df.select(*columns_to_keep)

In [134]:
# Schema of the job-posting JSON records. Every field is declared nullable.
def _string(name):
    """Nullable string field."""
    return StructField(name, StringType(), True)


def _timestamp(name):
    """Nullable timestamp field."""
    return StructField(name, TimestampType(), True)


def _string_array(name):
    """Nullable field holding an array of strings."""
    return StructField(name, ArrayType(StringType()), True)


def _struct(name, *fields):
    """Nullable nested struct field made of the given child fields, in order."""
    return StructField(name, StructType(list(fields)), True)


schema = StructType([
    _struct("_id", _string("$oid")),
    _string("sourceCC"),
    _string("source"),
    _string("idInSource"),
    _struct("locationID", _string("$oid")),
    _struct("companyID", _string("$oid")),
    _string("text"),
    _string("html"),
    _struct(
        "json",
        _struct(
            "schemaOrg",
            _string("@context"),
            _string("@type"),
            _string("title"),
            _string("description"),
            _string("employmentType"),
            _timestamp("datePosted"),
            _struct(
                "hiringOrganization",
                _string("@type"),
                _string("name"),
                _string("logo"),
            ),
            _struct(
                "jobLocation",
                _string("@type"),
                _struct(
                    "address",
                    _string("@type"),
                    _string("addressLocality"),
                    _string("addressRegion"),
                    _string("addressCountry"),
                ),
            ),
        ),
    ),
    _string("locale"),
    _struct(
        "position",
        _string("name"),
        _string("workType"),
        _string("department"),
        _string("careerLevel"),
    ),
    _struct(
        "orgAddress",
        _string("companyName"),
        _string("addressLine"),
        _string("formatted"),
        _string("level"),
        _string("countryCode"),
        _string("country"),
        _string("state"),
        _string("county"),
        _string("city"),
        _string("district"),
        _string("quarter"),
        _string("houseNumber"),
    ),
    _struct(
        "orgCompany",
        _string("sourceCC"),
        _string("source"),
        _string("idInSource"),
        _string("mergedID"),
        _string("nameOrg"),
        _string("description"),
        _string("registryID"),
        _struct("urls", _string("aarp_us"), _string("homepage")),
        _struct("ids", _string("aarp_us")),
        _string("imgLogo"),
        _string("imgCover"),
        _string("name"),
        _string("url"),
    ),
    _string("name"),
    _string("url"),
    _struct("dateScraped", _timestamp("$date")),
    _struct("dateMerged", _timestamp("$date")),
    _struct("dateUploaded", _timestamp("$date")),
    _struct("dateCreated", _timestamp("$date")),
    _struct(
        "orgTags",
        _string_array("CATEGORIES"),
        _string_array("REQUIREMENTS"),
        _string_array("SKILLS"),
    ),
    _struct("salary", _string("text")),
])

# Load the raw job postings using the explicit schema
df = read_json_data_with_schema(
    spark,
    "/kaggle/input/international-job-postings-september-2021/techmap-jobs-dump-2021-09.json",
    schema,
)

# Flatten the nested structs into top-level columns
df_flat = flatten_dataframe(df)

# Columns retained for the rest of the notebook, grouped by origin
columns_to_keep = [
    # posting identity and content
    "id", "sourceCC", "source", "text", "html", "locale",
    # position details
    "position_name", "position_workType", "position_careerLevel", "position_department",
    # address of the organisation
    "orgAddress_addressLine", "orgAddress_level", "orgAddress_country",
    "orgAddress_state", "orgAddress_city",
    # company details
    "orgCompany_nameOrg", "orgCompany_homepage",
    # posting URL, tags and salary
    "url", "orgTags_CATEGORIES", "orgTags_REQUIREMENTS", "orgTags_SKILLS", "salary_text",
]
df_kept = keep_columns(df_flat, columns_to_keep)

# Keep only the rows whose 'source' is one of the listed job boards
sources_to_keep = ["careerbuilder_us", "dice_us", "simplyhired_us"]
df_filtered = df_kept.filter(
    col("source").isin(sources_to_keep)
)

In [135]:
def process_salary_text(salary_text):
    """
    Convert a free-text salary into an approximate monthly amount.

    The first number found in the text is scaled according to the pay period
    mentioned (English and German keywords are recognised):
    - hourly:   first number (at most its first two digits) * 8 hours * 20 days
    - monthly:  first number, unchanged
    - yearly:   first number / 12; numbers of up to four digits are treated as
                thousands first
    - biweekly: first number * 2
    - weekly:   first number * 4

    Args:
    - salary_text: Raw salary description, or None

    Returns:
    - The monthly amount as a string (yearly salaries yield a float string),
      "NOT FOUND" when the text is None or contains no digits, or
      "<salary_text> NOT CHANGED" when no known pay period is mentioned.
    """
    if salary_text is None:
        return "NOT FOUND"

    # Drop thousands separators so "50,000" is read as a single number.
    normalized = salary_text.lower().replace(",", "")
    # Expand a thousands suffix only when it is attached to a number ("50k",
    # "50 (k)"). Replacing every "k" would turn "week" into "wee000", which
    # both hides the pay-period keyword and injects spurious digits.
    normalized = re.sub(r'(\d)\s*(?:\(k\)|k\b)', r'\g<1>000', normalized)

    numbers = re.findall(r'\d+', normalized)
    if not numbers:
        return "NOT FOUND"
    first_number = numbers[0]

    def mentions(*keywords):
        """True when any of the keywords occurs in the normalised text."""
        return any(keyword in normalized for keyword in keywords)

    if mentions('hour', 'stunde'):
        # Heuristic: only the first two digits are used as the hourly rate.
        monthly = int(first_number[:2]) * 8 * 20
    elif mentions('month', 'monat'):
        monthly = int(first_number)
    elif mentions('year', 'jährlich', 'jahr', 'annum', 'annual'):
        if len(first_number) <= 4:
            # Short yearly figures are assumed to be given in thousands.
            monthly = (int(first_number) * 1000) / 12
        else:
            monthly = int(first_number) / 12
    elif mentions('biweekly', 'bi-weekly', 'every two weeks', 'every 2 weeks',
                  'every other week', 'every 14 days'):
        # Must be tested before the weekly keywords: "biweekly" contains "week".
        monthly = int(first_number) * 2
    elif mentions('week', 'wöchentlich', 'woche'):
        monthly = int(first_number) * 4
    else:
        return f"{salary_text} NOT CHANGED"

    return str(monthly)
    

# Wrap process_salary_text as a Spark UDF that returns strings
process_salary_text_udf = udf(f=process_salary_text, returnType=StringType())

# Run the UDF over 'salary_text' and store the result in the 'salary' column
df_filtered = df_filtered.withColumn(
    'salary',
    process_salary_text_udf(col('salary_text')),
)


In [136]:
# # Count the non-empty rows in 'position_careerLevel' using isNotNull()
# non_empty_count = df_kept.filter(col("position_careerLevel").isNotNull()).count()

# print("Count of non-empty rows in 'position_careerLevel':", non_empty_count)
# # output: Count of non-empty rows in 'position_careerLevel': 4806

# # Get all unique values from the "position_workType" column
# unique_names = df_kept.select("position_workType").distinct()

# # Convert the result to a list
# unique_names_list = [row.position_workType for row in unique_names.collect()]

# print(unique_names_list)
# # output ['fulltime, permanent', 'fulltime, parttime', 'intern', 'fulltime - eligible for benefits', 'contractor', 'other', '[parttime]', 'temporary, parttime, fulltime', 'temporary', 'contract', 'seasonal', 'vollzeit', '[fulltime, parttime]', 'temp/casual', '[fulltime]', 'parttime', 'fulltime, contractor', 'fulltime', 'fulltime benefit eligible', 'parttime, fulltime', 'fulltime, contract', 'unavailable', 'permanent, parttime', 'temporary, contractor', 'parttime, permanent', 'temp-to-hire', 'freelance', 'temporary, fulltime', 'fulltime, contractor, temporary', 'temporary, parttime, fulltime, contract, internship', '[gig]', 'prn', 'parttime, freelance', 'per diem', 'apprenticeship', '[seasonal_fulltime]', 'parttime, temporary', 'prn, per diem', 'parttime, fulltime, contract', 'parttime, contract', 'internship', 'fulltime, internship', 'seasonal, fulltime', 'temporary, contract', 'parttime, internship', 'non-tenure', 'temporary, fulltime, contract, internship', 'parttime, prn', 'temporary, parttime', 'fulltime, contract, internship', 'temporary, parttime, seasonal, fulltime', 'temporary, seasonal, fulltime', 'temporary, parttime, contract', 'parttime, fulltime, internship', 'parttime, fulltime, prn', 'contractor, temporary, jobtypecontract', 'parttime, seasonal', 'temporary, parttime, seasonal', 'temporary, seasonal', 'per_diem', 'volunteer', 'temporary, fulltime, contract, per diem', 'parttime, per diem', 'fulltime, non-tenure', 'temporary, fulltime, contract', 'temporary, parttime, seasonal, fulltime, contract', 'temporary, parttime, contract, freelance', 'travel nursing', 'parttime, contract, internship', 'parttime, fulltime, contract, internship', 'temporary, travel nursing, fulltime, contract', 'temporary, parttime, fulltime, internship', 'per_diem, contractor, temporary', 'temporary, parttime, internship', 'permanent, jobtypeemployee', 'parttime, contract, per diem', 'temporary, fulltime, contractor', 'temporary, permanent, contract', 'contract, internship', 
# # 'permanent, parttime, contract', 'parttime, prn, per diem', 'parttime, seasonal, contract', None]

# # Get all unique values from the "position_careerLevel" column
# unique_names = df_kept.select("position_careerLevel").distinct()

# # Convert the result to a list
# unique_names_list = [row.position_careerLevel for row in unique_names.collect()]

# print(unique_names_list)
# # output: 
# # ----- 120 months of experience
# # ----- QUALIFICATION STANDARDS: (These qualifications meet or exceed NH, MA, CT and RI State specific requirements.) 
# # a. Knowledge, Skills, and Abilities: Ability to use computer for documenting services provided. Shall be fluent in the ability to communicate in the English language, both oral and written. 
# # b. Education: Must have completed the course for certified nursing assistant or have equivalent training or experience as required by State Regulation, and CPR and First Aide training as mandated by the State employed. 
# # c. Experience: Prior experience providing direct care or the equivalent in training preferred. Experience in caring for seniors with Alzheimers disease or other forms of Dementia is preferred. 
# # d. Certificates/Licenses: Current license/certification as required by State Regulation and provide copies of the documentation to the Resident Care Director or designee. 
# # e. Computer Skills: Demonstrates computer literacy and familiarity with office software, including but not limited to word processing or demonstrate the ability to learn data input.
# # ----- Mit Berufserfahrung
# # ----- Leader Qualifications: Experience working with youth or young adults (15-25), teaching or environmental education a plus Experience with conservation work skills or related skills preferred i.e., trail maintenance, trail construction, habitat restoration, chainsaw, carpentry, landscaping, and gardening Ability to perform manual, physical labor for up to 8 hours per day, exposed to the elements, and must occasionally lift and/or move 40 pounds or more Must be a minimum of 21 years of age Must have the ability to legally work in the US Must have a valid drivers license for 3 years and MVR that meets SCA standards Must be able to meet SCAs criminal background check standards Must have or secure housing in the program city Leader Benefits and Compensation: $800 weekly stipend Health, Dental, Vision, Retirement, PTO Leaders may receive the following training: Mental Health First Aid Trail Skill Training with Penn Trails (Independent Trail Building Organization) Carpentry Refresher Training Project Management Training Wilderness First Aid (can be provided if needed) Game of Logging Levels I and II (if not currently saw trained) Conservation Work Skills Refresher (4 Days) Leadership1957 Interested candidates should send a brief statement of interest and a current resume to young Adult Program Manager Stephen Luteran: sluteran@thesca.org https://www.thesca.org/program/young-adult/corps/pennsylvania-outdoor-corps/ SCA is an EOE dedicated to workforce diversity. For more information about SCA, visit us at www.theSCA.org
# # ----- Controller Requirements: Position requires a bachelors degree in accounting, higher degree preferred 2 years public accounting experience preferred and at least 2 years demonstrating competent accounting experience and Excellent oral and written communication skills required Strong organization skills and attention to detail required Proficient with Microsoft Office Suite and expert in Excel, Access or VBA is a plus Operating knowledge of ERP systems, NetSuite preferred. Experience with GAAP financial statement preparation and review required General journal entry preparation and entry experience with automation skills preferred In depth understanding of accrual and cash-based accounting Motivated, ambitious and ability to work independently required Ability to learn tasks quickly and often with minimal instruction required Controller Responsibilities: Corporate Accounting _ Prepare, examine, and analyze accounting records, financial statements, or other financial reports to assess accuracy, completeness, and conformance to reporting and procedural standards Daily operations of the accounting department, include overseeing all aspects of A/P, A/R, payroll, invoicing Maintain and analyze budgets, preparing periodic reports that compare budgeted to actual costs Treasury management, including bank reconciliations and cash flow management Demonstrate and maintain ability to effectively interact with all levels throughout the organization Facilitate the closing of all general ledger accounts in a timely manner Enter journal entries for multiple entities Review and code invoices Forecast month end Net Capital utilizing budgets and trend analysis Sales and property tax filings Reporting Work closely with the owners and CFO with respect to internal and external reporting Demonstrate and apply understanding of accounting systems and reporting to facilitate sufficient reports Correctly prepare financial statements in a timely manner, including consolidation report 
of all entities Prepare monthly financial statement packages for the principals As required, prepare, and review ad hoc schedules and reports, responding to the owners inquiries Respond in a timely manner to auditor, banking, bonding, insurance, and vendor requests and maintain effective interactions with all Job costing, reporting, and allocations Additional Duties and Supporting CFO: Controllership duties - These make up the backward-looking part of a Controllers job. Controllership duties hold the Controller responsible for presenting and reporting accurate and timely historical financial information of the company he or she works for. Every stakeholder in the company - including shareholders, analysts, creditors, employees and other members of management - relies on the accuracy and timeliness of this information. It is imperative that the information reported by the Controller is accurate, because many decisions are based on it. Treasury duties The Controller is also responsible for the helping CFO with company's present financial condition, so he or she may help decide how to invest the company's money, taking into consideration riskand liquidity. In addition, the Controller helps oversees the capital structureof the company, determining the best mix of debt, equityand internal financing. Addressing the issues surrounding capital structureis one of the most important duties of a Controller. Economic strategy and forecasting- Not only is a Controller responsible for a company's past and present financial situation, he or she is also an integral part of a company's financial future. A Controller must be able to identify and report what areas of a company are most efficient and how the company can capitalize on this information. 
Team Role: Job would include managing a small team (1-2 people) in all financial aspects of multiple companies which would include; check processing, vendor management, month end processing, account reconciliations, weekly production reports, monthly budgeting/reporting, overseeing accounting procedures and other office staff, helping process payroll, general ledger, income statements, invoices/accounts receivable and other HR items and reporting monthly to ownership. Specific Role: Would include managing and operating budgets for multiple companies and investments, diagnose and evaluate weaknesses in our companies financial (and operational) structure, providing risk management, plan and prepare for long term growth and investments, continue to help build on company strengths in the marketplace, as well as evaluating profitability and creating new ways to increase profit and decrease costs.
# # ----- FULL_TIME
# # ----- 72 months of experience
# # ----- CHIEF
# # ----- 96 months of experience
# # ----- JUNIOR
# # ----- STUDENT
# # ----- 6 months of experience
# # ----- LEAD
# # ----- 48 months of experience
# # ----- EDUCATION & TRAINING: 
# # High School Diploma/ GED EXPERIENCE: 
# # Preferred- 2 years of hospital registration LICENSES & CERTIFICATION: 
# # None
# # ----- 24 months of experience
# # ----- 3 months of experience
# # ----- 36 months of experience
# # ----- 84 months of experience
# # ----- REQUIRED EDUCATION AND/OR TRAINING: High school diploma or general education degree (GED), or one to three months related experience and/or training, or equivalent combination of education and experience preferred. PHYSICAL DEMANDS AND WORK ENVIORNMENT: Standing Continually required to stand Walking Continually required to walk Sitting - Occasionally required to sit Travelling Occasionally required to travel Finger Dexterity - Continually required to utilize hand and finger dexterity Climb, Bend, Balance, Stoop, Kneel or Crawl - Frequently required to climb, balance, bend, stoop, kneel or crawl Talking/Hearing - Continually required to talk or hear Visual Accuity - Continually utilize visual acuity to operate equipment, read technical information, and/or use a keyboard Lifting/Pushing/Carrying Occasionally required to lift more than 50 lbs. at a time with frequent lifting, pushing, or carrying of up to 30 lbs. EEO Statement: Petland is an equal opportunity employer. All applicants will be considered for employment without attention to race, color, religion, sex, pregnancy, national origin, age, mental or physical disabilities, military or veteran status, sexual orientation, or gender identity status. The above is intended to describe the general content of and requirements for the performance of this job. It is not to be construed as an exhaustive statement of duties, responsibilities, or physical requirements. Nothing in this job description restricts managements right to assign or reassign duties and responsibilities to this job at any time. Reasonable accommodations may be made to enable individuals with disabilities to perform the essential functions.Those applicants requiring reasonable accommodation to the application and/or interview process should notify Petlands Department of Human Resources.
# # ----- 144 months of experience
# # ----- Education/Experience: 0-6 months of previous experience preferred Ability to work on single task / procedures / products. Highly repetitive or routine duties, Ability to support 1-2 product lines / customer(s) Normally receives detailed instructions on work Works under close supervision Computer proficiency required Physical Requirements: Perform general physical activities Must be able to stand for up to 8 hours Monitor processes, materials and surroundings Spend time making repetitive hand motion Mechanical aptitude/dexterity and the ability to lift up to 35 lbs Benefits Offered: Comprehensive benefit package including medical, dental and vision coverage; company-paid basic life/AD&D insurance, short-term and long-term disability insurance; voluntary supplemental insurances, flexible spending accounts and employee assistance program (EAP). Sick Leave, Vacation Time, and company-paid Holidays are also provided as paid time off. NEOTech also provides a 401(k) Retirement Savings Plan option with a company match. NEOTech is an Equal Opportunity/Affirmative Action employer. All qualified applicants will receive consideration for employment without regard to race, color, religion, sex including sexual orientation and gender identity, national origin, disability, protected Veteran status, or any other characteristic protected by applicable federal, state, or local law. NEOTech has a long-standing commitment to maintaining a safe, quality-oriented and productive work environment. We also want all employees to perform their duties safely and efficiently, in a manner that protects their interests and those of their co-workers. We recognize that alcohol and drug abuse pose a threat to the health and safety of NEOTech employees and to the security of the Company’s equipment and facilities. For these reasons, NEOTech is committed to the elimination of drug and alcohol use and abuse in the workplace. 
# # Candidates being considered for hire must pass a pre-employment background check and drug test which include screening for illegal drugs and marijuana.
# # ----- 12 months of experience
# # ----- 60 months of experience
# # ----- SENIOR
# # ----- FULL_TIME, PART_TIME
# # ----- 180 months of experience
# # ----- 1 months of experience
# # ----- 240 months of experience
# # ----- PART_TIME
# # ----- 108 months of experience
# # ----- High School or equivalent
# # ----- Bachelors Degree
# # ----- None

In [137]:
def extract_experience(text, position_careerLevel):
    """
    Extract the smallest experience requirement mentioned in a job posting.

    The posting text and the career-level field are searched together for
    "<number> years" / "<number> months" style phrases (a trailing "+" after
    the number is allowed). Years take precedence over months.

    Args:
    - text: Free-text job description, or None
    - position_careerLevel: Career-level field of the posting, or None

    Returns:
    - "<n> years" using the smallest year count found, otherwise
      "<n> months" using the smallest month count found, otherwise 'NOT FOUND'
      (also returned when both inputs are None or empty).
    """
    # Search only the inputs that carry content. This avoids passing None to
    # the regex engine (e.g. text == "" with no career level) and keeps the
    # literal string "None" out of the searched text.
    parts = [str(part) for part in (text, position_careerLevel) if part]
    if not parts:
        return 'NOT FOUND'
    text_concat = " ".join(parts)

    # Define regular expression patterns for years and months with "experience"
    # (the trailing "of ... experience" part is optional, so a bare "3 years" matches too)
    pattern_years = r'(\d+|\d+\+?)\s*(?:year|yr|yrs|years)\s*(?:of)?\s*(?:experience|minimum experience|required experience|desired experience)?'
    pattern_months = r'(\d+|\d+\+?)\s*(?:month|mon|months)\s*(?:of)?\s*(?:experience|minimum experience|required experience|desired experience)?'

    def smallest_match(pattern):
        """Smallest number captured by the pattern, or None when nothing matches."""
        captured = re.findall(pattern, text_concat, re.IGNORECASE)
        # A capture may carry a trailing "+"; keep only its digits.
        values = [int(''.join(filter(str.isdigit, match))) for match in captured]
        return min(values) if values else None

    years = smallest_match(pattern_years)
    if years is not None:
        return f"{years} years"

    months = smallest_match(pattern_months)
    if months is not None:
        return f"{months} months"

    return 'NOT FOUND'

# Wrap extract_experience as a Spark UDF that returns strings
extract_experience_udf = udf(f=extract_experience, returnType=StringType())

# Derive 'years_or_months_experience' from the 'text' and 'position_careerLevel' columns
df_filtered = df_filtered.withColumn(
    'years_or_months_experience',
    extract_experience_udf(col('text'), col('position_careerLevel')),
)

In [138]:

# Define the function to replace values with exact matches
def replace_career_level(value, position_name, position_workType):
    """
    Map a posting to a career-level keyword found in its fields.

    The three inputs are combined, lower-cased and searched for whole-word
    occurrences of the known levels. The levels are tried in a fixed order
    (junior, senior, student, chief, lead, intern) and the first one present
    wins, regardless of where it appears in the text.

    Args:
    - value: Current career-level value, or None
    - position_name: Position title, or None
    - position_workType: Work-type value, or None

    Returns:
    - The matched level in upper case, or 'NOT FOUND' when none is present
    """
    haystack = f"{value} {position_name} {position_workType}".lower()

    for level in ('junior', 'senior', 'student', 'chief', 'lead', 'intern'):
        # \b on both sides: "intern" must not match inside "internship"
        if re.search(r'\b' + re.escape(level) + r'\b', haystack):
            return level.upper()

    return 'NOT FOUND'

# Wrap replace_career_level as a Spark UDF that returns strings
replace_udf = udf(f=replace_career_level, returnType=StringType())

# Overwrite 'position_careerLevel' with the keyword derived from the career level,
# position name and work type
df_filtered = df_filtered.withColumn(
    'position_careerLevel',
    replace_udf('position_careerLevel', col('position_name'), col('position_workType')),
)

In [139]:

# Define the function to replace values in position_workType based on keywords across all rows
def replace_work_type(workType, text):
    """
    Normalise a work-type value, falling back to keywords found in the posting text.

    If workType carries a real value it is returned upper-cased with surrounding
    whitespace removed. If it is missing (None, empty, whitespace-only or
    "other" in any casing), the text is scanned for full-time, part-time and
    remote keywords, in that order.

    Args:
    - workType: Existing work-type value (may be None; non-strings are converted with str())
    - text: Free text to search when workType is missing (may be None)

    Returns:
    - The upper-cased work type, or 'FULL TIME' / 'PART TIME' / 'REMOTE' inferred
      from the text, or 'NOT FOUND' when nothing can be determined
    """
    # Strip so that padded or whitespace-only values are recognised as missing
    # and real values are not returned with stray spaces.
    normalized = '' if workType is None else str(workType).strip()

    if normalized.lower() not in ('', 'other'):
        return normalized.upper()

    # str(None) becomes "None", which contains none of the keywords below
    text_lower = str(text).lower()

    # Checked in order: the first group with a keyword present in the text wins.
    keyword_groups = (
        (('full time', 'full-time', 'full_time', 'fulltime'), 'FULL TIME'),
        (('part time', 'part-time', 'part_time', 'parttime'), 'PART TIME'),
        # NOTE(review): 'remotly' presumably targets a misspelling in the source data — confirm
        (('work from home', 'remote', 'remotly'), 'REMOTE'),
    )
    for keywords, label in keyword_groups:
        if any(keyword in text_lower for keyword in keywords):
            return label

    return 'NOT FOUND'

# Wrap replace_work_type as a Spark UDF whose result is a string
replace_work_type_udf = udf(replace_work_type, StringType())

# Overwrite 'position_workType' with the value derived from the existing
# work type and the posting text
df_filtered = df_filtered.withColumn(
    'position_workType',
    replace_work_type_udf(col('position_workType'), col('text')),
)

In [140]:
# # Load the spaCy English model
# nlp = spacy.load("en_core_web_sm")

# # Define the UDF to extract required skills from job descriptions
# def get_required_skills_udf(job_description):
#     # Process the job description using spaCy
#     doc = nlp(job_description)
    
#     # Initialize a list to store verb-noun pairs
#     list_of_verbs_nouns = []
    
#     i = 0
#     while i < len(doc):
#         if doc[i].pos_ == 'VERB':
#             temp = [doc[i]]
#             i += 1
#             while i < len(doc) and doc[i].pos_ == 'NOUN':
#                 temp.append(doc[i])
#                 i += 1
#             list_of_verbs_nouns.append(temp)
#         else:
#             i += 1
    
#     # Filter out verb-noun pairs with only one word
#     filtered_list = [pair for pair in list_of_verbs_nouns if len(pair) > 1]
    
#     # Convert the pairs to strings
#     list_of_skills_to_string = [' '.join([token.text for token in pair]) for pair in filtered_list]
    
#     return list_of_skills_to_string

# get_required_skills = udf(get_required_skills_udf, ArrayType(StringType()))

# # Apply the UDF to the DataFrame column 'text' to extract required skills
# df_filtered = df_filtered.withColumn('todo', get_required_skills(col('text')))
# # Convert array column 'todo' to a string column separated by commas
# df_filtered = df_filtered.withColumn('todo', concat_ws(',', 'todo'))


In [141]:
# Fill nulls with the "NOT FOUND" placeholder, then drop every row whose
# salary is that placeholder (i.e. rows without a usable salary)
df_filtered = (
    df_filtered
    .fillna("NOT FOUND")
    .filter(col("salary") != "NOT FOUND")
)

# csv_path = "final_output_processed.csv"

# write_df_to_csv(df_filtered, csv_path)
# # Show the first 100 rows in the "salary" column
# df_filtered.select("salary").show(100, truncate=False)

# # Count the data in the DataFrame
# count = df_filtered.count()
# print("Total count of data:", count)

In [142]:
# Columns retained in the exported dataset, grouped by shared name prefix
columns_to_keep = [
    # Record identity and origin
    "id", "sourceCC", "source", "locale",
    # Position
    "position_name", "position_workType", "position_careerLevel", "position_department",
    # Organisation address
    "orgAddress_addressLine", "orgAddress_level", "orgAddress_country",
    "orgAddress_state", "orgAddress_city",
    # Company and URL
    "orgCompany_nameOrg", "orgCompany_homepage", "url",
    # Tags
    "orgTags_CATEGORIES", "orgTags_REQUIREMENTS", "orgTags_SKILLS",
    # Salary and experience
    "salary_text", "salary", "years_or_months_experience",
]

# keep_columns is defined earlier in the notebook; presumably it restricts
# the DataFrame to the listed columns — verify against its definition
df_filtered = keep_columns(df_filtered, columns_to_keep)

# Export the result to a CSV file via the notebook's write_df_to_csv helper
csv_path = "final_output_processed.csv"
write_df_to_csv(df_filtered, csv_path)

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

DataFrame written to 'final_output_processed.csv'


In [143]:
# Change the notebook's working directory to /kaggle/working/ and list its contents.
# NOTE(review): hard-coded absolute path, presumably specific to the Kaggle environment — confirm before running elsewhere.
%cd /kaggle/working/
!ls

/kaggle/working
df_filtered.pickle  final_output_processed.csv


In [144]:
# Display a link to the exported CSV as this cell's output.
# The path is relative, so it is resolved against the current working directory.
from IPython.display import FileLink
FileLink(r'final_output_processed.csv')