In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import expr, array, col, explode, arrays_zip, when, first, avg
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from functools import reduce
from pyspark.sql import DataFrame

# `warehouse` points to the default location for managed databases and tables.
warehouse = 'hdfs://hdfs-nn:9000/warehouse'

# Spark session builder with Hive metastore integration and the Delta Lake
# SQL extension / catalog enabled. Parentheses delimit the chain so it does
# not depend on backslash continuations.
builder = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # NOTE(review): configure_spark_with_delta_pip also sets spark.jars.packages
    # (from the installed delta-spark pip version), presumably overriding this
    # value — confirm the two versions agree.
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .enableHiveSupport()
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
# Load the source tables.
own_df = spark.table("database.jobs")
wage_df = spark.table("database.wage")

# Rename the own-account table's columns to the names the rest of the
# notebook uses for both tables (nome_pais / nome_indicador).
own_df = (
    own_df
    .withColumnRenamed('pais_name', 'nome_pais')
    .withColumnRenamed('indicador_name', 'nome_indicador')
)

# Countries of interest, as SQL LIKE patterns, and the years to keep
# (as strings, 2010 through 2015 inclusive).
paises = [
    f"%{country}%"
    for country in (
        "Germany", "Denmark", "Switzerland", "Poland",
        "Czech", "Slovenia", "Hungary", "Austria",
    )
]
anos = [str(year) for year in range(2010, 2016)]

# Filtering the DataFrames

def filter_by_multiple_conditions(df: DataFrame, paises: list, anos: list,
                                  pais_col: str = "nome_pais",
                                  ano_col: str = "ano") -> DataFrame:
    """Keep rows whose country matches any LIKE pattern and whose year is listed.

    Args:
        df: Input DataFrame; must contain the ``pais_col`` and ``ano_col`` columns.
        paises: SQL LIKE patterns (e.g. "%Germany%"). A row is kept when its
            country column matches at least one of them (LIKE is case-sensitive).
        anos: Year values passed to ``Column.isin``.
        pais_col: Name of the country column (default "nome_pais").
        ano_col: Name of the year column (default "ano").

    Returns:
        The filtered DataFrame. An empty ``paises`` or ``anos`` list keeps no
        rows instead of raising.
    """
    # Seeding the OR-fold with a false literal turns an empty pattern list into
    # an always-false condition; reduce() without an initial value would raise
    # TypeError on an empty iterable.
    country_match = reduce(
        lambda acc, cond: acc | cond,
        (col(pais_col).like(p) for p in paises),
        F.lit(False),
    )
    return df.filter(country_match).filter(col(ano_col).isin(anos))

own_df = filter_by_multiple_conditions(own_df, paises, anos)
wage_df = filter_by_multiple_conditions(wage_df, paises, anos)

# Drop the code columns; only names are carried into the output.
own_df = own_df.drop("indicador_code", "pais_cod")
wage_df = wage_df.drop("cod_indicador", "cod_pais")

# Give the wage table's gender column the shared name and keep only the
# female/male rows.
wage_df = wage_df.withColumnRenamed('Disaggregation', 'desagregacao')
wage_df = wage_df.filter((col('desagregacao') == 'female') | (col('desagregacao') == 'male'))

# Keep only the own-account-worker indicators.
own_df = own_df.filter(F.col("nome_indicador").like("%Own-account workers%"))

# Derive the gender column from the indicator name (case-sensitive LIKE).
# The leading space in "% male%" stops the "male" inside "female" from matching.
own_df = own_df.withColumn(
    "desagregacao",
    F.when(F.col("nome_indicador").like("% female%"), "female")
    .when(F.col("nome_indicador").like("% male%"), "male"),
)

# Rows whose indicator matched neither pattern have a null gender; drop them.
own_df = own_df.filter(F.col("desagregacao").isNotNull())
wage_df = wage_df.filter(F.col("desagregacao").isNotNull())

