In [1]:
# Importando bibliotecas
from faker import Faker
import random

# Instanciando objeto de geração de dados
fake = Faker()

# Configurando seeds
Faker.seed(42)
random.seed(42)

# Gerando dados aleatórios
num_rows = 350
fake_dates = [fake.date_between(start_date="-100d", end_date="today") for _ in range(num_rows)]
fake_users = [fake.first_name() for _ in range(num_rows)]
fake_jobs = [fake.job(max_length=15) for _ in range(num_rows)]
fake_file = [fake.file_extension() for _ in range(num_rows)]
fake_http_methods = [fake.http_method() for _ in range(num_rows)]
fake_servers = [fake.windows_platform_token() for _ in range(num_rows)]
fake_requests = [random.randrange(1, 200) for _ in range(num_rows)]

# Consolidando dados em um formato amigável para o Spark
spark_data = []
zipped_list = zip(fake_dates, fake_users, fake_jobs, \
    fake_http_methods, fake_file, fake_servers, \
    fake_requests)
    
for date, user, job, method, file, server, qtd in zipped_list:
    spark_data.append([date, user, job, method, file, server, qtd])

In [2]:
# Importando módulos do Spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField,\
    DateType, StringType, IntegerType

# Criando sessão no Spark
spark = SparkSession\
    .builder\
    .appName("windowing-functions")\
    .master("local[1]")\
    .getOrCreate()

# Definindo schema para os dados
sales_schema = StructType([
    StructField("date", DateType(), nullable=False),
    StructField("person", StringType(), nullable=False),
    StructField("job", StringType(), nullable=False),
    StructField("http_method", StringType(), nullable=False),
    StructField("file_ext", StringType(), nullable=False),
    StructField("server_os", StringType(), nullable=False),
    StructField("total_requests", IntegerType(), nullable=False)
])

# Criando DataFrame no Spark
df = spark.createDataFrame(data=spark_data, schema=sales_schema)\
    .orderBy("date")

# Visualizando dados
df.printSchema()
df.orderBy("date").show(20, truncate=False)

22/09/07 11:00:03 WARN Utils: Your hostname, panini-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.0.110 instead (on interface enp3s0)
22/09/07 11:00:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/07 11:00:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
 |-- date: date (nullable = false)
 |-- person: string (nullable = false)
 |-- job: string (nullable = false)
 |-- http_method: string (nullable = false)
 |-- file_ext: string (nullable = false)
 |-- server_os: string (nullable = false)
 |-- total_requests: integer (nullable = false)



                                                                                

+----------+---------+---------------+-----------+--------+-----------------------+--------------+
|date      |person   |job            |http_method|file_ext|server_os              |total_requests|
+----------+---------+---------------+-----------+--------+-----------------------+--------------+
|2022-05-30|Mitchell |Arboriculturist|DELETE     |odp     |Windows 98; Win 9x 4.90|137           |
|2022-05-30|David    |Air cabin crew |CONNECT    |wav     |Windows NT 6.2         |28            |
|2022-05-30|Katie    |Charity officer|DELETE     |avi     |Windows NT 6.1         |81            |
|2022-05-30|Jasmine  |Geochemist     |DELETE     |js      |Windows 98             |165           |
|2022-05-31|David    |Solicitor      |OPTIONS    |flac    |Windows NT 6.0         |63            |
|2022-05-31|Anne     |Stage manager  |HEAD       |webm    |Windows 98             |136           |
|2022-05-31|Debbie   |Engineer, site |TRACE      |flac    |Windows NT 4.0         |18            |
|2022-05-3

In [3]:
# Importando funções
from pyspark.sql.functions import col, expr, max

# Agrupando dados por data
df_max_requests = df.groupBy(col("date")).agg(
    max("total_requests").alias("max_requests")
).orderBy("date")

# Visualizando resultado
df_max_requests.show(10)

[Stage 1:>                                                          (0 + 1) / 1]

+----------+------------+
|      date|max_requests|
+----------+------------+
|2022-05-30|         165|
|2022-05-31|         136|
|2022-06-01|         143|
|2022-06-02|          68|
|2022-06-03|          98|
|2022-06-04|         174|
|2022-06-05|         185|
|2022-06-06|         143|
|2022-06-07|         195|
|2022-06-08|         187|
+----------+------------+
only showing top 10 rows



                                                                                

In [4]:
# Aplicando join
df_max_requests_join = df.join(
    other=df_max_requests,
    on=[df.date == df_max_requests.date],
    how="left"
).select(
    df["*"],
    df_max_requests["max_requests"]
).orderBy("date")

# Visualizando
df_max_requests_join.show(10)

