# Struct Types

In [None]:
'''
TIPOS BÁSICOS DE DADOS DO SPARK
ByteType() - int
ShortType() - int
IntegerType() - int
LongType() - int
FloatType() - float
DoubleType() - float
StringType() - str
BooleanType - bool
DecimalType() - float
NULL
'''

'\nTIPOS BÁSICOS DE DADOS DO SPARK\nByteType() - int\nShortType() - int\nIntegerType() - int\nLongType() - int\nFloatType() - float\nDoubleType() - float\nStringType() - str\nBooleanType - bool\nDecimalType() - float\nNULL\n'

In [None]:
'''
TIPOS COMPLEXOS DE DADOS
TimestampType() - datetime
DateType() - datetime
ArrayType() - lista, tupla, array
MapType() - dict
StructType() - lista, tupla
StructField() - Tipo do campo

'''

'\nTIPOS COMPLEXOS DE DADOS\nTimestampType() - datetime\nDateType() - datetime\nArrayType() - lista, tupla, array\nMapType() - dict\nStructType() - lista, tupla\nStructField() - Tipo do campo\n\n'

# INSTALAÇÕES

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# IMPORTAÇÕES

In [None]:
from pyspark.sql import SparkSession

In [None]:
# importando tipos de variáveis do spark
from pyspark.sql.types import *

In [None]:
# importando functions do spark
import pyspark.sql.functions as F

In [None]:
# importando window functions do spark
from pyspark.sql.window import Window

# CONFIGURANDO SPARKSESSION

In [None]:
spark = (
    SparkSession.builder
                .master('local')
                .appName('structtype')
                .config('spark.ui.port', '4050')
                .getOrCreate()
)

In [None]:
spark

# CRIANDO SCHEMA

In [None]:
esquema = (
    StructType([
        StructField('id', StringType()),
        StructField('nome_musica', StringType()),
        StructField('popularidade', IntegerType()),
        StructField('duracao_musica', IntegerType()),
        StructField('acustica', StringType()),
        StructField('danceabilidade', StringType()),
        StructField('energia', FloatType()),
        StructField('instrumentalidade', StringType()),
        StructField('nota_musical', IntegerType()),
        StructField('ao_vivo', FloatType()),
        StructField('volume', FloatType()),
        StructField('modo_de_audio', IntegerType()),
        StructField('discurso', FloatType()),
        StructField('ritmo', FloatType()),
        StructField('assinatura de tempo', FloatType()),
        StructField('valencia_de_audio', FloatType())
    ])
)

# CRIANDO DF

In [None]:
df = (
    spark.read.format('csv')
              .option('header', 'true')
              .option('inferschema', 'false')
              .option('delimiter', ',')
              .load('/content/drive/MyDrive/Datasets/spotify.csv')
)

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- nome_musica: string (nullable = true)
 |-- popularidade: integer (nullable = true)
 |-- duracao_musica: integer (nullable = true)
 |-- acustica: string (nullable = true)
 |-- danceabilidade: string (nullable = true)
 |-- energia: float (nullable = true)
 |-- instrumentalidade: string (nullable = true)
 |-- nota_musical: integer (nullable = true)
 |-- ao_vivo: float (nullable = true)
 |-- volume: float (nullable = true)
 |-- modo_de_audio: integer (nullable = true)
 |-- discurso: float (nullable = true)
 |-- ritmo: float (nullable = true)
 |-- assinatura de tempo: float (nullable = true)
 |-- valencia_de_audio: float (nullable = true)



In [None]:
df.show(truncate=False)

+---+--------------------------+------------+--------------+----------------------+-----------------------+-------+---------------------+------------+-------+------+-------------+--------+-------+-------------------+-----------------+
|id |nome_musica               |popularidade|duracao_musica|acustica              |danceabilidade         |energia|instrumentalidade    |nota_musical|ao_vivo|volume|modo_de_audio|discurso|ritmo  |assinatura de tempo|valencia_de_audio|
+---+--------------------------+------------+--------------+----------------------+-----------------------+-------+---------------------+------------+-------+------+-------------+--------+-------+-------------------+-----------------+
|0  |Boulevard of Broken Dreams|73          |262333        |0.005520000000000001kg|0.496mol/L             |0.682  |2.94e-05             |8           |0.0589 |-4.095|1            |0.0294  |167.06 |4.0                |0.474            |
|1  |In The End                |66          |216933        |

# WINDOW FUNCTIONS

In [None]:
schema = ['nome', 'departamento', 'estado', 'salario']

dados = [('Anderson', 'vendas', 'SP', 9000),
         ('Kennedy', 'vendas', 'RJ', 4500),
         ('Luciana', 'vendas', 'SP', 4500),
         ('Marta', 'vendas', 'SP', 4500),
         ('João', 'vendas', 'SP', 4500),
         ('Diego', 'vendas', 'SP', 4500),
         ('Marilia', 'vendas', 'SP', 1200),
         ('Gustavo', 'financeiro', 'AM', 8000),
         ('Pedro', 'financeiro', 'AM', 2750),
         ('Juliana', 'financeiro', 'MG', 3000),
         ('Leticia', 'financeiro', 'RJ', 7500),
         ('Oswaldo', 'marketing', 'RJ', 2450),
         ('Denis', 'marketing', 'MG', 1300)         
        ]

In [None]:
df2 = spark.createDataFrame(dados, schema)

In [None]:
df2.show()

