## Setting Global Config

In [1]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [2]:
import numpy as np
np.bool = np.bool_

In [3]:
#current notebook name
notebook_name = __session__.replace('.ipynb','')[__session__.rfind('/')+1:] 

In [4]:
# HDFS base paths
hdfs_lakehouse_base_path = 'hdfs://localhost:9000/lakehouse/'
hdfs_warehouse_base_path = 'hdfs://localhost:9000/warehouse'

## Creating SparkSession

In [5]:
import os
dependencies = ["org.apache.spark:spark-avro_2.12:3.5.0",
                "io.delta:delta-iceberg_2.12:3.0.0"]
os.environ['PYSPARK_SUBMIT_ARGS']= f"--packages {','.join(dependencies)} pyspark-shell"
os.environ['PYARROW_IGNORE_TIMEZONE'] = 'true'

In [6]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName(notebook_name)
    .config("spark.log.level","ERROR")
    .config("spark.sql.warehouse.dir",hdfs_warehouse_base_path)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()
)

24/12/15 03:46:18 WARN Utils: Your hostname, osbdet resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
24/12/15 03:46:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
io.delta#delta-iceberg_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-59a84c1f-07ae-4edc-b4a7-70e376802965;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.5.0 in central
	found org.tukaani#xz;1.9 in central
	found io.delta#delta-iceberg_2.12;3.0.0 in central
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.1.1 in central
	found com.github.ben-manes.caffeine#caffeine;2.9.3 in central
	found org.checkerframework#checker-qual;3.19.0 in central
	found com.google.errorprone#error_prone_annotations;2.10.0 in central
:: resolution report :: resolve 3717ms :: artifacts dl 62ms
	:: modules in use:
	com.github.ben-m

## Reading Bronze DataFrame

In [7]:
jobs_brz = (spark.read
              .option("header","true")
              .option("escape","\"")
              .csv(f"{hdfs_lakehouse_base_path}/bronze/it_job_ratings/job_ratings/"))

                                                                                

In [8]:
jobs_brz.printSchema()

root
 |-- company_name: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- jobs: string (nullable = true)
 |-- interviews: string (nullable = true)
 |-- highly_rated_for: string (nullable = true)
 |-- critically_rated_for: string (nullable = true)
 |-- industry: string (nullable = true)



## Transforming Bronze DataFrame

In [9]:
from pyspark.sql.functions import *

In [10]:
from pyspark.sql.functions import regexp_replace, col

In [11]:
jobs_brz.toPandas()

                                                                                

Unnamed: 0,company_name,rating,reviews,jobs,interviews,highly_rated_for,critically_rated_for,industry
0,TCS,3.7,82.7k,37,9.8k,"Job Security, Work Life Balance","Promotions / Appraisal, Salary & Benefits, Work Satisfaction",Information Technology
1,Accenture,3.9,51.5k,32.7k,7.5k,"Company Culture, Job Security, Skill Development / Learning",Promotions / Appraisal,Consulting
2,Cognizant,3.8,46.4k,1.3k,5.3k,,"Promotions / Appraisal, Salary & Benefits",Information Technology
3,Wipro,3.7,46.3k,595,5.1k,Job Security,"Promotions / Appraisal, Salary & Benefits",Information Technology
4,Capgemini,3.8,38.2k,1.2k,4.5k,"Job Security, Work Life Balance, Skill Development / Learning","Promotions / Appraisal, Salary & Benefits",Consulting
...,...,...,...,...,...,...,...,...
615,Security and Intelligence Services (India),4.2,1.2k,23,51,"Job Security, Work Life Balance, Skill Development / Learning",,Security Services
616,Emami,4.0,1.2k,7,83,"Job Security, Work Life Balance",Promotions / Appraisal,Consumer Goods
617,NextComm Corporation,4.9,1.2k,--,5,"Work Life Balance, Promotions / Appraisal, Company Culture",,Information Technology
618,Shadowfax Technologies,3.6,1.2k,21,109,,"Promotions / Appraisal, Job Security, Company Culture",Logistics


