In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch, helpers

In [2]:
def salary_check(salary: str, usd_to_vnd: float = 23182):
    """Normalise a raw salary string into a figure (or range) expressed as text.

    Recognised shapes, after removing the "Trên" / "Dưới" qualifiers and the
    "Tr" marker (presumably the abbreviation for "million" - confirm against
    the scraped data):

    * ``"<amount> VND"``            -> ``"<amount>"``
    * ``"<amount> USD"``            -> converted amount
    * ``"<low> - <high> VND"``      -> ``"<low> - <high>"``
    * ``"<low> - <high> USD"``      -> converted ``"<low> - <high>"``

    VND amounts only have their comma replaced by a dot. USD amounts have
    commas removed, are multiplied by ``usd_to_vnd`` and floor-divided by
    1,000,000.

    Args:
        salary: Raw salary text; ``None`` and other non-string values are
            treated as missing.
        usd_to_vnd: Exchange rate applied to USD amounts.

    Returns:
        Always a ``str``. The empty string is returned for missing values,
        for "Cạnh tranh", and for any shape that is not recognised (instead
        of falling through and returning ``None``).

    Raises:
        ValueError: If a USD amount cannot be parsed as a number.
    """
    if not isinstance(salary, str):
        return ""
    text = salary.strip()
    if text in ("", "Cạnh tranh"):
        return ""

    text = text.replace("Trên", "").replace("Dưới", "").replace("Tr", "")
    values = text.split()

    def convert(amount, currency):
        # Returns None for an unknown currency so the caller can bail out.
        if currency == "VND":
            return amount.replace(",", ".")
        if currency == "USD":
            # str() keeps the return type uniform; the float is rendered
            # exactly as an f-string would render it (e.g. "23.0").
            return str(float(amount.replace(",", "")) * usd_to_vnd // 1000000)
        return None

    if len(values) == 2:
        single = convert(values[0], values[1])
        if single is not None:
            return single
    elif len(values) == 4:
        # Tokens are: low, separator, high, currency.
        low = convert(values[0], values[3])
        high = convert(values[2], values[3])
        if low is not None and high is not None:
            return f"{low} - {high}"

    # Unrecognised layout: report "unknown" explicitly rather than None.
    return ""


def degree_check(bc):
    """Return the cleaned degree requirement, or the default label.

    Args:
        bc: Raw degree text; ``None`` and other non-string values are treated
            as missing.

    Returns:
        The stripped text, or "Không yêu cầu" ("no requirement") when the
        value is missing, blank (including whitespace-only) or equal to
        "Khác" ("other").
    """
    if not isinstance(bc, str):
        return "Không yêu cầu"
    cleaned = bc.strip()
    # Compare after stripping so "   " is handled the same way as "".
    if cleaned in ("", "Khác"):
        return "Không yêu cầu"
    return cleaned


def experience_check(kn: str):
    """Normalise a raw experience requirement such as "1 - 3 Năm".

    The "Năm" ("year") suffix is removed and the remaining text is split on
    whitespace. A three-token value is read as ``<low> <separator> <high>``.

    Args:
        kn: Raw experience text; ``None`` and other non-string values are
            treated as missing.

    Returns:
        * "Không yêu cầu" ("no requirement") for missing/blank values,
          "Chưa có kinh nghiệm" and "0 - 0 Năm".
        * The first token when fewer than three tokens remain.
        * The lower bound alone when it is not smaller than the upper bound.
        * ``"<low> - <high>"`` otherwise.
    """
    if not isinstance(kn, str):
        return "Không yêu cầu"
    text = kn.strip()
    if text in ("", "Chưa có kinh nghiệm", "0 - 0 Năm"):
        return "Không yêu cầu"

    kns = text.replace("Năm", "").split()
    if not kns:
        # Nothing left once the unit is removed.
        return "Không yêu cầu"
    if len(kns) < 3:
        # No upper bound to compare against; avoids indexing past the end.
        return kns[0]

    low, high = kns[0], kns[2]
    try:
        # Compare as numbers: as strings "2" >= "10" is True, which would
        # collapse the range "2 - 10" down to "2".
        collapsed = float(low) >= float(high)
    except ValueError:
        # Non-numeric bounds: fall back to plain string ordering.
        collapsed = low >= high
    return low if collapsed else f"{low} - {high}"


def sex_check(sex: str):
    """Return the cleaned gender requirement, or the default label.

    Args:
        sex: Raw gender text; ``None`` and other non-string values are
            treated as missing.

    Returns:
        The stripped text, or "Không yêu cầu" ("no requirement") when the
        value is missing or blank (including whitespace-only).
    """
    if not isinstance(sex, str):
        return "Không yêu cầu"
    cleaned = sex.strip()
    # An empty result after stripping means the field carried no information.
    return cleaned if cleaned else "Không yêu cầu"

In [3]:
# Read careerbuilder-raw.json with Spark and bring it into pandas for cleaning.
spark = SparkSession.builder.config("spark.sql.debug.maxToStringFields", 100000).getOrCreate()
df = spark.read.json("careerbuilder-raw.json").toPandas()

# Re-format update_time from dd/mm/YYYY to YYYY-mm-dd.
df["update_time"] = pd.to_datetime(df["update_time"], format="%d/%m/%Y").dt.strftime("%Y-%m-%d")
df["sex"] = df["sex"].apply(sex_check)

# Keep only the rows whose company name is not blank.
df = df[df["company_name"].str.strip() != ""]

df["salary"] = df["salary"].apply(salary_check)
df["degree"] = df["degree"].apply(degree_check)
df["experience"] = df["experience"].apply(experience_check)

# Store every column as text and renumber the rows 0..n-1 after the filter.
# This is done directly in pandas; sending the all-string frame through
# spark.createDataFrame(...).toPandas() only to get a fresh index back is
# unnecessary work.
df = df.astype(str).reset_index(drop=True)

22/07/03 12:38:00 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlp7s0)
22/07/03 12:38:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/03 12:38:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [4]:
# Address of the Elasticsearch node used by this notebook.
ES_HOST = "http://172.18.0.19:9200"
es = Elasticsearch(ES_HOST)


def generator(frame=None, client=None, index="job-careerbuilder"):
    """Bulk-index every row of the job table into Elasticsearch.

    One document is built per row, carrying the row position under "id_" and
    the twelve job columns as plain scalar values. All rows of the frame are
    sent, however many there are.

    Args:
        frame: DataFrame to index. Defaults to the notebook-level ``df``.
        client: Elasticsearch client. Defaults to the notebook-level ``es``.
        index: Name of the target index.

    Returns:
        Whatever ``helpers.bulk`` returns.
    """
    source = df if frame is None else frame
    target = es if client is None else client

    columns = [
        "age",
        "company_name",
        "degree",
        "experience",
        "job_field",
        "job_name",
        "level",
        "location",
        "salary",
        "sex",
        "update_time",
        "working_form",
    ]

    # to_dict(orient="records") yields one {column: scalar} mapping per row.
    # Taking a one-row slice and reading a column from it (iloc[i: i + 1].col)
    # would instead give a one-element Series for every field.
    records = source[columns].to_dict(orient="records")

    # NOTE(review): "id_" is stored as an ordinary document field. If it was
    # meant to be the Elasticsearch document id, the bulk helper expects the
    # metadata key "_id" - confirm before changing.
    documents = ({"id_": position, **record} for position, record in enumerate(records))

    return helpers.bulk(target, documents, index=index)

In [5]:
# Send the documents built from df to Elasticsearch via the bulk helper.
generator()