In [1]:
#utilizei um notebook databricks para aproveitar o SparkSQL

from pyspark.sql import SQLContext
import datetime
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [2]:
# File location and type
file_location = "/FileStore/tables/cliente_pedidos.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

In [3]:
temp_table_name = "tmp_pedidos"

df.createOrReplaceTempView(temp_table_name)

In [4]:
df_tmp = spark.sql('''select distinct codigo_pedido,
                                      codigo_cliente,
                                      to_date(from_unixtime(UNIX_TIMESTAMP(data_nascimento_cliente,'dd/MM/yyyy'))) as data_nascimento_cliente,
                                      FLOOR(DATEDIFF(TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP())), to_date(from_unixtime(UNIX_TIMESTAMP(data_nascimento_cliente,'dd/MM/yyyy')))) / 
                                      365.25) as idade_cliente,
                                      cast(data_pedido as int) as data_pedido
                      from tmp_pedidos
                  ''')

In [5]:
df_converted = df_tmp.withColumn('data_pedido', f.to_date(df_tmp.data_pedido.cast(dataType=t.TimestampType()))).createOrReplaceTempView("pedidos")

In [6]:
df_agg = spark.sql('''with agg_ped as
                           (select distinct codigo_cliente,
                                   data_pedido,
                                   count(distinct codigo_pedido) numero_pedidos
                            from pedidos
                            group by 1,2
                           )
                       select distinct ped.codigo_cliente,
                                       agg.numero_pedidos,
                                       collect_list(array(ped.codigo_pedido,ped.data_pedido)) as lista_pedido,
                                       ped.idade_cliente
                       from pedidos ped
                       inner join
                       agg_ped agg
                        on ped.codigo_cliente = agg.codigo_cliente
                       where agg.numero_pedidos > 2
                       and cast(ped.idade_cliente as int) < 30
                       and (ped.data_pedido = '2018-11-23' or ped.data_pedido = '2017-11-24' or ped.data_pedido = '2016-11-25')
                       group by 1,2,4
                      order by 1
                     ''')
display(df_agg)

codigo_cliente,numero_pedidos,lista_pedido,idade_cliente
047f50ac99e51749f55bc20e94b5a0e0,3,"List(List(771c2494418c023e6ab3d2db396428b9, 2017-11-24), List(89419f4a8b4d9938a1ec0137b0770155, 2017-11-24), List(c7a1196759203ea33fa72c4a9c606802, 2017-11-24))",20
07dcdb642a376c139e2f9a7223d540f6,3,"List(List(1bca7bd3579e16c29c84ec9db5002186, 2018-11-23), List(b3af82eb4fea94b5b21f81fec980fb6d, 2018-11-23), List(658d071227033aa72bba2c84084b2070, 2018-11-23))",20
07fae18495712af3f85dada65f0f2d9e,3,"List(List(5d54c8933df514979bec30d2129c1821, 2018-11-23), List(3055be1a9f75a26d770ce395d3f07667, 2018-11-23), List(6bde02cb646452fdd1a2f586c12dc3a5, 2018-11-23))",26
0b8745d06087e912b5c826498301204f,3,"List(List(b8273f00b1c06a54f98e5e66df23f929, 2018-11-23), List(4d125dca29733bc9f0bc18da51e58c67, 2018-11-23), List(3e988cb96ccd69c512f4eec11476fc33, 2018-11-23))",27
0b9ccdcb03567e2d95551fe4ec3e544c,3,"List(List(3ed1fe357ffb34aeebdd497cc1ac1738, 2017-11-24), List(9421ec916524369aefe2b5971a502033, 2017-11-24), List(e8a31afd2f366f51f17fba0b7fd77a8f, 2017-11-24))",27
0cf936eed633e9892d6bfc4147835dcd,3,"List(List(9ed6e67d3cfd6ab5b58c7a43e61a8ec0, 2018-11-23), List(fabf84630a8a59360fb0564783c5f14b, 2018-11-23), List(a1180b9a3d0f990dd775d8bd48a95fc6, 2018-11-23))",19
0ec8b1026c8aad8730656da01deaca33,3,"List(List(ab2799f3a6e5dd5e5791a283c0441a8e, 2018-11-23), List(46eef06740b2fa61a404a1468c2d088a, 2018-11-23), List(b53a4e40d0ddd8213e24998c280c8e69, 2018-11-23))",24
0ff625a0b9625af87811e82fefcb586b,3,"List(List(03e5eded6658db6bb7488335118afb4d, 2018-11-23), List(fedb324816468e2eb3d32d3ebbda7f78, 2018-11-23), List(cef49b655518db5b22ab4636c9ec6ed3, 2018-11-23))",22
10d8cbef26dcd6deabdb339431981473,3,"List(List(4df7ce929ad563414fdcccb9c25a7438, 2018-11-23), List(5837a384fe5acc9dac1e63acb945e597, 2018-11-23), List(01ae53b5a9b8fda287c8bd4fa0214179, 2018-11-23))",17
150bec13aba5b01dc0c0483e456ec5c0,3,"List(List(020b9a5fdc5fa319ea7c9026f7ecec5b, 2018-11-23), List(6a276aa169a49665fc8febdc42c83fcf, 2018-11-23), List(b07a6e1563314db20c981935da761f31, 2018-11-23))",28
