In [16]:
#!/usr/bin/env python
# coding: utf-8
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit
import lxml
import lxml.etree
import lxml.html
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import pandas
from sqlalchemy import create_engine

In [17]:
spark = SparkSession \
    .builder \
    .getOrCreate()

In [18]:
df = spark.read.json("sample")
map_onet = spark.read.csv("map_onet_soc.csv", header = "true")
soc_heir = spark.read.csv("soc_hierarchy.csv", header = "true")

In [19]:
map_onet.show()

+----------+-------+
|      onet|   soc5|
+----------+-------+
|11-1011.00|11-1011|
|11-1011.03|11-1011|
|11-1021.00|11-1021|
|11-1031.00|11-1031|
|11-2011.00|11-2011|
|11-2011.01|11-2011|
|11-2021.00|11-2021|
|11-2022.00|11-2022|
|11-2031.00|11-2031|
|11-3011.00|11-3011|
|11-3021.00|11-3021|
|11-3031.00|11-3031|
|11-3031.01|11-3031|
|11-3031.02|11-3031|
|11-3051.00|11-3051|
|11-3051.01|11-3051|
|11-3051.02|11-3051|
|11-3051.03|11-3051|
|11-3051.04|11-3051|
|11-3051.05|11-3051|
+----------+-------+
only showing top 20 rows



I wasn't sure if pre-processing the mappings is allowed but converting the soc_hierarchy.csv to a dictionary will ensure contant lookup time

In [20]:
@F.udf(returnType=BooleanType())
def detect_html(s):
    try:
        return lxml.html.fromstring(s).find('.//*') is not None
            
    except lxml.etree.ParserError as e:
        return False

## Number of documents from which you successfully removed HTML tags.

In [21]:
df.filter(detect_html('body')).count()

4010

In [22]:
@F.udf(returnType=StringType())
def detect_html(s):
    try:
        if lxml.html.fromstring(s).find('.//*') is not None:
            return str(lxml.html.document_fromstring(s).text_content())
        else:
            return s
            
    except lxml.etree.ParserError as e:
        return s

df = df.withColumn('body',detect_html(col('body')))

Doing an inner join may not be very scalable, I assumed the mapping would be constant and fixed. I would have liked to have the mapping in a dictionary which would allow constant lookup time. 

In [23]:
df_soc5  = df.join(map_onet, on='onet', how='inner')

In [29]:
df_soc5.show()

+----------+--------------------+-------------------+----------+----------+-----+--------------------+-------+
|      onet|                body|               city|   expired|    posted|state|               title|   soc5|
+----------+--------------------+-------------------+----------+----------+-----+--------------------+-------+
|11-9199.00|Project Manager -...|             Laurel|2017-01-06|2016-12-07|   MD|Project Manager -...|11-9199|
|13-1023.00|You can become an...|            Fairfax|2015-12-30|2015-11-30|   VA|Lease Purchase Dr...|13-1023|
|19-3051.00|POSITION SUMMARY/...|      Ellicott City|2017-01-02|2016-12-03|   MD|Planning Supervis...|19-3051|
|53-3032.00|Speak with a Recr...|            Jarales|2017-01-18|2016-12-19|   NM|Experienced Class...|53-3032|
|11-3061.00|JOB SUMMARYThe Bu...|             Laredo|2017-01-02|2016-12-03|   TX|Buying Manager - ...|11-3061|
|43-3021.02|Position Descript...|         Norristown|2017-03-23|2016-12-23|   PA|Senior Billing Re...|43-3021|
|

I noticed the soc2 code could be derived from the first two numbers in the soc5. I am assuming the there's a limit of 99-xxxx since this would break if it goes beyond 3.

In [25]:
def find_soc(c):
  return c[:2] + '-0000'

soc2 = udf(lambda x: find_soc(x), StringType())
fin = df_soc5.withColumn('soc2', soc2(df_soc5.soc5) )

In [30]:
soc_heir.show()

+-------+-------+-----+--------------------+
|  child| parent|level|                name|
+-------+-------+-----+--------------------+
|00-0000|00-0000|    1|Total, All Occupa...|
|11-0000|00-0000|    2|Management Occupa...|
|11-1000|11-0000|    3|      Top Executives|
|11-1010|11-1000|    4|    Chief Executives|
|11-1011|11-1010|    5|    Chief Executives|
|11-1020|11-1000|    4|General and Opera...|
|11-1021|11-1020|    5|General and Opera...|
|11-1030|11-1000|    4|         Legislators|
|11-1031|11-1030|    5|         Legislators|
|11-2000|11-0000|    3|Advertising, Mark...|
|11-2010|11-2000|    4|Advertising and P...|
|11-2011|11-2010|    5|Advertising and P...|
|11-2020|11-2000|    4|Marketing and Sal...|
|11-2021|11-2020|    5|  Marketing Managers|
|11-2022|11-2020|    5|      Sales Managers|
|11-2030|11-2000|    4|Public Relations ...|
|11-2031|11-2030|    5|Public Relations ...|
|11-3000|11-0000|    3|Operations Specia...|
|11-3010|11-3000|    4|Administrative Se...|
|11-3011|1

