# **Bronze layer** - ingesting raw job postings into Databricks

In [0]:
import os
import requests, json
from pyspark.sql import SparkSession

# Defining Spark Session
spark = SparkSession.builder.appName("JobETL_Bronze").getOrCreate()

# The SerpApi key is read from the environment instead of being hardcoded, so it never ends
# up in the notebook source or its exports. The key that was previously hardcoded in this
# cell has been exposed and should be rotated. (On Databricks, a secret scope read via
# dbutils.secrets.get is a common alternative source for this value.)
API_KEY = os.environ.get("SERPAPI_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "SERPAPI_API_KEY is not set; export it (or load it from a secret scope) before running this cell."
    )

SERPAPI_URL = "https://serpapi.com/search.json"
REQUEST_TIMEOUT_S = 30  # seconds; without a timeout a stalled request can hang the cell

roles = ["Data Engineer", "Data Analyst", "Python Developer", "ETL Developer", "Spark Engineer"]
location = "India"
all_jobs = []

# Query the Google Jobs engine once per role and tag every result with the role it came from.
for role in roles:
    params = {
        "engine": "google_jobs",
        "q": role,
        "location": location,
        "api_key": API_KEY,
    }

    print("reading live data from google jobs api")
    res = requests.get(SERPAPI_URL, params=params, timeout=REQUEST_TIMEOUT_S)
    # Surface HTTP failures instead of silently treating them as "no jobs".
    res.raise_for_status()

    payload = res.json()
    if "error" in payload:
        # NOTE(review): SerpApi appears to report some failures / empty searches via an
        # "error" field; log it so an empty result set is explainable.
        print(f"SerpApi returned an error for role {role!r}: {payload['error']}")

    jobs = payload.get("jobs_results", [])
    for job in jobs:
        job["search_role"] = role
    all_jobs.extend(jobs)

# spark.createDataFrame cannot infer a schema from an empty list; fail with a clear message
# instead, leaving the existing bronze table untouched.
if not all_jobs:
    raise RuntimeError("No job results were returned for any role; bronze_jobs_raw was not overwritten.")

bronze_df = spark.createDataFrame(all_jobs)
bronze_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("bronze_jobs_raw")

print("data written to bronze layer")

# **Silver layer** - cleaning and structuring the data


In [0]:
# Explicit imports instead of a wildcard (which shadows Python builtins such as sum/min/max).
# col, trim and upper are used by the next silver cell.
from pyspark.sql.functions import col, trim, upper

spark = SparkSession.builder.appName("JobETL_Silver").getOrCreate()

# Read the raw records written by the bronze cell.
print("reading from bronze layer to silver layer")
df = spark.read.table("bronze_jobs_raw")

# Keep only the fields used downstream, lift posted_at out of the nested
# detected_extensions field, and drop rows missing title/company/location.
silver_df = df.selectExpr(
    "title",
    "company_name",
    "location",
    "description",
    "job_id",
    "detected_extensions.posted_at as posted_at",
    "search_role",
).dropna(subset=["title", "company_name", "location"])

# display(silver_df)  # uncomment to inspect in Databricks


In [0]:
# Normalise company names (upper-cased, surrounding whitespace trimmed) and discard rows
# that have no posting date, then persist the result as the silver Delta table.
silver_df = (
    silver_df
    .withColumn("company_name", trim(upper(col("company_name"))))
    .dropna(subset=["posted_at"])
)

(
    silver_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("silver_jobs_raw")
)

print("Data written to silver layer")
# display(silver_df)

# **Gold layer** - generating KPIs and the job aggregator table


In [0]:
# Explicit imports instead of a wildcard (which shadows Python builtins).
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("JobETL_Gold").getOrCreate()
print("reading from silver layer to gold layer")
df = spark.read.table("silver_jobs_raw")

# Both KPIs use the same aggregation shape and output column name (job_count). A secondary
# sort key makes the ordering of ties deterministic.

# KPI 1: companies ranked by number of postings
top_companies = (
    df.groupBy("company_name")
    .agg(count("*").alias("job_count"))
    .orderBy(col("job_count").desc(), col("company_name"))
)
# display(top_companies)

# KPI 2: postings per location
job_by_city = (
    df.groupBy("location")
    .agg(count("*").alias("job_count"))
    .orderBy(col("job_count").desc(), col("location"))
)
# display(job_by_city)

# NOTE(review): these KPI DataFrames are lazy and are neither displayed nor persisted by
# this cell. TODO: write them to gold tables if they are meant to be consumed.

In [0]:
spark = SparkSession.builder.appName("JobETL_Gold").getOrCreate()

AGGREGATOR_TABLE = "job_aggregator_table"
# A row is identified by its job_id plus the role search it came from. The same posting
# may be returned under several search roles, because bronze tags each result with its role.
MERGE_KEYS = ["job_id", "search_role"]

df = spark.read.table("silver_jobs_raw")
# MERGE rejects several source rows matching one target row, so keep one row per key.
updates_df = df.dropDuplicates(MERGE_KEYS)

if not spark.catalog.tableExists(AGGREGATOR_TABLE):
    # First run: create the table from the current silver snapshot.
    updates_df.write.format("delta").saveAsTable(AGGREGATOR_TABLE)
else:
    # Upsert instead of append. Silver is fully overwritten on every run, so appending it
    # would re-insert every previously seen posting as an indistinguishable duplicate.
    # <=> is null-safe equality, so rows with a null key still match on rerun.
    updates_df.createOrReplaceTempView("job_aggregator_updates")
    merge_condition = " AND ".join(f"t.{key} <=> s.{key}" for key in MERGE_KEYS)
    spark.sql(f"""
        MERGE INTO {AGGREGATOR_TABLE} AS t
        USING job_aggregator_updates AS s
        ON {merge_condition}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

print("data successfully written to table - job_aggregator_table")

In [0]:
# Final marker that every cell above ran to completion.
completion_message = "full ETL job aggregator project is completed and live!!!"
print(completion_message)

In [0]:
%sql
-- Inspect the Delta table history of the aggregator table written by the gold cell.
DESCRIBE HISTORY job_aggregator_table