# Big Data: Como instalar o PySpark no Google Colab

Como instalar o PySpark no Google Colab é uma dúvida comum entre aqueles que estão migrando seus projetos de Data Science para ambientes na nuvem.

O termo Big Data está cada vez mais presente, e mesmo projetos pessoais podem assumir uma grande dimensionalidade devido à quantidade de dados disponíveis.

Para analisar grandes volumes de dados, Big Data, com velocidade, o Apache Spark é uma ferramenta muito utilizada, dada a sua capacidade de processamento de dados e computação paralela.

O Spark foi pensado para ser acessível, oferecendo diversas APIs e frameworks em Python, Scala, SQL e diversas outras linguagens.

## PySpark no Google Colab

PySpark é a interface alto nível que permite você conseguir acessar e usar o Spark por meio da linguagem Python. Usando o PySpark, você consegue escrever todo o seu código usando apenas o nosso estilo Python de escrever código.

Já o Google Colab é uma ferramenta incrível, poderosa e gratuita – com suporte de GPU inclusive. Uma vez que roda 100% na nuvem, você não tem a necessidade de instalar qualquer coisa na sua própria máquina.

No entanto, apesar da maioria das bibliotecas de Data Science estarem previamente instaladas no Colab, o mesmo não acontece com o PySpark. Para conseguir usar o PySpark é necessário alguns passos intermediários, que não são triviais para aqueles que estão começando.

Dessa maneira, preparei um tutorial simples e direto ensinando a instalar as dependências e a biblioteca.

## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.4.4 junto com o Hadoop 2.7.

In [1]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
#!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz

In [2]:
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [3]:
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

In [4]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [5]:
import findspark
findspark.add_packages('mysql:mysql-connector-java:8.0.29')

Baixar: https://dev.mysql.com/downloads/connector/j/
- plataforma independente
- arrasta o arquivo: mysql-connector-java-8.0.29.jar para o diretório abaixo

In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
 
spark = SparkSession\
                    .builder\
                    .appName("Word Count")\
                    .config("spark.driver.extraClassPath", "/content/spark-2.4.4-bin-hadoop2.7/jars/mysql-connector-java-8.0.29.jar")\
                    .getOrCreate()

Essa ferramenta cria uma VPN, porque o colab "não acessa localmente"
- https://dashboard.ngrok.com/get-started/setup
  - ngrok config add-authtoken XXXXXXXXXXXXXXXXXXXXXXXXXXX
  - ngrok tcp 3306

## Comandos Iniciais - SPARK 

In [8]:
# READNING
df_vendas = spark.read.format("jdbc")\
                    .option("url", "jdbc:mysql://0.tcp.sa.ngrok.io:13833/db_aula?user=root&password=Abcd1234")\
                    .option("dbtable", "tbvendas")\
                    .option("driver","com.mysql.cj.jdbc.Driver")\
                    .load();

In [9]:
# verifica o schema da tabela
df_vendas.printSchema()

root
 |-- cdVen: integer (nullable = true)
 |-- dtVen: timestamp (nullable = true)
 |-- cdCli: integer (nullable = true)
 |-- cdPro: integer (nullable = true)
 |-- qtPro: integer (nullable = true)
 |-- cdVendedor: integer (nullable = true)



In [None]:
df_vendas.show() # o select *

+-----+-------------------+-----+-----+-----+----------+
|cdVen|              dtVen|cdCli|cdPro|qtPro|cdVendedor|
+-----+-------------------+-----+-----+-----+----------+
|    1|2022-04-23 12:40:43|    3|    1|   10|      null|
|    2|2022-04-23 12:40:59|    3|    2|   20|      null|
|    3|2022-04-23 12:41:01|    1|    3|   30|      null|
|    4|2022-04-23 12:42:01|    2| null| null|      null|
|    6|2013-02-02 00:00:00|    2|    2|   25|         1|
|    7|2019-03-18 08:00:00|    3|    2|   32|         1|
|    8|2021-03-18 08:00:00|    3|    2|   32|         1|
|    9|2022-04-27 21:25:28|    3|    2|   20|         1|
|   10|2013-05-12 01:00:00|    2|    2|   25|         1|
+-----+-------------------+-----+-----+-----+----------+



