<a href="https://colab.research.google.com/github/alanaquinoslv/spark_aws/blob/main/Spark_task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz

In [3]:
!pip install -q findspark

In [4]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [5]:
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [6]:
import findspark

findspark.init()

from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [8]:
df = spark.sql("select 'spark' as hello")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [67]:
# IMPORTS
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, count, when, lit

In [11]:
schema = StructType([
    StructField("funcional", IntegerType(), True),
    StructField("nome_trilha", StringType(), True),
    StructField("cod_trilha", StringType(), True),
    StructField("data_finalizacao_prova", StringType(), True),
    StructField("ind_trilha_finalizada", IntegerType(), True)
])

In [12]:
# MOCK
data = [
    (123, "ENG_DADOS M1", "EG001", "2024-01-24", 1),
    (123, "ENG_DADOS M2", "EG002", "2024-02-23", 1),
    (123, "ENG_DADOS M3", "EG003", "2024-03-22", 1),
	(992, "ENG_ANALYTICS M1", "ANL01", "2024-03-22", 1),
	(992, "ENG_ANALYTICS M1", "ANL01", "2024-03-22", 0),
	(329, "ENG_DADOS M1", "EG001", "2024-01-22", 1),
	(329, "ENG_DADOS M2", "EG002", "2024-03-14", 1),
	(329, "ENG_DADOS M3", "EG003", "2024-03-22", 0),
	(448, "ENG_DADOS M1", "EG001", "2024-01-22", 1),
	(448, "ENG_DADOS M2", "EG002", "2024-01-29", 1),
	(448, "ENG_DADOS M3", "EG003", "2024-02-10", 1),
	(865, "ENG_ANALYTICS M1", "ANL01", "2024-02-22", 1),
	(865, "ENG_ANALYTICS M2", "ANL02", "2024-03-07", 1),
	(865, "ENG_ANALYTICS M3", "ANL03", "2024-03-22", 1),
	(654, "ENG_DADOS M1", "EG001", "2024-03-01", 1),
	(654, "ENG_DADOS M2", "EG002", "2024-03-10", 0),
	(654, "ENG_DADOS M3", "EG003", "2024-03-22", 0)
]

In [13]:
df = spark.createDataFrame(data, schema)
df.show()

+---------+----------------+----------+----------------------+---------------------+
|funcional|     nome_trilha|cod_trilha|data_finalizacao_prova|ind_trilha_finalizada|
+---------+----------------+----------+----------------------+---------------------+
|      123|    ENG_DADOS M1|     EG001|            2024-01-24|                    1|
|      123|    ENG_DADOS M2|     EG002|            2024-02-23|                    1|
|      123|    ENG_DADOS M3|     EG003|            2024-03-22|                    1|
|      992|ENG_ANALYTICS M1|     ANL01|            2024-03-22|                    1|
|      992|ENG_ANALYTICS M1|     ANL01|            2024-03-22|                    0|
|      329|    ENG_DADOS M1|     EG001|            2024-01-22|                    1|
|      329|    ENG_DADOS M2|     EG002|            2024-03-14|                    1|
|      329|    ENG_DADOS M3|     EG003|            2024-03-22|                    0|
|      448|    ENG_DADOS M1|     EG001|            2024-01-22|   

In [27]:
# Códigos das trilhas para filtrar
codes_eng_dados = ["EG001", "EG002", "EG003"]
# Filtrar os dados de acordo com os critérios
df_filtrado = df.filter((col("cod_trilha").isin(codes_eng_dados)) & (col("ind_trilha_finalizada") == 1))
df_filtrado.show()

+---------+------------+----------+----------------------+---------------------+
|funcional| nome_trilha|cod_trilha|data_finalizacao_prova|ind_trilha_finalizada|
+---------+------------+----------+----------------------+---------------------+
|      123|ENG_DADOS M1|     EG001|            2024-01-24|                    1|
|      123|ENG_DADOS M2|     EG002|            2024-02-23|                    1|
|      123|ENG_DADOS M3|     EG003|            2024-03-22|                    1|
|      329|ENG_DADOS M1|     EG001|            2024-01-22|                    1|
|      329|ENG_DADOS M2|     EG002|            2024-03-14|                    1|
|      448|ENG_DADOS M1|     EG001|            2024-01-22|                    1|
|      448|ENG_DADOS M2|     EG002|            2024-01-29|                    1|
|      448|ENG_DADOS M3|     EG003|            2024-02-10|                    1|
|      654|ENG_DADOS M1|     EG001|            2024-03-01|                    1|
+---------+------------+----

In [65]:
# TESTES