# Average the value per country, year, gender and indicator.
wage_df_media = (
    wage_df.groupBy("nome_pais", "ano", "desagregacao", "nome_indicador")
    .agg(F.avg("valor").alias("valor"))
)
own_df_media = (
    own_df.groupBy("nome_pais", "ano", "desagregacao", "nome_indicador")
    .agg(F.avg("valor").alias("valor"))
)

# Keep a single row per (country, year, gender): the indicator with the lowest
# average value. Spark sorts NULLs first in ascending order, so nulls are
# pushed last explicitly to avoid picking an all-null indicator over real
# values. nome_indicador breaks ties so the chosen row is deterministic.
# NOTE(review): "lowest value wins" is an arbitrary rule when several
# indicators match a group — confirm this is the intended selection.
window = (
    Window.partitionBy("nome_pais", "ano", "desagregacao")
    .orderBy(F.col("valor").asc_nulls_last(), F.col("nome_indicador"))
)

wage_df_final = (
    wage_df_media.withColumn("row_number", F.row_number().over(window))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)
own_df_final = (
    own_df_media.withColumn("row_number", F.row_number().over(window))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

# Output column order, shared by both DataFrames.
columns_order = ["nome_indicador", "nome_pais", "ano", "valor", "desagregacao"]

wage_df_final = wage_df_final.select(columns_order)
own_df_final = own_df_final.select(columns_order)

In [3]:
# Cast the averaged value to single-precision float in both outputs.
# NOTE(review): avg() yields double, so this drops precision — presumably to
# match the target table's schema; confirm.
wage_df_final, own_df_final = (
    frame.withColumn("valor", col("valor").cast("float"))
    for frame in (wage_df_final, own_df_final)
)

In [4]:
# Stack the wage and own-account rows into one DataFrame. unionByName pairs
# columns by name (union() pairs them by position), so the rows stay aligned
# even if the two inputs' column order ever diverges.
df_concatenado = wage_df_final.unionByName(own_df_final)

# Store the year as an integer column.
df_concatenado = df_concatenado.withColumn("ano", col("ano").cast("int"))

In [5]:
# Session-wide Delta schema auto-merge; this write also passes the explicit
# mergeSchema option below.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Overwrite the gold Delta table stored at this HDFS path.
# NOTE(review): the next cell queries database.WageOwnGold; presumably that
# table is registered over this path — confirm, otherwise it will not reflect
# this write.
gold_path = "hdfs://hdfs-nn:9000/Projeto/gold/TabelaWageOwnGold"
gold_writer = (
    df_concatenado.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
gold_writer.save(gold_path)

In [6]:
# Inspect the gold table (up to 500 rows).
wage_own_query = """
    SELECT * FROM database.WageOwnGold
    """
spark.sql(wage_own_query).show(500)

+--------------+--------------------+----+--------+------------+
|     nome_pais|      nome_indicador| ano|   valor|desagregacao|
+--------------+--------------------+----+--------+------------+
|       Austria|Own-account worke...|2010|   6.176|      female|
|       Austria|Own-account worke...|2010|    6.89|        male|
|       Austria|Own-account worke...|2011|   6.052|      female|
|       Austria|Own-account worke...|2011|   6.586|        male|
|       Austria|Own-account worke...|2012|   6.086|      female|
|       Austria|Own-account worke...|2012|   6.537|        male|
|       Austria|Own-account worke...|2013|   6.317|      female|
|       Austria|Own-account worke...|2013|   6.883|        male|
|       Austria|Own-account worke...|2014|   6.051|      female|
|       Austria|Own-account worke...|2014|   6.578|        male|
|       Austria|Own-account worke...|2015|   6.205|      female|
|       Austria|Own-account worke...|2015|   6.889|        male|
|Czech Republic|Own-accou

In [7]:
# Stop the Spark session (and its underlying SparkContext) at the end of the notebook.
spark.stop()