<a href="https://colab.research.google.com/github/VitorToni/PySpark/blob/main/Introducao_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Dependências**

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# Torna o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkFiles

import pandas as pd

# **Criar sessão e Importar CSV**

In [3]:
# iniciar uma sessão local 
#sc = SparkSession.builder.master('local[*]').getOrCreate()
spark = SparkSession.builder.appName("Introducao").getOrCreate()

In [None]:
print(spark)
print(spark.version)

In [None]:
#from google.colab import files
#files.upload()

In [139]:
# Adicionando csv ao Spark
# Obs:  Csv's usados são de repositórios públicos, caso não funcione, procure outra fonte (Não deve ser dificl, dado que estas bases são muito utilizadas pela comunidade)

spark.sparkContext.addFile('https://raw.githubusercontent.com/roberthryniewicz/datasets/master/airline-dataset/flights/flights.csv')

flights = spark.read.csv(SparkFiles.get("flights.csv"), 
               inferSchema = True,
               header = True)

In [None]:
print(spark.catalog.listTables())

In [None]:
print((flights.count(), len(flights.columns)))

In [None]:
flights.show(10)

In [None]:
flights.printSchema()

# **Criando Views e manipulando dados**

In [142]:
# Corrigindo formato da coluna

flights = flights.\
        withColumn("new_air_time", col("AirTime").cast("integer")).drop("AirTime").\
        withColumnRenamed("new_air_time","AirTime")

In [None]:
# Registrando o dataframe em uma view temporária
flights.createOrReplaceTempView("flights_v")

# Registtrando o dataframe como view global
flights.createGlobalTempView("flights_gv")

# A visão temporária global está vinculada a um banco de dados preservado pelo sistema `global_temp`
flight_counts = spark.sql("SELECT Origin, Dest, COUNT(*) as N FROM global_temp.flights_gv GROUP BY 1, 2")

# Transformando em uma DF
df = flight_counts.toPandas()
df

In [146]:
# Cria uma coluna com o resultado de uma operação
flights = flights.withColumn("duration_hrs", flights.AirTime / 60)

# **Selecting**

In [None]:
# Formas de Select
flights.select(flights.AirTime / 60).show()
# Poderia usar o ALIAS para renomear no select:
#flights.select((flights.AirTime / 60).alias("duration_hrs")).show()

flights.select("AirTime", "duration_hrs").show()

lista = ["AirTime", "duration_hrs"]
flights.select(lista).show()

flights.select(flights.AirTime, flights.duration_hrs).show()

# Pode usar o show(5) para limitar o retorno

# **Filtrando**

`.filter()` é uma contrapartida do Spark da cláusula `WHERE` do `SQL`.

In [None]:
flights.filter("AirTime > 120").show(5)
#flights.filter(flights.AirTime > 120).show(5)

flights.filter("AirTime >= 360 and Year = 2008").show()

In [None]:
filterA = flights.Origin == "PVD"
filterB = flights.Dest == "LAS"

# Filtrando de forma sequencial
flights.filter(filterA).filter(filterB).show(5)

# **Agregando**

In [None]:
# Achar a maior tempo de voo de SEA para outras cidades
flights.filter(flights.Origin == "SEA").groupBy().max("duration_hrs").show()

# Achar a menor distancia do voo de PDX para outras cidades
flights.filter(flights.Origin == "PDX").groupBy().min("distance").show()

# Duração Média dos Voos da compania delta
flights.filter(flights.UniqueCarrier == "XE").filter(flights.Origin == "DAY").groupBy().avg("AirTime").show()

# Tempo total em Horas no ar 
flights.groupBy().sum("duration_hrs").show()
# Caso deseje renomear:
#flights.groupBy().sum("duration_hrs")\
#  .withColumnRenamed("sum(duration_hrs)", "soma duration_hrs").show()

# **Join**

Parâmetros do `Join`:

*   Tabela que deseja juntar;
*   `On`, uma ou mais colunas;
*   Tipo de `Join`, por padrão é `Inner`, porém existe os seguintes tipos:  `inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftantie left_anti.`

In [228]:
# Obs:  Csv's usados são de repositórios públicos, caso não funcione, procure outra fonte (Não deve ser dificl, dado que estas bases são muito utilizadas pela comunidade)
spark.sparkContext.addFile('https://raw.githubusercontent.com/tidyverse/nycflights13/main/data-raw/airports.csv')

airports = spark.read.csv(SparkFiles.get("airports.csv"), 
               inferSchema = True,
               header = True)\
               .drop("tzone")\
               .withColumnRenamed("faa", "Dest")

In [None]:
# Juntando as tabelas para trazer as informações do aeroporto de destino

flights_with_airports = flights.join(airports, on="Dest", how="left")
flights_with_airports.show(5)

In [None]:
# O mesmo exemplo, porém inseriando alias, deixando as condições mais claras e selecionando as colunas desejadas

flights.alias("a").join(airports.alias("b"), 
                        col('a.Dest') == col('b.Dest'), 
                        "left")\
  .select(flights["*"], airports["name"])\
  .show(5)