In [0]:
# importing required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, upper, col

In [0]:
# Build (or reuse) the Spark session for the Silver-layer job ETL.
app_name = "Google_JobETL_Silver"
spark = (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
)

In [0]:
# Echo the application name configured above (this prints the local variable,
# not a value read back from the running session).
print("App Name :", app_name)

In [0]:
# List every catalog visible to this session.
catalogs_df = spark.sql("SHOW CATALOGS")
display(catalogs_df)

In [0]:
# List the available schemas of the catalog that is active right now
# (job_marketplace is only selected in a later cell).
display(spark.sql("SHOW SCHEMAS"))

In [0]:
# Make sure the job_marketplace catalog exists, then switch the session to it.
spark.sql(" CREATE CATALOG IF NOT EXISTS job_marketplace ")
spark.sql(" USE CATALOG job_marketplace ")

In [0]:
# Show which schemas the now-active job_marketplace catalog contains.
display(spark.sql("SHOW SCHEMAS"))

In [0]:
# Create the silver_layer schema inside job_marketplace unless it already exists.
spark.sql("CREATE SCHEMA IF NOT EXISTS job_marketplace.silver_layer")

In [0]:
# Re-list the schemas to confirm that silver_layer is now present.
display(spark.sql("SHOW SCHEMAS"))

In [0]:
# Load the daily job postings from the Bronze Delta table and preview them.
df = (
    spark.read
    .format("delta")
    .table("job_marketplace.bronze_layer.daily_jobs")
)
display(df)
print("✅Reading from Bronze Layer --> Silver Layer")

In [0]:
from pyspark.sql.functions import expr

# Keys to lift out of `detected_extensions` into same-named top-level columns.
columns_to_extract = ["posted_at", "schedule_type"]

for extension_key in columns_to_extract:
    extension_value = expr(f"detected_extensions['{extension_key}']")
    df = df.withColumn(extension_key, extension_value)

In [0]:
# Preview the frame with the extracted extension columns.
display(df)

In [0]:
# from pyspark.sql.functions import regexp_replace, col , lower , split, trim, size, when

# df_cleaned = df.withColumn(
#     "location_cleaned",
#     regexp_replace(col("location"), r"\s*\(\+\d+\s+others?\)", "")
# )

# split_col = split(col("location_cleaned"), ",")

# df_split = df_cleaned.withColumn("city_raw", trim(split_col.getItem(0))) \
#     .withColumn("state_raw", when(size(split_col) >= 2, trim(split_col.getItem(1)))) \
#     .withColumn("country_raw", when(size(split_col) >= 3, trim(split_col.getItem(2))))  # null-safe

# df_final = df_split.withColumn(
#     "city",
#     when(lower(col("location_cleaned")).isin("india", "anywhere"), None).otherwise(col("city_raw"))
# ).withColumn(
#     "state",
#     when(lower(col("location_cleaned")).isin("india", "anywhere"), None).otherwise(col("state_raw"))
# ).withColumn(
#     "country",
#     when(lower(col("location_cleaned")) == "anywhere", "Remote")
#     .when(col("country_raw").isNull(), "India")
#     .otherwise(col("country_raw"))
# )


In [0]:
from pyspark.sql.functions import regexp_replace, col, lower, split, trim, size, when

# Step 1: strip a "(+N others)" suffix from the raw location text.
df_cleaned = df.withColumn(
    "location_cleaned",
    regexp_replace(col("location"), r"\s*\(\+\d+\s+others?\)", "")
)

# Step 2: split the cleaned location on commas.
split_col = split(col("location_cleaned"), ",")

# Step 3: positional parts. state_raw / country_raw stay NULL when the
# location has fewer than 2 / 3 comma-separated parts.
df_split = (
    df_cleaned
    .withColumn("city_raw", trim(split_col.getItem(0)))
    .withColumn("state_raw", when(size(split_col) >= 2, trim(split_col.getItem(1))))
    .withColumn("country_raw", when(size(split_col) >= 3, trim(split_col.getItem(2))))
)

