
### Requisitos para os arquivos da pasta partidas

Objetivo: realizar processo de ETL e disponibilizar os dados em camada adequada.

1. Ler o arquivos csv da pasta no Datalake [ok]
2. Renomear colunas para português usando snake case [ok]
3. Definir a tipagem de colunas correta [ok]
4. Coluna com data de cosumo do arquivo [ok]
5. Salvar dados em formato parquet na camada adequada [ok]

In [1]:
%run ../../Configuração-e-Utilitários/Variaveis.ipynb

In [2]:
%run ../../Configuração-e-Utilitários/Funções.ipynb

### Lendo os arquivos

In [5]:
eventos_df = (spark.read
               .option("header", "true")
               .option("inferSchema", "true")
               .option("encoding", "UTF-8")
               #.schema(schema)
               .csv(f"{caminho_bronze}/eventos.csv") #/eventos-*.csv")  # O asterisco representa qualquer coisa após o underline ("_")
).drop('_c0')

# Imprimindo o df
eventos_df.show()

+--------------------+--------------------+--------------------+--------------------+-----+--------------+--------+-------+
|                time|                team|              player|              assist| type|        detail|comments|id_game|
+--------------------+--------------------+--------------------+--------------------+-----+--------------+--------+-------+
|{'elapsed': 12, '...|{'id': 127, 'name...|{'id': 1771, 'nam...|{'id': 30408, 'na...| Goal|   Normal Goal|    NULL|1005649|
|{'elapsed': 17, '...|{'id': 127, 'name...|{'id': 10174, 'na...|{'id': None, 'nam...| Card|   Yellow Card|Argument|1005649|
|{'elapsed': 23, '...|{'id': 147, 'name...|{'id': 10242, 'na...|{'id': None, 'nam...| Card|   Yellow Card|    Foul|1005649|
|{'elapsed': 31, '...|{'id': 147, 'name...|{'id': 10400, 'na...|{'id': None, 'nam...| Card|   Yellow Card|    Foul|1005649|
|{'elapsed': 51, '...|{'id': 147, 'name...|{'id': 41290, 'na...|{'id': 77821, 'na...|subst|Substitution 1|    NULL|1005649|
|{'elaps

In [6]:
eventos_df.printSchema()

root
 |-- time: string (nullable = true)
 |-- team: string (nullable = true)
 |-- player: string (nullable = true)
 |-- assist: string (nullable = true)
 |-- type: string (nullable = true)
 |-- detail: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- id_game: integer (nullable = true)



### Criando schema e acessando dados de colunas aninhadas

##### Verificando colunas aninhadas

In [7]:
print('Colunas para alteração: \n',
    eventos_df.select(eventos_df.time).collect()[0], '\n',
    eventos_df.select(eventos_df.team).collect()[0], '\n',
    eventos_df.select(eventos_df.player).collect()[0], '\n',
    eventos_df.select(eventos_df.assist).collect()[0], '\n'
)

eventos_df.show()

Colunas para alteração: 
 Row(time="{'elapsed': 12, 'extra': 0}") 
 Row(team="{'id': 127, 'name': 'Flamengo', 'logo': 'https://media-4.api-sports.io/football/teams/127.png'}") 
 Row(player="{'id': 1771, 'name': 'Ayrton Lucas'}") 
 Row(assist="{'id': 30408, 'name': 'Gerson'}") 

+--------------------+--------------------+--------------------+--------------------+-----+--------------+--------+-------+
|                time|                team|              player|              assist| type|        detail|comments|id_game|
+--------------------+--------------------+--------------------+--------------------+-----+--------------+--------+-------+
|{'elapsed': 12, '...|{'id': 127, 'name...|{'id': 1771, 'nam...|{'id': 30408, 'na...| Goal|   Normal Goal|    NULL|1005649|
|{'elapsed': 17, '...|{'id': 127, 'name...|{'id': 10174, 'na...|{'id': None, 'nam...| Card|   Yellow Card|Argument|1005649|
|{'elapsed': 23, '...|{'id': 147, 'name...|{'id': 10242, 'na...|{'id': None, 'nam...| Card|   Yellow 

##### Criando schema das colunas aninhadas

In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_time = StructType(
    fields=[
        StructField('elapsed', IntegerType()),
        StructField('extra', IntegerType()),
    ]
)

schema_team = StructType(
    fields=[
        StructField('id', IntegerType()),
        StructField('name', StringType()),
        StructField('logo', StringType())
    ]
)

schema_player_assist = StructType(
    fields=[
        StructField('id', IntegerType()),
        StructField('name', StringType())
    ]
)

##### Alteração do schema

In [9]:
from pyspark.sql.functions import from_json

eventos_df = eventos_df.withColumn('time', from_json(eventos_df.time, schema_time))
eventos_df = eventos_df.withColumn('team', from_json(eventos_df.team, schema_team))
eventos_df = eventos_df.withColumn('player', from_json(eventos_df.player, schema_player_assist))
eventos_df = eventos_df.withColumn('assist', from_json(eventos_df.assist, schema_player_assist))

eventos_df.printSchema()

root
 |-- time: struct (nullable = true)
 |    |-- elapsed: integer (nullable = true)
 |    |-- extra: integer (nullable = true)
 |-- team: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- logo: string (nullable = true)
 |-- player: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |-- assist: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- detail: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- id_game: integer (nullable = true)



### Formatando colunas aninhadas para o formato tabular

In [10]:
from pyspark.sql.functions import explode, array

# Coluna TEAM
eventos_df = eventos_df.withColumn('team_id', explode(array('team.id')) )
eventos_df = eventos_df.withColumn('team_name', explode(array('team.name')) )
eventos_df = eventos_df.withColumn('team_logo', explode(array('team.logo')) )

# Coluna TIME
eventos_df = eventos_df.withColumn('time_elapsed', explode(array('time.elapsed')) )
eventos_df = eventos_df.withColumn('time_extra', explode(array('time.extra')) )

# Coluna PLAYER
eventos_df = eventos_df.withColumn('player_id', explode(array('player.id')) )
eventos_df = eventos_df.withColumn('player_name', explode(array('player.name')) )

# Coluna ASSIST
eventos_df = eventos_df.withColumn('assist_id', explode(array('assist.id')) )
eventos_df = eventos_df.withColumn('assist_name', explode(array('assist.name')) )


In [11]:
# Drop das colunas aninhadas
eventos_df = eventos_df.drop('time') \
                       .drop('team') \
                       .drop('player')\
                       .drop('assist')

In [12]:
eventos_df.show()
eventos_df.printSchema()

+-----+--------------+--------+-------+-------+---------+--------------------+------------+----------+---------+---------------+---------+--------------+
| type|        detail|comments|id_game|team_id|team_name|           team_logo|time_elapsed|time_extra|player_id|    player_name|assist_id|   assist_name|
+-----+--------------+--------+-------+-------+---------+--------------------+------------+----------+---------+---------------+---------+--------------+
| Goal|   Normal Goal|    NULL|1005649|    127| Flamengo|https://media-4.a...|          12|         0|     1771|   Ayrton Lucas|    30408|        Gerson|
| Card|   Yellow Card|Argument|1005649|    127| Flamengo|https://media-4.a...|          17|         0|    10174|Gabriel Barbosa|     NULL|          NULL|
| Card|   Yellow Card|    Foul|1005649|    147| Coritiba|https://media-4.a...|          23|         0|    10242|    Júnior Urso|     NULL|          NULL|
| Card|   Yellow Card|    Foul|1005649|    147| Coritiba|https://media-4.a..

### Renomeando as colunas

In [13]:
eventos_df = eventos_df \
    .withColumnRenamed("id_game", "id_partida") \
    .withColumnRenamed("team_id", "id_time") \
    .withColumnRenamed("player_id", "id_jogador") \
    .withColumnRenamed("assist_id", "id_assistencia") \
    .withColumnRenamed("type", "tipo_do_evento") \
    .withColumnRenamed("detail", "detalhe_do_evento") \
    .withColumnRenamed("comments", "comentarios") \
    .withColumnRenamed("player_name", "nome_do_jogador") \
    .withColumnRenamed("team_name", "nome_do_time") \
    .withColumnRenamed("team_logo", "logo_do_time") \
    .withColumnRenamed("time_elapsed", "minuto_do_evento") \
    .withColumnRenamed("time_extra", "minuto_do_evento_prorrogacao") \
    .withColumnRenamed("assist_name", "nome_da_assistancia")

eventos_df.show()

+--------------+-----------------+-----------+----------+-------+------------+--------------------+----------------+----------------------------+----------+---------------+--------------+-------------------+
|tipo_do_evento|detalhe_do_evento|comentarios|id_partida|id_time|nome_do_time|        logo_do_time|minuto_do_evento|minuto_do_evento_prorrogacao|id_jogador|nome_do_jogador|id_assistencia|nome_da_assistancia|
+--------------+-----------------+-----------+----------+-------+------------+--------------------+----------------+----------------------------+----------+---------------+--------------+-------------------+
|          Goal|      Normal Goal|       NULL|   1005649|    127|    Flamengo|https://media-4.a...|              12|                           0|      1771|   Ayrton Lucas|         30408|             Gerson|
|          Card|      Yellow Card|   Argument|   1005649|    127|    Flamengo|https://media-4.a...|              17|                           0|     10174|Gabriel Barb

### Tratamento dos valores das colunas

In [14]:
eventos_df.printSchema()

root
 |-- tipo_do_evento: string (nullable = true)
 |-- detalhe_do_evento: string (nullable = true)
 |-- comentarios: string (nullable = true)
 |-- id_partida: integer (nullable = true)
 |-- id_time: integer (nullable = true)
 |-- nome_do_time: string (nullable = true)
 |-- logo_do_time: string (nullable = true)
 |-- minuto_do_evento: integer (nullable = true)
 |-- minuto_do_evento_prorrogacao: integer (nullable = true)
 |-- id_jogador: integer (nullable = true)
 |-- nome_do_jogador: string (nullable = true)
 |-- id_assistencia: integer (nullable = true)
 |-- nome_da_assistancia: string (nullable = true)



### Criando coluna com data de consumo

In [15]:
eventos_df = adicionar_data_de_ingestao(
    df = eventos_df,
    nome_coluna = 'data_de_ingestao',
    add_tempo = -3,
    unidade_de_medida = 'horas'
)

eventos_df.show()

+--------------+-----------------+-----------+----------+-------+------------+--------------------+----------------+----------------------------+----------+---------------+--------------+-------------------+----------------+
|tipo_do_evento|detalhe_do_evento|comentarios|id_partida|id_time|nome_do_time|        logo_do_time|minuto_do_evento|minuto_do_evento_prorrogacao|id_jogador|nome_do_jogador|id_assistencia|nome_da_assistancia|data_de_ingestao|
+--------------+-----------------+-----------+----------+-------+------------+--------------------+----------------+----------------------------+----------+---------------+--------------+-------------------+----------------+
|          Goal|      Normal Goal|       NULL|   1005649|    127|    Flamengo|https://media-4.a...|              12|                           0|      1771|   Ayrton Lucas|         30408|             Gerson|      2024-05-09|
|          Card|      Yellow Card|   Argument|   1005649|    127|    Flamengo|https://media-4.a...| 

### Salvando em formato parquet

In [17]:
eventos_df.write.mode("Overwrite").parquet(f"{caminho_silver}/eventos")

In [18]:
dbutils.notebook.exit('Os arquivos da pasta "eventos" foram processados e carregados na camada silver.')