## Leitura de Base

### SQL

In [0]:
# Load every column of bronze.orders and render the result in the notebook.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

### Bases Externas

In [0]:
# Connection settings for the external SQL Server database read over JDBC.
base = "server-estoque"
database_host = "server-estoque.database.windows.net"
database_port = "1433"
database_name = "estoque"
table = "dbo.posicao_estoque"
user = "root-estoque"
# Credentials must not be committed with the notebook: fetch the password from
# a Databricks secret scope at run time instead of hard-coding it.
# NOTE(review): the scope and key names are placeholders - confirm them against
# the secret scope configured in the workspace.
password = dbutils.secrets.get(scope="estoque", key="sql-password")

In [0]:
url = f"jdbc:sqlserver://{database_host}:{database_port};database={database_name};user={user}@{base};password={password};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

url

In [0]:
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

In [0]:
posicao_estoque = (spark.read
                   .format('jdbc')
                   .option('driver', driver)
                   .option('url', url)
                   .option('dbtable', table)
                   .option('user', user)
                   .option('', password)
                   .load())

posicao_estoque.display()

## Seleção de Colunas

In [0]:
# Reload bronze.orders so this section starts from the unmodified table.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

### Select

In [0]:
# Project only the order_id and store_id columns.
display(df_orders.select('order_id', 'store_id'))

### Where

In [0]:
from pyspark.sql import functions as F

In [0]:
# Keep rows whose order_amount exceeds 100 and whose channel_id equals '1'.
display(
    df_orders
    .where(F.col('order_amount') > 100)
    .where(F.col('channel_id') == '1')
)

### Join

In [0]:
# Load the store id and store name from bronze.stores.
df_stores = spark.sql("SELECT store_id, store_name FROM bronze.stores")
display(df_stores)

In [0]:
# Left-join three order columns to the stores on store_id, so orders without a
# matching store are kept.
df_orders_join = (
    df_orders
    .select('store_id', 'order_id', 'order_amount')
    .join(df_stores, on='store_id', how='left')
)
display(df_orders_join)

## Cálculos Básicos e Agregações

In [0]:
# Reload bronze.orders for the calculation and aggregation examples.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
from pyspark.sql import functions as F

### Cálculos Básicos

In [0]:
# Add total_delivery_cost as the sum of order_delivery_fee and order_delivery_cost.
display(
    df_orders
    .select('order_id', 'store_id', 'order_delivery_fee', 'order_delivery_cost')
    .withColumn(
        'total_delivery_cost',
        F.col('order_delivery_fee') + F.col('order_delivery_cost'),
    )
)

### Agregações

In [0]:
# Sum order_amount per store_id and expose the result as 'total'.
display(
    df_orders
    .groupBy('store_id')
    .agg(F.sum('order_amount').alias('total'))
)

## Dados Duplicados

In [0]:
# Reload bronze.orders for the duplicate-removal example.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
# Remove fully identical rows. dropDuplicates() compares rows across all
# partitions on its own, so the data does not need to be squeezed into a single
# partition with coalesce(1) first; doing so only forces the work through one task.
df_orders = df_orders.dropDuplicates()

## Casting

In [0]:
# Reload bronze.orders for the casting example.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Cast order_amount to double and parse the date part of order_moment_created.
# The date is taken as everything before the first space rather than a fixed
# 9-character prefix, which would cut a 10-character date such as '10/15/2021'
# down to '10/15/202'.
# NOTE(review): assumes the column holds 'M/d/yyyy' followed by a space and a
# time portion - confirm against the source data.
display(
    df_orders
    .select('order_amount', 'order_moment_created')
    .withColumn('order_amount', F.col('order_amount').cast(DoubleType()))
    .withColumn(
        'order_moment_created',
        F.to_date(F.substring_index(F.col('order_moment_created'), ' ', 1), 'M/d/yyyy'),
    )
)

## Detecção de Anomalias

In [0]:
# Reload bronze.orders for the anomaly-detection example.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Replace order_amount with its double-typed version so it is summarised and
# compared as a number.
df_orders = df_orders.withColumn(
    'order_amount',
    df_orders['order_amount'].cast(DoubleType()),
)

In [0]:
# Show the summary statistics produced by describe() to spot outliers.
display(df_orders.describe())

In [0]:
# Keep only rows whose order_amount is below 10000.
display(df_orders.where(F.col('order_amount') < 10000))

## Tratamento de dados condicional

In [0]:
# Reload bronze.orders for the conditional-treatment example.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
# List the distinct channel_id values present in the orders.
display(df_orders.select('channel_id').distinct())

In [0]:
from pyspark.sql import functions as F

In [0]:
# Keep four columns and only the rows whose channel_id is '1', '10' or '11'.
df_orders_filter = (
    df_orders
    .select('order_id', 'store_id', 'order_amount', 'channel_id')
    .where(F.col('channel_id').isin('1', '10', '11'))
)

