<a href="https://colab.research.google.com/github/Pablo98767/Desafio/blob/main/Aprendendo_Spark_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Fazendo a instalação do spark
!pip install pyspark

In [None]:
#importação da biblioteca 
import pyspark

In [None]:
# Instanciando a biblioteca e iniciando uma sessão spark para iniciar a aplição.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


Criando um Dataframe
---



In [None]:
# importando bibliotecas do spark para trabalhar com criação de dataframe e algumas auxiliares
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row


In [None]:
#Em primeiro lugar, você pode criar um PySpark DataFrame a partir de uma lista de linhas
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

df

In [None]:
#Crie um DataFrame PySpark a partir de um DataFrame pandas

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

In [None]:
#Crie um PySpark DataFrame com um esquema explícito.

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

***OBS : os dataframes a cima trazem os mesmo dados, só são 3 tipos de maneiras que você pode usar para criar dataframes.*** 

In [None]:
# Visualizando os dados
df.show()
df.printSchema()

#Visualizando os Dados

In [None]:
df.show(1)

OBS : Como alternativa, você pode habilitar spark.sql.repl.eagerEval.enableda configuração para a avaliação antecipada do PySpark DataFrame em notebooks como o Jupyter. O número de linhas a serem exibidas pode ser controlado por meio spark.sql.repl.eagerEval.maxNumRowsda configuração.

In [None]:
# O comando abaixo chama o spark.sql.repl.eagerEval.enabled. Em seguida nós chamamos nosso df
# O objetivo é trazer uma tabela melhorada para nosso dataframe, facilitando nosso visual.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

In [None]:
#As linhas também podem ser mostradas verticalmente. Isso é útil quando as linhas são muito longas para serem exibidas horizontalmente
df.show(1, vertical=True)

*** Podemos  ver o esquema do DataFrame e os nomes das colunas da seguinte maneira:***

In [None]:
# Usando o comando do pandas
df.columns

In [None]:
# usando o print do spark 
df.printSchema()

In [None]:
#Mostrar o resumo do DataFrame usando a função select
df.select("a", "b", "c").describe().show()

In [None]:
#A função abaixo coleta os dados distribuídos para o lado do driver como os dados locais em Python. 
# Observe que isso pode gerar um erro de falta de memória quando o conjunto de dados é muito grande para caber no lado do driver porque ele coleta todos os dados dos executores para o lado do driver.
df.collect()

In [None]:
#Para evitar lançar uma exceção de falta de memória, use DataFrame.take()ou DataFrame.tail().
df.take(1)

In [None]:
#O PySpark DataFrame também fornece a conversão de volta para um DataFrame do pandas para aproveitar a API do pandas. 
#Observe que toPandas também coleta todos os dados no lado do motorista que podem facilmente causar um erro de falta de memória 
#quando os dados são muito grandes para caber no lado do motorista.
df.toPandas()

#Selecionando e acessando dados

***O PySpark DataFrame é avaliado lentamente e simplesmente selecionar uma coluna não aciona o cálculo, mas retorna uma Columninstância.***

In [None]:
#selecionando a coluna a com pandas
df.a

***Na verdade, a maioria das operações em colunas retornam Columns.***

In [None]:
#usando o sistema de seleção de coluna do spark
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

***Esses Columns podem ser usados ​​para selecionar as colunas de um DataFrame. Por exemplo, DataFrame.select()pega as Columninstâncias que retornam outro DataFrame.***

In [None]:
df.select(df.c).show()

In [None]:
#Atribuir nova Columninstância.

df.withColumn('upper_c', upper(df.c)).show()
# neste caso foi criada uma coluna chamada upper_c, e depois ela recebeu os dados da coluna c usando o comando upper_c.

In [None]:
#Para selecionar um subconjunto de linhas, use DataFrame.filter().
df.filter(df.a == 1).show()

#Aplicando uma função

***O PySpark oferece suporte a vários UDFs e APIs para permitir que os usuários executem funções nativas do Python. Consulte também as últimas UDFs do Pandas e APIs de função do Pandas . Por exemplo, o exemplo abaixo permite que os usuários usem diretamente as APIs em uma série de pandas dentro da função nativa do Python.***

In [None]:
# Importando as bibliotecas para criar nossas funções
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [None]:
#criando as funções

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

***Outro exemplo é DataFrame.mapInPandasque permite que os usuários usem diretamente as APIs em um DataFrame pandas sem quaisquer restrições, como o comprimento do resultado.***

In [None]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

#Dados de agrupamento

*** PySpark DataFrame também fornece uma maneira de lidar com dados agrupados usando a abordagem comum, estratégia de divisão-aplicação-combinação. Ele agrupa os dados por uma determinada condição, aplica uma função a cada grupo e os combina de volta ao DataFrame.***

In [None]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

In [None]:
#Agrupar e depois aplicar a avg()função aos grupos resultantes.
df.groupby('color').avg().show()

In [None]:
#Você também pode aplicar uma função nativa do Python em cada grupo usando a API do pandas.
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

In [None]:
#Co-agrupamento e aplicação de uma função.
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

#Obtendo dados dentro/fora ¶

***O CSV é direto e fácil de usar. Parquet e ORC são formatos de arquivo compactos e eficientes para leitura e gravação mais rápidas.

Existem muitas outras fontes de dados disponíveis no PySpark, como JDBC, texto, binaryFile, Avro, etc. Consulte também o guia mais recente do Spark SQL, DataFrames e conjuntos de dados na documentação do Apache Spark.
***

#Leitura CSV

In [None]:
# cria  arquivos csv do nosso dataframe 
df.write.csv('/content/foo.csv', header=True)

# aplica a leitura dos dados csv
spark.read.csv('/content/foo.csv', header=True).show()

#Leitura Parquet

In [None]:
#pega o dataframe e tranforma em arquivo parquet
df.write.parquet('bar.parquet')
#faz a leitura do arquivo parquet
spark.read.parquet('bar.parquet').show()

#Leitura de arquivos orc

In [None]:
#Trabsforma nosso dataframe em arquivo orc
df.write.orc('zoo.orc')

#fa a leitura do arquivo orc
spark.read.orc('zoo.orc').show()

#Trabalhando com SQL¶

In [None]:
#chamando meu dataframe e trandormando-o em uma tabela sql
df.createOrReplaceTempView("tableA")

#fazendo a leitura dessa tabela sql
spark.sql("SELECT count(*) from tableA").show()

In [None]:
#Além disso, os UDFs podem ser registrados e invocados no SQL imediatamente:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

In [None]:
#Essas expressões SQL podem ser misturadas diretamente e usadas como colunas PySpark.
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()