In [2]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
from pyspark.sql.functions import *

In [3]:
# HDFS directory used as the Spark SQL warehouse root.
warehouse_location = 'hdfs://hdfs-nn:9000/TrabalhoPratico'

# Session builder with Hive metastore access and the Delta Lake SQL extension/catalog.
builder = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .enableHiveSupport()
)

# Pass the builder through Delta's pip helper, then create (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Read the silver-layer MentalHealth Delta table from HDFS.
normalizar_data_female = (
    spark.read
    .format("delta")
    .load("hdfs://hdfs-nn:9000/TrabalhoPratico/silver/MentalHealth_DeltaTable/")
)

In [5]:
# Reduce the timestamp to its year and switch the country/gender columns to
# the Portuguese names used by the rest of the notebook.
normalizar_data_female = (
    normalizar_data_female
    .withColumn("TimeStamp", year("TimeStamp"))
    .withColumnRenamed("Country", "Pais")
    .withColumnRenamed("Gender", "Genero")
)


In [6]:
# Rename the year column to "Ano".
# NOTE(review): the previous cell writes the column as "TimeStamp" while this
# rename targets "Timestamp"; this presumably relies on Spark's default
# case-insensitive column resolution - confirm spark.sql.caseSensitive is unset.
normalizar_data_female = normalizar_data_female.withColumnRenamed("Timestamp","Ano")

In [7]:
# Preview the frame to check the renamed columns (Ano, Genero, Pais).
normalizar_data_female.show()

+----+---+------+--------------+--------------+---------+--------------+-------------+------------+----------+------------+----------------+----------+----------+------------------+-------------------------+-----------------------+------------+------------+-----------------------+---------------------+
| Ano|Age|Genero|          Pais|family_history|treatment|work_interfere|remote_worker|tech_company|  benefits|care_options|Wellness_program| Seek_help| anonymity|             leave|mental_health_consequence|phys_health_consequence|   coworkers|  supervisor|mental_health_interview|phys_health_interview|
+----+---+------+--------------+--------------+---------+--------------+-------------+------------+----------+------------+----------------+----------+----------+------------------+-------------------------+-----------------------+------------+------------+-----------------------+---------------------+
|2014| 37|Female| United States|            No|      Yes|         Often|           No|  

In [None]:
# Create the `gold` database (if missing), rooted at the gold directory on HDFS.
spark.sql(
    """
    CREATE DATABASE IF NOT EXISTS gold LOCATION 'hdfs://hdfs-nn:9000/TrabalhoPratico/gold/'
    """
)

In [None]:
# Drop the table gold.mentalHealth_TratamentoPerSex_Country_Year if it exists.
# NOTE(review): no visible cell in this notebook creates this table - it looks
# like a leftover from an earlier version; confirm before removing the cell.
spark.sql(
    """
    DROP TABLE IF EXISTS gold.mentalHealth_TratamentoPerSex_Country_Year
    """
)

In [None]:
# 1 -> (Re)create the gold Delta table that stores, per year and country, how
# many men and women are in treatment and each gender's share of the people
# in treatment.
#
# The column list mirrors exactly what the write cell saves to this location.
# The table is external, so DROP TABLE keeps the Delta files; declaring
# columns that differ from the data already stored there makes CREATE TABLE
# fail on a re-run of the notebook.

spark.sql(
    """
    DROP TABLE IF EXISTS gold.mentalHealth_TratamentoPerSex
    """
)

spark.sql(
    """
    CREATE EXTERNAL TABLE  gold.mentalHealth_TratamentoPerSex (
        Ano INTEGER,
        Pais STRING,
        Homens_em_tratamento LONG,
        Mulheres_em_tratamento LONG,
        Percentagem_De_Homens_Em_Tratamento DOUBLE,
        Percentagem_De_Mulheres_Em_Tratamento DOUBLE
    )
    USING DELTA
    LOCATION 'hdfs://hdfs-nn:9000/TrabalhoPratico/gold/mentalHealth_TratamentoPerSex/'
    """
)

In [None]:

# Row-level conditions: the respondent is a man / a woman currently in treatment.
homem_em_tratamento = (col("Genero") == "Male") & (col("treatment") == "Yes")
mulher_em_tratamento = (col("Genero") == "Female") & (col("treatment") == "Yes")