In [0]:
# Derive channel_name from channel_id: '1' -> APP, '10' -> SITE, anything else
# -> MARKET PLACE.
display(
    df_orders_filter.withColumn(
        'channel_name',
        (
            F.when(F.col('channel_id') == '1', 'APP')
            .when(F.col('channel_id') == '10', 'SITE')
            .otherwise('MARKET PLACE')
        ),
    )
)

## Técnicas de Preenchimento para Dados Ausentes

In [0]:
# Reload bronze.orders for the missing-value example.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Keep order_id and a double-typed order_delivery_cost.
df_orders_filter = df_orders.select(
    'order_id',
    F.col('order_delivery_cost').cast(DoubleType()).alias('order_delivery_cost'),
)
display(df_orders_filter)

In [0]:
# Average of order_delivery_cost over the whole DataFrame, rounded to 2 decimals.
mean_order_delivery_cost = df_orders_filter.agg(F.avg('order_delivery_cost')).first()[0]
# The average is None when the column has no non-null value; round(None) would
# raise TypeError, so only round an actual number.
if mean_order_delivery_cost is not None:
    mean_order_delivery_cost = round(mean_order_delivery_cost, 2)
mean_order_delivery_cost

In [0]:
# Replace null order_delivery_cost values with the previously computed mean;
# non-null values are kept as they are.
display(
    df_orders_filter.withColumn(
        'order_delivery_cost',
        F.coalesce(F.col('order_delivery_cost'), F.lit(mean_order_delivery_cost)),
    )
)

## Criando Tabela Tratada

In [0]:
# Reload bronze.orders as the starting point of the treated table.
df_orders = spark.sql("SELECT * FROM bronze.orders")
display(df_orders)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Select the columns used downstream and give them proper types: integer ids,
# double amount and a date parsed from order_moment_created.
# The date is taken as everything before the first space rather than a fixed
# 9-character prefix: a 10-character date such as '10/15/2021' would otherwise
# be truncated, fail to parse, and the row would be dropped by the later
# null-date filter.
# NOTE(review): assumes the column holds 'M/d/yyyy' followed by a space and a
# time portion - confirm against the source data.
df_orders_estrutura = (
    df_orders
    .select('order_id', 'store_id', 'order_amount', 'order_moment_created')
    .withColumn('order_id', F.col('order_id').cast(IntegerType()))
    .withColumn('store_id', F.col('store_id').cast(IntegerType()))
    .withColumn('order_amount', F.col('order_amount').cast(DoubleType()))
    .withColumn(
        'order_moment_created',
        F.to_date(F.substring_index(F.col('order_moment_created'), ' ', 1), 'M/d/yyyy'),
    )
)

In [0]:
# Keep rows with order_amount below 10000 and a non-null order_moment_created.
df_orders_filter = (
    df_orders_estrutura
    .where(F.col('order_amount') < 10000)
    .where(F.col('order_moment_created').isNotNull())
)

In [0]:
# Remove fully identical rows. dropDuplicates() de-duplicates across all
# partitions by itself; forcing the data into one partition with coalesce(1)
# first only pushes the whole dataset through a single task.
df_orders_filter = df_orders_filter.dropDuplicates()
df_orders_filter.display()

In [0]:
# Load the store id and store name from bronze.stores.
df_stores = spark.sql("SELECT store_id, store_name FROM bronze.stores")
display(df_stores)

In [0]:
# Cast store_id to integer so it has the same type as the orders' store_id.
df_stores_estrura = df_stores.withColumn(
    'store_id',
    df_stores['store_id'].cast(IntegerType()),
)

In [0]:
# Remove fully identical store rows. dropDuplicates() works across partitions on
# its own, so the coalesce(1) that funnelled all rows into one task is not needed.
df_stores_estrura = df_stores_estrura.dropDuplicates()

In [0]:
# Inner-join orders to stores on store_id; orders without a matching store are dropped.
df_orders_stores = df_orders_filter.join(
    df_stores_estrura,
    on='store_id',
    how='inner',
)

In [0]:
# Sum order_amount per creation date and store, exposed as 'total_amount'.
df_orders_stores_total_amount = (
    df_orders_stores
    .groupBy('order_moment_created', 'store_id', 'store_name')
    .agg(F.sum('order_amount').alias('total_amount'))
)

In [0]:
# Persist the aggregate as a managed table, replacing any existing contents.
df_orders_stores_total_amount.write.saveAsTable(
    'prata.orders_store_total_amount',
    mode='overwrite',
)

In [0]:
%sql
-- Read back every row of prata.orders_store_total_amount to check the saved result.
SELECT * FROM prata.orders_store_total_amount