In [118]:
s5 = soc_heir.filter(col("level") == 5).select("child", "parent").alias('5')
s5.show()

+-------+-------+
|  child| parent|
+-------+-------+
|11-1011|11-1010|
|11-1021|11-1020|
|11-1031|11-1030|
|11-2011|11-2010|
|11-2021|11-2020|
|11-2022|11-2020|
|11-2031|11-2030|
|11-3011|11-3010|
|11-3021|11-3020|
|11-3031|11-3030|
|11-3051|11-3050|
|11-3061|11-3060|
|11-3071|11-3070|
|11-3111|11-3110|
|11-3121|11-3120|
|11-3131|11-3130|
|11-9013|11-9010|
|11-9021|11-9020|
|11-9031|11-9030|
|11-9032|11-9030|
+-------+-------+
only showing top 20 rows



In [119]:
s4 = soc_heir.filter(col("level") == 4).select("child", "parent").alias('4')
s4.show()

+-------+-------+
|  child| parent|
+-------+-------+
|11-1010|11-1000|
|11-1020|11-1000|
|11-1030|11-1000|
|11-2010|11-2000|
|11-2020|11-2000|
|11-2030|11-2000|
|11-3010|11-3000|
|11-3020|11-3000|
|11-3030|11-3000|
|11-3050|11-3000|
|11-3060|11-3000|
|11-3070|11-3000|
|11-3110|11-3000|
|11-3120|11-3000|
|11-3130|11-3000|
|11-9010|11-9000|
|11-9020|11-9000|
|11-9030|11-9000|
|11-9040|11-9000|
|11-9050|11-9000|
+-------+-------+
only showing top 20 rows



In [120]:
s3 = soc_heir.filter(col("level") == 3).select("child", "parent").alias('3')
s3.show()

+-------+-------+
|  child| parent|
+-------+-------+
|11-1000|11-0000|
|11-2000|11-0000|
|11-3000|11-0000|
|11-9000|11-0000|
|13-1000|13-0000|
|13-2000|13-0000|
|15-1100|15-0000|
|15-2000|15-0000|
|17-1000|17-0000|
|17-2000|17-0000|
|17-3000|17-0000|
|19-1000|19-0000|
|19-2000|19-0000|
|19-3000|19-0000|
|19-4000|19-0000|
|21-1000|21-0000|
|21-2000|21-0000|
|23-1000|23-0000|
|23-2000|23-0000|
|25-1000|25-0000|
+-------+-------+
only showing top 20 rows



In [159]:
s5.registerTempTable("s5")
s4.registerTempTable("s4")

tmp = spark.sql("select s5.child, s4.parent from s5  LEFT JOIN s4 ON s5.parent = s4.child")

In [162]:
s3.registerTempTable("s3")
tmp.registerTempTable("tmp")

soc2_map = spark.sql("select tmp.child, s3.parent from tmp  LEFT JOIN s3 ON tmp.parent = s3.child")

In [168]:
df_soc5.join(soc2_map, df_soc5.soc5 == soc2_map.child, how='inner').groupBy(soc2_map.parent).count().show(truncate = False)

+-------+-----+
|parent |count|
+-------+-----+
|47-0000|326  |
|11-0000|3940 |
|21-0000|542  |
|45-0000|13   |
|15-0000|3603 |
|25-0000|609  |
|17-0000|981  |
|51-0000|700  |
|53-0000|9529 |
|49-0000|1246 |
|43-0000|4379 |
|27-0000|717  |
|29-0000|6960 |
|13-0000|2233 |
|37-0000|459  |
|55-0000|22   |
|23-0000|145  |
|31-0000|940  |
|39-0000|532  |
|19-0000|463  |
+-------+-----+
only showing top 20 rows



## Count of documents for each soc2

In [26]:
fin.groupBy('soc2').count().show(truncate = False)

+-------+-----+
|soc2   |count|
+-------+-----+
|47-0000|327  |
|11-0000|3940 |
|21-0000|586  |
|45-0000|13   |
|15-0000|3603 |
|25-0000|660  |
|17-0000|981  |
|53-0000|9711 |
|51-0000|800  |
|43-0000|4379 |
|49-0000|1246 |
|27-0000|717  |
|29-0000|7092 |
|13-0000|2426 |
|37-0000|459  |
|55-0000|22   |
|23-0000|145  |
|31-0000|940  |
|19-0000|463  |
|39-0000|542  |
+-------+-----+
only showing top 20 rows



## Total number of postings that were active on February 1st, 2017

In [27]:
str(fin.where(( to_date(col("expired")) >= lit("2017-02-01"))).count())

'18590'

Could probably skip converting this to Pandas before saving it to the Sqlite database. Sqlite would not scale well for bigger datasets.

In [28]:
engine = create_engine('sqlite:///test.db', echo=False)
result_pdf = fin.select("*").toPandas()
result_pdf.to_sql('results', con=engine, if_exists='replace')

In [None]:
#url = "jdbc:sqlite:test.db"
#fin.write.jdbc(url=url, table="new_db", mode="overwrite", properties={"driver":"org.sqlite.JDBC"})