<h1 style='font-size:40px'> Apache Spark's Structured APIs</h1>

<h2 style='font-size:30px'> Key Merits and Beneftis</h2>

In [7]:
# Criando o nosso primeiro DataFrame dentro do Spark.
# Vamos aproveitar a situação e fazer algumas operações.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark = SparkSession.builder.appName('Chapter3').getOrCreate()

# O método createDataFrame demanda uma lista de tuplas para que cada linha seja construída.
# Como segundo argumento, uma lista com o nome das colunas pode ser passada.
data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30),
                                ('TD', 35), ('Brooke', 25)],['name', 'age'])

data_df.show()

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+



In [9]:
# Agrupando os dados conforme o nome e extraindo a idade média.

# A sintaxe é um pouco diferente da do pandas, o método agg com um outro método de agregação como argumento deve acompahar 'groupBy'.
data_df.groupBy('name').agg(avg('age')).show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+



<h2 style='font-size:30px'> The DataFrame API</h2>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            Os DataFrames do PySpark são como os do pandas. Têm estrutura de tabelas e permitem a nós fazermos operações sobre os seus dados.
        </li>
    </ul>
</div>

<h3 style='font-size:30px;font-style:italic'> Spark's Basic Data Types</h3>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            O Spark possui uma série de tipos de dados que podem ser designados às colunas dos DataFrames.
        </li>
    </ul>
</div>

<center> 
    <img src='data_types1.png'>
</center>

<h3 style='font-size:30px;font-style:italic'> Schemas and Creating DataFrames</h3>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            O <em> schema</em> de um DataFrame é a relação do nome de suas colunas com os seus respectivos tipos de dados.
        </li>
        <li> 
            O Spark é capaz de inferir o schema de uma tabela, mas isso pode ser demorado e demandar uma alta capacidade de processamento do computador. Por isso, é recomendável que nós façamos essa tarefa.
        </li>
    </ul>
</div>

In [15]:
# Vamos definir o schema do DataFrame que iremos construir.

# Do jeito mais formal. O jeito de se construir lembra o das Pipelines do scikit-learn.
from pyspark.sql.types import *
schema = StructType([
         StructField('author', StringType(), False),
         StructField('title', StringType(), False),
        StructField('pages', IntegerType(), False)])

In [24]:
# Do jeito mais simples.
schema = '''`Id` INT, `First` STRING, `Last` STRING, `URL` STRING, 
        `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>'''
data=[[1, 'Jules', 'Damji', 'https://www.oi1.com', '1/4/2016', 4535, ['twitter', 'LinkedIn']],
    [2, 'Brooke', 'Wenig', 'htps://oi2.com', '5/5/2018', 8908, ['twitter', 'LinkedIn']],
     [3, 'Tathagata', 'Das', 'htps://oi3.com', '5/12/2018', 10568,['twitter', 'FB']]]

# Criando o DataFrame.
blogs_df = spark.createDataFrame(data, schema)

# Pelo printSchema, é possível enxergar que as colunas têm o nome e o tipo de dados desejados.
blogs_df.printSchema(),blogs_df.show()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+---------+-----+-------------------+---------+-----+-------------------+
| Id|    First| Last|                URL|Published| Hits|          Campaigns|
+---+---------+-----+-------------------+---------+-----+-------------------+
|  1|    Jules|Damji|https://www.oi1.com| 1/4/2016| 4535|[twitter, LinkedIn]|
|  2|   Brooke|Wenig|     htps://oi2.com| 5/5/2018| 8908|[twitter, LinkedIn]|
|  3|Tathagata|  Das|     htps://oi3.com|5/12/2018|10568|      [twitter, FB]|
+---+---------+-----+-------------------+---------+-----+-------------------+



(None, None)

<h3 style='font-size:30px;font-style:italic'> Columns and Expressions </h3>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            Assim como no pandas e no SQL, os DataFrames Spark admitem que o usuário faça expressões com as suas colunas.
        </li>
    </ul>
</div>

In [54]:
from pyspark.sql.functions import *

# Os blogueiros têm mais de 1000 hits?
blogs_df.select('First',expr('Hits >1000')).show()

# Divida 'Hits' por 2;
blogs_df.select(expr('Hits /2')).show()

