In [27]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql import SparkSession

In [9]:
# inicialize session and get context spark
spark = SparkSession.builder.appName("teste").getOrCreate()
sc = spark.sparkContext

In [33]:
columns = ["transacao","municipio","estado", "data_atualizacao"]
data = [(22000, 'uberlandia', 'MG', '25/02/2021'),
        (25000,'rio de janeiro', 'RJ', '22/02/2021'),
        (27000,'sao paulo', 'SP', '25/02/2021'),
        (35000,'uberlandia', 'MG', '23/02/2021'),
        (1000,'uberlandia', 'MG', '21/02/2021'),
        (122000,'rio de janeiro', 'RJ', '20/02/2021'),
        (200,'belo horizonte', 'MG', '25/02/2021'),
        (800,'sao paulo', 'SP', '24/02/2021'),
        (222,'belo horizonte', 'MG', '22/02/2021'),
        (13000,'rio de janeiro', 'RJ', '23/02/2021'),
        (1000,'sao paulo', 'SP', '25/02/2021'),
        (30000,'uberlandia', 'MG', '21/02/2021')]
#rdd = spark.sparkContext.parallelize(data)


In [34]:
schema = StructType([ \
    StructField("transacao",IntegerType(),True), \
    StructField("municipio",StringType(),True), \
    StructField("estado",StringType(),True), \
    StructField("data_atualizacao", StringType(), True),
  ])

In [52]:
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- transacao: integer (nullable = true)
 |-- municipio: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data_atualizacao: string (nullable = true)

+---------+--------------+------+----------------+
|transacao|municipio     |estado|data_atualizacao|
+---------+--------------+------+----------------+
|22000    |uberlandia    |MG    |25/02/2021      |
|25000    |rio de janeiro|RJ    |22/02/2021      |
|27000    |sao paulo     |SP    |25/02/2021      |
|35000    |uberlandia    |MG    |23/02/2021      |
|1000     |uberlandia    |MG    |21/02/2021      |
|122000   |rio de janeiro|RJ    |20/02/2021      |
|200      |belo horizonte|MG    |25/02/2021      |
|800      |sao paulo     |SP    |24/02/2021      |
|222      |belo horizonte|MG    |22/02/2021      |
|13000    |rio de janeiro|RJ    |23/02/2021      |
|1000     |sao paulo     |SP    |25/02/2021      |
|30000    |uberlandia    |MG    |21/02/2021      |
+---------+--------------+------+----------------+



In [53]:
df = df.withColumn("data_atualizacao", unix_timestamp(col("DATA_ATUALIZACAO"), 'dd/mm/yyyy').cast('timestamp'))

In [54]:
#Considerando o data frame agrupar por município e considerando a data de atualização definir a ordem da transação.

In [55]:
df = df.orderBy("data_atualizacao") \
        .groupby(col('municipio'),col('estado'), col('data_atualizacao')) \
        .sum() \
        .withColumnRenamed("sum(transacao)", 'transacao')

In [56]:
df.printSchema()
df.show(truncate=False)

root
 |-- municipio: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data_atualizacao: timestamp (nullable = true)
 |-- transacao: long (nullable = true)

+--------------+------+-------------------+---------+
|municipio     |estado|data_atualizacao   |transacao|
+--------------+------+-------------------+---------+
|belo horizonte|MG    |2021-01-25 00:02:00|200      |
|uberlandia    |MG    |2021-01-23 00:02:00|35000    |
|belo horizonte|MG    |2021-01-22 00:02:00|222      |
|rio de janeiro|RJ    |2021-01-20 00:02:00|122000   |
|rio de janeiro|RJ    |2021-01-22 00:02:00|25000    |
|rio de janeiro|RJ    |2021-01-23 00:02:00|13000    |
|sao paulo     |SP    |2021-01-25 00:02:00|28000    |
|uberlandia    |MG    |2021-01-25 00:02:00|22000    |
|sao paulo     |SP    |2021-01-24 00:02:00|800      |
|uberlandia    |MG    |2021-01-21 00:02:00|31000    |
+--------------+------+-------------------+---------+



In [None]:
#O data frame final precisa conter um campo ordenando as transações por município
#Transação, Município, Estado, data de Atualização e Ordem da Transação

In [57]:
w = Window().orderBy("MUNICIPIO")
df2 = df.withColumn("ORDEM_TRANSACAO", row_number().over(w))

In [58]:
df2.printSchema()
df2.show(truncate=False)

root
 |-- municipio: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data_atualizacao: timestamp (nullable = true)
 |-- transacao: long (nullable = true)
 |-- ORDEM_TRANSACAO: integer (nullable = true)

+--------------+------+-------------------+---------+---------------+
|municipio     |estado|data_atualizacao   |transacao|ORDEM_TRANSACAO|
+--------------+------+-------------------+---------+---------------+
|belo horizonte|MG    |2021-01-25 00:02:00|200      |1              |
|belo horizonte|MG    |2021-01-22 00:02:00|222      |2              |
|rio de janeiro|RJ    |2021-01-20 00:02:00|122000   |3              |
|rio de janeiro|RJ    |2021-01-22 00:02:00|25000    |4              |
|rio de janeiro|RJ    |2021-01-23 00:02:00|13000    |5              |
|sao paulo     |SP    |2021-01-25 00:02:00|28000    |6              |
|sao paulo     |SP    |2021-01-24 00:02:00|800      |7              |
|uberlandia    |MG    |2021-01-23 00:02:00|35000    |8              |
|uberla