+----------+--------+---------------+-----------+--------+--------------------+--------------+------------+
|      date|  person|            job|http_method|file_ext|           server_os|total_requests|max_requests|
+----------+--------+---------------+-----------+--------+--------------------+--------------+------------+
|2022-05-30| Jasmine|     Geochemist|     DELETE|      js|          Windows 98|           165|         165|
|2022-05-30|Mitchell|Arboriculturist|     DELETE|     odp|Windows 98; Win 9...|           137|         165|
|2022-05-30|   Katie|Charity officer|     DELETE|     avi|      Windows NT 6.1|            81|         165|
|2022-05-30|   David| Air cabin crew|    CONNECT|     wav|      Windows NT 6.2|            28|         165|
|2022-05-31|    Anne|  Stage manager|       HEAD|    webm|          Windows 98|           136|         136|
|2022-05-31|   David|      Solicitor|    OPTIONS|    flac|      Windows NT 6.0|            63|         136|
|2022-05-31| William|    Chi

In [5]:
# Importando funções
from pyspark.sql import Window

# Especificando janela de análise
window_date = Window.partitionBy(col("date"))

# Selecionando dados
df_max_daily_requests = df.select(
    "*",
    max(col("total_requests")).over(window_date).alias("max_daily_requests")
)

# Visualizando resultados
df_max_daily_requests.show(10)


+----------+--------+---------------+-----------+--------+--------------------+--------------+------------------+
|      date|  person|            job|http_method|file_ext|           server_os|total_requests|max_daily_requests|
+----------+--------+---------------+-----------+--------+--------------------+--------------+------------------+
|2022-05-30|   Katie|Charity officer|     DELETE|     avi|      Windows NT 6.1|            81|               165|
|2022-05-30| Jasmine|     Geochemist|     DELETE|      js|          Windows 98|           165|               165|
|2022-05-30|Mitchell|Arboriculturist|     DELETE|     odp|Windows 98; Win 9...|           137|               165|
|2022-05-30|   David| Air cabin crew|    CONNECT|     wav|      Windows NT 6.2|            28|               165|
|2022-05-31| William|    Chiropodist|       POST|     wav|      Windows NT 6.1|           130|               136|
|2022-05-31|    Anne|  Stage manager|       HEAD|    webm|          Windows 98|         

In [6]:
# Importando funções
from pyspark.sql.functions import rank

# Criando janela ordenada por requisições
window_spec = Window\
    .partitionBy(col("date"))\
    .orderBy(col("total_requests").desc())

# Criando consulta através da janela especificada
df_ranked_requests = df.select(
    "*",
    max("total_requests").over(window_spec).alias("max_daily_request"),
    rank().over(window_spec).alias("rank")
)

# Visualizando resultados
df_ranked_requests.show(10)

+----------+--------+---------------+-----------+--------+--------------------+--------------+-----------------+----+
|      date|  person|            job|http_method|file_ext|           server_os|total_requests|max_daily_request|rank|
+----------+--------+---------------+-----------+--------+--------------------+--------------+-----------------+----+
|2022-05-30| Jasmine|     Geochemist|     DELETE|      js|          Windows 98|           165|              165|   1|
|2022-05-30|Mitchell|Arboriculturist|     DELETE|     odp|Windows 98; Win 9...|           137|              165|   2|
|2022-05-30|   Katie|Charity officer|     DELETE|     avi|      Windows NT 6.1|            81|              165|   3|
|2022-05-30|   David| Air cabin crew|    CONNECT|     wav|      Windows NT 6.2|            28|              165|   4|
|2022-05-31|    Anne|  Stage manager|       HEAD|    webm|          Windows 98|           136|              136|   1|
|2022-05-31| William|    Chiropodist|       POST|     wa

In [7]:
df_ranked_requests.where(expr("rank = 1")).show(10)

+----------+---------+---------------+-----------+--------+--------------------+--------------+-----------------+----+
|      date|   person|            job|http_method|file_ext|           server_os|total_requests|max_daily_request|rank|
+----------+---------+---------------+-----------+--------+--------------------+--------------+-----------------+----+
|2022-05-30|  Jasmine|     Geochemist|     DELETE|      js|          Windows 98|           165|              165|   1|
|2022-05-31|     Anne|  Stage manager|       HEAD|    webm|          Windows 98|           136|              136|   1|
|2022-06-01|  William|Games developer|    CONNECT|     mov|      Windows NT 6.2|           143|              143|   1|
|2022-06-02|      Amy|        Curator|        GET|     png|      Windows NT 5.0|            68|               68|   1|
|2022-06-03|    Cindy|Legal secretary|    CONNECT|    docx|      Windows NT 5.2|            98|               98|   1|
|2022-06-03|Christine|  Administrator|     DELET

Incrementar:

    - Soma acumulada por mês

___

In [8]:
# Importando classe
from pyspark.sql import Window

# Definindo especificação de janela
window_spec = Window\
    .partitionBy("col_A")\
    .orderBy("col_B")

# Definindo especificação de janela
window_spec = Window\
    .partitionBy("col_A")\
    .orderBy("col_B")\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

## Preparando Dados

In [1]:
# Importando biblitoecas
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, \
    StringType, IntegerType, DoubleType, LongType
import os 

# Criando objeto de sessão
spark = SparkSession\
    .builder\
    .appName("agregacoes")\
    .getOrCreate()