# Podemos fazer a mesma operação com o objeto 'col'.
blogs_df.select(col('Hits')/2).show()

# Uma nova coluna com nome e sobrenome do blogueiro.
blogs_df.withColumn('Full Name', concat(expr('First'), expr('Last'))).show()

# Reordenando o DF na ordem inversa dos ID's.
blogs_df.sort(col('Id').desc()).show()

+---------+-------------+
|    First|(Hits > 1000)|
+---------+-------------+
|    Jules|         true|
|   Brooke|         true|
|Tathagata|         true|
+---------+-------------+

+----------+
|(Hits / 2)|
+----------+
|    2267.5|
|    4454.0|
|    5284.0|
+----------+

+----------+
|(Hits / 2)|
+----------+
|    2267.5|
|    4454.0|
|    5284.0|
+----------+

+---+---------+-----+-------------------+---------+-----+-------------------+------------+
| Id|    First| Last|                URL|Published| Hits|          Campaigns|   Full Name|
+---+---------+-----+-------------------+---------+-----+-------------------+------------+
|  1|    Jules|Damji|https://www.oi1.com| 1/4/2016| 4535|[twitter, LinkedIn]|  JulesDamji|
|  2|   Brooke|Wenig|     htps://oi2.com| 5/5/2018| 8908|[twitter, LinkedIn]| BrookeWenig|
|  3|Tathagata|  Das|     htps://oi3.com|5/12/2018|10568|      [twitter, FB]|TathagataDas|
+---+---------+-----+-------------------+---------+-----+-------------------+----------

<h3 style='font-size:30px;font-style:italic'> Rows </h3>
<div> 
    <ul style='font-size:20px'> 
        <li> 
           Row é um objeto do pyspark que representa uma linha de DataFrame. Pode conter dados de diversas espécies.
        </li>
    </ul>
</div>

In [57]:
from pyspark.sql import Row
blog_row = Row(6, 'Reynold', 'Xin', 'htps://oi4.com', 255568, '3/2/2015', ['twitter', 'LinkedIn'])

# Podemos acessar o valor de uma das colunas da linha por meio de seu íindice.
blog_row[1]

'Reynold'

In [61]:
# Uma coleção de Rows pode dar origem a um DataFrame.
rows = [Row('Felipe', 'Veiga', 1),Row('Luiz', 'Villar', 2), Row('Eduardo', 'Rodrigues', 3)]
spark.createDataFrame(rows, '`First`STRING, `Last`STRING, `ID`INT').show()

+-------+---------+---+
|  First|     Last| ID|
+-------+---------+---+
| Felipe|    Veiga|  1|
|   Luiz|   Villar|  2|
|Eduardo|Rodrigues|  3|
+-------+---------+---+



<h3 style='font-size:30px;font-style:italic'> Common DataFrame Operations</h3>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            Com os conceitos de Column e Row apresentados, vamos nos atentar à leitura de fontes de dados prontas. Como o pandas, o Spark possui os métodos necessários para que isso seja feito.
        </li>
    </ul>
</div>

In [92]:
# Caso não queira definir todo o schema do DF, você pode definir o tamanho da amostra dos dados os quais o Spark 
# baseará a sua escolha dos tipos de dados.
fire_df = spark.read.csv('Fire_Incidents.csv', samplingRatio=1e-1, header=True)

<div> 
    <ul style='font-size:20px'> 
        <li> 
            Não apenas conseguimos ler dados, como também é possível criarmos arquivos de diversas extensões a partir de um DataFrame.
        </li>
        <li> 
            O Spark, por padrão, converte a tabela para um arquivo Parquet. A vantagem desse formato é que os data types são preservados em seu metadados, tornando desnecessária a definição de um schema caso o documento seja lido futuramente.
        </li>
    </ul>
</div>

<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> Projections and Filters</h4>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            A consulta de dados pertencentes a um DataFrame é inicializada a partir de um "select". Caso queiramos filtrar os dados com base em um critério, é possível usar os métodos "where" e "filter".
        </li>
    </ul>
</div>

In [99]:
# Listando incêndios fatais.
few_fire_df =(fire_df
              .select('Incident Number', 'Fire Fatalities', 'Call Number')
              .where(col('Fire Fatalities')>1) ).show()