In [12]:
jobs_slvr = jobs_brz.withColumn('reviews', 
                                when(col('reviews').rlike(r'(\d+)\.(\d+)k'), 
                                     (col('reviews').substr(lit(1), instr(col('reviews'), '.') - lit(1)).cast('int') * 1000 + 
                                      col('reviews').substr(instr(col('reviews'), '.') + lit(1), lit(1)).cast('int') * 100).cast('int'))
                                .when(col('reviews').rlike(r'(\d+)k'), 
                                     (regexp_replace(col('reviews'), 'k', '').cast('int') * 1000).cast('int'))
                                .otherwise(col('reviews').cast('int'))
                               ) \
                       .withColumn('jobs', 
                                when(col('jobs').rlike(r'(\d+)\.(\d+)k'), 
                                     (col('jobs').substr(lit(1), instr(col('jobs'), '.') - lit(1)).cast('int') * 1000 + 
                                      col('jobs').substr(instr(col('jobs'), '.') + lit(1), lit(1)).cast('int') * 100).cast('int'))
                                .when(col('jobs').rlike(r'(\d+)k'), 
                                     (regexp_replace(col('jobs'), 'k', '').cast('int') * 1000).cast('int'))
                                .otherwise(col('jobs').cast('int'))
                               ) \
                       .withColumn('interviews', 
                                when(col('interviews').rlike(r'(\d+)\.(\d+)k'), 
                                     (col('interviews').substr(lit(1), instr(col('interviews'), '.') - lit(1)).cast('int') * 1000 + 
                                      col('interviews').substr(instr(col('interviews'), '.') + lit(1), lit(1)).cast('int') * 100).cast('int'))
                                .when(col('interviews').rlike(r'(\d+)k'), 
                                     (regexp_replace(col('interviews'), 'k', '').cast('int') * 1000).cast('int'))
                                .otherwise(col('interviews').cast('int'))
                               ) \
                       .withColumn('rating', col('rating').cast('float')) \
                       .withColumn('jobs', col('jobs').cast('int')) \
                       .fillna({'jobs': 0})
jobs_slvr.show()

+--------------------+------+-------+-----+----------+--------------------+--------------------+--------------------+
|        company_name|rating|reviews| jobs|interviews|    highly_rated_for|critically_rated_for|            industry|
+--------------------+------+-------+-----+----------+--------------------+--------------------+--------------------+
|                 TCS|   3.7|  82700|   37|      9800|Job Security, Wor...|Promotions / Appr...|Information Techn...|
|           Accenture|   3.9|  51500|32700|      7500|Company Culture, ...|Promotions / Appr...|          Consulting|
|           Cognizant|   3.8|  46400| 1300|      5300|                NULL|Promotions / Appr...|Information Techn...|
|               Wipro|   3.7|  46300|  595|      5100|        Job Security|Promotions / Appr...|Information Techn...|
|           Capgemini|   3.8|  38200| 1200|      4500|Job Security, Wor...|Promotions / Appr...|          Consulting|
|           HDFC Bank|   3.9|  37000|  169|      2000|Jo

In [13]:
jobs_slvr.toPandas()["jobs"]

                                                                                

0         37
1      32700
2       1300
3        595
4       1200
       ...  
615       23
616        7
617        0
618       21
619       15
Name: jobs, Length: 620, dtype: int32

In [14]:
from functools import reduce
from pyspark.sql import functions as F

In [15]:
# Convert 'critically_rated_for' column to dummy variables
critically_rated_dummies = jobs_brz.select('company_name', 'critically_rated_for') \
    .withColumn('critically_rated_for', explode(split(col('critically_rated_for'), ', '))) \
    .groupBy('company_name') \
    .pivot('critically_rated_for') \
    .count() \
    .na.fill(0)

# Sum all the dummy variable columns per row
sum_columns = [col(c) for c in critically_rated_dummies.columns if c != 'company_name']
critically_rated_dummies = critically_rated_dummies.withColumn('total_ratings', reduce(lambda a, b: a + b, sum_columns))

# Rename the columns to 'bad_(whatever follows)'
for col_name in critically_rated_dummies.columns:
    if col_name != 'company_name':
        new_col_name = 'bad_' + col_name.lower().replace(' ', '_')
        critically_rated_dummies = critically_rated_dummies.withColumnRenamed(col_name, new_col_name)