# Step 4: known Indian states (lowercase), used to recognise a state name
# that appears in the "city" or "state" position.
known_states = [
    "andhra pradesh", "arunachal pradesh", "assam", "bihar", "chhattisgarh", "goa",
    "gujarat", "haryana", "himachal pradesh", "jharkhand", "karnataka", "kerala",
    "madhya pradesh", "maharashtra", "manipur", "meghalaya", "mizoram", "nagaland",
    "odisha", "punjab", "rajasthan", "sikkim", "tamil nadu", "telangana", "tripura",
    "uttar pradesh", "uttarakhand", "west bengal", "delhi", "jammu and kashmir"
]

# Reusable predicates for Step 5.
# The whole location is just "India" or "Anywhere": no city/state to report.
is_placeholder = lower(col("location_cleaned")).isin("india", "anywhere")
# The second part is the country name rather than a state.
second_is_india = lower(col("state_raw")) == "india"
# The first part is really a state ("Karnataka" or "Karnataka, India").
first_is_state = (
    lower(col("city_raw")).isin(known_states)
    & (second_is_india | col("state_raw").isNull())
)
# The second part is a recognised state ("Pune, Maharashtra").
second_is_state = lower(col("state_raw")).isin(known_states)

# Step 5: final city / state / country columns.
df_final = df_split.withColumn(
    "city",
    when(is_placeholder, None)
    .when(first_is_state, None)
    .otherwise(col("city_raw"))
).withColumn(
    "state",
    when(is_placeholder, None)
    .when(first_is_state, col("city_raw"))
    # Fix: "Pune, India" used to store the country name "India" as the state.
    .when(second_is_india, None)
    .otherwise(col("state_raw"))
).withColumn(
    "country",
    when(lower(col("location_cleaned")) == "anywhere", "Remote")
    .when(col("country_raw").isNotNull(), col("country_raw"))
    # Fix: only promote the second part to country when it is not a known
    # state; "Pune, Maharashtra" used to yield country = "Maharashtra".
    .when(col("state_raw").isNotNull() & ~second_is_state, col("state_raw"))
    .otherwise("India")
)


In [0]:
# Preview the frame with the parsed city / state / country columns.
display(df_final)

In [0]:
# Derive an absolute timestamp (posted_at_ts) from the posted_at text in df_final.
from pyspark.sql.functions import regexp_extract, col, current_timestamp, expr, when, lower

# Lower-case the text so the unit comparison below is case-insensitive.
# presumably posted_at holds relative strings like "3 days ago" - TODO confirm
df1 = df_final.withColumn("posted_at_lower", lower(col("posted_at")))

# Extract the first run of digits as `number` and the word that follows it as `unit`.
df2 = (
    df1
    .withColumn("number", regexp_extract("posted_at_lower", r"(\d+)", 1).cast("int"))
    .withColumn("unit", regexp_extract("posted_at_lower", r"\d+\s*(\w+)", 1))
)

# Subtract `number` hours/days/minutes from the current timestamp.
# Rows whose unit is none of these match no branch and are left NULL.
df_final1 = df2.withColumn(
    "posted_at_ts",
    when(col("unit").isin("hour", "hours"),
         expr("timestampadd(HOUR, -number, current_timestamp())"))
    .when(col("unit").isin("day", "days"),
         expr("timestampadd(DAY, -number, current_timestamp())"))
    .when(col("unit").isin("minute", "minutes"),
         expr("timestampadd(MINUTE, -number, current_timestamp())"))
)

# Fall back to "now" when no timestamp could be derived.
# Fix: this must start from df_final1; the bare `df` has no posted_at_ts,
# number or unit columns, so the original cell failed with an AnalysisException.
# NOTE(review): the Silver projection cell below selects from df_final1, so
# this fallback is only shown here and is not written to the table - confirm intent.
df_final2 = df_final1.withColumn(
    "posted_at_ts",
    when(col("posted_at_ts").isNull(), current_timestamp()).otherwise(col("posted_at_ts"))
)

# Show result
display(df_final2.select("posted_at", "number", "unit", "posted_at_ts"))


