In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "1024m").\
        getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/11 19:39:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Clean Payments

In [3]:
payments = spark.read.parquet('payments')
payments.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------+--------+-------------------+--------+-------------+
|siteid|ticketno|     ticketdatetime|moppayed|      moptext|
+------+--------+-------------------+--------+-------------+
| 12962|   29646|2021-06-09 19:32:46|   61.42|CARTÃO DÉBITO|
| 12547|   10818|2021-06-09 08:11:11|   14.51|CARTÃO DÉBITO|
| 14402|   33186|2021-06-09 23:19:31|     8.8|CARTÃO DÉBITO|
| 16045|   59698|2021-06-09 15:37:57|    30.0|CARTÃO DÉBITO|
| 11478|   52332|2021-06-09 11:58:32|    24.8|CARTÃO DÉBITO|
| 12558|   68521|2021-06-09 13:05:07|     4.4|CARTÃO DÉBITO|
| 11907|   29361|2021-06-09 08:42:34|    42.1|    GALPFROTA|
| 15066|   62143|2021-06-09 05:16:49|     4.9|CARTÃO DÉBITO|
| 11166|    4831|2021-06-09 21:18:12|    73.0|CARTÃO DÉBITO|
| 12774|   72167|2021-06-09 08:10:54|    40.0|CARTÃO DÉBITO|
| 61129|      36|2021-06-09 11:49:50|    50.0|CARTÃO DÉBITO|
| 11377|   67639|2021-06-09 08:44:17|    15.0|CARTÃO DÉBITO|
| 12347|   17574|2021-06-09 12:02:26|    1.44|CARTÃO DÉBITO|
| 11774|   84828|2021-06

                                                                                

In [4]:
payments.printSchema()

root
 |-- siteid: string (nullable = true)
 |-- ticketno: long (nullable = true)
 |-- ticketdatetime: timestamp (nullable = true)
 |-- moppayed: double (nullable = true)
 |-- moptext: string (nullable = true)



In [5]:
payments.count()

                                                                                

9665370

In [22]:
payments.filter('siteid is NULL').show()



+------+--------+-------------------+--------+-------------+
|siteid|ticketno|     ticketdatetime|moppayed|      moptext|
+------+--------+-------------------+--------+-------------+
|  null|   66828|2021-07-02 06:50:26|    19.5|       CHEQUE|
|  null|   47293|2021-06-18 19:19:14|    30.0|    NUMERÁRIO|
|  null|     718|2022-02-20 07:20:56|    25.0|CARTAO DEBITO|
|  null|   62265|2021-07-20 14:56:04|     4.3|    NUMERÁRIO|
+------+--------+-------------------+--------+-------------+



                                                                                

In [23]:
payments.selectExpr("cast(siteid as int) siteid").show()

+------+
|siteid|
+------+
| 12962|
| 12547|
| 14402|
| 16045|
| 11478|
| 12558|
| 11907|
| 15066|
| 11166|
| 12774|
| 61129|
| 11377|
| 12347|
| 11774|
| 12513|
| 13500|
| 11774|
| 11718|
| 11682|
| 11752|
+------+
only showing top 20 rows



In [6]:
calc = (
    payments
        .groupBy('siteid', 'ticketdatetime')
        .agg(
            F.sum('moppayed').alias('moppayed'),
            F.collect_set('moptext').alias('moptext')
        )
)
calc.show()

[Stage 7:>                                                          (0 + 1) / 1]

+------+-------------------+--------+-----------+
|siteid|     ticketdatetime|moppayed|    moptext|
+------+-------------------+--------+-----------+
| 11015|2021-06-01 10:44:19|    30.0|[NUMERÁRIO]|
| 11015|2021-06-01 12:31:48|    45.0|[NUMERÁRIO]|
| 11015|2021-06-01 12:40:59|   87.71|[GALPFROTA]|
| 11015|2021-06-01 16:45:46|   67.97|[GALPFROTA]|
| 11015|2021-06-02 09:06:46|   57.76|[GALPFROTA]|
| 11015|2021-06-02 09:25:13|  110.09|[NUMERÁRIO]|
| 11015|2021-06-02 09:36:49|    15.0|[NUMERÁRIO]|
| 11015|2021-06-02 20:46:21|    10.0|[NUMERÁRIO]|
| 11015|2021-06-02 20:50:27|    15.0|[NUMERÁRIO]|
| 11015|2021-06-03 08:38:59|    30.0|[NUMERÁRIO]|
| 11015|2021-06-03 13:20:09|   16.89|[NUMERÁRIO]|
| 11015|2021-06-03 14:37:23|   47.66|[NUMERÁRIO]|
| 11015|2021-06-03 17:39:25|    57.6|[NUMERÁRIO]|
| 11015|2021-06-04 15:37:29|   28.93|[NUMERÁRIO]|
| 11015|2021-06-04 18:09:52|   63.29|[NUMERÁRIO]|
| 11015|2021-06-04 19:11:55|    20.0|[NUMERÁRIO]|
| 11015|2021-06-04 19:38:16|    10.0|[NUMERÁRIO]|


                                                                                

In [16]:
calc.select('moptext').distinct().collect()

                                                                                

[Row(moptext=['CHEQUE', 'NUMERÁRIO', 'CARTÃO DÉBITO']),
 Row(moptext=['VENDING TABACO']),
 Row(moptext=['PARCERIA UBER']),
 Row(moptext=['VALES', 'NUMERÁRIO']),
 Row(moptext=['CARTÃO CONTINENTE', 'NUMERÁRIO', 'CARTÃO DÉBITO']),
 Row(moptext=['NUMER┴RIO']),
 Row(moptext=['CHEQUE', 'NUMERÁRIO']),
 Row(moptext=['VALES', 'NUMERARIO']),
 Row(moptext=['CHEQUE']),
 Row(moptext=['NUMERARIO', 'CARTAO CONTINENTE']),
 Row(moptext=['CARTAO DEBITO', 'VALES']),
 Row(moptext=['GALPFROTA', 'DRIVE-OFF']),
 Row(moptext=['CARTÃO DÉBITO']),
 Row(moptext=['PARCERIA G']),
 Row(moptext=['PARCERIA B']),
 Row(moptext=['VISA - MANUAL', 'CARTÃO DÉBITO']),
 Row(moptext=['DIF-MOP-REC\\VND', 'TICKET CAR', 'CARTÃO DÉBITO']),
 Row(moptext=['GALPFROTA', 'PARCERIA G']),
 Row(moptext=['REGULARIZAÇÃO GALPFROTA']),
 Row(moptext=['GALPFROTA', 'DIF-MOP-REC\\VND', 'CARTÃO DÉBITO']),
 Row(moptext=['DIF-MOP-REC\\VND', 'NUMERARIO']),
 Row(moptext=['CARTÃO CONTINENTE', 'DIF-MOP-REC\\VND']),
 Row(moptext=['NUMER┴RIO', 'CART├O D╔B