<a href="https://colab.research.google.com/github/Sissaz/pyspark/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PySpark no Google Colab**

*   Spark é uma plataforma para computação em cluster. O Spark permite que você distribua dados e faça cálculos em clusters com vários nós (pense em cada nó como um computador separado). 
*   Dividir seus dados facilita o trabalho com conjuntos de dados muito grandes porque cada nó funciona apenas com uma pequena quantidade de dados.



### Instalando o PySpark no Google Colab

Para instalar o PySpark no Google Colab é necessário instalar dependencias como o Java 8, Apache Spark e o Hadoop. 

In [10]:
# Instalando as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente para habilitar o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

In [11]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init('spark-3.3.2-bin-hadoop3')

### Criando uma SparkSession

A criação de vários SparkSessions e SparkContexts pode causar problemas, portanto, é uma prática recomendada usar o método SparkSession.builder.getOrCreate(). Isso retorna uma SparkSession se já houver uma no ambiente ou cria uma nova, se necessário!

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Introducao").getOrCreate()

In [13]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fb95085f520>


In [14]:
print(spark.version)

3.3.2


### Usando DataFrames



*   A estrutura de dados principal do Spark é o RDD, qual é um objeto que permite que o Spark divida os dados em vários nós no cluster.
*   Porém, como ele é dificil de trabalhar diretamente, usaremos o DataFrame.
*   O Spark DataFrame se comporta como uma tabela SQL, é mais facil de entender e mais otimizado para operações complicadas.
*   Uma das vantagens também da interface DataFrame é a possibilidade de executar consultas SQL nas tabelas em seu cluster Spark.



### Importando tabelas e fazendo queries

Iremos importar a tabela 'flights' qual contém uma linha para cada voo que saiu do Aeroporto Internacional de Portland (PDX) ou do Aeroporto Internacional de Seattle-Tacoma (SEA) em 2014 e 2015. 

In [16]:
from google.colab import files

try:
  uploaded_file = files.upload()
  # Verifica se o arquivo foi carregado corretamente
  if len(uploaded_file) > 0:
    print('Importação concluída com sucesso')
  else:
    print('Falha na importação')
except Exception as e:
  print('Falha na importação:', e)

Saving airports.csv to airports (1).csv
Saving planes.csv to planes (1).csv
Saving flights_small.csv to flights_small (1).csv
Importação concluída com sucesso


In [17]:
arquivo = "flights_small.csv"
flights = spark\
          .read.format("csv")\
          .option("inferSchema", "True")\
          .option("header", "True")\
          .csv(arquivo)

# '\' significa quebrar a linha e ler a proxima
# 'inferSchema' pede para o spark advinhar o tipo de dado
# 'header' seria o cabeçalho

In [18]:
# Verificando o Shape do PySpark DataFrame
print((flights.count(), len(flights.columns)))
# 10000 linhas e 16 colunas

(10000, 16)


In [19]:
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [20]:
# Verificando se o Spark inferiu corretamente os tipos dos dados
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [21]:
# Note que air_time foi inferido como string, porém, é um numero inteiro.

In [22]:
from pyspark.sql.functions import col

In [23]:
# Para alterar o tipo, será necessario criar uma nova coluna 'new_air_time' e defini-la como inteiro e excluir a coluna 'air_time'

In [24]:
flights = flights.\
          withColumn("new_air_time", col("air_time").cast("integer")).drop("air_time")

In [25]:
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|new_air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|         132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|         360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|         111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|          83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|         127|
|2014|  

In [26]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- new_air_time: integer (nullable = true)



In [28]:
flights = flights.withColumnRenamed("new_air_time", "air_time")
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

### Temp View x Global Temp View
Uma visualização temporária é útil para consultas que precisam ser executadas apenas dentro de uma sessão Spark, enquanto uma view global é útil para consultas que precisam ser executadas em várias sessões Spark ou em diferentes aplicativos Spark.

In [32]:
# Registrando o DataFrame em uma View Temporária
flights.createOrReplaceTempView("flights")

print(spark.catalog.listTables())

query = "SELECT * FROM flights LIMIT 10"

flights10 = spark.sql(query)

flights10.show()

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS|

In [42]:
# Registrando o DataFrame em uma Global View
flights.createGlobalTempView("flights_global")

# A visão temporária global está vinculada a um banco de dados preservado pelo sistema `global_temp`
spark.sql("SELECT * FROM global_temp.flights_global LIMIT 10").show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
+----+-----+---+--------+---------+-----

In [41]:
# Para excluir uma view
spark.catalog.dropGlobalTempView("flights_global")

True