# Imports and Initial installations

In [1]:
!apt-get -y install mysql-server
!pip install mysql-connector-python
!service mysql start
!mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH 'mysql_native_password' BY 'root';FLUSH PRIVILEGES;"

import mysql.connector
from google.colab import drive
import json
from datetime import datetime
from dateutil import parser
import time

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libcgi-fast-perl libcgi-pm-perl libclone-perl libencode-locale-perl libfcgi-bin libfcgi-perl
  libfcgi0ldbl libhtml-parser-perl libhtml-tagset-perl libhtml-template-perl libhttp-date-perl
  libhttp-message-perl libio-html-perl liblwp-mediatypes-perl libmecab2 libprotobuf-lite23
  liburi-perl mecab-ipadic mecab-ipadic-utf8 mecab-utils mysql-client-8.0 mysql-client-core-8.0
  mysql-server-8.0 mysql-server-core-8.0
Suggested packages:
  libdata-dump-perl libipc-sharedcache-perl libbusiness-isbn-perl libwww-perl mailx tinyca
The following NEW packages will be installed:
  libcgi-fast-perl libcgi-pm-perl libclone-perl libencode-locale-perl libfcgi-bin libfcgi-perl
  libfcgi0ldbl libhtml-parser-perl libhtml-tagset-perl libhtml-template-perl libhttp-date-perl
  libhttp-message-perl libio-html-perl liblwp-mediatypes-perl libmecab2 libprotobuf-l

# Establish a MySQL Connection and create database and table

In [2]:
conn = mysql.connector.connect(user='root', password='root', host='localhost')

cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS BigDataProject")
cursor.execute("USE BigDataProject")

cursor.execute("CREATE TABLE JobPostings (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, job_title varchar(200) null, job_description text(6000), location varchar(200) null, date_created date null)")



# Insert Job Records into RDBMS (MySQL) and run query to fetch them from databse

In [3]:
drive.mount('/content/drive/', force_remount=True)
path = '/content/drive/Shared drives/F23_CS226_DataWarehouse_18/International_Job_PostingsBig_2021/techmap-jobs-dump-2021-09.json'

start = time.time()
insert_string = """INSERT INTO JobPostings(job_title, job_description, location, date_created) VALUES (%s, %s, %s, %s)"""
with open(path,'r') as json_file:
    for line in json_file:
      jsn = json.loads(line)
      if ((jsn["name"] is not None and len(jsn["name"]) < 100) and (jsn["text"] is not None and len(jsn["text"]) < 6000) and
        (jsn["orgAddress"] is not None and("city" in jsn["orgAddress"]) and("state" in jsn["orgAddress"]) and len(jsn["orgAddress"]["city"]) < 200 and len(jsn["orgAddress"]["state"]) < 20)):

        vals = (jsn["name"], jsn["text"], jsn["orgAddress"]["city"] + ", " + jsn["orgAddress"]["state"], parser.parse(jsn["dateUploaded"]["$date"], fuzzy = True).date())
        cursor.execute(insert_string, vals)

conn.commit()

query_string = """SELECT COUNT(*) AS JobCount,
    (CASE
        WHEN job_title LIKE "%Software%" THEN 'Software' WHEN job_title LIKE "%Data%" THEN 'Data' WHEN job_title LIKE "%Machine Learning%" THEN 'Machine Learning'
        WHEN job_title LIKE "%Artificial Intelligence%" THEN 'Artificial Intelligence' WHEN job_title LIKE "% AI %" THEN 'Artificial Intelligence'
        ELSE 'Other'
    END) AS JobCategory,
    (CASE
        WHEN date_created LIKE "%2021%" THEN '2021' WHEN date_created LIKE "%2023%" THEN '2023'
        ELSE '2022'
    END) AS Year
FROM
    JobPostings
WHERE
    (job_title LIKE "%Software%" OR job_title LIKE "%Data%" OR job_title LIKE "%Machine Learning%" OR
    job_title LIKE "%Artificial Intelligence%" OR job_title LIKE "% AI %")
GROUP BY
  JobCategory, Year
ORDER BY
    JobCount DESC;"""

cursor.execute(query_string)

end = time.time()
print(f"Time to insert records into MySQL and execute query : {(end - start):.2f} seconds")

for rec in cursor.fetchall():
  print(rec)

Mounted at /content/drive/
Time to insert records into MySQL and execute query : 3663.81 seconds
(39148, 'Software', '2021')
(33969, 'Data', '2021')
(1277, 'Machine Learning', '2021')
(309, 'Artificial Intelligence', '2021')


# PySpark Dependencies Installation and Imports

In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=94d2b88fbd713808246c42b1af20473f56d2a0d0c44ed64be439c3bfffe4b4f8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
from google.colab import drive
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, ArrayType
import time
from pyspark.sql.functions import col, date_format, concat_ws

