## Connection to the Azure SQL Database

Define the variables used to build the JDBC connection to the Azure SQL Database. Credentials are read from the environment (or prompted for) rather than stored in the notebook.

In [None]:
import os
from getpass import getpass

# Connection settings for the Azure SQL Database.
# Credentials must not be hardcoded in the notebook: anything stored here ends up in saved
# outputs and version history. Non-secret settings fall back to the values this notebook was
# written against; the password has no default and is taken from JDBC_PASSWORD or prompted for.
# NOTE(review): a password was previously committed in this notebook — rotate it.
jdbcUsername = os.environ.get("JDBC_USERNAME", "feuplogin")
jdbcPassword = os.environ.get("JDBC_PASSWORD") or getpass("SQL password for {}: ".format(jdbcUsername))
jdbcHostname = os.environ.get("JDBC_HOSTNAME", "intranetfeupserver.database.windows.net")
jdbcPort = int(os.environ.get("JDBC_PORT", "1433"))
jdbcDatabase = os.environ.get("JDBC_DATABASE", "intranet14")

# Encrypted connection; the server certificate is validated (trustServerCertificate=false)
# against *.database.windows.net.
jdbcUrl = (
    "jdbc:sqlserver://{0}:{1};database={2};encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
).format(jdbcHostname, jdbcPort, jdbcDatabase)

# JDBC properties shared by every spark.read.jdbc / DataFrame.write.jdbc call in this notebook.
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

## Read and Transform Data

### Calendar Dimension

In [None]:
from pyspark.sql.functions import expr, sequence

# Calendar range: earliest imputation start date up to the latest budget end date.
# MIN/MAX ignore NULLs. With "TOP(1) ... ORDER BY ... ASC", SQL Server sorts NULLs first, so a
# single NULL DataInicio made the start bound NULL and the calendar silently came out empty.
i_date = spark.read.jdbc(url=jdbcUrl, table="(SELECT MIN([DataInicio]) AS [DataInicio] FROM [stg].[EXT_TBL_IMPUTACAO_DETALHE]) AS query", properties=connectionProperties)
f_date = spark.read.jdbc(url=jdbcUrl, table="(SELECT MAX([DataFim]) AS [DataFim] FROM [stg].[EXT_TBL_ORCAMENTO]) AS query", properties=connectionProperties)

i_row = i_date.first()
f_row = f_date.first()
if i_row is None or i_row[0] is None or f_row is None or f_row[0] is None:
    raise ValueError("Cannot build DIM_CALENDARIO: no start/end date found in the staging tables")

# Both bounds are truncated to the first day of their month before stepping by one month.
# Otherwise the sequence is anchored on the start date's day-of-month. For example, start
# 2021-01-15 and end 2021-06-10 yield Jan-15 ... May-15, and June is missing from the dimension.
initial_date = expr("date_trunc('MM', to_timestamp('{}'))".format(i_row[0]))
final_date = expr("date_trunc('MM', to_timestamp('{}'))".format(f_row[0]))

# One row per month between the two bounds (inclusive).
timestamps_df = spark.range(1).select(sequence(initial_date, final_date, expr("interval 1 month")).alias("timestamps")).selectExpr("explode(timestamps) as timestamp")

# Extract year, month and quarter from each timestamp.
timestamps_df = timestamps_df.withColumn("Ano", expr("year(timestamp)"))
timestamps_df = timestamps_df.withColumn("Mes", expr("month(timestamp)"))
timestamps_df = timestamps_df.withColumn("Trimestre_Num", expr("quarter(timestamp)"))