# Turn each condition into a 1/0 flag, then total the flags per country and year.
treatment_count = (
    normalizar_data_female
    .withColumn("Homens_em_tratamento", when(homem_em_tratamento, 1).otherwise(0))
    .withColumn("Mulheres_em_tratamento", when(mulher_em_tratamento, 1).otherwise(0))
    .groupBy("Pais", "Ano")
    .agg(
        sum("Homens_em_tratamento").alias("Homens_em_tratamento"),
        sum("Mulheres_em_tratamento").alias("Mulheres_em_tratamento"),
    )
)
treatment_count.show()

# Shared denominator: everyone in treatment (men + women) in the group.
# NOTE(review): groups with nobody in treatment divide by zero - confirm the
# resulting value (presumably NULL) is acceptable downstream.
total_em_tratamento = col("Homens_em_tratamento") + col("Mulheres_em_tratamento")

# Each gender's share of the people in treatment, as a percentage.
treatment_count_with_percentages = (
    treatment_count
    .withColumn(
        "Percentagem_De_Homens_Em_Tratamento",
        (col("Homens_em_tratamento") / total_em_tratamento) * 100,
    )
    .withColumn(
        "Percentagem_De_Mulheres_Em_Tratamento",
        (col("Mulheres_em_tratamento") / total_em_tratamento) * 100,
    )
)

treatment_count_with_percentages.show()

In [None]:
# Persist the per-country/year treatment figures as the gold Delta table,
# replacing both the previous contents and the previous schema.
colunas_tratamento_por_sexo = [
    "Ano",
    "Pais",
    "Homens_em_tratamento",
    "Mulheres_em_tratamento",
    "Percentagem_De_Homens_Em_Tratamento",
    "Percentagem_De_Mulheres_Em_Tratamento",
]

(
    treatment_count_with_percentages
    .select(*colunas_tratamento_por_sexo)
    .write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("hdfs://hdfs-nn:9000/TrabalhoPratico/gold/mentalHealth_TratamentoPerSex")
)

In [None]:
# View this table in Tableau.
# Read the gold table back through the metastore to check what was written.

spark.sql(
    """
    SELECT *
    FROM gold.mentalHealth_TratamentoPerSex
    """
).show()

In [12]:
# Drop the benefits / care-options gold table if it exists.
# NOTE(review): the next cell issues the same DROP before re-creating the
# table, so this cell appears redundant.
spark.sql(
    """
    DROP TABLE IF EXISTS gold.mentalHealth_Benefits_Care_TreatmentPerCountry
    """
)

DataFrame[]

In [13]:

# (Re)create the gold Delta table with the per-country/year answer counts and
# percentages for the `benefits` and `care_options` questions.
#
# The declared columns match what the write cell saves to this location:
# Pais and Ano as grouping keys (Ano is an integer year, not a date), LONG
# counts, and DOUBLE percentages. The table is external, so DROP TABLE keeps
# the Delta files; declaring columns that differ from the stored data makes
# CREATE TABLE fail on a re-run of the notebook.
spark.sql(
    """
    DROP TABLE IF EXISTS gold.mentalHealth_Benefits_Care_TreatmentPerCountry
    """
)

spark.sql(
    """
    CREATE EXTERNAL TABLE gold.mentalHealth_Benefits_Care_TreatmentPerCountry (
        Pais STRING,
        Ano INTEGER,
        Sum_Benefits_Per_Country LONG,
        Sum_Care_Options_Per_Country LONG,
        Sum_No_Benefits_Per_Country LONG,
        Sum_No_CareOptions_Per_Country LONG,
        Sum_Dont_Know_Benefits_Per_Country LONG,
        Sum_All_Benefits_Per_Country LONG,
        Sum_All_CareOptions_Per_Country LONG,
        Percentage_Yes_Benefits DOUBLE,
        Percentage_No_Benefits DOUBLE,
        Percentage_Yes_CareOptions DOUBLE,
        Percentage_No_CareOptions DOUBLE,
        Percentage_Dont_Know_Benefits DOUBLE
    )
    USING DELTA
    LOCATION 'hdfs://hdfs-nn:9000/TrabalhoPratico/gold/mentalHealth_Benefits_Care_TreatmentPerCountry/'
    """
)