In [None]:
df_vendas.collect() # o select *

[Row(cdVen=1, dtVen=datetime.datetime(2022, 4, 23, 12, 40, 43), cdCli=3, cdPro=1, qtPro=10, cdVendedor=None),
 Row(cdVen=2, dtVen=datetime.datetime(2022, 4, 23, 12, 40, 59), cdCli=3, cdPro=2, qtPro=20, cdVendedor=None),
 Row(cdVen=3, dtVen=datetime.datetime(2022, 4, 23, 12, 41, 1), cdCli=1, cdPro=3, qtPro=30, cdVendedor=None),
 Row(cdVen=4, dtVen=datetime.datetime(2022, 4, 23, 12, 42, 1), cdCli=2, cdPro=None, qtPro=None, cdVendedor=None),
 Row(cdVen=6, dtVen=datetime.datetime(2013, 2, 2, 0, 0), cdCli=2, cdPro=2, qtPro=25, cdVendedor=1),
 Row(cdVen=7, dtVen=datetime.datetime(2019, 3, 18, 8, 0), cdCli=3, cdPro=2, qtPro=32, cdVendedor=1),
 Row(cdVen=8, dtVen=datetime.datetime(2021, 3, 18, 8, 0), cdCli=3, cdPro=2, qtPro=32, cdVendedor=1),
 Row(cdVen=9, dtVen=datetime.datetime(2022, 4, 27, 21, 25, 28), cdCli=3, cdPro=2, qtPro=20, cdVendedor=1),
 Row(cdVen=10, dtVen=datetime.datetime(2013, 5, 12, 1, 0), cdCli=2, cdPro=2, qtPro=25, cdVendedor=1)]

In [None]:
# escolhendo só uma coluna e mostrando apenas 3 registros
df_vendas.select('cdven').show(3)

+-----+
|cdven|
+-----+
|    3|
|    4|
|    6|
+-----+
only showing top 3 rows



## Criando uma view para executar comandos padrão SQL

In [None]:
df_vendas.createOrReplaceTempView("vw_vendas")

In [None]:
spark.sql("SELECT * FROM vw_vendas").show()

+-----+-------------------+-----+-----+-----+----------+
|cdVen|              dtVen|cdCli|cdPro|qtPro|cdVendedor|
+-----+-------------------+-----+-----+-----+----------+
|    1|2022-04-23 12:40:43|    3|    1|   10|      null|
|    2|2022-04-23 12:40:59|    3|    2|   20|      null|
|    3|2022-04-23 12:41:01|    1|    3|   30|      null|
|    4|2022-04-23 12:42:01|    2| null| null|      null|
|    6|2013-02-02 00:00:00|    2|    2|   25|         1|
|    7|2019-03-18 08:00:00|    3|    2|   32|         1|
|    8|2021-03-18 08:00:00|    3|    2|   32|         1|
|    9|2022-04-27 21:25:28|    3|    2|   20|         1|
|   10|2013-05-12 01:00:00|    2|    2|   25|         1|
+-----+-------------------+-----+-----+-----+----------+



## Carregando outros tabelas e criando view

In [None]:
# Tabela de Clientes
df_cliente = spark.read.format("jdbc")\
                    .option("url", "jdbc:mysql://0.tcp.sa.ngrok.io:16784/db_aula?user=root&password=Abcd1234")\
                    .option("dbtable", "tbcliente")\
                    .option("driver","com.mysql.cj.jdbc.Driver")\
                    .load();

In [None]:
df_cliente.createOrReplaceTempView("vw_cliente")

In [None]:
spark.sql("SELECT * FROM vw_cliente").show()

+-----+-----------+---------+--------------+
|cdCli|      dsCPF|    nmCli|        estado|
+-----+-----------+---------+--------------+
|    1|11111111111|Cliente 1|         Ceará|
|    2|11111111112|Cliente 2|         Ceará|
|    3|11111111113|Cliente 3|     São Paulo|
|    4|11111111114|Cliente 4|Rio de Janeiro|
+-----+-----------+---------+--------------+



## Comandos SQL - do SPARK

In [None]:
# testar
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.rlike.html

