# **Real-Time Data Processing and Analysis Project**

## **Introduction**

In this project, we aim to build a robust system for real-time processing and analysis of structured and unstructured job-related data. The system is designed to handle various data formats, including text, JSON, CSV, Excel, PDF, and images, which are ingested from multiple sources. Using Apache Spark, a powerful distributed data processing framework, we process, clean, and transform the data into actionable insights.

The data processed in this project includes job bulletins, positions, salaries, requirements, and other related fields. Using Spark Structured Streaming, the pipeline picks up new files as they arrive in its input directories and processes them in micro-batches. The processed data is then written to AWS S3, where it is available for downstream analytics, visualization, and business decision-making.

---

## **Motivation**

The motivation behind this project lies in the increasing demand for real-time data processing in industries such as recruitment, business intelligence, and automation. Manual data extraction and analysis are time-consuming and prone to errors. With this automated pipeline:
- Businesses can streamline their data workflows, reduce human intervention, and ensure consistency.
- Job applicants and recruiters gain immediate insights into job postings, enabling faster decision-making.
- The system serves as a proof-of-concept for scalable, real-time, multi-format data pipelines.

---

## **How It Works**

This project follows a **modular architecture** that includes the following key steps:

1. **Data Ingestion**:
   - Text and JSON files are ingested as streams with Spark Structured Streaming (`spark.readStream`), while PDF, image, and Excel files are read by custom extractors built on Apache Tika, Tesseract OCR (`pytesseract`), and pandas.

2. **Data Transformation**:
   - The raw data is processed using User-Defined Functions (UDFs) tailored to extract relevant fields such as position titles, salaries, job requirements, and notes.
   - Data cleaning and validation steps are performed to ensure the quality of the output.

3. **Real-Time Processing**:
   - Apache Spark processes the streams in near real time as micro-batches (triggered every 5 seconds for the Parquet sink), applying the transformations to newly arrived data.

4. **Cloud Integration**:
   - The processed data is stored in AWS S3 for scalability, accessibility, and future analytics.
   - Data is written as Parquet, a compact columnar format, and the streaming checkpoints are also kept in S3 so the query can resume after a restart.

5. **Output**:
   - The cleaned and structured data is written to the console for debugging and stored in S3 for later use in visualization tools such as Power BI or Tableau.
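
The `udf_utils` module that supplies the extractor functions is not included in this document. As a rough illustration of the Data Transformation step, the hypothetical `extract_salary` below shows the shape such a function could take: a plain Python function returning a `(salary_start, salary_end)` tuple of floats, which matches the two-field `DoubleType` struct declared for `extract_salary_udf`. The regular expression is an assumption that salaries appear as dollar ranges such as `$49,903 to $72,996`; real bulletins may use other formats.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern: assumes salaries are written as "$<amount> to $<amount>"
# (or "$<amount>-$<amount>"); a single "$<amount>" is also accepted.
SALARY_PATTERN = re.compile(
    r"\$(\d{1,3}(?:,\d{3})+|\d+)(?:\s*(?:to|-)\s*\$(\d{1,3}(?:,\d{3})+|\d+))?"
)


def extract_salary(text: Optional[str]) -> Optional[Tuple[float, Optional[float]]]:
    """Return (salary_start, salary_end) for the first salary found, or None."""
    if not text:
        return None
    match = SALARY_PATTERN.search(text)
    if match is None:
        return None
    start = float(match.group(1).replace(",", ""))
    # A single figure (no range) leaves salary_end empty (null in Spark).
    end = float(match.group(2).replace(",", "")) if match.group(2) else None
    return start, end


# Wrapped for Spark the same way define_udfs() does:
# udf(extract_salary, StructType([StructField('salary_start', DoubleType(), True),
#                                 StructField('salary_end', DoubleType(), True)]))
```

Returning `None` for the whole tuple makes the struct column null, so `getField('salary_start')` and `getField('salary_end')` simply yield nulls for bulletins without a salary.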

---

## **Project Goals**
- To develop a fully automated data processing pipeline for real-time ingestion and transformation of job-related data.
- To explore and handle a variety of data formats efficiently.
- To demonstrate the use of distributed data processing systems like Apache Spark in solving real-world data challenges.
- To create a scalable solution that can be extended to other use cases and data sources in the future.

## **Potential Use Cases**
1. **Recruitment Analytics Platform**:
   - Automate the extraction of structured data from unstructured job postings in various formats (e.g., text, JSON, PDFs).
   - Provide insights into salary benchmarks, job demand trends, and required qualifications using dashboards.
   - Help recruiters make data-driven decisions for hiring and resource allocation.

2. **Resume Parsing for Applicant Tracking Systems (ATS)**:
   - Extract essential candidate information such as skills, experience, and education.
   - Enable companies to match resumes to job postings automatically, enhancing the recruitment process.

3. **Market Research on Job Trends**:
   - Analyze job postings to identify emerging roles, skills, and industries.
   - Provide insights for training organizations and career development platforms to align their offerings with market demand.
   - Help policymakers understand labor market trends to shape educational and workforce policies.

4. **Real-Time Job Alert System**:
   - Process job postings in real time to send customized alerts to users.
   - Allow filtering by criteria such as salary range, location, or job title.
   - Improve job-seeking platforms by offering timely and relevant notifications.

5. **Job Posting Validation and Quality Control**:
   - Automate the validation of job descriptions to ensure compliance with regulatory standards and organizational guidelines (e.g., salary transparency, inclusive language).
   - Enhance the quality and consistency of postings on job boards by identifying incomplete or non-compliant descriptions.

**Disclaimer:** This project was specifically designed and tested for processing JSON and text file formats. While the code contains provisions for handling additional formats such as CSV, PDF, Excel, and images, these functionalities are currently untested and may require further development and validation. Users intending to use this code for untested formats are advised to thoroughly debug and validate the processing steps for these formats to ensure accuracy and reliability.

## **System Architecture:**

![System architecture](Architecture.jpg)

### **Import Necessary Modules:**

```python
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
from pyspark.sql.functions import udf, regexp_replace, input_file_name
from config import configuration  # Configuration file for credentials
from udf_utils import *  # Utility functions for UDFs
from tika import parser  # For processing PDF files
import pytesseract  # For processing image files
from PIL import Image
import pandas as pd  # For reading Excel files
```

### **Defining Functions:**

```python
def define_udfs():
    """
    This function defines and returns User-Defined Functions (UDFs) for various data extraction tasks.
    """
    return {
        'extract_file_name_udf': udf(extract_file_name, StringType()),
        'extract_position_udf': udf(extract_position, StringType()),
        'extract_salary_udf': udf(extract_salary, StructType([
            StructField('salary_start', DoubleType(), True),
            StructField('salary_end', DoubleType(), True)
        ])),
        'extract_start_date_udf': udf(extract_start_date, DateType()),
        'extract_end_date_udf': udf(extract_end_date, DateType()),
        'extract_classcode_udf': udf(extract_class_code, StringType()),
        'extract_requirements_udf': udf(extract_requirements, StringType()),
        'extract_notes_udf': udf(extract_notes, StringType()),
        'extract_duties_udf': udf(extract_duties, StringType()),
        'extract_selection_udf': udf(extract_selection, StringType()),
        'extract_experience_length_udf': udf(extract_experience_length, StringType()),
        'extract_education_length_udf': udf(extract_education_length, StringType()),
        'extract_application_location_udf': udf(extract_application_location, StringType()),
    }
```
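
A detail worth keeping in mind for the date UDFs: a function wrapped with `udf(..., DateType())` is expected to return `datetime.date` objects rather than date strings. The hypothetical `extract_start_date` below sketches this, assuming the bulletin contains a line such as `Open Date: 03-08-19` in `MM-DD-YY` form; the actual `udf_utils` implementation is not shown and may parse a different layout.

```python
import re
from datetime import date, datetime
from typing import Optional

# Hypothetical format: assumes a line such as "Open Date: 03-08-19" (MM-DD-YY).
OPEN_DATE_PATTERN = re.compile(r"Open Date:\s*(\d{1,2}-\d{1,2}-\d{2})", re.IGNORECASE)


def extract_start_date(text: Optional[str]) -> Optional[date]:
    """Return the bulletin's open date as a datetime.date, or None if absent/invalid."""
    if not text:
        return None
    match = OPEN_DATE_PATTERN.search(text)
    if match is None:
        return None
    try:
        # Return a datetime.date (not a string) to match the DateType declaration.
        return datetime.strptime(match.group(1), "%m-%d-%y").date()
    except ValueError:
        # Matched the shape but not a real date (e.g. month 13): treat as missing.
        return None
```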

```python
def process_pdf(file_path):
    """
    Extracts text content from a PDF file.
    """
    raw_text = parser.from_file(file_path)
    return raw_text['content']
```

```python
def process_image(file_path):
    """
    Extracts text from an image using OCR (Optical Character Recognition).
    """
    image = Image.open(file_path)
    text = pytesseract.image_to_string(image)
    return text
```
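
`process_pdf` and `process_image` return plain strings, but nothing in the main block feeds those strings into the stream. One possible bridge, which is not part of the original code, is to write each extracted document as a `.txt` file into the streaming text directory (`input_text`), where the `wholetext` text reader would treat it like any other bulletin. The hypothetical helper `stage_texts_for_stream` below first writes each file under a hidden temporary name (a leading dot, which Spark's file listing normally skips) and then renames it, so the streaming reader should not see a half-written file. `os.replace` is atomic only when both paths are on the same filesystem, which is why the temporary file is created in the target directory.

```python
import os
import tempfile
from typing import Iterable, List, Optional


def stage_texts_for_stream(texts: Iterable[Optional[str]], out_dir: str, prefix: str) -> List[str]:
    """Write each non-empty text to out_dir as <prefix>_<n>.txt and return the paths.

    Hypothetical helper: each file is written under a hidden temporary name in
    out_dir, then renamed into place.
    """
    os.makedirs(out_dir, exist_ok=True)
    written = []
    for n, text in enumerate(texts):
        if not text or not text.strip():
            continue  # extraction can yield None or blank text (e.g. scanned PDFs)
        final_path = os.path.join(out_dir, f"{prefix}_{n}.txt")
        fd, tmp_path = tempfile.mkstemp(dir=out_dir, prefix=".", suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        os.replace(tmp_path, final_path)  # atomic rename on the same filesystem
        written.append(final_path)
    return written
```

For example, `stage_texts_for_stream(pdf_files, text_input_dir, 'pdf_batch1')` would stage the PDF text extracted in the main block. A prefix that is unique per run is advisable, since a file reusing an already-processed path may not be picked up again by the file source.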

```python
def process_excel(file_path):
    """
    Reads content from an Excel file and converts it to a Spark DataFrame.
    """
    pandas_df = pd.read_excel(file_path)
    spark = SparkSession.builder.getOrCreate()
    spark_df = spark.createDataFrame(pandas_df)
    return spark_df
```

### **Main Execution:**

```python
if __name__ == "__main__":
    # Initialize the Spark session with the S3A connector and credentials
    spark = (
        SparkSession.builder.appName('Real-Time Job Data Streaming')
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk:1.11.469')
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        .config('spark.hadoop.fs.s3a.access.key', configuration.get('AWS_ACCESS_KEY'))  # AWS Access Key
        .config('spark.hadoop.fs.s3a.secret.key', configuration.get('AWS_SECRET_KEY'))  # AWS Secret Key
        .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        .getOrCreate()
    )

    # Define input directories for different data formats
    text_input_dir = 'input_text'
    json_input_dir = 'input_json'
    csv_input_dir = 'input_csv'  # defined but not read yet (see disclaimer)
    pdf_input_dir = 'input_pdf'
    img_input_dir = 'input_img'
    excel_input_dir = 'input_excel'

    # Define schema for structured data
    data_schema = StructType([
        StructField('file_name', StringType(), True),
        StructField('position', StringType(), True),
        StructField('classcode', StringType(), True),
        StructField('salary_start', DoubleType(), True),
        StructField('salary_end', DoubleType(), True),
        StructField('start_date', DateType(), True),
        StructField('end_date', DateType(), True),
        StructField('req', StringType(), True),
        StructField('notes', StringType(), True),
        StructField('duties', StringType(), True),
        StructField('selection', StringType(), True),
        StructField('experience_length', StringType(), True),
        StructField('job_type', StringType(), True),
        StructField('education_length', StringType(), True),
        StructField('school_type', StringType(), True),
        StructField('application_location', StringType(), True),
    ])

    # Load UDFs
    udfs = define_udfs()

    # Process text input as a stream (one row per file, content in 'value')
    job_bulletins_df = (
        spark.readStream.format('text')
        .option('wholetext', 'true')
        .load(text_input_dir)
    )

    # Apply transformations using UDFs
    job_bulletins_df = (
        job_bulletins_df
        .withColumn('file_name', udfs['extract_file_name_udf']('value'))
        .withColumn('position', udfs['extract_position_udf']('value'))
        .withColumn('salary_start', udfs['extract_salary_udf']('value').getField('salary_start'))
        .withColumn('salary_end', udfs['extract_salary_udf']('value').getField('salary_end'))
        .withColumn('start_date', udfs['extract_start_date_udf']('value'))
        .withColumn('end_date', udfs['extract_end_date_udf']('value'))
        .withColumn('classcode', udfs['extract_classcode_udf']('value'))
        .withColumn('req', udfs['extract_requirements_udf']('value'))
        .withColumn('notes', udfs['extract_notes_udf']('value'))
        .withColumn('duties', udfs['extract_duties_udf']('value'))
        .withColumn('selection', udfs['extract_selection_udf']('value'))
        .withColumn('experience_length', udfs['extract_experience_length_udf']('value'))
        .withColumn('education_length', udfs['extract_education_length_udf']('value'))
        .withColumn('application_location', udfs['extract_application_location_udf']('value'))
    )

    # Process JSON input as a stream
    json_df = (
        spark.readStream.format('json')
        .schema(data_schema)
        .load(json_input_dir)
    )

    # PDF, image, and Excel extraction (untested, see disclaimer). These run once
    # as batch steps and their results are not yet merged into the stream.
    # Missing directories are skipped instead of raising FileNotFoundError.
    pdf_files = (
        [process_pdf(os.path.join(pdf_input_dir, f)) for f in os.listdir(pdf_input_dir)]
        if os.path.isdir(pdf_input_dir) else []
    )
    img_files = (
        [process_image(os.path.join(img_input_dir, f)) for f in os.listdir(img_input_dir)]
        if os.path.isdir(img_input_dir) else []
    )
    excel_dfs = (
        [process_excel(os.path.join(excel_input_dir, f)) for f in os.listdir(excel_input_dir)]
        if os.path.isdir(excel_input_dir) else []
    )

    # Union the two streams. union() matches columns by position, but the text
    # stream still carries the raw 'value' column and lacks 'job_type' and
    # 'school_type', so drop 'value', union by name (missing columns become null),
    # and restore the schema's column order.
    combined_df = (
        job_bulletins_df.drop('value')
        .unionByName(json_df, allowMissingColumns=True)
        .select(data_schema.fieldNames())
    )

    # Function for writing stream to Parquet
    def stream_writer(input_df: DataFrame, checkpoint_dir: str, output_dir: str):
        """
        Writes the streaming DataFrame to Parquet files with checkpointing.
        """
        return (
            input_df.writeStream.format('parquet')
            .option('checkpointLocation', checkpoint_dir)
            .option('path', output_dir)
            .outputMode('append')
            .trigger(processingTime='5 seconds')
            .start()
        )

    # Write the output to S3
    query = stream_writer(
        combined_df,
        checkpoint_dir='s3a://sparkunstructuredstreaming/checkpoints/',
        output_dir='s3a://sparkunstructuredstreaming/data/real_time_streaming/'
    )

    # Write the output to the console for debugging
    query_console = (
        combined_df.writeStream.outputMode('append')
        .format('console')
        .option('truncate', False)
        .start()
    )

    # Block until either query stops (for example, on failure), then shut down
    spark.streams.awaitAnyTermination()
    spark.stop()
```