In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.executor.memory", "500mb") \
    .appName("Ex2") \
    .getOrCreate()

22/07/04 16:03:53 WARN Utils: Your hostname, computador resolves to a loopback address: 127.0.1.1; using 10.0.0.135 instead (on interface wlp2s0)
22/07/04 16:03:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/07/04 16:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Encontrar o número de pedidos, o número de produtos e o número de vendedores.

In [3]:
tabela_produtos = spark.read.parquet('./data/products_parquet/')
tabela_vendedores = spark.read.parquet('./data/sellers_parquet/')
tabela_vendas = spark.read.parquet('./data/sales_parquet/')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
num_produtos = tabela_produtos.count()
num_vendas = tabela_vendas.count()
num_vendedores = tabela_vendedores.count()

                                                                                

In [5]:
print(f'O número de produtos é: {num_produtos}')
print(f'O número de vendas é: {num_vendas}')
print(f'O número de vendedores é: {num_vendedores}')

O número de produtos é: 75000000
O número de vendas é: 20000040
O número de vendedores é: 10


## Número de produtos vendidos, no mínimo, uma vez

In [7]:
tabela_vendas.agg(countDistinct(col('product_id'))).show()



+-----------------+
|count(product_id)|
+-----------------+
|           993429|
+-----------------+





## Produto mais vendido

In [8]:
tabela_vendas.groupBy(col('product_id')).agg(count("*").alias("cnt")).orderBy(col("cnt").desc()).limit(1).show()



+----------+--------+
|product_id|     cnt|
+----------+--------+
|         0|19000000|
+----------+--------+



                                                                                

## Número de produtos distintos vendidos em cada dia

In [9]:
tabela_vendas.groupBy(col('date')).agg(countDistinct(col('product_id')).alias('distinct_product_sold')).orderBy(
            col('distinct_product_sold').desc()).show()



+----------+---------------------+
|      date|distinct_product_sold|
+----------+---------------------+
|2020-07-06|               100765|
|2020-07-09|               100501|
|2020-07-01|               100337|
|2020-07-03|               100017|
|2020-07-02|                99807|
|2020-07-05|                99796|
|2020-07-04|                99791|
|2020-07-07|                99756|
|2020-07-08|                99662|
|2020-07-10|                98973|
+----------+---------------------+





## Retorno médio das ordens

In [23]:
tabela_vendas.join(
    tabela_produtos, tabela_vendas['product_id'] == tabela_produtos['product_id'], 'inner'
    ).agg(avg(tabela_produtos['price'] * tabela_vendas['num_pieces_sold'])).show()



+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+



                                                                                

## Para cada vendedor, qual a contribuição média percentual de uma ordem, para a quota diária do vendedor

In [29]:
print(tabela_vendas.join(broadcast(tabela_vendedores), tabela_vendas["seller_id"] == tabela_vendedores["seller_id"], "inner").withColumn(
    "ratio", tabela_vendas["num_pieces_sold"]/tabela_vendedores["daily_target"]
).groupBy(tabela_vendas["seller_id"]).agg(avg("ratio")).show())

                                                                                

+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        7|2.595228787788170...|
|        3| 1.62888537056594E-4|
|        8|9.213030375408861E-5|
|        0|2.019885898946922...|
|        5|4.211073965904022E-5|
|        6|4.782147194369122E-5|
|        9|3.837913136180238E-5|
|        1|1.964233366461014...|
|        4|3.296428039825817E-5|
|        2|6.690408001060484E-5|
+---------+--------------------+

None


## Quem são o segundo maior vendedor e o menor vendedor para cada produto? Quem são aqueles com produtos iguais á "product_id = 0"

### Calculando o número de produtos vendidos por cada vendedor


In [32]:
tabela_vendas = tabela_vendas.groupBy(
    col('product_id'), col('seller_id')
    ).agg(sum('num_pieces_sold').alias('num_pieces_sold'))

### Cria-se as funções Window, uma para ordenar de modo crescente e a outra para modo decrescente
### Particiona-se por product_id e ordena por produtos vendidos


