# Data Treatment

In this notebook we download the chatbot data from Amazon S3 and clean it so that it can be used to answer the proposed questions.

We save a copy of the treated data as a `parquet` file in the `../files` folder.

## Requirements

Below we install the extra library this notebook needs.

In [1]:
!pip install python-dotenv



## Initialization & Loading

We import all the necessary libraries. Notice that we use the `python-dotenv` library installed above to load the variables defined in the `.env` file in the parent folder into the environment.
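The notebook later reads the S3 path with `os.getenv('S3_FILE_PATH')`. That call returns `None` if the variable is missing from `.env`, and the problem would only surface later as an obscure error inside `spark.read`. Below is a minimal sketch of a fail-fast check. The helper name `require_env` is hypothetical and not part of the original notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, failing loudly if it is unset.

    Hypothetical helper: os.getenv() silently returns None for a missing variable.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is not set; check the .env file in the parent folder")
    return value


# usage, once load_dotenv() has run:
# s3_path = require_env("S3_FILE_PATH")
```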

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

import unicodedata
from dotenv import load_dotenv
from pathlib import Path
import os

env_path = str(Path().resolve().parent / ".env")

load_dotenv(dotenv_path=env_path);

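Next, we connect to the Spark master container. We also load the packages needed to read data from Amazon S3 (`hadoop-aws` and the AWS Java SDK bundle). Following the Hadoop documentation linked in the cell, we configure anonymous access.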

In [3]:
# per docs https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Anonymous_Login_with_AnonymousAWSCredentialsProvider

spark = SparkSession.builder.appName('chatbot').master('spark://spark:7077') \
    .config('spark.jars.packages','org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.11.563') \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider') \
    .getOrCreate();

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b5e21858-ab93-42e9-80bf-4092d55882c0;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 108ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b5e21858-ab93-42e9-80bf-4092d55882c0
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/3ms)
22/09/23 21:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your pla

Now, we read the data itself from Amazon S3.

In [4]:
df = spark.read.option("sep", ";").option('header', 'true').csv(os.getenv('S3_FILE_PATH'))

22/09/23 21:41:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

A sanity check to see if the data was loaded correctly:

In [5]:
df.show(10)


+------------+--------------------+--------------------+--------------------+-----------+-----------------+-----+------+
|   timestamp|           messageId|      conversationId|              userId|messageText|          channel|botId|source|
+------------+--------------------+--------------------+--------------------+-----------+-----------------+-----+------+
|1614567604.0|210c2f5f-f7b2-406...|40b54398-77db-481...|bf6a4d079b51b73fd...|    começar|         whatsapp| 1567|  user|
|1614567608.0|dd331cbb-4e0c-4f1...|b68806b9-7e79-462...|b0e978871c376ef13...|        olá|         telegram| 1567|  user|
|1614567643.0|bb0c5d93-ffc2-400...|8061e54e-6a39-405...|73c659979af4ae8df...|    começar|         telegram| 1567|  user|
|1614567704.0|4dc4a6b7-535e-4c2...|23ba09a6-b744-466...|a0ab3c262eda9b9a1...|        Olá|         telegram| 1567|  user|
|1614567740.0|643f087f-6a79-4fb...|1389b06a-1bdc-465...|ea0ebd01ce3123bf4...|        ola|facebook messeger| 1567|  user|
|1614567794.0|80594787-d08f-498.

                                                                                

Another sanity check: let's see whether we have more than 50000 rows, as claimed by the documentation we were given.

In [6]:
print(f'Rows: {df.count()}')



Rows: 516642


                                                                                

It seems that the data was loaded correctly. Let's save a raw copy as a safeguard.

In [7]:
df.coalesce(1).write.parquet("/app/files/raw_data", mode='overwrite')

                                                                                

## Casting

Since we loaded the data from a `CSV` file, every column was read as a string. It is best practice to cast each column to its correct type.

First, let's see the current schema:

In [8]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- messageId: string (nullable = true)
 |-- conversationId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- messageText: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- botId: string (nullable = true)
 |-- source: string (nullable = true)



Per the provided documentation (and as we can see above), the `timestamp` column holds Unix epoch seconds. We will cast that column to a proper timestamp. To help our analysis, we will also create a `date` column from it.

One other column we can cast to a different type is `botId`. As the data snippet above shows, its values can be cast to `integer`.
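`F.from_unixtime` renders epoch seconds in Spark's session time zone. That is the `spark.sql.session.timeZone` setting, which defaults to the JVM's local zone. As a result, the `date` column, and the midnight check done later, can change with the cluster's configuration.

The plain-Python sketch below shows the effect using two fixed offsets. UTC-3 is only an assumed approximation of Brazilian local time.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Fixed UTC-3 offset, assumed here only as an approximation of Brazilian local time.
BRT = timezone(timedelta(hours=-3))


def epoch_to_datetime(raw: str, tz: timezone) -> datetime:
    """Convert an epoch-seconds string such as "1614567604.0" to an aware datetime."""
    return datetime.fromtimestamp(float(raw), tz=tz)


# First timestamp shown in the data snippet above
print(epoch_to_datetime("1614567604.0", UTC))  # 2021-03-01 03:00:04+00:00
print(epoch_to_datetime("1614567604.0", BRT))  # 2021-03-01 00:00:04-03:00

# A hypothetical message 14 seconds earlier falls on a different date in each zone
print(epoch_to_datetime("1614567590.0", UTC).date())  # 2021-03-01
print(epoch_to_datetime("1614567590.0", BRT).date())  # 2021-02-28
```

If the analysis should follow a specific zone, it can be pinned before casting. For example, `spark.conf.set('spark.sql.session.timeZone', 'America/Sao_Paulo')` would do this; the choice of zone is an assumption.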

In [9]:
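# botId: string -> integer
# timestamp: epoch seconds stored as text (e.g. "1614567604.0") -> timestamp
# date: calendar date of each message, derived from the new timestamp
df = df.withColumn('botId', F.col('botId').cast(T.IntegerType())) \
    .withColumn('timestamp', F.from_unixtime('timestamp').cast(T.TimestampType())) \
    .withColumn('date', F.to_date('timestamp'))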

In [10]:
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- messageId: string (nullable = true)
 |-- conversationId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- messageText: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- botId: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- date: date (nullable = true)



With every column cast to its appropriate type, we register this dataframe as the Spark SQL temporary view `chatbot`.

In [11]:
df.createOrReplaceTempView('chatbot')

## Profiling

Before we start cleaning the data, let's do some profiling to check that our assumptions about the data are correct.

First, let's see which messages are sent from the `bot` source (i.e. which questions the Chatbot asks).

In [12]:
bot_text = spark.sql("SELECT DISTINCT messageText AS botText FROM chatbot WHERE source = 'bot'")
bot_text.createOrReplaceTempView('bot')
bot_text.show(truncate=False)



+------------------------------------------------------------------+
|botText                                                           |
+------------------------------------------------------------------+
|Quais são seus pokemons favoritos?                                |
|Qual seu nome?                                                    |
|Qual a sua cidade?                                                |
|Qual a sua idade?                                                 |
|Olá, eu sou o robô de cadastro de pokemon favorito, vamos começar?|
+------------------------------------------------------------------+



                                                                                

This information will greatly help our data cleaning.

Now, let's see how many unique users we have:

In [13]:
spark.sql("SELECT COUNT(DISTINCT userId) FROM chatbot").show()



+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                 50000|
+----------------------+



                                                                                

We have exactly 50000 users! Let's check whether any user had more than one conversation with the Chatbot:

In [14]:
spark.sql("SELECT userID, COUNT(DISTINCT conversationId) AS chats FROM chatbot GROUP BY 1 HAVING chats > 1").show()



+------+-----+
|userID|chats|
+------+-----+
+------+-----+



                                                                                

So, each user had exactly one conversation with the Chatbot. This means that our final dataset should also have 50000 users.

Finally, let's check whether any conversation spans midnight, which would give a single conversation two different dates:

In [15]:
spark.sql('SELECT conversationId, COUNT(DISTINCT date) AS days_chatted FROM chatbot GROUP BY 1 HAVING days_chatted > 1').show(truncate=False)



+------------------------------------+------------+
|conversationId                      |days_chatted|
+------------------------------------+------------+
|eb92107e-c316-4b36-89d0-53e81766eeca|2           |
|f3617919-e96c-47cc-923b-f73262802698|2           |
|737feb49-2823-4fec-b9e4-c903462ade72|2           |
|6e5addb0-2ad0-4899-90ba-645357e96cec|2           |
|2134ef44-9857-41f7-8915-5ed1c26e1266|2           |
|942af949-7871-45d8-a6ed-782e35d3cb2b|2           |
|d02a563a-f503-40cf-947d-86e7fca73994|2           |
|7c67e8b1-e731-4b02-b45e-cf096a36f0f1|2           |
|ec8c5139-1e2f-4e3d-b840-feef51e878d6|2           |
|298b1be2-474a-47e3-8f88-999748e3d5d5|2           |
|8b3dbc95-a826-41f4-8baf-c8383d628b20|2           |
|3c517060-1c1c-4127-a2bd-018bca4d5b5f|2           |
|29dae2d6-e94d-478e-b1c3-7478d9c3e022|2           |
|65736689-8e68-4a22-afcf-5d99baad8e06|2           |
|287e2811-30a5-4bc0-8623-7c2b01e57c86|2           |
|6e785891-4ba4-429f-a7ef-9c0df5121594|2           |
|66c7afd9-b7

                                                                                

Yes, we do. For every metric, we use the date the user **first interacted with the Chatbot** as the reference date (in other words, `min(date)`).
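The check and the `min(date)` rule above can be sketched in plain Python. The sketch assumes each message has already been reduced to a `(conversationId, date)` pair; the function name `summarize_dates` is hypothetical.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, Set, Tuple


def summarize_dates(messages: Iterable[Tuple[str, date]]) -> Tuple[Dict[str, date], Set[str]]:
    """Return (reference date per conversation, conversations spanning several dates).

    `messages` holds (conversationId, date) pairs, one per message.
    """
    dates: Dict[str, Set[date]] = defaultdict(set)
    for conversation_id, msg_date in messages:
        dates[conversation_id].add(msg_date)
    # reference date = min(date), the day of the first interaction
    reference = {cid: min(days) for cid, days in dates.items()}
    # same condition as HAVING COUNT(DISTINCT date) > 1
    multi_day = {cid for cid, days in dates.items() if len(days) > 1}
    return reference, multi_day
```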

## Cleaning

Now we are going to do the heavy lifting. Given the Chatbot questions, our goal by the end of this notebook is to have a dataset where each row has all the data provided by the user in a conversation with the Chatbot:

|  **date**  | **userId** | **conversationId** | **channel** |      **favourite_pokemon**      | **age** |   **city**   |  **botId**  |
|:----------:|:----------:|:------------------:|:-----------:|:-------------------------------:|:-------:|:------------:|:-----------:|
| 2022-01-01 |      A     |          1         |     sms     | [bulbasaur,charmander,squirtle] |    31   |   sao paulo  |     100     |
| 2022-01-01 |      B     |          2         |   whatsapp  |           [pikachu]             |    25   | porto alegre |     101     |
|    ...     |     ...    |         ...        |     ...     |             ...                 |   ...   |     ...      |     ...     |

Notice that we will have **only one row** per user and conversation, since those two fields together are a unique identifier. Also, note that the `favourite_pokemon` column is an **array**, which will be useful for future analysis.

Since `userId`, `conversationId`, `channel` and `botId` are constant within each conversation, and `date` is easily calculated (using the definition above), we only need to extract each user's `favourite_pokemon` list, `age` and `city`. We do that next, using the Chatbot questions as reference.

### Feature Engineering

To obtain the desired dimensions (`favourite_pokemon`, `age` and `city`), we first do some feature engineering.

We are going to build a table with two key columns. One contains **(almost) every question the bot asked**; the other contains **only the user's answer** to that question. We register that table as the `chatbot_treated` view.
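The window logic of the next cell can be emulated in plain Python for one user's messages, assuming they are already sorted by timestamp. `LEAD` pairs each bot row with the message that follows it. `LAG` tells us who sent the previous message.

Note one difference between the languages. In Python, `None not in bot_texts` is `True`. In SQL, `NULL NOT IN (...)` evaluates to NULL, which `WHERE` treats as false. A query therefore needs an explicit `IS NULL` check to keep the follow-up user messages that this sketch keeps.

The function name `pair_answers` and the user reply `"sim"` are hypothetical.

```python
from typing import List, Optional, Set, Tuple

Row = Tuple[str, Optional[str], str]  # (messageText, messageTextAnswer, source)


def pair_answers(messages: List[Tuple[str, str]], bot_texts: Set[str]) -> List[Row]:
    """Emulate the LEAD/LAG query for one user's messages, sorted by timestamp.

    `messages` holds (source, messageText) pairs.
    """
    rows: List[Row] = []
    for i, (source, text) in enumerate(messages):
        # LEAD(messageText): a bot row is paired with the message that follows it
        answer = messages[i + 1][1] if source == "bot" and i + 1 < len(messages) else None
        # LAG(source): who sent the previous message (None for the first message)
        source_lag = messages[i - 1][0] if i > 0 else None
        # keep answered bot questions plus extra consecutive user messages,
        # but drop bot rows whose "answer" is just the bot's next question
        if (answer is not None or source_lag == "user") and answer not in bot_texts:
            rows.append((text, answer, source))
    return rows


POKEMON_Q = "Quais são seus pokemons favoritos?"
AGE_Q = "Qual a sua idade?"
conversation = [("bot", POKEMON_Q), ("user", "pikachu"), ("user", "charmander"),
                ("user", "mew"), ("bot", AGE_Q), ("user", "38")]
for row in pair_answers(conversation, {POKEMON_Q, AGE_Q}):
    print(row)
```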

In [16]:
# we build the "sourceLag" column to avoid having two rows containing the same information
# whenever the user sent more than one message to answer the Pokémon question.
# For example, imagine that a user answered that their favourite Pokémon were pikachu, charmander and mew in three different messages.
# Our dataset would look like this:
#
#      messageText      | messageTextAnswer | source | sourceLag
# what is your pokémon? |     pikachu       |   bot  |   user 
#      pikachu          |      null         |  user  |   bot     <- row being excluded since this information was already given in the previous row 
#     charmander        |      null         |  user  |   user
#        mew            |      null         |  user  |   user
#
# Having these two lines with 'pikachu' would affect our data aggregation later.

df = spark.sql(
    """
    WITH prep AS (
    SELECT 
        date,
        timestamp,
        userId, 
        conversationId,
        messageText,
        CASE
            WHEN source='bot' THEN LEAD(messageText) OVER (PARTITION BY userId ORDER BY timestamp ASC)
            ELSE NULL
        END AS messageTextAnswer,
        channel,
        botId,
        source,
        LAG(source) OVER (PARTITION BY userId ORDER BY timestamp ASC) AS sourceLag 
    FROM chatbot 
    )
    
    SELECT 
        date,
        timestamp,
        userId,
        conversationId,
        messageText,
        messageTextAnswer,
        channel,
        botId,
        source    
    FROM prep
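    WHERE (messageTextAnswer IS NOT NULL OR sourceLag='user')
        -- user rows have a NULL messageTextAnswer and `NULL NOT IN (...)` is NULL (treated as false),
        -- so those rows must be let through explicitly
        AND (messageTextAnswer IS NULL OR messageTextAnswer NOT IN (SELECT botText FROM bot))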
    """)
df.createOrReplaceTempView('chatbot_treated')
df.show(30)

                                                                                

+----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----+------+
|      date|          timestamp|              userId|      conversationId|         messageText|   messageTextAnswer|          channel|botId|source|
+----------+-------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----+------+
|2021-03-12|2021-03-12 07:02:52|0001a55b006e0bada...|e40467df-6f1f-4c4...|   Qual a sua idade?|                  38|              sms| 1567|   bot|
|2021-03-12|2021-03-12 09:43:15|0001a55b006e0bada...|e40467df-6f1f-4c4...|  Qual a sua cidade?|        campo grande|              sms| 1567|   bot|
|2021-03-12|2021-03-12 11:17:30|0001a55b006e0bada...|e40467df-6f1f-4c4...|Quais são seus po...|fearow florges su...|              sms| 1567|   bot|
|2021-03-12|2021-03-12 13:55:51|0001a55b006e0bada...|e40467df-6f1f-4c4...|      Qual seu nome?|              hel

                                                                                

Note that the `messageText` column does not contain only bot questions. As the documentation says, a user can answer the Pokémon question with several messages before the Chatbot replies. We fix this next.

### Pokémon Treatment

We fix the problem mentioned above by creating a separate `pokemon` dataset:

In [17]:
pokemon = spark.sql("""
    WITH prep AS (
    SELECT 
        userId,
        conversationId,
        CASE 
            WHEN messageTextAnswer IS NOT NULL THEN messageTextAnswer
            ELSE messageText
        END AS pokemon
    FROM
        chatbot_treated
    WHERE 
        messageText= 'Quais são seus pokemons favoritos?' OR source='user'
    )
    SELECT * FROM prep
""")

pokemon.createOrReplaceTempView('pokemon')
pokemon.show(30, truncate=False)

                                                                                

+--------------------------------+------------------------------------+------------------------------------+
|userId                          |conversationId                      |pokemon                             |
+--------------------------------+------------------------------------+------------------------------------+
|0001a55b006e0badacdd32412b1333bf|e40467df-6f1f-4c41-9a05-0a371218b568|fearow florges sudowoodo            |
|00037981db6fa4f247d261420252db49|31fea68a-a79d-4457-b544-a3bcf217b288|ninjask                             |
|0004013912834f551f715d07e75548ce|b5d4e68c-177d-4fe7-b5de-524fd441ec18|carnivine hippopotas                |
|00052c60aafed1bee9aab35a7fe114c1|a4387219-be30-4126-bc93-65fa86c8a49b|electrode                           |
|0007a057c85be431b676ed7f7de1ea97|7957d94d-1639-4510-a0b7-7caab7a05671|barboach, minun, necrozma-ultra     |
|0007ebdf52111b7f142966d4365206ed|de116548-c058-48ce-9e4e-9db67cc9cc42|minior-blue                         |
|000b29832b77f18153

As you can see above, the `pokemon` column holds the Pokémon each user gave in their conversation. Some rows share the same `userId` and `conversationId`. This happens because the `pokemon` column is just the **raw** data from the conversation: a user who sent 2 or more separate messages, each with one Pokémon, gets one row per message.

Next, we fix this by making each combination of `userId` and `conversationId` appear in exactly one row. To do that, we leverage [Spark UDFs](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html).
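Because `F.udf` simply wraps an ordinary Python function, the splitting rule can be unit-tested without a Spark cluster. The sketch below is a plain-Python copy of that comma-or-space rule; the name `split_pokemon_message` is hypothetical. It also returns an empty list for a missing message and drops blank names, such as those left by a trailing comma.

```python
from typing import List, Optional


def split_pokemon_message(message: Optional[str]) -> List[str]:
    """Split one raw answer into Pokémon names (hypothetical plain-Python helper).

    Names are separated either by commas ("barboach, minun, necrozma-ultra")
    or by spaces ("fearow florges sudowoodo"); hyphens stay inside a name.
    """
    if not message:
        return []
    if "," in message:
        # comma-separated list: remove spaces inside each entry, skip blank entries
        return [name.replace(" ", "") for name in message.split(",") if name.strip()]
    # space-separated list: split() also collapses repeated spaces
    return message.split()
```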

In [18]:
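@F.udf(returnType=T.ArrayType(elementType=T.StringType()))
def pokemon_treatment(pokemon):
    # names are separated either by commas ("barboach, minun, necrozma-ultra")
    # or by spaces ("fearow florges sudowoodo"); missing values become an empty list
    if pokemon is None:
        return []
    if ',' in pokemon:
        return [name.replace(' ', '') for name in pokemon.split(',') if name.strip()]
    return pokemon.split()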

In [19]:
pokemon_treated = pokemon.select('userId','conversationId', 'pokemon', pokemon_treatment('pokemon').alias('pokemon_treated'))
pokemon_treated = pokemon_treated.groupBy('userId','conversationId').agg(F.collect_list('pokemon_treated').alias('pokemon_list'))
pokemon_treated = pokemon_treated.select('userId', F.flatten('pokemon_list').alias('pokemon_list'))
pokemon_treated.createOrReplaceTempView('pokemon_treated')
pokemon_treated.show(30, truncate=False)



+--------------------------------+-----------------------------------------------+
|userId                          |pokemon_list                                   |
+--------------------------------+-----------------------------------------------+
|0001a55b006e0badacdd32412b1333bf|[fearow, florges, sudowoodo]                   |
|0002a74240acbcc71caa2ec9dc9ede30|[raikou, miltank, silvally]                    |
|00037981db6fa4f247d261420252db49|[ninjask]                                      |
|00052c60aafed1bee9aab35a7fe114c1|[electrode]                                    |
|0007a057c85be431b676ed7f7de1ea97|[barboach, minun, necrozma-ultra]              |
|0008aa3c55fe166fb0173c9a8ebf025a|[pidgeotto]                                    |
|0008b5872bde10f0a64fddc114852262|[cacnea, cofagrigus]                           |
|000b1797c1571ff8693064ea2bbdec48|[stunfisk-galar, gothitelle, lombre]           |
|000b29832b77f18153e9772ff686286b|[keldeo-resolute]                              |
|000

                                                                                

Now we have the Pokémon column we wanted: each row holds all the Pokémon a user named in their conversation.
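The `groupBy` / `collect_list` / `flatten` chain used above can be emulated with a dictionary, as sketched below. The function name `collect_pokemon` is hypothetical.

One caveat applies to the comparison. Python preserves input order, while Spark's `collect_list` does not guarantee element order after a shuffle. The order of names inside `pokemon_list` may therefore differ from the order in which they were sent.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def collect_pokemon(rows: Iterable[Tuple[str, List[str]]]) -> Dict[str, List[str]]:
    """Emulate groupBy(userId).agg(collect_list(...)) followed by flatten()."""
    per_user: Dict[str, List[List[str]]] = defaultdict(list)
    for user_id, names in rows:
        per_user[user_id].append(names)  # collect_list: one inner list per message
    # flatten: concatenate the inner lists into a single list per user
    return {user: [n for names in lists for n in names] for user, lists in per_user.items()}
```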

As we can see below, the `pokemon_list` column is indeed an `array`:

In [20]:
pokemon_treated.printSchema()

root
 |-- userId: string (nullable = true)
 |-- pokemon_list: array (nullable = false)
 |    |-- element: string (containsNull = true)



Next, we do the same for the other two things the Chatbot asks users about: `age` and `city`.

### Age

Let's select the users' ages, using the corresponding Chatbot question as reference:

In [21]:
age = spark.sql("""
SELECT
    userId,
    conversationId,
    CAST(messageTextAnswer AS integer) as age
FROM chatbot_treated
WHERE messageText='Qual a sua idade?'
GROUP BY 1,2,3
""")
age.createOrReplaceTempView('age')

In [22]:
spark.sql("SELECT age, COUNT(*) AS count FROM age GROUP BY 1 ORDER BY 1").show(100)

                                                                                

+----+-----+
| age|count|
+----+-----+
|null|    3|
|  18| 1291|
|  19| 1245|
|  20| 1247|
|  21| 1343|
|  22| 1354|
|  23| 1341|
|  24| 1297|
|  25| 1354|
|  26| 1351|
|  27| 1321|
|  28| 1296|
|  29| 1331|
|  30| 1374|
|  31| 1353|
|  32| 1330|
|  33| 1265|
|  34| 1280|
|  35| 1288|
|  36| 1271|
|  37| 1356|
|  38| 1329|
|  39| 1321|
|  40| 1264|
|  41| 1380|
|  42| 1314|
|  43| 1351|
|  44| 1324|
|  45| 1342|
|  46| 1242|
|  47| 1283|
|  48| 1266|
|  49| 1329|
|  50| 1398|
|  51| 1288|
|  52| 1241|
|  53| 1374|
|  54| 1338|
|  55| 1320|
+----+-----+



We see that 3 answers could not be cast to an integer, so they show up as `null` in our `age` dataframe. They represent only 3/50000 ≈ 0.006% of our users, which is irrelevant for any business metric. We will therefore filter these users out of our final dataset.
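With ANSI mode off, `CAST(... AS integer)` turns unparsable strings into NULL instead of raising an error. The sketch below approximates that behaviour in plain Python and collects the affected users, in the spirit of `users_to_filter`. The helper names are hypothetical. Python's `int()` is not an exact match for Spark's cast rules; for example, it accepts underscores such as `"1_0"`.

```python
from typing import Dict, Optional, Set


def parse_age(answer: Optional[str]) -> Optional[int]:
    """Return the answer as an int, or None when it is not an integer string."""
    if answer is None:
        return None
    try:
        return int(answer.strip())
    except ValueError:
        return None


def users_with_invalid_age(answers: Dict[str, Optional[str]]) -> Set[str]:
    """userIds whose age answer could not be parsed (analogous to users_to_filter)."""
    return {user for user, answer in answers.items() if parse_age(answer) is None}
```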

The `users_to_filter` dataset will contain all `userId` that should not be in the final dataset.

In [23]:
users_to_filter = spark.sql("""
SELECT
    userId
FROM age
WHERE age IS NULL
""")
users_to_filter.createOrReplaceTempView('users_to_filter')

In [24]:
spark.sql("""SELECT * FROM users_to_filter""").show(truncate=False)

                                                                                

+--------------------------------+
|userId                          |
+--------------------------------+
|7f90ccd2b68cc9e4a2bf5d2ddb293f58|
|fbba7d7a4445bd91a5e7bd560a6b766e|
|f753f32e057fe1ee68a16d8852521c94|
+--------------------------------+



### City

Next, we do the same for the `city` dimension. As we shall see, this one needs more advanced treatment.

First, let's select the cities:

In [25]:
city = spark.sql("""
SELECT
    userId,
    conversationId,
    messageTextAnswer as city
FROM chatbot_treated
WHERE messageText='Qual a sua cidade?'
GROUP BY 1,2,3
""")
city.createOrReplaceTempView('city')

In [26]:
spark.sql("SELECT COUNT (DISTINCT city) FROM city").show()

                                                                                

+--------------------+
|count(DISTINCT city)|
+--------------------+
|                 301|
+--------------------+



As we see above, we have 301 distinct city values. Let's take a look at them.

In [27]:
spark.sql("SELECT DISTINCT city FROM city ORDER BY 1").show(305, truncate=False)

                                                                                

+---------------------+
|city                 |
+---------------------+
|21                   |
|Abaete               |
|Abaeté               |
|Americana            |
|Ananindeua           |
|Anapolis             |
|Anápolis             |
|Aracaju              |
|Barueri              |
|Bauru                |
|Belem                |
|Belo Horizonte       |
|Belém                |
|Betim                |
|Blumenau             |
|Boa Vista            |
|Brasilia             |
|Brasília             |
|Camacari             |
|Camaçari             |
|Campina Grande       |
|Campinas             |
|Campo Grande         |
|Campos               |
|Canoas               |
|Carapicuiba          |
|Carapicuíba          |
|Cariacica            |
|Caruaru              |
|Cascavel             |
|Caucaia              |
|Caxias do Sul        |
|Contagem             |
|Crato                |
|Cuiaba               |
|Cuiabá               |
|Curitiba             |
|Diadema              |
|Feira de Santan

As the documentation warned, some cities appear with different spellings. However, the variants seem to differ only in capitalization and accents, never in the letters themselves. For example, for the city of São Paulo we seem to have only the variations 'Sao Paulo', 'são paulo' and 'sao paulo'.

So, if we transform all city names to lowercase without accents (e.g. São Paulo -> sao paulo), we fix all of those different spellings.
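The transformation described above can be sketched in plain Python. NFD normalization splits each accented letter into a base letter plus a combining mark, and dropping the combining marks leaves the unaccented letters.

This sketch drops only combining marks, whereas the UDF in the next cell uses `encode("ascii", "ignore")`. The two give the same result for Portuguese accents such as `ã`, `á` and `ç`. The `encode` approach would also silently delete any other non-ASCII character. The `strip()` call and the name `normalize_city` are additions made for illustration.

```python
import unicodedata


def normalize_city(name: str) -> str:
    """Lowercase a city name and strip its accents, e.g. "São Paulo" -> "sao paulo"."""
    # NFD turns "ã" into "a" followed by a combining tilde (U+0303)
    decomposed = unicodedata.normalize("NFD", name.strip().lower())
    # keep only characters that are not combining marks
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
```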

Once again, we use a Spark UDF to perform this transformation.

In [28]:
@F.udf
def city_treatment(city):
    if city:
        city = city.lower()
        normalized = unicodedata.normalize("NFD", city)
        decoded = normalized.encode("ascii", "ignore").decode("utf-8")
        return decoded
    else:
        return city

In [29]:
city = city.withColumn('city', city_treatment('city'))
city.createOrReplaceTempView('city')
city.select('city').distinct().orderBy(F.asc('city')).show(300)

                                                                                

+--------------------+
|                city|
+--------------------+
|                  21|
|              abaete|
|           americana|
|          ananindeua|
|            anapolis|
|             aracaju|
|             barueri|
|               bauru|
|               belem|
|      belo horizonte|
|               betim|
|            blumenau|
|           boa vista|
|            brasilia|
|            camacari|
|      campina grande|
|            campinas|
|        campo grande|
|              campos|
|              canoas|
|         carapicuiba|
|           cariacica|
|             caruaru|
|            cascavel|
|             caucaia|
|       caxias do sul|
|            contagem|
|               crato|
|              cuiaba|
|            curitiba|
|             diadema|
|    feira de santana|
|       florianopolis|
|           fortaleza|
|       foz do iguacu|
|              franca|
|             goiania|
|governador valadares|
|            gravatai|
|             guaruja|
|          

Apart from the odd value `21`, the transformation seems to have worked.
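Instead of hard-coding the value `21`, a more general rule could flag any city answer that contains no letters at all. The sketch below expresses that rule in plain Python. Both the rule and the helper names are hypothetical and are not part of the original notebook.

```python
from typing import Dict, Optional, Set


def looks_like_city(answer: Optional[str]) -> bool:
    """A city answer should contain at least one letter ("21" does not)."""
    return bool(answer) and any(ch.isalpha() for ch in answer)


def users_with_invalid_city(cities: Dict[str, Optional[str]]) -> Set[str]:
    """userIds whose city answer fails the letter check."""
    return {user for user, city in cities.items() if not looks_like_city(city)}
```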

Now we add the `userId` of the user who answered `21` to our `users_to_filter` dataset:

In [30]:
users_to_filter = spark.sql("""
SELECT
    userId
FROM city
WHERE city = '21'
UNION
SELECT userId FROM users_to_filter
""")
users_to_filter.createOrReplaceTempView('users_to_filter')
users_to_filter.show()

                                                                                

+--------------------+
|              userId|
+--------------------+
|d4aa22a26cbcbc560...|
|7f90ccd2b68cc9e4a...|
|fbba7d7a4445bd91a...|
|f753f32e057fe1ee6...|
+--------------------+



## Final Dataframe

With the Pokémon dimension treated and the `city` and `age` dimensions extracted, we can build our final dataframe. Notice that we exclude the 4 users in `users_to_filter`.
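The shape of the final query, which drops the filtered users and then LEFT JOINs each dimension on `userId`, can be emulated with dictionaries. The function name `build_final_rows` is hypothetical.

The membership test below matches `NOT IN (SELECT userId FROM users_to_filter)` only because that view holds no NULL `userId`; the output above shows four non-null values. In SQL, a single NULL in the subquery would make `NOT IN` return no rows at all.

```python
from typing import Dict, List, Optional, Set


def build_final_rows(users: Dict[str, dict], pokemon: Dict[str, List[str]],
                     ages: Dict[str, Optional[int]], cities: Dict[str, Optional[str]],
                     excluded: Set[str]) -> List[dict]:
    """Emulate the final query: drop excluded users, then LEFT JOIN by userId."""
    rows = []
    for user_id, base in users.items():
        if user_id in excluded:  # WHERE userId NOT IN (SELECT userId FROM users_to_filter)
            continue
        rows.append({
            **base,
            "userId": user_id,
            # LEFT JOIN semantics: a missing match becomes None (NULL)
            "favourite_pokemon": pokemon.get(user_id),
            "age": ages.get(user_id),
            "city": cities.get(user_id),
        })
    return rows
```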

In [31]:
final = spark.sql("""
WITH users AS (
SELECT
    min(date) as date,
    userId,
    conversationId,
    channel,
    botId
FROM chatbot_treated
WHERE userId NOT IN (SELECT userId FROM users_to_filter)
GROUP BY 2,3,4,5
)

SELECT 
    users.date,
    users.userId,
    users.conversationId,
    channel,
    pokemon_list AS favourite_pokemon,
    age,
    city,
    botId
FROM 
    users
LEFT JOIN pokemon_treated ON users.userId = pokemon_treated.userId
LEFT JOIN age ON users.userId = age.userId
LEFT JOIN city ON users.userId = city.userId
""")

final.show()


+----------+--------------------+--------------------+-----------------+--------------------+---+-----------------+-----+
|      date|              userId|      conversationId|          channel|   favourite_pokemon|age|             city|botId|
+----------+--------------------+--------------------+-----------------+--------------------+---+-----------------+-----+
|2021-03-12|0001a55b006e0bada...|e40467df-6f1f-4c4...|              sms|[fearow, florges,...| 38|     campo grande| 1567|
|2021-04-19|00037981db6fa4f24...|31fea68a-a79d-445...|        instagram|           [ninjask]| 23|         brasilia| 1567|
|2021-04-15|0004013912834f551...|b5d4e68c-177d-4fe...|              sms|[carnivine, hippo...| 28|           palmas| 1567|
|2021-04-02|00052c60aafed1bee...|a4387219-be30-412...|         whatsapp|         [electrode]| 55|           suzano| 1567|
|2021-03-10|0007a057c85be431b...|7957d94d-1639-451...|facebook messeger|[barboach, minun,...| 39|        boa vista| 1567|
|2021-04-28|0007ebdf5211

                                                                                

In [32]:
final.count()

                                                                                

49996

As expected, we have 4 fewer users than the original 50000.

Now we save the data as `parquet`. Before that, let's see how many partitions our `final` dataframe has.

In [33]:
final.rdd.getNumPartitions()

                                                                                

2

Since the data is small, we coalesce it into a single partition so that it is written as a single file.

In [34]:
final.coalesce(1).write.parquet("/app/files/treated_data", mode='overwrite')

                                                                                

Lastly, we stop the Spark session.

In [None]:
spark.stop()