# Imports

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a0eee9694ad31e11cff22126715a2dcc3c6e7c89670cc186ddbb87fee7b625f9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr

# Configs

In [3]:
# Create the SparkSession for this notebook (getOrCreate reuses a session that is already active)
spark = (
    SparkSession.builder
    .appName('Case Oper')
    .getOrCreate()
)

# Input JSON file and output folder used by the cells below
path_json = '/content/drive/MyDrive/Colab Notebooks/teste-tecnico-main/base_desafio.json'
path_output = '/content/drive/MyDrive/Colab Notebooks/teste-tecnico-main/outputs/second_question'

In [4]:
# Load the JSON file into a Spark DataFrame.
# The "multiline" option lets a single JSON record span several lines of the file.
df = (
    spark.read
    .option("multiline", "true")
    .json(path_json)
)

# Second question

In [5]:
# Explode the 'resource.items' array: each array element becomes one row, exposed as column 'item'
items_df = df.select(
    explode(col("resource.items")).alias("item")
)

In [6]:
# Flatten the 'item' struct into top-level columns with snake_case names.
# Each pair is (field path under 'item', output column name); field names that
# contain spaces or dots are wrapped in backticks.
# NOTE: this cell overwrites items_df, so it needs the exploded 'item' column
# produced by the previous cell - re-run that cell before re-running this one.
items_df = items_df.select(*[
    col(f"item.{source_field}").alias(output_name)
    for source_field, output_name in (
        ("name", "name"),
        ("lastMessageDate", "last_message_date"),
        ("identity", "identity"),
        ("phoneNumber", "phone_number"),
        ("source", "source"),
        ("extras.protocoloWci", "extras_protocolo_wci"),
        ("extras.UtmCampaign", "extras_utm_campaign"),
        ("extras.applicationIdentifier", "extras_application_identifier"),
        ("extras.primeiraMensagem", "extras_primeira_mensagem"),
        ("extras.produto", "extras_produto"),
        ("extras.`1. Boas vindas`", "extras_etapa_boas_vindas"),
        ("extras.prioridade", "extras_prioridade"),
        ("extras.`2. Cep`", "extras_etapa_cep"),
        ("extras.canal", "extras_canal"),
        ("extras.`99. Abandono`", "extras_etapa_abandono"),
    )
])

In [7]:
# Cast every 'extras_etapa_*' column to boolean; all other columns pass through unchanged.
# A single select keeps the original column order and names.
items_df = items_df.select(*[
    col(name).cast("boolean").alias(name) if name.startswith("extras_etapa_") else col(name)
    for name in items_df.columns
])

In [8]:
# Inspection only: print the first 20 rows without truncating long cell values
items_df.show(n=20, truncate=False)

+---------+------------------------+---------------------------+---------------------+----------+---------------------+---------------------+-----------------------------+----------------------------------------------------------------------------+--------------+------------------------+-----------------+----------------+-------------+---------------------+
|name     |last_message_date       |identity                   |phone_number         |source    |extras_protocolo_wci |extras_utm_campaign  |extras_application_identifier|extras_primeira_mensagem                                                    |extras_produto|extras_etapa_boas_vindas|extras_prioridade|extras_etapa_cep|extras_canal |extras_etapa_abandono|
+---------+------------------------+---------------------------+---------------------+----------+---------------------+---------------------+-----------------------------+----------------------------------------------------------------------------+--------------+-----------------

In [9]:
# Inspection only: print the column names and data types of items_df
items_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- last_message_date: string (nullable = true)
 |-- identity: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- source: string (nullable = true)
 |-- extras_protocolo_wci: string (nullable = true)
 |-- extras_utm_campaign: string (nullable = true)
 |-- extras_application_identifier: string (nullable = true)
 |-- extras_primeira_mensagem: string (nullable = true)
 |-- extras_produto: string (nullable = true)
 |-- extras_etapa_boas_vindas: boolean (nullable = true)
 |-- extras_prioridade: long (nullable = true)
 |-- extras_etapa_cep: boolean (nullable = true)
 |-- extras_canal: string (nullable = true)
 |-- extras_etapa_abandono: boolean (nullable = true)



