# Overview

In this review, we focus on Spark SQL.

![Apache Spark Eco System](https://sparkbyexamples.com/wp-content/uploads/2020/02/spark-components-1.jpg)

For a complete reference of all Spark SQL functionality, see the [official documentation](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#core-classes).

For tutorials and hands-on examples, see the [Spark by examples](https://sparkbyexamples.com/) site.

# **SETUP**

## Spark UI

In [1]:
!pip install -q pyngrok
!pip install -q pyspark



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.ui.port', '4050').getOrCreate()
spark

In [3]:
# Log in at https://dashboard.ngrok.com/get-started/setup to get your own authtoken
ngrok_token = 'YOUR_NGROK_AUTHTOKEN'  # replace with your own token; avoid sharing real tokens

In [4]:
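# Register the token, then open a public tunnel to the Spark UI port (4050) in the background
get_ipython().system_raw(f'ngrok authtoken {ngrok_token}')
get_ipython().system_raw('ngrok http 4050 &')
!sleep 3
print('Spark UI URL:')
# ngrok's local API listens on port 4040 (also the Spark UI default, hence port 4050 above);
# ask it for the tunnel's public HTTPS URL
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url":"(?=https)\K[^"]*'

Spark UI URL: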
https://43b8-34-74-39-250.ngrok.io


## Libraries

In [5]:
import pandas as pd
from google.colab import files

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

## Load data

In [6]:
# Load unicorn_companies.csv
_ = files.upload()

Saving unicorn_companies.csv to unicorn_companies.csv


In [7]:
# Load country.csv
_ = files.upload()

Saving country.csv to country.csv


In [8]:
# Show the tables with pandas
print('unicorn_companies')
display(pd.read_csv('unicorn_companies.csv'))
print('\ncountry')
display(pd.read_csv('country.csv'))

unicorn_companies


|     | Company | Valuation | Date_Added | Country | Category | Select_Investors |
|-----|---------|-----------|------------|---------|----------|------------------|
| 0 | Bytedance | 140.0 | 2017-04-07 | China | Artificial intelligence | Sequoia Capital China, SIG Asia Investments, S... |
| 1 | SpaceX | 100.3 | 2012-12-01 | United States | Other | Founders Fund, Draper Fisher Jurvetson, Rothen... |
| 2 | Stripe | 95.0 | 2014-01-23 | United States | Fintech | Khosla Ventures, LowercaseCapital, capitalG |
| 3 | Klarna | 45.6 | 2011-12-12 | Sweden | Fintech | Institutional Venture Partners, Sequoia Capita... |
| 4 | Canva | 40.0 | 2018-01-08 | Australia | Internet software & services | Sequoia Capital China, Blackbird Ventures, Mat... |
| ... | ... | ... | ... | ... | ... | ... |
| 912 | Heyday | 1.0 | 2021-11-16 | United States | E-commerce & direct-to-consumer | Khosla Ventures,General Catalyst, Victory Park... |
| 913 | PLACE | 1.0 | 2021-11-17 | United States | Internet software & services | Goldman Sachs Asset Management, 3L |
| 914 | Stytch | 1.0 | 2021-11-18 | United States | Cybersecurity | Index Ventures, Benchmark, Thrive Capital |
| 915 | Owkin | 1.0 | 2021-11-18 | United States | Artificial Intelligence | Google Ventures, Cathay Innovation, NJF Capital |



country


|     | Country | ISO_Alpha3_Code | M49_Code | Region_1 | Region_2 | Continent |
|-----|---------|-----------------|----------|----------|----------|-----------|
| 0 | Afghanistan | AFG | 4 | Southern Asia | | Asia |
| 1 | Åland Islands | ALA | 248 | Northern Europe | | Europe |
| 2 | Albania | ALB | 8 | Southern Europe | | Europe |
| 3 | Algeria | DZA | 12 | Northern Africa | | Africa |
| 4 | American Samoa | ASM | 16 | Polynesia | | Oceania |
| ... | ... | ... | ... | ... | ... | ... |
| 244 | Wallis and Futuna Islands | WLF | 876 | Polynesia | | Oceania |
| 245 | Western Sahara | ESH | 732 | Northern Africa | | Africa |
| 246 | Yemen | YEM | 887 | Western Asia | | Asia |
| 247 | Zambia | ZMB | 894 | Eastern Africa | Sub-Saharan Africa | Africa |


In [9]:
# Load the data with Spark
df = spark.read.csv('unicorn_companies.csv', header=True, inferSchema=True)
df_country = spark.read.csv('country.csv', header=True, inferSchema=True)

# Playground

Partitions:
- Show number of partitions
- Repartition
- Coalesce

In [10]:
# Show the number of partitions (the dataset is small, so Spark created only one)
df.rdd.getNumPartitions()

1

In [11]:
# Repartition into two partitions
df = df.repartition(2)

In [12]:
df.rdd.getNumPartitions()

2
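The list above also mentions coalesce, which this notebook does not run. As a rough mental model (a simplified plain-Python sketch, not Spark's implementation; the helpers `repartition` and `coalesce` are hypothetical), `df.repartition(n)` shuffles every row into `n` new partitions, while `df.coalesce(n)` only merges existing partitions without a shuffle and never increases their number:

```python
from itertools import chain


def repartition(partitions, n):
    """Simplified model of DataFrame.repartition(n): a full shuffle that deals
    rows round-robin into n new partitions (n may be larger or smaller)."""
    rows = list(chain.from_iterable(partitions))
    out = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out


def coalesce(partitions, n):
    """Simplified model of DataFrame.coalesce(n): neighbouring partitions are
    merged without moving rows between them; asking for more partitions than
    exist leaves the count unchanged."""
    if n >= len(partitions):
        return [list(p) for p in partitions]
    group_size = len(partitions) / n
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[min(int(i / group_size), n - 1)].extend(part)
    return out


print(repartition([[1, 2, 3, 4, 5]], 2))        # [[1, 3, 5], [2, 4]]
print(coalesce([[1, 2], [3], [4, 5], [6]], 2))  # [[1, 2, 3], [4, 5, 6]]
print(len(coalesce([[1], [2]], 5)))             # 2
```

Because `coalesce` avoids a shuffle, it is usually the cheaper way to reduce the number of partitions, whereas `repartition` is needed to increase it or to rebalance uneven partitions.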

## Verify & cast data types

Show data schema

In [13]:
df

DataFrame[Company: string, Valuation: double, Date_Added: timestamp, Country: string, Category: string, Select_Investors: string]

In [14]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: timestamp (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



Changing data schema with `cast`

In [15]:
df = df.withColumn('Date_Added', F.col('Date_Added').cast('date'))

In [16]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



Specifying data schema with `StructType`

In [17]:
schema = StructType([
    StructField('Company', StringType()),
    StructField('Valuation', DoubleType()),
    StructField('Date_Added', DateType()),
    StructField('Country', StringType()),
    StructField('Category', StringType()),
    StructField('Select_Investors', StringType())
])

df = spark.read.csv('unicorn_companies.csv', header=True, schema=schema)
df = df.repartition(2)
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Valuation: double (nullable = true)
 |-- Date_Added: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Select_Investors: string (nullable = true)



## Describe function

In [18]:
df.describe().show()

+-------+----------+-----------------+---------+--------------------+--------------------+
|summary|   Company|        Valuation|  Country|            Category|    Select_Investors|
+-------+----------+-----------------+---------+--------------------+--------------------+
|  count|       917|              917|      917|                 917|                 916|
|   mean|      null|3.311690294438385|     null|                null|                null|
| stddev|      null|7.542281464806968|     null|                null|                null|
|    min|1047 Games|              1.0|Argentina|Artificial Intell...|01 Advisors, Zeev...|
|    max|     wefox|            140.0|  Vietnam|              Travel|next47, MaC Ventu...|
+-------+----------+-----------------+---------+--------------------+--------------------+
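`describe()` reports, per column, the number of non-null values, the mean, the sample standard deviation, and the minimum and maximum. For string columns `mean` and `stddev` are null and `min`/`max` follow string ordering, which is why `1047 Games` and `wefox` appear above (digits sort before uppercase letters, which sort before lowercase ones). A plain-Python approximation of a single column (hypothetical helper `describe_column`, `None` standing for null):

```python
import statistics


def describe_column(values):
    """Approximate one column of DataFrame.describe(); None stands for null."""
    present = [v for v in values if v is not None]  # nulls are not counted
    summary = {"count": len(present), "mean": None, "stddev": None, "min": None, "max": None}
    if not present:
        return summary
    summary["min"], summary["max"] = min(present), max(present)
    if all(isinstance(v, (int, float)) for v in present):
        summary["mean"] = statistics.mean(present)
        if len(present) > 1:
            summary["stddev"] = statistics.stdev(present)  # sample std dev (n - 1)
    return summary


print(describe_column([1.0, 2.0, None, 3.0]))
print(describe_column(["wefox", "1047 Games", "Canva"]))
```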



## Select & Filters

There are multiple ways to select columns:

In [19]:
df.select('Company', F.col('Company'), df.Company, df['Company'], df[0]).show(5)

+---------------+---------------+---------------+---------------+---------------+
|        Company|        Company|        Company|        Company|        Company|
+---------------+---------------+---------------+---------------+---------------+
|      GoStudent|      GoStudent|      GoStudent|      GoStudent|      GoStudent|
|SouChe Holdings|SouChe Holdings|SouChe Holdings|SouChe Holdings|SouChe Holdings|
|  Kuaigou Dache|  Kuaigou Dache|  Kuaigou Dache|  Kuaigou Dache|  Kuaigou Dache|
|       Rightway|       Rightway|       Rightway|       Rightway|       Rightway|
|       CarDekho|       CarDekho|       CarDekho|       CarDekho|       CarDekho|
+---------------+---------------+---------------+---------------+---------------+
only showing top 5 rows



Filtering with `filter` and `where` (`where` is an alias of `filter`), using either PySpark column expressions or a SQL expression string

In [20]:
df.filter(F.col('Country')=='Brazil').show(5)

+----------------+---------+----------+-------+--------------------+--------------------+
|         Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+----------------+---------+----------+-------+--------------------+--------------------+
|           Unico|      1.0|2021-08-03| Brazil|Artificial intell...|Big Bets, General...|
|       Nuvemshop|      3.1|2021-08-17| Brazil|E-commerce & dire...|Kaszek Ventures, ...|
|         C6 Bank|     5.05|2020-12-02| Brazil|             Fintech|       Credit Suisse|
|           iFood|      1.0|2018-11-13| Brazil|Supply chain, log...|Movile, Just Eat,...|
|Wildlife Studios|      3.0|2019-12-05| Brazil|               Other|Benchmark, Bessem...|
+----------------+---------+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [21]:
df.where(F.col('Country')=='Brazil').show(5)

+----------------+---------+----------+-------+--------------------+--------------------+
|         Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+----------------+---------+----------+-------+--------------------+--------------------+
|           Unico|      1.0|2021-08-03| Brazil|Artificial intell...|Big Bets, General...|
|       Nuvemshop|      3.1|2021-08-17| Brazil|E-commerce & dire...|Kaszek Ventures, ...|
|         C6 Bank|     5.05|2020-12-02| Brazil|             Fintech|       Credit Suisse|
|           iFood|      1.0|2018-11-13| Brazil|Supply chain, log...|Movile, Just Eat,...|
|Wildlife Studios|      3.0|2019-12-05| Brazil|               Other|Benchmark, Bessem...|
+----------------+---------+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [22]:
# sql-like syntax
df.filter('Country="Brazil" AND Valuation > 5').show()

+-----------+---------+----------+-------+--------------------+--------------------+
|    Company|Valuation|Date_Added|Country|            Category|    Select_Investors|
+-----------+---------+----------+-------+--------------------+--------------------+
|    C6 Bank|     5.05|2020-12-02| Brazil|             Fintech|       Credit Suisse|
|QuintoAndar|      5.1|2019-09-09| Brazil|E-commerce & dire...|Kaszek Ventures, ...|
|     Nubank|     30.0|2018-03-01| Brazil|             Fintech|Sequoia Capital, ...|
+-----------+---------+----------+-------+--------------------+--------------------+
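The SQL-string filter above can also be written with column expressions, e.g. `df.filter((F.col('Country') == 'Brazil') & (F.col('Valuation') > 5))`. The parentheses are mandatory because Python's `&` binds more tightly than `==` and `>`; without them Python builds a chained comparison, and PySpark typically raises a `ValueError` about converting a Column to bool. The standard-library `ast` module shows how Python parses both forms (the names `col` and `val` are placeholders):

```python
import ast


def top_node(source):
    """Return the outermost AST node Python builds for an expression."""
    return ast.parse(source, mode="eval").body


# Without parentheses, '&' is applied first: col == ('Brazil' & val) > 5,
# which is a chained comparison, not an AND of two conditions.
unparenthesized = top_node("col == 'Brazil' & val > 5")

# With parentheses, the two comparisons are combined by '&' as intended.
parenthesized = top_node("(col == 'Brazil') & (val > 5)")

print(type(unparenthesized).__name__)  # Compare
print(type(parenthesized).__name__)    # BinOp
```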



## Group By / Order By

Using PySpark syntax

In [23]:
(
  df
 .groupby('Category')
 .agg(
    F.sum('Valuation').alias('valuation_sum'),
    F.mean('Valuation').alias('valuation_mean')
 )
 .orderBy(F.desc('valuation_mean'))
 .show(truncate=False)
)

+-----------------------------------+------------------+------------------+
|Category                           |valuation_sum     |valuation_mean    |
+-----------------------------------+------------------+------------------+
|Finttech                           |10.0              |10.0              |
|Other                              |245.46            |4.812941176470589 |
|Artificial intelligence            |308.14            |4.465797101449275 |
|Fintech                            |726.39            |3.9477717391304346|
|Data management & analytics        |120.7             |3.448571428571429 |
|Auto & transportation              |101.35000000000001|3.3783333333333334|
|Edtech                             |85.37             |3.161851851851852 |
|Hardware                           |98.59             |3.0809375         |
|Internet software & services       |479.7099999999999 |2.9250609756097554|
|E-commerce & direct-to-consumer    |283.11999999999995|2.918762886597938 |
|Consumer & 

Using SQL syntax

In [24]:
df.createOrReplaceTempView('unicorn')

In [25]:
query = """
SELECT
  Category,
  SUM(Valuation) AS valuation_sum,
  AVG(Valuation) AS valuation_mean
FROM
  unicorn
GROUP BY
  Category
ORDER BY
  valuation_mean DESC
"""

spark.sql(query).show(truncate=False)

+-----------------------------------+------------------+------------------+
|Category                           |valuation_sum     |valuation_mean    |
+-----------------------------------+------------------+------------------+
|Finttech                           |10.0              |10.0              |
|Other                              |245.46            |4.812941176470589 |
|Artificial intelligence            |308.14            |4.465797101449275 |
|Fintech                            |726.39            |3.9477717391304346|
|Data management & analytics        |120.7             |3.448571428571429 |
|Auto & transportation              |101.35000000000001|3.3783333333333334|
|Edtech                             |85.37             |3.161851851851852 |
|Hardware                           |98.59             |3.0809375         |
|Internet software & services       |479.7099999999999 |2.9250609756097554|
|E-commerce & direct-to-consumer    |283.11999999999995|2.918762886597938 |
|Consumer & 
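Both versions compute the same result: one row per distinct `Category`, with the sum and mean of `Valuation` over that group, sorted by the mean in descending order. A plain-Python equivalent over a few made-up rows (the helper `group_sum_mean` is hypothetical and assumes the value column has no nulls, which matches the `describe` count of 917 for `Valuation`):

```python
from collections import defaultdict


def group_sum_mean(rows, key, value):
    """Plain-Python analogue of
    df.groupby(key).agg(F.sum(value), F.mean(value)).orderBy(F.desc(mean)).
    Assumes `value` is never None."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    result = [
        {key: k, "valuation_sum": sum(vals), "valuation_mean": sum(vals) / len(vals)}
        for k, vals in groups.items()
    ]
    return sorted(result, key=lambda r: r["valuation_mean"], reverse=True)


rows = [
    {"Category": "Fintech", "Valuation": 5.0},
    {"Category": "Fintech", "Valuation": 1.0},
    {"Category": "Edtech", "Valuation": 2.0},
]
result = group_sum_mean(rows, "Category", "Valuation")
for r in result:
    print(r)
```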

## Distinct values

Count distinct values 

In [26]:
df.select(F.countDistinct('Category')).show()

+------------------------+
|count(DISTINCT Category)|
+------------------------+
|                      17|
+------------------------+



Show distinct values

In [27]:
df.select('Category').distinct().show(truncate=False)

+-----------------------------------+
|Category                           |
+-----------------------------------+
|Cybersecurity                      |
|E-commerce & direct-to-consumer    |
|Artificial intelligence            |
|Travel                             |
|Health                             |
|Fintech                            |
|Edtech                             |
|Data management & analytics        |
|Auto & transportation              |
|Other                              |
|Supply chain, logistics, & delivery|
|Mobile & telecommunications        |
|Finttech                           |
|Artificial Intelligence            |
|Internet software & services       |
|Hardware                           |
|Consumer & retail                  |
+-----------------------------------+



Count and show distinct values as an array

In [28]:
df.select(F.countDistinct('Category'), F.collect_set('Category')).show()

+------------------------+---------------------+
|count(DISTINCT Category)|collect_set(Category)|
+------------------------+---------------------+
|                      17| [Cybersecurity, H...|
+------------------------+---------------------+



Drop duplicates (one row is kept per `Category`; which row survives is not guaranteed)

In [29]:
df.drop_duplicates(subset=['Category']).show()

+--------------------+---------+----------+--------------+--------------------+--------------------+
|             Company|Valuation|Date_Added|       Country|            Category|    Select_Investors|
+--------------------+---------+----------+--------------+--------------------+--------------------+
|               Owkin|      1.0|2021-11-18| United States|Artificial Intell...|Google Ventures, ...|
|               Faire|     12.4|2019-10-30| United States|Artificial intell...|Khosla Ventures, ...|
|             ENOVATE|     1.85|2019-04-15|         China|Auto & transporta...|Automobile Indust...|
|             BrewDog|     1.24|2017-04-10|United Kingdom|   Consumer & retail|TSG Consumer Part...|
|              Stytch|      1.0|2021-11-18| United States|       Cybersecurity|Index Ventures, B...|
|            Cohesity|      3.7|2018-06-11| United States|Data management &...|SoftBank Group, S...|
|             Mia.com|      1.0|2015-09-08|         China|E-commerce & dire...|Sequoia Capi
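`drop_duplicates(subset=[...])` (an alias of `dropDuplicates`) keeps a single row for each distinct value of the subset columns and discards the rest. A plain-Python sketch of that rule, with the hypothetical helper `drop_duplicates` and made-up rows (`Another` is not in the dataset):

```python
def drop_duplicates(rows, subset):
    """Keep one row per distinct combination of the `subset` columns.

    This sketch keeps the first row it sees; Spark's dropDuplicates also keeps
    exactly one row per key, but which one depends on partitioning.
    """
    kept = {}
    for row in rows:
        key = tuple(row[c] for c in subset)
        kept.setdefault(key, row)  # later rows with the same key are discarded
    return list(kept.values())


rows = [
    {"Company": "Stytch", "Category": "Cybersecurity"},
    {"Company": "Enflame", "Category": "Hardware"},
    {"Company": "Another", "Category": "Cybersecurity"},
]
result = drop_duplicates(rows, ["Category"])
print(result)
```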

## Join

In [30]:
df.join(df_country, on='Country').show()

+-------------+--------------+---------+----------+--------------------+--------------------+---------------+--------+--------------------+--------+-------------+
|      Country|       Company|Valuation|Date_Added|            Category|    Select_Investors|ISO_Alpha3_Code|M49_Code|            Region_1|Region_2|    Continent|
+-------------+--------------+---------+----------+--------------------+--------------------+---------------+--------+--------------------+--------+-------------+
|        China|       Enflame|     1.24|2021-01-05|            Hardware|Tencent Holdings,...|            CHN|     156|        Eastern Asia|    null|         Asia|
|United States|Human Interest|      1.0|2021-08-04|             Fintech|Wing Venture Capi...|            USA|     840|    Northern America|    null|North America|
|United States|     Attentive|     5.99|2020-09-23|Mobile & telecomm...|NextView Ventures...|            USA|     840|    Northern America|    null|North America|
|United States|       
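`df.join(df_country, on='Country')` performs an inner join by default, so any unicorn whose `Country` string has no exact match in `country.csv` is silently dropped; `how='left'` would keep it, with nulls in the country columns. A plain-Python sketch of both behaviours (the helper `join` and the `Atlantis` row are hypothetical):

```python
def join(left, right, on, how="inner"):
    """Plain-Python sketch of an equi-join on a single key column."""
    right_by_key = {}
    for r in right:
        right_by_key.setdefault(r[on], []).append(r)
    right_cols = {c for r in right for c in r if c != on}

    out = []
    for l in left:
        matches = right_by_key.get(l[on], [])
        for r in matches:
            out.append({**l, **r})  # the key column appears once, as with on='Country'
        if not matches and how == "left":
            out.append({**l, **{c: None for c in right_cols}})  # unmatched: nulls
    return out


companies = [
    {"Country": "Brazil", "Company": "Nubank"},
    {"Country": "Atlantis", "Company": "Hypothetical Co"},
]
countries = [{"Country": "Brazil", "Continent": "South America"}]

inner = join(companies, countries, "Country")
left = join(companies, countries, "Country", how="left")
print(inner)
print(left)
```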

## -> Example Break point

**Example using everything shown above**

Considering only companies valued above 1 billion dollars, how many companies are there per continent? How many distinct countries in each continent do these companies come from?

In [31]:
(
  df
 .where('Valuation>1')
 .join(df_country, on='Country')
 .groupby('Continent')
 .agg(
     F.countDistinct('Country').alias('n_country'),
     F.count('*').alias('total_observations')
 )
 .orderBy(F.desc('n_country'))
 .show()
)

+-------------+---------+------------------+
|    Continent|n_country|total_observations|
+-------------+---------+------------------+
|       Europe|       17|                83|
|         Asia|       14|               205|
|North America|        4|               394|
|South America|        4|                12|
|       Africa|        3|                 3|
|      Oceania|        1|                 4|
+-------------+---------+------------------+



## Add, update and rename columns

Add columns

In [32]:
df.withColumn('Year', F.year('Date_Added')).show(5)

+--------------+---------+----------+-------------+--------------------+--------------------+----+
|       Company|Valuation|Date_Added|      Country|            Category|    Select_Investors|Year|
+--------------+---------+----------+-------------+--------------------+--------------------+----+
|       Enflame|     1.24|2021-01-05|        China|            Hardware|Tencent Holdings,...|2021|
|Human Interest|      1.0|2021-08-04|United States|             Fintech|Wing Venture Capi...|2021|
|     Attentive|     5.99|2020-09-23|United States|Mobile & telecomm...|NextView Ventures...|2020|
|        Stytch|      1.0|2021-11-18|United States|       Cybersecurity|Index Ventures, B...|2021|
|  Huisuanzhang|     1.28|2021-05-24|        China|             Fintech|IDG Capital, Gaoc...|2021|
+--------------+---------+----------+-------------+--------------------+--------------------+----+
only showing top 5 rows



Add multiple columns by chaining `withColumn` calls; `collect` brings the overall mean valuation (`valuation_mean`) back to the driver, and `F.lit` turns it into a constant column

In [33]:
valuation_mean = df.select(F.mean('Valuation')).collect()[0][0]

(
  df
 .withColumn('Year', F.year('Date_Added'))
 .withColumn('Valuation_mean', F.lit(valuation_mean))
 .show(5)
)

+--------------+---------+----------+-------------+--------------------+--------------------+----+------------------+
|       Company|Valuation|Date_Added|      Country|            Category|    Select_Investors|Year|    Valuation_mean|
+--------------+---------+----------+-------------+--------------------+--------------------+----+------------------+
|       Enflame|     1.24|2021-01-05|        China|            Hardware|Tencent Holdings,...|2021|3.3116902944383892|
|Human Interest|      1.0|2021-08-04|United States|             Fintech|Wing Venture Capi...|2021|3.3116902944383892|
|     Attentive|     5.99|2020-09-23|United States|Mobile & telecomm...|NextView Ventures...|2020|3.3116902944383892|
|        Stytch|      1.0|2021-11-18|United States|       Cybersecurity|Index Ventures, B...|2021|3.3116902944383892|
|  Huisuanzhang|     1.28|2021-05-24|        China|             Fintech|IDG Capital, Gaoc...|2021|3.3116902944383892|
+--------------+---------+----------+-------------+-----

Rename column

In [34]:
df.withColumnRenamed('Date_Added', 'Date').show(5)

+--------------+---------+----------+-------------+--------------------+--------------------+
|       Company|Valuation|      Date|      Country|            Category|    Select_Investors|
+--------------+---------+----------+-------------+--------------------+--------------------+
|       Enflame|     1.24|2021-01-05|        China|            Hardware|Tencent Holdings,...|
|Human Interest|      1.0|2021-08-04|United States|             Fintech|Wing Venture Capi...|
|     Attentive|     5.99|2020-09-23|United States|Mobile & telecomm...|NextView Ventures...|
|        Stytch|      1.0|2021-11-18|United States|       Cybersecurity|Index Ventures, B...|
|  Huisuanzhang|     1.28|2021-05-24|        China|             Fintech|IDG Capital, Gaoc...|
+--------------+---------+----------+-------------+--------------------+--------------------+
only showing top 5 rows



## Filtering/detecting patterns

`lower`/`upper` + `contains`

In [35]:
(
  df
 .filter(F.lower('Category').contains('art'))
 .select('Category')
 .distinct()
 .show(truncate=False)
)

+-----------------------+
|Category               |
+-----------------------+
|Artificial intelligence|
|Artificial Intelligence|
+-----------------------+
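`contains` matches the substring anywhere, so a broad pattern such as `'art'` would also hit words like "smart" if such a category existed (`Smart home` below is hypothetical). A regular expression anchored at a word boundary is stricter; in PySpark, something like `F.lower('Category').rlike(r'\bart')` should behave similarly. A plain-Python comparison:

```python
import re

categories = ["Artificial intelligence", "Artificial Intelligence", "Smart home", "Fintech"]

# Like F.lower('Category').contains('art'): any substring match counts
substring_hits = [c for c in categories if "art" in c.lower()]

# Anchoring the pattern at a word boundary avoids matches inside other words
word_hits = [c for c in categories if re.search(r"\bart", c.lower())]

print(substring_hits)
print(word_hits)
```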



Replace values using either `withColumn`/`F.regexp_replace` or `replace`

In [36]:
df = df.withColumn('Category', F.regexp_replace('Category', 'intelligence', 'Intelligence'))
# df.replace({'Artificial intelligence': 'Artificial Intelligence'}, subset=['Category'])

In [37]:
df.select('Category').distinct().show(truncate=False)

+-----------------------------------+
|Category                           |
+-----------------------------------+
|Cybersecurity                      |
|E-commerce & direct-to-consumer    |
|Travel                             |
|Health                             |
|Fintech                            |
|Edtech                             |
|Data management & analytics        |
|Auto & transportation              |
|Other                              |
|Supply chain, logistics, & delivery|
|Mobile & telecommunications        |
|Finttech                           |
|Artificial Intelligence            |
|Internet software & services       |
|Hardware                           |
|Consumer & retail                  |
+-----------------------------------+
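The list above still contains `Finttech`, which appears to be a misspelling of `Fintech` in the source data. It also shows where the two replacement styles differ: `F.regexp_replace` rewrites every substring that matches a regular expression, while `DataFrame.replace` only swaps cell values that equal a dictionary key exactly. A plain-Python illustration of both rules:

```python
import re

values = ["Artificial intelligence", "Finttech", "Fintech"]

# Like F.regexp_replace('Category', 'intelligence', 'Intelligence'): substring-level, regex-based
regex_fixed = [re.sub("intelligence", "Intelligence", v) for v in values]

# Like df.replace({'Finttech': 'Fintech'}, subset=['Category']): whole-value lookup
mapping = {"Finttech": "Fintech"}
dict_fixed = [mapping.get(v, v) for v in values]

print(regex_fixed)
print(dict_fixed)
```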



## Case when

In [38]:
(
  df
 .withColumn(
     'Valuation_category',
     F.when(F.col('Valuation')<10, '10')
     .when(F.col('Valuation')<20, '20')
     .when(F.col('Valuation')<30, '30')
     .when(F.col('Valuation')<40, '40')
     .when(F.col('Valuation')<50, '50')
     .otherwise('50+')
     )
 .show()
)

+--------------+---------+----------+-------------+--------------------+--------------------+------------------+
|       Company|Valuation|Date_Added|      Country|            Category|    Select_Investors|Valuation_category|
+--------------+---------+----------+-------------+--------------------+--------------------+------------------+
|       Enflame|     1.24|2021-01-05|        China|            Hardware|Tencent Holdings,...|                10|
|Human Interest|      1.0|2021-08-04|United States|             Fintech|Wing Venture Capi...|                10|
|     Attentive|     5.99|2020-09-23|United States|Mobile & telecomm...|NextView Ventures...|                10|
|        Stytch|      1.0|2021-11-18|United States|       Cybersecurity|Index Ventures, B...|                10|
|  Huisuanzhang|     1.28|2021-05-24|        China|             Fintech|IDG Capital, Gaoc...|                10|
|     Handshake|      1.5|2021-05-12|United States|Internet software...|Kleiner Perkins C...|   
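`F.when` conditions are checked in order and the first true one wins, so each label acts as an exclusive upper bound: `'10'` means below 10 billion, `'20'` means from 10 up to 20, and so on. A comparison with null is not true, so a null `Valuation` would fall through every branch to `otherwise('50+')`. The same logic in plain Python (hypothetical helper `valuation_category`):

```python
def valuation_category(valuation):
    """Mirror of the F.when(...).otherwise('50+') chain: first matching bound wins.

    None models a null Valuation, which no comparison accepts in Spark.
    """
    for upper in (10, 20, 30, 40, 50):
        if valuation is not None and valuation < upper:
            return str(upper)
    return "50+"


for v in (1.24, 10.0, 49.9, 140.0, None):
    print(v, valuation_category(v))
```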

## Window functions

![](https://sparkbyexamples.com/wp-content/uploads/2019/12/spark-sql-window-functions-768x435.jpg)

[Source](https://sparkbyexamples.com/spark/spark-sql-window-functions/): Spark by examples

In [39]:
w = Window.partitionBy('Country').orderBy(F.desc('Valuation'))

df_top3_by_country = (
  df
 .withColumn('rn', F.row_number().over(w))
 .where('rn <= 3')
 .select('Country','rn','Company','Category','Valuation')
 .orderBy('Country','rn')
)

df_top3_by_country.show(truncate=False)

+---------+---+--------------+-----------------------------------+---------+
|Country  |rn |Company       |Category                           |Valuation|
+---------+---+--------------+-----------------------------------+---------+
|Argentina|1  |Uala          |Fintech                            |2.45     |
|Australia|1  |Canva         |Internet software & services       |40.0     |
|Australia|2  |Airwallex     |Fintech                            |5.5      |
|Australia|3  |SafetyCulture |Internet software & services       |1.6      |
|Austria  |1  |BitPanda      |Fintech                            |4.11     |
|Austria  |2  |GoStudent     |Edtech                             |1.7      |
|Belgium  |1  |Collibra      |Data management & analytics        |5.25     |
|Belgium  |2  |Odoo          |Internet software & services       |2.3      |
|Bermuda  |1  |Afiniti       |Artificial Intelligence            |1.6      |
|Brazil   |1  |Nubank        |Fintech                            |30.0     |
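`row_number()` numbers the rows 1, 2, 3, ... inside each `Country` partition following the window's ordering, so filtering on `rn <= 3` keeps the three most valuable companies per country. A plain-Python sketch (hypothetical helper `top_n_per_group`, rows taken from the output above, top 2 for brevity). When two companies tie on `Valuation`, Spark does not guarantee which one gets the lower `rn`; `F.rank()` or `F.dense_rank()` give tied rows the same number.

```python
from itertools import groupby
from operator import itemgetter


def top_n_per_group(rows, partition_col, order_col, n):
    """Plain-Python analogue of row_number() over
    Window.partitionBy(partition_col).orderBy(F.desc(order_col)), keeping rn <= n."""
    result = []
    by_partition = sorted(rows, key=itemgetter(partition_col))
    for _, group in groupby(by_partition, key=itemgetter(partition_col)):
        ordered = sorted(group, key=itemgetter(order_col), reverse=True)
        for rn, row in enumerate(ordered[:n], start=1):
            result.append({**row, "rn": rn})
    return result


rows = [
    {"Country": "Australia", "Company": "SafetyCulture", "Valuation": 1.6},
    {"Country": "Argentina", "Company": "Uala", "Valuation": 2.45},
    {"Country": "Australia", "Company": "Canva", "Valuation": 40.0},
    {"Country": "Australia", "Company": "Airwallex", "Valuation": 5.5},
]
top2 = top_n_per_group(rows, "Country", "Valuation", 2)
for r in top2:
    print(r["Country"], r["rn"], r["Company"])
```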

`lag` / `lead` functions

In [40]:
w = Window.partitionBy('Country').orderBy('rn')

(
  df_top3_by_country
 .withColumn('Valuation_lag', F.lag('Valuation').over(w))
 .withColumn('Valuation_lead', F.lead('Valuation').over(w))
 .orderBy('Country','rn')
 .drop('Company','Category')
 .show()
)

+---------+---+---------+-------------+--------------+
|  Country| rn|Valuation|Valuation_lag|Valuation_lead|
+---------+---+---------+-------------+--------------+
|Argentina|  1|     2.45|         null|          null|
|Australia|  1|     40.0|         null|           5.5|
|Australia|  2|      5.5|         40.0|           1.6|
|Australia|  3|      1.6|          5.5|          null|
|  Austria|  1|     4.11|         null|           1.7|
|  Austria|  2|      1.7|         4.11|          null|
|  Belgium|  1|     5.25|         null|           2.3|
|  Belgium|  2|      2.3|         5.25|          null|
|  Bermuda|  1|      1.6|         null|          null|
|   Brazil|  1|     30.0|         null|           5.1|
|   Brazil|  2|      5.1|         30.0|          5.05|
|   Brazil|  3|     5.05|          5.1|          null|
|   Canada|  1|      7.6|         null|          4.15|
|   Canada|  2|     4.15|          7.6|           4.0|
|   Canada|  3|      4.0|         4.15|          null|
|    Chile
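Within each `Country` partition, ordered by `rn`, `lag` reads the value from the previous row and `lead` from the next one, returning null at the partition edges; a country with a single company, such as Argentina, gets null for both. The same rule in plain Python for one partition (hypothetical helper `lag_lead`, default offset of 1):

```python
def lag_lead(values):
    """Return the previous (lag) and next (lead) value for each position in one
    window partition, with None where no neighbour exists (offset 1)."""
    if not values:
        return [], []
    lag = [None] + values[:-1]
    lead = values[1:] + [None]
    return lag, lead


# Valuations of Australia's top 3, already ordered by rn as in the window above
lag, lead = lag_lead([40.0, 5.5, 1.6])
print(lag)   # [None, 40.0, 5.5]
print(lead)  # [5.5, 1.6, None]
```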

## Pivot table

In [41]:
df_top3_by_country.groupby('Country').pivot('rn').agg(F.first('Company')).show(truncate=False)

+--------------+-----------+--------------------+--------------------------+
|Country       |1          |2                   |3                         |
+--------------+-----------+--------------------+--------------------------+
|Argentina     |Uala       |null                |null                      |
|Australia     |Canva      |Airwallex           |SafetyCulture             |
|Austria       |BitPanda   |GoStudent           |null                      |
|Belgium       |Collibra   |Odoo                |null                      |
|Bermuda       |Afiniti    |null                |null                      |
|Brazil        |Nubank     |QuintoAndar         |C6 Bank                   |
|Canada        |Dapper Labs|SSENSE              |PointClickCare            |
|Chile         |NotCo      |null                |null                      |
|China         |Bytedance  |Xiaohongshu         |Yuanfudao                 |
|Colombia      |Rappi      |LifeMiles           |null                      |
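`pivot('rn')` turns each distinct `rn` value into a column, and `agg(F.first('Company'))` fills each `(Country, rn)` cell; countries with fewer than three unicorns get nulls. Because every cell here holds exactly one row, `first` is unambiguous. A plain-Python sketch (hypothetical helper `pivot`, rows from the output above):

```python
def pivot(rows, group_col, pivot_col, value_col):
    """Plain-Python analogue of
    df.groupby(group_col).pivot(pivot_col).agg(F.first(value_col)).
    Missing combinations stay None, like the nulls in the Spark output."""
    pivot_values = sorted({r[pivot_col] for r in rows})
    table = {}
    seen = set()
    for r in rows:
        cells = table.setdefault(r[group_col], dict.fromkeys(pivot_values))
        cell = (r[group_col], r[pivot_col])
        if cell not in seen:  # first(): keep the first value seen for this cell
            seen.add(cell)
            cells[r[pivot_col]] = r[value_col]
    return table


rows = [
    {"Country": "Argentina", "rn": 1, "Company": "Uala"},
    {"Country": "Australia", "rn": 1, "Company": "Canva"},
    {"Country": "Australia", "rn": 2, "Company": "Airwallex"},
    {"Country": "Australia", "rn": 3, "Company": "SafetyCulture"},
]
table = pivot(rows, "Country", "rn", "Company")
for country, cells in table.items():
    print(country, cells)
```

When the pivot values are known in advance, passing them explicitly, as in `pivot('rn', [1, 2, 3])`, spares Spark the extra pass it otherwise runs to find the distinct values.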

## Pandas API on Spark

In [42]:
df.pandas_api().head()  # pandas_api() needs Spark 3.3+; older equivalents: to_pandas_on_spark() (3.2), to_koalas() (Koalas)



|     | Company | Valuation | Date_Added | Country | Category | Select_Investors |
|-----|---------|-----------|------------|---------|----------|------------------|
| 0 | Enflame | 1.24 | 2021-01-05 | China | Hardware | Tencent Holdings, Delta Capital, Redpoint Vent... |
| 1 | Human Interest | 1.0 | 2021-08-04 | United States | Fintech | Wing Venture Capital, Slow Ventures, Uncork Ca... |
| 2 | Attentive | 5.99 | 2020-09-23 | United States | Mobile & telecommunications | NextView Ventures, Eniac Ventures, Sequoia Cap... |
| 3 | Stytch | 1.0 | 2021-11-18 | United States | Cybersecurity | Index Ventures, Benchmark, Thrive Capital |
| 4 | Huisuanzhang | 1.28 | 2021-05-24 | China | Fintech | IDG Capital, Gaocheng Capital, Chuanrong Capital |


In [43]:
df.pandas_api().groupby(['Country','Category'])[['Valuation']].sum().head(10)

| Country | Category | Valuation |
|---------|----------|-----------|
| India | Supply chain, logistics, & delivery | 14.68 |
| Colombia | Other | 1.15 |
| United Kingdom | Cybersecurity | 8.6 |
| United States | E-commerce & direct-to-consumer | 83.67 |
| South Africa | Consumer & retail | 1.59 |
| India | Edtech | 29.84 |
| Norway | E-commerce & direct-to-consumer | 2.2 |
| South Korea | Fintech | 8.4 |
| Israel | Fintech | 8.0 |
| China | Edtech | 33.83 |


### Plot using pandas API

In [44]:
df.groupby('Category').agg(F.sum('Valuation').alias('Valuation')).pandas_api().plot.bar(x='Category', y='Valuation')

## -> Example Break point

Compute the sum of valuations per year and category, then plot a line chart with the year on the x-axis and the valuation on the y-axis, one line per category.


In [45]:
(
  df
 .withColumn('Year', F.year('Date_Added'))
 .groupby('Year','Category')
 .sum('Valuation')
 .orderBy('Year','Category')
 .pandas_api()
 .plot.line(x='Year', y='sum(Valuation)', color='Category')
)

## Further functionalities

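`coalesce`, `isNull`/`isNotNull`, `concat`/`concat_ws`, `split`, `getItem` and `explode`. Note that the column function `F.coalesce` (first non-null value among columns) is unrelated to `DataFrame.coalesce` (fewer partitions).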

In [46]:
(
    df.join(df_country, on='Country')
    .withColumn('region_test', F.coalesce('Region_2', 'Region_1', F.lit('NA')))
    .withColumn('country_company', F.concat_ws('_', 'country','company'))
    .withColumn('country_company_split', F.split('country_company', '_'))
    .filter(F.col('Region_2').isNotNull())
    .withColumn('country_recovery', F.col('country_company_split').getItem(0))
    .withColumn('exploded', F.explode('country_company_split'))
    .show()
)

+---------+----------------+---------+----------+--------------------+--------------------+---------------+--------+---------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+
|  Country|         Company|Valuation|Date_Added|            Category|    Select_Investors|ISO_Alpha3_Code|M49_Code|       Region_1|            Region_2|    Continent|         region_test|     country_company|country_company_split|country_recovery|        exploded|
+---------+----------------+---------+----------+--------------------+--------------------+---------------+--------+---------------+--------------------+-------------+--------------------+--------------------+---------------------+----------------+----------------+
|   Brazil|          Movile|      1.0|2018-07-12|Mobile & telecomm...|Innova Capital - ...|            BRA|      76|  South America|Latin America and...|South America|Latin America and...|       Brazil_
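The chain above applies several column functions row by row, and the main subtlety is how each one treats nulls. A plain-Python sketch of the same steps on one illustrative row (the helpers `f_coalesce`, `concat_ws` and `explode` are hypothetical stand-ins for the Spark functions). `F.explode` drops rows whose array is null or empty; `F.explode_outer` would keep them with a null.

```python
def f_coalesce(*values):
    """Like F.coalesce: the first non-null argument, or None if all are null."""
    return next((v for v in values if v is not None), None)


def concat_ws(sep, *values):
    """Like F.concat_ws: join with `sep`, skipping nulls (F.concat would return null)."""
    return sep.join(v for v in values if v is not None)


def explode(row, array_col, out_col):
    """Like F.explode: one output row per array element; a null or empty array yields no rows."""
    return [{**row, out_col: item} for item in (row[array_col] or [])]


row = {"Country": "Sweden", "Company": "Klarna", "Region_1": "Northern Europe", "Region_2": None}
row["region_test"] = f_coalesce(row["Region_2"], row["Region_1"], "NA")
row["country_company"] = concat_ws("_", row["Country"], row["Company"])
# F.split treats its pattern as a regex; '_' is a literal character in both cases
row["country_company_split"] = row["country_company"].split("_")
row["country_recovery"] = row["country_company_split"][0]  # like getItem(0)
exploded = explode(row, "country_company_split", "exploded")

print(row["region_test"], row["country_company"], [r["exploded"] for r in exploded])
```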

## DataType-specific functions

See [Spark by examples](https://sparkbyexamples.com/) (section Spark SQL Functions) for the functions grouped by data type.

## Date functions

Please refer to [this link](https://sparkbyexamples.com/spark/spark-sql-date-and-time-functions/) to see all possible date/timestamp functions.

In [47]:
(
  df
 .select(
     F.date_trunc('month', 'Date_Added'),
     F.month('Date_Added'),
     F.dayofyear('Date_Added'),
     F.current_date()
  )
 .show(10)
)

+-----------------------------+-----------------+---------------------+--------------+
|date_trunc(month, Date_Added)|month(Date_Added)|dayofyear(Date_Added)|current_date()|
+-----------------------------+-----------------+---------------------+--------------+
|          2021-11-01 00:00:00|               11|                  306|    2022-08-11|
|          2021-11-01 00:00:00|               11|                  306|    2022-08-11|
|          2017-06-01 00:00:00|                6|                  180|    2022-08-11|
|          2018-12-01 00:00:00|               12|                  345|    2022-08-11|
|          2015-03-01 00:00:00|                3|                   90|    2022-08-11|
|          2021-02-01 00:00:00|                2|                   47|    2022-08-11|
|          2019-10-01 00:00:00|               10|                  296|    2022-08-11|
|          2020-10-01 00:00:00|               10|                  290|    2022-08-11|
|          2021-03-01 00:00:00|            
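For a single date, the three functions return the first instant of the month (as a timestamp, hence the `00:00:00`), the month number and the day of the year. A plain-Python check with the standard `datetime` module (hypothetical helper `date_parts`); a day-of-year of 306, as in the first rows above, corresponds to 2 November 2021:

```python
from datetime import date, datetime


def date_parts(d):
    """Plain-Python counterparts of date_trunc('month', ...), month(...) and dayofyear(...)."""
    return {
        "date_trunc_month": datetime(d.year, d.month, 1),  # midnight on the 1st of the month
        "month": d.month,
        "dayofyear": d.timetuple().tm_yday,
    }


print(date_parts(date(2021, 11, 2)))
```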

## Example questions

How many distinct investors are there per continent and country? Sort by the number of distinct investors in descending order.

In [48]:
(
  df
 .join(df_country, on='Country')
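 # splitting on ',' keeps the space that follows each comma, so 'Benchmark' and ' Benchmark'
 # count as different investors; applying F.trim to the exploded values would merge them
 .withColumn('investors', F.split('Select_Investors', ','))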
 .withColumn('investors', F.explode('investors'))
 .groupby('Continent','Country')
 .agg(F.countDistinct('investors').alias('n_investors'))
 .orderBy(F.desc('n_investors'))
 .show()
)

+-------------+--------------+-----------+
|    Continent|       Country|n_investors|
+-------------+--------------+-----------+
|North America| United States|        717|
|         Asia|         China|        311|
|         Asia|         India|         91|
|       Europe|United Kingdom|         81|
|       Europe|       Germany|         57|
|         Asia|        Israel|         53|
|       Europe|        France|         46|
|North America|        Canada|         42|
|         Asia|     Singapore|         33|
|South America|        Brazil|         33|
|         Asia|   South Korea|         22|
|         Asia|         Japan|         18|
|         Asia|     Hong Kong|         18|
|         Asia|     Indonesia|         15|
|      Oceania|     Australia|         14|
|       Europe|        Sweden|         11|
|       Europe|   Netherlands|         10|
|North America|        Mexico|         10|
|       Europe|   Switzerland|          9|
|       Europe|        Norway|          9|
+----------
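`Select_Investors` separates names with `', '` and occasionally with just `','` (e.g. `Khosla Ventures,General Catalyst`), so splitting on `','` alone leaves a leading space on most names and the same investor can be counted twice; the counts above may therefore be somewhat inflated. A plain-Python sketch of the split, explode and distinct-count steps, with and without trimming (hypothetical helper `distinct_investors`, made-up rows):

```python
from collections import defaultdict


def distinct_investors(rows, trim=True):
    """Count distinct investors per country: split on ',', explode, countDistinct."""
    per_country = defaultdict(set)
    for row in rows:
        if row["Select_Investors"] is None:
            continue  # explode(split(null)) produces no rows
        for name in row["Select_Investors"].split(","):
            per_country[row["Country"]].add(name.strip() if trim else name)
    return {country: len(names) for country, names in per_country.items()}


rows = [
    {"Country": "United States", "Select_Investors": "Khosla Ventures, Benchmark"},
    {"Country": "United States", "Select_Investors": "Benchmark,Khosla Ventures"},
    {"Country": "United States", "Select_Investors": None},
]
print(distinct_investors(rows, trim=False))  # {'United States': 3}
print(distinct_investors(rows, trim=True))   # {'United States': 2}
```

In PySpark, the trimmed variant could wrap the exploded column in `F.trim`, or split on a regular expression such as `r',\s*'`, since `F.split` treats its pattern as a regex.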