### **Extração da tabela de artilheiros**

**1 - Extraindo as tabelas de artilharia do site transfermarkt**

Essa etapa foi feita no ambiente google colab, abaixo está apenas o código usado.

In [0]:
import requests
from bs4 import BeautifulSoup
import csv
import time
import random

# Função para buscar dados de uma página específica
def get_page_data(page_num, season, seen_players):
    url = f'https://www.transfermarkt.com.br/serie-a/torschuetzenliste/wettbewerb/IT1/ajax/yw1/saison_id/{season}/page/{page_num}/sort/goal'

    # Define os headers para simular um navegador web
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }

    # Envia a requisição para obter o conteúdo da página
    response = requests.get(url, headers=headers)

    # Checa se a requisição foi bem-sucedida
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print(f"Erro HTTP: {err}")
        return [], False  # Retorna uma lista vazia se houver erro

    # Parseia o HTML da página
    soup = BeautifulSoup(response.text, 'html.parser')

    # Localiza a tabela de artilheiros
    table = soup.find('table', {'class': 'items'})
    if not table:
        return [], False  # Se a tabela não for encontrada, retorna uma lista vazia e False

    # Extrai os dados da tabela
    players_data = []

    for row in table.find_all('tr')[1:]:  # Ignora o cabeçalho da tabela (linha 0)
        cols = row.find_all('td')
        if len(cols) >= 8:  # Garantir que há dados na linha
            ranking = cols[0].text.strip()  # Posição no ranking
            player = cols[1].text.strip().split('\n\n\n')[0]  # Nome do jogador
            position = cols[1].text.strip().split('\n\n\n')[1]  # Posição do jogador
            goals = cols[9].text.strip()  # Gols
            matches = cols[8].text.strip()  # Número de partidas

            # Verifica se o jogador já foi adicionado
            if player not in seen_players:
                seen_players.add(player)  # Marca o jogador como visto
                players_data.append((ranking, player, position, goals, matches))

    return players_data

# Função principal para buscar dados de todas as páginas (1 até 12) e salvar em CSV
def scrape_and_save_to_csv(season, file_name):
    all_players_data = []
    seen_players = set()  # Cria um conjunto para armazenar jogadores já vistos

    # Loop de 1 a 12 para pegar as páginas
    for page_num in range(1, 13):  # A partir da página 1 até a 12
        print(f'Buscando dados da página {page_num} para a temporada {season}...')
        players_data = get_page_data(page_num, season, seen_players)

        if not players_data:
            print(f'Não há dados ou erro na página {page_num} da temporada {season}.')
            break  # Se não retornar dados, paramos

        all_players_data.extend(players_data)

        time.sleep(random.uniform(1, 3))  # Atraso aleatório entre as requisições

    # Exporta os dados para um arquivo CSV com o nome especificado
    with open(file_name, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['Ranking', 'Jogador', 'Posição', 'Gols', 'Partidas'])  # Cabeçalho
        writer.writerows(all_players_data)  # Escreve os dados

    print(f'Dados exportados para "{file_name}"')

# Função para fazer o scraping para todas as temporadas entre 2010 e 2023
def scrape_all_seasons():
    for season in range(2010, 2024):  # Temporadas de 2010 até 2023
        file_name = f'artilheiros_serie_a_{season}.csv'  # Nome do arquivo CSV com base na temporada
        scrape_and_save_to_csv(season, file_name)

# Chama a função para fazer o scraping de todas as temporadas
scrape_all_seasons()


### **Carga da tabela de artilheiros**

**2 - Importando CSV de Artilheiros**


Foi feito um "upload data to dbfs" dos arquivos gerados pelo código acima. 

In [0]:
art2010 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2010.csv")
art2011 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2011.csv")
art2012 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2012.csv")
art2013 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2013.csv")
art2014 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2014.csv")
art2015 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2015.csv")
art2016 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2016.csv")
art2017 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2017.csv")
art2018 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2018.csv")
art2019 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2019.csv")
art2020 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2020.csv")
art2021 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2021.csv")
art2022 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2022.csv")
art2023 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/artilheiros/artilheiros_serie_a_2023.csv")


### **Transformação da tabela de artilheiros**

Ajustando as colunas e a temporada

In [0]:
from pyspark.sql.functions import col, lit

# Função para ajustar o tipo de dados das colunas, renomear a coluna 'posição' para 'posicao' e adicionar a coluna temporada
def ajustar_tipos(df):
    return df \
        .withColumn("ranking", col("ranking").cast("int")) \
        .withColumn("gols", col("gols").cast("int")) \
        .withColumn("partidas", col("partidas").cast("int")) \
        .withColumnRenamed("posição", "posicao")  # Renomeando a coluna "posição" para "posicao"