# Auxiliary (Portuguese) labels: "YYYY-MM", month name, 3-letter abbreviation, and combinations.
timestamps_df = timestamps_df.withColumn("Ano_Mes_Num", expr("concat_ws('-', Ano, LPAD(Mes, 2, '0'))"))
timestamps_df = timestamps_df.withColumn("Mes_Extenso", expr("CASE Mes WHEN 1 THEN 'Janeiro' WHEN 2 THEN 'Fevereiro' WHEN 3 THEN 'Março' WHEN 4 THEN 'Abril' WHEN 5 THEN 'Maio' WHEN 6 THEN 'Junho' WHEN 7 THEN 'Julho' WHEN 8 THEN 'Agosto' WHEN 9 THEN 'Setembro' WHEN 10 THEN 'Outubro' WHEN 11 THEN 'Novembro' WHEN 12 THEN 'Dezembro' END"))
timestamps_df = timestamps_df.withColumn("Mes_Abrev", expr("substring(Mes_Extenso, 1, 3)"))
timestamps_df = timestamps_df.withColumn("Mes_Abrev_E_Ano", expr("concat(Mes_Abrev, ' ', Ano)"))
timestamps_df = timestamps_df.withColumn("Mes_Extenso_E_Ano", expr("concat(Mes_Extenso, ' ', Ano)"))
timestamps_df = timestamps_df.withColumn("Trimestre", expr("concat(Ano, ' Trimestre ', Trimestre_Num)"))

# Keep only the dimension columns. ID_Calendario is the "YYYYMM" string built by concat.
# NOTE(review): the sentinel row below uses the integer -1 for the same column, so the append
# relies on SQL Server converting it to the column type created by the overwrite — confirm.
result_df = timestamps_df.selectExpr("concat(Ano, LPAD(Mes, 2, '0')) as ID_Calendario", "Mes", "Mes_Abrev", "Mes_Extenso", "Ano", "Ano_Mes_Num", "Mes_Abrev_E_Ano", "Mes_Extenso_E_Ano", "Trimestre_Num", "Trimestre")

# Overwrite (re)creates the table from result_df.
result_df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_CALENDARIO]", mode="overwrite", properties=connectionProperties)

# Sentinel row (-1) that fact rows point to when their year/month is not in the calendar.
undefined_calendar_row = [(-1, 0, '---', 'Não Definido', 0, '---', 'Não Definido', 'Não Definido', 0, 'Não Definido')]
undefined_calendar_df = spark.createDataFrame(undefined_calendar_row, ["ID_Calendario", "Mes", "Mes_Abrev", "Mes_Extenso", "Ano", "Ano_Mes_Num", "Mes_Abrev_E_Ano", "Mes_Extenso_E_Ano", "Trimestre_Num", "Trimestre"])
undefined_calendar_df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_CALENDARIO]", mode="append", properties=connectionProperties)

### Productivity Classification Dimension

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Column layout of DIM_CLASSIFICACAO_PRODUTIVIDADE.
classification_schema = ["ID_Classificacao_Produtividade", "Intervalo", "Nota", "Nota_Num"]

# (id, interval of realized/planned hours, label, score).
# Id -1 is the bucket the fact cells assign when the ratio cannot be computed.
classification_rows = [
    (-1, "Inválido", "Inválida", -1),
    (0, "[0%, 100%[", "Menos do Previsto", 5),
    (1, "100%", "Como Previsto", 4),
    (2, "]100%, 125%]", "Até 25% Horas Extra Consumidas", 3),
    (3, "]125%, 150%]", "Até 50% Horas Extra Consumidas", 2),
    (4, "]150%, 175%]", "Até 75% Horas Extra Consumidas", 1),
    (5, "]175%, +∞]", "Mais de 75% Horas Extra Consumidas", 0),
]

df = spark.createDataFrame(classification_rows, schema=classification_schema)
df.write.jdbc(
    url=jdbcUrl,
    table="[dwProdutividade].[DIM_CLASSIFICACAO_PRODUTIVIDADE]",
    mode="overwrite",
    properties=connectionProperties,
)

### State Dimension

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Every distinct project state present in staging.
distinct_states = spark.read.jdbc(
    url=jdbcUrl,
    table="(select distinct [Estado] from [stg].[EXT_TBL_PROJETOS]) AS query",
    properties=connectionProperties,
)

# Surrogate keys 1..n, numbered in Estado order.
numbered_states = distinct_states.selectExpr("row_number() over (order by Estado) as ID_Estado", "Estado")

