In [28]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [29]:
from os.path import abspath
import datetime
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [30]:
# Spark application setup: returns the active session if one exists,
# otherwise builds a new one named "job-glue-spark".
spark = SparkSession.builder.appName("job-glue-spark").getOrCreate()

In [31]:
# Application log level. Keep ERROR for normal runs; switch to INFO only while
# developing (levels used here: INFO, ERROR).
spark.sparkContext.setLogLevel(logLevel="ERROR")

In [32]:
# Load every CSV file in the landing folder: the first line is the header,
# column types are inferred, and the files are decoded as ISO-8859-1.
df = spark.read.csv(
    "/content/drive/MyDrive/Dados/PROJETO-DATA-CANCER/landing/*.csv",
    header=True,
    inferSchema=True,
    encoding="ISO-8859-1",
)

In [33]:

# Show a sample of the rows read from the landing zone.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly instead of being wrapped in print() (which would emit "None").
print("\nImprime os dados lidos da landing:")
df.show()


Imprime os dados lidos da landing:
+--------+-----------------+---------+-------------+---+----------+-----------+----------------+----------+-------------------+-----------+---------------+--------------------+-----------------+------------+-------------------------+---------------+----------------------+------------------+----------------------+------------+-------------------------+------------------+-------------------------------+---------------------------+----------------------+----------------+---------+----------+---------+---+------------+-------------+-------------+--------------------+------------------+------------------+----+
|cod_paci|        RCBP_Name|   Gender|Date_of_Birth|Age|Raca_Color|Nationality|Naturality_State|Naturality|Degree_of_Education|State_Civil|Code_Profession|     Name_Occupation|   Status_Address|City_Address|Description_of_Topography|Topography_Code|Morphology_Description|Code_of_Morphology|Description_of_Disease|Illness_Code|Child_Illness_Descriptio

In [34]:
# Print the schema Spark inferred for the landing data.
# printSchema() writes to stdout and returns None, so it is not wrapped in
# print() (which would emit an extra "None" line).
print("\nImprime o schema do dataframe lido da raw:")
df.printSchema()