DataFrame[]

In [None]:
# Preview the normalized frame again before building the next aggregates.
normalizar_data_female.show()

In [None]:
# Number of male and female respondents per country and year.
# The frame was already normalized above, so the grouping keys are the
# renamed columns "Pais" and "Ano" (Ano already holds the year).
countries_for_date = normalizar_data_female.groupBy("Pais", "Ano").agg(
    sum(when(col("Genero") == "Male", 1).otherwise(0)).alias("Male_Count"),
    sum(when(col("Genero") == "Female", 1).otherwise(0)).alias("Female_Count")
)

# Total number of respondents in treatment ('Total_Treatment_Yes') per country
# and year. The filter runs on the row-level frame because the aggregated
# frame no longer carries the `treatment` column, and it compares against
# "Yes", the spelling used by the data and by the other cells.
treatment_count_per_country_year = normalizar_data_female.filter(col("treatment") == "Yes") \
    .groupBy('Pais', 'Ano') \
    .agg(count('*').alias('Total_Treatment_Yes'))

In [15]:

def _count_rows_where(condition, alias):
    """Return an aggregate named `alias` that counts the rows where `condition` holds."""
    return sum(when(condition, 1).otherwise(0)).alias(alias)


# Answer counts for the `benefits` and `care_options` questions per country and year.
# NOTE(review): "Not sure" is tallied together with "No" for both questions, and
# the "Don't know" care-options count is zero in every row of the sample output -
# confirm that grouping is intended.
treatment_by_country = (
    normalizar_data_female
    .groupBy('Pais', "Ano")
    .agg(
        _count_rows_where(col("benefits") == "Yes", "Sum_Benefits_Per_Country"),
        _count_rows_where(col("care_options") == "Yes", "Sum_Care_Options_Per_Country"),
        _count_rows_where(col("benefits").isin("No", "Not sure"), "Sum_No_Benefits_Per_Country"),
        _count_rows_where(col("care_options").isin("No", "Not sure"), "Sum_No_CareOptions_Per_Country"),
        _count_rows_where(col("benefits") == "Don't know", "Sum_Dont_Know_Benefits_Per_Country"),
        _count_rows_where(col("care_options") == "Don't know", "Sum_Dont_know_Care_Options_Per_Country"),
        _count_rows_where(col("benefits").isNotNull(), "Sum_All_Benefits_Per_Country"),
        _count_rows_where(col("care_options").isNotNull(), "Sum_All_CareOptions_Per_Country"),
    )
)

# (percentage column, numerator column, denominator column), in output order.
percentage_specs = [
    ("Percentage_Yes_Benefits", "Sum_Benefits_Per_Country", "Sum_All_Benefits_Per_Country"),
    ("Percentage_No_Benefits", "Sum_No_Benefits_Per_Country", "Sum_All_Benefits_Per_Country"),
    ("Percentage_Yes_CareOptions", "Sum_Care_Options_Per_Country", "Sum_All_CareOptions_Per_Country"),
    ("Percentage_No_CareOptions", "Sum_No_CareOptions_Per_Country", "Sum_All_CareOptions_Per_Country"),
    ("Percentage_Dont_Know_Benefits", "Sum_Dont_Know_Benefits_Per_Country", "Sum_All_Benefits_Per_Country"),
    ("Percentage_Dont_Know_CareOptions", "Sum_Dont_Know_Care_Options_Per_Country", "Sum_All_CareOptions_Per_Country"),
]

# Append one percentage column per spec: numerator / denominator * 100.
treatment_with_percentages = treatment_by_country
for percentage_name, part_name, whole_name in percentage_specs:
    treatment_with_percentages = treatment_with_percentages.withColumn(
        percentage_name,
        (col(part_name) / col(whole_name)) * 100,
    )

# Display the counts and percentages per country and year.
treatment_with_percentages.toPandas()