[Stage 180:>                                                        (0 + 3) / 3]

+---------------+---------------+-----------+
|Incident Number|Fire Fatalities|Call Number|
+---------------+---------------+-----------+
|       11050532|              2|  111530113|
+---------------+---------------+-----------+



                                                                                

In [107]:
# Vamos contar o número de ocorrências por bairro de São Francisco.
from pyspark.sql.functions import *

# Agrupando os dados de acordo com o bairro e contando o número de 'Incident Number''s distintos.
# Em cima disso, escolhi conferir um outro nome à 'Incident Number'.
(fire_df
        .select('neighborhood_district', 'Incident Number')
        .where(col('neighborhood_district').isNotNull())
        .groupBy('neighborhood_district')
        .agg(countDistinct('Incident Number').alias('Number of Incidents'))).show()



+---------------------+-------------------+
|neighborhood_district|Number of Incidents|
+---------------------+-------------------+
|         Inner Sunset|              10970|
|       Haight Ashbury|               9528|
|         Lincoln Park|                800|
|            Japantown|               3828|
|          North Beach|              11949|
|    Lone Mountain/USF|               7869|
|     Western Addition|              24058|
|       Bernal Heights|              11807|
|          Mission Bay|               8921|
|         Hayes Valley|              15124|
| Financial Distric...|              47324|
|            Lakeshore|               9950|
| Bayview Hunters P...|              41135|
|    Visitacion Valley|               6106|
|       Inner Richmond|               8318|
|             Nob Hill|              20141|
| Oceanview/Merced/...|               8792|
|       Outer Richmond|              14941|
|      Treasure Island|               3212|
|            Chinatown|         

                                                                                

<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> Renaming, adding and dropping columns</h4>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            Aqui, faremos algumas operações comuns em análise de dados.
        </li>
    </ul>
</div>

In [112]:
# Renaming

# Vamos dar um outro nome à coluna 'Incident Number'.
(fire_df.withColumnRenamed('Incident Number', 'Incident no')
        .select('Incident no')).show(5)

+-----------+
|Incident no|
+-----------+
|    8028304|
|    8028303|
|    8028309|
|    8028314|
|    8028319|
+-----------+
only showing top 5 rows



In [150]:
fire_df.printSchema()

root
 |-- Incident Number: string (nullable = true)
 |-- Exposure Number: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Incident Date: string (nullable = true)
 |-- Call Number: string (nullable = true)
 |-- Alarm DtTm: string (nullable = true)
 |-- Arrival DtTm: string (nullable = true)
 |-- Close DtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Suppression Units: string (nullable = true)
 |-- Suppression Personnel: string (nullable = true)
 |-- EMS Units: string (nullable = true)
 |-- EMS Personnel: string (nullable = true)
 |-- Other Units: string (nullable = true)
 |-- Other Personnel: string (nullable = true)
 |-- First Unit On Scene: string (nullable = true)
 |-- Estimated Property Loss: string (nullable = true)
 |-- Estimated Contents Loss: str

In [163]:
fire_df.select('Alarm DtTm').show(2)

+-------------------+
|         Alarm DtTm|
+-------------------+
|2008-04-01T18:06:37|
|2008-04-01T18:00:52|
+-------------------+
only showing top 2 rows



In [167]:
# Alteração de datatypes.

# Vamos usar métodos do pyspark para converter o tipo de uma coluna para timestamp.
((fire_df
        .withColumn('New', to_timestamp(col('Alarm DtTm'), format='yyyy-MM-ddhh:mm:ss')))
        .select(year('New'))).show()

+---------+
|year(New)|
+---------+
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
|     null|
+---------+
only showing top 20 rows



<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> Aggregations</h4>
<div> 
    <ul style='font-size:20px'> 
        <li> 
            Usando o groupBy e outras funções estatísticas.
        </li>
    </ul>
</div>

In [175]:
# Quais são as situações primárias mais comuns?
(fire_df
        .select('Primary Situation')
        .where((col('Incident Number').isNotNull())&(col('Primary Situation').isNotNull()))
        .groupBy('Primary Situation')
        .agg(count('Primary Situation').alias('Number of Occurences'))
        .orderBy('Number of Occurences',ascending=False)).show()

