# Big Data: Como instalar o PySpark no Google Colab

Como instalar o PySpark no Google Colab é uma dúvida comum entre aqueles que estão migrando seus projetos de Data Science para ambientes na nuvem.

O termo Big Data está cada vez mais presente, e mesmo projetos pessoais podem assumir uma grande dimensionalidade devido à quantidade de dados disponíveis.

Para analisar grandes volumes de dados, Big Data, com velocidade, o Apache Spark é uma ferramenta muito utilizada, dada a sua capacidade de processamento de dados e computação paralela.

O Spark foi pensado para ser acessível, oferecendo diversas APIs e frameworks em Python, Scala, SQL e diversas outras linguagens.

## PySpark no Google Colab

PySpark é a interface alto nível que permite você conseguir acessar e usar o Spark por meio da linguagem Python. Usando o PySpark, você consegue escrever todo o seu código usando apenas o nosso estilo Python de escrever código.

Já o Google Colab é uma ferramenta incrível, poderosa e gratuita – com suporte de GPU inclusive. Uma vez que roda 100% na nuvem, você não tem a necessidade de instalar qualquer coisa na sua própria máquina.

No entanto, apesar da maioria das bibliotecas de Data Science estarem previamente instaladas no Colab, o mesmo não acontece com o PySpark. Para conseguir usar o PySpark é necessário alguns passos intermediários, que não são triviais para aqueles que estão começando.

Dessa maneira, preparei um tutorial simples e direto ensinando a instalar as dependências e a biblioteca.

## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.3.2 junto com o Hadoop 2.7.

In [1]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

In [2]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

import pyspark

In [3]:
#sc.stop()

In [4]:
#sc = pyspark.SparkContext.getOrCreate()
sc = pyspark.SparkContext(appName='BigDataTarefa2')

In [5]:
flights_file = '/content/flights.csv'
flights_RDD = sc.textFile(flights_file)

In [6]:
# a. Tempo total de voo de cada companhia
def TempoTotalVooCia(line):
    record = line.split(',')
    return (record[1],float(record[9]))

print(flights_RDD.map(TempoTotalVooCia).reduceByKey(lambda a,b: a+b).take(10))

[('99999', 1000.0), ('19930', 2001295.0), ('20409', 3009096.0), ('20366', 4142864.0), ('20437', 799733.0), ('19690', 449242.0), ('20355', 4194811.0), ('21171', 916098.0), ('19805', 6273011.0), ('19790', 7747256.0)]


In [7]:
# b. Destino mais visado
def DestinoMaisVisado(line):
    record = line.split(',')
    return (record[4],record[1])

tuple = flights_RDD.map(DestinoMaisVisado).collect()
tuple_RDD = sc.parallelize(tuple)
#print(tuple_RDD.countByKey())

maxKey = tuple_RDD.max()
print(f'Destino mais visado: {maxKey}')

Destino mais visado: ('YUM', '20304')


In [8]:
# c. Quais aeroportos cada companhia passou
def aeroportosOrigem(line):
    record = line.split(',')
    return (record[1],record[3])

def aeroportosDestino(line):
    record = line.split(',')
    return (record[1],record[4])

listaOrigem = flights_RDD.map(aeroportosOrigem).take(90000)
listaDestino = flights_RDD.map(aeroportosDestino).take(90000)

listaOrigem_RDD = sc.parallelize(listaOrigem)
listaDestino_RDD = sc.parallelize(listaDestino)

grouped_elementsDestino = listaOrigem_RDD.union(listaDestino_RDD).distinct().groupByKey().take(90000)
for key, values in grouped_elementsDestino:    
    print(key, list(values))

