# Exploring Synonymous Bigrams for LCT Keywords Using Spark

This notebook provides starter code for loading the .xml data and processing it further for this project.

_<b>Make sure you are running this notebook on a cluster that uses Spark 3.3.0 and Scala 2.12 and has the spark-xml (version 0.15.0) and spark-nlp (version 4.2.1) libraries installed; otherwise this notebook will not be able to read the data!</b>_

_<b>Make sure you are using Databricks Runtime 11.3 or newer; otherwise you will not be able to save any new files in this repository!</b>_

## Read Data From DBFS

Place all .xml files directly under `DATA_PATH` (a DBFS directory that stores all the XML files), and do not nest them in subdirectories: the file listing below only looks at the top level of `DATA_PATH`.
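`dbutils.fs.ls` lists only the immediate children of a directory, so XML files inside subfolders are never picked up. The minimal sketch below mimics that selection on a local filesystem with the standard library, assuming a throwaway temporary directory in place of DBFS; the helper name `list_top_level_xml` is hypothetical and not part of this notebook.

```python
import os
import tempfile


def list_top_level_xml(directory: str) -> list[str]:
    """Return sorted names of .xml files directly inside `directory` (no recursion)."""
    return sorted(
        entry.name
        for entry in os.scandir(directory)
        if entry.is_file() and entry.name.endswith(".xml")
    )


with tempfile.TemporaryDirectory() as root:
    # One top-level XML file, one non-XML file, and one XML file in a subfolder
    open(os.path.join(root, "feed_a.xml"), "w").close()
    open(os.path.join(root, "notes.txt"), "w").close()
    os.mkdir(os.path.join(root, "nested"))
    open(os.path.join(root, "nested", "feed_b.xml"), "w").close()

    found = list_top_level_xml(root)
    print(found)  # nested/feed_b.xml is silently skipped
```

The nested file is ignored rather than reported, which is why misplaced files can go missing without any error.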

In [0]:
DATA_PATH = "/FileStore/data"

In [0]:
import os
file_list = [file.path for file in dbutils.fs.ls(DATA_PATH) if os.path.basename(file.path).endswith(".xml")]

In [0]:
file_list

Out[21]: ['dbfs:/FileStore/data/US_XML_AddFeed_20100101_20100107.xml',
 'dbfs:/FileStore/data/US_XML_AddFeed_20100108_20100114.xml']

In [0]:
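# Load all listed files at once (paths joined with commas); rowTag='Job' turns each <Job> element into one row
df_raw = spark.read.format('com.databricks.spark.xml').options(rowTag='Job').load(','.join(file_list))
# Alternative for a quick test on a single file:
#df_raw = spark.read.format('com.databricks.spark.xml').options(rowTag='Job').load("/FileStore/data/US_XML_AddFeed_20100101_20100107.xml")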
df_raw.show()


In [0]:
df_raw.count()

Out[5]: 103645

In [0]:
df_raw.printSchema()

root
 |-- BGTOcc: string (nullable = true)
 |-- BGTSubOcc: string (nullable = true)
 |-- CIPCode: string (nullable = true)
 |-- CanonCertification: struct (nullable = true)
 |    |-- CanonCertification: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _name: string (nullable = true)
 |    |    |    |-- _type: string (nullable = true)
 |-- CanonCity: string (nullable = true)
 |-- CanonCountry: string (nullable = true)
 |-- CanonCounty: string (nullable = true)
 |-- CanonEmployer: struct (nullable = true)
 |    |-- _StockTicker: string (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |-- CanonIntermediary: string (nullable = true)
 |-- CanonJobHours: string (nullable = true)
 |-- CanonJobTitle: string (nullable = true)
 |-- CanonJobType: string (nullable = true)
 |-- CanonMaximumDegree: string (nullable = true)
 |-- CanonMinimumDegree: string (nullable = true)
 |-- CanonOther

In [0]:
df_raw.select("JobText").show(truncate=False)


## Process Dataframe With Spark-nlp Pipeline

In [0]:
import pyspark.sql.functions as f
from pyspark.ml import Pipeline
import sparknlp
from sparknlp.base import *       # DocumentAssembler
from sparknlp.annotator import *  # Tokenizer, NGramGenerator

In [0]:
# Wrap the raw posting text in Spark NLP's document annotation
document_assembler = DocumentAssembler() \
    .setInputCol("JobText") \
    .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# n = 2: emit every pair of adjacent tokens
bigrams = NGramGenerator() \
    .setInputCols(["token"]) \
    .setOutputCol("bigrams") \
    .setN(2)

pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    bigrams,
])

In [0]:
model = pipeline.fit(df_raw)
df_bi = model.transform(df_raw)

In [0]:
df_bi.select("bigrams.result").show(2, truncate=200)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                  result|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[From :, : Company, Company :, : Providence, Providence Health, Health &, & Services, Services (, ( ), ) Job, Job Reference, Reference ID, ID :, : 21228810, 21228810 Category, Category :, : other, ...|
|[> >, > please, please with, with care, care :, : miscategorized, miscategorized prohibited, prohibited spam/overpost, spam/overpost best, best of, of craigslist, craigslist Avoid, Avoid 
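With `setN(2)`, `NGramGenerator` emits every pair of adjacent tokens joined by a single space, as the output above shows ("From :", ": Company", ...). A minimal pure-Python sketch of the same idea, assuming the text is already tokenized (it does not reproduce Spark NLP's tokenizer rules, and `bigrams` here is a hypothetical helper):

```python
def bigrams(tokens: list[str]) -> list[str]:
    """Join each pair of adjacent tokens with a single space."""
    return [f"{left} {right}" for left, right in zip(tokens, tokens[1:])]


# Tokens taken from the first row of the output above
tokens = ["From", ":", "Company", ":", "Providence", "Health"]
print(bigrams(tokens))
# ['From :', ': Company', 'Company :', ': Providence', 'Providence Health']
```

A document with n tokens yields n - 1 bigrams, and a single-token document yields none.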

## Generate Bigram Count Dictionary

In [0]:
count_res = df_bi.withColumn('bigram', f.explode(f.col("bigrams.result"))).groupBy('bigram').count().sort('count', ascending=False)
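The cell above flattens each posting's bigram array with `explode`, so every occurrence becomes its own row, and `groupBy('bigram').count()` then tallies occurrences across all postings. A small pure-Python analogue using `collections.Counter`, on two made-up token lists (hypothetical data, not taken from the corpus):

```python
from collections import Counter


def bigrams(tokens):
    """Adjacent token pairs joined by a space (same idea as NGramGenerator with n=2)."""
    return [f"{a} {b}" for a, b in zip(tokens, tokens[1:])]


# Hypothetical tokenized postings
docs = [
    ["renewable", "energy", "jobs"],
    ["solar", "and", "renewable", "energy"],
]

# Equivalent of explode + groupBy("bigram").count() + sort by count descending
counts = Counter(bg for doc in docs for bg in bigrams(doc))
print(counts.most_common(1))  # [('renewable energy', 2)]
```

Counts are corpus-wide occurrences, so a bigram repeated within one posting is counted every time it appears.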

In [0]:
# Uncomment to delete the results of a previous run before writing them again
# dbutils.fs.rm("/FileStore/data/bigram_count.parquet", True)

Out[45]: True

In [0]:
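# "overwrite" replaces any existing output instead of failing because the path already exists
count_res.write.mode("overwrite").parquet("/FileStore/data/bigram_count.parquet")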

In [0]:
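count_df = spark.read.parquet("/FileStore/data/bigram_count.parquet")
# `count` is stored as a long (see the schema below), which works directly in the numeric filters later on.
# DataFrames are immutable: a cast only takes effect if its result is assigned back, e.g.
# count_df = count_df.withColumn("count", f.col("count").cast("int"))
count_df.sort('count', ascending=False).show()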

+-----------+------+
|     bigram| count|
+-----------+------+
|   this job|204512|
| job poster|161537|
|   . Please|141359|
|        > >|122641|
|       , or|119409|
|      , and|111425|
|   , please|105321|
| Location :| 89634|
|       to :| 89340|
| about this| 86973|
| services ,| 85372|
|     do not| 84491|
|     Date :| 83978|
|       to a| 83409|
|        � �| 82456|
|  Please do| 81654|
|     or any| 81382|
| , products| 81372|
|    check ,| 81106|
|products or| 81083|
+-----------+------+
only showing top 20 rows



In [0]:
count_df.count()

Out[48]: 2743582

In [0]:
count_df.printSchema()

root
 |-- bigram: string (nullable = true)
 |-- count: long (nullable = true)



## Demo: Finding Synonymous Bigrams for "renewable energy"

In [0]:
re_df = count_df.filter(f.col("bigram") == "renewable energy")
re_df.show()
# Count of the base bigram, used as the reference frequency below
re_num = re_df.first()["count"]

+----------------+-----+
|          bigram|count|
+----------------+-----+
|renewable energy|   74|
+----------------+-----+



In [0]:
threshold = 0.7

`threshold` is a hyperparameter that controls how close a bigram's count must be to the count of the base bigram. Only bigrams whose count lies strictly between base_bigram_count * (1 - `threshold`) and base_bigram_count * (1 + `threshold`) are selected.
A larger threshold yields more candidates but requires more manual effort to pick out the useful ones.
A smaller threshold saves manual effort at the risk of losing synonym candidates.
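As a worked example with the numbers from this demo: the base count is 74 and `threshold = 0.7`, so the window runs from 74 * 0.3 = 22.2 to 74 * 1.7 = 125.8, i.e. integer counts from 23 to 125 pass. The plain-Python sketch below applies the same strict-inequality test as the Spark filter on `count`; `window` and `candidates` are hypothetical helper names, and the sample counts are copied from the outputs in this notebook.

```python
def window(base_count: int, threshold: float) -> tuple[float, float]:
    """Open interval (base*(1-t), base*(1+t)) used to select candidate bigrams."""
    return base_count * (1 - threshold), base_count * (1 + threshold)


def candidates(counts: dict[str, int], base_count: int, threshold: float) -> list[str]:
    low, high = window(base_count, threshold)
    # Strict inequalities, matching the Spark filter on `count`
    return [bg for bg, c in counts.items() if low < c < high]


# Counts copied from the outputs shown in this notebook
counts = {"this job": 204512, "energy efficiency": 112, "renewable energy": 74, "solar energy": 26}
low, high = window(74, 0.7)
print(round(low, 1), round(high, 1))  # 22.2 125.8
print(candidates(counts, 74, 0.7))    # ['energy efficiency', 'renewable energy', 'solar energy']
```

Because the window scales with the base count, a rare base bigram gets a narrow absolute range and a frequent one a wide range.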

In [0]:
re_approx = count_df.filter((f.col("count") < re_num*(1+threshold)) & (f.col("count") > re_num*(1-threshold)))

After filtering, use a core keyword (in this case, 'energy') to narrow the search further.
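`Column.contains` is a substring test, which is why the results also include bigrams such as "high-energy work" and "High-energy and". If only bigrams in which `energy` is a whole token are wanted, the bigram can be split on its single-space delimiter and matched token by token; in PySpark this should be expressible with `f.array_contains(f.split("bigram", " "), "energy")` (an untested suggestion). The plain-Python sketch below contrasts the two behaviours; `substring_match` and `token_match` are hypothetical helpers.

```python
def substring_match(bigram: str, keyword: str) -> bool:
    """Plain substring test, like Column.contains."""
    return keyword in bigram


def token_match(bigram: str, keyword: str) -> bool:
    """True only if `keyword` is one of the space-separated tokens."""
    return keyword in bigram.split(" ")


# Bigrams copied from the filtered output of this demo
sample = ["energy efficiency", "high-energy work", "solar energy", "High-energy and"]
print([b for b in sample if substring_match(b, "energy")])
# ['energy efficiency', 'high-energy work', 'solar energy', 'High-energy and']
print([b for b in sample if token_match(b, "energy")])
# ['energy efficiency', 'solar energy']
```

The substring version casts a wider net (useful for hyphenated variants), while the token version returns fewer but cleaner candidates.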

In [0]:
re_approx.filter(f.col("bigram").contains('energy')).show(50, truncate=False)

+------------------------+-----+
|bigram                  |count|
+------------------------+-----+
|energy efficiency       |112  |
|energy sales            |97   |
|high-energy ,           |97   |
|, high-energy           |87   |
|the energy              |85   |
|energy to               |75   |
|renewable energy        |74   |
|high-energy work        |73   |
|yet high-energy         |67   |
|energy management       |66   |
|with energy             |61   |
|energy conservation     |51   |
|energy efficient        |45   |
|energy environment      |45   |
|high-energy personality |39   |
|positive energy         |35   |
|High-energy and         |35   |
|your energy             |33   |
|high-energy environment |33   |
|and high-energy         |30   |
|high-energy team        |30   |
|for high-energy         |28   |
|in energy               |28   |
|energy industry         |28   |
|hard-working High-energy|27   |
|energy individual       |27   |
|solar energy            |26   |
|energy-sm