# Sentinel row (-1) first, then the numbered states (union is positional).
undefined_state = spark.createDataFrame([(-1, 'Não Definido')], ["ID_Estado", "Estado"])
df = undefined_state.union(numbered_states)

df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_ESTADO]", mode="overwrite", properties=connectionProperties)


### Profile Dimension

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, col, when, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

profiles_font = spark.read.jdbc(url=jdbcUrl, table="(select distinct [Perfil] from [stg].[EXT_TBL_ORCAMENTO]) AS query", properties=connectionProperties)

# The profile number is read from the first two characters of the name. The position text
# starts at character 5 for one-digit numbers and at character 6 otherwise (presumably names
# look like "7 - <position>" / "12 - <position>" — confirm against the source data).
profiles_df = (
    profiles_font
    .where(col("Perfil").isNotNull())  # a NULL profile has no number and cannot be ordered
    .select(col("Perfil").alias("Nome_Perfil"))
    .distinct()
    .withColumn("Num_Perfil", substring(col("Nome_Perfil"), 1, 2).cast("int"))
)

# Fail loudly if a profile name does not start with a number, instead of writing a NULL Num_Perfil.
malformed_profiles = profiles_df.where(col("Num_Perfil").isNull()).count()
if malformed_profiles:
    raise ValueError("{} profile name(s) in EXT_TBL_ORCAMENTO do not start with a profile number".format(malformed_profiles))

profiles_df = (
    profiles_df
    .withColumn("Posicao_Perfil", when(col("Num_Perfil") < 10, substring(col("Nome_Perfil"), 5, 50)).otherwise(substring(col("Nome_Perfil"), 6, 50)))
    # IDs 1..n follow the numeric profile order, like the other dimensions' 1-based keys.
    # Ordering by the name string would place "10 - ..." before "2 - ...".
    .withColumn("ID_Perfil", row_number().over(Window.orderBy("Num_Perfil", "Nome_Perfil")))
    .select("ID_Perfil", "Nome_Perfil", "Num_Perfil", "Posicao_Perfil")
)

profiles_df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_PERFIL]", mode="overwrite", properties=connectionProperties)

# Sentinel row (-1) for facts whose profile cannot be resolved.
df = spark.createDataFrame([(-1, 'Não definido', 0, 'Não definida')], ["ID_Perfil", "Nome_Perfil", "Num_Perfil", "Posicao_Perfil"])
df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_PERFIL]", mode="append", properties=connectionProperties)

### Employee Dimension

In [None]:
from pyspark.sql import SparkSession
# monotonically_increasing_id stays imported for any cell that relies on it being in the namespace.
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

employees_font = spark.read.jdbc(url=jdbcUrl, table="(select distinct [Username] FROM [stg].[EXT_TBL_IMPUTACAO_DETALHE]) AS query", properties=connectionProperties).select("Username")

# The overwrite (re)creates DIM_FUNCIONARIO containing only the -1 sentinel row.
# Real employees are appended below.
df = spark.createDataFrame([(-1, 'Não Definido')], ["ID_Funcionario", "Nome_Funcionario"])
df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_FUNCIONARIO]", mode="overwrite", properties=connectionProperties)

# Consecutive IDs 1..n in Username order.
# - DataFrame.orderBy returns a new frame, so calling it without assigning the result sorts nothing.
# - monotonically_increasing_id() only guarantees unique values, which jump between partitions.
df = (
    employees_font
    .withColumn("ID_Funcionario", row_number().over(Window.orderBy("Username")).cast("long"))
    .selectExpr("ID_Funcionario", "Username as Nome_Funcionario")
)

df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_FUNCIONARIO]", mode="append", properties=connectionProperties)


### Task Dimension

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, concat_ws, substring, col, lit, when

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Planned-hours rows; every row (ID) becomes one task in the dimension.
planned_tasks = spark.read.jdbc(
    url=jdbcUrl,
    table="(select [ID], [CodigoProjeto], [Username], [Tarefa], [NHoras] from [stg].[EXT_TBL_HORASPREVISTAS]) AS query",
    properties=connectionProperties,
)

