In [43]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
df = spark.read.csv("customers.csv", header=True, inferSchema=True)

In [44]:
df.show(5)

+----------+--------+--------+-------+--------+-----+
|      date|    time|customer|product|quantity|price|
+----------+--------+--------+-------+--------+-----+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|
|04/08/2018|11:38 PM|     100|      2|       8|   79|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|
+----------+--------+--------+-------+--------+-----+
only showing top 5 rows


In [45]:
df.count()

1002

In [46]:
df.select("customer", "time").show(5)

+--------+--------+
|customer|    time|
+--------+--------+
|     100| 2:20 PM|
|     100| 3:30 PM|
|     100| 5:20 PM|
|     100|11:38 PM|
|     100| 3:52 AM|
+--------+--------+
only showing top 5 rows


In [47]:
df.selectExpr("customer as customer_id").show(5)

+-----------+
|customer_id|
+-----------+
|        100|
|        100|
|        100|
|        100|
|        100|
+-----------+
only showing top 5 rows


In [48]:
df.selectExpr("*", "(quantity = price) as equal_price").show(5)

+----------+--------+--------+-------+--------+-----+-----------+
|      date|    time|customer|product|quantity|price|equal_price|
+----------+--------+--------+-------+--------+-----+-----------+
|05/10/2018| 2:20 PM|     100|      1|      10|  816|      false|
|06/10/2018| 3:30 PM|     100|      1|      10|    1|      false|
|07/10/2018| 5:20 PM|     100|      1|      10|   10|       true|
|04/08/2018|11:38 PM|     100|      2|       8|   79|      false|
|25/03/2018| 3:52 AM|     100|      3|       1|   91|      false|
+----------+--------+--------+-------+--------+-----+-----------+
only showing top 5 rows


In [49]:
df.selectExpr("avg(price)","count(customer)").show(5)

+----------------+---------------+
|      avg(price)|count(customer)|
+----------------+---------------+
|50.9500998003992|           1002|
+----------------+---------------+



In [50]:
df.where("quantity < 8").where("customer != 100").show(5)

+----------+-------+--------+-------+--------+-----+
|      date|   time|customer|product|quantity|price|
+----------+-------+--------+-------+--------+-----+
|13/04/2018|2:24 AM|     101|      6|       6|   73|
|21/12/2017|3:09 AM|     101|      7|       6|    8|
|08/08/2018|9:16 AM|     101|      9|       0|    4|
|15/09/2018|7:33 AM|     101|     10|       5|   35|
|07/01/2018|7:58 PM|     101|      1|       6|   21|
+----------+-------+--------+-------+--------+-----+
only showing top 5 rows


In [51]:
df = df.dropna()
df.dtypes

[('date', 'string'),
 ('time', 'string'),
 ('customer', 'int'),
 ('product', 'int'),
 ('quantity', 'int'),
 ('price', 'int')]

In [52]:
from pyspark.sql.functions import sum, max

df.groupBy("customer").agg(sum("quantity")).show(5)

+--------+-------------+
|customer|sum(quantity)|
+--------+-------------+
|     108|          129|
|     101|          196|
|     115|          143|
|     126|          137|
|     103|          137|
+--------+-------------+
only showing top 5 rows


In [53]:
df.where("date = '05/10/2018'").groupBy("customer").agg(sum("price")).orderBy("sum(price)", ascending=False).show(5)
# Agafa els 5 clients amb més preu total el dia 05/10/2018

+--------+----------+
|customer|sum(price)|
+--------+----------+
|     100|       816|
|     103|        69|
+--------+----------+



In [54]:
df.select("customer").distinct().count()

31

In [55]:
# How many elements?
df.count()

1002

In [56]:
# How many DISTINCT customers?
df.select("customer").distinct().count()

31

In [57]:
# need to aggregate values per customer
df.groupBy("customer").agg(sum("price")).orderBy("sum(price)", ascending=False).show(5)

+--------+----------+
|customer|sum(price)|
+--------+----------+
|     100|      2509|
|     119|      1996|
|     121|      1896|
|     123|      1825|
|     111|      1822|
+--------+----------+
only showing top 5 rows


In [58]:
# How many products per customer?
df.groupBy("customer").agg(sum("quantity")).orderBy("sum(quantity)", ascending=False).show(5)

+--------+-------------+
|customer|sum(quantity)|
+--------+-------------+
|     101|          196|
|     122|          179|
|     117|          176|
|     100|          172|
|     124|          165|
+--------+-------------+
only showing top 5 rows


