<a href="https://colab.research.google.com/github/MarceloNevesDS/MarceloNevesDS/blob/main/C%C3%B3pia_de_Damos_lhe_as_boas_vindas_ao_Colaboratory.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Instalar as dependencias
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
# Configure the environment variables pointing at the Java and Spark installations
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

# Make PySpark importable
import findspark
# Pass the absolute SPARK_HOME set above instead of a relative directory name, so the
# cell does not depend on the current working directory and both settings agree.
findspark.init(os.environ["SPARK_HOME"])


In [None]:
# List the contents of /usr/lib/jvm (the directory that the JAVA_HOME path set earlier points into)
!ls -la /usr/lib/jvm/

total 28
drwxr-xr-x 1 root root 4096 Sep 19 18:20 .
drwxr-xr-x 1 root root 4096 Sep 14 13:37 ..
lrwxrwxrwx 1 root root   25 Feb 20  2019 default-java -> java-1.11.0-openjdk-amd64
lrwxrwxrwx 1 root root   21 Jul 22 09:14 java-1.11.0-openjdk-amd64 -> java-11-openjdk-amd64
-rw-r--r-- 1 root root 2047 Jul 22 09:14 .java-1.11.0-openjdk-amd64.jinfo
drwxr-xr-x 9 root root 4096 Sep 14 13:37 java-11-openjdk-amd64
lrwxrwxrwx 1 root root   20 Jul 23 16:13 java-1.8.0-openjdk-amd64 -> java-8-openjdk-amd64
-rw-r--r-- 1 root root 2764 Jul 23 16:13 .java-1.8.0-openjdk-amd64.jinfo
drwxr-xr-x 7 root root 4096 Sep 19 18:20 java-8-openjdk-amd64


In [None]:
# Start a local Spark session
from pyspark.sql import SparkSession

# The master is set explicitly so the session is local, as intended, instead of
# relying on whatever default the environment provides.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Introducao")
    .getOrCreate()
)


In [None]:
# Print the session object first, then the Spark version it reports
for session_info in (spark, spark.version):
    print(session_info)


In [None]:
# Upload the input files from the local machine
import os
from google.colab import files

# Keep the result in a variable: as the last expression of the cell, the returned
# {filename: content} mapping would be echoed in full into the cell output.
uploaded = files.upload()

# The cells below read their CSV files from ./sample_data, so store a copy of
# every uploaded file there as well.
os.makedirs("sample_data", exist_ok=True)
for filename, content in uploaded.items():
    with open(os.path.join("sample_data", filename), "wb") as target:
        target.write(content)

In [None]:
# Print the tables currently listed in the Spark catalog
catalog_tables = spark.catalog.listTables()
print(catalog_tables)


In [None]:
# List the contents of the current working directory
!ls -la ./

In [None]:
# Load the flights file as CSV, with the header and inferSchema options enabled
arquivo = "./sample_data/flights_small.csv"
csv_reader = spark.read.format("csv").options(inferSchema="True", header="True")
flights = csv_reader.csv(arquivo)


In [None]:
# Shape of the PySpark DataFrame, printed as (row count, column count)
n_rows = flights.count()
n_cols = len(flights.columns)
print((n_rows, n_cols))

In [None]:
# Preview the first 10 rows
flights.show(n=10)

In [None]:
# Print the column names and types of the flights DataFrame
flights.printSchema()

In [None]:
from pyspark.sql.functions import col

In [None]:
# Add an integer-cast copy of air_time under a new name, then drop the original column
flights = (
    flights
    .withColumn("new_air_time", col("air_time").cast("integer"))
    .drop("air_time")
)

In [None]:
# Print the schema again to inspect the result of the cast
flights.printSchema()

In [None]:
# Rename the casted column back to air_time
flights = flights.withColumnRenamed(existing="new_air_time", new="air_time")

In [None]:
### Dropping columns from a PySpark DataFrame   -->   flights = flights.drop("new_air_time")

In [None]:
# Preview the first 10 rows after the rename
flights.show(n=10)

In [None]:
# Expose the DataFrame to Spark SQL as a temporary view named "flights"
flights.createOrReplaceTempView("flights")