# Definindo variáveis de diretório
home_path = os.path.expanduser('~')
data_path = os.path.join(home_path, 'dev/panini-tech-lab/data')
iot_path = os.path.join(data_path, 'iot-devices/iot_devices.json')

# Definindo schema para o arquivo a ser lido
iot_schema = StructType([
    StructField("device_id", IntegerType(), nullable=False),
    StructField("device_name", StringType(), nullable=True),
    StructField("ip", StringType(), nullable=True),
    StructField("cca2", StringType(), nullable=True),
    StructField("cca3", StringType(), nullable=True),
    StructField("cn", StringType(), nullable=True),
    StructField("latitude", DoubleType(), nullable=True),
    StructField("longitude", DoubleType(), nullable=True),
    StructField("scale", StringType(), nullable=True),
    StructField("temp", IntegerType(), nullable=True),
    StructField("humidity", IntegerType(), nullable=True),
    StructField("battery_level", StringType(), nullable=True),
    StructField("c02_level", IntegerType(), nullable=True),
    StructField("lcd", StringType(), nullable=True),
    StructField("timestamp", LongType(), nullable=False)
])

# Lendo dados
df_iot_raw = spark.read.format("json")\
    .schema(iot_schema)\
    .load(iot_path)

# Criando tabelas temporárias
df_iot_raw.createOrReplaceTempView("tbl_iot")

# Visualizando dados
df_iot_raw.printSchema()
df_iot_raw.show(10)

