In [19]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
import os
import logging
import configparser
from pyspark.sql.functions import *

In [6]:
spark = SparkSession.builder \
    .appName("appGlob") \
    .config("spark.jars","../controller/postgresql-42.7.2.jar")\
    .config('spark.driver.extraClassPath', '../controller/postgresql-42.7.2.jar') \
    .getOrCreate()

In [7]:
config = configparser.ConfigParser()
config.read("../properties.conf")
database_config = config["database"]

In [59]:
#LOAD_FILE
filePath = "../inputs/"
fileEmp = "hired_employees.csv"
fileDept = "departments.csv"
fileJobs = "jobs.csv"
# Define the schema
emp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("date", DateType(), True),
    StructField("id_job", IntegerType(), True),
    StructField("id_department", IntegerType(), True)
])

jb_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("job", StringType(), True)
])

dp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("department", StringType(), True)
])


In [60]:
try: 
    df_employee = spark.read.csv(filePath+fileEmp,header=False, inferSchema=False, schema=emp_schema ,sep=",")
    df_job = spark.read.csv(filePath+fileJobs,header=False, inferSchema=False, schema=jb_schema ,sep=",")
    df_dept = spark.read.csv(filePath+fileDept,header=False, inferSchema=False, schema=dp_schema ,sep=",")
except Exception as e:
    logger.error("Error: %s", str(e), exc_info=True)
    df = spark.createDataFrame([], schema=schema)

In [122]:
df_result2 = df_employee.alias("a").join(df_job.alias("b"),df_employee["id_job"] == col("b.id"), "left").select(col("a.id").alias("employee_id"),\
        col("a.name").alias("name"), \
        col("a.date").alias("date"), \
        col("b.job").alias("job"),\
        col("a.id_department").alias("id_department"))

df_result2 = df_result2.alias("a").join(df_dept.alias("b"),df_result2["id_department"] == df_dept["id"], "left").select(col("a.employee_id").alias("employee_id"),\
        col("a.name").alias("name"), \
        col("a.date").alias("date"), \
        col("a.job").alias("job"),\
        col("b.department").alias("department"))

In [142]:
quart1 = when(month("date").between(1, 3) , 1).otherwise(0).alias("Q1")
quart2 = when(month("date").between(4, 6) , 1).otherwise(0).alias("Q2")
quart3 = when(month("date").between(7, 9) , 1).otherwise(0).alias("Q3")
quart4 = when(month("date").between(10, 12) , 1).otherwise(0).alias("Q4")
#total_q2 = df.agg(sum("Q2").alias("total_Q1")).collect()[0]["total_Q2"]
#total_q3 = df.agg(sum("Q3").alias("total_Q1")).collect()[0]["total_Q3"]
#total_q4 = df.agg(sum("Q4").alias("total_Q1")).collect()[0]["total_Q4"]

df_resultQuarter = df_result2.select(col("*"),quart1,quart2,quart3,quart4)
df_resultQuarter1 = df_resultQuarter.groupBy("department","job", "Q1").agg(sum("Q1").alias("total_Q1")).select("department","job", col("total_Q1").alias("Q1"),lit(0).alias("Q2"),lit(0).alias("Q3"),lit(0).alias("Q4"))
df_resultQuarter2 = df_resultQuarter.groupBy("department","job", "Q2").agg(sum("Q2").alias("total_Q2")).select("department","job",lit(0).alias("Q1"),col("total_Q2").alias("Q2"),lit(0).alias("Q3"),lit(0).alias("Q4"))
df_resultQuarter3 = df_resultQuarter.groupBy("department","job", "Q3").agg(sum("Q3").alias("total_Q3")).select("department","job",lit(0).alias("Q1"),lit(0).alias("Q2"),col("total_Q3").alias("Q3"),lit(0).alias("Q4"))
df_resultQuarter4 = df_resultQuarter.groupBy("department","job", "Q4").agg(sum("Q4").alias("total_Q4")).select("department","job",lit(0).alias("Q1"),lit(0).alias("Q2"),lit(0).alias("Q3"),col("total_Q4").alias("Q4"))

df_union = df_resultQuarter1.union(df_resultQuarter2).union(df_resultQuarter3).union(df_resultQuarter4)
df_union = df_union.orderBy("department","job")



In [148]:
df_union.filter(col("department").isNotNull() & col("job").isNotNull()).show(truncate= False)

+-----------+---------------------------+---+---+---+---+
|department |job                        |Q1 |Q2 |Q3 |Q4 |
+-----------+---------------------------+---+---+---+---+
|Accounting |VP Marketing               |1  |0  |0  |0  |
|Accounting |VP Marketing               |0  |0  |0  |0  |
|Accounting |VP Marketing               |0  |0  |0  |0  |
|Accounting |VP Marketing               |0  |0  |0  |0  |
|Accounting |VP Sales                   |0  |0  |0  |0  |
|Accounting |VP Sales                   |0  |0  |1  |0  |
|Accounting |VP Sales                   |0  |0  |0  |0  |
|Accounting |VP Sales                   |0  |0  |0  |0  |
|Engineering|Account Representative II  |0  |0  |0  |0  |
|Engineering|Account Representative II  |0  |0  |1  |0  |
|Engineering|Account Representative II  |0  |0  |0  |0  |
|Engineering|Account Representative II  |0  |0  |0  |0  |
|Engineering|Environmental Specialist   |0  |1  |0  |0  |
|Engineering|Environmental Specialist   |0  |0  |0  |0  |
|Engineering|E

In [46]:
df_dept.filter(col("id") == 8).show()

+---+----------+
| id|department|
+---+----------+
|  8|   Support|
+---+----------+