In [60]:
# how many times customer id number 100 has purchased more than 5 items?
from pyspark.sql.functions import col, max, sum

df.select("customer", "quantity").filter((col("customer") == 100) & (col("quantity") > 5)).count()

16

In [62]:
df.groupBy("customer").agg(max("quantity")).orderBy("max(quantity)", ascending=False).show(5)

+--------+-------------+
|customer|max(quantity)|
+--------+-------------+
|     100|           10|
|     108|            9|
|     101|            9|
|     115|            9|
|     126|            9|
+--------+-------------+
only showing top 5 rows


DataFrame[date: string, time: string, customer: int, product: int, quantity: int, price: int]

In [65]:
df.select("customer", "product").filter(col("customer") == 100).distinct().orderBy("product").show()

+--------+-------+
|customer|product|
+--------+-------+
|     100|      1|
|     100|      2|
|     100|      3|
|     100|      4|
|     100|      5|
|     100|      6|
|     100|      7|
|     100|      8|
|     100|      9|
|     100|     10|
+--------+-------+



In [83]:
# which is the product with highest price?
df.selectExpr("product", "price", "quantity").filter(col("quantity") == 1).select("product", "price").distinct().orderBy("price", ascending=False).show(1)

+-------+-----+
|product|price|
+-------+-----+
|      5|   98|
+-------+-----+
only showing top 1 row


In [85]:
df = spark.read.csv("worldcities.csv", header=True, inferSchema=True)

df.show(10)

+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|       city| city_ascii|     lat|     lng|     country|iso2|iso3|      admin_name|capital|population|        id|
+-----------+-----------+--------+--------+------------+----+----+----------------+-------+----------+----------+
|      Tokyo|      Tokyo|  35.687|139.7495|       Japan|  JP| JPN|           Tōkyō|primary|  3.7785E7|1392685764|
|    Jakarta|    Jakarta|  -6.175|106.8275|   Indonesia|  ID| IDN|         Jakarta|primary|  3.3756E7|1360771077|
|      Delhi|      Delhi|   28.61|   77.23|       India|  IN| IND|           Delhi|  admin|  3.2226E7|1356872604|
|  Guangzhou|  Guangzhou|   23.13|  113.26|       China|  CN| CHN|       Guangdong|  admin|   2.694E7|1156237133|
|     Mumbai|     Mumbai| 19.0761| 72.8775|       India|  IN| IND|     Mahārāshtra|  admin|  2.4973E7|1356226629|
|     Manila|     Manila| 14.5958|120.9772| Philippines|  PH| PHL|          Manila|prima

In [87]:
# country population
df.select("country", "population").groupBy("country").agg(sum("population")).orderBy("sum(population)", ascending=False).show(10)

+-------------+---------------+
|      country|sum(population)|
+-------------+---------------+
|        China|  1.360845815E9|
|        India|   5.14693978E8|
|United States|   3.79426737E8|
|       Brazil|   1.91184513E8|
|        Japan|   1.84894147E8|
|  Philippines|   1.37832184E8|
|       Mexico|   1.17363046E8|
|       Turkey|   1.07562012E8|
|    Indonesia|   1.05083586E8|
|       Russia|   1.01867797E8|
+-------------+---------------+
only showing top 10 rows


In [90]:
df.groupBy("country").count().orderBy("count", ascending=False).show()

+--------------+-----+
|       country|count|
+--------------+-----+
|         India| 7108|
| United States| 5344|
|        Brazil| 2961|
|       Germany| 1759|
|         China| 1732|
|   Philippines| 1584|
|United Kingdom| 1365|
|         Italy| 1357|
|         Japan| 1344|
|        France| 1160|
|        Russia| 1059|
|        Mexico| 1014|
|    Madagascar|  832|
|         Spain|  783|
|      Colombia|  732|
|        Turkey|  711|
|       Morocco|  584|
|       Algeria|  553|
|     Argentina|  502|
|          Iran|  501|
+--------------+-----+
only showing top 20 rows


In [94]:
df.filter(col("country") == "Spain").select("city", "population").orderBy("population", ascending=True).show(10)

+-------------------+----------+
|               city|population|
+-------------------+----------+
|Caldas de Malavella|    8509.0|
|               Tona|    8511.0|
|           Vidreras|    8538.0|
|          Colindres|    8540.0|
|            Polinyá|    8555.0|
|            Reinosa|    8566.0|
|San Juan de Palamós|    8603.0|
|           Monachil|    8608.0|
|           Trujillo|    8619.0|
|         Santiponce|    8625.0|
+-------------------+----------+
only showing top 10 rows