# Aplicando a função para ajustar os tipos de dados nas tabelas (para cada ano)
art2010 = ajustar_tipos(art2010).withColumn("temporada", lit("2010-2011"))
art2011 = ajustar_tipos(art2011).withColumn("temporada", lit("2011-2012"))
art2012 = ajustar_tipos(art2012).withColumn("temporada", lit("2012-2013"))
art2013 = ajustar_tipos(art2013).withColumn("temporada", lit("2013-2014"))
art2014 = ajustar_tipos(art2014).withColumn("temporada", lit("2014-2015"))
art2015 = ajustar_tipos(art2015).withColumn("temporada", lit("2015-2016"))
art2016 = ajustar_tipos(art2016).withColumn("temporada", lit("2016-2017"))
art2017 = ajustar_tipos(art2017).withColumn("temporada", lit("2017-2018"))
art2018 = ajustar_tipos(art2018).withColumn("temporada", lit("2018-2019"))
art2019 = ajustar_tipos(art2019).withColumn("temporada", lit("2019-2020"))
art2020 = ajustar_tipos(art2020).withColumn("temporada", lit("2020-2021"))
art2021 = ajustar_tipos(art2021).withColumn("temporada", lit("2021-2022"))
art2022 = ajustar_tipos(art2022).withColumn("temporada", lit("2022-2023"))
art2023 = ajustar_tipos(art2023).withColumn("temporada", lit("2023-2024"))


Criando uma única tabela com todos os artilheiros

In [0]:
tabela_artilheiros = art2010.union(art2011).union(art2012).union(art2013).union(art2014).union(art2015).union(art2016).union(art2017).union(art2018).union(art2019).union(art2020).union(art2021).union(art2022).union(art2023)


--------------------------------------------------------------------------------------------

**3 - Testando a qualidade dos dados de artilharia importados**

Checar se existem jogadores repetidos na mesma temporada:

In [0]:
#Não podem existir jogadores com o mesmo nome na mesma temporada
tabela_artilheiros.groupBy("jogador","temporada").count().filter("count > 1").show()

+-------+---------+-----+
|jogador|temporada|count|
+-------+---------+-----+
+-------+---------+-----+



Checar partidas maiores que 38 ou negativas:

In [0]:
#Em uma temporada só existem 38 partidas
tabela_artilheiros.filter((col("partidas") > 38) |(col("partidas") < 0)) .show()


+-------+-------+-------+----+--------+---------+
|ranking|Jogador|posicao|gols|partidas|temporada|
+-------+-------+-------+----+--------+---------+
+-------+-------+-------+----+--------+---------+



Checar gols negativos:

In [0]:
#Checar gols negativos

#Não podem existir valores negativos de gols
tabela_artilheiros.filter(col("gols") < 0).show()

+-------+-------+-------+----+--------+---------+
|ranking|Jogador|posicao|gols|partidas|temporada|
+-------+-------+-------+----+--------+---------+
+-------+-------+-------+----+--------+---------+



Checar valores nulos:

In [0]:
#Não podem existir valores nulos na tabela
from pyspark.sql import functions as F

tabela_artilheiros.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in tabela_artilheiros.columns]).show()

+-------+-------+-------+----+--------+---------+
|ranking|Jogador|posicao|gols|partidas|temporada|
+-------+-------+-------+----+--------+---------+
|      0|      0|      0|   0|       0|        0|
+-------+-------+-------+----+--------+---------+



Checar valores iguais a 0:

In [0]:
#Não podem existir valores 0 na tabela
from pyspark.sql.functions import count, when

zeros = tabela_artilheiros.select([count(when(col(c) == 0, c)).alias(c) for c in tabela_artilheiros.columns])

zeros.show()

+-------+-------+-------+----+--------+---------+
|ranking|Jogador|posicao|gols|partidas|temporada|
+-------+-------+-------+----+--------+---------+
|      0|      0|      0|   0|       0|        0|
+-------+-------+-------+----+--------+---------+



--------------------------------------------------------------------------------------------

**4 - Respondendo as perguntas sobre artilharia**

**Quais os jogadores com mais gols?**

In [0]:
# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("gols_por_jogador")

# Comando SQL para somar os gols, somar as partidas e calcular a média de gols por partida
query = """
    SELECT
        jogador,
        SUM(gols) AS total_gols,
        SUM(partidas) AS total_partidas,
        SUM(gols) / SUM(partidas) AS media_gols_por_partida
    FROM gols_por_jogador
    GROUP BY jogador
    ORDER BY total_gols DESC
"""

# Executando a consulta SQL no PySpark
gols_por_jogador = spark.sql(query)

# Exibindo o resultado
gols_por_jogador.show()


+-------------------+----------+--------------+----------------------+
|            jogador|total_gols|total_partidas|media_gols_por_partida|
+-------------------+----------+--------------+----------------------+
|     Ciro Immobile |       201|           350|    0.5742857142857143|
|Fabio Quagliarella |       130|           376|   0.34574468085106386|
|   Gonzalo Higuaín |       125|           224|    0.5580357142857143|
|      Paulo Dybala |       123|           324|   0.37962962962962965|
|  Domenico Berardi |       122|           314|    0.3885350318471338|
|      Mauro Icardi |       121|           219|    0.5525114155251142|
|      Duván Zapata |       121|           319|    0.3793103448275862|
|     Dries Mertens |       113|           295|   0.38305084745762713|
|    Andrea Belotti |       112|           299|    0.3745819397993311|
| Antonio Di Natale |       107|           193|    0.5544041450777202|
|        Edin Džeko |       107|           268|   0.39925373134328357|
|     

