This exercise was done on Databricks Community Edition (CE), using a cluster with Runtime 12.2 LTS.

----------

Databricks notebooks already create a Spark session (`spark`) for the user, and this exercise uses it.

In [0]:
import requests
from typing import Union, List

from pyspark.sql import Row
from pyspark.sql.functions import col, count, abs
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark

In [0]:
# Raw CSV with the match results, downloaded over HTTP from GitHub.
# NOTE(review): the URL tracks the `master` branch rather than a fixed commit,
# so the file contents (and every count below) may change between runs.
GITHUB_DATA_FILE_URL = "https://raw.githubusercontent.com/heri-macedo/mba-fiap/refs/heads/master/distributed_data_processing/resources/results.csv"

In [0]:
def create_dataframe_schema(column_list: List[str]) -> StructType:
    """
    Build a Spark schema in which every column is a nullable string.

    Args:
        column_list: List[str] -> List of strings; each one becomes a column
            name, in the same order as given.
    Returns:
        schema: StructType with one nullable StringType field per column name.
    """
    return StructType(
        [StructField(name, StringType(), True) for name in column_list]
    )

In [0]:
# Download the raw CSV. The timeout keeps the cell from hanging forever if
# GitHub does not answer.
results_csv_file_content = requests.get(url=GITHUB_DATA_FILE_URL, timeout=30)
# Fail here on 4xx/5xx responses; otherwise an HTML error page would be parsed
# as CSV further down and produce misleading results.
results_csv_file_content.raise_for_status()
content = results_csv_file_content.content


In [0]:
# "utf-8-sig" drops a leading byte-order mark, if present, so it cannot end up
# glued to the first header name.
decoded_content = content.decode('utf-8-sig')

# Splitting on "\n" alone leaves a trailing "\r" on lines that end in CRLF,
# which previously leaked into the last column name ("neutralTRUE\r") and into
# the last field of each row. Strip it once here instead.
lines = [line.rstrip("\r") for line in decoded_content.split("\n")]
header_line = lines[0]

# Remove empty lines.
# NOTE: this is a plain comma split; it assumes no field contains a quoted comma.
data_list = [
    tuple(line.split(","))
    for line in lines[1:]
    if line.strip()
]

data_rdd = spark.sparkContext.parallelize(data_list)
header_columns = header_line.split(",")

# Every column is loaded as a nullable string; numeric columns are cast below.
schema = create_dataframe_schema(column_list=header_columns)

# Keep the untouched frame under its own name so re-running the cleaning step
# never operates on an already-renamed DataFrame.
results_raw_df = spark.createDataFrame(data=data_rdd, schema=schema)

results_df = (
    results_raw_df
    .withColumnRenamed("home_teamName", "home_team")
    .withColumnRenamed("away_teamName", "away_team")
    # Scores arrive as strings; cast to int so comparisons and sums are numeric.
    .withColumn("home_score", col("home_scoreHome").cast("int"))
    .withColumn("away_score", col("away_scoreAway").cast("int"))
    .withColumnRenamed("tournamentName", "tournament")
    .withColumnRenamed("cityCity", "city")
    .withColumnRenamed("countryCountry", "country")
    # The "neutral" flag is not used by any question; the original string score
    # columns are superseded by the int versions above.
    .drop("neutralTRUE", "home_scoreHome", "away_scoreAway")
)

# Cached because every question below re-reads this DataFrame.
results_df.cache()
results_df.show(truncate=False)

+----------+----------------+---------+----------+---------+-------------------+----------+----------+
|date      |home_team       |away_team|tournament|city     |country            |home_score|away_score|
+----------+----------------+---------+----------+---------+-------------------+----------+----------+
|1872-11-30|Scotland        |England  |Friendly  |Glasgow  |Scotland           |0         |0         |
|1873-03-08|England         |Scotland |Friendly  |London   |England            |4         |2         |
|1874-03-07|Scotland        |England  |Friendly  |Glasgow  |Scotland           |2         |1         |
|1875-03-06|England         |Scotland |Friendly  |London   |England            |2         |2         |
|1876-03-04|Scotland        |England  |Friendly  |Glasgow  |Scotland           |3         |0         |
|1876-03-25|Scotland        |Wales    |Friendly  |Glasgow  |Scotland           |4         |0         |
|1877-03-03|England         |Scotland |Friendly  |London   |England      


# Question 1

## Quantos registros existem na base?

In [0]:
# Question 1: total number of rows (matches) in the cleaned results DataFrame.
total_count = results_df.count()
print(f"There are {total_count} records.")

There are 40839 records.


# Question 2

## Quantas equipes únicas mandantes existem na base?

In [0]:
# Question 2: one row per distinct home team, then count those rows.
distinct_home_teams = results_df.select("home_team").distinct()
count_distinct_home_teams = distinct_home_teams.count()
print(f"There are {count_distinct_home_teams} distinct home teams.")

There are 309 distinct home teams.


# Question 3

## Quantas vezes as equipes mandantes saíram vitoriosas?

In [0]:
# Question 3: matches in which the home side outscored the visitors.
home_outscored_away = col("away_score") < col("home_score")
results_df_q3 = results_df.filter(home_outscored_away)
total_home_team_wins = results_df_q3.count()
print(f"Home teams won {total_home_team_wins} times.")

Home teams won 19864 times.


# Question 4

## Quantas vezes as equipes visitantes saíram vitoriosas?

