## Imports

In [14]:
import os
import shutil
from utils import ajust_file_tmp, create_dir, directory_tmp
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col,date_format

In [24]:
# Stage the raw input files with the project helpers from `utils`:
# `create_dir` is given the temp directory and the raw input directory, and
# every entry it returns is passed through `ajust_file_tmp`.
# NOTE(review): judging by the output, `create_dir` returns the input file
# names and `ajust_file_tmp` returns a status string — confirm in utils.
tblists = create_dir(directory_tmp, "../data/input/")
[ajust_file_tmp(file_name) for file_name in tblists]

['events-8b908f99-0c4b-40f7-ba0c-70928dd95aeb.json, Ajustado com sucesso!',
 'events-f13735e0-1796-4e5a-ac28-b741a43283b4.json, Ajustado com sucesso!',
 'events-5a1d977f-d295-4b8b-aa8f-3a0dbb661dfb.json, Ajustado com sucesso!',
 'events-1f7df57a-e1c9-4b76-b795-962de9cc292e.json, Ajustado com sucesso!',
 'events-a17ec8ab-8207-47e4-b632-77067f2622ae.json, Ajustado com sucesso!',
 'events-d9107ca9-619e-4bf2-b281-4b4a34f7fb44.json, Ajustado com sucesso!',
 'events-ea3d7fe8-e94a-4380-b1c1-0e9e7b972168.json, Ajustado com sucesso!',
 'events-88d5d460-d3ab-4058-9abd-fd07e033a337.json, Ajustado com sucesso!',
 'events-2924398a-d266-4f0e-b9f9-d7d35979ec7f.json, Ajustado com sucesso!',
 'events-6dea23dd-f46e-4f35-b948-4abe9cd6d1d7.json, Ajustado com sucesso!',
 'events-c3c321e9-d229-4434-b8d2-a561698b184f.json, Ajustado com sucesso!',
 'events-b9cd4135-4cba-44c6-8e00-3a46e97fc915.json, Ajustado com sucesso!',
 'events-6feecebf-d7a1-4cf1-a153-ffb120d88e8a.json, Ajustado com sucesso!',
 'events-845

## SPARK

In [25]:
# Build (or reuse an already-active) Spark session named "pismo".
spark = (
    SparkSession.builder
    .appName("pismo")
    .getOrCreate()
)

In [26]:
# Load every JSON file under the staging directory, descending into
# sub-directories (recursiveFileLookup); the schema is inferred from the data.
# NOTE(review): the path is hard-coded; presumably it is the same directory as
# `directory_tmp` from utils (removed at the end of the notebook) — confirm.
reader = spark.read.option("recursiveFileLookup", "true")
df = reader.json("../data/tmp/")
df.printSchema()
df.show(n=10)
df.count()

                                                                                

root
 |-- data: struct (nullable = true)
 |    |-- addresses: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- neighborhood: string (nullable = true)
 |    |    |    |-- number: string (nullable = true)
 |    |    |    |-- postcode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- state_short: string (nullable = true)
 |    |    |    |-- street: string (nullable = true)
 |    |    |    |-- suffix: string (nullable = true)
 |    |-- amount: string (nullable = true)
 |    |-- company: struct (nullable = true)
 |    |    |-- addresses: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- city: string (nullable = true)
 |    |    |    |    |-- neighborhood: string (nullable = true)
 |    |    |    |    |-- number: string (nullable = true)
 |    |    |    |    |-- postcode: string (nullable = 

101821

In [27]:
# Deduplicate events: within each event_id, number rows newest-first and keep
# only the first one (the record with the greatest `timestamp`).
# NOTE(review): `timestamp` is a string column, so this is a lexicographic sort;
# it matches chronological order only if every value uses the same
# "yyyy-MM-ddTHH:mm:ss" layout — confirm. Rows sharing both event_id and
# timestamp have no tie-breaker, so which one survives is not fixed by this code.
windowSpec = Window.partitionBy("event_id").orderBy(col("timestamp").desc())
df = (
    df.withColumn("rw", row_number().over(windowSpec))
      .filter(col("rw") == 1)
)
df.count()

81609

In [28]:
# Derive a yyyyMMdd partition key from each event's timestamp and discard the
# helper row-number column that was only needed for deduplication.
df = (
    df.withColumn("partition", date_format(col("timestamp"), "yyyyMMdd"))
      .drop("rw")
)
df.show()



+--------------------+-----------+--------------------+-------------+-------------------+---------+
|                data|     domain|            event_id|   event_type|          timestamp|partition|
+--------------------+-----------+--------------------+-------------+-------------------+---------+
|{null, 9887.81, n...|transaction|00049b25-b426-4e2...|     creation|2021-01-04T23:48:19| 20210104|
|{null, 7986.51, n...|transaction|00069a69-d72e-480...|     creation|2021-02-02T05:10:08| 20210202|
|{null, 7687.7, nu...|transaction|0006d7ed-37c8-4e6...|     creation|2021-02-21T11:24:43| 20210221|
|{null, null, null...|    account|0007cd0e-80f2-4cb...|status-change|2021-01-11T15:35:12| 20210111|
|{null, 6454.48, n...|transaction|000f8c08-042a-49d...|     creation|2021-02-27T09:33:10| 20210227|
|{null, 4899.99, n...|transaction|00157e0f-231f-42a...|     creation|2021-01-06T09:50:33| 20210106|
|{null, 3446.77, n...|transaction|0015cc39-a7fb-476...|     creation|2021-01-21T05:54:11| 20210121|


                                                                                

In [29]:
# Promote every field of the `data` struct to a top-level column, then drop the
# struct itself. Guarded on the column's presence so that re-running this cell
# (after `data` has already been expanded) is a no-op instead of failing to
# resolve `data.*`.
if "data" in df.columns:
    df = df.select("*", "data.*").drop("data")
df.printSchema()

root
 |-- domain: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- partition: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- neighborhood: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- postcode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- state_short: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- suffix: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- addresses: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- neighborhood: string (nullable = true)
 |    |    |    |-- number

In [30]:
# Preview the flattened events (first 20 rows, long cell values truncated).
df.show(n=20, truncate=True)



+-----------+--------------------+-------------+-------------------+---------+---------+-------+------------------+--------------------+--------------------+------+----------+--------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|     domain|            event_id|   event_type|          timestamp|partition|addresses| amount|           company|               debit|                from|    id|new_status|    old_status|              person|              reason|           recipient|                  to|             type|
+-----------+--------------------+-------------+-------------------+---------+---------+-------+------------------+--------------------+--------------------+------+----------+--------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|transaction|00049b25-b426-4e2...|     creation|2021-01-04T23:48:19| 20210104|     null|9887.81|              null|      

                                                                                

In [31]:
# Persist the deduplicated, flattened events as Parquet, partitioned on disk by
# `domain` and then by the yyyyMMdd `partition` key; existing output is
# overwritten.
(
    df.write
      .mode("overwrite")
      .partitionBy("domain", "partition")
      .parquet("../data/output/")
)

                                                                                

## Remove temporary files

In [22]:
# Delete the staging directory (and everything in it) created at the start.
shutil.rmtree(path=directory_tmp)