# Employee number: the Username from its 12th character onwards.
planned_tasks = planned_tasks.withColumn("employee_number", substring(col("Username"), 12, 50))

# "<n> hora" when the planned hours cast to int equal 1, otherwise "<n> horas".
whole_hours = col("NHoras").cast("int")
hours_text = concat(whole_hours, when(whole_hours == 1, lit(" hora")).otherwise(lit(" horas")))

# Space-separated label "<CodigoProjeto> - funcionário <employee_number> - <hours_text>".
# concat_ws skips NULL parts.
task_label = concat_ws(" ", col("CodigoProjeto"), lit("-"), lit("funcionário"), col("employee_number"), lit("-"), hours_text)

task_dimension_df = planned_tasks.withColumn("Projeto_Funcionario_Horas", task_label).selectExpr(
    "ID as ID_Tarefa",
    "Tarefa as Nome_Tarefa",
    "Projeto_Funcionario_Horas",
    "NHoras as Horas_Previstas_Tarefa",
)

task_dimension_df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_TAREFA]", mode="overwrite", properties=connectionProperties)

# Sentinel row (-1) for facts whose task cannot be resolved.
df = spark.createDataFrame(
    [(-1, 'Não Definido', 'Não Definido', 0)],
    ["ID_Tarefa", "Nome_Tarefa", "Projeto_Funcionario_Horas", "Horas_Previstas_Tarefa"],
)
df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_TAREFA]", mode="append", properties=connectionProperties)

### Project Dimension

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, sum, lit, coalesce
# Imported here so this cell does not depend on names imported by other cells.
from pyspark.sql.functions import col, concat, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Planned hours per project.
# NOTE(review): this is an inner join, so a project with no row in EXT_TBL_HORASPREVISTAS is
# absent from DIM_PROJETO and its facts resolve to ID_Projeto = -1 — confirm that is intended.
projects = spark.read.jdbc(url=jdbcUrl,
                           table="(select P.[CodigoProjeto], [Departamento], [Area], sum(H.[NHoras]) as [HorasPrevistas] "
                                 "from [stg].[EXT_TBL_PROJETOS] as P "
                                 "join [stg].[EXT_TBL_HORASPREVISTAS] as H on P.[CodigoProjeto] = H.[CodigoProjeto] "
                                 "group by P.[CodigoProjeto], [Departamento], [Area]) AS query",
                           properties=connectionProperties)

projects = (
    projects
    # Consecutive IDs 1..n in project-code order (monotonically_increasing_id() is unique but
    # not consecutive across partitions).
    .withColumn("ID_Projeto", row_number().over(Window.orderBy("CodigoProjeto")).cast("long"))
    .withColumn("Codigo_Projeto", col("CodigoProjeto"))
    .withColumn("Nome_Projeto", concat(lit("Projeto "), col("CodigoProjeto")))
    .withColumn("Departamento", coalesce(col("Departamento"), lit("Não Especificado")))
    .withColumn("Area", coalesce(col("Area"), lit("Não Especificada")))
    .withColumn("Horas_Previstas_Projeto", col("HorasPrevistas"))
    .select("ID_Projeto", "Codigo_Projeto", "Nome_Projeto", "Departamento", "Area", "Horas_Previstas_Projeto")
)

projects.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_PROJETO]", mode="overwrite", properties=connectionProperties)

# Sentinel row (-1) for facts whose project cannot be resolved.
df = spark.createDataFrame([(-1, 'Não Especificado', 'Não Especificado', 'Não Especificado', 'Não Especificada', 0)],
                           ["ID_Projeto", "Codigo_Projeto", "Nome_Projeto", "Departamento", "Area", "Horas_Previstas_Projeto"])

df.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[DIM_PROJETO]", mode="append", properties=connectionProperties)


### Task Fact