In [33]:
window_desc = Window.partitionBy(col('product_id')).orderBy(col('num_pieces_sold').desc())
window_asc = Window.partitionBy(col('product_id')).orderBy(col('num_pieces_sold').asc())


### Cria-se um Dense rank, para não ocorrer furos


In [34]:
tabela_vendas = tabela_vendas.withColumn(
    'rank_asc', dense_rank().over(window_asc)
).withColumn('rank_desc', dense_rank().over(window_desc))

### Seleção de produtos com apenas um único vendedor, ou produtos, quais múltiplos vendedores venderam a mesma quantidade


In [39]:
vendedores_especiais = tabela_vendas.where(col('rank_asc') == col('rank_desc')).select(
    col('product_id').alias('single_seller_product_id'), col('seller_id').alias('single_seller_seller_id'),
        lit('único vendedor ou múltiplos vendedores com os mesmos resultados').alias('type'))

In [44]:
## Segundo vendedor que mais vendeu

segundo_vendedor = tabela_vendas.where(col('rank_desc') == 2).select(
    col('product_id').alias('second_seller_product_id'), col('seller_id').alias('second_seller_seller_id'),
    lit('segundo maior vendedor').alias('type')
)

### Seleção do vendedor que menos vendeu, exclusão das colunas já incluídas e do segundo vendedor que mais vendeu


In [50]:
ultimo_vendedor = tabela_vendas.where(col('rank_asc') == 1).select(
    col('product_id'),
    col('seller_id'),
    lit('ultimo vendedor').alias('type')
).join(vendedores_especiais, (
    tabela_vendas['seller_id'] == vendedores_especiais['single_seller_seller_id']
    ) & (
    tabela_vendas['product_id'] == vendedores_especiais['single_seller_product_id']), 'left_anti'
    ).join(segundo_vendedor, (
        tabela_vendas['seller_id'] == segundo_vendedor['second_seller_seller_id']
        ) & (
        tabela_vendas['product_id'] == segundo_vendedor['second_seller_product_id']), 'left_anti')

In [52]:
## União

union_table = ultimo_vendedor.select(
    col('product_id'),
    col('seller_id'),
    col('type')
).union(segundo_vendedor.select(
    col('second_seller_product_id').alias('product_id'),
    col('second_seller_seller_id').alias('seller_id'),
    col('type')
)).union(vendedores_especiais.select(
    col('single_seller_product_id').alias('product_id'),
    col('single_seller_seller_id').alias('seller_id'),
    col('type')
))

union_table.show()



+----------+---------+---------------+
|product_id|seller_id|           type|
+----------+---------+---------------+
|  19986717|        1|ultimo vendedor|
|  40496308|        5|ultimo vendedor|
|  52606213|        7|ultimo vendedor|
|  14542470|        5|ultimo vendedor|
|  28592106|        5|ultimo vendedor|
|  17944574|        8|ultimo vendedor|
|  61475460|        7|ultimo vendedor|
|   3534470|        3|ultimo vendedor|
|  35669461|        4|ultimo vendedor|
|  32602520|        9|ultimo vendedor|
|  72017876|        1|ultimo vendedor|
|  67723231|        5|ultimo vendedor|
|  56011040|        5|ultimo vendedor|
|  34681047|        5|ultimo vendedor|
|  57735075|        9|ultimo vendedor|
|  18182299|        7|ultimo vendedor|
|  69790381|        5|ultimo vendedor|
|  31136332|        9|ultimo vendedor|
|  10978356|        7|ultimo vendedor|
|  20774718|        9|ultimo vendedor|
+----------+---------+---------------+
only showing top 20 rows





### Segundo maior vendedor e último vendedor para product_id == 0


In [55]:
union_table.where(col('product_id') == 0).show()



+----------+---------+--------------------+
|product_id|seller_id|                type|
+----------+---------+--------------------+
|         0|        0|único vendedor ou...|
+----------+---------+--------------------+



