In [6]:
# configura os nós de um cluster (nesse caso estamos rodando standalone)
from pyspark.sql import SparkSession

In [7]:
# configura os nós de um cluster (nesse caso estamos rodando standalone)
spark = SparkSession.builder.appName("aula-pyspark").getOrCreate();

In [8]:
# lendo os dados de um CSV e permitindo que o Spark infira o tipo de dados. Também informamos que o arquivo contém um header
df = spark.read.csv('ABT.csv', inferSchema=True, header=True)

In [9]:
# verificando o tipo de dados da variável *df*
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
# monstrando o schema do arquivo que importamos 
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- volume: integer (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- adjclose: double (nullable = true)



In [11]:
# mostrar colunas do Data Frame
df.columns

['date', 'volume', 'open', 'high', 'low', 'close', 'adjclose']

In [12]:
# mostrar os 3 primeiros dados do data frame
df.show(3)

+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
|      date| volume|            open|             high|              low|            close|         adjclose|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
|2020-07-02|3845300|            92.5|93.05999755859375|91.93000030517578| 92.2300033569336| 92.2300033569336|
|2020-07-01|3389600|91.9800033569336| 91.9800033569336|90.43000030517578|91.63999938964844|91.63999938964844|
|2020-06-30|5220900|           88.75| 91.9000015258789|88.44000244140625|91.43000030517578|91.43000030517578|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 3 rows



In [13]:
# filter -> permite filtrar dados do data frame e mostrar apenas os 3 primeiros registros
df.filter("open<92").show(3)

+----------+-------+-----------------+-----------------+-----------------+-----------------+-----------------+
|      date| volume|             open|             high|              low|            close|         adjclose|
+----------+-------+-----------------+-----------------+-----------------+-----------------+-----------------+
|2020-07-01|3389600| 91.9800033569336| 91.9800033569336|90.43000030517578|91.63999938964844|91.63999938964844|
|2020-06-30|5220900|            88.75| 91.9000015258789|88.44000244140625|91.43000030517578|91.43000030517578|
|2020-06-29|4669300|89.58000183105469|89.69999694824219|88.08999633789062|89.01000213623047|89.01000213623047|
+----------+-------+-----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 3 rows



In [14]:
# filter -> permite filtrar dados do data frame e mostrar apenas os 3 primeiros registros
df.filter( (df["open"] < 93)  ).show(3)

+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
|      date| volume|            open|             high|              low|            close|         adjclose|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
|2020-07-02|3845300|            92.5|93.05999755859375|91.93000030517578| 92.2300033569336| 92.2300033569336|
|2020-07-01|3389600|91.9800033569336| 91.9800033569336|90.43000030517578|91.63999938964844|91.63999938964844|
|2020-06-30|5220900|           88.75| 91.9000015258789|88.44000244140625|91.43000030517578|91.43000030517578|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 3 rows



In [15]:
# filter -> permite filtrar dados do data frame -> mostrar apenas duas colunas
df.filter( (df["open"] < 93)  ).select(["open","close"]).show(3)
df.select(["open","close"]).show(3)

+----------------+-----------------+
|            open|            close|
+----------------+-----------------+
|            92.5| 92.2300033569336|
|91.9800033569336|91.63999938964844|
|           88.75|91.43000030517578|
+----------------+-----------------+
only showing top 3 rows



In [31]:
# criar uma nova coluna -> df é sempre imutável -> então devemos criar outra coluna em um outro dataframe
dfCloseOpen = df.withColumn("close/open" , df["close"]/df["open"] )


In [32]:
# verificando o tipo de dados da variável *dfCloseOpen*
type(dfCloseOpen)

pyspark.sql.dataframe.DataFrame

In [18]:
# mostrar apenas as colunas "close", "open", "close/open" -> e os 3 primeiros registros
dfCloseOpen.select(["close", "open", "close/open"]).show(3)

+-----------------+----------------+------------------+
|            close|            open|        close/open|
+-----------------+----------------+------------------+
| 92.2300033569336|            92.5| 0.997081117372255|
|91.63999938964844|91.9800033569336|0.9963035012516172|
|91.43000030517578|           88.75| 1.030197186537192|
+-----------------+----------------+------------------+
only showing top 3 rows



In [19]:
# importando as funções max e min 
from pyspark.sql.functions import max, min

In [52]:
#mostrar o processamento da função max, dividindo as colunas close/open -> apenas 3 primeiros resultados
dfCloseOpen.select(max("close/open")).show()

+------------------+
|   max(close/open)|
+------------------+
|1.1166077691829603|
+------------------+



In [51]:
#mostrar o processamento da função min, dividindo as colunas close/open -> apenas 3 primeiros resultados
dfCloseOpen.select(min("close/open")).show()

+------------------+
|   min(close/open)|
+------------------+
|0.9078590266226547|
+------------------+



In [36]:
#importando when e col para processar os dados da coluna flag
from pyspark.sql.functions import when,col

In [53]:
#novo dataframe -> com a coluna flag, com um IF-ELSE -> IF "close/open" > 1: 1 ---ELSE 0
dfCloseOpenCau = dfCloseOpen.withColumn ('flag', when( col("close/open") > 1, 1 ).otherwise(0)  )

In [41]:
#mostrando apenas 3 dados das colunas "close/open","flag"
dfCloseOpenCau.select(["close/open","flag"]).show(3)

+------------------+----+
|        close/open|flag|
+------------------+----+
| 0.997081117372255|   0|
|0.9963035012516172|   0|
| 1.030197186537192|   1|
+------------------+----+
only showing top 3 rows



In [24]:
#importando a função Year
from pyspark.sql.functions import year

In [25]:
#mostrando apenas o ano da função YEAR
dfCloseOpen.select ( year( dfCloseOpen["date"] ) ).show(3)

+----------+
|year(date)|
+----------+
|      2020|
|      2020|
|      2020|
+----------+
only showing top 3 rows



In [30]:
#atualizando dataframe para armazenar a coluna YEAR
dfCloseOpen = dfCloseOpen.withColumn ("Year", year( dfCloseOpen["date"] ) )

In [32]:
#mostrando as colunas date e Year, 3 primeiros registros
dfCloseOpen.select("date","Year").show(3)

+----------+----+
|      date|Year|
+----------+----+
|2020-07-02|2020|
|2020-07-01|2020|
|2020-06-30|2020|
+----------+----+
only showing top 3 rows



In [55]:
#importando a função Month
from pyspark.sql.functions import month

In [56]:
#mostrando apenas o ano da função MONTH
dfCloseOpen = dfCloseOpen.withColumn ("Month", month( dfCloseOpen["date"] ) )

In [37]:
#mostrando as colunas date, Month e Year, 3 primeiros registros
dfCloseOpen.select("date","Month","Year").show(3)

+----------+-----+----+
|      date|Month|Year|
+----------+-----+----+
|2020-07-02|    7|2020|
|2020-07-01|    7|2020|
|2020-06-30|    6|2020|
+----------+-----+----+
only showing top 3 rows



In [38]:
#importando função dia do mês
from pyspark.sql.functions import dayofmonth

In [39]:
#atualizando dataframe para armazenar a coluna Day
dfCloseOpen = dfCloseOpen.withColumn ("Day", dayofmonth( dfCloseOpen["date"] ) )

In [40]:
#mostrando as colunas date, Month, Year e Day, 3 primeiros registros
dfCloseOpen.select("date","Day","Month","Year").show(3)

+----------+---+-----+----+
|      date|Day|Month|Year|
+----------+---+-----+----+
|2020-07-02|  2|    7|2020|
|2020-07-01|  1|    7|2020|
|2020-06-30| 30|    6|2020|
+----------+---+-----+----+
only showing top 3 rows



In [57]:
#agrupando dados por ano e o resultado deste agrupamento deve ser o valor máximo
dfGroupByMax = dfCloseOpen.groupBy("Year").max()

AnalysisException: cannot resolve '`Year`' given input columns: [Month, adjclose, close, close/open, date, high, low, open, volume];
'Aggregate ['Year], ['Year, max(volume#17) AS max(volume)#375, max(open#18) AS max(open)#376, max(high#19) AS max(high)#377, max(low#20) AS max(low)#378, max(close#21) AS max(close)#379, max(adjclose#22) AS max(adjclose)#380, max(close/open#212) AS max(close/open)#381, max(Month#348) AS max(Month)#382]
+- Project [date#16, volume#17, open#18, high#19, low#20, close#21, adjclose#22, close/open#212, month(cast(date#16 as date)) AS Month#348]
   +- Project [date#16, volume#17, open#18, high#19, low#20, close#21, adjclose#22, (close#21 / open#18) AS close/open#212]
      +- Relation[date#16,volume#17,open#18,high#19,low#20,close#21,adjclose#22] csv


In [58]:
# ordenar os dados por ano, mostrar apenas colunas "Year", "max(close)"
from pyspark.sql.functions import col, desc
dfGroupByMax.sort( col("Year").desc() ).select(["Year", "max(close)"]).show()

NameError: name 'dfGroupByMax' is not defined

In [49]:
#mostrar apenas uma linha com todas as colunas
dfCloseOpen.show(1)

+----------+-------+----+-----------------+-----------------+----------------+----------------+-----------------+----+-----+---+
|      date| volume|open|             high|              low|           close|        adjclose|       close/open|Year|Month|Day|
+----------+-------+----+-----------------+-----------------+----------------+----------------+-----------------+----+-----+---+
|2020-07-02|3845300|92.5|93.05999755859375|91.93000030517578|92.2300033569336|92.2300033569336|0.997081117372255|2020|    7|  2|
+----------+-------+----+-----------------+-----------------+----------------+----------------+-----------------+----+-----+---+
only showing top 1 row



In [52]:
#escrevendo em Parquet na pasta /home/jovyan/parquet/
dfCloseOpen.write.mode("overwrite").parquet("/home/jovyan/parquet/ABT.parquet")

In [58]:
#Lendo o arquivo Parquet e colocando no dataframe *dfParquet*
dfParquet = spark.read.parquet("/home/jovyan/parquet/ABT.parquet")

In [54]:
#mostrar apenas duas linhas com todas as colunas (dataFrame Parquet)
dfParquet.show(2)

+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+----+-----+---+
|      date| volume|            open|             high|              low|            close|         adjclose|        close/open|Year|Month|Day|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+----+-----+---+
|2020-07-02|3845300|            92.5|93.05999755859375|91.93000030517578| 92.2300033569336| 92.2300033569336| 0.997081117372255|2020|    7|  2|
|2020-07-01|3389600|91.9800033569336| 91.9800033569336|90.43000030517578|91.63999938964844|91.63999938964844|0.9963035012516172|2020|    7|  1|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+------------------+----+-----+---+
only showing top 2 rows



In [57]:
# importando SQLContect para poder manipular o Dataframe como se fosse uma Tabela SQL
from pyspark.sql import SQLContext


In [60]:
# createOrReplaceTempView -> criar uma visão temporária com o nome myTable (visão permite acessar os dados como uma tabela SQL)
dfParquet.createOrReplaceTempView("myTable")

In [61]:
#executa uma consulta SQL no dataframe (na verdade visão criada na linha acima)
sql = spark.sql("select * from myTable where close > 92")

In [62]:
# mostra os 3 primeiros dados do resultado SQL acima
sql.show(3)

+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+-----+---+
|      date| volume|            open|             high|              low|            close|         adjclose|       close/open|Year|Month|Day|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+-----+---+
|2020-07-02|3845300|            92.5|93.05999755859375|91.93000030517578| 92.2300033569336| 92.2300033569336|0.997081117372255|2020|    7|  2|
|2020-06-10|5829100|90.8499984741211|            92.75|90.63999938964844|92.16000366210938|92.16000366210938|1.014419429939357|2020|    6| 10|
|2020-06-08|5426800|88.9800033569336|92.61000061035156|88.91000366210938|92.55999755859375|92.55999755859375|1.040233693713175|2020|    6|  8|
+----------+-------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----+-----+---+

In [42]:
# EXTRA
#remover valores nulos de todas as colunas
df.na.drop()


DataFrame[date: string, volume: int, open: double, high: double, low: double, close: double, adjclose: double]

In [46]:
# EXTRA
#remover valores nulos apenas das colunas 'open', 'close'
df.dropna(subset = ["open", "close"])

DataFrame[date: string, volume: int, open: double, high: double, low: double, close: double, adjclose: double]

In [None]:
#executa uma consulta SQL no dataframe (na verdade visão criada na linha acima)
sql = spark.sql("select * from coronaTable where `Deaths` is null").show(3)


# Aula dia 1 de junho de 2021
## Tópicos
1. UnionAll (Dataframes)


In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("unionall-pyspark").getOrCreate();

In [4]:
dfAAN = spark.read.csv('AAN.csv', header=True, inferSchema=True)

In [6]:
dfAAN.show(1)

+----------+------+-----------------+-----------------+------------------+-----------------+-----------------+
|      date|volume|             open|             high|               low|            close|         adjclose|
+----------+------+-----------------+-----------------+------------------+-----------------+-----------------+
|2020-07-02|413000|46.79999923706055|47.41999816894531|44.540000915527344|44.72999954223633|44.72999954223633|
+----------+------+-----------------+-----------------+------------------+-----------------+-----------------+
only showing top 1 row



In [8]:
dfABT = spark.read.csv('ABT.csv', header=True, inferSchema=True)

In [9]:
dfABT.show(1)

+----------+-------+----+-----------------+-----------------+----------------+----------------+
|      date| volume|open|             high|              low|           close|        adjclose|
+----------+-------+----+-----------------+-----------------+----------------+----------------+
|2020-07-02|3845300|92.5|93.05999755859375|91.93000030517578|92.2300033569336|92.2300033569336|
+----------+-------+----+-----------------+-----------------+----------------+----------------+
only showing top 1 row



In [11]:
#importando DataFrame para poder fazer a união dos DataFrames
from pyspark.sql import DataFrame

In [14]:
dfUnionAll = DataFrame.unionAll(dfABT, dfAAN)

In [15]:
dfUnionAll.columns

['date', 'volume', 'open', 'high', 'low', 'close', 'adjclose']

In [21]:
#para colocar valores fixos dentro de uma coluna
from pyspark.sql.functions import lit

In [22]:
#adicionando uma nova coluna dentro da DataFrame
dfABTSymbol = dfABT.withColumn('symbol', lit('ABT') )

In [23]:
#adicionando uma nova coluna dentro da DataFrame
dfAANSymbol = dfAAN.withColumn('symbol', lit('AAN') )

In [25]:
# junção dos DataFrames com unionAll
dfUnionAll = DataFrame.unionAll(dfABTSymbol, dfAANSymbol)

In [26]:
dfUnionAll.columns

['date', 'volume', 'open', 'high', 'low', 'close', 'adjclose', 'symbol']

In [28]:
#mostrando os valores distintos da coluna 'symbol'
dfUnionAll.select('symbol').distinct().show()

+------+
|symbol|
+------+
|   AAN|
|   ABT|
+------+

