<a href="https://colab.research.google.com/github/Epilef86/DNC/blob/main/Spark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

A vantagem de se utilizar SQL no spark é a paralelização dos dados, o spark se tiver um banco com 1 bilhão de linhas ele vai paralelizar esses dados dentro do cluster e dentro da memória de uma meneira muito mais rápida.

In [14]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [15]:
!wget -qnc https://bin.equinox.io/c/4VmOzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip -n -q ngrok-stable-linux-amd64.zip

unzip:  cannot find or open ngrok-stable-linux-amd64.zip, ngrok-stable-linux-amd64.zip.zip or ngrok-stable-linux-amd64.zip.ZIP.


In [16]:
from pyspark.sql import SparkSession

spark =(
    SparkSession.builder
    .config('spark.ui.port', '4050')
    .appName('SparkSQL')
    .getOrCreate()
)

In [17]:
!./ngrok authtoken 2KBeQEmmd1YNlQ86GGKf3Kf3KFOkb3_6sQH7JEnVEhDxwn9A7WnT
get_ipython().system_raw('./ngrok http 4050 &')
!sleep 10
!curl -s http://localhost:4040/api/tunnels | grep -Po 'public_url':'(?=https)\K[^']*'

/bin/bash: ./ngrok: No such file or directory
/bin/bash: -c: line 0: unexpected EOF while looking for matching `''
/bin/bash: -c: line 1: syntax error: unexpected end of file


In [18]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampNTZType 

In [19]:
schema_remetente_destinatario = StructType([
    StructField('nome', StringType()),
    StructField('banco', StringType()),
    StructField('tipo', StringType()),
])

schema_base_pix = StructType([
    StructField('id_transacao', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('remetente ', schema_remetente_destinatario),
    StructField('destinatario', schema_remetente_destinatario),
    StructField('transaction_date', TimestampType()),
    StructField('chave_pix', StringType()),
    StructField('fraude', IntegerType())
])

caminho_json = './pix_transactions.json'

df = spark.read.json(
    caminho_json,
    schema = schema_base_pix,
    timestampFormat = "yyyy-MM-dd"
)


NameError: ignored

Como transformar isso em uma tabela pra que eu execute através do SQL, crio uma view temporária, ou seja uma tabela com nome: transacoes_pix, ao invés de um dataframe

In [None]:
spark.read.json(
    caminho_json,
    schema = schema_base_pix,
    timestampFormat = "yyyy-MM-dd"
).createOrReplaceTempView('transacoes_pix')

Pra executar uma SQL dentro de uma sessão spark , basta chamar a sessão spark

In [None]:
spark.sql('select * from transacoes_pix limit 10').show()

Retornou dataframe como gostaria

Como saber se é mais rápido pelo sql ou executar atraves do dataframe. Vamos fazer um teste. Primeiro fazer uma grupamento por chave pix, contar o numero de transacoes por chave pix

In [None]:
group_sql = spark.sql('select chave_pix, count(*) from transacoes_pix group by chave_pix)

Segunda maneira de executar em formto de api 

In [None]:
group_df = df.groupBy('chave_pix').count(  )

Quando der um show nos dois vai ser executado a ação, mas antes disso podemos verificar qual vai ser o plano do spark

As ações que o spark vai tomar pra execução final são exatamente iguais, tanto sql query como api spark

In [20]:
group_sql.show(
    

group_df.show()

SyntaxError: ignored

A unica diferença é que foi criada uma nova view

Com o spark.sql eu posso executar qualquer tipo de querry

In [None]:
spark.sql(  
      '''
        select
          chave_pix,
          sum(valor)
        from transacoes_pix
        group by 1
        order by 2
    '''
).show()

O group by 1 ele vai pegar a primeira selecao que eu fiz e vai agrupar. Assim como pro 2. 
Essa função acima vai somar os valores que tem dentro da minha chave_pix, posso pegar a média dos valors

In [None]:
spark.sql(  
      '''
        select
          chave_pix,
          sum(valor)
        from transacoes_pix
        group by 1
        order by 2
    '''
).show()

Posso arredondar pra duas casas decimais

In [None]:
spark.sql(  
      '''
        select
          chave_pix,
          round(sum(valor),2) 
        from transacoes_pix
        group by 1
        order by 2
    '''
).show()

Identificar a quantidade de transacoes que foram feitas para cada chave_pix

In [None]:
spark.sql(  
      '''
        select
          chave_pix,
          count(*)
        from transacoes_pix
        group by 1
        order by 2 desc
    '''
).show()

Selecionar apenas pros valores que são maiores que 10 mil

In [None]:
spark.sql(  
      '''
        select
          chave_pix,
          count(*) as count_maior_10
        from transacoes_pix
        where valor > 10000
        group by 1
        order by 2 desc
    '''
).show()

SparkSQL - Window Function

Digamos que eu queira saber quais foram as duas transacaoes mais altas para cada banco. Como podemos fazer isso utilizando row_number que faz parte do SparkSQL - Window Function. O row_number cria uma jnela de dados para cada banco e ordeno do valor maior para o menor. O valor maior vai ter index 1 e assim sucessivamente. 

Temos que selecionar o banco do destinatario e tambem saber o valor da transacao. 

In [None]:
spark.sql(
    '''
      select
        destinatario.banco,
        valor,
        row_number() over (partition by destinatario.banco order by valor desc as row_number
        from transacoes_pix
        limit 10
    '''
).show()

Pegamos por exemplo os 10 maiores transacoes do BTG

Agora como que eu posso pegar apenas as duas linhas maiores. Teria que fazer um filtro nessa própria consulta. Pra isso uso cte, que são trabelas temporarias criadas a partir de um select para selecionar desse select, ou seja, crio um sub select.

In [None]:
spark.sql(
    '''
      with cte_base_window(
      select
        banco,
        valor,
        row_number() over (partition by destinatario.banco order by valor desc as row_number
      from transacoes_pix
      )  select
          banco,
          valor
        from cte_base_window
        where row_number in (1,2)
        ''' 
).show()

Vamos tirar o sub select e colocar pra um dataframe

In [None]:
df_row_number = spark.sql('''
  select
    destinatario.banco,
    valor,
    row_number() over (partition by destinatario.banco order by valor desc as row_number
  from transacoes_pix
    '''
)

In [None]:
df_row_number.show()

In [None]:
from pyspark.sql.functions import col

df_row_number.filter(col('row_number').isin([1,2])).show()