[Stage 248:>                                                        (0 + 4) / 4]

+--------------------+--------------------+
|   Primary Situation|Number of Occurences|
+--------------------+--------------------+
|711 - Municipal a...|               53324|
|700 False alarm o...|               30973|
|700 - False alarm...|               22254|
|745 - Alarm syste...|               19571|
|735 - Alarm syste...|               15314|
|711 Municipal ala...|               14130|
|745 Alarm system ...|               13974|
|311 - Medical ass...|               12807|
|500 Service Call,...|               12525|
|735 Alarm system ...|               12075|
|113 - Cooking fir...|               12046|
|743 Smoke detecto...|               11215|
|740 - Unintention...|               11155|
|322 Motor vehicle...|               10862|
|322 - Vehicle acc...|               10017|
|733 - Smoke detec...|                8683|
|500 - Service Cal...|                8624|
|118 - Trash or ru...|                8612|
|651 - Smoke scare...|                7642|
|  554 Assist invalid|          

                                                                                

<div> 
    <ul style='font-size:20px'> 
        <li> 
            Note que é possível coletar múltiplas estatísticas em uma única query.
        </li>
        <li> 
            Antes de fazermos a próxima query, observe que muitos dos métodos matemáticos do PySpark têm nomes iguais aos daqueles próprios do Python. Por isso, evite o comando "from pyspark.sql.functions import *".
        </li>
    </ul>
</div>

In [182]:
import pyspark.sql.functions as F

# Medindo o número médio de unidades de supressão usadas e a quantidade máxima de civis lesionados registrados 
# em uma única ocorrência.
(fire_df
        .select(F.avg('Suppression Units'), F.max('Civilian Injuries'))).show()

[Stage 258:>                                                        (0 + 4) / 4]

+----------------------+----------------------+
|avg(Suppression Units)|max(Civilian Injuries)|
+----------------------+----------------------+
|     2.524187052404998|                     9|
+----------------------+----------------------+



                                                                                

<h4 style='font-size:30px;font-style:italic;text-decoration:underline'> The Catalyst Optimizer</h4>
<div> 
    <ul style='font-size:20px'> 
        <li> 
           O Catalyst Optimizer é um dos principais componentes da engine SQL do Spark. Esse é responsável por otimizar os processos de uma query.
        </li>
    </ul>
</div>

In [185]:
# Para visualizar os processos executados em uma query, basta utilizar 'explain(True)'
gp = (fire_df
        .select('Primary Situation')
        .where((col('Incident Number').isNotNull())&(col('Primary Situation').isNotNull()))
        .groupBy('Primary Situation')
        .agg(count('Primary Situation').alias('Number of Occurences'))
        .orderBy('Number of Occurences',ascending=False))

gp.explain(True)

== Parsed Logical Plan ==
'Sort ['Number of Occurences DESC NULLS LAST], true
+- Aggregate [Primary Situation#5780], [Primary Situation#5780, count(Primary Situation#5780) AS Number of Occurences#11908L]
   +- Project [Primary Situation#5780]
      +- Filter (isnotnull(Incident Number#5752) AND isnotnull(Primary Situation#5780))
         +- Project [Primary Situation#5780, Incident Number#5752]
            +- Relation [Incident Number#5752,Exposure Number#5753,ID#5754,Address#5755,Incident Date#5756,Call Number#5757,Alarm DtTm#5758,Arrival DtTm#5759,Close DtTm#5760,City#5761,zipcode#5762,Battalion#5763,Station Area#5764,Box#5765,Suppression Units#5766,Suppression Personnel#5767,EMS Units#5768,EMS Personnel#5769,Other Units#5770,Other Personnel#5771,First Unit On Scene#5772,Estimated Property Loss#5773,Estimated Contents Loss#5774,Fire Fatalities#5775,... 40 more fields] csv

== Analyzed Logical Plan ==
Primary Situation: string, Number of Occurences: bigint
Sort [Number of Occurences#1

<p style='color:red'>Capítulo 4

<a href='https://databricks.com/notebooks/gallery/SanFranciscoFireCallsAnalysis.html'> Link do dataset de chamados de incêndio</a>