Unnamed: 0,Pais,Ano,Sum_Benefits_Per_Country,Sum_Care_Options_Per_Country,Sum_No_Benefits_Per_Country,Sum_No_CareOptions_Per_Country,Sum_Dont_Know_Benefits_Per_Country,Sum_Dont_know_Care_Options_Per_Country,Sum_All_Benefits_Per_Country,Sum_All_CareOptions_Per_Country,Percentage_Yes_Benefits,Percentage_No_Benefits,Percentage_Yes_CareOptions,Percentage_No_CareOptions,Percentage_Dont_Know_Benefits,Percentage_Dont_Know_CareOptions
0,Colombia,2014,0,1,2,1,0,0,2,2,0.0,100.0,50.0,50.0,0.0,0.0
1,Croatia,2014,1,1,1,1,0,0,2,2,50.0,50.0,50.0,50.0,0.0,0.0
2,Netherlands,2014,3,3,11,17,6,0,20,20,15.0,55.0,15.0,85.0,30.0,0.0
3,Nigeria,2014,0,0,1,1,0,0,1,1,0.0,100.0,0.0,100.0,0.0,0.0
4,Finland,2015,0,0,1,1,0,0,1,1,0.0,100.0,0.0,100.0,0.0,0.0
5,Austria,2014,1,1,0,2,2,0,3,3,33.333333,0.0,33.333333,66.666667,66.666667,0.0
6,Canada,2014,21,21,19,40,21,0,61,61,34.42623,31.147541,34.42623,65.57377,34.42623,0.0
7,Singapore,2014,0,0,1,2,1,0,2,2,0.0,50.0,0.0,100.0,50.0,0.0
8,South Africa,2014,0,1,4,4,1,0,5,5,0.0,80.0,20.0,80.0,20.0,0.0
9,Bosnia and Herzegovina,2014,0,0,0,1,1,0,1,1,0.0,0.0,0.0,100.0,100.0,0.0


In [17]:
# Remove the "don't know" care-options count before writing; it is 0 in every
# row of the sample output shown above.
treatment_with_percentages = treatment_with_percentages.drop("Sum_Dont_know_Care_Options_Per_Country")

In [18]:
# Persist the benefits / care-options figures as the gold Delta table,
# replacing both the previous contents and the previous schema.
colunas_beneficios_por_pais = [
    "Pais",
    "Ano",
    "Sum_Benefits_Per_Country",
    "Sum_Care_Options_Per_Country",
    "Sum_No_Benefits_Per_Country",
    "Sum_No_CareOptions_Per_Country",
    "Sum_Dont_Know_Benefits_Per_Country",
    "Sum_All_Benefits_Per_Country",
    "Sum_All_CareOptions_Per_Country",
    "Percentage_Yes_Benefits",
    "Percentage_No_Benefits",
    "Percentage_Yes_CareOptions",
    "Percentage_No_CareOptions",
    "Percentage_Dont_Know_Benefits",
]

(
    treatment_with_percentages
    .select(*colunas_beneficios_por_pais)
    .write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("hdfs://hdfs-nn:9000/TrabalhoPratico/gold/mentalHealth_Benefits_Care_TreatmentPerCountry/")
)

In [19]:
# Read the gold table back through the metastore to check what was written.
spark.sql(
    """
    Select *
    from gold.mentalHealth_Benefits_Care_TreatmentPerCountry
    """
    ).show()

+--------------------+----+------------------------+----------------------------+---------------------------+------------------------------+----------------------------------+----------------------------+-------------------------------+-----------------------+----------------------+--------------------------+-------------------------+-----------------------------+
|                Pais| Ano|Sum_Benefits_Per_Country|Sum_Care_Options_Per_Country|Sum_No_Benefits_Per_Country|Sum_No_CareOptions_Per_Country|Sum_Dont_Know_Benefits_Per_Country|Sum_All_Benefits_Per_Country|Sum_All_CareOptions_Per_Country|Percentage_Yes_Benefits|Percentage_No_Benefits|Percentage_Yes_CareOptions|Percentage_No_CareOptions|Percentage_Dont_Know_Benefits|
+--------------------+----+------------------------+----------------------------+---------------------------+------------------------------+----------------------------------+----------------------------+-------------------------------+-----------------------+------

In [None]:
# Shut down the Spark session once all gold tables have been written.
spark.stop()