In [None]:
# Productivity classification, from the ratio (total realized hours / planned hours) of a task:
#   [0%, 100%[   - score 5 - classification id 0
#   100%         - score 4 - classification id 1
#   ]100%, 125%] - score 3 - classification id 2
#   ]125%, 150%] - score 2 - classification id 3
#   ]150%, 175%] - score 1 - classification id 4
#   ]175%, +∞]   - score 0 - classification id 5
# Missing realized hours, or missing/zero planned hours -> classification id -1.
#
# There are no intermediate .show() calls. Each show() is a Spark action that re-executes
# every upstream JDBC read and join, because nothing in this cell is cached.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum, lit, coalesce, monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Fact grain: hours realized per (project, employee, year, month, task).
imputations = spark.read.jdbc(url=jdbcUrl, table="(select [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID], sum([NHoras]) as Horas_Realizadas_Tarefa from [stg].[EXT_TBL_IMPUTACAO_DETALHE] group by [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID]) AS query", properties=connectionProperties)

# Total hours realized per (project, task) over all employees and months.
get_hours_performed = "(select [CodigoProjeto] as CodigoHoras, [FK_TarefaID] as TarefaIDHoras, sum([NHoras]) as Total_Horas_Realizadas_Tarefa_Query from [stg].[EXT_TBL_IMPUTACAO_DETALHE] group by [CodigoProjeto], [FK_TarefaID]) AS QUERY"
hours_performed = spark.read.jdbc(url=jdbcUrl, table=get_hours_performed, properties=connectionProperties)
imputations = (
    imputations
    .join(hours_performed, (imputations.CodigoProjeto == hours_performed.CodigoHoras) & (imputations.FK_TarefaID == hours_performed.TarefaIDHoras), "left")
    .withColumnRenamed("Total_Horas_Realizadas_Tarefa_Query", "Total_Horas_Realizadas_Tarefa")
    .drop("CodigoHoras", "TarefaIDHoras")
)

# Deterministic surrogate key 0..n-1, ordered by the GROUP BY keys (unique per row).
# monotonically_increasing_id() is unique but not consecutive across partitions.
fact_key_order = Window.orderBy("CodigoProjeto", "Username", "Ano", "Mes", "FK_TarefaID")
imputations = imputations.withColumn("ID", (row_number().over(fact_key_order) - 1).cast("long"))

# Project key (-1 when the project code is not in DIM_PROJETO).
project = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Projeto] as IDProjetoID, [Codigo_Projeto] from [dwProdutividade].[DIM_PROJETO]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(project, imputations.CodigoProjeto == project.Codigo_Projeto, "left")
    .withColumn("ID_Projeto", when(col("IDProjetoID").isNull(), -1).otherwise(col("IDProjetoID")))
    .drop("IDProjetoID", "Codigo_Projeto")
)

# Calendar key (-1 when the year/month is not in DIM_CALENDARIO).
calendar = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Calendario] as IDCalendarioQuery, [Ano] as AnoQuery, [Mes] as MesQuery from [dwProdutividade].[DIM_CALENDARIO]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(calendar, (imputations.Ano == calendar.AnoQuery) & (imputations.Mes == calendar.MesQuery), "left")
    .withColumn("ID_Calendario", when(col("IDCalendarioQuery").isNull(), -1).otherwise(col("IDCalendarioQuery")))
    .drop("Ano", "Mes", "IDCalendarioQuery", "AnoQuery", "MesQuery")
)

# Task key; also brings Nome_Tarefa and Horas_Previstas_Tarefa, which are used below.
task = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Tarefa] as IDTarefaQuery, [Nome_Tarefa], [Horas_Previstas_Tarefa] from [dwProdutividade].[DIM_TAREFA]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(task, imputations.FK_TarefaID == task.IDTarefaQuery, "left")
    .withColumn("ID_Tarefa", when(col("IDTarefaQuery").isNull(), -1).otherwise(col("IDTarefaQuery")))
    .drop("FK_TarefaID", "IDTarefaQuery")
)

