In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Compras") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [16]:
# spark is an existing SparkSession
df = spark.read.format("csv") \
                .option("header", "true") \
                .option("delimiter", ";") \
                .load("compras.txt")


In [17]:
# Displays the content of the DataFrame to stdout
df.show()

+----------+--------+--------------+-----------------+------+----------+
|      Data|    Hora|        Cidade|          Produto| Valor|FormaPagto|
+----------+--------+--------------+-----------------+------+----------+
|2015-01-01|09:00:00|     Sao Paulo|Roupas masculinas|214.05|      Amex|
|2015-01-01|09:00:00|Rio de Janeiro| Roupas femininas|153.57|      Visa|
|2015-01-01|09:00:00|      Curitiba|           Musica| 66.08|  Dinheiro|
|2015-01-01|09:00:00|Belo Horizonte|              Pet|493.51|      Visa|
|2015-01-01|09:00:00|       Aracaju|  Roupas infantis|235.63|MasterCard|
|2015-01-01|09:00:00|      Salvador|Roupas masculinas|247.18|MasterCard|
|2015-01-01|09:00:00|      Campinas|          Cameras| 379.6|      Visa|
|2015-01-01|09:00:00|     Sao Paulo|      Eletronicos| 296.8|  Dinheiro|
|2015-01-01|09:00:00|      Londrina|       Brinquedos| 25.38|      Visa|
|2015-01-01|09:00:00|Rio de Janeiro|       Brinquedos|213.88|      Visa|
|2015-01-01|09:00:00|     Sao Paulo|      Video Gam

In [18]:
df.describe().show()

+-------+----------+--------+--------+-----------+------------------+----------+
|summary|      Data|    Hora|  Cidade|    Produto|             Valor|FormaPagto|
+-------+----------+--------+--------+-----------+------------------+----------+
|  count|   1000000| 1000000| 1000000|    1000000|           1000000|   1000000|
|   mean|      null|    null|    null|       null|250.15969356334065|      null|
| stddev|      null|    null|    null|       null| 144.2829122143926|      null|
|    min|2015-01-01|09:00:00|Alvorada| Artesanato|                 0|      Amex|
|    max|2015-03-30|17:59:00| Vitoria|Video Games|             99.99|      Visa|
+-------+----------+--------+--------+-----------+------------------+----------+



In [11]:
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()

root
 |-- _c0: string (nullable = true)



In [20]:
df.show(10)

+----------+--------+--------------+-----------------+------+----------+
|      Data|    Hora|        Cidade|          Produto| Valor|FormaPagto|
+----------+--------+--------------+-----------------+------+----------+
|2015-01-01|09:00:00|     Sao Paulo|Roupas masculinas|214.05|      Amex|
|2015-01-01|09:00:00|Rio de Janeiro| Roupas femininas|153.57|      Visa|
|2015-01-01|09:00:00|      Curitiba|           Musica| 66.08|  Dinheiro|
|2015-01-01|09:00:00|Belo Horizonte|              Pet|493.51|      Visa|
|2015-01-01|09:00:00|       Aracaju|  Roupas infantis|235.63|MasterCard|
|2015-01-01|09:00:00|      Salvador|Roupas masculinas|247.18|MasterCard|
|2015-01-01|09:00:00|      Campinas|          Cameras| 379.6|      Visa|
|2015-01-01|09:00:00|     Sao Paulo|      Eletronicos| 296.8|  Dinheiro|
|2015-01-01|09:00:00|      Londrina|       Brinquedos| 25.38|      Visa|
|2015-01-01|09:00:00|Rio de Janeiro|       Brinquedos|213.88|      Visa|
+----------+--------+--------------+---------------

In [23]:
# Creates a temporary view using the DataFrame
df.createOrReplaceTempView("Compras")

In [29]:
# SQL can be run over DataFrames that have been registered as a table.
compras = spark.sql("SELECT Cidade FROM Compras")

In [30]:
compras.show()

+--------------+
|        Cidade|
+--------------+
|     Sao Paulo|
|Rio de Janeiro|
|      Curitiba|
|Belo Horizonte|
|       Aracaju|
|      Salvador|
|      Campinas|
|     Sao Paulo|
|      Londrina|
|Rio de Janeiro|
|     Sao Paulo|
|     Sao Paulo|
|      Campinas|
|      Ourinhos|
|Rio de Janeiro|
|      Brasilia|
|  Porto Alegre|
|     Sao Paulo|
|       Maringa|
| Florianopolis|
+--------------+
only showing top 20 rows