# Contando os módulos finalizados
dados_completo = df_filtrado.groupBy("funcional").agg(
    count(when((col("cod_trilha") == 'EG001') & (col("ind_trilha_finalizada") == 1), col("cod_trilha"))).alias("modulo1"),
    count(when((col("cod_trilha") == 'EG002') & (col("ind_trilha_finalizada") == 1), col("cod_trilha"))).alias("modulo2"),
    count(when((col("cod_trilha") == 'EG003') & (col("ind_trilha_finalizada") == 1), col("cod_trilha"))).alias("modulo3")
)

dados_completo.show()

+---------+-------+-------+-------+
|funcional|modulo1|modulo2|modulo3|
+---------+-------+-------+-------+
|      329|      1|      1|      0|
|      123|      1|      1|      1|
|      654|      1|      0|      0|
|      448|      1|      1|      1|
+---------+-------+-------+-------+



In [68]:
# TESTES
# CRIANDO FLAGS
flag = dados_completo.withColumn("eng_dados_completo",
                                           when((col("modulo1") + col("modulo2") + col("modulo3")) == 3, lit(1)).otherwise(0))

flag.show()

+---------+-------+-------+-------+------------------+
|funcional|modulo1|modulo2|modulo3|eng_dados_completo|
+---------+-------+-------+-------+------------------+
|      329|      1|      1|      0|                 0|
|      123|      1|      1|      1|                 1|
|      654|      1|      0|      0|                 0|
|      448|      1|      1|      1|                 1|
+---------+-------+-------+-------+------------------+



In [71]:
# TESTES
flag_filter = flag.filter(col('eng_dados_completo') == 1)
flag_filter.show()

+---------+-------+-------+-------+------------------+
|funcional|modulo1|modulo2|modulo3|eng_dados_completo|
+---------+-------+-------+-------+------------------+
|      123|      1|      1|      1|                 1|
|      448|      1|      1|      1|                 1|
+---------+-------+-------+-------+------------------+



In [30]:
# Realizar a contagem dos módulos finalizados agrupados por funcional
resultado = df_filtrado.groupBy("funcional").agg(count("*").alias("mod_eng_dados_completed"))
resultado.show()

+---------+-----------------------+
|funcional|mod_eng_dados_completed|
+---------+-----------------------+
|      329|                      2|
|      123|                      3|
|      654|                      1|
|      448|                      3|
+---------+-----------------------+



In [26]:
# Contar o número de módulos completados para cada funcional
contagem_modulos = df_filtrado.groupBy("funcional").agg(count("*").alias("total_modulos"))

# Filtrar os usuários que têm os três módulos completados
usuarios_completos = contagem_modulos.filter(col("total_modulos") == 3)

# Mostrar os resultados
usuarios_completos.show()

+---------+-------------+
|funcional|total_modulos|
+---------+-------------+
|      123|            3|
|      448|            3|
+---------+-------------+



In [34]:
# Contar o número de módulos completados para cada funcional
contagem_modulos = df_filtrado.groupBy("funcional").agg(count("*").alias("count_finalizados"))

# Criar uma nova coluna "eng_dados_completed" com base na contagem de módulos
df_completos = contagem_modulos.withColumn("eng_dados_completed", when(col("count_finalizados") == 3, 1).otherwise(0))
df_completos.show()

+---------+-----------------+-------------------+
|funcional|count_finalizados|eng_dados_completed|
+---------+-----------------+-------------------+
|      329|                2|                  0|
|      123|                3|                  1|
|      654|                1|                  0|
|      448|                3|                  1|
+---------+-----------------+-------------------+



In [35]:
# Códigos das trilhas para filtrar
codes_eng_analytics = ["ANL01", "ANL02", "ANL03"]
# Filtrar os dados de acordo com os critérios
df_filtrado_anl = df.filter((col("cod_trilha").isin(codes_eng_analytics)) & (col("ind_trilha_finalizada") == 1))
df_filtrado_anl.show()

+---------+----------------+----------+----------------------+---------------------+
|funcional|     nome_trilha|cod_trilha|data_finalizacao_prova|ind_trilha_finalizada|
+---------+----------------+----------+----------------------+---------------------+
|      992|ENG_ANALYTICS M1|     ANL01|            2024-03-22|                    1|
|      865|ENG_ANALYTICS M1|     ANL01|            2024-02-22|                    1|
|      865|ENG_ANALYTICS M2|     ANL02|            2024-03-07|                    1|
|      865|ENG_ANALYTICS M3|     ANL03|            2024-03-22|                    1|
+---------+----------------+----------+----------------------+---------------------+



In [36]:
# Realizar a contagem dos módulos finalizados agrupados por funcional
resultado_anl = df_filtrado_anl.groupBy("funcional").agg(count("*").alias("mod_eng_analytics_completed"))
resultado_anl.show()

+---------+---------------------------+
|funcional|mod_eng_analytics_completed|
+---------+---------------------------+
|      992|                          1|
|      865|                          3|
+---------+---------------------------+



