#Instalação, imports e configurações

In [5]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
#configurando variável de ambiente
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = '1'

#Imports

In [7]:
import pandas as pd
import numpy as np

import pyspark.pandas as ps #pyspark-on-pandas
from pyspark.sql import SparkSession #iniciando uma sessão Spark, permitindo acesso as funcionalidades Spark
from pyspark import SparkContext

In [8]:
#Criando um "pandas-on-spark" Dataframe
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
     index=[10, 20, 30, 40, 50, 60])

psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [9]:
#Criando um dataframe com datas de 6 meses
#formato year-month-day
dates = pd.date_range('20230101', periods=6)
dates

DatetimeIndex(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04',
               '2023-01-05', '2023-01-06'],
              dtype='datetime64[ns]', freq='D')

In [10]:
#Criando um dataframe com as datas como indíces
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
pdf

Unnamed: 0,A,B,C,D
2023-01-01,-0.628268,-0.949927,-1.987397,0.845043
2023-01-02,-0.661848,1.317996,0.761608,0.814377
2023-01-03,0.791435,0.520978,0.786309,0.159898
2023-01-04,0.027629,0.092285,-0.916997,-0.76118
2023-01-05,-0.21674,0.897644,0.669499,-0.583765
2023-01-06,-0.007615,-2.112817,1.059433,-0.95098


In [11]:
#Convertendo um df pandas em um "pandas-on-Spark" dataframe
psdf_1 = ps.from_pandas(pdf)
print("Tipo:", type(psdf_1))

psdf_1 #Se comporta igual a df do pandas

Tipo: <class 'pyspark.pandas.frame.DataFrame'>


Unnamed: 0,A,B,C,D
2023-01-01,-0.628268,-0.949927,-1.987397,0.845043
2023-01-02,-0.661848,1.317996,0.761608,0.814377
2023-01-03,0.791435,0.520978,0.786309,0.159898
2023-01-04,0.027629,0.092285,-0.916997,-0.76118
2023-01-05,-0.21674,0.897644,0.669499,-0.583765
2023-01-06,-0.007615,-2.112817,1.059433,-0.95098


In [12]:
#Funcionalidades do pandas podem ser usadas aqui
psdf_1.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

In [13]:
psdf_1.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [14]:
psdf_1.head(2)

Unnamed: 0,A,B,C,D
2023-01-01,-0.628268,-0.949927,-1.987397,0.845043
2023-01-02,-0.661848,1.317996,0.761608,0.814377


In [15]:
psdf_1.describe()

Unnamed: 0,A,B,C,D
count,6.0,6.0,6.0,6.0
mean,-0.115901,-0.038973,0.062076,-0.079434
std,0.534103,1.279067,1.227719,0.798694
min,-0.661848,-2.112817,-1.987397,-0.95098
25%,-0.628268,-0.949927,-0.916997,-0.76118
50%,-0.21674,0.092285,0.669499,-0.583765
75%,0.027629,0.897644,0.786309,0.814377
max,0.791435,1.317996,1.059433,0.845043


In [16]:
pdf_2 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
pdf_2.loc[dates[0]:dates[1], 'E'] = 1 #Acessando um grupo de linhas por uma label
psdf_2 = ps.from_pandas(pdf_2)

psdf_2

Unnamed: 0,A,B,C,D,E
2023-01-01,-0.628268,-0.949927,-1.987397,0.845043,1.0
2023-01-02,-0.661848,1.317996,0.761608,0.814377,1.0
2023-01-03,0.791435,0.520978,0.786309,0.159898,
2023-01-04,0.027629,0.092285,-0.916997,-0.76118,


In [17]:
psdf_2.fillna(value=5) #preenchendo valores NaN com 5.0

Unnamed: 0,A,B,C,D,E
2023-01-01,-0.628268,-0.949927,-1.987397,0.845043,1.0
2023-01-02,-0.661848,1.317996,0.761608,0.814377,1.0
2023-01-03,0.791435,0.520978,0.786309,0.159898,5.0
2023-01-04,0.027629,0.092285,-0.916997,-0.76118,5.0


In [18]:
#Outra forma de criar um dataframe spark através de um dataframe pandas

#recuperando uma sessão spark já criada; caso não exista, uma nova sessão é criada
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
sdf.show()

+--------------------+-------------------+-------------------+-------------------+
|                   A|                  B|                  C|                  D|
+--------------------+-------------------+-------------------+-------------------+
| -0.6282676838250951|-0.9499267857182643|-1.9873967113014608| 0.8450430511585947|
| -0.6618479775814116| 1.3179960553263383| 0.7616078901967952| 0.8143774929489687|
|  0.7914345538130119| 0.5209784953643609| 0.7863088946944025|0.15989828252507737|
| 0.02762930184235269|0.09228525074539488|-0.9169973853976577|-0.7611801609103062|
|-0.21674034983353255| 0.8976437226744797| 0.6694991250601765|-0.5837653376945988|
|-0.00761533055913...|-2.1128170079008295| 1.0594326584729625|-0.9509800195875802|
+--------------------+-------------------+-------------------+-------------------+



#SQL

In [20]:
#Criando um novo dataframe spark
df = spark.createDataFrame([("Scala", 25000), ("Spark", 35000), ("PHP", 21000)])
df.show()

df.createOrReplaceTempView("sample_table") #Cria uma tabela temporária a partir de um dataframe spark
df2 = spark.sql("SELECT _2 FROM sample_table") #Fazendo um select com sql
df2.show()

+-----+-----+
|   _1|   _2|
+-----+-----+
|Scala|25000|
|Spark|35000|
|  PHP|21000|
+-----+-----+