19930 ['DCA', 'EWR', 'BOS', 'PDX', 'FLL', 'ADQ', 'FAI', 'PSP', 'SNA', 'ONT', 'PHX', 'AUS', 'ATL', 'PHL', 'ANC', 'SCC', 'BRW', 'KTN', 'CDV', 'SIT', 'WRG', 'PSG', 'SAN', 'OAK', 'SMF', 'BUR', 'BLI', 'DFW', 'SLC', 'STL', 'SEA', 'LAX', 'ORD', 'MSP', 'JNU', 'YAK', 'OME', 'OTZ', 'SFO', 'LAS', 'DEN', 'IAH', 'KOA', 'ADK', 'MCO', 'BET', 'SJC', 'TUS', 'SAT', 'GEG', 'MCI', 'OGG', 'LIH', 'HNL']
20409 ['DCA', 'FLL', 'BOS', 'SJU', 'SWF', 'EWR', 'JAX', 'CLT', 'TPA', 'HOU', 'BTV', 'CHS', 'DTW', 'RIC', 'SAV', 'AUS', 'BQN', 'PVD', 'PHX', 'PDX', 'SRQ', 'PHL', 'BUF', 'STX', 'HPN', 'DFW', 'BWI', 'SAN', 'PWM', 'LGA', 'OAK', 'SMF', 'SLC', 'ABQ', 'BUR', 'STT', 'RSW', 'PBI', 'IAD', 'ORH', 'PIT', 'ORD', 'ROC', 'LAS', 'LGB', 'DEN', 'SEA', 'SFO', 'LAX', 'JFK', 'MCO', 'MSY', 'RDU', 'SYR', 'BDL', 'PSE', 'SJC']
21171 ['EWR', 'AUS', 'DCA', 'PSP', 'FLL', 'BOS', 'PDX', 'PHL', 'SAN', 'DFW', 'LAX', 'SFO', 'SEA', 'ORD', 'LAS', 'IAD', 'JFK', 'SJC', 'MCO']
19805 ['DFW', 'MIA', 'LGA', 'SLC', 'STL', 'BWI', 'SAN', 'ABQ', 'RNO',

In [9]:
# d. Vôo de maior distância de cada companhia
def DistanciasPorCia(line):
    record = line.split(',')
    return (record[1],float(record[10]))

distancias = flights_RDD.map(DistanciasPorCia).collect()
maxDist_RDD = sc.parallelize(distancias).reduceByKey(max).collect()
print(maxDist_RDD)

[('99999', 11.0), ('19930', 2874.0), ('20409', 2704.0), ('20366', 1389.0), ('20437', 2139.0), ('19690', 4983.0), ('20355', 2979.0), ('21171', 2704.0), ('19805', 3784.0), ('19790', 4502.0), ('20436', 1703.0), ('20398', 1379.0), ('20304', 1535.0), ('19977', 4962.0), ('19393', 2335.0)]


In [10]:
# e. Qual é o vôo mais frequente de cada companhia
def voos(line):
    record = line.split(',')
    return (record[1],record[3]+record[4])

lista = flights_RDD.map(voos).collect()
lista_RDD = sc.parallelize(lista)

grouped_elements = lista_RDD.groupByKey().collect()
qtdVoosPorCia = lista_RDD.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).groupByKey().collect()
print(qtdVoosPorCia)

for key, values in qtdVoosPorCia:
    print(key, list(values))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
('20366', 'ATLPIT') [26]
('20366', 'ATLVPS') [31]
('20366', 'BUFLGA') [46]
('20366', 'DTWFWA') [29]
('20366', 'ATWATL') [58]
('20366', 'FLLTLH') [24]
('20366', 'ATLABY') [84]
('20366', 'ATLJAN') [25]
('20366', 'MSNDTW') [24]
('20366', 'ATLOMA') [31]
('20366', 'OMAATL') [30]
('20366', 'MYRATL') [39]
('20366', 'ATLTYS') [29]
('20366', 'XNADTW') [25]
('20366', 'GRRDTW') [30]
('20366', 'STLMSP') [26]
('20366', 'EYWATL') [30]
('20366', 'LGADTW') [1]
('20366', 'DTWMDT') [29]
('20366', 'MSPBNA') [28]
('20366', 'TVCDTW') [24]
('20366', 'IAHGRK') [77]
('20366', 'FOEORD') [54]
('20366', 'GRKIAH') [77]
('20366', 'BTVIAD') [56]
('20366', 'IADBDL') [56]
('20366', 'CHSIAD') [111]
('20366', 'SYRIAD') [92]
('20366', 'IADJFK') [114]
('20366', 'JFKIAD') [115]
('20366', 'SAVIAD') [117]
('20366', 'ORDBDL') [59]
('20366', 'BUFORD') [82]
('20366', 'ATWORD') [88]
('20366', 'ORDATW') [88]
('20366', 'ORFMSP') [26]
('20366', 'LGARDU') [26]
('20366

In [11]:
# Quantidade de cada vôo por companhia
def voos(line):
    record = line.split(',')
    return (record[1],record[3]+record[4])

lista = flights_RDD.map(voos).collect()
lista_RDD = sc.parallelize(lista)

grouped_elements = lista_RDD.groupByKey().collect()

print(lista_RDD.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).collect())