spark = SparkSession.builder.appName("JSONRead").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","12g").config("spark.sql.adaptive.enabled", "true").config("spark.sql.adaptive.coalescePartitions.enabled", "true").getOrCreate()

# Insert Job Records into PySpark DataFrame and query using SparkSQL

In [6]:
drive.mount('/content/drive/', force_remount=True)
path = '/content/drive/Shared drives/F23_CS226_DataWarehouse_18/International_Job_PostingsBig_2021/techmap-jobs-dump-2021-09.json'

schema = StructType([StructField('dateUploaded', StructType([StructField('$date', StringType(), True)]), True),
                     StructField('name', StringType(), True),
                     StructField('orgAddress', StructType([StructField('addressLine', StringType(), True), StructField('city', StringType(), True), StructField('companyName', StringType(), True), StructField('country', StringType(), True), StructField('countryCode', StringType(), True), StructField('county', StringType(), True), StructField('district', StringType(), True), StructField('formatted', StringType(), True), StructField('houseNumber', StringType(), True), StructField('level', LongType(), True), StructField('quarter', StringType(), True), StructField('state', StringType(), True), StructField('street', StringType(), True)]), True),
                     StructField('text', StringType(), True)])

start = time.time()
df_json = spark.read.option("mode", "DROPMALFORMED").schema(schema).json(path)

df_json = df_json.withColumn("dateUploaded", col("dateUploaded.$date"))
df_json = df_json.withColumn("dateUploaded", date_format("dateUploaded", "yyyy-MM-dd"))
df_json = df_json.withColumn("orgAddress", concat_ws(', ', col("orgAddress.city"), col("orgAddress.state")))

df_json = df_json.withColumnRenamed("name", "job_title").withColumnRenamed("dateUploaded", "date_posted").withColumnRenamed("orgAddress", "job_location",)

df_json.createOrReplaceTempView('JobData')
CategoryBased_df = spark.sql(
'''
SELECT
    COUNT(*) AS JobCount,
    (CASE
        WHEN job_title LIKE "%Software%" THEN 'Software'
        WHEN job_title LIKE "%Data%" THEN 'Data'
        WHEN job_title LIKE "%Machine Learning%" THEN 'Machine Learning'
        WHEN job_title LIKE "%Artificial Intelligence%" THEN 'Artificial Intelligence'
        WHEN job_title LIKE "% AI %" THEN 'Artificial Intelligence'
        ELSE 'Other' -- You can customize this for other cases
    END) AS JobCategory,

    (CASE
        WHEN date_posted LIKE "%2021%" THEN '2021'
        WHEN date_posted LIKE "%2023%" THEN '2023'
        ELSE '2022'
    END) AS Year
FROM
    JobData
WHERE
    (job_title LIKE "%Software%" OR
    job_title LIKE "%Data%" OR
    job_title LIKE "%Machine Learning%" OR
    job_title LIKE "%Artificial Intelligence%" OR
    job_title LIKE "% AI %")
GROUP BY
  JobCategory, Year
ORDER BY
    JobCount DESC;

'''
)

df_final = CategoryBased_df.collect()
end = time.time()
print(f"Time to query: {end - start:.2f} seconds")

display(df_final)

Mounted at /content/drive/
Time to query: 708.13 seconds


[Row(JobCount=49848, JobCategory='Software', Year='2021'),
 Row(JobCount=39385, JobCategory='Data', Year='2021'),
 Row(JobCount=1518, JobCategory='Machine Learning', Year='2021'),
 Row(JobCount=391, JobCategory='Artificial Intelligence', Year='2021')]

# Create a temp file from original dataset containing sub-set of job postings to insert into Pandas DataFrame

In [2]:
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)
path = '/content/drive/Shared drives/F23_CS226_DataWarehouse_18/International_Job_PostingsBig_2021/techmap-jobs-dump-2021-09.json'

count = 0
with open(path,'r') as firstfile, open('Sample.json','a') as secondfile:

    # read content from first file
    for line in firstfile:
             if(count > 451830):
                break;
             # append content to second file
             secondfile.write(line)
             count = count + 1

Mounted at /content/drive/


# Insert sub-set of job postings into Pandas DataFrame for performance comparison since Pandas DataFrame cannot scale beyond a certain dataset size.

In [None]:
import pandas as pd

pd_df = pd.read_json(path_or_buf="Sample.json", orient='column', lines=True)

The output of the above cell is not generated because Pandas DataFrame processing crashes after exceeding the memory limit.


#Spark RDD Code. Slower than SparkSQL.

df_coll = df_json.filter("name LIKE '%Software Developer%' OR name LIKE '%Data Engineer%' OR name LIKE '%Data Analyst%' OR name LIKE '%Data Scientist%'").groupBy("orgAddress").count()
df_coll.collect()
df_coll.show(2000)