In [3]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5034f2498a0e00aac236ce49bd47eeb356b1ce048c7eb41536481041984487b6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType
from pyspark.sql.functions import col, explode_outer, when
from pyspark.sql import SparkSession

In [6]:


spark = SparkSession.builder \
    .appName("JSON") \
    .getOrCreate()

# Configuração para lidar com registros corrompidos
spark.conf.set("spark.sql.columnNameOfCorruptRecord", "_corrupt_record")

# Caminho para o arquivo JSON
file_path = "/content/base_desafio (1).json"

# Leitura do arquivo JSON com opções para lidar com registros corrompidos
df = spark.read.option("multiline", "true").json(file_path)

# Seleciona apenas o array 'items' e explode-o para criar uma linha para cada item
df_items = df.select(explode_outer("resource.items").alias("item"))


df_final = df_items.select(
    "item.*",
    "item.extras.*"
)

# Mostra os primeiros registros para verificar o resultado
df_final = df_final.drop("extras")

def rename_columns(df):
    for column in df.columns:
        # Transforma o nome da coluna para snake_case
        new_column_name = column.replace(".", "_").lower()

        # Ajusta colunas que começam com números para "etapa"
        if new_column_name[0].isdigit():
            # Encontra o primeiro espaço em branco após o número (se existir)
            space_idx = new_column_name.find(' ')
            if space_idx != -1:
                number = new_column_name[:space_idx]
                name = new_column_name[space_idx+1:]
                # Substitui espaços em branco por underscores no restante do nome
                name = name.replace(' ', '_')
                # Remove underscores extras após o número da etapa
                number = number.rstrip('_')
                new_column_name = f"etapa_{number}_{name}"
            else:
                new_column_name = f"etapa_{new_column_name}"

        df = df.withColumnRenamed(column, new_column_name)

    return df

# Aplica a renomeação para snake_case em todas as colunas
df_renamed = rename_columns(df_final)

# Função para converter colunas de etapas para booleano
def convert_etapa_columns(df):
    for column in df.columns:
        if column.startswith("etapa"):
            df = df.withColumn(column, F.when(F.col(column).isNull(), False).otherwise(F.col(column).cast("boolean")))
    return df

# Aplica a conversão das colunas de etapas para booleano
df_final = convert_etapa_columns(df_renamed)

# Mostra os primeiros registros para verificar o resultado
df_final.show()

+--------------+--------------------+--------------------+--------------------+---------+--------------------+----------+-------------------+------------------------+--------------------+--------------------+---------------------+-------------------+--------------------+---------------------------+-----------------+-------------------+------------------+--------------------------+-------------------+--------------------------+------------------------------+-------------------------------------+-----------+------------------+----------------------+------------------------------+-------------------------------------+-----------------------------------------+------------------------------------------------+------------------------------+-------------------------------------+-----------------------------------------+------------------------------------------------+-------------------------------+---------------+----------------------+--------------------+---------------------+-------------

In [14]:
def remove_step_columns(df):
    # Seleciona todas as colunas que começam com "etapa"
    step_columns = [col for col in df.columns if col.startswith("etapa")]

    # Seleciona colunas específicas do dataframe original (df) e armazena em variáveis
    removed_columns = df.select(["identity"] + step_columns)  # Seleciona identity e colunas de etapa
    removed_person_remove = df.select(["cliente"] + ["name"] + ["email"] + ["city"] + ["phonenumber"] + ["bairro"] + ["cpf"] + ["uf"] + ["cep"] + ["logradouro"] + ["datanascimento"] + ["numendereco"] + ["complemento"])
    remover_person_complete = df.select(["identity"] + ["cliente"] + ["name"] + ["email"] + ["city"] + ["phonenumber"] + ["bairro"] + ["cpf"] + ["uf"] + ["cep"] + ["logradouro"] + ["datanascimento"] + ["numendereco"] + ["complemento"])
    mensage_remove = df.select(["primeiramensagem"] + ["lastmessagedate"])
    mensage_complete = df.select(["identity"] + ["primeiramensagem"] + ["lastmessagedate"])
    service_remove = df.select(["idpedido"] + ["priceavancado"] + ["prioridade"] + ["dataagendamento1"] + ["planotv"] + ["operadora"] + ["dataagendamento2"] + ["identificadorpedido"] + ["planname"] + ["plano"] + ["planodescription"] + ["planodescription2"] + ["pontos"] + ["portabilidade"] + ["price"] + ["produto"] + ["protocolowci"] + ["termo"] + ["tipofatura"] + ["skutv"] + ["vencimentofatura"])
    service_complete = df.select(["identity"] + ["priceavancado"] + ["prioridade"] +["idpedido"] + ["dataagendamento1"] + ["planotv"] + ["operadora"] + ["dataagendamento2"] + ["identificadorpedido"] + ["planname"] + ["plano"] + ["planodescription"] + ["planodescription2"] + ["pontos"] + ["portabilidade"] + ["price"] + ["produto"] + ["protocolowci"] + ["termo"] + ["tipofatura"] + ["skutv"] + ["vencimentofatura"])

    # Remove as colunas de etapa do dataframe original
    df = df.drop(*step_columns)
    # Remove as colunas selecionadas de removed_person_remove
    df = df.drop(*removed_person_remove)
    # Remove as colunas selecionadas de mensage_remove
    df = df.drop(*mensage_remove)
    # Remove as colunas selecionadas de service_remove
    df = df.drop(*service_remove)

    # Retorna o dataframe modificado e os dataframes selecionados
    return df, removed_columns, remover_person_complete, mensage_complete, service_complete

# Aplica a remoção das colunas de "etapa"
df_final_parse, etapa, person, mensage, service = remove_step_columns(df_final)

parquet_path = "/content/sample_data"

# Mostra os primeiros registros de df_final_parse para verificar o resultado final
df_final_parse.show()

# Mostra as colunas removidas em um novo dataframe (df_etapa)
etapa.show()

# Mostra o dataframe 'person'
person.show()

# Mostra o dataframe 'mensage'
mensage.show()

# Mostra o dataframe 'service'
service.show()

# Salva df_final_parse como arquivo Parquet
df_final_parse.write.mode("overwrite").parquet(parquet_path + "/df_final_parse")

# Salva df_etapa como arquivo Parquet
etapa.write.mode("overwrite").parquet(parquet_path + "/etapa")

# Salva 'person' como arquivo Parquet
person.write.mode("overwrite").parquet(parquet_path + "/person")

# Salva 'mensage' como arquivo Parquet
mensage.write.mode("overwrite").parquet(parquet_path + "/mensage")

# Salva 'service' como arquivo Parquet
service.write.mode("overwrite").parquet(parquet_path + "/service")


+--------------------+----------+-----------------+--------+-----------+----------+---------+----+--------------------+--------------------+---------+---------+---------------------+-------+-------------+----+----------+-----------+-------+-----+--------------------+--------------+
|            identity|    source|primeira mensagem|sourceid|sourcetitle|sourcetype|sourceurl| tag|         utmcampaign|        utmcampaign2|utmmedium|utmsource|applicationidentifier|autosku|        canal|fila|modalidade|modalidade2|nomemae|opcao|         utmreferrer|       utmterm|
+--------------------+----------+-----------------+--------+-----------+----------+---------+----+--------------------+--------------------+---------+---------+---------------------+-------+-------------+----+----------+-----------+-------+-----+--------------------+--------------+
|781.361.413-88@do...| amplitudo|             NULL|    NULL|       NULL|      NULL|     NULL|NULL|irRzzChMpWjKN2HiG...|                NULL|     NULL| 