# Employee key (-1 when the username is not in DIM_FUNCIONARIO).
employee = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Funcionario] as IDFuncionarioQuery, [Nome_Funcionario] from [dwProdutividade].[DIM_FUNCIONARIO]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(employee, imputations.Username == employee.Nome_Funcionario, "left")
    .withColumn("ID_Funcionario", when(col("IDFuncionarioQuery").isNull(), -1).otherwise(col("IDFuncionarioQuery")))
    .drop("IDFuncionarioQuery", "Nome_Funcionario")
)

# Profile name: the planned-hours row matching task/project/employee -> its budget line -> Perfil.
profile_info = spark.read.jdbc(url=jdbcUrl, table="(select H.[ID] as IDTarefaHoras, H.[CodigoProjeto] as CodigoProjetoQuery, [Username] as UsernameQuery, H.[Tarefa] as TarefaQuery, [Perfil] as PerfilQuery from [stg].[EXT_TBL_HORASPREVISTAS] as H join [stg].[EXT_TBL_ORCAMENTO] as O on H.[OrcamentoID] = O.[ID]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(profile_info, (imputations.ID_Tarefa == profile_info.IDTarefaHoras) & (imputations.CodigoProjeto == profile_info.CodigoProjetoQuery) & (imputations.Nome_Tarefa == profile_info.TarefaQuery) & (imputations.Username == profile_info.UsernameQuery), "left")
    .withColumnRenamed("PerfilQuery", "Perfil")
    .drop("IDTarefaHoras", "CodigoProjetoQuery", "Nome_Tarefa", "TarefaQuery", "UsernameQuery")
)

# Profile key (-1 when no profile was found or it is not in DIM_PERFIL).
profile = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Perfil] as IDPerfilQuery, [Nome_Perfil] from [dwProdutividade].[DIM_PERFIL]) AS query", properties=connectionProperties)
imputations = (
    imputations
    .join(profile, imputations.Perfil == profile.Nome_Perfil, "left")
    .withColumn("ID_Perfil", when(col("IDPerfilQuery").isNull(), -1).otherwise(col("IDPerfilQuery")))
    .drop("IDPerfilQuery", "Perfil", "Nome_Perfil", "CodigoProjeto", "Username")
)

# Classification id (see the table at the top of this cell). The ratio is built once.
# The -1 checks run first, so the ratio is never evaluated against a NULL or zero planned value.
task_realized_hours = col("Total_Horas_Realizadas_Tarefa")
task_planned_hours = col("Horas_Previstas_Tarefa")
task_ratio = task_realized_hours / task_planned_hours
imputations = imputations.withColumn(
    "ID_Classificacao_Produtividade_Tarefa",
    when(task_realized_hours.isNull() | task_planned_hours.isNull() | (task_planned_hours == 0), -1)
    .when(task_ratio < 1, 0)
    .when(task_ratio == 1, 1)
    .when(task_ratio <= 1.25, 2)
    .when(task_ratio <= 1.50, 3)
    .when(task_ratio <= 1.75, 4)
    .otherwise(5),
).drop("Horas_Previstas_Tarefa")

# Column order expected by FACTO_TAREFA.
imputations = imputations.select(col("ID"), col("Horas_Realizadas_Tarefa"), col("ID_Projeto"), col("ID_Calendario"), col("ID_Tarefa"), col("ID_Funcionario"), col("ID_Perfil"), col("ID_Classificacao_Produtividade_Tarefa"))
imputations.show()

imputations.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[FACTO_TAREFA]", mode="overwrite", properties=connectionProperties)

### Project Fact

In [None]:
# Productivity classification, from the ratio (total realized hours / planned hours) of a project:
#   [0%, 100%[   - score 5 - classification id 0
#   100%         - score 4 - classification id 1
#   ]100%, 125%] - score 3 - classification id 2
#   ]125%, 150%] - score 2 - classification id 3
#   ]150%, 175%] - score 1 - classification id 4
#   ]175%, +∞]   - score 0 - classification id 5
# Missing realized hours, or missing/zero planned hours -> classification id -1.
#
# There are no intermediate .show() calls. Each show() is a Spark action that re-executes
# every upstream JDBC read and join, because nothing in this cell is cached.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, last, date_trunc, when, sum, lit, coalesce, monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
import pandas as pd