# Seleciona os registros que termina com "rá", como se foss o like
df_cliente.filter(df_cliente.estado.rlike('rá$')).collect()

[Row(cdCli=1, dsCPF=11111111111, nmCli='Cliente 1', estado='Ceará'),
 Row(cdCli=2, dsCPF=11111111112, nmCli='Cliente 2', estado='Ceará')]

## Joins

In [None]:
# https://luminousmen.com/post/introduction-to-pyspark-join-types
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.crossJoin.html#pyspark.sql.DataFrame.crossJoin

# testar adicionando outra tabela (clientes)
df_vendas.select("cdVen", "cdPro", "qtPro", "cdCli").collect()
#[Row(age=2, name='Alice'), Row(age=5, name='Bob')]

[Row(cdVen=1, cdPro=1, qtPro=10, cdCli=3),
 Row(cdVen=2, cdPro=2, qtPro=20, cdCli=3),
 Row(cdVen=3, cdPro=3, qtPro=30, cdCli=1),
 Row(cdVen=4, cdPro=None, qtPro=None, cdCli=2),
 Row(cdVen=6, cdPro=2, qtPro=25, cdCli=2),
 Row(cdVen=7, cdPro=2, qtPro=32, cdCli=3),
 Row(cdVen=8, cdPro=2, qtPro=32, cdCli=3),
 Row(cdVen=9, cdPro=2, qtPro=20, cdCli=3),
 Row(cdVen=10, cdPro=2, qtPro=25, cdCli=2)]

In [None]:
df_cliente.select("nmcli", "estado").collect()
#[Row(name='Tom', height=80), Row(name='Bob', height=85)]

[Row(nmcli='Cliente 1', estado='Ceará'),
 Row(nmcli='Cliente 2', estado='Ceará'),
 Row(nmcli='Cliente 3', estado='São Paulo'),
 Row(nmcli='Cliente 4', estado='Rio de Janeiro')]

INNER JOIN

In [None]:
df_vendas.join(df_cliente, on = 'cdcli', how='inner').show()
#[Row(age=2, name='Alice', height=80), Row(age=2, name='Alice', height=85),
# Row(age=5, name='Bob', height=80), Row(age=5, name='Bob', height=85)]

+-----+-----+-------------------+-----+-----+----------+-----------+---------+---------+
|cdCli|cdVen|              dtVen|cdPro|qtPro|cdVendedor|      dsCPF|    nmCli|   estado|
+-----+-----+-------------------+-----+-----+----------+-----------+---------+---------+
|    1|    3|2022-04-23 12:41:01|    3|   30|      null|11111111111|Cliente 1|    Ceará|
|    3|    1|2022-04-23 12:40:43|    1|   10|      null|11111111113|Cliente 3|São Paulo|
|    3|    2|2022-04-23 12:40:59|    2|   20|      null|11111111113|Cliente 3|São Paulo|
|    3|    7|2019-03-18 08:00:00|    2|   32|         1|11111111113|Cliente 3|São Paulo|
|    3|    8|2021-03-18 08:00:00|    2|   32|         1|11111111113|Cliente 3|São Paulo|
|    3|    9|2022-04-27 21:25:28|    2|   20|         1|11111111113|Cliente 3|São Paulo|
|    2|    4|2022-04-23 12:42:01| null| null|      null|11111111112|Cliente 2|    Ceará|
|    2|    6|2013-02-02 00:00:00|    2|   25|         1|11111111112|Cliente 2|    Ceará|
|    2|   10|2013-05-

CROSS JOIN

In [None]:
df_vendas.crossJoin(df_cliente.select("nmcli", "estado")).select("CdVen", "cdPro", "qtPro", "nmcli", "estado").collect()
#[Row(age=2, name='Alice', height=80), Row(age=2, name='Alice', height=85),
# Row(age=5, name='Bob', height=80), Row(age=5, name='Bob', height=85)]