[(('99999', 'ABCCSL'), 2), (('99999', 'DEFABC'), 1), (('19805', 'LAXJFK'), 322), (('19805', 'JFKLAX'), 323), (('19805', 'OGGDFW'), 42), (('19805', 'DFWOGG'), 42), (('19805', 'HNLDFW'), 59), (('19805', 'OGGLAX'), 85), (('19805', 'BOSORD'), 281), (('19805', 'ATLMIA'), 120), (('19805', 'DCAMIA'), 293), (('19805', 'BOSLAX'), 89), (('19805', 'ORDBOS'), 284), (('19805', 'LGAPBI'), 60), (('19805', 'PBILGA'), 59), (('19805', 'DCADFW'), 283), (('19805', 'SLCDFW'), 149), (('19805', 'STLDFW'), 233), (('19805', 'DFWBWI'), 145), (('19805', 'MIAPHX'), 60), (('19805', 'MIAIAD'), 60), (('19805', 'JFKSJU'), 60), (('19805', 'ORDPBI'), 60), (('19805', 'PBIORD'), 60), (('19805', 'ONTDFW'), 118), (('19805', 'LAXSFO'), 178), (('19805', 'SFOLAX'), 178), (('19805', 'DENDFW'), 292), (('19805', 'DFWDEN'), 294), (('19805', 'DENMIA'), 30), (('19805', 'MIADEN'), 30), (('19805', 'ORDPDX'), 60), (('19805', 'DENORD'), 1), (('19805', 'LASJFK'), 60), (('19805', 'DFWMCO'), 300), (('19805', 'ORDSEA'), 145), (('19805', 'L

In [12]:
# Vôo de cada companhia
def voos(line):
    record = line.split(',')
    return (record[1],record[3]+record[4])

lista = flights_RDD.map(voos).take(90000)
lista_RDD = sc.parallelize(lista)

grouped_elements = lista_RDD.distinct().groupByKey().take(90000)
for key, values in grouped_elements:    
    print(key, list(values))

99999 ['ABCCSL', 'DEFABC', 'ABCDEF']
19930 ['DCALAX', 'LAXDCA', 'SEAEWR', 'SEABOS', 'SEAORD', 'PDXBOS', 'PHLSEA', 'SEAPHL', 'FLLSEA', 'BOSPDX', 'ADQANC', 'SCCBRW', 'BRWANC', 'SCCANC', 'ANCFAI', 'BRWFAI', 'FAIANC', 'JNUKTN', 'KTNSEA', 'YAKCDV', 'JNUSIT', 'ANCJNU', 'JNUANC', 'KTNWRG', 'SEAKTN', 'ANCCDV', 'JNUSEA', 'KTNJNU', 'ANCSEA', 'SEAFAI', 'PDXANC', 'ANCORD', 'ORDANC', 'LAXANC', 'OMEANC', 'ANCOTZ', 'SEASFO', 'SJCSEA', 'SFOPSP', 'SEAOAK', 'OAKSEA', 'SMFSEA', 'PDXSFO', 'PDXSJC', 'LAXSEA', 'SNASEA', 'SEABUR', 'BURSEA', 'ONTSEA', 'LAXPDX', 'PDXLAX', 'PDXSNA', 'SNAPDX', 'SEALAS', 'PDXLAS', 'BLILAS', 'SEAPHX', 'LASBLI', 'PDXPHX', 'PDXDFW', 'DFWPDX', 'DFWSEA', 'SEAAUS', 'SEADEN', 'SEAGEG', 'PDXORD', 'SATSEA', 'SEASAT', 'SLCSEA', 'SEAIAH', 'ATLSEA', 'SEASTL', 'STLSEA', 'SEAMCI', 'PDXDCA', 'BOSSAN', 'KOASEA', 'BLIOGG', 'OGGSJC', 'SEALIH', 'OGGSMF', 'PDXHNL', 'HNLPDX', 'LIHPDX', 'OGGOAK', 'LIHOAK', 'SEAKOA', 'SANOGG', 'OAKOGG', 'SEAHNL', 'PDXKOA', 'HNLSEA', 'SEAOGG', 'OGGSEA', 'HNLANC', 'OAKLI

In [15]:
import string
import nltk
nltk.download('stopwords')
nltk.download('punkt')

shakespeare_file = '/content/shakespeare_teste.txt'

stopwords = nltk.corpus.stopwords.words('english')
punctuation = string.punctuation

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [16]:
def tokenizer(line):
    for token in nltk.word_tokenize(line):
        if token not in stopwords and token not in punctuation:
            yield token.lower()

shakespeare_RDD = sc.textFile(shakespeare_file)
tokens_RDD = shakespeare_RDD.flatMap(tokenizer)
key_tokens_RDD = tokens_RDD.map(lambda x: (x,1))
token_counts = key_tokens_RDD.reduceByKey(lambda a, b: a+b).sortByKey()

print(token_counts.take(5))

[('aqui', 3), ('está', 2), ('o', 1), ('trecho', 2), ('um', 1)]