+-----+
|   _2|
+-----+
|25000|
|35000|
|21000|
+-----+



In [21]:
#Carregando dataset
ds = spark.read.csv('data.csv', header=True)
ds.createOrReplaceTempView("car_table")

#Utilizando sql nos dados
ds1 = spark.sql("SELECT Year from car_table")
ds1.show()

+----+
|Year|
+----+
|2011|
|2011|
|2011|
|2011|
|2011|
|2012|
|2012|
|2012|
|2012|
|2013|
|2013|
|2013|
|2013|
|2013|
|2013|
|2013|
|2013|
|1992|
|1992|
|1992|
+----+
only showing top 20 rows



-----------------------------------------

In [22]:
#Criando um SparkSession para criar um SparkContext

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()
print(spark.sparkContext)
print("Spark App Name : " + spark.sparkContext.appName)

<SparkContext master=local[*] appName=pandas-on-Spark>
Spark App Name : pandas-on-Spark


In [23]:
#Criando um RDD a partir de um sparkContext
#parallelize(), textFile() e wholeTextFiles() são métodos que criam um RDD e o particionam baseado nos recursos disponiveis

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
rdd=spark.sparkContext.parallelize(data)

print(rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


In [24]:
#Recuperando quantidade de partições de um RDD
print("Quantidade de partições:"+str(rdd.getNumPartitions()))

Quantidade de partições:2


In [25]:
#Reparticionando o RDD
reparRdd = rdd.repartition(4)
print("Quantidade de repartições:"+str(reparRdd.getNumPartitions()))

Quantidade de repartições:4


In [26]:
#Cortando os itens do RDD
print("Quantidade : "+str(reparRdd.count()))

Quantidade : 12


#MapReduce

In [27]:
#Aplicando uma função com o dobro do valor
rdd2 = rdd.map(lambda x: x*2)
rdd2.collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]

In [28]:
#Aplicando a função reduce aos dados de rdd2
rdd3 = rdd2.reduce(lambda a,b: a+b)
rdd3

156

#Transformações com Spark

In [31]:
#Lendo um dado .txt
rdd4 = spark.sparkContext.textFile("test.txt")
for element in rdd4.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
by

In [32]:
#Separando as strings por espaço
rdd5=rdd4.flatMap(lambda x: x.split(" "))
for element in rdd5.collect():
    print(element)

Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Alice’s
Adventures
in
Wonderland
by
Lewis
Carroll
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
This
eBook
is
for
the
use
of
anyone
anywhere
at
no
cost
and
with
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
by

In [34]:
#Adicionando uma nova coluna com valor 1 para cada palavra
rdd6=rdd5.map(lambda x: (x,1))
for element in rdd6.collect():
    print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('by', 1)
('Lewis', 1)
('Carroll', 1)
('This', 1)
('eBook', 1)
('is', 1)
('for', 1)
('the', 1)
('use', 1)
('of', 1)
('anyone', 1)
('anywhere', 1)
('at', 1)
('no', 1)
('cost', 1)
('and', 1)
('with', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('by', 1)
('Lewis', 1)
('Carroll', 1)
('This', 1)
('eBook', 1)
('is', 1)
('for', 1)
('the', 1)
('use', 1)
('of', 1)
('anyone', 1)
('anywhere', 1)
('at', 1)
('no', 1)
('cost', 1)
('and', 1)
('with', 1)
('This', 1)
('eBook', 1)
('is', 1)
('for', 1)
('the', 1)
('use', 1)
('of', 1)
('anyone', 1)
('anywhere', 1)
('at', 1)
('no', 1)
('cost', 1)
('and', 1)
('with', 1)
('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('by', 1)
('Lewis', 1)
('Carroll', 1)
('This', 1)
('eBook', 1)
('is', 1)
('for', 1)
('the', 1)
('use', 1)
('of', 1)
('anyone', 1)
('anywhere', 1)
('at', 1)
('no', 1)
('cost', 1)
('and'

In [35]:
#Fazendo merge das strings iguais através da função
rdd7=rdd6.reduceByKey(lambda a, b: a+b)
for element in rdd7.collect():
    print(element)

('Project', 9)
('Gutenberg’s', 9)
('Alice’s', 18)
('in', 18)
('Lewis', 18)
('Carroll', 18)
('is', 27)
('use', 27)
('of', 27)
('anyone', 27)
('anywhere', 27)
('at', 27)
('no', 27)
('Adventures', 18)
('Wonderland', 18)
('by', 18)
('This', 27)
('eBook', 27)
('for', 27)
('the', 27)
('cost', 27)
('and', 27)
('with', 27)


In [36]:
#Ordenando os dados por keys
rdd8=rdd7.map(lambda x: (x[1],x[0])).sortByKey()
for element in rdd8.collect():
  print(element)

(9, 'Project')
(9, 'Gutenberg’s')
(18, 'Alice’s')
(18, 'in')
(18, 'Lewis')
(18, 'Carroll')
(18, 'Adventures')
(18, 'Wonderland')
(18, 'by')
(27, 'is')
(27, 'use')
(27, 'of')
(27, 'anyone')
(27, 'anywhere')
(27, 'at')
(27, 'no')
(27, 'This')
(27, 'eBook')
(27, 'for')
(27, 'the')
(27, 'cost')
(27, 'and')
(27, 'with')


In [37]:
#Filtrando palavras que possuem a letra "a"
rdd9 = rdd8.filter(lambda x : 'a' in x[1])
for element in rdd9.collect():
    print(element)

(18, 'Carroll')
(18, 'Wonderland')
(27, 'anyone')
(27, 'anywhere')
(27, 'at')
(27, 'and')