spark = SparkSession.builder.appName("Produtividade").getOrCreate()

# Fact grain: hours realized per project and month. Readable form of the query:
#   select [CodigoProjeto], [Ano], [Mes], sum([HorasRealizadas]) as Horas_Realizadas_Projeto
#   from ( select [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID], sum([NHoras]) as HorasRealizadas
#     from [stg].[EXT_TBL_IMPUTACAO_DETALHE]
#     group by [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID]) as T
#   group by [CodigoProjeto], [Ano], [Mes]
projects_mensal = spark.read.jdbc(url=jdbcUrl, table="(select [CodigoProjeto], [Ano], [Mes], sum([HorasRealizadas]) as Horas_Realizadas_Projeto from ( select [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID], sum([NHoras]) as HorasRealizadas from [stg].[EXT_TBL_IMPUTACAO_DETALHE] group by [CodigoProjeto], [Username], [Ano], [Mes], [FK_TarefaID]) as T group by [CodigoProjeto], [Ano], [Mes]) AS query", properties=connectionProperties)

# Total hours realized per project over its whole history.
get_hours_performed = "(select [CodigoProjeto] as CodigoHoras, sum([NHoras]) as Total_Horas_Realizadas_Projeto_Query from [stg].[EXT_TBL_IMPUTACAO_DETALHE] group by [CodigoProjeto]) AS query"
hours_performed = spark.read.jdbc(url=jdbcUrl, table=get_hours_performed, properties=connectionProperties)
projects_mensal = (
    projects_mensal
    .join(hours_performed, projects_mensal.CodigoProjeto == hours_performed.CodigoHoras, "left")
    .withColumnRenamed("Total_Horas_Realizadas_Projeto_Query", "Total_Horas_Realizadas_Projeto")
    .drop("CodigoHoras")
)

# Deterministic surrogate key 0..n-1, ordered by the GROUP BY keys (unique per row).
# monotonically_increasing_id() is unique but not consecutive across partitions.
projects_mensal = projects_mensal.withColumn("ID", (row_number().over(Window.orderBy("CodigoProjeto", "Ano", "Mes")) - 1).cast("long"))

# Advance (Avanco): keep exactly one value per project-month, the latest one recorded in that month.
# NOTE(review): this assumes the latest entry of a month is the one to report — confirm.
# Joining the raw history instead produces one fact row per advance entry (all sharing the
# same ID) whenever a project has several entries in the same month.
advance = spark.read.jdbc(url=jdbcUrl, table="(select [CodigoProjecto], [DataAvanco], [Avanco] as Avanco_Projeto from [stg].[EXT_TBL_HISTORICO_AVANCOS]) AS query", properties=connectionProperties)
latest_in_month = Window.partitionBy("CodigoProjecto", "AnoAvanco", "MesAvanco").orderBy(col("DataAvanco").desc(), col("Avanco_Projeto").desc())
monthly_advance = (
    advance
    .withColumn("AnoAvanco", year(col("DataAvanco")))
    .withColumn("MesAvanco", month(col("DataAvanco")))
    .withColumn("rank_in_month", row_number().over(latest_in_month))
    .where(col("rank_in_month") == 1)
    .drop("rank_in_month", "DataAvanco")
)
projects_mensal = (
    projects_mensal
    .join(monthly_advance, (projects_mensal.CodigoProjeto == monthly_advance.CodigoProjecto) & (projects_mensal.Ano == monthly_advance.AnoAvanco) & (projects_mensal.Mes == monthly_advance.MesAvanco), "left")
    .drop("CodigoProjecto", "AnoAvanco", "MesAvanco")
)

