# **SETUP**

## Spark UI

In [None]:
!pip install -q pyspark
!pip install -q pyngrok

[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[K     |████████████████████████████████| 199 kB 48.4 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 745 kB 3.2 MB/s 
[?25h  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.ui.port', '4050').getOrCreate()
spark

In [None]:
# Fazer login no site https://dashboard.ngrok.com/get-started/setup para obter autenticação própria
ngrok_token = '2DAVBz1i9lgg7azFruTNntaLMVF_De65LN35iwB4nWkQJnMM'

In [None]:
get_ipython().system_raw(f'ngrok authtoken {ngrok_token}')
get_ipython().system_raw('ngrok http 4050 &')
!sleep 3
print('URL para interface Spark:')
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

URL para interface Spark:
https://ff58-104-196-60-87.ngrok.io


## Libraries

In [None]:
import pandas as pd
from google.colab import files

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Load data

In [None]:
# Load file
!gdown --id "1oyiYme7_7Ft44N5wv1NsmIXEr7hlldIv"

Downloading...
From: https://drive.google.com/uc?id=1oyiYme7_7Ft44N5wv1NsmIXEr7hlldIv
To: /content/vgsales.csv
100% 1.36M/1.36M [00:00<00:00, 144MB/s]


## Data Description

This dataset contains records of popular video games in North America, Japan, Europe and other parts of the world. Every video game in this dataset has at least 100k global sales.

[Source](https://www.kaggle.com/datasets/gregorut/videogamesales/code?datasetId=284&sortBy=voteCount) of dataset.

## Data Dictionary

| Column       | Explanation                                               |
|:-------------|:----------------------------------------------------------|
| Rank         | Ranking of overall sales                                  |
| Name         | Name of the game                                          |
| Platform     | Platform of the games release (i.e. PC,PS4, etc.)         |
| Year         | Year the game was released in                             |
| Genre        | Genre of the game                                         |
| Publisher    | Publisher of the game                                     |
| NA_Sales     | Number of sales in North America (in millions)            |
| EU_Sales     | Number of sales in Europe (in millions)                   |
| JP_Sales     | Number of sales in Japan (in millions)                    |
| Other_Sales  | Number of sales in other parts of the world (in millions) |
| Global_Sales | Number of total sales (in millions)                       |

# Questões

## Questão 1

Leia os dados com Spark, certificando-se que

1. A tabela tenha duas partições e

2. As colunas da tabela tenham este respectivo schema:

|Column|Data type|
|---|---|
|Rank|integer|
|Name|string|
|Platform|string|
|Year|integer|
|Genre|string|
|Publisher|string|
|NA_Sales|double|
|EU_Sales|double|
|JP_Sales|double|
|Other_Sales|double|
|Global_Sales|double|

In [None]:
# Carrecar dados com Spark
df = spark.read.csv('vgsales.csv', header=True, inferSchema=True)

In [None]:
df.show(5)

+----+--------------------+--------+----+------------+--------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|     Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------+--------+--------+--------+-----------+------------+
|3296|Dora the Explorer...|     GBA|2004|    Platform|  Gotham Games|    0.44|    0.16|     0.0|       0.01|        0.61|
|5695|Dynasty Warriors ...|     PS2|2004|      Action|    Tecmo Koei|     0.0|     0.0|    0.32|        0.0|        0.32|
|4986|San Francisco Rus...|      PS|1997|      Racing|GT Interactive|    0.21|    0.15|     0.0|       0.03|        0.38|
|1281|WWF WrestleMania ...|     N64|1999|    Fighting|           THQ|     1.2|    0.25|    0.02|       0.02|        1.48|
|4877|      Alpha Protocol|    X360|2010|Role-Playing|          Sega|    0.23|    0.13|     0.0|       0.04|         0.4|
+----+------------------

In [None]:
# Mostrando num de partições (como são poucos dados, Spark configurou apenas uma)
df.rdd.getNumPartitions()

1

In [None]:
# Reparticionando para duas partições
df = df.repartition(2)

In [None]:
df.rdd.getNumPartitions()

2

In [None]:
df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [None]:
df = df.withColumn('Year', F.col('Year').cast('integer'))

In [None]:
df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



## Questão 2

Construa uma tabela com:
- contagem das observações
- média
- desvio padrão
- valor mínimo
- valor máximo

para todas as coluna de vendas (que possuem "Sales" no nome).

**Dica:** uma **única função** consegue calcular todos estes valores.

In [None]:
df.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [None]:
sales_columns = ['NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales', 'Global_Sales']

df.describe(sales_columns).show()

+-------+------------------+-------------------+-------------------+--------------------+------------------+
|summary|          NA_Sales|           EU_Sales|           JP_Sales|         Other_Sales|      Global_Sales|
+-------+------------------+-------------------+-------------------+--------------------+------------------+
|  count|             16598|              16598|              16598|               16598|             16598|
|   mean|0.2646674298108155|0.14665200626581024|0.07778166044101428|0.048063019640918934|0.5374406555006714|
| stddev|0.8166830292988791| 0.5053512312869121| 0.3092906480822032| 0.18858840291271461|1.5550279355699124|
|    min|               0.0|                0.0|                0.0|                 0.0|              0.01|
|    max|             41.49|              29.02|              10.22|               10.57|             82.74|
+-------+------------------+-------------------+-------------------+--------------------+------------------+



## Questão 3

Para cada variável categórica do tipo string, calcule quantas categorias distintas estão presentes.

In [None]:
for tipo in df.dtypes:
  if tipo[1] == 'string':
    df.select(F.countDistinct(tipo[0])).distinct().show(truncate=False)

+--------------------+
|count(DISTINCT Name)|
+--------------------+
|11493               |
+--------------------+

+------------------------+
|count(DISTINCT Platform)|
+------------------------+
|31                      |
+------------------------+

+---------------------+
|count(DISTINCT Genre)|
+---------------------+
|12                   |
+---------------------+

+-------------------------+
|count(DISTINCT Publisher)|
+-------------------------+
|579                      |
+-------------------------+



In [None]:
cols_cat = ['Platform', 'Genre', 'Publisher']

df.select(*[F.count_distinct(col) for col in cols_cat]).show()

+------------------------+---------------------+-------------------------+
|count(DISTINCT Platform)|count(DISTINCT Genre)|count(DISTINCT Publisher)|
+------------------------+---------------------+-------------------------+
|                      31|                   12|                      579|
+------------------------+---------------------+-------------------------+



## Questão 4

Qual plataforma vendeu mais, mundialmente, considerando todo o período histórico diponível dos dados?  
Mostre as top 10 plataformas em vendas.

In [None]:
df.show(5)

+----+--------------------+--------+----+------------+--------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|     Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------+--------+--------+--------+-----------+------------+
|3296|Dora the Explorer...|     GBA|2004|    Platform|  Gotham Games|    0.44|    0.16|     0.0|       0.01|        0.61|
|5695|Dynasty Warriors ...|     PS2|2004|      Action|    Tecmo Koei|     0.0|     0.0|    0.32|        0.0|        0.32|
|4986|San Francisco Rus...|      PS|1997|      Racing|GT Interactive|    0.21|    0.15|     0.0|       0.03|        0.38|
|1281|WWF WrestleMania ...|     N64|1999|    Fighting|           THQ|     1.2|    0.25|    0.02|       0.02|        1.48|
|4877|      Alpha Protocol|    X360|2010|Role-Playing|          Sega|    0.23|    0.13|     0.0|       0.04|         0.4|
+----+------------------

In [None]:
(
  df
 .groupby('Name')
 .agg(
    F.count_distinct('Platform').alias('Numero_plataforma'),
  )
 .orderBy(F.desc('Numero_plataforma'))
 .show(10, truncate=False)
)

+------------+-----------------+
|Global_Sales|Numero_plataforma|
+------------+-----------------+
|0.07        |26               |
|0.14        |26               |
|0.06        |25               |
|0.08        |23               |
|0.02        |23               |
|0.12        |23               |
|0.03        |23               |
|0.28        |23               |
|0.04        |23               |
|0.05        |22               |
+------------+-----------------+
only showing top 10 rows



## Questão 5

Faça uma tabela com os jogos que aparecem em múltiplas plataformas, ordene de forma que os jogos com mais plataformas apareçam primeiro e responda:
- Qual jogo aparece em mais plataformas? Em quantas plataformas?

In [None]:
(
  df
  .groupby('Name')
  .agg(
    F.count_distinct('Platform').alias('Numero_plataforma'),
  )
 .orderBy(F.desc('Numero_plataforma'))
 .show(20, truncate=False)
)

+---------------------------------------+-----------------+
|Name                                   |Numero_plataforma|
+---------------------------------------+-----------------+
|Need for Speed: Most Wanted            |10               |
|Madden NFL 07                          |9                |
|LEGO Marvel Super Heroes               |9                |
|Ratatouille                            |9                |
|FIFA 14                                |9                |
|Terraria                               |8                |
|Monopoly                               |8                |
|FIFA Soccer 13                         |8                |
|LEGO Harry Potter: Years 5-7           |8                |
|Angry Birds Star Wars                  |8                |
|LEGO The Hobbit                        |8                |
|FIFA 15                                |8                |
|Madden NFL 08                          |8                |
|Lego Batman 3: Beyond Gotham           

**Resposta:** Need for Speed: Most Wanted. Aparece em 10 plataformas.

## Questão 6

Utilize a API do Pandas no Spark para calcular a soma das vendas globais para cada ano e gênro de jogo. Faça então um gráfico de linhas com os anos no eixo `x`, as vendas no eixo `y`, de forma que cada linha corresponda a um gênero de jogo.

**Dica:** após o cálculo, passar os dados para Pandas antes da plotagem, ou plotar diretamente aproveitando os métodos da classe `pyspark.pandas.DataFrame`.

Analise o gráfico e responda:
- Entre 1980 e 1990, quais gêneros mais venderam?
- Entre 2000 e 2015, quais gêneros mais venderam?

In [None]:
(
  df
  .groupby('Year', 'Genre')
  .agg(
    F.sum('Global_sales').alias('Sales'),
  )
 .orderBy(F.desc('Year'))
) \
.pandas_api() \
.plot.line(x='Year', y='Sales', color='Genre' )

In [None]:
df.pandas_api().groupby(['Year', 'Genre'])[['Global_Sales']].sum().sort_index().reset_index().plot.line(x='Year', y='Global_Sales', color='Genre' ) 

**Resposta:**
- Entre 1980 e 1990 sobressaíram as vendas de plataforma, tiro (shooter) e puzzle.
- Entre 2000 e 2015 teve vendas elevadas de jogos de ação e esporte.

## Questão 7

Registre a tabela usando `createOrReplaceTempView` e faça uso da linguagem SQL criar uma tabela que:
- Considere apenas os anos da década de 90 e 
- Agrupe por ano para responder quantos % cada região teve do total de vendas (vendas globais).

Salve o resultado desta query em uma variável chamada `df_questao7`.

Após isso, execute o seguinte comando `df_questao7.pandas_api().set_index('YEAR').style.background_gradient(cmap='Oranges')` e responda:
- Qual/quais regiões tiveram, relativamente (comparado às vendas globais), mais vendas no fim da década de noventa do que no início?








In [None]:
df.createOrReplaceTempView('vgsales')

In [None]:
query = """
SELECT
  Year,
  ROUND(SUM(NA_Sales) / SUM(Global_Sales) * 100, 2) as NA_Sales_Percent,
  ROUND(SUM(EU_Sales) / SUM(Global_Sales) * 100, 2) as EU_Sales_Percent,
  ROUND(SUM(JP_Sales) / SUM(Global_Sales) * 100, 2) as JP_Sales_Percent,
  ROUND(SUM(Other_Sales) / SUM(Global_Sales) * 100, 2) as Other_Sales_Percent,
  SUM(Global_Sales) as Global_Sales
FROM
  vgsales
WHERE
  Year BETWEEN 1990 AND 2000
GROUP BY
  Year
ORDER BY 
  Year
"""

df_questao7 = spark.sql(query)
df_questao7.show(truncate=False)

+----+----------------+----------------+----------------+-------------------+------------------+
|Year|NA_Sales_Percent|EU_Sales_Percent|JP_Sales_Percent|Other_Sales_Percent|Global_Sales      |
+----+----------------+----------------+----------------+-------------------+------------------+
|1990|51.55           |15.45           |30.13           |2.83               |49.39             |
|1991|39.59           |12.26           |45.86           |2.3                |32.230000000000004|
|1992|44.47           |15.38           |37.96           |2.17               |76.16             |
|1993|32.88           |10.11           |55.09           |1.94               |45.98             |
|1994|35.56           |18.79           |42.93           |2.78               |79.16999999999996 |
|1995|28.17           |16.91           |51.92           |3.0                |88.11             |
|1996|43.57           |23.73           |28.84           |3.86               |199.15000000000003|
|1997|47.14           |24.04  

In [None]:
df_questao7.pandas_api().set_index('Year').style.background_gradient(cmap='Oranges')

Unnamed: 0_level_0,NA_Sales_Percent,EU_Sales_Percent,JP_Sales_Percent,Other_Sales_Percent,Global_Sales
Year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1990,51.55,15.45,30.13,2.83,49.39
1991,39.59,12.26,45.86,2.3,32.23
1992,44.47,15.38,37.96,2.17,76.16
1993,32.88,10.11,55.09,1.94,45.98
1994,35.56,18.79,42.93,2.78,79.17
1995,28.17,16.91,51.92,3.0,88.11
1996,43.57,23.73,28.84,3.86,199.15
1997,47.14,24.04,24.32,4.54,200.98
1998,50.05,26.08,19.51,4.3,256.47
1999,50.17,24.94,20.83,4.0,251.27


**Resposta:** A região da Europa e "Outras regiões" tiverem um aumento notável quando comparado ao ínicio da década de 90.

## Questão 8

Calcule quantas vezes cada região teve vendas superiores às demais.

In [None]:
regions = ['NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales']


df_max_sale = \
(
  df
  .withColumn('max_sale', F.greatest(*regions))
  .withColumn(
    'Regiao_Maior_Venda',
    F.when(F.col('NA_Sales') == F.col('max_sale'), 'NA_Sales')
    .when(F.col('EU_Sales') == F.col('max_sale'), 'EU_Sales')
    .when(F.col('JP_Sales') == F.col('max_sale'), 'JP_Sales')
    .otherwise('Other_Sales')
  )
)

df_max_sale.show()

+-----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+--------+------------------+
| Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|max_sale|Regiao_Maior_Venda|
+-----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+--------+------------------+
| 3296|Dora the Explorer...|     GBA|2004|    Platform|        Gotham Games|    0.44|    0.16|     0.0|       0.01|        0.61|    0.44|          NA_Sales|
| 5695|Dynasty Warriors ...|     PS2|2004|      Action|          Tecmo Koei|     0.0|     0.0|    0.32|        0.0|        0.32|    0.32|          JP_Sales|
| 4986|San Francisco Rus...|      PS|1997|      Racing|      GT Interactive|    0.21|    0.15|     0.0|       0.03|        0.38|    0.21|          NA_Sales|
| 1281|WWF WrestleMania ...|     N64|1999|    Fighting|   

In [None]:
df_max_sale \
.groupBy('Regiao_Maior_Venda') \
.count() \
df.show()

+------------------+-----+
|Regiao_Maior_Venda|count|
+------------------+-----+
|          EU_Sales| 2380|
|          JP_Sales| 4029|
|          NA_Sales|10113|
|       Other_Sales|   76|
+------------------+-----+



## Questão 9

Construe uma tabela que mostre a diferença do total de vendas em um década com a década anterior e responda:
- Qual década apresentou a **menor diferença** comparada à década anterior?

In [None]:
# df = (
#   df
#   .withColumn('decade',
#     F.when(F.col('Year') < 1990, '80')
#     .when(F.col('Year') < 2000, '90')
#     .when(F.col('Year') < 2010, '2000')
#     .otherwise('2010')
#   )
# )

# df.show()

In [None]:
df = df.where('Year is not null').withColumn('decade', F.floor(F.col('Year')/10)*10) 
df.show()

+-----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------+
| Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|decade|
+-----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------+
| 3296|Dora the Explorer...|     GBA|2004|    Platform|        Gotham Games|    0.44|    0.16|     0.0|       0.01|        0.61|  2000|
| 5695|Dynasty Warriors ...|     PS2|2004|      Action|          Tecmo Koei|     0.0|     0.0|    0.32|        0.0|        0.32|  2000|
| 4986|San Francisco Rus...|      PS|1997|      Racing|      GT Interactive|    0.21|    0.15|     0.0|       0.03|        0.38|  1990|
| 1281|WWF WrestleMania ...|     N64|1999|    Fighting|                 THQ|     1.2|    0.25|    0.02|       0.02|        1.48|  1990|
| 4877|      Alpha Protocol|    X360|2010|Role-P

In [None]:
w = Window().orderBy('Decade')

(
  df
 .groupby('Decade')
 .agg(F.mean('Global_Sales').alias('global_sales_mean'))
 .withColumn('global_sales_mean_lag', F.lag('global_sales_mean').over(w))
 .withColumn('Delta_global_sales_mean', F.col('global_sales_mean') - F.col('global_sales_mean_lag'))
 .orderBy('Decade')
 .show()
) 

+------+-------------------+---------------------+-----------------------+
|Decade|  global_sales_mean|global_sales_mean_lag|Delta_global_sales_mean|
+------+-------------------+---------------------+-----------------------+
|  1980| 1.8369756097560979|                 null|                   null|
|  1990|   0.72295647258338|   1.8369756097560979|     -1.114019137172718|
|  2000| 0.5043462206776683|     0.72295647258338|    -0.2186102519057117|
|  2010|0.48999999999999744|   0.5043462206776683|   -0.01434622067767...|
|  2020|               0.29|  0.48999999999999744|   -0.19999999999999746|
+------+-------------------+---------------------+-----------------------+



## Questão 10 - PLUS

Utilizando apenas a sintaxe do pyspark, faça uma função que calcule a moda do dataframe para um grupo de colunas que o usuário vai especificar. Além da moda, o dataframe de saída deve conter a quantidade de vezes que a moda aparece, e tambem se é multimodal.

Sua função deve receber como entrada:
- O pyspark dataframe
- A coluna-alvo que desejamos saber a moda e
- Os grupos (colunas do dataframe) que iremos considerar para calcular a moda 

O resultado será uma linha por grupo, contendo a identificação do grupo, a moda, a quantidade de vezes que apareceu, e se é um caso multimodal.

Por exemplo, considerando o seguinte dataframe:

```
df_test = spark.createDataFrame([
  (2.4,'A','A1'), (2.4,'A','A1'), (2.5,'A','A1'), (2.6,'A','A1'), (2.7,'A','A1'),
  (2.4,'B','A1'), (2.5,'B','A1'), (2.5,'B','A1'), (2.6,'B','A1'), (2.4,'B','A1')
], ('values','inner_group', 'main_group')).select('main_group', 'inner_group', 'values')

df_test.show()
```

```
+----------+-----------+------+
|main_group|inner_group|values|
+----------+-----------+------+
|        A1|          A|   2.4|
|        A1|          A|   2.4|
|        A1|          A|   2.5|
|        A1|          A|   2.6|
|        A1|          A|   2.7|
|        A1|          B|   2.4|
|        A1|          B|   2.5|
|        A1|          B|   2.5|
|        A1|          B|   2.6|
|        A1|          B|   2.4|
+----------+-----------+------+
```

Ao aplicar a função:  
`calculate_mode(df=df_test, target_col='values', group_cols=['main_group','inner_group']).show()`

o retorno deve ser:

```
+----------+-----------+----+-----------+----------+
|main_group|inner_group|mode|mode_counts|multimodal|
+----------+-----------+----+-----------+----------+
|        A1|          A| 2.4|          2|     false|
|        A1|          B| 2.4|          2|      true|
+----------+-----------+----+-----------+----------+
```

pois no grupo `A1-A` o valor 2.4 é o que mais aparece (2x), e nenhum outro valor aparece duas vezes também. Já no grupo `A1-B` o valor 2.4 também é o que mais aparece (2X), porém em conjunto com o 2.5, que também aparece duas 2X. Neste caso, devemos reportar apenas uma das modas e informar que é multimodal.

In [None]:
df_test = spark.createDataFrame([
  (2.4,'A','A1'), (2.4,'A','A1'), (2.5,'A','A1'), (2.6,'A','A1'), (2.7,'A','A1'),
  (2.4,'B','A1'), (2.5,'B','A1'), (2.5,'B','A1'), (2.6,'B','A1'), (2.4,'B','A1')
], ('values','inner_group', 'main_group')).select('main_group', 'inner_group', 'values')

df_test.show()

+----------+-----------+------+
|main_group|inner_group|values|
+----------+-----------+------+
|        A1|          A|   2.4|
|        A1|          A|   2.4|
|        A1|          A|   2.5|
|        A1|          A|   2.6|
|        A1|          A|   2.7|
|        A1|          B|   2.4|
|        A1|          B|   2.5|
|        A1|          B|   2.5|
|        A1|          B|   2.6|
|        A1|          B|   2.4|
+----------+-----------+------+



In [None]:
def calculate_mode(df, target_col, group_cols=None):
  """Calculate mode

  Parameters
  ----------
  df : pyspark.DataFrame
    The dataframe in which the function will be applied
  target_col : str
    Column name in which the mode will be calculated from
  group_cols : list
    List column names to make the group. If None, no grouping will be considered.

  Returns
  -------
  output: pyspark.DataFrame
    Dataframe with group identification (one per line), the mode, value counts,
    and a column specifying if the group has more than one mode.
  """

  if group_cols is None:
    window_vals = Window.partitionBy('count')
    df_mode = df.groupby(target_col).count()
    max_count = df_mode.select(F.max('count')).collect()[0][0]

    df_mode = df_mode.filter(F.col('count')==max_count)
    nrows = df_mode.count()

    df_mode = df_mode.select(F.col(target_col).alias('mode'), F.col('count').alias('mode_counts'), (F.lit(nrows)>1).alias('multimodal'))

  else:
    window_grp = Window.partitionBy(*group_cols)
    window_grp_vals = Window.partitionBy(*group_cols, target_col)

    df_mode = (
      df
      .withColumn('counts', F.count(F.col(target_col)).over(window_grp_vals))
      .withColumn('counts_max', F.max('counts').over(window_grp))
      .filter(f'counts = counts_max')
      .dropDuplicates(subset=[*group_cols, target_col])
      .withColumn('multimodal', F.count(target_col).over(window_grp)>1)
      .withColumnRenamed(target_col, 'mode')
      .withColumnRenamed('counts_max', 'mode_counts')
      .dropDuplicates(subset=[*group_cols])
      .select(*group_cols, 'mode', 'mode_counts', 'multimodal')
    )
  return df_mode

In [None]:
calculate_mode(df_test, 'values', ['main_group','inner_group']).show()

+----------+-----------+----+-----------+----------+
|main_group|inner_group|mode|mode_counts|multimodal|
+----------+-----------+----+-----------+----------+
|        A1|          A| 2.4|          2|     false|
|        A1|          B| 2.4|          2|      true|
+----------+-----------+----+-----------+----------+