critically_rated_dummies.show()

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+-------------------+----------------+--------------------------+---------------------+--------------------------------+---------------------+---------------------+-----------------+
|        company_name|bad_company_culture|bad_job_security|bad_promotions_/_appraisal|bad_salary_&_benefits|bad_skill_development_/_learning|bad_work_life_balance|bad_work_satisfaction|bad_total_ratings|
+--------------------+-------------------+----------------+--------------------------+---------------------+--------------------------------+---------------------+---------------------+-----------------+
|       Shahi Exports|                  0|               0|                         1|                    0|                               0|                    0|                    0|                1|
|   YASH Technologies|                  0|               0|                         1|                    0|                               0|                    0|                    0

                                                                                

In [16]:
# Convert 'highly_rated_for' column to dummy variables
highly_rated_dummies = jobs_brz.select('company_name', 'highly_rated_for') \
    .withColumn('highly_rated_for', explode(split(col('highly_rated_for'), ', '))) \
    .groupBy('company_name') \
    .pivot('highly_rated_for') \
    .count() \
    .na.fill(0)
sum_columns = [col(c) for c in highly_rated_dummies.columns if c != 'company_name']
highly_rated_dummies = highly_rated_dummies.withColumn('total_ratings', reduce(lambda a, b: a + b, sum_columns))

# Rename the columns to 'good_(whatever follows)'
for col_name in highly_rated_dummies.columns:
    if col_name != 'company_name':
        new_col_name = 'good_' + col_name.lower().replace(' ', '_')
        highly_rated_dummies = highly_rated_dummies.withColumnRenamed(col_name, new_col_name)

# Join the two dataframes on 'company_name'
review_score = critically_rated_dummies.join(highly_rated_dummies, on='company_name', how='outer')

review_score_slv = review_score.fillna(0)
review_score_slv = review_score_slv.withColumn('overall_score', col('good_total_ratings') - col('bad_total_ratings'))
review_score_slv = review_score_slv.withColumnRenamed('good_salary_&_benefits', 'good_salary_and_benefits')
review_score_slv = review_score_slv.withColumnRenamed('bad_salary_&_benefits', 'bad_salary_and_benefits')
for col_name in review_score.columns:
    new_col_name = col_name.replace('_/', '')
    review_score_slv = review_score_slv.withColumnRenamed(col_name, new_col_name)
review_score_slv.toPandas()

                                                                                

Unnamed: 0,company_name,bad_company_culture,bad_job_security,bad_promotions_appraisal,bad_salary_and_benefits,bad_skill_development_learning,bad_work_life_balance,bad_work_satisfaction,bad_total_ratings,good_company_culture,good_job_security,good_promotions_appraisal,good_salary_and_benefits,good_skill_development_learning,good_work_life_balance,good_work_satisfaction,good_total_ratings,overall_score
0,24/7 Customer,0,0,1,1,0,0,1,3,0,0,0,0,0,0,0,0,-3
1,3i Infotech,0,0,1,1,1,0,0,3,0,0,0,0,0,0,0,0,-3
2,ABB Group,0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,3,3
3,ACC,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,-1
4,ACT Fibernet,0,0,0,0,0,0,0,0,1,1,0,1,0,0,0,3,3
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
609,eClerx,0,0,1,1,0,0,1,3,0,0,0,0,0,0,0,0,-3
610,iEnergizer,0,0,0,0,0,0,0,0,1,1,0,0,1,0,0,3,3
611,iOPEX Technologies,0,0,1,1,0,0,0,2,0,0,0,0,0,0,0,0,-2
612,suzuki motor gujarat,0,0,0,0,0,0,0,0,1,1,0,1,0,0,0,3,3


In [17]:
spark.sql("DROP SCHEMA IF EXISTS it_job_ratings CASCADE")
spark.sql("CREATE SCHEMA IF NOT EXISTS it_job_ratings")

DataFrame[]

DataFrame[]

In [18]:
(jobs_slvr.write
           .format("delta")
           .mode("overwrite")
           .option("path",f"{hdfs_lakehouse_base_path}/silver/it_job_ratings/job_ratings/")
           .saveAsTable("it_job_ratings.job_ratings"))

                                                                                

In [19]:
(review_score_slv.write
            .format("delta")
            .mode("overwrite")
            .option("path",f"{hdfs_lakehouse_base_path}/silver/it_job_ratings/review_scores/")
            .saveAsTable("it_job_ratings.review_scores"))

                                                                                