**Qual jogador fez mais gols?**

In [0]:
#A consulta abaixo trará o nome do jogador que fez mais gols.
gols_por_jogador.show(1)

+--------------+----------+--------------+----------------------+
|       jogador|total_gols|total_partidas|media_gols_por_partida|
+--------------+----------+--------------+----------------------+
|Ciro Immobile |       201|           350|    0.5742857142857143|
+--------------+----------+--------------+----------------------+
only showing top 1 row



**Quais os artilheiros de cada temporada?**

In [0]:
#A lista de artilheiros tem que necessariamente ter todos os jogadores que ficaram em primeiro no ranking de cada temporada.

# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("vencedores_art_view")

# Comando SQL para somar os gols de cada jogador
query = """
    SELECT 
        jogador,temporada,ranking,gols
    FROM vencedores_art_view
    where ranking=1
    ORDER BY temporada ASC
"""

# Executando a consulta SQL
lista_de_artilheiros = spark.sql(query)

display(lista_de_artilheiros)

jogador,temporada,ranking,gols
Antonio Di Natale,2010-2011,1,28
Zlatan Ibrahimović,2011-2012,1,28
Edinson Cavani,2012-2013,1,29
Ciro Immobile,2013-2014,1,22
Mauro Icardi,2014-2015,1,22
Gonzalo Higuaín,2015-2016,1,36
Edin Džeko,2016-2017,1,29
Ciro Immobile,2017-2018,1,29
Fabio Quagliarella,2018-2019,1,26
Ciro Immobile,2019-2020,1,36


**Qual jogador foi mais vezes artilheiro?**

In [0]:
# Filtrar os jogadores que estão no topo do ranking (ranking == 1)
artilheiro = lista_de_artilheiros.filter(col("ranking") == 1)

# Contar quantas vezes cada jogador foi primeiro do ranking
artilheiros_count = lista_de_artilheiros.groupBy("jogador").count()

# Ordenar pelo número de vezes que o jogador foi primeiro do ranking, selecionando apenas o primeiro
artilheiros_count.orderBy(col("count"), ascending=False).show(1)


+--------------+-----+
|       jogador|count|
+--------------+-----+
|Ciro Immobile |    4|
+--------------+-----+
only showing top 1 row



**Qual a maior média total de gols por partida?**

In [0]:
#A consulta abaixo trará os valores de médias de gols por jogador, analisando os resultados sem filtro, vi que existiam alguns outliers, jogadores que apenas jogaram 1 ou 2 jogos e ficaram com médias altas, por isso optei por um filtro de pelo menos 10 partidas.

# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("tabela_artilheiros")

# Comando SQL para calcular a média de gols por partida, considerando um número mínimo de 10 partidas jogadas
query = """
    SELECT 
        jogador,  
        SUM(gols) / SUM(partidas) AS media_gols_por_partida
    FROM tabela_artilheiros
    where partidas>10
    GROUP BY jogador
    ORDER BY media_gols_por_partida DESC
    LIMIT 1
"""

# Executando a consulta SQL no PySpark
media_gols_por_partida = spark.sql(query)

# Exibindo o resultado
media_gols_por_partida.show()

+------------------+----------------------+
|           jogador|media_gols_por_partida|
+------------------+----------------------+
|Cristiano Ronaldo |    0.8350515463917526|
+------------------+----------------------+



Esse filtro de 10 partidas foi necessário pois numa primeira checagem existiam alguns jogadores desconhecidos que jogaram poucas partidas e tinham feito muitos gols, ficando com uma média muito alta, ao adicionar um número mínimo de partidas os resultados ficaram mais razoáveis com o que era esperado.

**Qual a maior média de gols por partida por temporada?**

In [0]:
#A consulta abaixo trará a média de gols por temporada, considerando um número mínimo de partidas, para evitar outliers.
from pyspark.sql import functions as F

# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("tabela_artilheiros")

# Comando SQL para calcular a média de gols por partida e filtrar o maior de cada temporada
query = """
    SELECT 
        jogador,
        temporada,
        media_gols_por_partida
    FROM (
        SELECT 
            jogador,
            temporada,
            SUM(gols) / SUM(partidas) AS media_gols_por_partida,
            ROW_NUMBER() OVER (PARTITION BY temporada ORDER BY SUM(gols) / SUM(partidas) DESC) AS row_num
        FROM tabela_artilheiros
        where partidas>10
        GROUP BY jogador, temporada
    ) AS ranked
    WHERE row_num = 1
    ORDER BY temporada ASC
"""

# Executando a consulta SQL no PySpark
maior_media_por_temporada = spark.sql(query)

# Exibindo o resultado
maior_media_por_temporada.show()


