# 1. Introdução

Este notebook serve apenas para estudos e teste com o framework pyspark. Então, aqui vai constar apenas pontos soltos que fui cantando durante a aventura na API.

- Quando Spark transforma os dadots, ele não faz isso imendiatamente, a transformação, mas planeja calcular mais tarde.
- Quado há a ação `collect()` é quando realmente ocorre o cálculo.
- PySpark applications começa com inicialização de uma sessão `SparkSession` que é a o ponto de entrada do PySpark.
- Quando você roda PySpark no shell, ele automaticamente cria uma sessão spark.


## 1.2 Sobre a instalação

- A instalação foi complicada, pois não é apenas `pip install pyspark`, é necesário configurar o framework. Veja [aqui](https://spark.apache.org/docs/latest/api/python/getting_started/install.html).
- Outra opção a se pesquiser é usar o framework `findspark` que cuida de encontrar o pyspark em computador e torna-lo como o caminho como variavel ambiente.

        import findspark
        findspark.init()
        

## 1.3 Imports

In [4]:
# Criando uma sessão PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]')\
        .config('spark.driver.bindAddress','127.0.0.1')\
        .getOrCreate()

print(spark.__dir__())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/20 18:00:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/20 18:00:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/03/20 18:00:55 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


['_sc', '_jsc', '_jvm', '_jsparkSession', '_jwrapped', '_wrapped', '__module__', '__doc__', 'Builder', 'builder', '_instantiatedSession', '_activeSession', '__init__', '_repr_html_', 'newSession', 'getActiveSession', 'sparkContext', 'version', 'conf', 'catalog', 'udf', 'range', '_inferSchemaFromList', '_inferSchema', '_createFromRDD', '_createFromLocal', '_create_shell_session', 'createDataFrame', '_create_dataframe', 'sql', 'table', 'read', 'readStream', 'streams', 'stop', '__enter__', '__exit__', '_convert_from_pandas', '_get_numpy_record_dtype', '_create_from_pandas_with_arrow', '__dict__', '__weakref__', '__repr__', '__hash__', '__str__', '__getattribute__', '__setattr__', '__delattr__', '__lt__', '__le__', '__eq__', '__ne__', '__gt__', '__ge__', '__new__', '__reduce_ex__', '__reduce__', '__subclasshook__', '__init_subclass__', '__format__', '__sizeof__', '__dir__', '__class__']


Para funcionar, tive que setar hostname do spark.driver e antes tiver configurar as variáveis ambientes so SPARK. Conforme no site.

# 2. Iniciando

Um dataframe em pyspark pode ser criado via: `pyspark.sql.SparkSession.DataFrame` passando por listas tuplas, este mesmo comando podes fazer um `schema` para especificar o DataFrame. Quando isso é omitido, o PySpark cuida disso.

In [10]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Criando um schema explicito no pyspark
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0))

Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))

In [11]:
# Criando um pyspark dataframe a partir de um pandas dataframe
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [12]:
# Exibindo os resultados, schema.
df.show()
df.printSchema()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



# 3. Visualizando os dados

In [14]:
# Uma linha, como fosse o df.head(1)?
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



Você deixar a visualização melhor habilitando o `spark.sql.repl.eagerEval.enabled` para uma *eager evaluation* de um PySpark DataFrame nos notebooks do jupyter. Para controlar a quantidade de linhas use: `spark.sql.repl.eagerEval.maxNumRows`

In [15]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [16]:
#Visualizaçao vertical
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [17]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [18]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [19]:
# O reusmo do df, como se fosse o df.info do pandas
df.describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



'DataFrame.collect() coleta os dados distribuídos para o lado do driver como os dados locais em Python. Observe que isso pode gerar um erro de falta de memória quando o conjunto de dados é muito grande para caber no lado do driver porque ele coleta todos os dados dos executores para o lado do driver.'

In [20]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

Para evitar o errro * throwing an out-of-memory exception*

In [21]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

Pode ser convertido para pandas.dataframe, mas pode ocorer facilmente o estouro de memória.

In [22]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


# 4. Selecionando e acessando os dados

O PySpark DataFrame é avaliado preguiçosamente e simplesmente selecionar uma coluna não aciona a computação, mas retorna uma instância de Column.

In [23]:
df.a

Column<'a'>

outras operaçãoes de colunas

In [25]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

`Column` pode ser usada para operações de colunas de um dataframe. Por exemplo, `DataFrame.select()` pega uma instância `Column` que retoran outro dataframe.

In [26]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [27]:
df.withColumn('upper_c', upper(df.c)).show()


+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [29]:
# Selection um subconjunto de lnhas 
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



# 5. Agruapando Dados

PySpark suporta várias UDF's e API's para permitir executar funções nativas do python. Vamos usar o `pandas.Series` para testar.

In [5]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [7]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



a função é muito parecida com a do pandas, apenas precisa mostrar com `show()`

# 6. Obtendo dados

PySpark consege upload de dados de muitos tipos, como: csv, parquet e orc.
Há várias outras forma como JDBC, text, binaryFile, Avro, etc.

In [None]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

In [None]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

In [None]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

# 7. Trabalhando com SQL

DataFrame e SparkSQL compartilham a memsa engine assim podemos tornar um dataframe como uma tablea SQL e com isso fazer consultas nela.

In [10]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [None]:
# @pandas_udf("integer")
# def add_one(s: pd.Series) -> pd.Series:
#     return s + 1

# spark.udf.register("add_one", add_one)
# spark.sql("SELECT add_one(v1) FROM tableA").show()

In [12]:
# from pyspark.sql.functions import expr

# df.selectExpr('add_one(v1)').show()
# df.select(expr('count(*)') > 0).show()