# LinkedIn Job Postings Analysis

- Not completed yet! Things written here are subject to change
- Don't forget to start all necessary services beforehand, as Python may claim those ports for itself!
- If you have problems with `pyspark` auto suggestions, there is a workaround mentioned in this [thread](https://github.com/microsoft/pylance-release/issues/4577)
- If Pylance crashed, restart the extension host (`Ctrl + Shift + P`) rather than restarting the whole kernel
- I will try to provide way to manipulate data using both Spark's SQL and Spark's RDD (or Pandas' dataframe)
- If the cell output is taking too much space, split the editor into 2 pages if necessary (`Ctrl + /`)

## Connect to Hive Database

In [104]:
from pyspark.sql import SparkSession
import warnings

# Ignore future warning because of PySpark's slow update
warnings.simplefilter(action = 'ignore', category = FutureWarning)

spark = (
    SparkSession
        .builder
        # local = single thread, local[*] = max threads
        .master('local[*]')
        # The default address for Hive metastore
        # .config('hive.metastore.uris', 'thrift://localhost:9083')
        # Seems to have no effect when "hive.metastore.uris" is used
        # .config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse')
        # Hive must to be configured to connect from Metabase
        .enableHiveSupport()
        .getOrCreate()
)

In [105]:
# View all databases
# print(spark.sql('SHOW DATABASES').show())
print(spark.catalog.listDatabases())

# View all tables
# print(spark.sql('SHOW TABLES').show())
print(spark.catalog.listTables())

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/C:/Users/Dhika/Documents/Projects/Proto/Python/Spark/spark-warehouse')]
[]


- `default` is the default database used by Hive
- List tables will list both persistent and temporary tables
- Persistent tables will likely be saved in Hive warehouse

## Read Data from Local Files

In [106]:
# import pyspark.pandas as pd
import pandas as pd
import os

# File list without extension
# We should group it by table relationship
# To easily navigate related tables by index later
files = [
    # Job related tables
    'data/job_postings',
    'data/job_details/benefits',
    'data/job_details/job_industries',
    'data/job_details/job_skills',
    # Company related tables
    'data/company_details/companies',
    'data/company_details/company_industries',
    'data/company_details/company_specialities',
    'data/company_details/employee_counts'
]

# To keep the column order
headers = []

for file in files:
    if os.path.isfile(f'{file}.csv'):
        # Force all column types as string
        # Auto conversion is inaccurate and will only give me more work
        csv = pd.read_csv(f'{file}.csv', dtype = str)

        # Save the original column order
        headers.append(list(csv.columns))

        # It seems that PySpark can't read complex CSV files correctly
        # Though Pandas seems able to read them just fine
        # As workaround, I use Pandas to export them to JSON first
        if not os.path.isfile(f'{file}.json'):
            csv.to_json(f'{file}.json', orient = 'records')

### Process Data as Dataframe
- **IMPORTANT!** We should not save dataframe in individual variables (`df_a`, `df_b`, etc), because we can't loop through variables dynamically. If we keep `df_a` and `df_b` in a list (e.g. `df_list = [df_a, df_b]`), when we need to replace the variable by index (e.g. `df_list[0] = other value`), it won't actually replace `df_a` so `df_list[0] == df_a` will return `False`. Save them in dictionary instead!
- To show the dataframe later, we can either use `print(xxx.show())` or `display(xxx.toPandas())`. However, converting `toPandas` first may take a large amount of memory if the data size is huge
- Dataframe must always be synced with temp view to produce consistent result

In [107]:
# To differentiate between HDFS and real system path
# By default, PySpark will use HDFS path if we enable Hive
# hdfs:// = Hadoop, file:// = real system
def real_path(path):
    if os.environ.get('OS','') == 'Windows_NT':
        return 'file:///' + os.path.abspath(path)
    # We only need 2 slash for Linux
    return 'file://' + os.path.abspath(path)

# Dataframe dict that we will oftenly use later
# The order must be the same as "files" or "headers"
# Must be initialized this way to get the auto suggestions from Pylance
df_list = {
    # Read files as dataframe
    'job_postings': spark.read.json(real_path(files[0] + '.json')),
    'job_benefits': spark.read.json(real_path(files[1] + '.json')),
    'job_industries': spark.read.json(real_path(files[2] + '.json')),
    'job_skills': spark.read.json(real_path(files[3] + '.json')),
    'company': spark.read.json(real_path(files[4] + '.json')),
    'company_industries': spark.read.json(real_path(files[5] + '.json')),
    'company_specialities': spark.read.json(real_path(files[6] + '.json')),
    'company_employees': spark.read.json(real_path(files[7] + '.json')),
}

for idx, key in enumerate(df_list.keys()):
    # Restore the original column order
    df_list[key] = df_list[key].select(headers[idx])

# We won't need it again
del headers

### Process Dataframe as Temp View

- Pandas' dataframe can be converted to Spark's temp view so it can be manipulated using SQL
- The difference between temp view and global temp view can be viewed [here](https://stackoverflow.com/q/42774187)
- Temp view can be converted to persistent view using `CREATE TABLE` or `saveAsTable`, but not `CREATE VIEW`. It's because `CREATE VIEW` is basically just a saved `SELECT` query result, which require an actual table as reference
- After modifying dataframe, don't forget to save the temp view too, or they may not be in sync

In [108]:
for key in df_list.keys():
    # Create Spark's temp view from Pandas' dataframe
    # When accessing from global temp view, we use "global_temp" as the database name
    # E.g. "SELECT ... FROM global_temp.table_name WHERE ..."
    df_list[key].createOrReplaceGlobalTempView(key)

    # We should not create persistent table from temp view immediately
    # Spark will complain if we try to read and write from/to the same table
    # Use temp view to read the data, and save it to a table once we no longer modify the data
    # df_list[key].write.mode('overwrite').saveAsTable(key)

# print(spark.catalog.listTables())

## Data Preprocessing

In [109]:
from IPython.display import HTML

# Increase max columns to display
pd.set_option('display.max_columns', 30)
# Shorten max column width
pd.set_option('display.max_colwidth', 30)
# Use Plotly instead of Matplotlib
# pd.set_option('plotting.backend', 'plotly')

# Prevent line break when displaying table, decrease margin of table
HTML("<style>td {white-space: nowrap !important;} table {margin: 8px !important}</style>")

In [110]:
from IPython.display import display_html

# Display Pandas dataframe side by side
# Modified to allow nested dataframes to be displayed in rows
# https://stackoverflow.com/q/38783027
def display_inline(dfs: list, titles, row_limit = 5):
    # Even though nested dataframes are allowed, titles should not be nested
    # Nested dataframes will only have one title (at the start of div)
    if titles == None or titles == False:
        titles = [ f'' for i in dfs ]

    def _dataframe_check(df):
        # If dataframe is a spark object, convert it first
        # if hasattr(df, 'toPandas'): df = df.toPandas()

        # If dataframe length is not limited
        if len(df) > row_limit: df = df.head(row_limit)
        return df

    output = '<span>'
    for (title, df) in zip(titles, dfs):
        output += '<div style="display: inline-block; border: 0.5px solid">'
        output += f'<center>{title}</center>'

        # If the dataframe turns out to be nested dataframes
        # Display them in rows and group them as inline div instead
        if isinstance(df, list):
            for nested_df in df:
                nested_df = _dataframe_check(nested_df)
                output += nested_df.to_html(notebook = True)
        else:
            df = _dataframe_check(df)
            output += df.to_html(notebook = True)

        output += '</div>'
    output += '</span>'

    display_html(output, raw = True)

### Get Table Column Overviews

In [111]:
# Get the table column overviews
# Kind of useful to see relationship between tables
for key in df_list.keys():
    print(f'* {key}')
    print(f'  {df_list[key].columns}')
    print()

* job_postings
  ['job_id', 'company_id', 'title', 'description', 'max_salary', 'med_salary', 'min_salary', 'pay_period', 'formatted_work_type', 'location', 'applies', 'original_listed_time', 'remote_allowed', 'views', 'job_posting_url', 'application_url', 'application_type', 'expiry', 'closed_time', 'formatted_experience_level', 'skills_desc', 'listed_time', 'posting_domain', 'sponsored', 'work_type', 'currency', 'compensation_type']

* job_benefits
  ['job_id', 'inferred', 'type']

* job_industries
  ['job_id', 'industry_id']

* job_skills
  ['job_id', 'skill_abr']

* company
  ['company_id', 'name', 'description', 'company_size', 'state', 'country', 'city', 'zip_code', 'address', 'url']

* company_industries
  ['company_id', 'industry']

* company_specialities
  ['company_id', 'speciality']

* company_employees
  ['company_id', 'employee_count', 'follower_count', 'time_recorded']



#### Get Sample Data from Job Related Tables

In [112]:
# Temporary variables to keep dataframes to show
html_out = []
html_out_title = []

# See all job related tables (index: 0-3)
# df_list.keys() returns a set, which is unordered
# We better use df_list directly to slice a dict
for key in list(df_list)[0:4]:
    # Related tables should be put under one block and title
    html_out_block = []
    html_out_title.append(key)

    # See the first 3 rows
    # html_out_block.append(df_list[key].toPandas().head(3))
    html_out_block.append(spark.sql(f'SELECT * FROM global_temp.{key} LIMIT 3').toPandas())

    # See the total rows and unique rows
    # print(f'[{key}] All rows: {df_list[key].distinct().count()}, unique rows: {df_list[key].count()}')
    # html_out_block.append(spark.sql(
    #     'SELECT * FROM'
    #     f' (SELECT COUNT(*) AS all_rows FROM global_temp.{key})'
    #     ' CROSS JOIN'
    #     f' (SELECT DISTINCT COUNT(*) AS unique_rows FROM global_temp.{key})'
    # ).toPandas())

    # Find duplicated values in one column
    # https://stackoverflow.com/q/2594829
    # html_out_block.append(df_list[key].toPandas()['job_id'].value_counts().to_frame('duplicates').reset_index().head(3))
    html_out_block.append(spark.sql(
        f'SELECT job_id, COUNT(job_id) AS duplicates FROM global_temp.{key} GROUP BY job_id HAVING COUNT(job_id) > 1 LIMIT 3'
    ).toPandas())

    html_out.append(html_out_block)

display_inline(html_out, html_out_title)

Unnamed: 0,job_id,company_id,title,description,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,original_listed_time,remote_allowed,views,job_posting_url,application_url,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,posting_domain,sponsored,work_type,currency,compensation_type
0,85008768,,Licensed Insurance Agent,While many industries were...,52000.0,,45760.0,YEARLY,Full-time,"Chico, CA",,1690000000000.0,,5.0,https://www.linkedin.com/j...,,ComplexOnsiteApply,1710000000000.0,,,,1690000000000.0,,1,FULL_TIME,USD,BASE_SALARY
1,133114754,77766802.0,Sales Manager,Are you a dynamic and crea...,,,,,Full-time,"Santa Clarita, CA",,1690000000000.0,,,https://www.linkedin.com/j...,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,0,FULL_TIME,,
2,133196985,1089558.0,Model Risk Auditor,Join Us as a Model Risk Au...,,,,,Contract,"New York, NY",1.0,1690000000000.0,,17.0,https://www.linkedin.com/j...,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,0,CONTRACT,,

Unnamed: 0,job_id,duplicates

Unnamed: 0,job_id,inferred,type
0,3690843087,0,Medical insurance
1,3690843087,0,Dental insurance
2,3690843087,0,401(k)

Unnamed: 0,job_id,duplicates
0,3700551805,2
1,3693053249,2
2,3693067778,5

Unnamed: 0,job_id,industry_id
0,3378133231,68
1,3497509795,96
2,3690843087,47

Unnamed: 0,job_id,duplicates
0,3700155066,3
1,3700551805,3
2,3693049220,3

Unnamed: 0,job_id,skill_abr
0,3690843087,ACCT
1,3690843087,FIN
2,3691763971,MGMT

Unnamed: 0,job_id,duplicates
0,3700155066,2
1,3700551805,2
2,3693048268,3


From the cells above we can conclude that:
- Job id is unique in `job_postings`
- Job id is not unique in `job_benefits`, `job_industries`, and `job_skills`
- We can find out the most needed skills for similar job type or overall job
- We can find out the most needed jobs in each area (state) or as a whole (country)
- We can find out the most needed jobs in each industry
- We can find out the most difficult jobs (requires many skills) and compare whether the salary is worth it or not
- We can find out whether areas (states) affect salary or not
- We can find out which jobs/areas has the highest salaries, and maybe also filter based on seniority level
- We can find out which job postings has the highest views/applicants, and perhaps the factor behind it (e.g. company, remote, benefits)
- We can predict salary/job trends, but only if we have enough time series data

#### Get Sample Data from Company Related Tables

In [113]:
# Temporary variables to keep dataframes to show
html_out = []
html_out_title = []

# See all company related tables
# df_list.keys() returns a set, which is unordered
# We better use list(df_list) directly to slice a dict
for key in list(df_list)[4:8]:
    # Related tables should be put under one block and title
    html_out_block = []
    html_out_title.append(key)

    # See the first 3 rows
    # html_out_block.append(df_list[key].toPandas().head(3))
    html_out_block.append(spark.sql(f'SELECT * FROM global_temp.{key} LIMIT 3').toPandas())

    # See the total rows and unique rows
    # print(f'[{key}] All rows: {df_list[key].distinct().count()}, unique rows: {df_list[key].count()}')
    # html_out_block.append(spark.sql(
    #     'SELECT * FROM'
    #     f' (SELECT COUNT(*) AS all_rows FROM global_temp.{key})'
    #     ' CROSS JOIN'
    #     f' (SELECT DISTINCT COUNT(*) AS unique_rows FROM global_temp.{key})'
    # ).toPandas())

    # Find duplicated values in one column
    # https://stackoverflow.com/q/2594829
    # html_out_block.append(df_list[key].toPandas()['company_id'].value_counts().to_frame('duplicates').reset_index().head(3))
    html_out_block.append(spark.sql(
        f'SELECT company_id, COUNT(company_id) AS duplicates FROM global_temp.{key} GROUP BY company_id HAVING COUNT(company_id) > 1 LIMIT 3'
    ).toPandas())

    html_out.append(html_out_block)

display_inline(html_out, html_out_title)

Unnamed: 0,company_id,name,description,company_size,state,country,city,zip_code,address,url
0,1009,IBM,"At IBM, we do more than wo...",7,NY,US,"Armonk, New York",10504,International Business Mac...,https://www.linkedin.com/c...
1,1016,GE HealthCare,Every day millions of peop...,7,0,US,Chicago,0,-,https://www.linkedin.com/c...
2,1021,GE Power,"GE Power, part of GE Verno...",7,NY,US,Schenectady,12345,1 River Road,https://www.linkedin.com/c...

Unnamed: 0,company_id,duplicates

Unnamed: 0,company_id,industry
0,81149246,Higher Education
1,10033339,Information Technology & S...
2,6049228,Accounting

Unnamed: 0,company_id,duplicates
0,444038,7
1,14632005,4
2,1452948,11

Unnamed: 0,company_id,speciality
0,81149246,Childrens Music Education
1,81149246,Foundational Music Theory
2,81149246,Child Music Lessons

Unnamed: 0,company_id,duplicates
0,444038,105
1,14632005,40
2,53438162,10

Unnamed: 0,company_id,employee_count,follower_count,time_recorded
0,81149246,6,91,1692644644.2779734
1,10033339,3,187,1692644644.2779734
2,6049228,20,82,1692644645.1013184

Unnamed: 0,company_id,duplicates
0,444038,7
1,14632005,4
2,1452948,11


The company tabels are not as interesting as the job tables, but there are still some insights we can get:
- Company id is unique in `company`
- Company id is not unique in `company_industries`, `company_specialities`, and `company_employees`
- We may need to add company `name` to `job_postings` and `job_benefits`
- We can find out which companies has the highest job postings, are they having mass layoffs or high turnover rate?
- We can find out which companies pays the most or has high benefits
- We can find out which areas (states) that's oftenly used as company HQ

### Data Reduction

#### Reduce Data in the Job Postings Table

In [114]:
# See the job postings table again
# display(df_list['job_postings'].toPandas().head(3))
display(spark.sql(f'SELECT * FROM global_temp.job_postings LIMIT 3').toPandas())

Unnamed: 0,job_id,company_id,title,description,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,original_listed_time,remote_allowed,views,job_posting_url,application_url,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,posting_domain,sponsored,work_type,currency,compensation_type
0,85008768,,Licensed Insurance Agent,While many industries were...,52000.0,,45760.0,YEARLY,Full-time,"Chico, CA",,1690000000000.0,,5.0,https://www.linkedin.com/j...,,ComplexOnsiteApply,1710000000000.0,,,,1690000000000.0,,1,FULL_TIME,USD,BASE_SALARY
1,133114754,77766802.0,Sales Manager,Are you a dynamic and crea...,,,,,Full-time,"Santa Clarita, CA",,1690000000000.0,,,https://www.linkedin.com/j...,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,0,FULL_TIME,,
2,133196985,1089558.0,Model Risk Auditor,Join Us as a Model Risk Au...,,,,,Contract,"New York, NY",1.0,1690000000000.0,,17.0,https://www.linkedin.com/j...,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,0,CONTRACT,,


In [115]:
# Filter unimportant columns in the job postings table
filter_cols = [
    'application_url',
    'job_posting_url',
    'posting_domain',
    'sponsored',
    # Unless we are planning to use NLP, it's useless
    'description',
    'skill_desc',
    # There is already a formatted work type
    'work_type',
    # Time is important, but only if we have enough data
    # Should be executed at least monthly to get meaningful insights
    'original_listed_time',
    # 'listed_time',
    # 'closed_time',
    # 'expiry'
]

# The SQL will return a parse exception error, not sure why
# https://stackoverflow.com/q/43644450
# display(df_list['job_postings'].drop(*filter_cols).toPandas())
# display(spark.sql(f'SELECT * EXCEPT ({", ".join(filter_cols)}) FROM global_temp.job_postings').toPandas())

# FIXME The "EXCEPT" SQL query doesn't work
# Using pythonic way "not in" to inverse filter as workaround
inverse_filter_cols = [ i for i in df_list['job_postings'].schema.names if i not in filter_cols ]

# display(df_list['job_postings'].select(*inverse_filter_cols).toPandas())
df_list['job_postings'] = spark.sql(f'SELECT {", ".join(inverse_filter_cols)} FROM global_temp.job_postings')
df_list['job_postings'].createOrReplaceGlobalTempView('job_postings')

display(df_list['job_postings'].toPandas().head(3))

Unnamed: 0,job_id,company_id,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,85008768,,Licensed Insurance Agent,52000.0,,45760.0,YEARLY,Full-time,"Chico, CA",,,5.0,ComplexOnsiteApply,1710000000000.0,,,,1690000000000.0,USD,BASE_SALARY
1,133114754,77766802.0,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,
2,133196985,1089558.0,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,


#### Reduce Data in the Company Table

In [116]:
# See the company table again
# display(df_list['company'].toPandas().head(3))
display(spark.sql(f'SELECT * FROM global_temp.company LIMIT 3').toPandas())

Unnamed: 0,company_id,name,description,company_size,state,country,city,zip_code,address,url
0,1009,IBM,"At IBM, we do more than wo...",7,NY,US,"Armonk, New York",10504,International Business Mac...,https://www.linkedin.com/c...
1,1016,GE HealthCare,Every day millions of peop...,7,0,US,Chicago,0,-,https://www.linkedin.com/c...
2,1021,GE Power,"GE Power, part of GE Verno...",7,NY,US,Schenectady,12345,1 River Road,https://www.linkedin.com/c...


In [117]:
# Filter unimportant columns in the company table
filter_cols = [
    'description',
    'url'
]

# The SQL will return a parse exception error, not sure why
# https://stackoverflow.com/q/43644450
# display(df_list['company'].drop(*filter_cols).toPandas())
# display(spark.sql(f'SELECT * EXCEPT ({", ".join(filter_cols)}) FROM global_temp.company').toPandas())

# FIXME The "EXCEPT" SQL query doesn't work
# Using pythonic way "not in" to inverse filter as workaround
inverse_filter_cols = [ i for i in df_list['company'].schema.names if i not in filter_cols ]

# display(df_comp.select(*inverse_filter_cols).toPandas())
df_list['company'] = spark.sql(f'SELECT {", ".join(inverse_filter_cols)} FROM global_temp.company')
df_list['company'].createOrReplaceGlobalTempView('company')

display(df_list['company'].toPandas().head(3))

Unnamed: 0,company_id,name,company_size,state,country,city,zip_code,address
0,1009,IBM,7,NY,US,"Armonk, New York",10504,International Business Mac...
1,1016,GE HealthCare,7,0,US,Chicago,0,-
2,1021,GE Power,7,NY,US,Schenectady,12345,1 River Road


### Data Transformation

In [118]:
from pyspark.sql import functions as F
# To cast data type from one to another
# https://spark.apache.org/docs/latest/sql-ref-datatypes.html
from pyspark.sql.types import (
    LongType,
    IntegerType,
    DoubleType,
    BooleanType,
    TimestampType
)

#### Change Data Types in Job Related Tables

In [119]:
# See the job postings table again
# display(df_list['job_postings'].toPandas().head(3))
display(spark.sql(f'SELECT * FROM global_temp.job_postings LIMIT 3').toPandas())

Unnamed: 0,job_id,company_id,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,85008768,,Licensed Insurance Agent,52000.0,,45760.0,YEARLY,Full-time,"Chico, CA",,,5.0,ComplexOnsiteApply,1710000000000.0,,,,1690000000000.0,USD,BASE_SALARY
1,133114754,77766802.0,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,
2,133196985,1089558.0,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,1700000000000.0,,,,1690000000000.0,,


In [120]:
column_cast = {
    'job_id': LongType(),
    'company_id': LongType(),
    'max_salary': IntegerType(),
    'med_salary': IntegerType(),
    'min_salary': IntegerType(),
    'applies': IntegerType(),
    'remote_allowed': BooleanType(),
    'expiry': DoubleType(),
    'closed_time': DoubleType(),
    'listed_time': DoubleType()
}

# Convert non-string column to its original type
# https://sparkbyexamples.com/spark/spark-change-dataframe-column-type/
for key, val in column_cast.items():
    df_list['job_postings'] = df_list['job_postings'].withColumn(key, F.col(key).cast(val))
    # Convert epoch time from miliseconds to seconds
    # So it can be converted to timestamp (date)
    if val == DoubleType():
        df_list['job_postings'] = df_list['job_postings'].withColumn(key, F.col(key) / 1000)

# Convert double back to timestamp
df_list['job_postings'] = df_list['job_postings'].withColumns({
    'expiry': F.col('expiry').cast(TimestampType()),
    'closed_time': F.col('closed_time').cast(TimestampType()),
    'listed_time': F.col('listed_time').cast(TimestampType())
})

# Overwrite the old table
df_list['job_postings'].createOrReplaceGlobalTempView('job_postings')
df_list['job_postings'].printSchema()

root
 |-- job_id: long (nullable = true)
 |-- company_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- max_salary: integer (nullable = true)
 |-- med_salary: integer (nullable = true)
 |-- min_salary: integer (nullable = true)
 |-- pay_period: string (nullable = true)
 |-- formatted_work_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- applies: integer (nullable = true)
 |-- remote_allowed: boolean (nullable = true)
 |-- views: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- expiry: timestamp (nullable = true)
 |-- closed_time: timestamp (nullable = true)
 |-- formatted_experience_level: string (nullable = true)
 |-- skills_desc: string (nullable = true)
 |-- listed_time: timestamp (nullable = true)
 |-- currency: string (nullable = true)
 |-- compensation_type: string (nullable = true)



In [121]:
print(list(df_list))

['job_postings', 'job_benefits', 'job_industries', 'job_skills', 'company', 'company_industries', 'company_specialities', 'company_employees']


In [122]:
# Check data type in the other job related tables
for key in list(df_list)[1:4]:
    print(key, end = ' ')
    df_list[key].printSchema()

job_benefits root
 |-- job_id: string (nullable = true)
 |-- inferred: string (nullable = true)
 |-- type: string (nullable = true)

job_industries root
 |-- job_id: string (nullable = true)
 |-- industry_id: string (nullable = true)

job_skills root
 |-- job_id: string (nullable = true)
 |-- skill_abr: string (nullable = true)



In [123]:
# When modifying dataframe, don't forget to save the change to temp view too!

# Transform data type in the other job related tables
for key in list(df_list)[1:4]:
    df_list[key] = df_list[key].withColumn('job_id', F.col('job_id').cast(LongType()))
    df_list[key].createOrReplaceGlobalTempView(key)

# Extra treatment for specific table(s)
df_list['job_industries'] = df_list['job_industries'].withColumn('industry_id', F.col('industry_id').cast(LongType()))
df_list['job_industries'].createOrReplaceGlobalTempView('job_industries')

# No need to reprint if we are certain of the result
# for key in list(df_list)[1:4]:
#     print(key, end = ' ')
#     df_list[key].printSchema()

#### Change Data Types in Company Related Tables

In [124]:
# See the company table again
# display(df_list['company'].toPandas().head(3))
display(spark.sql(f'SELECT * FROM global_temp.company LIMIT 3').toPandas())

Unnamed: 0,company_id,name,company_size,state,country,city,zip_code,address
0,1009,IBM,7,NY,US,"Armonk, New York",10504,International Business Mac...
1,1016,GE HealthCare,7,0,US,Chicago,0,-
2,1021,GE Power,7,NY,US,Schenectady,12345,1 River Road


In [125]:
column_cast = {
    'company_id': LongType(),
    'company_size': IntegerType(),
    'zip_code': IntegerType()
}

# Convert non-string column to its original type
# https://sparkbyexamples.com/spark/spark-change-dataframe-column-type/
for key, val in column_cast.items():
    df_list['company'] = df_list['company'].withColumn(key, F.col(key).cast(val))

# Overwrite the old table
df_list['company'].createOrReplaceGlobalTempView('company')
df_list['company'].printSchema()

root
 |-- company_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- company_size: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- address: string (nullable = true)



In [126]:
print(list(df_list))

['job_postings', 'job_benefits', 'job_industries', 'job_skills', 'company', 'company_industries', 'company_specialities', 'company_employees']


In [127]:
# Check data type in the other company related tables
for key in list(df_list)[5:8]:
    print(key, end = ' ')
    df_list[key].printSchema()

company_industries root
 |-- company_id: string (nullable = true)
 |-- industry: string (nullable = true)

company_specialities root
 |-- company_id: string (nullable = true)
 |-- speciality: string (nullable = true)

company_employees root
 |-- company_id: string (nullable = true)
 |-- employee_count: string (nullable = true)
 |-- follower_count: string (nullable = true)
 |-- time_recorded: string (nullable = true)



In [128]:
# When modifying dataframe, don't forget to save the change to temp view too!

# Transform data type in the other company related tables
for key in list(df_list)[5:8]:
    df_list[key] = df_list[key].withColumn('company_id', F.col('company_id').cast(LongType()))
    df_list[key].createOrReplaceGlobalTempView(key)

# Extra treatment for specific table(s)
df_list['company_employees'] = df_list['company_employees'].withColumns({
    'employee_count': F.col('employee_count').cast(IntegerType()),
    # Time recorded were written in a strange format
    # Cast string to double first
    'time_recorded': F.col('time_recorded').cast(DoubleType()),
})
df_list['company_employees'] = df_list['company_employees'].withColumn(
    # Cast double to int
    'time_recorded', F.col('time_recorded').cast(IntegerType())
)
df_list['company_employees'] = df_list['company_employees'].withColumn(
    # Cast int to timestamp
    'time_recorded', F.col('time_recorded').cast(TimestampType())
)
df_list['company_employees'].createOrReplaceGlobalTempView('company_employees')

# No need to reprint if we are certain of the result
# for key in list(df_list)[5:8]:
#     print(key, end = ' ')
#     df_list[key].printSchema()

#### Add Important Column to Other Tables

In [129]:
print(list(df_list))

# display(df_list['job_postings'].toPandas().head(3))
display(spark.sql('SELECT * FROM global_temp.job_postings LIMIT 3').toPandas())

['job_postings', 'job_benefits', 'job_industries', 'job_skills', 'company', 'company_industries', 'company_specialities', 'company_employees']


Unnamed: 0,job_id,company_id,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,85008768,,Licensed Insurance Agent,52000.0,,45760.0,YEARLY,Full-time,"Chico, CA",,,5.0,ComplexOnsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,USD,BASE_SALARY
1,133114754,77766802.0,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
2,133196985,1089558.0,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,


In [130]:
# Temporary variables to keep dataframes to show
html_out = []
html_out_title = []

# Add company name to other company related tables (index: 5-7)
# Also add it to job postings table (index: 0)
for key in ['job_postings'] + list(df_list)[5:8]:
    # PySpark allow duplicated column name from join operation
    # It can occur if we execute this cell more than once
    # This "if" condition is to prevent that
    if 'company_name' not in df_list[key].columns:
        len_bf = df_list[key].count()

        df_list[key] = spark.sql(
            f'SELECT a.*, b.name AS company_name FROM'
            # When using inner join, rows that has no relation will be discarded
            # Use left join to keep all rows, but the missing data will be filled with null
            f' global_temp.{key} AS a INNER JOIN global_temp.company AS b'
            ' ON a.company_id = b.company_id'
        )

        # By default, new column will be added as the last index
        # Reorder the company name index after company id index
        cols = df_list[key].columns[:-1]
        cols.insert(cols.index('company_id') + 1, 'company_name')

        # Replace the old dataframe and temp view
        df_list[key] = df_list[key][cols]
        df_list[key].createOrReplaceGlobalTempView(key)

        print(f'[{key}] Length before: {len_bf}, length after: {df_list[key].count()}')

    html_out_title.append(key)
    html_out.append(df_list[key].toPandas().head(3))

display_inline(html_out, html_out_title)

[job_postings] Length before: 15886, length after: 15470
[company_industries] Length before: 15880, length after: 15830
[company_specialities] Length before: 128355, length after: 127992
[company_employees] Length before: 15907, length after: 15857


Unnamed: 0,job_id,company_id,company_name,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,133114754,77766802,CargoLogin.,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
1,133196985,1089558,Employvision Inc.,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
2,381055942,96654609,First Baptist Church Forney,Business Manager,,,,,Full-time,"Forney, TX",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,

Unnamed: 0,company_id,company_name,industry
0,1009,IBM,Information Technology & S...
1,1009,IBM,Information Technology & S...
2,1009,IBM,Information Technology & S...

Unnamed: 0,company_id,company_name,speciality
0,91459053,Uppfront,online professional network
1,91459053,Uppfront,jobs
2,91459053,Uppfront,people search

Unnamed: 0,company_id,company_name,employee_count,follower_count,time_recorded
0,1009,IBM,316130,16114399,2023-08-24 11:19:39
1,1009,IBM,316130,16114399,2023-08-24 11:19:39
2,1009,IBM,316130,16114399,2023-08-24 11:19:39


In [131]:
# Add company name (from job postings) to job benefits
if 'company_name' not in df_list['job_benefits'].columns:
    len_bf = df_list[key].count()

    df_list['job_benefits'] = spark.sql(
        'SELECT a.*, b.company_name FROM'
        # When using inner join, rows that has no relation will be discarded
        ' global_temp.job_benefits AS a INNER JOIN global_temp.job_postings AS b'
        ' ON a.job_id = b.job_id'
    )

    # Reorder the index of company name column
    cols = df_list['job_benefits'].columns[:-1]
    cols.insert(1, 'company_name')

    # Replace the old dataframe and temp view
    df_list['job_benefits'] = df_list['job_benefits'].select(cols)
    df_list['job_benefits'].createOrReplaceGlobalTempView('job_benefits')

    print(f'[job_benefits] Length before: {len_bf}, length after: {df_list[key].count()}')

display(df_list['job_benefits'].toPandas().head(3))

[job_benefits] Length before: 15857, length after: 15857


Unnamed: 0,job_id,company_name,inferred,type
0,133114754,CargoLogin.,0,Vision insurance
1,133114754,CargoLogin.,0,401(k)
2,133114754,CargoLogin.,0,Medical insurance


#### Other Data Transformation Operations

In [132]:
# Temporary variables to keep dataframes to show
html_out = []
html_out_title = []

# See all possible values for some columns in job postings table
cols = [
    'formatted_work_type',
    'pay_period',
    'application_type',
    'formatted_experience_level',
    'currency',
    'compensation_type'
]

for key in cols:
    # print(df_job_postings.toPandas()[i].value_counts())
    html_out_title.append(key)
    html_out.append(spark.sql(f'SELECT {key}, COUNT({key}) AS count FROM global_temp.job_postings GROUP BY {key}').toPandas())

display_inline(html_out, html_out_title)

Unnamed: 0,formatted_work_type,count
0,Part-time,970
1,Contract,1709
2,Other,46
3,Volunteer,7
4,Full-time,12508

Unnamed: 0,pay_period,count
0,,0
1,HOURLY,2407
2,MONTHLY,80
3,YEARLY,3869

Unnamed: 0,application_type,count
0,OffsiteApply,9343
1,ComplexOnsiteApply,5197
2,SimpleOnsiteApply,930

Unnamed: 0,formatted_experience_level,count
0,Director,682
1,,0
2,Associate,1205
3,Executive,127
4,Entry level,3693

Unnamed: 0,currency,count
0,,0
1,USD,6356

Unnamed: 0,compensation_type,count
0,,0
1,BASE_SALARY,6356


In [133]:
# Compare two dataframes in PySpark
# https://stackoverflow.com/q/60279160

def compare_df(df1, df2, same_col: str = 'id'):
    columns = df1.columns

    df3 = df1.alias("d1").join(
        df2.alias("d2"),
        F.col(f"d1.{same_col}").eqNullSafe(F.col(f"d2.{same_col}")),
        "left"
    )

    for name in columns:
        df3 = df3.withColumn(
            name + "_temp",
            F.when(
                # If null is a possible value, don't use normal operator
                # Use "eqNullSafe" instead, but be aware that nan is not null!
                F.col("d1." + name).eqNullSafe(F.col("d2." + name)) == False,
                F.lit(name)
            )
        )

    df3.withColumn(
        "__diff__",
        F.concat_ws(",", *map(lambda name: F.col(name + "_temp"), columns))
    # You can choose either d1 or d2 to show
    ).select("__diff__", "d2.*").show()

In [134]:
# ======================================
# Set med (normalized) salary
# ======================================

# SQL update query is not supported temporarily by PySpark
# df_list['job_postings'] = spark.sql(
#     'UPDATE global_temp.job_postings '
#     ' SET med_salary = (CASE'
#     ' WHEN max_salary IS NOT NULL AND min_salary IS NOT NULL THEN (max_salary + min_salary) / 2'
#     ' WHEN max_salary IS NOT NULL THEN max_salary'
#     ' WHEN min_salary IS NOT NULL THEN min_salary'
#     ' END)'
# )

df_list['job_postings'] = df_list['job_postings'].withColumn(
    'med_salary',
    # No need to use multiple "otherwise" when method chaining "when"
    # However, if there's no otherwise, the value will be null if it doesn't match any condition
    F.when(
        (F.col('max_salary').isNotNull()) & (F.col('min_salary').isNotNull()),
        ((F.col('max_salary') + F.col('min_salary')) / 2).cast(IntegerType())
    ).when(
        F.col('max_salary').isNotNull(),
        F.col('max_salary')
    ).when(
        F.col('min_salary').isNotNull(),
        F.col('min_salary')
    )
)

# ======================================
# Set all pay period to monthly
# ======================================

for salary in ['max_salary', 'med_salary', 'min_salary']:
    df_list['job_postings'] = df_list['job_postings'].withColumn(
        salary, 
        F.when(
            (F.col('pay_period').eqNullSafe('HOURLY')) & (F.col(salary).isNotNull()),
            # Assume it's 9 to 5 workday, then it's 40 hours workweek
            F.col(salary) * 40 * 4
        ).when(
            (F.col('pay_period').eqNullSafe('YEARLY')) & (F.col(salary).isNotNull()),
            (F.col(salary) / 12).cast(IntegerType())
        # Otherwise null if not specified
        ).otherwise(F.col(salary))
    )

df_list['job_postings'] = df_list['job_postings'].withColumn(
    'pay_period',
    F.when(
        (F.col('pay_period').isNotNull()) & (F.col('med_salary').isNotNull()),
        'MONTHLY'
    )
)

df_list['job_postings'].createOrReplaceGlobalTempView('job_postings')
df_list['job_postings'].toPandas().head(10)

Unnamed: 0,job_id,company_id,company_name,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,133114754,77766802,CargoLogin.,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
1,133196985,1089558,Employvision Inc.,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
2,381055942,96654609,First Baptist Church Forney,Business Manager,,,,,Full-time,"Forney, TX",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
3,529257371,1244539,Ken Fulk Inc,NY Studio Assistant,,,,,Full-time,"New York, NY",,,2.0,ComplexOnsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,,
4,903408693,3894635,Sunnyland Farms,Office Associate,3500.0,3291.0,3083.0,MONTHLY,Full-time,"Albany, GA",5.0,,49.0,ComplexOnsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,USD,BASE_SALARY
5,967848246,18995316,Paradigm Senior Services,Education Manager,,,,,Full-time,United States,45.0,True,411.0,OffsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
6,1004740969,882349,"Eric L. Davis Engineering,...",Civil Engineer,,,,,Full-time,"Forney, TX",28.0,,82.0,ComplexOnsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,,
7,1029078768,61469,United Staffing Solutions ...,Registered Nurse (RN) Vacc...,8000.0,8000.0,8000.0,MONTHLY,Part-time,"Muskegon, MI",,,4.0,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,USD,BASE_SALARY
8,1657978824,89350959,American Steel Builders,REMOTE STEEL BUILDING SALE...,12000.0,12000.0,12000.0,MONTHLY,Contract,"Texas, United States",,True,,OffsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,USD,BASE_SALARY
9,1928027033,11544533,Creative Concrete,Construction Project Manager,,,,,Contract,"Minot, ND",,,22.0,ComplexOnsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,,


## Exploratory Data Analysis

In [135]:
# Temporary variables to keep dataframes to show
html_out = []
html_out_title = []

for key in list(df_list):
    html_out_title.append(key)
    # html_out.append(df_list[key].toPandas().head(3))
    html_out.append(spark.sql(f'SELECT * FROM global_temp.{key} LIMIT 3').toPandas())

display_inline(html_out, html_out_title)

Unnamed: 0,job_id,company_id,company_name,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,133114754,77766802,CargoLogin.,Sales Manager,,,,,Full-time,"Santa Clarita, CA",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
1,133196985,1089558,Employvision Inc.,Model Risk Auditor,,,,,Contract,"New York, NY",1.0,,17.0,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
2,381055942,96654609,First Baptist Church Forney,Business Manager,,,,,Full-time,"Forney, TX",,,,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,

Unnamed: 0,job_id,company_name,inferred,type
0,133114754,CargoLogin.,0,Vision insurance
1,133114754,CargoLogin.,0,401(k)
2,133114754,CargoLogin.,0,Medical insurance

Unnamed: 0,job_id,industry_id
0,3378133231,68
1,3497509795,96
2,3690843087,47

Unnamed: 0,job_id,skill_abr
0,3690843087,ACCT
1,3690843087,FIN
2,3691763971,MGMT

Unnamed: 0,company_id,name,company_size,state,country,city,zip_code,address
0,1009,IBM,7,NY,US,"Armonk, New York",10504,International Business Mac...
1,1016,GE HealthCare,7,0,US,Chicago,0,-
2,1021,GE Power,7,NY,US,Schenectady,12345,1 River Road

Unnamed: 0,company_id,company_name,industry
0,1009,IBM,Information Technology & S...
1,1009,IBM,Information Technology & S...
2,1009,IBM,Information Technology & S...

Unnamed: 0,company_id,company_name,speciality
0,91459053,Uppfront,online professional network
1,91459053,Uppfront,jobs
2,91459053,Uppfront,people search

Unnamed: 0,company_id,company_name,employee_count,follower_count,time_recorded
0,1009,IBM,316130,16114399,2023-08-24 11:19:39
1,1009,IBM,316130,16114399,2023-08-24 11:19:39
2,1009,IBM,316130,16114399,2023-08-24 11:19:39


What we can find out based on initial observation:
- We can find out the most needed skills for similar job type or overall job
- We can find out the most needed jobs in each area (state) or as a whole (country)
- We can find out the most needed jobs in each industry
- We can find out the most difficult jobs (requires many skills) and compare whether the salary is worth it or not
- We can find out whether areas (states) affect salary or not
- We can find out which jobs/areas has the highest salaries, and maybe also filter based on seniority level
- We can find out which job postings has the highest views/applicants, and perhaps the factor behind it (e.g. company, remote, benefits)
- We can predict salary/job trends, but only if we have enough time series data
- We can find out which companies has the highest job postings, are they having mass layoffs or high turnover rate?
- We can find out which companies pays the most or has high benefits
- We can find out which areas (states) that's oftenly used as company HQ

In [146]:
spark.sql('SELECT * FROM global_temp.job_postings WHERE LOWER(title) LIKE "%data %" OR LOWER(title) LIKE "%intelligence%" ').toPandas()

Unnamed: 0,job_id,company_id,company_name,title,max_salary,med_salary,min_salary,pay_period,formatted_work_type,location,applies,remote_allowed,views,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,currency,compensation_type
0,3586162459,69642092,"Sapience, Inc",Teradata Developer,,,,,Contract,United States,13.0,True,56,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
1,3682818140,68921004,eBusiness Technologies Corp.,Senior Business Intelligen...,12.0,10.0,8.0,MONTHLY,Contract,"McKinney, TX",41.0,,155,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,Executive,,2023-07-22 11:26:40,USD,BASE_SALARY
2,3690692186,61242,"Universal Screen Arts, Inc.",Seasonal Payroll/Data Entr...,,,,,Temporary,"Hudson, OH",55.0,,325,OffsiteApply,2024-03-09 23:00:00,NaT,,,2023-07-22 11:26:40,,
3,3692302089,37768,"Milestone Technologies, Inc.",Data Scientist/ Product An...,12800.0,12000.0,11200.0,MONTHLY,Contract,"San Francisco, CA",7.0,,37,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,Mid-Senior level,,2023-07-22 11:26:40,USD,BASE_SALARY
4,3692363778,2474970,DiLytics,Data Analytics Consultant,,,,,Full-time,"Sacramento, CA",7.0,True,26,ComplexOnsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
366,3701369242,11834,Broadridge,Lead Data Management Servi...,,,,,Full-time,"Pittsburgh, PA",8.0,,34,OffsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,,
367,3701369746,39203,LinQuest,Data Scientist / Operation...,13333.0,10833.0,8333.0,MONTHLY,Full-time,"Colorado Springs, CO",41.0,,215,OffsiteApply,2023-11-15 05:13:20,NaT,Entry level,,2023-07-22 11:26:40,USD,BASE_SALARY
368,3701371901,2848937,MSD,Data Engineering Product L...,25375.0,20747.0,16120.0,MONTHLY,Full-time,"West Point, PA",7.0,,22,OffsiteApply,2023-11-15 05:13:20,NaT,,,2023-07-22 11:26:40,USD,BASE_SALARY
369,3701372446,2113831,GovCIO,OCM Data Analyst (Remote),7333.0,7208.0,7083.0,MONTHLY,Full-time,"Fairfax, VA",47.0,,165,OffsiteApply,2023-11-15 05:13:20,NaT,Entry level,,2023-07-22 11:26:40,USD,BASE_SALARY


In [None]:
# tables = [ i.name for i in spark.catalog.listTables() ]
# print(tables)

# for table in tables:
#     spark.catalog.dropTempView(table)

In [None]:
# Create persistent table based on temp view
# We can also use "CREATE TABLE ... AS SELECT ...", but we must write all the columns and types manually
# After creating table, we need to change "global_temp" with the real database name (e.g. "default")
# df_list[i].write.mode('overwrite').saveAsTable(tb_list[i])