# Estudos relacionados ao DataFrame utilizando o Apache Spark.

### Instalação dos componentes relacionados ao Spark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
import findspark

# Point Spark at the JDK and at the Spark distribution unpacked by the install
# cell. NOTE(review): these paths assume Colab's layout (/content) — confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# Use the absolute SPARK_HOME set above rather than a path relative to the
# current working directory, so initialisation does not depend on `cwd`.
findspark.init(os.environ["SPARK_HOME"])

## Importação das Bibliotecas

In [3]:
import pyspark
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import corr, dayofweek, month

## Introdução ao DataFrame

### Criando uma sessão com SparkSession

In [4]:
# Entry point for the DataFrame API: reuse the active session or build one.
spark1 = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

### Leitura de um arquivo JSON 

In [5]:
# Load the sample people dataset from JSON (schema inferred) and display it.
PATH = '/content/drive/MyDrive/Colab Notebooks/Estudos/Spark/base/people.json'
df = spark1.read.format('json').load(PATH)
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Esquema dos dados

In [6]:
# Print the inferred schema; like a SQL table schema, columns may hold nulls
# (nullable = true).
 
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



### Listagem das colunas do arquivo JSON

In [7]:
# Column names of the DataFrame, as a plain Python list.
df.columns

['age', 'name']

### Estatísticas descritivas do conteúdo

In [8]:
# count/mean/stddev/min/max per column; null values are not counted.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [9]:
# Same statistics as describe(), plus the 25%/50%/75% percentiles.
df.summary().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    25%|                19|   null|
|    50%|                19|   null|
|    75%|                30|   null|
|    max|                30|Michael|
+-------+------------------+-------+



### Métodos Take e Collect

In [10]:
# Bring the first 2 rows to the driver as a list of Row objects.
df.take(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [11]:
# Bring ALL rows to the driver as a list of Row objects — only for small data.
df.collect()

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

## Operações Básicas

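### Obtendo a sessão Spark para a leitura de um arquivo CSV
`getOrCreate()` reaproveita a sessão já ativa (`spark1`) em vez de criar uma nova; portanto `spark2` é a mesma sessão.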

In [12]:
# NOTE: getOrCreate() returns the session that is already active (spark1)
# instead of starting a new one, so spark2 is that same session and the
# underlying SparkContext keeps its original app name ('Basics').
spark2 = (
    SparkSession.builder
    .appName('Ops')
    .getOrCreate()
)

In [13]:
# Load the stock-price CSV: first line is the header, column types inferred.
PATH = '/content/drive/MyDrive/Colab Notebooks/Estudos/Spark/base/appl_stock.csv'
df = spark2.read.csv(PATH, header=True, inferSchema=True)
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [14]:
# Print the first rows (20 by default) as a text table.
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

### Listando as Colunas

In [15]:
# Column names of the stock DataFrame.
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

### Tipos de Colunas

In [16]:
# Indexing a DataFrame by name yields a Column expression, not the data itself.
type(df['High'])

pyspark.sql.column.Column

In [17]:
# Each row comes back as a Row object; first() fetches only the single row
# needed to inspect its type.
type(df.first())

pyspark.sql.types.Row

### Selecionando as Colunas

In [18]:
# select() is lazy: without an action, only the new DataFrame's schema is shown.
df.select('High')

DataFrame[High: double]

In [19]:
# show() is an action: it runs the projection and prints the rows.
df.select('High').show()

+------------------+
|              High|
+------------------+
|        214.499996|
|        215.589994|
|            215.23|
|        212.000006|
|        212.000006|
|        213.000002|
|209.76999500000002|
|210.92999500000002|
|210.45999700000002|
|211.59999700000003|
|215.18999900000003|
|        215.549994|
|213.30999599999998|
|        207.499996|
|        204.699999|
|        213.710005|
|            210.58|
|        205.500004|
|        202.199995|
|             196.0|
+------------------+
only showing top 20 rows



In [20]:
# select() also accepts a list of column names.
df.select(['High', 'Close']).show()

+------------------+------------------+
|              High|             Close|
+------------------+------------------+
|        214.499996|        214.009998|
|        215.589994|        214.379993|
|            215.23|        210.969995|
|        212.000006|            210.58|
|        212.000006|211.98000499999998|
|        213.000002|210.11000299999998|
|209.76999500000002|        207.720001|
|210.92999500000002|        210.650002|
|210.45999700000002|            209.43|
|211.59999700000003|            205.93|
|215.18999900000003|        215.039995|
|        215.549994|            211.73|
|213.30999599999998|        208.069996|
|        207.499996|            197.75|
|        204.699999|        203.070002|
|        213.710005|        205.940001|
|            210.58|        207.880005|
|        205.500004|        199.289995|
|        202.199995|        192.060003|
|             196.0|        194.729998|
+------------------+------------------+
only showing top 20 rows



### Métodos asDict e head

In [21]:
# head(n) returns the first n rows as a list of Row objects.
df.head(2)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)]

In [22]:
# Convert the first row into a plain Python dict (column name -> value).
# first() fetches just that one row instead of taking two and discarding one.
dict1 = df.first().asDict()
dict1

{'Adj Close': 27.727039,
 'Close': 214.009998,
 'Date': datetime.datetime(2010, 1, 4, 0, 0),
 'High': 214.499996,
 'Low': 212.38000099999996,
 'Open': 213.429998,
 'Volume': 123432400}

### Método Count

In [23]:
# Number of rows in the DataFrame (an action).
df.count()

1762

### Importação de uma nova base


In [24]:
# Load the sales CSV (header row, inferred types) into a second DataFrame.
PATH = '/content/drive/MyDrive/Colab Notebooks/Estudos/Spark/base/sales_info.csv'
df2 = spark2.read.csv(PATH, header=True, inferSchema=True)
df2.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



### Selecionando os valores distintos de 'Company' e contando-os

In [25]:
# Unique values of the Company column.
df2.select('Company').distinct().show()

+-------+
|Company|
+-------+
|   APPL|
|   GOOG|
|     FB|
|   MSFT|
+-------+



In [26]:
# How many distinct companies there are.
df2.select('Company').distinct().count()

4

### Utilizando o método sample para obter amostras aleatórias

In [27]:
# Random sample of roughly 0.5% of the rows, without replacement; the fixed
# seed makes the sample reproducible.
df.sample(
    withReplacement=False,
    fraction=0.005,
    seed=101,
).show()

+-------------------+------------------+------------------+------------------+----------+---------+------------------+
|               Date|              Open|              High|               Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+----------+---------+------------------+
|2010-03-19 00:00:00|224.79000499999998|        225.240002|221.23000299999998|    222.25|139861400|          28.79461|
|2010-04-09 00:00:00|        241.430012|        241.889996|240.46000299999997|241.789993| 83545700|31.326203000000003|
|2010-06-07 00:00:00|        258.289997|         259.14999|        250.550007|250.940002|221735500|         32.511674|
|2011-07-19 00:00:00|             378.0|378.65000200000003|            373.32|376.849987|204786400|48.824515000000005|
|2012-07-24 00:00:00|         607.37999|        609.680016|        598.509987|600.919975|141283100|         77.854922|
|2013-09-04 00:00:00|        499.560005|        

### Adicionando uma nova coluna ao DataFrame
Adicionando a coluna `Range`: a diferença entre o preço máximo (`High`) e o preço mínimo (`Low`) da ação em cada dia.

In [28]:
# Add Range = High - Low, keep 5 rows and show the three related columns.
(
    df.withColumn('Range', df['High'] - df['Low'])
    .limit(5)
    .select(['High', 'Low', 'Range'])
    .show()
)

+----------+------------------+------------------+
|      High|               Low|             Range|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



### Salvando o resultado acima em um novo DataFrame (`df3`)

In [29]:
# Keep the same 5-row High/Low/Range projection in its own DataFrame.
df3 = (
    df.withColumn('Range', df['High'] - df['Low'])
    .limit(5)
    .select(['High', 'Low', 'Range'])
)
df3.show()

+----------+------------------+------------------+
|      High|               Low|             Range|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



### Renomeando a coluna Range

In [30]:
# Rename Range to Min-to-Max; renaming a column that no longer exists (e.g. on
# a re-run of this cell) is a no-op.
df3 = df3.withColumnRenamed(existing='Range', new='Min-to-Max')
df3.show()

+----------+------------------+------------------+
|      High|               Low|        Min-to-Max|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



### Testando uma série de filtros com os métodos filter e where

In [31]:
# filter() accepts a SQL expression string.
df.filter('Close < 500').show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [32]:
# SQL-string filter combining two conditions with AND, then a projection.
(
    df.filter('Close < 500 AND Open > 500')
    .select(['Date', 'Open', 'Close'])
    .show(5)
)

+-------------------+----------+------------------+
|               Date|      Open|             Close|
+-------------------+----------+------------------+
|2012-02-15 00:00:00|514.259995|        497.669975|
|2013-09-05 00:00:00|500.250008|495.26997400000005|
|2013-09-10 00:00:00|506.199997|494.63999900000005|
|2014-01-30 00:00:00|502.539993|        499.779984|
+-------------------+----------+------------------+



In [33]:
# where() behaves like filter(); arithmetic is allowed inside the SQL string.
(
    df.where('Open < 500 AND (Open - Close) > 10')
    .select(['Date', 'Open', 'Close'])
    .show(5)
)

+-------------------+------------------+------------------+
|               Date|              Open|             Close|
+-------------------+------------------+------------------+
|2010-07-21 00:00:00|        265.089993|254.23999799999999|
|2011-03-16 00:00:00|        342.000004|         330.01001|
|2011-08-04 00:00:00|        389.410007|        377.369999|
|2011-09-29 00:00:00|        401.919987|        390.570007|
|2011-11-10 00:00:00|397.02999500000004|385.22000499999996|
+-------------------+------------------+------------------+
only showing top 5 rows



In [34]:
# Same filter written as a Column expression instead of a SQL string.
df.filter(df['Close'] < 500).show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows

In [35]:
# Column-expression version: each condition in parentheses, combined with &.
(
    df.filter((df['Close'] < 500) & (df['Open'] > 500))
    .select(['Date', 'Open', 'Close'])
    .show(5)
)

+-------------------+----------+------------------+
|               Date|      Open|             Close|
+-------------------+----------+------------------+
|2012-02-15 00:00:00|514.259995|        497.669975|
|2013-09-05 00:00:00|500.250008|495.26997400000005|
|2013-09-10 00:00:00|506.199997|494.63999900000005|
|2014-01-30 00:00:00|502.539993|        499.779984|
+-------------------+----------+------------------+



In [36]:
# ~ negates a Column condition: Open below 200 and Close NOT greater than 200.
df.filter((df['Open'] < 200) & ~(df['Close'] > 200)).show(5)

+-------------------+------------------+----------+------------------+----------+---------+------------------+
|               Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:00|        195.169994|200.200003|        194.420004|199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:00|        196.730003|198.370001|        191.570005|192.050003|189413000|         24.881912|
|2010-02-05 00:00:00|192.63000300000002|     196.0|        190.850002|195.460001|212576700|25.323710000000002|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
only showing top 5 rows

### Criando um DataFrame Menor para análises mais pontuais

In [37]:
# Small 10-row DataFrame holding only High and Low, used by the sort examples.
df4 = df.select(['High', 'Low']).limit(10)
df4.show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        214.499996|212.38000099999996|
|        215.589994|        213.249994|
|            215.23|        210.750004|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|        213.000002|        208.450005|
|209.76999500000002|        206.419998|
|210.92999500000002|        204.099998|
|210.45999700000002|        209.020004|
|211.59999700000003|        205.869999|
+------------------+------------------+



### Ordenando pela coluna 'High'

In [38]:
# Ascending order (sort()'s default direction, spelled out here).

df4.sort('High', ascending=True).show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|209.76999500000002|        206.419998|
|210.45999700000002|        209.020004|
|210.92999500000002|        204.099998|
|211.59999700000003|        205.869999|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|        213.000002|        208.450005|
|        214.499996|212.38000099999996|
|            215.23|        210.750004|
|        215.589994|        213.249994|
+------------------+------------------+



In [39]:
# Descending order via the ascending keyword.

df4.sort('High', ascending = False).show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        215.589994|        213.249994|
|            215.23|        210.750004|
|        214.499996|212.38000099999996|
|        213.000002|        208.450005|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|211.59999700000003|        205.869999|
|210.92999500000002|        204.099998|
|210.45999700000002|        209.020004|
|209.76999500000002|        206.419998|
+------------------+------------------+



In [40]:
# Descending order using the desc function imported from pyspark.sql.functions.

df4.sort(desc('High')).show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        215.589994|        213.249994|
|            215.23|        210.750004|
|        214.499996|212.38000099999996|
|        213.000002|        208.450005|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|211.59999700000003|        205.869999|
|210.92999500000002|        204.099998|
|210.45999700000002|        209.020004|
|209.76999500000002|        206.419998|
+------------------+------------------+



### Utilizando o método orderBy

In [41]:
# Display the sales DataFrame again before sorting it.
df2.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [42]:
# Sort by Company, then by Sales, both ascending.
df2.orderBy(['Company', 'Sales'], ascending=[True, True]).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   APPL|  Linda|130.0|
|   APPL|   John|250.0|
|   APPL|  Chris|350.0|
|   APPL|   Mike|750.0|
|     FB|  Sarah|350.0|
|     FB|   Carl|870.0|
|   GOOG|Charlie|120.0|
|   GOOG|    Sam|200.0|
|   GOOG|  Frank|340.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|   MSFT|   Tina|600.0|
+-------+-------+-----+



### Cálculo do coeficiente de correlação de Pearson
Correlação entre as colunas `Volume` e `Range` (`High - Low`).

In [43]:
# Pearson correlation between Volume and the daily Range (High - Low),
# computed as one aggregate over the whole DataFrame and returned as a Row.
(
    df.withColumn('Range', df['High'] - df['Low'])
    .agg(corr('Volume', 'Range').alias('Pearson-Corr'))
    .collect()
)

[Row(Pearson-Corr=0.6980419651782168)]

Portanto, há uma correlação positiva de moderada a forte (r ≈ 0,70) entre o Volume negociado e a amplitude diária do preço (Range).

### Produzindo as novas colunas de dia da semana e mês

In [44]:
# Preview the first 5 rows of the stock DataFrame.
df.show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows

In [45]:
# Derive the day of week (1 = Sunday ... 7 = Saturday) and the month from Date.
# The new column is created and selected with the same spelling, 'Day-of-week',
# so the cell does not rely on Spark's case-insensitive column resolution.
df5 = (
    df.withColumn('Day-of-week', dayofweek('Date'))
      .withColumn('Month', month('Date'))
      .select(['Day-of-week', 'Month', 'Open', 'Close'])
)

df5.show(5)

+-----------+-----+----------+------------------+
|Day-of-week|Month|      Open|             Close|
+-----------+-----+----------+------------------+
|          2|    1|213.429998|        214.009998|
|          3|    1|214.599998|        214.379993|
|          4|    1|214.379993|        210.969995|
|          5|    1|    211.75|            210.58|
|          6|    1|210.299994|211.98000499999998|
+-----------+-----+----------+------------------+
only showing top 5 rows



### Média do Volume por dia da semana e mês

In [46]:
# Average traded Volume for each (day of week, month) pair, ordered by both keys.
# mean('Volume') aggregates only Volume; a bare mean() would also average every
# other numeric column of the input (the grouping keys included), only for
# those results to be discarded by the following select.
df5 = (
    df.withColumn('Day-of-week', dayofweek('Date'))
      .withColumn('Month', month('Date'))
      .select(['Day-of-week', 'Month', 'Volume'])
      .groupby(['Day-of-week', 'Month'])
      .mean('Volume')
      .select(['Day-of-week', 'Month', 'avg(Volume)'])
      .orderBy('Day-of-week', 'Month')
)

df5.show(50)

+-----------+-----+--------------------+
|Day-of-week|Month|         avg(Volume)|
+-----------+-----+--------------------+
|          2|    1|1.0967633636363636E8|
|          2|    2| 8.957742272727273E7|
|          2|    3| 9.421765161290322E7|
|          2|    4|1.0364189333333333E8|
|          2|    5| 9.646600416666667E7|
|          2|    6| 8.898051666666667E7|
|          2|    7| 8.163605555555555E7|
|          2|    8|      8.5287746875E7|
|          2|    9|  9.82580956521739E7|
|          2|   10| 8.972772333333333E7|
|          2|   11|          7.374762E7|
|          2|   12| 7.757132068965517E7|
|          3|    1|1.3081937586206897E8|
|          3|    2|1.0092741428571428E8|
|          3|    3|      1.0023139375E8|
|          3|    4|1.0715501333333333E8|
|          3|    5|1.0042301290322581E8|
|          3|    6| 8.865190333333333E7|
|          3|    7| 9.382107096774194E7|
|          3|    8| 9.748764193548387E7|
|          3|    9|          9.932478E7|
|          3|   