In [1]:
import requests
import json


In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

sc = SparkContext()

spark = SparkSession(sc).builder.appName("Test Session").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/21 19:26:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/21 19:26:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
sql_command = """
    SELECT
        scraped_at,
        company_name,
        job_title,
        location,
        job_page_url
    FROM jobs
    ORDER BY job_id DESC
    ;"""

api_url = "http://192.168.0.29:5000/execute_sql"
response = requests.post(api_url, json={"sql_command": sql_command})

if response.status_code == 200:
    data = response.json()
else:
    print(f"Failed to fetch data: {response.status_code}, {response.text}")

In [4]:
schema = StructType([
   StructField("scraped_at", TimestampType(), False),
   StructField("company_name", StringType(), False),
   StructField("job_title", StringType(), False),
   StructField("location", StringType(), False),
   StructField("job_page_url", StringType(), False)
])
df = spark.createDataFrame(data, schema)
df.show(5)

PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `TimestampType()` can not accept object `2024-06-21 17:34:12.853887` in type `str`.

In [9]:
from pyspark.sql.functions import col, to_timestamp

columns = [
    "scraped_at_str",
    "company_name",
    "job_title",
    "location",
    "job_page_url"
]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Convert string to timestamp
df = df.withColumn("scraped_at", to_timestamp(col("scraped_at_str")))

In [10]:
new_columns = [
    "scraped_at",
    "company_name",
    "job_title",
    "location",
    "job_page_url"
]
df.select(new_columns).show(5)

+--------------------+--------------+--------------------+--------------------+--------------------+
|          scraped_at|  company_name|           job_title|            location|        job_page_url|
+--------------------+--------------+--------------------+--------------------+--------------------+
|2024-06-21 17:34:...|         Glean|Platform Security...|       Palo Alto, CA|https://boards.gr...|
|2024-06-21 17:33:...|Fortune Brands|People Services S...|25300 Al Moen Dri...|https://jobs.smar...|
|2024-06-21 17:31:...|      Experian|Data Acquisition ...|955 American Lane...|https://jobs.smar...|
|2024-06-21 17:31:...|      Experian|Procurement Contr...|475 Anton Blvd, C...|https://jobs.smar...|
|2024-06-21 17:31:...|      Experian|Revenue Cycle Sol...|., ., ., United S...|https://jobs.smar...|
+--------------------+--------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
filtered_df = df.where(df["company_name"] == 'Experian')

# Print the resulting data frame
filtered_df.show()

+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|      scraped_at_str|company_name|           job_title|            location|        job_page_url|          scraped_at|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|2024-06-21 17:31:...|    Experian|Data Acquisition ...|955 American Lane...|https://jobs.smar...|2024-06-21 17:31:...|
|2024-06-21 17:31:...|    Experian|Procurement Contr...|475 Anton Blvd, C...|https://jobs.smar...|2024-06-21 17:31:...|
|2024-06-21 17:31:...|    Experian|Revenue Cycle Sol...|., ., ., United S...|https://jobs.smar...|2024-06-21 17:31:...|
|2024-06-21 17:31:...|    Experian|Revenue Cycle Sol...|., ., ., United S...|https://jobs.smar...|2024-06-21 17:31:...|
|2024-06-21 15:35:...|    Experian|Analista de Desen...|Avenida das Naçõe...|https://jobs.smar...|2024-06-21 15:35:...|
|2024-06-21 15:35:...|    Experian|Human

In [13]:
filtered_df.count()

24/06/21 19:34:40 WARN TaskSetManager: Stage 5 contains a task of very large size (1236 KiB). The maximum recommended task size is 1000 KiB.


508

In [16]:
grouped_df = df.groupBy("company_name").count().orderBy("count", ascending=False)
grouped_df.show()

24/06/21 19:36:56 WARN TaskSetManager: Stage 14 contains a task of very large size (1236 KiB). The maximum recommended task size is 1000 KiB.


+-------------------+-----+
|       company_name|count|
+-------------------+-----+
|        Bosch Group| 5762|
|    Publicis Groupe| 4438|
|                   | 1170|
|       Sopra Steria| 1153|
|         ServiceNow| 1046|
|               Visa|  964|
|                ITW|  591|
| Palo Alto Networks|  522|
|         Epic Games|  520|
|   Sonic Automotive|  514|
|           Experian|  508|
|                OKX|  488|
|    Western Digital|  487|
|       NBCUniversal|  476|
|WNS Global Services|  465|
|              Capco|  429|
|               Hibu|  409|
|          JD Sports|  384|
|              Block|  353|
|            Celonis|  330|
+-------------------+-----+
only showing top 20 rows



In [17]:
grouped_df = df.groupBy("job_title").count().orderBy("count", ascending=False)
grouped_df.show()

24/06/21 19:37:28 WARN TaskSetManager: Stage 17 contains a task of very large size (1236 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+-----+
|           job_title|count|
+--------------------+-----+
|                    | 1170|
|Outside Sales Rep...|  401|
|     Sales Assistant|  144|
|   Account Executive|  122|
|Senior Customer R...|  116|
|Educational Sales...|  105|
|Senior Software E...|   68|
|   Software Engineer|   54|
|        Art Director|   49|
|     Project Manager|   49|
|  Dynamic PC Support|   48|
|Staff Software En...|   46|
|Customer Service ...|   45|
|Strategic Develop...|   44|
|     Account Manager|   44|
|  Service Technician|   43|
|Sales Development...|   37|
|          Copywriter|   36|
|Business Developm...|   35|
|Outside Sales Acc...|   34|
+--------------------+-----+
only showing top 20 rows



In [None]:

# Stop the Spark session if no longer needed
spark.stop()