In [0]:
# Question 4: matches in which the visitors outscored the home side.
away_outscored_home = col("home_score") < col("away_score")
results_df_q4 = results_df.filter(away_outscored_home)
total_away_team_wins = results_df_q4.count()
print(f"Away teams won {total_away_team_wins} times.")

Away teams won 11544 times.


# Question 5

## Quantas partidas resultaram em empate?

In [0]:
# Question 5: a draw is a match in which both teams scored the same number of goals.
results_df_q5 = (
  results_df
  .where(col("away_score") == col("home_score"))
)
total_draws = results_df_q5.count()
# The message previously said "Away teams won ..." (copied from Question 4),
# which mislabelled the number of draws.
print(f"{total_draws} matches ended in a draw.")

Away teams won 9431 times.


# Question 6

## Quantas partidas foram realizadas em cada país?

In [0]:
# Question 6: number of matches hosted by each country, busiest hosts first.
matches_per_country = results_df.groupBy("country").count()
results_df_q6 = matches_per_country.orderBy(col("count").desc())
results_df_q6.show(truncate=False)

+--------------------+-----+
|country             |count|
+--------------------+-----+
|United States       |1144 |
|France              |801  |
|England             |687  |
|Malaysia            |644  |
|Sweden              |637  |
|Germany             |581  |
|Brazil              |529  |
|Spain               |517  |
|Thailand            |483  |
|Italy               |480  |
|Switzerland         |477  |
|Austria             |475  |
|United Arab Emirates|472  |
|South Africa        |470  |
|Qatar               |467  |
|South Korea         |453  |
|Argentina           |449  |
|Hungary             |431  |
|Chile               |405  |
|Belgium             |396  |
+--------------------+-----+
only showing top 20 rows



# Question 7

## Qual país teve mais partidas?

In [0]:
# Question 7: the single country that hosted the most matches.
results_df_q7 = (
    results_df
    .groupBy("country")
    .count()
    # The secondary sort on country name makes the result deterministic if two
    # countries tie on match count; without it limit(1) may return either one.
    .orderBy(col("count").desc(), col("country").asc())
    .limit(1)
)
top_matches_country = results_df_q7.collect()
top_country_name = top_matches_country[0]["country"]
total_matches =  top_matches_country[0]["count"]
# Message typo fixed: "were" -> "where".
print(f"Country where the highest number of matches took place is {top_country_name} with {total_matches} total matches.")

Country were the highest number of matches took place is United States with 1144 total matches.


# Question 8

## Qual a partida com maior número de gols?

In [0]:
# Question 8: add the combined goal count to every match and keep the top one.
results_with_total_goals = results_df.withColumn(
    "total_goals", col("home_score") + col("away_score")
)
results_df_q8 = results_with_total_goals.orderBy(col("total_goals").desc()).limit(1)

# Pull the single remaining row to the driver to build the summary sentence.
top_goals_match = results_df_q8.collect()
row = top_goals_match[0]
print(f"The highest-scoring match was {row.home_team} against {row.away_team} in {row.tournament} on {row.date} with {row.total_goals} goals.")
results_df_q8.show(truncate=False)

The highest-scoring match was Australia against American Samoa in FIFA World Cup qualification on 2001-04-11 with 31 goals.
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+-----------+
|date      |home_team|away_team     |tournament                  |city         |country  |home_score|away_score|total_goals|
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+-----------+
|2001-04-11|Australia|American Samoa|FIFA World Cup qualification|Coffs Harbour|Australia|31        |0         |31         |
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+-----------+



# Question 9

## Qual a maior goleada?

In [0]:
# Question 9: the match with the largest goal difference, regardless of which
# side won. `abs` here is pyspark.sql.functions.abs (a Column function).
results_df_q9 = (
    results_df
    .withColumn("goals_diff", abs(col("home_score") - col("away_score")))
    .orderBy("goals_diff", ascending=False)
    .limit(1)
)
top_goals_diff_match = results_df_q9.collect()
row = top_goals_diff_match[0]

# The margin is an absolute value, so the biggest win may belong to the away
# team. Work out the actual winner instead of always crediting the home side.
if row.home_score >= row.away_score:
    winner, loser = row.home_team, row.away_team
    winner_goals, loser_goals = row.home_score, row.away_score
else:
    winner, loser = row.away_team, row.home_team
    winner_goals, loser_goals = row.away_score, row.home_score

print(
    f"{winner}’s {winner_goals}-{loser_goals} win over {loser} on {row.date} was the biggest victory."
)
results_df_q9.show(truncate=False)

Australia’s 31-0 win over American Samoa on 2001-04-11 was the biggest victory.
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+----------+
|date      |home_team|away_team     |tournament                  |city         |country  |home_score|away_score|goals_diff|
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+----------+
|2001-04-11|Australia|American Samoa|FIFA World Cup qualification|Coffs Harbour|Australia|31        |0         |31        |
+----------+---------+--------------+----------------------------+-------------+---------+----------+----------+----------+



# Question 10

## Quantos jogos ocorreram no Brasil?

In [0]:
# Question 10: keep only matches hosted in Brazil, then count them per country
# (a single group, since only one country survives the filter).
matches_in_brazil = results_df.filter(col("country") == "Brazil")
results_df_q10 = matches_in_brazil.groupBy("country").count()

# The aggregated frame is collected to the driver to print the figure.
count_matches_brazil = results_df_q10.collect()
row = count_matches_brazil[0]
print(f"Number of games in Brazil: {row['count']}")
results_df_q10.show(truncate=False)

Number of games in Brazil: 529
+-------+-----+
|country|count|
+-------+-----+
|Brazil |529  |
+-------+-----+