[Row(CdVen=1, cdPro=1, qtPro=10, nmcli='Cliente 1', estado='Ceará'),
 Row(CdVen=1, cdPro=1, qtPro=10, nmcli='Cliente 2', estado='Ceará'),
 Row(CdVen=1, cdPro=1, qtPro=10, nmcli='Cliente 3', estado='São Paulo'),
 Row(CdVen=1, cdPro=1, qtPro=10, nmcli='Cliente 4', estado='Rio de Janeiro'),
 Row(CdVen=2, cdPro=2, qtPro=20, nmcli='Cliente 1', estado='Ceará'),
 Row(CdVen=2, cdPro=2, qtPro=20, nmcli='Cliente 2', estado='Ceará'),
 Row(CdVen=2, cdPro=2, qtPro=20, nmcli='Cliente 3', estado='São Paulo'),
 Row(CdVen=2, cdPro=2, qtPro=20, nmcli='Cliente 4', estado='Rio de Janeiro'),
 Row(CdVen=3, cdPro=3, qtPro=30, nmcli='Cliente 1', estado='Ceará'),
 Row(CdVen=3, cdPro=3, qtPro=30, nmcli='Cliente 2', estado='Ceará'),
 Row(CdVen=3, cdPro=3, qtPro=30, nmcli='Cliente 3', estado='São Paulo'),
 Row(CdVen=3, cdPro=3, qtPro=30, nmcli='Cliente 4', estado='Rio de Janeiro'),
 Row(CdVen=4, cdPro=None, qtPro=None, nmcli='Cliente 1', estado='Ceará'),
 Row(CdVen=4, cdPro=None, qtPro=None, nmcli='Cliente 2', es

## Lendo um CSV
- neste caso da web

In [None]:
# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

# download do http para arquivo local
!wget --quiet --show-progress https://github.com/databricks/spark-csv/raw/master/src/test/resources/cars.csv

# carregar dados do Airbnb
df_spark = sc.read.csv("./cars.csv", inferSchema=True, header=True)

# ver algumas informações sobre os tipos de dados de cada coluna
df_spark.printSchema()

root
 |-- year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)



In [None]:
df_spark.collect() # o select *

[Row(year=2012, make='Tesla', model='S', comment='No comment', blank=None),
 Row(year=1997, make='Ford', model='E350', comment='Go get one now they are going fast', blank=None),
 Row(year=2015, make='Chevy', model='Volt', comment=None, blank=None)]

In [None]:
df_spark.createOrReplaceTempView("cars")

In [None]:
spark.sql("SELECT * FROM cars").show()

+----+-----+-----+--------------------+-----+
|year| make|model|             comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla|    S|          No comment| null|
|1997| Ford| E350|Go get one now th...| null|
|2015|Chevy| Volt|                null| null|
+----+-----+-----+--------------------+-----+



In [None]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
df = pd.read_csv("./cars.csv")

In [None]:
df

Unnamed: 0,year,make,model,comment,blank
0,2012,Tesla,S,No comment,
1,1997,Ford,E350,Go get one now they are going fast,
2,2015,Chevy,Volt,,


## Lendo um CSV um pouco maior

In [None]:
# Montando o google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# carregar dados do Censo
df_spark1 = sc.read.csv("/content/drive/MyDrive/Colab Notebooks/Censo2020_inep.csv")

In [None]:
df_spark1.count() # contando as linhas

181280

SPARK SQL

In [None]:
# realizando o select
df_spark1.collect() # o select *

PANDAS

In [None]:
# https://medium.com/analytics-vidhya/execute-mysql-queries-10x-faster-simple-pyspark-tutorial-with-databricks-a39164550749
# consultando com pandas
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/Censo2020_inep.csv", sep=";")

In [None]:
# realizando o "select"
df

## Big Data e Python

A biblioteca PySpark permite você criar seu servidor Apache Spark, trabalhar com grandes volumes de dados e até mesmo fazer streaming em tempo real.

Na minha opinião, o Spark é o melhor framework para trabalhar com Big Data. Tenha certeza que o PySpark vai te ajudar muito ao criar uma interface Python que permita a comunicação entre seu projeto e o servidor.

Neste artigo, o meu objetivo foi unicamente apresentar a biblioteca, além de ensinar como você pode instalá-la em um ambiente de nuvem gratuito, o Google Colab. Aproveite e comece a usar hoje mesmo 🙂

### Resources

[Pyspark Operations](https://hendra-herviawan.github.io/)

[Spark SQL string Functions](https://sparkbyexamples.com/spark/usage-of-spark-sql-string-functions/)