In [None]:
# Write items_df as Parquet under the output folder.
# repartition(1) collapses the data into a single partition before writing, and
# mode("overwrite") replaces whatever a previous run left at the same path.
(
    items_df
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet(f"{path_output}/items_df.parquet")
)

# Additional code

In [13]:
# Boolean mask with the same columns as items_df: True where the original value is null
nulos_por_coluna = items_df.select(
    *[col(name).isNull().alias(name) for name in items_df.columns]
)

In [14]:
# Inspection only: print the first 20 rows of the null mask
nulos_por_coluna.show(n=20)

+-----+-----------------+--------+------------+------+--------------------+-------------------+-----------------------------+------------------------+--------------+------------------------+-----------------+----------------+------------+---------------------+
| name|last_message_date|identity|phone_number|source|extras_protocolo_wci|extras_utm_campaign|extras_application_identifier|extras_primeira_mensagem|extras_produto|extras_etapa_boas_vindas|extras_prioridade|extras_etapa_cep|extras_canal|extras_etapa_abandono|
+-----+-----------------+--------+------------+------+--------------------+-------------------+-----------------------------+------------------------+--------------+------------------------+-----------------+----------------+------------+---------------------+
|false|            false|   false|       false| false|               false|              false|                        false|                   false|         false|                   false|            false|         

In [20]:
# Get list of columns in the DataFrame
columns = items_df.columns

# Count the nulls of every column in ONE Spark job: a single aggregation evaluates
# count_if(<column> IS NULL) for all columns at once, instead of launching a separate
# filter + count job per column (and then repeating all of them to build the dictionary).
# Column names are backtick-quoted so the SQL expression stays valid for any name.
null_counts = items_df.agg(
    *[expr(f"count_if(`{column}` IS NULL)").alias(column) for column in columns]
).first().asDict()

# Print the count of null values for each column
for column in columns:
    print(f"Column '{column}' has {null_counts[column]} null values.")

# The same counts, as a {column name: null count} dictionary
print(null_counts)

Column 'name' has 4 null values.
Column 'last_message_date' has 0 null values.
Column 'identity' has 0 null values.
Column 'phone_number' has 3 null values.
Column 'source' has 6 null values.
Column 'extras_protocolo_wci' has 4 null values.
Column 'extras_utm_campaign' has 4 null values.
Column 'extras_application_identifier' has 4 null values.
Column 'extras_primeira_mensagem' has 3 null values.
Column 'extras_produto' has 4 null values.
Column 'extras_etapa_boas_vindas' has 4 null values.
Column 'extras_prioridade' has 4 null values.
Column 'extras_etapa_cep' has 4 null values.
Column 'extras_canal' has 4 null values.
Column 'extras_etapa_abandono' has 576 null values.
{'name': 4, 'last_message_date': 0, 'identity': 0, 'phone_number': 3, 'source': 6, 'extras_protocolo_wci': 4, 'extras_utm_campaign': 4, 'extras_application_identifier': 4, 'extras_primeira_mensagem': 3, 'extras_produto': 4, 'extras_etapa_boas_vindas': 4, 'extras_prioridade': 4, 'extras_etapa_cep': 4, 'extras_canal': 4,

In [29]:
# Replace null values with "N/A" in the string columns.
# na.fill returns a NEW DataFrame and leaves items_df untouched, so the result is kept
# in its own variable instead of being discarded.
# A string fill value only applies to string columns: the boolean 'extras_etapa_*'
# columns and the numeric 'extras_prioridade' column keep their nulls.
items_filled_df = items_df.na.fill('N/A')
items_filled_df

DataFrame[name: string, last_message_date: string, identity: string, phone_number: string, source: string, extras_protocolo_wci: string, extras_utm_campaign: string, extras_application_identifier: string, extras_primeira_mensagem: string, extras_produto: string, extras_etapa_boas_vindas: boolean, extras_prioridade: bigint, extras_etapa_cep: boolean, extras_canal: string, extras_etapa_abandono: boolean]