In [0]:
# Project the enriched frame onto the Silver column set, renaming where needed.
silver_columns = [
    col("job_id"),
    col("title").alias("job_title"),
    col("search_role"),
    col("company_name"),
    col("description"),
    col("location_cleaned").alias("loacation"),  # spelling matches the Silver table column
    col("city"),
    col("state"),
    col("country"),
    col("share_link"),
    col("via"),
    col("posted_at_ts").alias("posted_at"),
    col("schedule_type"),
]
silver_df = df_final1.select(*silver_columns)

display(silver_df)

In [0]:
# Expose the Silver projection to SQL cells as the temp view `new_jobs`
# (the view is re-registered with de-duplicated rows before the merge).
silver_df.createOrReplaceTempView("new_jobs")

In [0]:
%sql
-- Peek at a single row of the staged view
SELECT * FROM new_jobs LIMIT 1

In [0]:
%sql
-- Target Delta table for cleaned job postings; a no-op when it already exists
CREATE TABLE IF NOT EXISTS job_marketplace.silver_layer.clean_jobs (
  job_id        STRING,
  job_title     STRING,
  search_role   STRING,
  company_name  STRING,
  description   STRING,
  loacation     STRING,     -- misspelling of 'location' kept on purpose: it matches the upstream column alias
  city          STRING,
  state         STRING,
  country       STRING,
  share_link    STRING,
  via           STRING,
  posted_at     TIMESTAMP,  -- stored as a real timestamp rather than the original text
  schedule_type STRING
)
USING DELTA;


In [0]:
%sql
-- Maintenance only (intentionally left commented out): uncomment to drop the Silver table.
-- DROP TABLE job_marketplace.silver_layer.clean_jobs

In [0]:
%sql
-- Inspect the current contents of the Silver table
SELECT * FROM job_marketplace.silver_layer.clean_jobs

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Rank the rows of each job_id, newest posted_at first.
window_spec = Window.partitionBy("job_id").orderBy(col("posted_at").desc())

# Keep only the top-ranked row per job_id, then discard the helper column.
ranked_jobs = silver_df.withColumn("rn", row_number().over(window_spec))
new_jobs_deduped = ranked_jobs.where(col("rn") == 1).drop("rn")

# Replace the `new_jobs` temp view with the de-duplicated rows.
new_jobs_deduped.createOrReplaceTempView("new_jobs")


In [0]:
# Upsert the staged `new_jobs` view into the Silver table, keyed on job_id:
#   * a row whose job_id already exists has every non-key column overwritten;
#   * a row with a new job_id is inserted with all thirteen columns.
# `new_jobs` is the temp view registered from the de-duplicated frame, so the
# source holds at most one row per job_id when the merge runs.
# The column name `loacation` is spelled the same way as in the target table.
merge_sql = """
MERGE INTO job_marketplace.silver_layer.clean_jobs AS target
USING new_jobs AS source
ON target.job_id = source.job_id
WHEN MATCHED THEN UPDATE SET
  target.job_title      = source.job_title,
  target.search_role    = source.search_role,
  target.company_name   = source.company_name,
  target.description    = source.description,
  target.loacation      = source.loacation,
  target.city           = source.city,
  target.state          = source.state,
  target.country        = source.country,
  target.share_link     = source.share_link,
  target.via            = source.via,
  target.posted_at      = source.posted_at,
  target.schedule_type  = source.schedule_type
WHEN NOT MATCHED THEN INSERT (
  job_id,
  job_title,
  search_role,
  company_name,
  description,
  loacation,
  city,
  state,
  country,
  share_link,
  via,
  posted_at,
  schedule_type
) VALUES (
  source.job_id,
  source.job_title,
  source.search_role,
  source.company_name,
  source.description,
  source.loacation,
  source.city,
  source.state,
  source.country,
  source.share_link,
  source.via,
  source.posted_at,
  source.schedule_type
)
"""
# Run the merge; the confirmation below is printed only if spark.sql did not raise.
spark.sql(merge_sql)
print("✅ Data successfully merged into Silver Layer")


In [0]:
%sql
-- Spot-check one row of the Silver table after the merge
SELECT * FROM job_marketplace.silver_layer.clean_jobs LIMIT 1