In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


In [3]:
spark = SparkSession.builder\
    .appName('extractor2')\
    .getOrCreate()


24/10/03 14:26:00 WARN Utils: Your hostname, basel resolves to a loopback address: 127.0.1.1; using 192.168.1.5 instead (on interface wlp3s0)
24/10/03 14:26:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/03 14:26:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
# Create a SparkSession

# Define the schema for salaries_2
schema = StructType([
    StructField("work_year", IntegerType(), True),
    StructField("experience_level", StringType(), True),
    StructField("employment_type", StringType(), True),
    StructField("job_title", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("salary_currency", StringType(), True),
    StructField("salary_in_usd", IntegerType(), True),
    StructField("employee_residence", StringType(), True),
    StructField("remote_ratio", IntegerType(), True),
    StructField("company_location", StringType(), True),
    StructField("company_size", StringType(), True)
])

# Read the CSV files into DataFrames using the correct path and schema
data1 = spark.read.csv('../../data/raw/salaries_2.csv', header=True, schema=schema)
data2 = spark.read.csv('../../data/raw/jobs_in_data.csv', header=True, inferSchema=True)
data3 = spark.read.csv('jobs.csv', header=True, inferSchema=True)

# Transform jobs_in_Data to match the schema of salaries_2
data2 = data2.select(
    "work_year",
    "experience_level",
    "employment_type",
    "job_title",
    "salary",
    "salary_currency",
    "salary_in_usd",
    "employee_residence",
    "work_setting",
    "company_location",
    "company_size"
).withColumnRenamed("work_setting", "remote_ratio")

data3 = data3.select(
    data3["date_posted"].cast(IntegerType()).alias("work_year"),
    data3["job_level"].alias("experience_level"),
    data3["job_type"].alias("employment_type"),
    data3["title"].alias("job_title"),
    data3["min_amount"].cast(IntegerType()).alias("salary"),
    data3["currency"].alias("salary_currency"),
    data3["max_amount"].cast(IntegerType()).alias("salary_in_usd"),
    data3["location"].alias("employee_residence"),
    data3["is_remote"].cast(IntegerType()).alias("remote_ratio"),
    data3["location"].alias("company_location"),
    data3["company_num_employees"].alias("company_size")
)


# View the structure of the DataFrames
print("Structure of data1:")
data1.printSchema()

print("Structure of data2:")
data2.printSchema()

print("Structure of data3:")
data3.printSchema()

Structure of data1:
root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

Structure of data2:
root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: string (nullable = true)
 |-- company_location: string (nullable = true)


In [16]:
df_parquet= spark.read.parquet('../../data/raw/data.parquet')
df_json= spark.read.option("multiline", "true").json('../../data/raw/data.json')
data3.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     NULL|            NULL|           NULL|  Lead Data Engineer|130658|            USD|       178484|        Dallas, TX|        NULL|      Dallas, TX|        NULL|
|     NULL|            NULL|           NULL|    or similar tools|  NULL|           NULL|         NULL|              NULL|        NULL|            NULL|        NULL|
|     NULL|            NULL|           NULL|                NULL|  NULL|           NULL|         NULL|              NULL|        NULL|            NULL|        NULL|
|     NULL

In [20]:
merged_dataset = data1.union(data2).union(data3).union(df_parquet).union(df_json)

# View the structure of the merged dataset
print("Structure of merged_dataset:")
merged_dataset.printSchema()

# Show a few rows of the merged dataset
merged_dataset.show(5)

Structure of merged_dataset:
root
 |-- work_year: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: string (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: string (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+--------

In [21]:
# Show the size of the merged dataset
merged_size = merged_dataset.count()
print(f"Size of the merged dataset: {merged_size} rows")

Size of the merged dataset: 64270 rows


In [22]:
merged_dataset.coalesce(1).write.parquet("../../data/processed", mode='overwrite')


                                                                                