In [46]:
# Contar o número de módulos completados para cada funcional
contagem_modulos_anl = df_filtrado_anl.groupBy("funcional").agg(count("*").alias("count_finalizados_anl"))

# Criar uma nova coluna "eng_dados_completed" com base na contagem de módulos
df_completos_anl = contagem_modulos_anl.withColumn("eng_analytics_completed", when(col("count_finalizados_anl") == 3, 1).otherwise(0))
df_completos_anl.show()

+---------+---------------------+-----------------------+
|funcional|count_finalizados_anl|eng_analytics_completed|
+---------+---------------------+-----------------------+
|      992|                    1|                      0|
|      865|                    3|                      1|
+---------+---------------------+-----------------------+



In [44]:
df_merged = df_completos.join(df_completos_anl, "funcional", "full")
df_merged.show()

+---------+-----------------+-------------------+---------------------+-----------------------+
|funcional|count_finalizados|eng_dados_completed|count_finalizados_anl|eng_analytics_completed|
+---------+-----------------+-------------------+---------------------+-----------------------+
|      123|                3|                  1|                 NULL|                   NULL|
|      329|                2|                  0|                 NULL|                   NULL|
|      448|                3|                  1|                 NULL|                   NULL|
|      654|                1|                  0|                 NULL|                   NULL|
|      865|             NULL|               NULL|                    3|                      1|
|      992|             NULL|               NULL|                    1|                      0|
+---------+-----------------+-------------------+---------------------+-----------------------+



In [48]:
# Juntar os DataFrames "ENG_DADOS" e "ENG_ANALYTICS" ao DataFrame inicial
df_joined = df.join(df_completos.select("funcional", "eng_dados_completed"), "funcional", "left") \
             .join(df_completos_anl.select("funcional", "eng_analytics_completed"), "funcional", "left")

df_joined.show()

+---------+----------------+----------+----------------------+---------------------+-------------------+-----------------------+
|funcional|     nome_trilha|cod_trilha|data_finalizacao_prova|ind_trilha_finalizada|eng_dados_completed|eng_analytics_completed|
+---------+----------------+----------+----------------------+---------------------+-------------------+-----------------------+
|      329|    ENG_DADOS M1|     EG001|            2024-01-22|                    1|                  0|                   NULL|
|      329|    ENG_DADOS M2|     EG002|            2024-03-14|                    1|                  0|                   NULL|
|      329|    ENG_DADOS M3|     EG003|            2024-03-22|                    0|                  0|                   NULL|
|      992|ENG_ANALYTICS M1|     ANL01|            2024-03-22|                    1|               NULL|                      0|
|      992|ENG_ANALYTICS M1|     ANL01|            2024-03-22|                    0|             

In [50]:
joined_filter = df_joined.filter((col("eng_dados_completed") == 1) | (col("eng_analytics_completed") == 1))
joined_filter.show()

+---------+----------------+----------+----------------------+---------------------+-------------------+-----------------------+
|funcional|     nome_trilha|cod_trilha|data_finalizacao_prova|ind_trilha_finalizada|eng_dados_completed|eng_analytics_completed|
+---------+----------------+----------+----------------------+---------------------+-------------------+-----------------------+
|      123|    ENG_DADOS M1|     EG001|            2024-01-24|                    1|                  1|                   NULL|
|      123|    ENG_DADOS M2|     EG002|            2024-02-23|                    1|                  1|                   NULL|
|      123|    ENG_DADOS M3|     EG003|            2024-03-22|                    1|                  1|                   NULL|
|      448|    ENG_DADOS M1|     EG001|            2024-01-22|                    1|                  1|                   NULL|
|      448|    ENG_DADOS M2|     EG002|            2024-01-29|                    1|             

In [61]:
# Selecionar apenas uma linha para cada 'funcional' utilizando distinct
joined_filter_distinct = joined_filter.select("funcional", "eng_dados_completed", "eng_analytics_completed").distinct()

# Mostrar os resultados
joined_filter_distinct.show()

+---------+-------------------+-----------------------+
|funcional|eng_dados_completed|eng_analytics_completed|
+---------+-------------------+-----------------------+
|      123|                  1|                   NULL|
|      448|                  1|                   NULL|
|      865|               NULL|                      1|
+---------+-------------------+-----------------------+



In [62]:
# Substituir campos nulos por 0 em todo o DataFrame
df_final = joined_filter_distinct.fillna(0)
df_final.show()

+---------+-------------------+-----------------------+
|funcional|eng_dados_completed|eng_analytics_completed|
+---------+-------------------+-----------------------+
|      123|                  1|                      0|
|      448|                  1|                      0|
|      865|                  0|                      1|
+---------+-------------------+-----------------------+