In [38]:
 comprasAmex = spark.sql("SELECT * FROM Compras WHERE FormaPagto='Visa'")

In [39]:
comprasAmex.show(10)

+----------+--------+--------------+----------------+------+----------+
|      Data|    Hora|        Cidade|         Produto| Valor|FormaPagto|
+----------+--------+--------------+----------------+------+----------+
|2015-01-01|09:00:00|Rio de Janeiro|Roupas femininas|153.57|      Visa|
|2015-01-01|09:00:00|Belo Horizonte|             Pet|493.51|      Visa|
|2015-01-01|09:00:00|      Campinas|         Cameras| 379.6|      Visa|
|2015-01-01|09:00:00|      Londrina|      Brinquedos| 25.38|      Visa|
|2015-01-01|09:00:00|Rio de Janeiro|      Brinquedos|213.88|      Visa|
|2015-01-01|09:00:00|     Sao Paulo|     Video Games| 53.26|      Visa|
|2015-01-01|09:00:00|Rio de Janeiro|          Musica|260.65|      Visa|
|2015-01-01|09:00:00|      Brasilia|          Jardim| 136.9|      Visa|
|2015-01-01|09:00:00|  Porto Alegre|Roupas femininas|483.82|      Visa|
|2015-01-01|09:00:00| Florianopolis| Roupas infantis|309.16|      Visa|
+----------+--------+--------------+----------------+------+----

In [46]:
 comprasAgrupadasPorCidadeEProduto = spark.sql("SELECT Cidade, Produto, SUM(Valor) AS Total FROM Compras GROUP BY Cidade, Produto ORDER BY Cidade, Total DESC")

In [48]:
comprasPorCidadeEProduto.show(100)

+--------------+-----------------+------------------+
|        Cidade|          Produto|             Total|
+--------------+-----------------+------------------+
|      Alvorada|  Roupas infantis|         272187.44|
|      Alvorada|           Musica|         142340.19|
|      Alvorada|              Pet|142288.91999999998|
|      Alvorada|      Eletronicos|141403.83999999997|
|      Alvorada|              CDs|141069.50999999998|
|      Alvorada|       Brinquedos|140814.51000000004|
|      Alvorada|             DVDs|140263.55000000002|
|      Alvorada|       Artesanato|139969.58000000002|
|      Alvorada| Roupas femininas|138878.38999999998|
|      Alvorada|   Saude e beleza|         136908.63|
|      Alvorada|          Cameras|134084.87000000002|
|      Alvorada|Roupas masculinas|133671.43000000002|
|      Alvorada|     Computadores|133586.27000000002|
|      Alvorada|      Video Games|132411.46000000002|
|      Alvorada|           Livros|         130861.86|
|      Alvorada|         Esp

In [59]:
# Creates a temporary view using the DataFrame
comprasAgrupadasPorCidadeEProduto.createOrReplaceTempView("ComprasAgrupadas")

In [88]:
produtosMaisVendidosPorCidade = spark.sql("SELECT Cidade, Produto, Total FROM ComprasAgrupadas T1 WHERE EXISTS (SELECT 1 AS Total FROM ComprasAgrupadas T2 WHERE T2.Cidade = T1.Cidade GROUP BY Cidade HAVING MAX(T2.Total) = T1.Total)")

In [90]:
produtosMaisVendidosPorCidade.show(100)

+-------------------+-----------------+------------------+
|             Cidade|          Produto|             Total|
+-------------------+-----------------+------------------+
|           Alvorada|  Roupas infantis|         272187.44|
|          Americana|  Roupas infantis| 272371.0099999999|
|     Angra dos Reis|  Roupas infantis|         286476.88|
|            Aracaju|  Roupas infantis|279384.14999999997|
|          Aracatuba|  Roupas infantis|256492.71000000005|
|        Barra Mansa|  Roupas infantis| 276998.1899999999|
|            Barueri|  Roupas infantis|         272080.05|
|              Bauru|  Roupas infantis|270793.58999999997|
|              Belem|  Roupas infantis|270546.40000000014|
|     Belo Horizonte|  Roupas infantis|268459.77999999997|
|          Boa Vista|  Roupas infantis|268096.23000000004|
|           Brasilia|  Roupas infantis|261432.73000000004|
|          Cabo Frio|  Roupas infantis| 537187.2599999999|
|           Campinas|  Roupas infantis| 563500.269999999

In [91]:
produtosMaisVendidosPorCidade.coalesce(1).write.option("header", "true").csv("produto_mais_vendido_por_cidade_df")