In [3]:
from pyspark.sql import SparkSession

# Crea una sesiÃ³n de Spark
spark = SparkSession.builder.appName("LecturaCSV") \
    .config("spark.driver.extraClassPath", "postgresql-42.6.0.jar") \
    .getOrCreate()

# Lee el archivo CSV y carga los datos en un DataFrame tiene mas de 12 
df_jobs = spark.read.csv("jobs.csv", header=False, inferSchema=True)
nombres_columnas = ["job_id", "job_name"]
df_jobs = df_jobs.toDF(*nombres_columnas)

# Lee el archivo CSV y carga los datos en un DataFrame
df_hired_employees = spark.read.csv("hired_employees.csv", header=False, inferSchema=True)
nombres_columnas = ["employe_id", "employe_name", "employe_hired_time", "employe_deparment_id", "employe_jobs_id"]
df_hired_employees = df_hired_employees.toDF(*nombres_columnas)


# Lee el archivo CSV y carga los datos en un DataFrame tiene menos de 12 
df_departments = spark.read.csv("departments.csv", header=True, inferSchema=True)
nombres_columnas = ["department_id", "department_name"]
df_departments = df_departments.toDF(*nombres_columnas)

df_hired_employees_final = df_hired_employees \
    .join(df_jobs, df_jobs["job_id"] == df_hired_employees["employe_jobs_id"], how="inner") \
    .join(df_departments, df_departments["department_id"] == df_hired_employees["employe_deparment_id"], how="inner") \
    .createOrReplaceTempView("df_hired_employees")


jdbc_url = "jdbc:postgresql://localhost:5432/globant"
propiedades = {
    "user": "usuario",
    "password": "contrasena"
}

df_hired_employees_final.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "public.hired_employees") \
    .option("user", propiedades["user"]) \
    .option("password", propiedades["password"]) \
    .option("createTableOptions", "TABLESPACE = public") \
    .mode("overwrite") \
    .save()

In [4]:
hired_department_job_sql = """
SELECT department_name,
       job_name,
       COALESCE(Q1,0) AS Q1,
       COALESCE(Q2,0) AS Q2,
       COALESCE(Q3,0) AS Q3,
       COALESCE(Q4,0) AS Q4
FROM (
    SELECT 
    department_name,  
    job_name,
    CASE 
        WHEN
            MONTH(employe_hired_time) IN (1,2,3)
            THEN 'Q1'
        WHEN 
            MONTH(employe_hired_time) IN (4,5,6)
            THEN 'Q2'
        WHEN 
            MONTH(employe_hired_time) IN (7,8,9)
            THEN 'Q3'
        ELSE 'Q4' END AS quarter,
    COUNT(*) hired_count
    FROM df_hired_employees
    WHERE 
        YEAR(employe_hired_time) == 2021
    GROUP BY 
        department_name, job_name, MONTH(employe_hired_time)
    ORDER BY department_name, job_name
) AS SourceTable
PIVOT (
    SUM(hired_count)
    FOR quarter IN ('Q1', 'Q2','Q3','Q4')
)
                """

resultado = spark.sql(hired_department_job_sql)

resultado.show()

+--------------------+--------------------+---+---+---+---+
|     department_name|            job_name| Q1| Q2| Q3| Q4|
+--------------------+--------------------+---+---+---+---+
|           Marketing|               Nurse|  0|  1|  0|  0|
|Research and Deve...|        Geologist IV|  0|  0|  1|  0|
|Business Development|        Accountant I|  0|  0|  1|  0|
|               Sales|             Actuary|  0|  1|  0|  0|
|     Human Resources|        Food Chemist|  0|  0|  0|  1|
|Research and Deve...|        Food Chemist|  0|  0|  0|  1|
|           Marketing|Sales Representative|  0|  2|  0|  1|
|            Services|Occupational Ther...|  0|  1|  1|  0|
|         Engineering|     Project Manager|  0|  2|  1|  0|
|Business Development|     Legal Assistant|  0|  0|  1|  1|
|            Training|             Actuary|  0|  0|  1|  0|
|         Engineering|Software Test Eng...|  0|  1|  0|  1|
|          Accounting|      Programmer III|  0|  0|  1|  0|
|Business Development|Occupational Ther.

In [5]:
hired_department_job_avg_sql = """
WITH MoreContracterCount AS (
    SELECT
        department_id,
        department_name, 
        COUNT(*) AS count_by_deparment
    FROM df_hired_employees
    WHERE 
        YEAR(employe_hired_time) == 2021
    GROUP BY 
        department_id, department_name
), MoreContracterFilter AS (
    SELECT * 
    FROM MoreContracterCount
    WHERE count_by_deparment > (SELECT AVG(count_by_deparment) FROM MoreContracterCount)
    ORDER BY count_by_deparment DESC
)

SELECT *
FROM MoreContracterFilter
                """

resultado = spark.sql(hired_department_job_avg_sql)

resultado.show()

+-------------+--------------------+------------------+
|department_id|     department_name|count_by_deparment|
+-------------+--------------------+------------------+
|            8|             Support|               217|
|            5|         Engineering|               207|
|            6|     Human Resources|               204|
|            7|            Services|               202|
|            4|Business Development|               187|
|            3|Research and Deve...|               149|
+-------------+--------------------+------------------+