# Select the first 10 rows of the view through SQL and print them
query = "FROM flights SELECT * LIMIT 10"
flights10 = spark.sql(query)
flights10.show()


In [None]:
# Register the DataFrame as a global temporary view
flights.createOrReplaceGlobalTempView("flights")

# The global temporary view is queried through the 'global_temp' database
global_flights_sample = spark.sql("SELECT * FROM global_temp.flights LIMIT 10")
global_flights_sample.show()


In [None]:
# GROUP BY: number of rows for each (origin, dest) pair
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
flight_counts = spark.sql(query)

# Print the first 10 rows of the result
flight_counts.show(n=10)

In [None]:
# Convert the aggregated PySpark result to a pandas DataFrame
import pandas as pd
df = flight_counts.toPandas()
# Only the last expression of a cell is rendered automatically, so this
# intermediate preview has to be displayed explicitly.
display(df.head())


# Convert the whole flights table from PySpark to pandas
dfpd = flights.toPandas()
dfpd.head()

In [None]:
# Add a column with the flight time in hours (air_time divided by 60)
flights = flights.withColumn("Durantion_hrs", flights["air_time"] / 60)

flights.show(n=10)

In [None]:
# Rename the hours column to air_time_hrs and preview the result
flights = flights.withColumnRenamed(existing="Durantion_hrs", new="air_time_hrs")
flights.show(n=10)

In [None]:
# Select a subset of the dataset's columns
selected1 = flights.select(["tailnum", "origin", "dest"])
selected1.show(n=10)

In [None]:
# Filtering 1: the condition is given as a SQL expression string
flights.where("air_time > 120").show()

In [None]:
# Filtering 2: the condition is given as a Column expression
flights.where(flights["air_time"] > 120).show(n=5)

In [None]:
# First filter: origin equals "SEA"
filtroA = flights["origin"] == "SEA"

# Second filter: dest equals "PDX"
filtroB = flights["dest"] == "PDX"

# Filter the selected columns by filtroA first, then by filtroB
selected2 = selected1.where(filtroA).where(filtroB)

selected2.show()

In [None]:
# Aggregation: maximum air_time among the flights whose origin is "SEA"
(
    flights
    .where(flights["origin"] == "SEA")
    .groupBy()
    .max("air_time")
    .show()
)

In [None]:
# Aggregation: minimum distance among the flights whose origin is "PDX"
# NOTE(review): the original comment said "Phoenix", but the filter uses "PDX" (Phoenix is usually coded PHX) — confirm the intended origin
flights.filter(flights.origin == "PDX").groupby().min("distance").show()

In [None]:
# Aggregation: average air_time of the flights with carrier "DL" and origin "SEA"
(
    flights
    .where(flights["carrier"] == "DL")
    .where(flights["origin"] == "SEA")
    .groupBy()
    .avg("air_time")
    .show()
)

In [None]:
# Total flight time in hours: sum of air_time / 60 over all rows
hours_flown = flights["air_time"] / 60
flights.withColumn("duration_hrs", hours_flown).groupBy().sum("duration_hrs").show()

In [None]:
# PySpark SQL functions
import pyspark.sql.functions as F

# Group by month and destination
by_month_dest = flights.groupBy("month", "dest")

# Standard deviation of dep_delay within each group
dep_delay_std = F.stddev("dep_delay")
by_month_dest.agg(dep_delay_std).show()

In [None]:
# JOIN

# Load the airports file as CSV, with the header and inferSchema options enabled
arquivo = "./sample_data/airports.csv"
csv_reader = spark.read.format("csv").options(inferSchema="True", header="True")
airports = csv_reader.csv(arquivo)


In [None]:
# Preview the first 10 rows of airports
airports.show(n=10)

In [None]:
# Preview the first 10 rows of flights
flights.show(n=10)

In [None]:
# Rename the faa column to dest
airports = airports.withColumnRenamed(existing="faa", new="dest")

In [None]:
# Left outer join of flights with airports on the dest column
flights_with_airports = flights.join(other=airports, on="dest", how="leftouter")

In [None]:
# Print the joined DataFrame using show()'s default row limit
flights_with_airports.show()