+--------+------------+------+-------+
|    nome|departamento|estado|salario|
+--------+------------+------+-------+
|Anderson|      vendas|    SP|   9000|
| Kennedy|      vendas|    RJ|   4500|
| Luciana|      vendas|    SP|   4500|
|   Marta|      vendas|    SP|   4500|
|    João|      vendas|    SP|   4500|
|   Diego|      vendas|    SP|   4500|
| Marilia|      vendas|    SP|   1200|
| Gustavo|  financeiro|    AM|   8000|
|   Pedro|  financeiro|    AM|   2750|
| Juliana|  financeiro|    MG|   3000|
| Leticia|  financeiro|    RJ|   7500|
| Oswaldo|   marketing|    RJ|   2450|
|   Denis|   marketing|    MG|   1300|
+--------+------------+------+-------+



In [None]:
# cria uma partição do df para utilizar algum tipo de classificação
w0 = Window.partitionBy(F.col('departamento')).orderBy(F.col('salario').desc())

In [None]:
# 1) row_number - retorna o número da linha de acordo com a partição criada
df3 = df2.withColumn('numero_linha', F.row_number().over(w0))

In [None]:
df3.show()

+--------+------------+------+-------+------------+
|    nome|departamento|estado|salario|numero_linha|
+--------+------------+------+-------+------------+
| Gustavo|  financeiro|    AM|   8000|           1|
| Leticia|  financeiro|    RJ|   7500|           2|
| Juliana|  financeiro|    MG|   3000|           3|
|   Pedro|  financeiro|    AM|   2750|           4|
| Oswaldo|   marketing|    RJ|   2450|           1|
|   Denis|   marketing|    MG|   1300|           2|
|Anderson|      vendas|    SP|   9000|           1|
| Kennedy|      vendas|    RJ|   4500|           2|
| Luciana|      vendas|    SP|   4500|           3|
|   Marta|      vendas|    SP|   4500|           4|
|    João|      vendas|    SP|   4500|           5|
|   Diego|      vendas|    SP|   4500|           6|
| Marilia|      vendas|    SP|   1200|           7|
+--------+------------+------+-------+------------+



In [None]:
# rank
df3 = df3.withColumn('rank', F.rank().over(w0))

In [None]:
df3.show()

+--------+------------+------+-------+------------+----+
|    nome|departamento|estado|salario|numero_linha|rank|
+--------+------------+------+-------+------------+----+
| Gustavo|  financeiro|    AM|   8000|           1|   1|
| Leticia|  financeiro|    RJ|   7500|           2|   2|
| Juliana|  financeiro|    MG|   3000|           3|   3|
|   Pedro|  financeiro|    AM|   2750|           4|   4|
| Oswaldo|   marketing|    RJ|   2450|           1|   1|
|   Denis|   marketing|    MG|   1300|           2|   2|
|Anderson|      vendas|    SP|   9000|           1|   1|
| Kennedy|      vendas|    RJ|   4500|           2|   2|
| Luciana|      vendas|    SP|   4500|           3|   2|
|   Marta|      vendas|    SP|   4500|           4|   2|
|    João|      vendas|    SP|   4500|           5|   2|
|   Diego|      vendas|    SP|   4500|           6|   2|
| Marilia|      vendas|    SP|   1200|           7|   7|
+--------+------------+------+-------+------------+----+



In [None]:
# dense_rank
df3 = df3.withColumn('dense_rank', F.dense_rank().over(w0))

In [None]:
df3.show()

+--------+------------+------+-------+------------+----+----------+
|    nome|departamento|estado|salario|numero_linha|rank|dense_rank|
+--------+------------+------+-------+------------+----+----------+
| Gustavo|  financeiro|    AM|   8000|           1|   1|         1|
| Leticia|  financeiro|    RJ|   7500|           2|   2|         2|
| Juliana|  financeiro|    MG|   3000|           3|   3|         3|
|   Pedro|  financeiro|    AM|   2750|           4|   4|         4|
| Oswaldo|   marketing|    RJ|   2450|           1|   1|         1|
|   Denis|   marketing|    MG|   1300|           2|   2|         2|
|Anderson|      vendas|    SP|   9000|           1|   1|         1|
| Kennedy|      vendas|    RJ|   4500|           2|   2|         2|
| Luciana|      vendas|    SP|   4500|           3|   2|         2|
|   Marta|      vendas|    SP|   4500|           4|   2|         2|
|    João|      vendas|    SP|   4500|           5|   2|         2|
|   Diego|      vendas|    SP|   4500|          

In [None]:
# lag
df3 = df3.withColumn('lag', F.lag("salario", 2).over(w0))

In [None]:
df3.show()

+--------+------------+------+-------+------------+----+----------+----+
|    nome|departamento|estado|salario|numero_linha|rank|dense_rank| lag|
+--------+------------+------+-------+------------+----+----------+----+
| Gustavo|  financeiro|    AM|   8000|           1|   1|         1|null|
| Leticia|  financeiro|    RJ|   7500|           2|   2|         2|null|
| Juliana|  financeiro|    MG|   3000|           3|   3|         3|8000|
|   Pedro|  financeiro|    AM|   2750|           4|   4|         4|7500|
| Oswaldo|   marketing|    RJ|   2450|           1|   1|         1|null|
|   Denis|   marketing|    MG|   1300|           2|   2|         2|null|
|Anderson|      vendas|    SP|   9000|           1|   1|         1|null|
| Kennedy|      vendas|    RJ|   4500|           2|   2|         2|null|
| Luciana|      vendas|    SP|   4500|           3|   2|         2|9000|
|   Marta|      vendas|    SP|   4500|           4|   2|         2|4500|
|    João|      vendas|    SP|   4500|           5|