+-------------------+---------+----------------------+
|            jogador|temporada|media_gols_por_partida|
+-------------------+---------+----------------------+
| Antonio Di Natale |2010-2011|    0.7777777777777778|
|Zlatan Ibrahimović |2011-2012|                 0.875|
|   Mario Balotelli |2012-2013|    0.9230769230769231|
|    Giuseppe Rossi |2013-2014|    0.7619047619047619|
|      Carlos Tevez |2014-2015|                 0.625|
|   Gonzalo Higuaín |2015-2016|    1.0285714285714285|
|     Dries Mertens |2016-2017|                   0.8|
|     Ciro Immobile |2017-2018|    0.8787878787878788|
|Fabio Quagliarella |2018-2019|    0.7027027027027027|
|     Ciro Immobile |2019-2020|     0.972972972972973|
| Cristiano Ronaldo |2020-2021|    0.8787878787878788|
|     Ciro Immobile |2021-2022|    0.8709677419354839|
|    Victor Osimhen |2022-2023|                0.8125|
|  Lautaro Martínez |2023-2024|    0.7272727272727273|
+-------------------+---------+----------------------+



**Qual o maior artilheiro por posição?**

In [0]:
#A consulta abaixo trará o jogador com maior quantidade de gols por posição

# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("gols_por_posicao")

# Comando SQL para somar os gols por posição e por jogador
query = """
    SELECT 
        posicao,
        jogador,
        SUM(gols) AS total_gols
    FROM gols_por_posicao
    GROUP BY posicao, jogador
    ORDER BY posicao, total_gols DESC
"""

# Executando a consulta SQL no PySpark
gols_por_posicao = spark.sql(query)

# Agora, aplicamos o RANK() para encontrar o jogador com mais gols em cada posição
query_com_rank = """
    SELECT 
        posicao,
        jogador,
        total_gols
    FROM (
        SELECT 
            posicao,
            jogador,
            total_gols,
            RANK() OVER (PARTITION BY posicao ORDER BY total_gols DESC) as rank
        FROM (
            SELECT 
                posicao,
                jogador,
                SUM(gols) AS total_gols
            FROM gols_por_posicao
            GROUP BY posicao, jogador
        ) t
    ) t2
    WHERE rank = 1
"""

# Executando a consulta SQL com RANK()
gols_por_posicao_com_rank = spark.sql(query_com_rank)

# Exibindo o resultado
gols_por_posicao_com_rank.show()


+--------------+--------------------+----------+
|       posicao|             jogador|total_gols|
+--------------+--------------------+----------+
|  Centroavante|      Ciro Immobile |       201|
|       Goleiro|   Alberto Brignoli |         1|
|  Lateral Dir.|Alessandro Florenzi |        28|
|  Lateral Esq.|       Robin Gosens |        28|
|  Meia Central|       Marek Hamsik |        70|
|  Meia Direita|      Juan Cuadrado |        43|
| Meia Esquerda|        Senad Lulic |        28|
| Meia Ofensivo|       Josip Ilicic |        96|
| Ponta Direita|   Domenico Berardi |       122|
|Ponta Esquerda|    Lorenzo Insigne |        96|
| Seg. Atacante|       Paulo Dybala |       123|
|       Volante|   Hakan Çalhanoğlu |        45|
|      Zagueiro|   Leonardo Bonucci |        31|
+--------------+--------------------+----------+



A pergunta abaixo eu não tinha pensado originalmente, mas checando a tabela de artilheiros achei interessante ver os destaques negativos também.

**Qual o centroavante com a pior média total de gols?**

In [0]:
#Essa pergunta não pode ser respondida completamente, pois os dados de artilharia contêm apenas jogadores que fizeram gols.

# Registrar o DataFrame como uma view temporária
tabela_artilheiros.createOrReplaceTempView("tabela_artilheiros")

# Comando SQL para calcular a média de gols por partida, considerando um número mínimo de 10 partidas jogadas
query = """
    SELECT 
        jogador,  
        SUM(gols) / SUM(partidas) AS media_gols_por_partida
    FROM tabela_artilheiros
    where partidas>10 and posicao ="Centroavante"
    GROUP BY jogador
    ORDER BY media_gols_por_partida ASC
    LIMIT 1
"""

# Executando a consulta SQL no PySpark
centroavante_menor_media_gols_por_partida = spark.sql(query)

# Exibindo o resultado
centroavante_menor_media_gols_por_partida.show()

+-----------+----------------------+
|    jogador|media_gols_por_partida|
+-----------+----------------------+
|Lucas Boyé |   0.03333333333333333|
+-----------+----------------------+



**Qual o artilheiro de cada time por temporada?**

Essa pergunta não pode ser respondida, pois durante a extração dos dados de jogadores do site transfermarkt, não foi possível criar uma coluna pois os nomes dos times eram apenas imagens e não uma string para ser copiada.

**Quem foi o maior marcador italiano?**

Essa pergunta não pode ser respondida, pois durante a extração dos dados de jogadores do site transfermarkt, não foi possível criar uma coluna pois as nacionalidades eram apenas imagens e não uma string para ser copiada, além de existirem jogadores com mais de uma nacionalidade.

**Quem foi o maior marcador brasileiro?**

Essa pergunta não pode ser respondida, pois durante a extração dos dados de jogadores do site transfermarkt, não foi possível criar uma coluna pois as nacionalidades eram apenas imagens e não uma string para ser copiada, além de existirem jogadores com mais de uma nacionalidade.

--------------------------------------------------------------------------------------------

### **Extração da tabela dos campeonatos**

**5 - Extraindo as tabelas do campeonato do site transfermarkt**