Imprime o schema do dataframe lido da raw:
root
 |-- cod_paci: string (nullable = true)
 |-- RCBP_Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Date_of_Birth: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Raca_Color: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Naturality_State: string (nullable = true)
 |-- Naturality: string (nullable = true)
 |-- Degree_of_Education: string (nullable = true)
 |-- State_Civil: string (nullable = true)
 |-- Code_Profession: string (nullable = true)
 |-- Name_Occupation: string (nullable = true)
 |-- Status_Address: string (nullable = true)
 |-- City_Address: string (nullable = true)
 |-- Description_of_Topography: string (nullable = true)
 |-- Topography_Code: string (nullable = true)
 |-- Morphology_Description: string (nullable = true)
 |-- Code_of_Morphology: integer (nullable = true)
 |-- Description_of_Disease: string (nullable = true)
 |-- Illness_Code: string (nullable 

In [35]:
print("\nEscrevendo os dados lidos da raw para parquet na processing zone...")
# Persist the landing DataFrame as Parquet in the processing zone,
# replacing any output left there by a previous run.
df.write.parquet(
    "/content/drive/MyDrive/Dados/PROJETO-DATA-CANCER/processing",
    mode="overwrite",
)


Escrevendo os dados lidos da raw para parquet na processing zone...


In [36]:
# Read back the Parquet files just written to the processing zone.
df_parquet = spark.read.parquet(
    "/content/drive/MyDrive/Dados/PROJETO-DATA-CANCER/processing"
)


In [37]:
# Show a sample of the rows read back from the processing-zone Parquet files.
# show() prints the table and returns None, so it is called directly rather
# than inside print().
print("\nImprime os dados lidos em parquet da processing zone")
df_parquet.show()


Imprime os dados lidos em parquet da processing zone
+--------+------------+---------+-------------+---+----------+-----------+----------------+--------------+-------------------+--------------------+---------------+--------------------+--------------+------------+-------------------------+---------------+----------------------+------------------+----------------------+------------+-------------------------+------------------+-------------------------------+---------------------------+----------------------+--------------------+----------+----------+---------+---+------------+-------------+-------------+--------------------+------------------+------------------+----+
|cod_paci|   RCBP_Name|   Gender|Date_of_Birth|Age|Raca_Color|Nationality|Naturality_State|    Naturality|Degree_of_Education|         State_Civil|Code_Profession|     Name_Occupation|Status_Address|City_Address|Description_of_Topography|Topography_Code|Morphology_Description|Code_of_Morphology|Description_of_Disease|Illn

In [38]:
# Keep only the columns used downstream by dropping everything else.
# DataFrame.drop() is a no-op for names that are not in the schema, so a few
# alternative spellings of the same field are listed (each once) so whichever
# one a given input file uses still gets removed.
# NOTE(review): confirm which of the spelling variants actually occur in the
# landing CSVs and remove the ones that never do.
df_soft = df_parquet.drop(
    "RCBP_Name",
    "Topography_Code",
    "Description_of_Topography",
    "Morphology_Description",
    "Code_of_Morphology",
    "Illness_Code",
    "Indicator_of_Rare_Case",
    "year",
    # child / young-adult disease fields (spelling variants included)
    "Child_Illness_Code",
    "Child_Illness_Description",
    "Code_of_Disease_Adult_Young",
    "Code_of_Disease.Adult_Young.",
    "Youth_Adult_Illness_Description",
    "Youth_Adult_Illness_Descriptio",
    # personal / demographic fields
    "Date_of_Birth",
    "Raca_Color",
    "Degree_of_Education",
    "Nationality",
    "Naturality",
    "Naturality_State",
    "State_Civil",
    "Code_Profession",
    "Name_Occupation",
    # diagnosis, staging and follow-up fields
    "Date_of_Diagnostic",
    "Diagnostic_means",
    "Statement",
    "Laterality",
    "Extension",
    "TNM",
    "Distant_metastasis",
    "Date_of_Last_Contact",
    "status_vital",
)

In [39]:
# Rename the kept columns to the Portuguese names used by the SQL below.
# withColumnRenamed() is a no-op when the source column is absent; the schema
# printed above already calls the id column "cod_paci", so the first rename
# presumably does nothing here.
df_soft = (
    df_soft
    .withColumnRenamed("Patient_Code", "cod_paci")
    .withColumnRenamed("Status_Address", "estado")
    .withColumnRenamed("City_Address", "cidade")
    .withColumnRenamed("Type_of_Death", "tipo_morte")
    .withColumnRenamed("Date_of_Death", "data_morte")
    .withColumnRenamed("Description_of_Disease", "tipo_cancer")
)

In [40]:
# The CSV schema inference left the patient id and age as strings; cast both
# to integer so they can be compared numerically.
# NOTE(review): with Spark's default non-ANSI casting, unparseable strings
# (e.g. "NA") become NULL; with ANSI mode enabled (default in Spark 4.x) they
# raise instead — confirm the Spark version in use.
df_soft = (
    df_soft
    .withColumn("cod_paci", df_soft["cod_paci"].cast("int"))
    .withColumn("Age", df_soft["Age"].cast("int"))
)

In [41]:
# cancer_dead: 1 when the recorded cause of death is "CÂNCER", otherwise 0
#              (including rows with no death recorded).
# ano_morte:   year extracted from the death date; NULL when the date value
#              (e.g. "NA") is not a parseable date, as seen in the output below.
df_soft = (
    df_soft
    .withColumn(
        "cancer_dead",
        when(df_soft["tipo_morte"] == "CÂNCER", 1).otherwise(0),
    )
    .withColumn("ano_morte", year(df_soft["data_morte"]))
)

In [42]:
# Expose the DataFrame to Spark SQL under the name "df_table"
# (replacing any view already registered with that name).
df_soft.createOrReplaceTempView(name="df_table")

In [43]:
# Add an age-band column (Faixa_etaria): 0-4, 5-14, then 10-year bands up to
# 75-84, and 85+ for ages of 85 and above.
# Ages are only placed in '85+' when they really are >= 85. NULL ages
# (presumably values that failed the integer cast) and negative ages fall
# through to ELSE and get a NULL band instead of being counted as 85+.
query_age = """
SELECT *,
CASE 
    WHEN Age BETWEEN 0 AND 4 THEN '0-4'
    WHEN Age BETWEEN 5 AND 14 THEN '5-14'
    WHEN Age BETWEEN 15 AND 24 THEN '15-24'
    WHEN Age BETWEEN 25 AND 34 THEN '25-34'
    WHEN Age BETWEEN 35 AND 44 THEN '35-44'
    WHEN Age BETWEEN 45 AND 54 THEN '45-54'
    WHEN Age BETWEEN 55 AND 64 THEN '55-64'
    WHEN Age BETWEEN 65 AND 74 THEN '65-74'
    WHEN Age BETWEEN 75 AND 84 THEN '75-84'
    WHEN Age >= 85 THEN '85+'
    ELSE NULL
END AS Faixa_etaria
FROM df_table
"""


In [44]:
# Run the age-band query against the "df_table" view.
df_soft = spark.sql(sqlQuery=query_age)

In [45]:
# Show a sample of the data with the new age-band column (Faixa_etaria).
# show() prints the table and returns None, so it is not wrapped in print().
print("\nImprime os dados com a coluna faixa etaria")
df_soft.show()


Imprime os dados com a coluna faixa etaria
+--------+---------+---+-------+-------+--------------------+----------+----------+-----------+---------+------------+
|cod_paci|   Gender|Age| estado| cidade|         tipo_cancer|tipo_morte|data_morte|cancer_dead|ano_morte|Faixa_etaria|
+--------+---------+---+-------+-------+--------------------+----------+----------+-----------+---------+------------+
|  360918| FEMININO| 69|SERGIPE|ARACAJU|  COLO DO UTERO, SOE|        NA|        NA|          0|     null|       65-74|
|  360871| FEMININO| 51|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|        NA|        NA|          0|     null|       45-54|
|  360901| FEMININO| 39|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|    CÂNCER|2015-10-03|          1|     2015|       35-44|
|  360922| FEMININO| 87|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|        NA|        NA|          0|     null|         85+|
|  360924| FEMININO| 80|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|        NA|        NA|          0|     null|       75-84|
|  3

In [46]:
# Re-register "df_table" so the next query sees the age-band column.
df_soft.createOrReplaceTempView(name="df_table")

In [47]:
# Per-cancer-type death rate: for each tipo_cancer, the fraction of its rows
# with cancer_dead = 1. The rate is joined back onto every row; num_mortes and
# num_casos are computed in the CTE but not selected in the final output.
# - COUNT(*) (not COUNT(cod_paci)) keeps numerator and denominator over the
#   same rows: COUNT(cod_paci) skipped rows with a NULL id while
#   SUM(cancer_dead) still counted them, so the rate could exceed 1.
# - The null-safe join (<=>) keeps the rate for rows whose tipo_cancer is NULL;
#   GROUP BY produces that group, but a plain '=' join never matches it.
query_taxa = """
WITH cancer_stats AS (
    SELECT 
        tipo_cancer, 
        SUM(cancer_dead) as num_mortes, 
        COUNT(*) as num_casos,
        SUM(cancer_dead) / COUNT(*) as taxa_morte_tipo_cancer
    FROM df_table
    GROUP BY tipo_cancer
)
SELECT df_table.*, cancer_stats.taxa_morte_tipo_cancer
FROM df_table
LEFT JOIN cancer_stats
ON df_table.tipo_cancer <=> cancer_stats.tipo_cancer
"""

In [48]:
# Run the death-rate query against the "df_table" view.
df_soft = spark.sql(sqlQuery=query_taxa)

In [49]:
# Write the final dataset to the curated zone as Parquet, shuffled into a
# single partition so the output folder holds one data file, replacing any
# previous output.
df_soft.repartition(1).write.parquet(
    "/content/drive/MyDrive/Dados/PROJETO-DATA-CANCER/curated",
    mode="overwrite",
)

In [50]:
# Show a sample of the data that was written to the curated zone.
# show() prints the table and returns None, so it is not wrapped in print().
print("\nImprime os dados que foram processados para curated")
df_soft.show()


Imprime os dados que foram processados para curated
+--------+---------+---+-------+-------+--------------------+----------+----------+-----------+---------+------------+----------------------+
|cod_paci|   Gender|Age| estado| cidade|         tipo_cancer|tipo_morte|data_morte|cancer_dead|ano_morte|Faixa_etaria|taxa_morte_tipo_cancer|
+--------+---------+---+-------+-------+--------------------+----------+----------+-----------+---------+------------+----------------------+
|  360918| FEMININO| 69|SERGIPE|ARACAJU|  COLO DO UTERO, SOE|        NA|        NA|          0|     null|       65-74|   0.14399454261711414|
|  360871| FEMININO| 51|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|        NA|        NA|          0|     null|       45-54|   0.03463413489001745|
|  360901| FEMININO| 39|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|    CÂNCER|2015-10-03|          1|     2015|       35-44|   0.03463413489001745|
|  360922| FEMININO| 87|SERGIPE|ARACAJU|PELE DE OUTRAS PA...|        NA|        NA|          0|

In [51]:
# Stop the Spark session, ending the application; run this cell last.
spark.stop()