22/09/07 11:31:58 WARN Utils: Your hostname, panini-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.0.110 instead (on interface enp3s0)
22/09/07 11:31:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/07 11:32:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
 |-- device_id: integer (nullable = true)
 |-- device_name: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- battery_level: string (nullable = true)
 |-- c02_level: integer (nullable = true)
 |-- lcd: string (nullable = true)
 |-- timestamp: long (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+---------+--------------------+---------------+----+----+-------------+--------+---------+-------+----+--------+-------------+---------+------+-------------+
|device_id|         device_name|             ip|cca2|cca3|           cn|latitude|longitude|  scale|temp|humidity|battery_level|c02_level|   lcd|    timestamp|
+---------+--------------------+---------------+----+----+-------------+--------+---------+-------+----+--------+-------------+---------+------+-------------+
|        1|meter-gauge-1xbYRYcj|   68.161.225.1|  US| USA|United States|    38.0|    -97.0|Celsius|  34|      51|            8|      868| green|1458444054093|
|        2|   sensor-pad-2n2Pea|  213.161.254.1|  NO| NOR|       Norway|   62.47|     6.15|Celsius|  11|      70|            7|     1473|   red|1458444054119|
|        3| device-mac-36TWSKiT|      88.36.5.1|  IT| ITA|        Italy|   42.83|    12.83|Celsius|  19|      44|            2|     1556|   red|1458444054120|
|        4|   sensor-pad-4mzWkz|  66.39.173.15

                                                                                

In [12]:
# Importando funções
import random
from pyspark.sql.functions import udf, monotonically_increasing_id,\
    split, to_date, from_unixtime, col, expr

# Criando lista de valores aleatórios para serem adicionados ao DataFrame
random.seed(42)
rows = df_iot_raw.count()
date_add = [random.randint(a=-10000000, b=10000000) for i in range(rows)]

# Gerando base de dados de trabalho
df_iot = df_iot_raw.select(
    split(col("device_name"), "-")[0].alias("device_type"),
    col("cn").alias("country"),
    col("temp"),
    col("humidity"),
    col("c02_level"),
    col("timestamp").alias("timestamp_raw")
).withColumn("date_add", udf(lambda id: date_add[id])(monotonically_increasing_id()))\
    .withColumn("timestamp", from_unixtime((col("timestamp_raw") / 1000) + col("date_add")))\
    .withColumn("date", to_date(col("timestamp")))\
    .drop("timestamp_raw", "date_add")

# Visualizando dados
df_iot.orderBy("timestamp").show(10)



+-----------+-------------+----+--------+---------+-------------------+----------+
|device_type|      country|temp|humidity|c02_level|          timestamp|      date|
+-----------+-------------+----+--------+---------+-------------------+----------+
|      therm|United States|  25|      30|     1566|2015-11-25 07:39:33|2015-11-25|
|      meter|United States|  33|      43|      965|2015-11-25 07:39:35|2015-11-25|
|     sensor|United States|  16|      97|     1385|2015-11-25 07:39:36|2015-11-25|
|      meter|United States|  16|      30|     1047|2015-11-25 07:39:38|2015-11-25|
|      therm|United States|  17|      70|      816|2015-11-25 07:54:13|2015-11-25|
|     device|United States|  31|      62|      834|2015-11-25 07:54:15|2015-11-25|
|     sensor|United States|  20|      86|     1184|2015-11-25 07:54:16|2015-11-25|
|      meter| Saudi Arabia|  27|      72|     1318|2015-11-25 07:54:18|2015-11-25|
|      meter|      Hungary|  17|      42|     1256|2015-11-25 08:02:42|2015-11-25|
|   

                                                                                

## Agregando dados

In [14]:
# Importando função
from pyspark.sql.functions import avg, round, count

# Aplicando agrupamento
df_country_temp = df_iot.groupBy("country").agg(
    round(avg(col("temp")), 2).alias("avg_temp"),
    count("*").alias("measurements")
)

# Visualizando resultado
df_country_temp.orderBy("measurements", ascending=False).show(10)



+-----------------+--------+------------+
|          country|avg_temp|measurements|
+-----------------+--------+------------+
|    United States|   21.99|       68545|
|            China|   21.91|       14455|
|            Japan|   22.07|       12100|
|Republic of Korea|   22.15|       11879|
|          Germany|   21.99|        7942|
|   United Kingdom|   22.09|        6486|
|           Canada|   21.91|        6041|
|           Russia|   22.15|        5989|
|           France|   22.12|        5305|
|           Brazil|   21.96|        3224|
+-----------------+--------+------------+
only showing top 10 rows



                                                                                

In [17]:
# Importando módulos
from pyspark.sql import Window

# Definindo especificação de janela
window_spec = Window.partitionBy("country")

# Aplicando consulta
df_country_temp_window = df_iot\
    .where(expr("country != ''"))\
        .select(
        "country",
        "temp",
        "humidity",
        "c02_level",
        "date",
        round(avg("temp").over(window_spec), 2).alias("avg_cn_temp"),
        count("*").over(window_spec).alias("cn_meas")
    )

# Importando função para ordenação
from pyspark.sql.functions import rand

# Visualizando resultado
df_country_temp_window.orderBy(rand()).show(10)



22/09/07 11:58:52 WARN DAGScheduler: Broadcasting large task binary with size 1012.2 KiB
+-------------+----+--------+---------+----------+-----------+-------+
|      country|temp|humidity|c02_level|      date|avg_cn_temp|cn_meas|
+-------------+----+--------+---------+----------+-----------+-------+
|       France|  14|      85|     1201|2016-01-25|      22.12|   5305|
|United States|  12|      91|      931|2016-03-25|      21.99|  68545|
|United States|  33|      70|     1336|2016-02-10|      21.99|  68545|
|United States|  27|      89|     1584|2015-12-15|      21.99|  68545|
|        Japan|  25|      40|     1350|2016-02-06|      22.07|  12100|
|        China|  26|      28|     1560|2015-12-13|      21.91|  14455|
|        China|  28|      87|     1532|2016-02-03|      21.91|  14455|
|    Australia|  25|      38|     1376|2016-03-21|      21.91|   3119|
|United States|  13|      85|      876|2016-05-25|      21.99|  68545|
|United States|  17|      26|      862|2016-02-29|      21.

                                                                                

___

## Exemplos Práticos

### rank() e dense_rank()

CORRIGIR WINDOWING POR DATA PARA EVITAR O AGRUPAMENTO POR DATA...

A INTENÇÃO É TRAZER O MÁXIMO CONSIDERANDO A BASE INTEIRA

In [23]:
# Especificando janela
window_date_c02 = Window\
    .partitionBy("date")\
    .orderBy(col("c02_level").desc())\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Importando funções
from pyspark.sql.functions import rank, dense_rank

# Consultando datas de pico de emissão de carbono
df_c02_peak = df_iot.select(
    col("device_type"),
    col("country"),
    col("c02_level"),
    col("date"),
    col("timestamp"),
    rank().over(window_date_c02).alias("c02_rank"),
    dense_rank().over(window_date_c02).alias("c02_dense_rank")
)

# Visualizando dados
df_c02_peak.orderBy("date").show(10)



22/09/07 12:03:03 WARN DAGScheduler: Broadcasting large task binary with size 1010.3 KiB
+-----------+-----------------+---------+----------+-------------------+--------+--------------+
|device_type|          country|c02_level|      date|          timestamp|c02_rank|c02_dense_rank|
+-----------+-----------------+---------+----------+-------------------+--------+--------------+
|     sensor|    United States|     1599|2015-11-25|2015-11-25 18:56:04|       1|             1|
|     device|            China|     1599|2015-11-25|2015-11-25 17:26:27|       1|             1|
|     sensor|    United States|     1598|2015-11-25|2015-11-25 21:16:20|       3|             2|
|      therm|            Japan|     1597|2015-11-25|2015-11-25 14:58:09|       4|             3|
|     sensor|   United Kingdom|     1596|2015-11-25|2015-11-25 16:58:48|       5|             4|
|     sensor|            China|     1596|2015-11-25|2015-11-25 21:06:39|       5|             4|
|     sensor|      Netherlands|     15

                                                                                

In [28]:
# Top 3 ofensores por dia
df_c02_peak\
    .where(expr("c02_rank <= 3"))\
    .orderBy("date")\
    .show(15)



22/09/07 14:00:12 WARN DAGScheduler: Broadcasting large task binary with size 1014.0 KiB
+-----------+-------------+---------+----------+-------------------+--------+--------------+
|device_type|      country|c02_level|      date|          timestamp|c02_rank|c02_dense_rank|
+-----------+-------------+---------+----------+-------------------+--------+--------------+
|     sensor|United States|     1599|2015-11-25|2015-11-25 18:56:04|       1|             1|
|     sensor|United States|     1598|2015-11-25|2015-11-25 21:16:20|       3|             2|
|     device|        China|     1599|2015-11-25|2015-11-25 17:26:27|       1|             1|
|     sensor|United States|     1599|2015-11-26|2015-11-26 03:39:42|       1|             1|
|     device|        China|     1599|2015-11-26|2015-11-26 17:59:50|       1|             1|
|      meter|        China|     1599|2015-11-26|2015-11-26 12:05:08|       1|             1|
|     sensor|United States|     1598|2015-11-27|2015-11-27 21:21:44|      

                                                                                

In [38]:
# Importando função
from pyspark.sql.functions import collect_list, collect_set

# Quantidade de vezes que um país ficou no top 1
df_c02_peak\
    .where(expr("c02_rank == 1"))\
    .groupBy("country").agg(
        count("*").alias("count_main_c02_emissor"),
        collect_list("timestamp").alias("c02_peak_timestamps")
    )\
    .orderBy("count_main_c02_emissor", ascending=False)\
    .show(10)



22/09/07 14:17:55 WARN DAGScheduler: Broadcasting large task binary with size 1013.3 KiB
22/09/07 14:17:56 WARN DAGScheduler: Broadcasting large task binary with size 1013.6 KiB
+-----------------+----------------------+--------------------+
|          country|count_main_c02_emissor| c02_peak_timestamps|
+-----------------+----------------------+--------------------+
|    United States|                   132|[2016-01-30 19:12...|
|            China|                    37|[2016-01-31 16:15...|
|            Japan|                    25|[2016-02-03 15:14...|
|Republic of Korea|                    23|[2015-12-29 03:28...|
|          Germany|                    17|[2016-04-05 19:41...|
|   United Kingdom|                    12|[2015-12-16 13:59...|
|           Russia|                    12|[2016-04-09 20:36...|
|           Canada|                    10|[2016-01-30 15:50...|
|           Brazil|                    10|[2016-01-13 01:53...|
|           Sweden|                     7|[2016-01-23 

                                                                                

___

### percent_rank() e ntile()

In [71]:
# Importando funções
from pyspark.sql.functions import percent_rank, ntile, round

# Comparando diferentes funções de ranqueamento
df_c02_peak_ranking = df_iot.select(
    "device_type",
    "country",
    "temp",
    "humidity",
    "c02_level",
    "date",
    rank().over(window_date_c02).alias("rank"),
    dense_rank().over(window_date_c02).alias("dense_rank"),
    round(percent_rank().over(window_date_c02), 5).alias("percent_rank"),
    ntile(n=100).over(window_date_c02).alias("ntile_100")
)

# Visualizando dados
df_c02_peak_ranking.orderBy(["date", "rank"]).show(15)



22/09/07 16:07:12 WARN DAGScheduler: Broadcasting large task binary with size 1018.7 KiB
+-----------+-----------------+----+--------+---------+----------+----+----------+------------+---------+
|device_type|          country|temp|humidity|c02_level|      date|rank|dense_rank|percent_rank|ntile_100|
+-----------+-----------------+----+--------+---------+----------+----+----------+------------+---------+
|     sensor|    United States|  29|      98|     1599|2015-11-25|   1|         1|         0.0|        1|
|     device|            China|  24|      80|     1599|2015-11-25|   1|         1|         0.0|        1|
|     sensor|    United States|  20|      57|     1598|2015-11-25|   3|         2|     0.00338|        1|
|      therm|            Japan|  33|      43|     1597|2015-11-25|   4|         3|     0.00507|        1|
|     sensor|            China|  23|      34|     1596|2015-11-25|   5|         4|     0.00676|        1|
|     sensor|   United Kingdom|  32|      66|     1596|2015-11-

                                                                                

In [73]:
# Top 1% países mais emissores de um dia específico
df_c02_peak_ranking\
    .where(expr("date = '2015-11-25'"))\
    .where(expr("percent_rank <= 0.01"))\
    .show()

22/09/07 16:08:07 WARN DAGScheduler: Broadcasting large task binary with size 1001.0 KiB




22/09/07 16:08:09 WARN DAGScheduler: Broadcasting large task binary with size 1021.9 KiB
+-----------+-----------------+----+--------+---------+----------+----+----------+------------+---------+
|device_type|          country|temp|humidity|c02_level|      date|rank|dense_rank|percent_rank|ntile_100|
+-----------+-----------------+----+--------+---------+----------+----+----------+------------+---------+
|     sensor|    United States|  29|      98|     1599|2015-11-25|   1|         1|         0.0|        1|
|     device|            China|  24|      80|     1599|2015-11-25|   1|         1|         0.0|        1|
|     sensor|    United States|  20|      57|     1598|2015-11-25|   3|         2|     0.00338|        1|
|      therm|            Japan|  33|      43|     1597|2015-11-25|   4|         3|     0.00507|        1|
|     sensor|   United Kingdom|  32|      66|     1596|2015-11-25|   5|         4|     0.00676|        1|
|     sensor|            China|  23|      34|     1596|2015-11-

                                                                                

In [74]:
# Visualizando o efeito de ntile na borda de baixo
df_c02_peak_ranking\
    .where(expr("date = '2015-11-25'"))\
    .orderBy("c02_level")\
    .show(10)

22/09/07 16:08:20 WARN DAGScheduler: Broadcasting large task binary with size 1001.0 KiB


[Stage 221:>                                                        (0 + 4) / 4]

22/09/07 16:08:21 WARN DAGScheduler: Broadcasting large task binary with size 1020.4 KiB
+-----------+---------------+----+--------+---------+----------+----+----------+------------+---------+
|device_type|        country|temp|humidity|c02_level|      date|rank|dense_rank|percent_rank|ntile_100|
+-----------+---------------+----+--------+---------+----------+----+----------+------------+---------+
|      meter|  United States|  33|      85|      801|2015-11-25| 592|       426|     0.99831|      100|
|      meter|Slovak Republic|  15|      77|      801|2015-11-25| 592|       426|     0.99831|      100|
|     sensor|  United States|  12|      29|      802|2015-11-25| 590|       425|     0.99493|      100|
|     device|  United States|  11|      82|      802|2015-11-25| 590|       425|     0.99493|      100|
|     sensor|        Germany|  25|      50|      803|2015-11-25| 585|       424|     0.98649|       99|
|      meter|  United States|  20|      96|      803|2015-11-25| 585|       424

                                                                                

___

## Funções window: analytics

### lag() e lead()

In [97]:
# Importando funções
from pyspark.sql.functions import lag

# Definindo especificação de janela
window_spec = Window\
    .partitionBy("country")\
    .orderBy("timestamp")

# Obtendo temperaturas anteriores e posteriores para cálculo do delta
df_delta_temp = df_iot.where(expr("country != ''")).select(
    "country",
    "timestamp",
    "temp",
    lag("temp", offset=1).over(window_spec).alias("preceding_temp")
).withColumn("delta_temp", expr("temp - preceding_temp"))

# Visualizando resultado
df_delta_temp\
    .where(expr("country = 'United States'"))\
    .orderBy("timestamp")\
    .show(15)



22/09/07 17:25:19 WARN DAGScheduler: Broadcasting large task binary with size 1009.4 KiB
+-------------+-------------------+----+--------------+----------+
|      country|          timestamp|temp|preceding_temp|delta_temp|
+-------------+-------------------+----+--------------+----------+
|United States|2015-11-25 07:39:33|  25|          null|      null|
|United States|2015-11-25 07:39:35|  33|            25|         8|
|United States|2015-11-25 07:39:36|  16|            33|       -17|
|United States|2015-11-25 07:39:38|  16|            16|         0|
|United States|2015-11-25 07:54:13|  17|            16|         1|
|United States|2015-11-25 07:54:15|  31|            17|        14|
|United States|2015-11-25 07:54:16|  20|            31|       -11|
|United States|2015-11-25 08:02:43|  30|            20|        10|
|United States|2015-11-25 08:07:46|  14|            30|       -16|
|United States|2015-11-25 08:22:04|  26|            14|        12|
|United States|2015-11-25 08:22:06|  24|

                                                                                

In [108]:
# Especificando janela com deltas positivos recebendo as primeiras posições
window_spec_ranking = Window\
    .partitionBy("country", to_date(col("timestamp")))\
    .orderBy(col("delta_temp").desc())\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Criando nova coluna com base em ranking
df_delta_temp_rank = df_delta_temp.withColumn("rank_delta_temp", rank().over(window_spec_ranking))\

# Visualizando resultado
df_delta_temp_rank\
    .where(expr("country = 'United States' and to_date(timestamp) = '2015-11-25'"))\
    .orderBy("rank_delta_temp")\
    .show(15)



22/09/07 17:43:46 WARN DAGScheduler: Broadcasting large task binary with size 1019.7 KiB
+-------------+-------------------+----+--------------+----------+---------------+
|      country|          timestamp|temp|preceding_temp|delta_temp|rank_delta_temp|
+-------------+-------------------+----+--------------+----------+---------------+
|United States|2015-11-25 18:49:27|  34|            13|        21|              1|
|United States|2015-11-25 23:41:12|  31|            10|        21|              1|
|United States|2015-11-25 23:12:58|  33|            13|        20|              3|
|United States|2015-11-25 12:48:48|  34|            15|        19|              4|
|United States|2015-11-25 11:21:19|  29|            10|        19|              4|
|United States|2015-11-25 22:09:26|  32|            13|        19|              4|
|United States|2015-11-25 18:56:04|  29|            11|        18|              7|
|United States|2015-11-25 17:15:25|  28|            10|        18|              7

                                                                                

In [112]:
# Importando função
from pyspark.sql.functions import lag, lead

# Visualizando temperaturas anteriores e posteriores
df_delta_temp = df_iot.where(expr("country != ''")).select(
    "country",
    "timestamp",
    "temp",
    lag("temp", offset=1).over(window_spec).alias("preceding_temp"),
    lead("temp", offset=1).over(window_spec).alias("following_temp")
)

# Visualizando resultado
df_delta_temp\
    .where(expr("country = 'United States'"))\
    .orderBy("timestamp")\
    .show(10)



22/09/07 17:51:48 WARN DAGScheduler: Broadcasting large task binary with size 1006.8 KiB
+-------------+-------------------+----+--------------+--------------+
|      country|          timestamp|temp|preceding_temp|following_temp|
+-------------+-------------------+----+--------------+--------------+
|United States|2015-11-25 07:39:33|  25|          null|            33|
|United States|2015-11-25 07:39:35|  33|            25|            16|
|United States|2015-11-25 07:39:36|  16|            33|            16|
|United States|2015-11-25 07:39:38|  16|            16|            17|
|United States|2015-11-25 07:54:13|  17|            16|            31|
|United States|2015-11-25 07:54:15|  31|            17|            20|
|United States|2015-11-25 07:54:16|  20|            31|            30|
|United States|2015-11-25 08:02:43|  30|            20|            14|
|United States|2015-11-25 08:07:46|  14|            30|            26|
|United States|2015-11-25 08:22:04|  26|            14|    

                                                                                

___

## Funções window: agregações

In [116]:
# Importando função
from pyspark.sql.functions import sum

# Defindo janela
window_spec = Window\
    .partitionBy("country", "date")\
    .orderBy("timestamp")\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Retornando soma acumulada de emissão de gás carbônico por dia
df_c02_cumulative = df_iot.select(
    "country",
    "date",
    "timestamp",
    "c02_level",
    sum("c02_level").over(window_spec).alias("cumulative_c02")
)

# Visualizando resultado
df_c02_cumulative\
    .where(expr("country = 'United States'"))\
    .orderBy("timestamp")\
    .show(15)



22/09/07 18:12:34 WARN DAGScheduler: Broadcasting large task binary with size 1008.2 KiB
+-------------+----------+-------------------+---------+--------------+
|      country|      date|          timestamp|c02_level|cumulative_c02|
+-------------+----------+-------------------+---------+--------------+
|United States|2015-11-25|2015-11-25 07:39:33|     1566|          1566|
|United States|2015-11-25|2015-11-25 07:39:35|      965|          2531|
|United States|2015-11-25|2015-11-25 07:39:36|     1385|          3916|
|United States|2015-11-25|2015-11-25 07:39:38|     1047|          4963|
|United States|2015-11-25|2015-11-25 07:54:13|      816|          5779|
|United States|2015-11-25|2015-11-25 07:54:15|      834|          6613|
|United States|2015-11-25|2015-11-25 07:54:16|     1184|          7797|
|United States|2015-11-25|2015-11-25 08:02:43|      967|          8764|
|United States|2015-11-25|2015-11-25 08:07:46|     1434|         10198|
|United States|2015-11-25|2015-11-25 08:22:04| 

                                                                                

### Média Móvel

In [134]:
# Importando função
from pyspark.sql.functions import avg

# Definindo especificação de janela
window_spec_avg = Window\
    .partitionBy("country", "date")\
    .orderBy("timestamp")\
    .rowsBetween(-2, Window.currentRow)

# Retornando a média móvel de temperatura
df_rolling_avg_temp = df_iot.select(
    "country",
    "date",
    "timestamp",
    "temp",
    round(avg("temp").over(window_spec_avg), 2).alias("rolling_avg_temp")
)

# Visualizando resultado
df_rolling_avg_temp\
    .where(expr("country = 'United States'"))\
    .orderBy("timestamp")\
    .show(15)



22/09/07 18:37:55 WARN DAGScheduler: Broadcasting large task binary with size 1011.6 KiB
+-------------+----------+-------------------+----+----------------+
|      country|      date|          timestamp|temp|rolling_avg_temp|
+-------------+----------+-------------------+----+----------------+
|United States|2015-11-25|2015-11-25 07:39:33|  25|            25.0|
|United States|2015-11-25|2015-11-25 07:39:35|  33|            29.0|
|United States|2015-11-25|2015-11-25 07:39:36|  16|           24.67|
|United States|2015-11-25|2015-11-25 07:39:38|  16|           21.67|
|United States|2015-11-25|2015-11-25 07:54:13|  17|           16.33|
|United States|2015-11-25|2015-11-25 07:54:15|  31|           21.33|
|United States|2015-11-25|2015-11-25 07:54:16|  20|           22.67|
|United States|2015-11-25|2015-11-25 08:02:43|  30|            27.0|
|United States|2015-11-25|2015-11-25 08:07:46|  14|           21.33|
|United States|2015-11-25|2015-11-25 08:22:04|  26|           23.33|
|United States

                                                                                

In [137]:
# Definindo especificação de janela para função lag
window_spec_lag = Window\
    .partitionBy("country", "date")\
    .orderBy("timestamp")

# Criando média móvel na mão via lag
df_avg_lag = df_iot.select(
    "country",
    "date",
    "timestamp",
    "temp",
    lag("temp", offset=1).over(window_spec_lag).alias("temp_t1"),
    lag("temp", offset=2).over(window_spec_lag).alias("temp_t2"),
    round(avg("temp").over(window_spec_avg), 2).alias("window_rolling_avg")
).selectExpr(
    "*",
    "case when temp_t1 is null and temp_t2 is null then temp \
        when temp_t1 is not null and temp_t2 is null then round((temp + temp_t1) / 2, 2) \
        else round((temp + temp_t1 + temp_t2) / 3, 2) \
    end as manual_rolling_avg"
)

# Visualizando resultado
df_avg_lag\
    .where(expr("country = 'United States'"))\
    .orderBy("timestamp")\
    .show(15)



22/09/07 18:42:18 WARN DAGScheduler: Broadcasting large task binary with size 1017.2 KiB
+-------------+----------+-------------------+----+-------+-------+------------------+------------------+
|      country|      date|          timestamp|temp|temp_t1|temp_t2|window_rolling_avg|manual_rolling_avg|
+-------------+----------+-------------------+----+-------+-------+------------------+------------------+
|United States|2015-11-25|2015-11-25 07:39:33|  25|   null|   null|              25.0|              25.0|
|United States|2015-11-25|2015-11-25 07:39:35|  33|     25|   null|              29.0|              29.0|
|United States|2015-11-25|2015-11-25 07:39:36|  16|     33|     25|             24.67|             24.67|
|United States|2015-11-25|2015-11-25 07:39:38|  16|     16|     33|             21.67|             21.67|
|United States|2015-11-25|2015-11-25 07:54:13|  17|     16|     16|             16.33|             16.33|
|United States|2015-11-25|2015-11-25 07:54:15|  31|     17|    

                                                                                

In [145]:
days = lambda i: i * 86400 

# Especificando janela para média móvel dos últimos 7 dias
window_spec_avg_timeseries = Window\
    .partitionBy("country", "date")\
    .orderBy("timestamp")\
    .rangeBetween(-days(7), 0)

# Aplicando média móvel em janela temporal
df_rolling_avg_temp_dynamic = df_iot.select(
    "country",
    "date",
    "timestamp",
    "temp",
    avg("temp").over(window_spec_avg_timeseries).alias("rolling_avg_temp")
)

AnalysisException: cannot resolve 'RANGE BETWEEN CAST(-604800L AS STRING) FOLLOWING AND CURRENT ROW' due to data type mismatch: The data type of the lower bound 'string' does not match the expected data type '(numeric or interval day to second or interval year to month or interval)'.;
'Project [country#944, date#970, timestamp#961, temp#9, avg(temp#9) windowspecdefinition(country#944, date#970, timestamp#961 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, cast(-604800 as string), currentrow$())) AS rolling_avg_temp#5292]
+- Project [device_type#943, country#944, temp#9, humidity#10, c02_level#12, timestamp#961, date#970]
   +- Project [device_type#943, country#944, temp#9, humidity#10, c02_level#12, timestamp_raw#945L, date_add#953, timestamp#961, to_date(timestamp#961, None, Some(America/Sao_Paulo)) AS date#970]
      +- Project [device_type#943, country#944, temp#9, humidity#10, c02_level#12, timestamp_raw#945L, date_add#953, from_unixtime(cast(((cast(timestamp_raw#945L as double) / cast(1000 as double)) + cast(date_add#953 as double)) as bigint), yyyy-MM-dd HH:mm:ss, Some(America/Sao_Paulo)) AS timestamp#961]
         +- Project [device_type#943, country#944, temp#9, humidity#10, c02_level#12, timestamp_raw#945L, <lambda>(monotonically_increasing_id())#952 AS date_add#953]
            +- Project [split(device_name#1, -, -1)[0] AS device_type#943, cn#5 AS country#944, temp#9, humidity#10, c02_level#12, timestamp#14L AS timestamp_raw#945L]
               +- Relation [device_id#0,device_name#1,ip#2,cca2#3,cca3#4,cn#5,latitude#6,longitude#7,scale#8,temp#9,humidity#10,battery_level#11,c02_level#12,lcd#13,timestamp#14L] json


Assuntos:

- Teoria sobre funções *window*
- Obtenção e preparação dos dados
- Exemplo inicial de agregações e funções window
- Etapas para aplicação de funções window (definição de especificação, etc)
    - Exemplos práticos de função window
    - Funções de ranking
        - rank() e dense_rank()
        - row_number()
        - percent_rank(),
        - ntile()
    - Funções analíticas
        - lag()
        - lead()
    - Funções de agregação
        - exemplos práticos (média, máximo, soma cumulativa, etc)
