
# Overview

Apache Spark is a tool for processing large volumes of data in parallel and in a distributed way.

The Spark ecosystem can be divided into different modules:

- Spark Core API
- Spark SQL
- Spark Streaming
- Spark MLlib
- GraphX

In this review, we focus on the Spark SQL module.

![Apache Spark Eco System](https://sparkbyexamples.com/wp-content/uploads/2020/02/spark-components-1.jpg)

For a complete reference of all Spark SQL features, see the [official documentation](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#core-classes).

For tutorials and hands-on examples, see the [Spark by examples](https://sparkbyexamples.com/) site.

# **SETUP**

## Spark UI

Spark provides a _user interface_ (UI) to help monitor applications. If the notebook runs on Google Colab, we need a secure tunnel to the environment where the notebook is running in order to reach the UI.

The first time this notebook runs, two libraries must be installed:
- `pyspark`: Spark's Python API
- `pyngrok` (optional): library for opening secure tunnels through public URLs

To install them, run the cell below.

In [None]:
!pip install -q pyngrok
!pip install -q pyspark

[?25l[K     |▍                               | 10 kB 15.6 MB/s eta 0:00:01[K     |▉                               | 20 kB 5.7 MB/s eta 0:00:01[K     |█▎                              | 30 kB 7.7 MB/s eta 0:00:01[K     |█▊                              | 40 kB 4.6 MB/s eta 0:00:01[K     |██▏                             | 51 kB 4.5 MB/s eta 0:00:01[K     |██▋                             | 61 kB 5.2 MB/s eta 0:00:01[K     |███                             | 71 kB 5.4 MB/s eta 0:00:01[K     |███▌                            | 81 kB 5.5 MB/s eta 0:00:01[K     |████                            | 92 kB 6.1 MB/s eta 0:00:01[K     |████▍                           | 102 kB 5.3 MB/s eta 0:00:01[K     |████▉                           | 112 kB 5.3 MB/s eta 0:00:01[K     |█████▎                          | 122 kB 5.3 MB/s eta 0:00:01[K     |█████▊                          | 133 kB 5.3 MB/s eta 0:00:01[K     |██████▏                         | 143 kB 5.3 MB/s eta 0:00:01[K    

To work with Spark, we use a [SparkSession](https://sparkbyexamples.com/spark/sparksession-explained-with-examples/). It wraps several features and contexts that Spark applications use, such as:

- SparkContext
- SQLContext
- HiveContext
- Streaming Application

To create a SparkSession, we use the following methods:

- `SparkSession.builder` to start configuring the session
- `config(key, value)` to set a configuration option. Here, `spark.ui.port` moves the Spark UI from its default port 4040 to 4050, which leaves 4040 free for the ngrok local API used further below
- `getOrCreate()` to create the session. If a SparkSession is already active, it is returned instead of creating a new one

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.ui.port', '4050').getOrCreate()
spark


The SparkSession output shows some relevant information:

- Spark version
- Spark application name
- Master (where the application runs)

The master `local[*]` means the application runs on the local machine, using as many worker threads as there are logical cores. When the application runs on a cluster, the architecture is as follows:

![Spark Application Components](https://i0.wp.com/sparkbyexamples.com/wp-content/uploads/2022/08/image.png?w=596&ssl=1)

For more information on cluster managers and master settings, see this [link](https://sparkbyexamples.com/spark/what-does-setmaster-local-mean-in-spark/).

To explore the Spark UI, use the cells below:

In [None]:
# Log in at https://dashboard.ngrok.com/get-started/setup to get your own auth token
ngrok_token = 'YOUR_NGROK_AUTHTOKEN'  # placeholder: use your own token and never publish it

In [None]:
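# Register the token and open a public HTTPS tunnel to the Spark UI port (4050)
get_ipython().system_raw(f'ngrok authtoken {ngrok_token}')
get_ipython().system_raw('ngrok http 4050 &')
!sleep 3
print('URL for the Spark UI:')
# ngrok's local API (port 4040) lists the active tunnels; extract the public HTTPS URL
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

URL for the Spark UI: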
https://6c72-35-196-73-145.ngrok.io


## Libraries

In [None]:
import pandas as pd
from google.colab import files

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

## Load data

For this review, we use data from two files:
- *unicorn_companies.csv*: information about "unicorn" companies (startups valued at over US$1 billion).
- *country.csv*: country metadata.

Google Colab's `files.upload()` method uploads each file to the Colab environment.

In [None]:
# Load unicorn_companies.csv
_ = files.upload()

Saving unicorn_companies.csv to unicorn_companies.csv


In [None]:
# Load country.csv
_ = files.upload()

Saving country.csv to country.csv


In [None]:
# Show the last rows of each table with pandas
print('unicorn_companies')
display(pd.read_csv('unicorn_companies.csv').tail())

unicorn_companies


Unnamed: 0,Company,Valuation,Date_Added,Country,Category,Select_Investors
912,Heyday,1.0,2021-11-16,United States,E-commerce & direct-to-consumer,"Khosla Ventures,General Catalyst, Victory Park..."
913,PLACE,1.0,2021-11-17,United States,Internet software & services,"Goldman Sachs Asset Management, 3L"
914,Stytch,1.0,2021-11-18,United States,Cybersecurity,"Index Ventures, Benchmark, Thrive Capital"
915,Owkin,1.0,2021-11-18,United States,Artificial Intelligence,"Google Ventures, Cathay Innovation, NJF Capital"
916,Expel,1.0,2021-11-18,United States,Cybersecurity,"Paladin Capital Group, Greycroft, Scale Ventur..."


In [None]:
print('country')
display(pd.read_csv('country.csv').tail())

country


Unnamed: 0,Country,ISO_Alpha3_Code,M49_Code,Region_1,Region_2,Continent
244,Wallis and Futuna Islands,WLF,876,Polynesia,,Oceania
245,Western Sahara,ESH,732,Northern Africa,,Africa
246,Yemen,YEM,887,Western Asia,,Asia
247,Zambia,ZMB,894,Eastern Africa,Sub-Saharan Africa,Africa
248,Zimbabwe,ZWE,716,Eastern Africa,Sub-Saharan Africa,Africa


We can also load files through the `SparkSession`'s `read` attribute (a `DataFrameReader`). It reads CSV, Parquet and other formats.

In [None]:
# Load the data with Spark
df = spark.read.csv('unicorn_companies.csv', header=True, inferSchema=True)
df_country = spark.read.csv('country.csv', header=True, inferSchema=True)

# Playground



## Partitions 

Partitioning in Spark is a property of _[Resilient Distributed Datasets](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)_ (RDDs), which are distributed collections of elements **without a defined schema**. Every DataFrame is backed by an RDD. To explore partitioning, we can use the following methods:

- `DataFrame.rdd.getNumPartitions()` to get the number of partitions of the DataFrame's underlying RDD
- `DataFrame.repartition()` to redistribute the rows into a new number of partitions

Read more in: [Apache Spark RDD vs DataFrame vs DataSet](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/)

In [None]:
# Show the number of partitions (the dataset is small, so Spark created only one)
df.rdd.getNumPartitions()

1

In [None]:
# Repartition into two partitions
df = df.repartition(2)
df.rdd.getNumPartitions()

2

In [None]:
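# Alternative: partitionBy writes one subdirectory per Country value (e.g. ./data/Country=Brazil/)
# df.write.mode("overwrite").partitionBy("Country").csv('./data', header=True)

# Hash-partition the rows by Country into 5 partitions, then write them as CSV part files into ./data
df.repartition(5, "Country").write.mode("overwrite").csv('./data', header=True)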
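`repartition(5, "Country")` hash-partitions the rows by `Country`, so all rows of a given country land in the same partition. The plain-Python sketch below models that idea on a few rows from this notebook. It uses `zlib.crc32` as a stand-in hash and is only an assumption for illustration. Spark uses its own Murmur3-based hash, so the actual partition numbers will differ.

```python
import zlib
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """Assign each row to a partition by hashing its key (crc32 stands in for Spark's hash)."""
    partitions = defaultdict(list)
    for row in rows:
        # The same key always hashes to the same partition id
        pid = zlib.crc32(row[key].encode("utf-8")) % num_partitions
        partitions[pid].append(row)
    return dict(partitions)

rows = [
    {"Company": "Nubank", "Country": "Brazil"},
    {"Company": "Loggi", "Country": "Brazil"},
    {"Company": "Canva", "Country": "Australia"},
    {"Company": "Airwallex", "Country": "Australia"},
    {"Company": "Maimai", "Country": "China"},
]

parts = hash_partition(rows, "Country", 5)
for pid, part in sorted(parts.items()):
    print(pid, [r["Company"] for r in part])
```

A country's rows never end up in two different partitions. When there are more countries than partitions, several countries share a partition.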

## Verify & cast data types

A DataFrame is a structure with a schema, that is, columns with defined names and types. We therefore need to make sure the schema has the correct types.

In [None]:
df

DataFrame[Company: string, Valuation: double, Date_Added: timestamp, Country: string, Category: string, Select_Investors: string]

In [None]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: timestamp (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



In [None]:
df.show(5)

+-------------+---------+-------------------+--------------+--------------------+--------------------+
|      Company|Valuation|         Date_Added|       Country|            Category|    Select_Investors|
+-------------+---------+-------------------+--------------+--------------------+--------------------+
|          FTX|     25.0|2021-07-20 00:00:00|     Hong Kong|             Fintech|Sequoia Capital, ...|
|        Unico|      1.0|2021-08-03 00:00:00|        Brazil|Artificial intell...|Big Bets, General...|
|  Star Charge|     2.41|2021-05-19 00:00:00|         China|Auto & transporta...|Shunwei Capital P...|
|Intellifusion|      1.0|2019-03-22 00:00:00|         China|Artificial intell...|BOC International...|
|    Matillion|      1.5|2021-09-15 00:00:00|United Kingdom|Data management &...|Scale Venture Par...|
+-------------+---------+-------------------+--------------+--------------------+--------------------+
only showing top 5 rows



We can change the DataFrame schema in several ways:
- With the `Column.cast()` method
- By specifying the schema with a `StructType` when reading the file

In [None]:
df = df.withColumn('Date_Added', F.col('Date_Added').cast('date'))

In [None]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



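Alternatively, we can specify the schema with a `StructType` when reading the file. This also skips the extra pass over the data that `inferSchema=True` requires. For CSV files, the schema fields are matched to the file's columns by position, so they must be listed in the same order as in the file.

The full list of data types is in the [documentation](https://spark.apache.org/docs/latest/sql-ref-datatypes.html).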

In [None]:
schema = StructType([
    StructField('Company', StringType()),
    StructField('Valuation', DoubleType()),
    StructField('Date_Added', DateType()),
    StructField('Country', StringType()),
    StructField('Category', StringType()),
    StructField('Select_Investors', StringType())
])

df = spark.read.csv('unicorn_companies.csv', header=True, schema=schema)
df = df.repartition(2)
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



## Describe function

The `describe` method computes summary statistics (`count`, `mean`, `stddev`, `min` and `max`) for the numeric and string columns of the DataFrame. In the output below, the date column `Date_Added` is left out.

In [None]:
df.describe().show()

+-------+----------+-----------------+---------+--------------------+--------------------+
|summary|   Company|        Valuation|  Country|            Category|    Select_Investors|
+-------+----------+-----------------+---------+--------------------+--------------------+
|  count|       917|              917|      917|                 917|                 916|
|   mean|      null|3.311690294438385|     null|                null|                null|
| stddev|      null|7.542281464806968|     null|                null|                null|
|    min|1047 Games|              1.0|Argentina|Artificial Intell...|01 Advisors, Zeev...|
|    max|     wefox|            140.0|  Vietnam|              Travel|next47, MaC Ventu...|
+-------+----------+-----------------+---------+--------------------+--------------------+



To extract a statistic from a column and use it as a Python variable, we can do the following:

In [None]:
df.select(
    F.max('Valuation').alias('Max'), 
    F.mean('Valuation').alias('Mean'),
).collect()  # append [0]['Max'] to get the maximum as a plain Python float

[Row(Max=140.0, Mean=3.3116902944383892)]

Difference between the [.show()](https://sparkbyexamples.com/pyspark/pyspark-show-display-dataframe-contents-in-table/) and [.collect()](https://sparkbyexamples.com/pyspark/pyspark-collect/) methods:

- `.show()`: only prints the result (similar to `print`) and returns `None`.
- `.collect()`: returns a list of [Row](https://sparkbyexamples.com/pyspark/pyspark-row-using-rdd-dataframe/) objects to the driver. The whole result must fit in the driver's memory, so use it with care on large DataFrames.

## Select & Filters

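Selecting columns and rows (_SQL equivalent: `SELECT` and `WHERE`_). The cell below shows five equivalent ways to reference the same column: by name, with `F.col`, as an attribute, by key and by position.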

In [None]:
df.select('Company', F.col('Company'), df.Company, df['Company'], df[0]).show(5)

+----------------+----------------+----------------+----------------+----------------+
|         Company|         Company|         Company|         Company|         Company|
+----------------+----------------+----------------+----------------+----------------+
|           Hopin|           Hopin|           Hopin|           Hopin|           Hopin|
|           Carro|           Carro|           Carro|           Carro|           Carro|
|Wildlife Studios|Wildlife Studios|Wildlife Studios|Wildlife Studios|Wildlife Studios|
| Yipin Shengxian| Yipin Shengxian| Yipin Shengxian| Yipin Shengxian| Yipin Shengxian|
|           Loggi|           Loggi|           Loggi|           Loggi|           Loggi|
+----------------+----------------+----------------+----------------+----------------+
only showing top 5 rows



To filter rows by a condition, we use the `filter` method or its alias `where`.

In [None]:
df.filter(F.col('Country')=='Brazil').show(5)

+--------------+---------+----------+-------+--------------------+--------------------+
|       Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+--------------+---------+----------+-------+--------------------+--------------------+
|MadeiraMadeira|      1.0|2021-01-07| Brazil|E-commerce & dire...|Flybridge Capital...|
|        Movile|      1.0|2018-07-12| Brazil|Mobile & telecomm...|Innova Capital - ...|
|         Loggi|      2.0|2019-06-05| Brazil|Supply chain, log...|Qualcomm Ventures...|
|        CargoX|      1.0|2021-10-21| Brazil|Supply chain, log...|Valor Capital Gro...|
|      Creditas|     1.75|2020-12-18| Brazil|             Fintech|Kaszek Ventures, ...|
+--------------+---------+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
df.where(F.col('Country')=='Brazil').show(5)

+--------------+---------+----------+-------+--------------------+--------------------+
|       Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+--------------+---------+----------+-------+--------------------+--------------------+
|MadeiraMadeira|      1.0|2021-01-07| Brazil|E-commerce & dire...|Flybridge Capital...|
|        Movile|      1.0|2018-07-12| Brazil|Mobile & telecomm...|Innova Capital - ...|
|         Loggi|      2.0|2019-06-05| Brazil|Supply chain, log...|Qualcomm Ventures...|
|        CargoX|      1.0|2021-10-21| Brazil|Supply chain, log...|Valor Capital Gro...|
|      Creditas|     1.75|2020-12-18| Brazil|             Fintech|Kaszek Ventures, ...|
+--------------+---------+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# sql-like syntax
df.filter('Country="Brazil" AND Valuation > 5').show()

+-----------+---------+----------+-------+--------------------+--------------------+
|    Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+-----------+---------+----------+-------+--------------------+--------------------+
|     Nubank|     30.0|2018-03-01| Brazil|             Fintech|Sequoia Capital, ...|
|    C6 Bank|     5.05|2020-12-02| Brazil|             Fintech|       Credit Suisse|
|QuintoAndar|      5.1|2019-09-09| Brazil|E-commerce & dire...|Kaszek Ventures, ...|
+-----------+---------+----------+-------+--------------------+--------------------+



## Group By / Order By

Using the PySpark DataFrame API

In [None]:
(
  df
 .groupby('Category')
 .agg(
    F.sum('Valuation').alias('valuation_sum'),
    F.mean('Valuation').alias('valuation_mean')
 )
 .orderBy(F.desc('valuation_mean'))
 .show(truncate=False)
)

+-----------------------------------+------------------+------------------+
|Category                           |valuation_sum     |valuation_mean    |
+-----------------------------------+------------------+------------------+
|Finttech                           |10.0              |10.0              |
|Other                              |245.46            |4.812941176470589 |
|Artificial intelligence            |308.14            |4.465797101449275 |
|Fintech                            |726.39            |3.9477717391304346|
|Data management & analytics        |120.7             |3.448571428571429 |
|Auto & transportation              |101.35000000000001|3.3783333333333334|
|Edtech                             |85.37             |3.161851851851852 |
|Hardware                           |98.59             |3.0809375         |
|Internet software & services       |479.7099999999999 |2.9250609756097554|
|E-commerce & direct-to-consumer    |283.11999999999995|2.918762886597938 |
|Consumer & 

Using SQL syntax: first register the DataFrame as a temporary view, then query it with `spark.sql()`

In [None]:
df.createOrReplaceTempView('unicorn')

In [None]:
query = """
SELECT
  Category,
  SUM(Valuation) AS valuation_sum,
  AVG(Valuation) AS valuation_mean
FROM
  unicorn
GROUP BY
  Category
ORDER BY
  valuation_mean DESC
"""

spark.sql(query).show(truncate=False)

+-----------------------------------+------------------+------------------+
|Category                           |valuation_sum     |valuation_mean    |
+-----------------------------------+------------------+------------------+
|Finttech                           |10.0              |10.0              |
|Other                              |245.46            |4.812941176470589 |
|Artificial intelligence            |308.14            |4.465797101449275 |
|Fintech                            |726.39            |3.9477717391304346|
|Data management & analytics        |120.7             |3.448571428571429 |
|Auto & transportation              |101.35000000000001|3.3783333333333334|
|Edtech                             |85.37             |3.161851851851852 |
|Hardware                           |98.59             |3.0809375         |
|Internet software & services       |479.7099999999999 |2.9250609756097554|
|E-commerce & direct-to-consumer    |283.11999999999995|2.918762886597938 |
|Consumer & 
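Both versions compute the sum and the mean of `Valuation` for each `Category`, then sort by the mean in descending order. The plain-Python sketch below performs the same aggregation on a handful of rows taken from this notebook's outputs. Because these are not the full dataset, the numbers differ from the tables above.

```python
from collections import defaultdict

# (Category, Valuation) pairs for a few companies shown in the outputs of this notebook
rows = [
    ("Fintech", 30.0),                        # Nubank
    ("Fintech", 5.05),                        # C6 Bank
    ("Fintech", 3.0),                         # Groww
    ("Edtech", 1.7),                          # GoStudent
    ("Internet software & services", 40.0),   # Canva
    ("Internet software & services", 1.6),    # SafetyCulture
]

# GROUP BY Category: collect the valuations of each category
groups = defaultdict(list)
for category, valuation in rows:
    groups[category].append(valuation)

# SUM and AVG per group, then ORDER BY valuation_mean DESC
result = sorted(
    ((cat, sum(vals), sum(vals) / len(vals)) for cat, vals in groups.items()),
    key=lambda t: t[2],
    reverse=True,
)
for category, valuation_sum, valuation_mean in result:
    print(f"{category:<30} {valuation_sum:>8.2f} {valuation_mean:>8.2f}")
```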

## Distinct values

Counting distinct values (_SQL equivalent: `COUNT(DISTINCT column)`_)

In [None]:
df.select(F.countDistinct('Category')).show()

+------------------------+
|count(DISTINCT Category)|
+------------------------+
|                      17|
+------------------------+



Listing distinct values (_SQL equivalent: `SELECT DISTINCT`_)

In [None]:
df.select('Category').distinct().show(truncate=False)

+-----------------------------------+
|Category                           |
+-----------------------------------+
|Cybersecurity                      |
|E-commerce & direct-to-consumer    |
|Artificial intelligence            |
|Travel                             |
|Health                             |
|Fintech                            |
|Edtech                             |
|Data management & analytics        |
|Auto & transportation              |
|Other                              |
|Supply chain, logistics, & delivery|
|Mobile & telecommunications        |
|Artificial Intelligence            |
|Internet software & services       |
|Hardware                           |
|Consumer & retail                  |
|Finttech                           |
+-----------------------------------+



Counting distinct values and listing them as an array

In [None]:
df.select(F.countDistinct('Category'), F.collect_set('Category')).show()

+------------------------+---------------------+
|count(DISTINCT Category)|collect_set(Category)|
+------------------------+---------------------+
|                      17| [Cybersecurity, H...|
+------------------------+---------------------+
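`countDistinct` and `collect_set` behave much like a Python `set`. Duplicates collapse into one value, there is no guaranteed order, and the comparison is case-sensitive. That is why `Artificial intelligence` and `Artificial Intelligence` count as separate categories in the outputs above. Unlike a Python set, both Spark functions also skip nulls. The sketch below uses a few of the category values:

```python
categories = [
    "Fintech",
    "Fintech",
    "Artificial intelligence",
    "Artificial Intelligence",
    "Edtech",
]

# Like F.collect_set('Category'): no duplicates, no guaranteed order
distinct = set(categories)
# Like F.countDistinct('Category')
print(len(distinct))

# Normalising the case first merges the two spellings of the same category
distinct_ci = {c.lower() for c in categories}
print(len(distinct_ci))
```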



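Dropping duplicates: `drop_duplicates(subset=[...])` (an alias of `dropDuplicates`) keeps one row for each distinct value of the `subset` columns. Spark does not guarantee which of the duplicate rows is kept.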
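The plain-Python sketch below models that rule on a few rows from this notebook. Keeping the *first* row seen for each key is an assumption of the sketch. In Spark, the surviving row depends on how the data is partitioned and processed.

```python
rows = [
    {"Company": "Nubank", "Category": "Fintech"},
    {"Company": "Canva", "Category": "Internet software & services"},
    {"Company": "C6 Bank", "Category": "Fintech"},
    {"Company": "GoStudent", "Category": "Edtech"},
]

def drop_duplicates(rows, subset):
    """Keep one row per distinct combination of the `subset` columns (first seen wins here)."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[col] for col in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

deduped = drop_duplicates(rows, subset=["Category"])
print([r["Company"] for r in deduped])  # one company per category
```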

In [None]:
df.drop_duplicates(subset=['Category']).show()

+--------------------+---------+----------+--------------+--------------------+--------------------+
|             Company|Valuation|Date_Added|       Country|            Category|    Select_Investors|
+--------------------+---------+----------+--------------+--------------------+--------------------+
|           SmartMore|      1.2|2021-06-24|         China|Artificial Intell...|IDG Capital, Zhen...|
|    Shift Technology|      1.0|2021-05-05|        France|Artificial intell...|Griffin Gaming Pa...|
|                 Via|     2.25|2020-03-30| United States|Auto & transporta...|83North, RiverPar...|
|               Razor|      1.0|2021-11-08|       Germany|   Consumer & retail|Global Founders C...|
|                Snyk|      8.6|2020-01-21|United Kingdom|       Cybersecurity|BOLDstart Venture...|
|              Axtria|      1.0|2021-05-13| United States|Data management &...|Helion Venture Pa...|
|                Away|      1.4|2019-05-15| United States|E-commerce & dire...|Global Found

## Join

In [None]:
df_country.show(5)

+--------------+---------------+--------+---------------+--------+---------+
|       Country|ISO_Alpha3_Code|M49_Code|       Region_1|Region_2|Continent|
+--------------+---------------+--------+---------------+--------+---------+
|   Afghanistan|            AFG|       4|  Southern Asia|    null|     Asia|
| Åland Islands|            ALA|     248|Northern Europe|    null|   Europe|
|       Albania|            ALB|       8|Southern Europe|    null|   Europe|
|       Algeria|            DZA|      12|Northern Africa|    null|   Africa|
|American Samoa|            ASM|      16|      Polynesia|    null|  Oceania|
+--------------+---------------+--------+---------------+--------+---------+
only showing top 5 rows



In [None]:
df.join(df_country, on='Country', how='inner').show()

+-------------+-----------------+---------+----------+--------------------+--------------------+---------------+--------+------------------+--------+-------------+
|      Country|          Company|Valuation|Date_Added|            Category|    Select_Investors|ISO_Alpha3_Code|M49_Code|          Region_1|Region_2|    Continent|
+-------------+-----------------+---------+----------+--------------------+--------------------+---------------+--------+------------------+--------+-------------+
|        China|           Maimai|      1.0|2017-11-15|Mobile & telecomm...|Morningside Ventu...|            CHN|     156|      Eastern Asia|    null|         Asia|
|        China|       Apus Group|     1.73|2015-01-16|Mobile & telecomm...|Redpoint Ventures...|            CHN|     156|      Eastern Asia|    null|         Asia|
|United States| Pantheon Systems|      1.0|2021-07-13|Internet software...|Foundry Group, Sc...|            USA|     840|  Northern America|    null|North America|
|        India| 
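An inner join keeps only the rows whose `Country` value appears in both DataFrames, and it appends the matching country columns to each company row. Companies whose country is missing from `df_country` are dropped, and so are countries without any company. The plain-Python sketch below models the result with a hash-based join on a few rows from the outputs above. It only models the result: Spark chooses its own join strategy. The "Atlantis" row is hypothetical and has no match.

```python
companies = [
    {"Company": "Maimai", "Country": "China"},
    {"Company": "Pantheon Systems", "Country": "United States"},
    {"Company": "Example Co", "Country": "Atlantis"},  # hypothetical row with no matching country
]
countries = [
    {"Country": "China", "ISO_Alpha3_Code": "CHN", "Continent": "Asia"},
    {"Country": "United States", "ISO_Alpha3_Code": "USA", "Continent": "North America"},
    {"Country": "Zambia", "ISO_Alpha3_Code": "ZMB", "Continent": "Africa"},
]

def inner_join(left, right, on):
    """Inner join on one key column: index `right` by the key, then probe it with each `left` row."""
    index = {}
    for right_row in right:
        index.setdefault(right_row[on], []).append(right_row)
    joined = []
    for left_row in left:
        # A left row without a match produces no output row
        for right_row in index.get(left_row[on], []):
            joined.append({**left_row, **right_row})
    return joined

joined = inner_join(companies, countries, on="Country")
for row in joined:
    print(row)
```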

## -> Example Break point

**Example using everything shown above**

Considering only companies valued at more than 1 billion dollars (`Valuation` is in billions of US dollars), how many companies are there per continent? How many distinct countries in each continent do these companies come from?

In [None]:
(
  df
 .where('Valuation>1')
 .join(df_country, on='Country')
 .groupby('Continent')
 .agg(
     F.countDistinct('Country').alias('n_country'),
     F.count('*').alias('total_observations')
 )
 .orderBy(F.desc('n_country'))
 .show()
)

+-------------+---------+------------------+
|    Continent|n_country|total_observations|
+-------------+---------+------------------+
|       Europe|       17|                83|
|         Asia|       14|               205|
|North America|        4|               394|
|South America|        4|                12|
|       Africa|        3|                 3|
|      Oceania|        1|                 4|
+-------------+---------+------------------+



## Add, update and rename columns

Adding columns derived from existing columns

In [None]:
df.withColumn('Year', F.year('Date_Added')).show(5)

+----------------+---------+----------+-------------+--------------------+--------------------+----+
|         Company|Valuation|Date_Added|      Country|            Category|    Select_Investors|Year|
+----------------+---------+----------+-------------+--------------------+--------------------+----+
|          Maimai|      1.0|2017-11-15|        China|Mobile & telecomm...|Morningside Ventu...|2017|
|      Apus Group|     1.73|2015-01-16|        China|Mobile & telecomm...|Redpoint Ventures...|2015|
|Pantheon Systems|      1.0|2021-07-13|United States|Internet software...|Foundry Group, Sc...|2021|
|           Groww|      3.0|2021-04-07|        India|             Fintech|Tiger Global Mana...|2021|
|          Axtria|      1.0|2021-05-13|United States|Data management &...|Helion Venture Pa...|2021|
+----------------+---------+----------+-------------+--------------------+--------------------+----+
only showing top 5 rows



Adding columns in a chain (see the [.lit()](https://sparkbyexamples.com/pyspark/pyspark-lit-add-literal-constant/) method, which creates a column with a constant value)

In [None]:
valuation_mean = df.select(F.mean('Valuation')).collect()[0][0]

(
  df
 .withColumn('Year', F.year('Date_Added'))
 .withColumn('Valuation_mean', F.lit(valuation_mean))
 .show(5)
)

+----------------+---------+----------+-------------+--------------------+--------------------+----+------------------+
|         Company|Valuation|Date_Added|      Country|            Category|    Select_Investors|Year|    Valuation_mean|
+----------------+---------+----------+-------------+--------------------+--------------------+----+------------------+
|          Maimai|      1.0|2017-11-15|        China|Mobile & telecomm...|Morningside Ventu...|2017|3.3116902944383892|
|      Apus Group|     1.73|2015-01-16|        China|Mobile & telecomm...|Redpoint Ventures...|2015|3.3116902944383892|
|Pantheon Systems|      1.0|2021-07-13|United States|Internet software...|Foundry Group, Sc...|2021|3.3116902944383892|
|           Groww|      3.0|2021-04-07|        India|             Fintech|Tiger Global Mana...|2021|3.3116902944383892|
|          Axtria|      1.0|2021-05-13|United States|Data management &...|Helion Venture Pa...|2021|3.3116902944383892|
+----------------+---------+----------+-

Renaming columns with the [.withColumnRenamed()](https://sparkbyexamples.com/pyspark/pyspark-rename-dataframe-column/) method

In [None]:
df.withColumnRenamed('Date_Added', 'Date').show(5)

+----------------+---------+----------+-------------+--------------------+--------------------+
|         Company|Valuation|      Date|      Country|            Category|    Select_Investors|
+----------------+---------+----------+-------------+--------------------+--------------------+
|          Maimai|      1.0|2017-11-15|        China|Mobile & telecomm...|Morningside Ventu...|
|      Apus Group|     1.73|2015-01-16|        China|Mobile & telecomm...|Redpoint Ventures...|
|Pantheon Systems|      1.0|2021-07-13|United States|Internet software...|Foundry Group, Sc...|
|           Groww|      3.0|2021-04-07|        India|             Fintech|Tiger Global Mana...|
|          Axtria|      1.0|2021-05-13|United States|Data management &...|Helion Venture Pa...|
+----------------+---------+----------+-------------+--------------------+--------------------+
only showing top 5 rows



## Filtering/detecting patterns

Case-insensitive search: normalise the text with `lower` (or `upper`), then test it with `contains`.
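In plain Python, the same test is `str.lower()` followed by the `in` operator. Like `contains`, `in` matches the substring anywhere, not only at the start of a word. The sketch below shows this with a hypothetical "Smart home" category (not in the dataset), which would also match `'art'`:

```python
categories = [
    "Artificial intelligence",
    "Artificial Intelligence",
    "Fintech",
    "Smart home",  # hypothetical category, not in the dataset
]

# Equivalent of F.lower('Category').contains('art')
matches = [c for c in categories if "art" in c.lower()]
print(matches)
```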

In [None]:
(
  df
 .filter(F.lower('Category').contains('art'))
 .select('Category')
 .distinct()
 .show(truncate=False)
)

+-----------------------+
|Category               |
+-----------------------+
|Artificial intelligence|
|Artificial Intelligence|
+-----------------------+



Replacing substrings with the [.regexp_replace()](https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/#regexp_replace-replace-string-columns) method.
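`regexp_replace` treats its second argument as a regular expression (Java syntax) and replaces every match, case-sensitively. For a simple literal pattern like `'intelligence'`, Python's `re.sub` behaves the same way, so the sketch below can preview the effect on the category values:

```python
import re

categories = ["Artificial intelligence", "Artificial Intelligence", "Fintech"]

# Same substitution as F.regexp_replace('Category', 'intelligence', 'Intelligence')
fixed = [re.sub("intelligence", "Intelligence", c) for c in categories]
print(sorted(set(fixed)))  # the two spellings collapse into one
```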

In [None]:
df = df.withColumn('Category', F.regexp_replace('Category', 'intelligence', 'Intelligence'))
# Alternative: replace whole cell values with a mapping instead of a regex
# df = df.replace({'Artificial intelligence': 'Artificial Intelligence'}, subset=['Category'])

In [None]:
df.select('Category').distinct().show(truncate=False)

+-----------------------------------+
|Category                           |
+-----------------------------------+
|Cybersecurity                      |
|E-commerce & direct-to-consumer    |
|Travel                             |
|Health                             |
|Fintech                            |
|Edtech                             |
|Data management & analytics        |
|Auto & transportation              |
|Other                              |
|Supply chain, logistics, & delivery|
|Mobile & telecommunications        |
|Artificial Intelligence            |
|Internet software & services       |
|Hardware                           |
|Consumer & retail                  |
|Finttech                           |
+-----------------------------------+



## Case when

In [None]:
(
  df
 .withColumn(
     'Valuation_category',
     F.when(F.col('Valuation')<10, '10')
     .when(F.col('Valuation')<20, '20')
     .when(F.col('Valuation')<30, '30')
     .when(F.col('Valuation')<40, '40')
     .when(F.col('Valuation')<50, '50')
     .otherwise('50+')
     )
 .show(30)
)

+-----------------+---------+----------+--------------+--------------------+--------------------+------------------+
|          Company|Valuation|Date_Added|       Country|            Category|    Select_Investors|Valuation_category|
+-----------------+---------+----------+--------------+--------------------+--------------------+------------------+
|           Maimai|      1.0|2017-11-15|         China|Mobile & telecomm...|Morningside Ventu...|                10|
|       Apus Group|     1.73|2015-01-16|         China|Mobile & telecomm...|Redpoint Ventures...|                10|
| Pantheon Systems|      1.0|2021-07-13| United States|Internet software...|Foundry Group, Sc...|                10|
|            Groww|      3.0|2021-04-07|         India|             Fintech|Tiger Global Mana...|                10|
|           Axtria|      1.0|2021-05-13| United States|Data management &...|Helion Venture Pa...|                10|
|             Away|      1.4|2019-05-15| United States|E-commerc
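`F.when(...).when(...).otherwise(...)` is evaluated top to bottom, and the first condition that holds wins. Each label therefore means "below this value", e.g. `'20'` covers 10 <= Valuation < 20. In Spark, a condition on a null `Valuation` is never true, so such rows would fall through to `'50+'`. Without `.otherwise()`, unmatched rows would get null. The plain-Python sketch below shows the same bucketing:

```python
def valuation_category(valuation):
    """Mirror of the F.when chain: the first condition that holds decides the label."""
    for upper in (10, 20, 30, 40, 50):
        if valuation < upper:
            return str(upper)
    return "50+"  # the .otherwise() branch

for v in (1.0, 10.0, 25.0, 140.0):
    print(v, valuation_category(v))
```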

## Window functions

Window functions compute a value for each row from a group of related rows (the window), without collapsing those rows into one as `groupBy` does. They can be divided into:

- Ranking functions
- Analytic functions
- Aggregate functions

![](https://sparkbyexamples.com/wp-content/uploads/2019/12/spark-sql-window-functions-768x435.jpg)

[Source](https://sparkbyexamples.com/spark/spark-sql-window-functions/): Spark by examples
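An aggregate function applied over a window keeps every input row and attaches to it the value computed over that row's partition. The plain-Python sketch below models `F.mean('Valuation').over(Window.partitionBy('Country'))` on a few rows taken from the outputs below. The column name `Valuation_country_mean` is made up for the example.

```python
from collections import defaultdict

rows = [
    {"Company": "Canva", "Country": "Australia", "Valuation": 40.0},
    {"Company": "Airwallex", "Country": "Australia", "Valuation": 5.5},
    {"Company": "Collibra", "Country": "Belgium", "Valuation": 5.25},
    {"Company": "Odoo", "Country": "Belgium", "Valuation": 2.3},
]

# Compute the aggregate once per partition (Country)...
by_country = defaultdict(list)
for row in rows:
    by_country[row["Country"]].append(row["Valuation"])
means = {country: sum(vals) / len(vals) for country, vals in by_country.items()}

# ...then attach it to every row instead of collapsing each group into one row
windowed = [{**row, "Valuation_country_mean": means[row["Country"]]} for row in rows]
for row in windowed:
    print(row["Company"], row["Valuation_country_mean"])
```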

Build a DataFrame that ranks the companies of each country from highest to lowest valuation, keeping at most 3 companies per country.

In [None]:
from pyspark.sql.window import Window
w = Window().partitionBy('Country').orderBy(F.desc('Valuation'))

df_top3_by_country = (
  df
 .withColumn('rn', F.row_number().over(w))
 .where('rn <= 3')
 .select('Country','rn','Company','Category','Valuation')
 .orderBy('Country','rn')
)

df_top3_by_country.show(truncate=False)

+---------+---+--------------+-----------------------------------+---------+
|Country  |rn |Company       |Category                           |Valuation|
+---------+---+--------------+-----------------------------------+---------+
|Argentina|1  |Uala          |Fintech                            |2.45     |
|Australia|1  |Canva         |Internet software & services       |40.0     |
|Australia|2  |Airwallex     |Fintech                            |5.5      |
|Australia|3  |SafetyCulture |Internet software & services       |1.6      |
|Austria  |1  |BitPanda      |Fintech                            |4.11     |
|Austria  |2  |GoStudent     |Edtech                             |1.7      |
|Belgium  |1  |Collibra      |Data management & analytics        |5.25     |
|Belgium  |2  |Odoo          |Internet software & services       |2.3      |
|Bermuda  |1  |Afiniti       |Artificial Intelligence            |1.6      |
|Brazil   |1  |Nubank        |Fintech                            |30.0     |
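
`F.row_number()` numbers rows 1, 2, 3, … even when valuations are tied, so which of several equally valued companies ends up at position 3 may vary between runs; `F.rank()` and `F.dense_rank()` give tied rows the same number instead. The pure-Python sketch below (hypothetical helper `rank_partition`, hypothetical values) mimics the three ranking functions over a single partition that is already sorted in descending order.

```python
def rank_partition(values):
    """Return (row_number, rank, dense_rank) for each value of one
    partition; values must already be sorted in descending order."""
    out = []
    rank = dense = 0
    prev = None
    for i, v in enumerate(values, start=1):
        if i == 1 or v != prev:  # a new distinct value starts here
            rank = i             # rank jumps past the tied positions
            dense += 1           # dense_rank just moves to the next number
            prev = v
        out.append((i, rank, dense))  # row_number is simply the position i
    return out


# Hypothetical valuations with a tie at 5.0
ranks = rank_partition(sorted([5.0, 7.6, 4.0, 5.0], reverse=True))
```

If a deterministic top 3 is needed, one option is to add a tie-breaker to the ordering, for example `Window.partitionBy('Country').orderBy(F.desc('Valuation'), 'Company')`.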

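The `F.lag()` and `F.lead()` functions shift a column within each window partition: `lag` returns the value from the previous row and `lead` the value from the next row (one row away by default), with `null` when that row does not exist.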

In [None]:
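# Same partitioning, now ordered by the rank computed above
w = Window.partitionBy('Country').orderBy('rn')

(
    df_top3_by_country
    .withColumn('Valuation_lag', F.lag('Valuation').over(w))    # previous row, null for the first
    .withColumn('Valuation_lead', F.lead('Valuation').over(w))  # next row, null for the last
    .orderBy('Country', 'rn')
    .drop('Company', 'Category')
    .show()
)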

+---------+---+---------+-------------+--------------+
|  Country| rn|Valuation|Valuation_lag|Valuation_lead|
+---------+---+---------+-------------+--------------+
|Argentina|  1|     2.45|         null|          null|
|Australia|  1|     40.0|         null|           5.5|
|Australia|  2|      5.5|         40.0|           1.6|
|Australia|  3|      1.6|          5.5|          null|
|  Austria|  1|     4.11|         null|           1.7|
|  Austria|  2|      1.7|         4.11|          null|
|  Belgium|  1|     5.25|         null|           2.3|
|  Belgium|  2|      2.3|         5.25|          null|
|  Bermuda|  1|      1.6|         null|          null|
|   Brazil|  1|     30.0|         null|           5.1|
|   Brazil|  2|      5.1|         30.0|          5.05|
|   Brazil|  3|     5.05|          5.1|          null|
|   Canada|  1|      7.6|         null|          4.15|
|   Canada|  2|     4.15|          7.6|           4.0|
|   Canada|  3|      4.0|         4.15|          null|
|    Chile
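
A pure-Python sketch of what `F.lag` and `F.lead` compute with the default offset of 1, using the first rows of the output above (the helper name `lag_lead` is hypothetical). The key point is that neighbours are looked up only inside the same partition, which is why Argentina, with a single company, gets `null` on both sides.

```python
from itertools import groupby

# (Country, rn, Valuation) rows copied from the output above
rows = [
    ("Argentina", 1, 2.45),
    ("Australia", 1, 40.0),
    ("Australia", 2, 5.5),
    ("Australia", 3, 1.6),
]


def lag_lead(rows):
    """Append (lag, lead) of the valuation to each row, partitioned by
    country and ordered by rn, with None playing the role of null."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))  # partition, then order
    for _, part in groupby(ordered, key=lambda r: r[0]):
        part = list(part)
        for i, r in enumerate(part):
            lag = part[i - 1][2] if i > 0 else None
            lead = part[i + 1][2] if i + 1 < len(part) else None
            out.append(r + (lag, lead))
    return out


result = lag_lead(rows)
```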

## Pivot table

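The [.pivot()](https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/) method turns rows into columns: called on grouped data, it creates one column per distinct value of the pivot column and fills each cell with the result of the aggregation.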

In [None]:
df_top3_by_country.groupby('Country').pivot('rn').agg(F.first('Company')).show(truncate=False)

+--------------+-----------+--------------------+--------------------------+
|Country       |1          |2                   |3                         |
+--------------+-----------+--------------------+--------------------------+
|Argentina     |Uala       |null                |null                      |
|Australia     |Canva      |Airwallex           |SafetyCulture             |
|Austria       |BitPanda   |GoStudent           |null                      |
|Belgium       |Collibra   |Odoo                |null                      |
|Bermuda       |Afiniti    |null                |null                      |
|Brazil        |Nubank     |QuintoAndar         |C6 Bank                   |
|Canada        |Dapper Labs|SSENSE              |PointClickCare            |
|Chile         |NotCo      |null                |null                      |
|China         |Bytedance  |Xiaohongshu         |Yuanfudao                 |
|Colombia      |Rappi      |LifeMiles           |null                      |
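
The pivot above can be sketched in pure Python to show what happens to each cell (hypothetical helper `pivot_first`, rows copied from the top-3 output): one output row per country, one column per distinct `rn`, and `None` (null) wherever a country has fewer than 3 companies.

```python
from collections import defaultdict

# (Country, rn, Company) rows copied from the top-3 output above
rows = [
    ("Argentina", 1, "Uala"),
    ("Australia", 1, "Canva"),
    ("Australia", 2, "Airwallex"),
    ("Australia", 3, "SafetyCulture"),
    ("Austria", 1, "BitPanda"),
    ("Austria", 2, "GoStudent"),
]


def pivot_first(rows):
    """Mimic groupby(country).pivot(rn).agg(first(company)): one output row
    per country, one column per distinct rn, None where no value exists."""
    columns = sorted({rn for _, rn, _ in rows})  # distinct pivot values
    cells = defaultdict(dict)
    for country, rn, company in rows:
        cells[country].setdefault(rn, company)   # keep the first value seen
    table = {c: [cells[c].get(rn) for rn in columns] for c in sorted(cells)}
    return columns, table


columns, table = pivot_first(rows)
```

In PySpark, passing the expected values explicitly, as in `.pivot('rn', [1, 2, 3])`, lets Spark skip the extra job it otherwise runs to find the distinct values of the pivot column.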

## Pandas API on Spark

In [None]:
df.pandas_api().head() 



|   | Company | Valuation | Date_Added | Country | Category | Select_Investors |
|---|---|---|---|---|---|---|
| 0 | Maimai | 1.0 | 2017-11-15 | China | Mobile & telecommunications | Morningside Venture Capital, IDG Capital, DCM ... |
| 1 | Apus Group | 1.73 | 2015-01-16 | China | Mobile & telecommunications | Redpoint Ventures, QiMing Venture Partners, Ch... |
| 2 | Pantheon Systems | 1.0 | 2021-07-13 | United States | Internet software & services | Foundry Group, Scale Venture Partners, SoftBan... |
| 3 | Groww | 3.0 | 2021-04-07 | India | Fintech | Tiger Global Management, Sequoia Capital India... |
| 4 | Axtria | 1.0 | 2021-05-13 | United States | Data management & analytics | Helion Venture Partners, Bain Capital Tech Opp... |


In [None]:
df.pandas_api().groupby(['Country','Category'])[['Valuation']].sum().head(10)

| Country | Category | Valuation |
|---|---|---|
| United States | Travel | 9.55 |
| India | Supply chain, logistics, & delivery | 14.68 |
| Japan | Artificial Intelligence | 2.0 |
| United States | E-commerce & direct-to-consumer | 83.67 |
| United Arab Emirates | Supply chain, logistics, & delivery | 1.0 |
| Germany | Health | 3.52 |
| Norway | E-commerce & direct-to-consumer | 2.2 |
| Israel | Fintech | 8.0 |
| Malaysia | E-commerce & direct-to-consumer | 1.3 |
| Israel | Auto & transportation | 1.5 |


### Plot using pandas API

In [None]:
df.groupby('Category').agg(F.sum('Valuation').alias('Valuation')).pandas_api().plot.bar(x='Category', y='Valuation')

## -> Example Break point

Compute the total valuation per year and category, then draw a line chart with the year on the x-axis and the valuation on the y-axis, one line per category.


In [None]:
(
  df
 .withColumn('Year', F.year('Date_Added'))
 .groupby('Year','Category')
 .sum('Valuation')
 .orderBy('Year','Category')
 .pandas_api()
 .plot.line(x='Year', y='sum(Valuation)', color='Category')
)
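
The aggregation behind that chart can be sketched in plain Python, assuming ISO-formatted `Date_Added` strings and using a handful of rows shown earlier in this notebook: sum per `(year, category)`, then regroup the result as one `{year: total}` series per category, which is what becomes one line per category in the plot. The helper name `yearly_series` is hypothetical.

```python
from collections import defaultdict
from datetime import date

# (Date_Added, Category, Valuation) rows taken from outputs in this notebook
rows = [
    ("2017-11-15", "Mobile & telecommunications", 1.0),
    ("2015-01-16", "Mobile & telecommunications", 1.73),
    ("2021-04-07", "Fintech", 3.0),
    ("2021-08-23", "Fintech", 2.0),
    ("2021-07-13", "Internet software & services", 1.0),
]


def yearly_series(rows):
    """Sum valuation per (year, category), then regroup it as one
    {year: total} series per category, ordered by year."""
    totals = defaultdict(float)
    for added, category, valuation in rows:
        totals[(date.fromisoformat(added).year, category)] += valuation
    series = defaultdict(dict)
    for (year, category), total in sorted(totals.items()):  # like orderBy('Year', 'Category')
        series[category][year] = total
    return dict(series)


series = yearly_series(rows)
```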

## Further functionalities

- [.coalesce()](https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-coalesce/): `F.coalesce()` returns the first non-null value among the given columns (the linked page covers `DataFrame.coalesce()`, an unrelated method that reduces the number of partitions)
- [.isNull()/.isNotNull()](https://sparkbyexamples.com/pyspark/pyspark-isnull/): tests whether a value is null / not null
- [.concat()/.concat_ws()](https://sparkbyexamples.com/pyspark/pyspark-concatenate-columns/): concatenates values; `concat_ws` (_with separator_) skips nulls, while `concat` returns null if any input is null
- [.split()](https://sparkbyexamples.com/pyspark/pyspark-split-dataframe-column-into-multiple-columns/): splits a string into an array around a delimiter (interpreted as a regular expression)
- [.getItem()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.getItem.html): returns element _i_ of an array, or the value for key _i_ of a map
- [.explode()](https://sparkbyexamples.com/pyspark/pyspark-explode-array-and-map-columns-to-rows/): turns an array (or map) into multiple rows, one per element
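
The null handling of these functions is the part most easily gotten wrong, so here is a pure-Python sketch of their semantics, with `None` standing in for null. The helper names are hypothetical stand-ins for the Spark functions, and the sample row is the Nigeria / Opay row from the output of the cell that follows.

```python
import re


def coalesce(*values):
    """F.coalesce: the first argument that is not null (None here)."""
    return next((v for v in values if v is not None), None)


def concat(*values):
    """F.concat: null as soon as any input is null."""
    return None if any(v is None for v in values) else "".join(values)


def concat_ws(sep, *values):
    """F.concat_ws: null inputs are skipped, the rest joined with sep."""
    return sep.join(v for v in values if v is not None)


def split(value, pattern):
    """F.split: the separator is treated as a regular expression."""
    return re.split(pattern, value)


# One joined row (Nigeria / Opay)
row = {"Country": "Nigeria", "Company": "Opay",
       "Region_1": "Western Africa", "Region_2": "Sub-Saharan Africa"}

region_test = coalesce(row["Region_2"], row["Region_1"], "NA")
country_company = concat_ws("_", row["Country"], row["Company"])
parts = split(country_company, "_")
country_recovery = parts[0]                           # like .getItem(0)
exploded = [{**row, "exploded": p} for p in parts]    # like F.explode: one row per element
```

Note that `F.explode` drops rows whose array is null or empty; `F.explode_outer` keeps them with a null value.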

In [None]:
(
    df.join(df_country, on='Country')
    .withColumn('region_test', F.coalesce('Region_2', 'Region_1', F.lit('NA')))  # first non-null value
    .withColumn('country_company', F.concat_ws('_', 'Country', 'Company'))       # e.g. "Nigeria_Opay"
    .withColumn('country_company_split', F.split('country_company', '_'))        # array of strings
    .filter(F.col('Region_2').isNotNull())
    .withColumn('country_recovery', F.col('country_company_split').getItem(0))   # first array element
    .withColumn('exploded', F.explode('country_company_split'))                  # one row per element
    .show()
)

+------------+-------------------+---------+----------+--------------------+--------------------+---------------+--------+---------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+-------------------+
|     Country|            Company|Valuation|Date_Added|            Category|    Select_Investors|ISO_Alpha3_Code|M49_Code|       Region_1|            Region_2|    Continent|         region_test|     country_company|country_company_split|country_recovery|           exploded|
+------------+-------------------+---------+----------+--------------------+--------------------+---------------+--------+---------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+-------------------+
|     Nigeria|               Opay|      2.0|2021-08-23|             Fintech|Sequoia Capital C...|            NGA|     566| Western Africa|  Sub-Saharan Africa|       Africa|  

## DataType-specific functions

See [this link](https://sparkbyexamples.com/) for further functions specific to each data type (Spark SQL Functions).

## Date functions

See [this link](https://sparkbyexamples.com/spark/spark-sql-date-and-time-functions/) for the full list of date/timestamp functions.

In [None]:
(
    df
    .select(
        F.date_trunc('month', 'Date_Added'),  # timestamp at midnight on the 1st of the month
        F.month('Date_Added'),                # 1-12
        F.dayofyear('Date_Added'),            # 1-366
        F.current_date(),                     # today's date, same value on every row
    )
    .show(10)
)

+-----------------------------+-----------------+---------------------+--------------+
|date_trunc(month, Date_Added)|month(Date_Added)|dayofyear(Date_Added)|current_date()|
+-----------------------------+-----------------+---------------------+--------------+
|          2021-11-01 00:00:00|               11|                  306|    2022-11-04|
|          2021-02-01 00:00:00|                2|                   38|    2022-11-04|
|          2019-12-01 00:00:00|               12|                  337|    2022-11-04|
|          2017-01-01 00:00:00|                1|                    1|    2022-11-04|
|          2019-07-01 00:00:00|                7|                  212|    2022-11-04|
|          2017-05-01 00:00:00|                5|                  131|    2022-11-04|
|          2017-08-01 00:00:00|                8|                  237|    2022-11-04|
|          2021-03-01 00:00:00|                3|                   84|    2022-11-04|
|          2021-03-01 00:00:00|            
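
As a sanity check of what these functions return, the standard-library sketch below reproduces them for a single date. The helpers are hypothetical stand-ins; 2021-11-02 is used because it is the only 2021 date with month 11 and day of year 306, matching the first output row.

```python
from datetime import date, datetime


def date_trunc_month(d):
    """Like F.date_trunc('month', col): midnight of the first day of the month."""
    return datetime(d.year, d.month, 1)


def day_of_year(d):
    """Like F.dayofyear(col): 1 for January 1st, up to 366."""
    return d.timetuple().tm_yday


d = date(2021, 11, 2)
```

If a date rather than a timestamp is wanted, `F.trunc('Date_Added', 'month')` truncates to the first day of the month and keeps the date type.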

## Example questions

How many distinct investors are there per continent and country? Sort by the number of distinct investors in descending order.

In [None]:
(
    df
    .join(df_country, on='Country')
    # split the comma-separated list; every name after the first keeps a leading space
    .withColumn('investors', F.split('Select_Investors', ','))
    .withColumn('investors', F.explode('investors'))  # one row per (company, investor)
    .groupby('Continent', 'Country')
    .agg(F.countDistinct('investors').alias('n_investors'))
    .orderBy(F.desc('n_investors'))
    .show()
)

+-------------+--------------+-----------+
|    Continent|       Country|n_investors|
+-------------+--------------+-----------+
|North America| United States|        717|
|         Asia|         China|        311|
|         Asia|         India|         91|
|       Europe|United Kingdom|         81|
|       Europe|       Germany|         57|
|         Asia|        Israel|         53|
|       Europe|        France|         46|
|North America|        Canada|         42|
|         Asia|     Singapore|         33|
|South America|        Brazil|         33|
|         Asia|   South Korea|         22|
|         Asia|         Japan|         18|
|         Asia|     Hong Kong|         18|
|         Asia|     Indonesia|         15|
|      Oceania|     Australia|         14|
|       Europe|        Sweden|         11|
|       Europe|   Netherlands|         10|
|North America|        Mexico|         10|
|       Europe|   Switzerland|          9|
|       Europe|        Norway|          9|
+----------
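
The values in `Select_Investors` are separated by a comma followed by a space (see the `pandas_api().head()` output), so splitting on `','` alone keeps a leading space on every investor except the first, and the same fund can be counted twice by `countDistinct`. The pure-Python sketch below shows the effect on hypothetical investor lists (hypothetical helper `distinct_investors`; `str.strip` stands in for `F.trim`). In PySpark, one possible fix is a `withColumn('investors', F.trim('investors'))` step after the explode, or splitting on the regex `r',\s*'`; either would change the counts shown above.

```python
def distinct_investors(rows, trim):
    """Count distinct investors per country after splitting on ',',
    optionally stripping surrounding whitespace (similar to F.trim)."""
    seen = {}
    for country, investors in rows:
        for name in investors.split(","):   # like F.split(...) followed by F.explode
            if trim:
                name = name.strip()
            seen.setdefault(country, set()).add(name)
    return {country: len(names) for country, names in seen.items()}


# Hypothetical investor lists, formatted like Select_Investors ("a, b, c")
rows = [("X", "Fund A, Fund B"), ("X", "Fund B, Fund C")]

untrimmed = distinct_investors(rows, trim=False)  # ' Fund B' and 'Fund B' count separately
trimmed = distinct_investors(rows, trim=True)
```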