# Months without a recorded advance carry forward the same project's most recent earlier value.
# The window is partitioned by project, so a value never leaks into another project. A
# frame-wide forward fill over rows sorted by project leaks the previous project's last advance
# into the next project's first months. Months before a project's first advance stay NULL.
carry_forward = (
    Window.partitionBy("CodigoProjeto")
    .orderBy("Ano", "Mes")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
projects_mensal = projects_mensal.withColumn("Avanco_Projeto", last(col("Avanco_Projeto"), ignorenulls=True).over(carry_forward))

# State key: project -> Estado -> DIM_ESTADO (-1 when unresolved).
state = spark.read.jdbc(url=jdbcUrl, table="(select [Estado], [CodigoProjeto] as CodigoProjetoQuery from [stg].[EXT_TBL_PROJETOS]) AS query", properties=connectionProperties)
projects_mensal = projects_mensal.join(state, projects_mensal.CodigoProjeto == state.CodigoProjetoQuery, "left")
state_info = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Estado] as IDEstadoQuery, [Estado] as EstadoQuery from [dwProdutividade].[DIM_ESTADO]) AS query", properties=connectionProperties)
projects_mensal = (
    projects_mensal
    .join(state_info, projects_mensal.Estado == state_info.EstadoQuery, "left")
    .withColumn("ID_Estado", when(col("IDEstadoQuery").isNull(), -1).otherwise(col("IDEstadoQuery")))
    .drop("CodigoProjetoQuery", "IDEstadoQuery", "EstadoQuery", "Estado")
)

# Project key (-1 when unresolved); also brings Horas_Previstas_Projeto, which is used below.
project = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Projeto] as IDProjetoID, [Codigo_Projeto], [Horas_Previstas_Projeto] from [dwProdutividade].[DIM_PROJETO]) AS query", properties=connectionProperties)
projects_mensal = (
    projects_mensal
    .join(project, projects_mensal.CodigoProjeto == project.Codigo_Projeto, "left")
    .withColumn("ID_Projeto", when(col("IDProjetoID").isNull(), -1).otherwise(col("IDProjetoID")))
    .drop("IDProjetoID", "Codigo_Projeto")
)

# Calendar key (-1 when the year/month is not in DIM_CALENDARIO).
calendar = spark.read.jdbc(url=jdbcUrl, table="(select [ID_Calendario] as IDCalendarioQuery, [Ano] as AnoQuery, [Mes] as MesQuery from [dwProdutividade].[DIM_CALENDARIO]) AS query", properties=connectionProperties)
projects_mensal = (
    projects_mensal
    .join(calendar, (projects_mensal.Ano == calendar.AnoQuery) & (projects_mensal.Mes == calendar.MesQuery), "left")
    .withColumn("ID_Calendario", when(col("IDCalendarioQuery").isNull(), -1).otherwise(col("IDCalendarioQuery")))
    .drop("Ano", "Mes", "IDCalendarioQuery", "AnoQuery", "MesQuery")
)

# Classification id (see the table at the top of this cell). The ratio is built once.
# The -1 checks run first, so the ratio is never evaluated against a NULL or zero planned value.
project_realized_hours = col("Total_Horas_Realizadas_Projeto")
project_planned_hours = col("Horas_Previstas_Projeto")
project_ratio = project_realized_hours / project_planned_hours
projects_mensal = projects_mensal.withColumn(
    "ID_Classificacao_Produtividade_Projeto",
    when(project_realized_hours.isNull() | project_planned_hours.isNull() | (project_planned_hours == 0), -1)
    .when(project_ratio < 1, 0)
    .when(project_ratio == 1, 1)
    .when(project_ratio <= 1.25, 2)
    .when(project_ratio <= 1.50, 3)
    .when(project_ratio <= 1.75, 4)
    .otherwise(5),
).drop("Horas_Previstas_Projeto")

# Column order expected by FACTO_PROJETO.
projects_mensal = projects_mensal.select(col("ID"), col("Avanco_Projeto"), col("Horas_Realizadas_Projeto"), col("ID_Estado"), col("ID_Projeto"), col("ID_Calendario"), col("ID_Classificacao_Produtividade_Projeto"))
projects_mensal.show()

projects_mensal.write.jdbc(url=jdbcUrl, table="[dwProdutividade].[FACTO_PROJETO]", mode="overwrite", properties=connectionProperties)