Essa etapa foi feita no ambiente google colab, abaixo está apenas o código usado.

In [0]:
import requests
from bs4 import BeautifulSoup
import csv

# Função para buscar dados da tabela da página específica
def get_teams_data(season):
    url = f'https://www.transfermarkt.com.br/serie-a/tabelle/wettbewerb/IT1/saison_id/{season}'

    # Define os headers para simular um navegador web
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
    }

    # Envia a requisição para obter o conteúdo da página
    response = requests.get(url, headers=headers)

    # Checa se a requisição foi bem-sucedida
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print(f"Erro HTTP: {err}")
        return [], False  # Retorna uma lista vazia se houver erro

    # Parseia o HTML da página
    soup = BeautifulSoup(response.text, 'html.parser')

    # Localiza a tabela de classificação
    table = soup.find('table', {'class': 'items'})

    if not table:
        print(f"Tabela de times para a temporada {season} não encontrada.")
        return [], False  # Se a tabela não for encontrada, retorna uma lista vazia e False

    # Extrai os dados da tabela de times
    teams_data = []

    for row in table.find_all('tr')[1:]:  # Ignora o cabeçalho da tabela (linha 0)
        cols = row.find_all('td')
        if len(cols) >= 10:  # Verifica se a linha tem dados suficientes (incluindo GP e GC)
            position = cols[0].text.strip()  # Posição do time
            team = cols[2].find('a').text.strip()  # Nome do time
            games_played = cols[3].text.strip()  # Jogos disputados
            wins = cols[4].text.strip()  # Vitórias
            draws = cols[5].text.strip()  # Empates
            losses = cols[6].text.strip()  # Derrotas
            goals_for = cols[7].text.strip().split(':')[0]  # Gols pró (GP)
            goals_against = cols[7].text.strip().split(':')[1]  # Gols contra (GC)
            sg = cols[8].text.strip()  # Saldo de gols
            points = cols[9].text.strip()  # Pontos

            teams_data.append((position, team, games_played, wins, draws, losses, goals_for, goals_against, sg, points))

    return teams_data

# Função para salvar os dados em um arquivo CSV
def save_to_csv(teams_data, season):
    if not teams_data:
        print("Nenhum dado para salvar.")
        return

    file_name = f'times_serie_a_{season}.csv'

    try:
        with open(file_name, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(['Posição', 'Time', 'Jogos', 'Vitórias', 'Empates', 'Derrotas', 'GP', 'GC', 'SG', 'Pontos'])  # Cabeçalho
            writer.writerows(teams_data)  # Escreve os dados
        print(f'Dados exportados para "{file_name}"')
    except Exception as e:
        print(f"Erro ao salvar o arquivo CSV: {e}")

# Função principal para buscar e salvar dados de times de várias temporadas
def scrape_and_save_teams():
    for season in range(2010, 2024):  # De 2010 até 2023
        print(f"Buscando dados da temporada {season}...")
        teams_data = get_teams_data(season)
        if teams_data:
            save_to_csv(teams_data, season)
        else:
            print(f"Não há dados para a temporada {season}.")

# Executa o processo de scraping e salva os dados
scrape_and_save_teams()


### **Carga da tabela dos campeonatos**

**6 - Importando CSV das classificações dos times nos campeonatos**


Foi feito um "upload data to dbfs" dos arquivos gerados pelo código acima. 

In [0]:
camp2010 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2010.csv")
camp2011 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2011.csv")
camp2012 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2012.csv")
camp2013 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2013.csv")
camp2014 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2014.csv")
camp2015 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2015.csv")
camp2016 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2016.csv")
camp2017 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2017.csv")
camp2018 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2018.csv")
camp2019 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2019.csv")
camp2020 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2020.csv")
camp2021 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2021.csv")
camp2022 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2022.csv")
camp2023 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/classificacao/times_serie_a_2023.csv")


### **Transformação da tabela dos campeonatos**

Ajustando tipos de colunas e adicionando a temporada

In [0]:
from pyspark.sql.functions import col, lit

# Função para ajustar o tipo de dados das colunas e adicionar a coluna temporada
def ajustar_tipos(df):
    return df \
        .withColumn("posição", col("posição").cast("int")) \
        .withColumn("gp", col("gp").cast("int")) \
        .withColumn("gc", col("gc").cast("int")) \
        .withColumn("sg", col("sg").cast("int")) \
        .withColumn("jogos", col("jogos").cast("int")) \
        .withColumn("vitórias", col("vitórias").cast("int")) \
        .withColumn("empates", col("empates").cast("int")) \
        .withColumn("derrotas", col("derrotas").cast("int")) \
        .withColumn("pontos", col("pontos").cast("int")) \
        .withColumnRenamed("vitórias", "vitorias")  # Renomeando a coluna "vitórias" para "vitorias"

# Aplicando a função para ajustar os tipos de dados nas tabelas (para cada ano)
camp2010 = ajustar_tipos(camp2010).withColumn("temporada", lit("2010-2011"))
camp2011 = ajustar_tipos(camp2011).withColumn("temporada", lit("2011-2012"))
camp2012 = ajustar_tipos(camp2012).withColumn("temporada", lit("2012-2013"))
camp2013 = ajustar_tipos(camp2013).withColumn("temporada", lit("2013-2014"))
camp2014 = ajustar_tipos(camp2014).withColumn("temporada", lit("2014-2015"))
camp2015 = ajustar_tipos(camp2015).withColumn("temporada", lit("2015-2016"))
camp2016 = ajustar_tipos(camp2016).withColumn("temporada", lit("2016-2017"))
camp2017 = ajustar_tipos(camp2017).withColumn("temporada", lit("2017-2018"))
camp2018 = ajustar_tipos(camp2018).withColumn("temporada", lit("2018-2019"))
camp2019 = ajustar_tipos(camp2019).withColumn("temporada", lit("2019-2020"))
camp2020 = ajustar_tipos(camp2020).withColumn("temporada", lit("2020-2021"))
camp2021 = ajustar_tipos(camp2021).withColumn("temporada", lit("2021-2022"))
camp2022 = ajustar_tipos(camp2022).withColumn("temporada", lit("2022-2023"))
camp2023 = ajustar_tipos(camp2023).withColumn("temporada", lit("2023-2024"))


Criando uma única tabela com todos os campeonatos

In [0]:
tabela_camp = camp2010.union(camp2011).union(camp2012).union(camp2013).union(camp2014).union(camp2015).union(camp2016).union(camp2017).union(camp2018).union(camp2019).union(camp2020).union(camp2021).union(camp2022).union(camp2023)



--------------------------------------------------------------------------------------------

**7 - Testando a qualidade dos dados dos campeonatos importados**

Checar se existem posições negativas ou maiores que 20:

In [0]:
#Em cada campeonato existem 20 times
tabela_camp.filter((col("posição") < 0) | (col("posição") > 20)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar times repetidos na mesma temporada:

In [0]:
# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("cont_tabela_camp")

# Comando SQL para mostrar a contagem de cada time por temporada
query = """
    SELECT
        time,
        temporada,
        COUNT(*) AS contagem
    FROM cont_tabela_camp
    GROUP BY time, temporada
    HAVING COUNT(*) > 1
"""

# Executando a consulta SQL no PySpark
cont_tabela_camp = spark.sql(query)

# Exibindo o resultado
cont_tabela_camp.show()


+----+---------+--------+
|time|temporada|contagem|
+----+---------+--------+
+----+---------+--------+



Checar se existem jogos diferentes de 38:

In [0]:
#Todos os times participam de 38 jogos em uma temporada
tabela_camp.filter(col("jogos") != 38).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar se existem vitórias negativas ou maiores do que 38:


In [0]:
#Um time não pode ter vitórias negativas e o valor máximo de vitórias é 38
tabela_camp.filter((col("vitorias") < 0) | (col("vitorias") > 38)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar se existem empates negativos ou maiores do que 38:

In [0]:
#Um time não pode ter empates negativos e o valor máximo de empates é 38
tabela_camp.filter((col("empates") < 0) | (col("empates") > 38)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar se existem derrotas negativas ou maiores do que 38:

In [0]:
#Um time não pode ter derrotas negativas e o valor máximo de derrotas é 38
tabela_camp.filter((col("derrotas") < 0) | (col("derrotas") > 38)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar gols pro negativos:

In [0]:
#Não podem existir gols pro negativos, o valor mínimo é 0
tabela_camp.filter(col("gp") < 0).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar gols contra negativos:

In [0]:
#Não podem existir gols contra negativos, o valor mínimo é 0
tabela_camp.filter(col("gc") < 0).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar se existem pontos negativos ou maiores que 114:

In [0]:
#Não podem existir pontos negativos, o valor mínimo é 0 e o valor máximo é 3*38 = 114
tabela_camp.filter((col("pontos") < 0) | (col("pontos") > 114)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Checar se existem valores nulos:

In [0]:
#Não podem existir valores nulos na tabela
tabela_camp.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in tabela_camp.columns]).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|      0|   0|    0|       0|      0|       0|  0|  0|  0|     0|        0|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



--------------------------------------------------------------------------------------------

**8 - Respondendo as perguntas sobre os campeonatos**

Abaixo está a tabela com todos os times e estatísticas acumuladas das temporadas

In [0]:
# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("tabela_camp_view")

# Comando SQL para somar os pontos e outras estatísticas para cada time
query = """
    SELECT 
        time,
        SUM(pontos) AS total_pontos,
        SUM(jogos) AS total_jogos,
        SUM(vitorias) AS total_vitorias,
        SUM(derrotas) AS total_derrotas,
        SUM(empates) AS total_empates,
        SUM(gp) AS total_gp,
        SUM(gc) AS total_gc
    FROM tabela_camp_view
    GROUP BY time
    ORDER BY total_pontos DESC
"""

# Executando a consulta SQL
pontos_por_time = spark.sql(query)

# Exibindo os resultados
display(pontos_por_time)


time,total_pontos,total_jogos,total_vitorias,total_derrotas,total_empates,total_gp,total_gc
Juventus,1149,532,350,73,109,976,421
Napoli,1049,532,311,105,116,1023,548
Inter,996,532,293,122,117,962,558
Milan,971,532,279,119,134,882,571
Roma,967,532,282,129,121,935,609
Lazio,914,532,269,156,107,884,642
Fiorentina,785,532,211,169,152,779,658
Atalanta,767,494,216,151,127,789,613
Udinese,663,532,173,215,144,658,736
Torino,598,456,149,155,152,601,607


**Qual time com maior quantidade de pontos?**

In [0]:
#A consulta abaixo trará o nome do time que fez a maior quantidade de pontos, considerando todas as temporadas.

pontos_por_time.show(1)

+--------+------------+-----------+--------------+--------------+-------------+--------+--------+
|    time|total_pontos|total_jogos|total_vitorias|total_derrotas|total_empates|total_gp|total_gc|
+--------+------------+-----------+--------------+--------------+-------------+--------+--------+
|Juventus|        1149|        532|           350|            73|          109|     976|     421|
+--------+------------+-----------+--------------+--------------+-------------+--------+--------+
only showing top 1 row



**Qual time com maior % de vitórias?**

In [0]:
#A consulta abaixo trará o nome do time com o maior percentual de vitórias considerando todas as temporadas.

# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("tabela_maior_perc_vitorias")

# Comando SQL para calcular o percentual de vitórias de cada time
query = """
    SELECT 
        time,
        (SUM(vitorias) / SUM(jogos)) * 100 AS perc_vitorias
    FROM tabela_maior_perc_vitorias
    GROUP BY time
    ORDER BY perc_vitorias DESC
"""

# Executando a consulta SQL
maior_percentual_vitorias_por_time = spark.sql(query)

# Exibindo os resultados
maior_percentual_vitorias_por_time.show(1)


+--------+-----------------+
|    time|    perc_vitorias|
+--------+-----------------+
|Juventus|65.78947368421053|
+--------+-----------------+
only showing top 1 row



A pergunta abaixo eu não tinha pensado originalmente, mas checando a tabela de campeonatos achei interessante ver os destaques negativos também.

**Qual time com menor % de vitórias?**

In [0]:
# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("tabela_menor_perc_vitorias")

# Comando SQL para calcular o percentual de vitórias de cada time
query = """
    SELECT 
        time,
        (SUM(vitorias) / SUM(jogos)) * 100 AS perc_vitorias
    FROM tabela_menor_perc_vitorias
    GROUP BY time
    ORDER BY perc_vitorias ASC
"""

# Executando a consulta SQL
menor_percentual_vitorias_por_time = spark.sql(query)

# Exibindo os resultados
menor_percentual_vitorias_por_time.show(1)


+-------+------------------+
|   time|     perc_vitorias|
+-------+------------------+
|Pescara|11.842105263157894|
+-------+------------------+
only showing top 1 row



**Qual time que fez mais gols?**

In [0]:
#A consulta abaixo trará o nome do time que fez mais gols considerando todas as temporadas.

# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("mais_gols_view")

# Comando SQL para somar os pontos e outras estatísticas para cada time
query = """
    SELECT 
        time,
        SUM(gp) AS total_gp
    FROM mais_gols_view
    GROUP BY time
    ORDER BY total_gp DESC
"""

# Executando a consulta SQL
mais_gols = spark.sql(query)

# Exibindo os resultados
mais_gols.show(1)


+------+--------+
|  time|total_gp|
+------+--------+
|Napoli|    1023|
+------+--------+
only showing top 1 row



**Qual time sofreu menos gols?**

In [0]:
#A consulta abaixo trará o nome do time que sofreu menos gols, porém como houveram times que foram rebaixados e não participaram de muitas temparadas, optei por considerar apenas os times que participaram de todas as temporadas, evitando distorções no resultado.

# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("menos_vazado_view")

# Comando SQL para somar os gols sofridos por cada time
query = """
    SELECT 
        time, 
        SUM(gc) AS total_gc
    FROM menos_vazado_view
    GROUP BY time
    HAVING SUM(jogos) = 532
    ORDER BY total_gc ASC
"""

# Executando a consulta SQL
menos_vazado = spark.sql(query)

# Exibindo os resultados
menos_vazado.show(1)

+--------+--------+
|    time|total_gc|
+--------+--------+
|Juventus|     421|
+--------+--------+
only showing top 1 row



A pergunta abaixo eu não tinha pensado originalmente, mas checando a tabela de campeonatos achei interessante ver os destaques negativos também.

**Qual time que fez menos gols?**

In [0]:
# Registrar o DataFrame como uma view temporária
tabela_camp.createOrReplaceTempView("menos_gols_view")

# Comando SQL para somar os pontos e outras estatísticas para cada time, filtrando por times que participaram de todas as temporadas
query = """
    SELECT 
        time,
        SUM(gp) AS total_gp
    FROM menos_gols_view
    GROUP BY time
    HAVING SUM(jogos)=532
    ORDER BY total_gp ASC
"""

# Executando a consulta SQL
menos_gols = spark.sql(query)

# Exibindo os resultados
menos_gols.show(1)


+-------+--------+
|   time|total_gp|
+-------+--------+
|Udinese|     658|
+-------+--------+
only showing top 1 row



Reparei que a quantidade de partidas entre diferentes times era muito diferente, então times que caíram para a segunda divisão e nunca mais subiram tinham poucos gols feitos, então pra isso decidi comparar apenas os times que jogaram todas as temporadas.

**Qual o maior saldo de gols numa temporada?**

In [0]:
#A consulta abaixo trará o nome do time que teve maior saldo de gols em uma temporada.

# Ordenando o DataFrame pela coluna 'sg' de forma decrescente e pegando a primeira linha
time_maior_sg = tabela_camp.orderBy(col("sg"), ascending=False).limit(1)

# Exibindo o time com maior quantidade de pontos
time_maior_sg.show()

+-------+-----+-----+--------+-------+--------+---+---+---+------+---------+
|posição| Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+-----+-----+--------+-------+--------+---+---+---+------+---------+
|      1|Inter|   38|      29|      7|       2| 89| 22| 67|    94|2023-2024|
+-------+-----+-----+--------+-------+--------+---+---+---+------+---------+



**Quais os campeões de cada temporada?**

In [0]:
#A consulta abaixo trará o nome do campeão de cada temporada.

tabela_camp.filter((col("posição") ==1)).display()

posição,Time,jogos,vitorias,empates,derrotas,gp,gc,sg,pontos,temporada
1,Milan,38,24,10,4,65,24,41,82,2010-2011
1,Juventus,38,23,15,0,68,20,48,84,2011-2012
1,Juventus,38,27,6,5,71,24,47,87,2012-2013
1,Juventus,38,33,3,2,80,23,57,102,2013-2014
1,Juventus,38,26,9,3,72,24,48,87,2014-2015
1,Juventus,38,29,4,5,75,20,55,91,2015-2016
1,Juventus,38,29,4,5,77,27,50,91,2016-2017
1,Juventus,38,30,5,3,86,24,62,95,2017-2018
1,Juventus,38,28,6,4,70,30,40,90,2018-2019
1,Juventus,38,26,5,7,76,43,33,83,2019-2020


**Teve algum campeão que ganhou todos os jogos na temporada?**

In [0]:
#Não teve nenhum time que ganhou todas as partidas
tabela_camp.filter((col("vitorias") ==38)).show()

+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
|posição|Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+
+-------+----+-----+--------+-------+--------+---+---+---+------+---------+



Como esperado, não houve nenhum time que ganhou todos os jogos em uma temporada.

**Teve algum campeão invicto?**

In [0]:
#A consulta abaixo trará o nome do time que foi campeão invicto em uma temporada.

tabela_camp.filter((col("derrotas") == 0) & (col("posição") == 1)).show()

+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+
|posição|    Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+
|      1|Juventus|   38|      23|     15|       0| 68| 20| 48|    84|2011-2012|
+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+



**Qual time ganhou com a maior quantidade de pontos em uma temporada?**

In [0]:
#A consulta abaixo trará o nome do time que fez a maior quantidade de pontos em uma única temporada.

# Ordenando o DataFrame pela coluna 'pontos' de forma decrescente e pegando a primeira linha
time_maior_pontos = tabela_camp.orderBy(col("pontos"), ascending=False).limit(1)

# Exibindo o time com maior quantidade de pontos
time_maior_pontos.show()

+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+
|posição|    Time|jogos|vitorias|empates|derrotas| gp| gc| sg|pontos|temporada|
+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+
|      1|Juventus|   38|      33|      3|       2| 80| 23| 57|   102|2013-2014|
+-------+--------+-----+--------+-------+--------+---+---+---+------+---------+



Abaixo está a lista de times que participaram de todas as temporadas

In [0]:
from pyspark.sql.functions import countDistinct

# Agrupar pelos times e contar as temporadas distintas
quantidade_temporadas = tabela_camp.groupBy("time").agg(countDistinct("temporada").alias("quantidade_temporadas"))

# No período importado tivemos 14 campeonatos, então temos que filtrar para mostrar apenas os times com quantidade_temporadas = 14
quantidade_temporadas_14 = quantidade_temporadas.filter(quantidade_temporadas["quantidade_temporadas"] == 14)

# Exibir os resultados
quantidade_temporadas_14.show()


+----------+---------------------+
|      time|quantidade_temporadas|
+----------+---------------------+
|     Lazio|                   14|
|      Roma|                   14|
|     Milan|                   14|
|Fiorentina|                   14|
|     Inter|                   14|
|    Napoli|                   14|
|  Juventus|                   14|
|   Udinese|                   14|
+----------+---------------------+



**Qual time mais vezes campeão?**

In [0]:
#A consulta abaixo trará o nome do time que foi campeão por mais vezes.

# Filtrar os times que foram campeões (posição == 1)
campeoes = tabela_camp.filter(col("posição") == 1)

# Contar quantas vezes cada time foi campeão
campeoes_count = campeoes.groupBy("time").count()

# Ordenar pelo número de campeonatos (do maior para o menor)
campeoes_count.orderBy(col("count"), ascending=False).show(1)


+--------+-----+
|    time|count|
+--------+-----+
|Juventus|    9|
+--------+-----+
only showing top 1 row



--------------------------------------------------------------------------------------------