In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when, monotonically_increasing_id, size, split, length

In [2]:
# Build (or fetch the already-running) Spark session for the Gold-layer job.
# Driver and executor memory are both configured to 6g.
spark = (
    SparkSession.builder
    .appName("AmazonReviewsGold")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "6g")
    .getOrCreate()
)

print("SparkSession iniciando para a Camada Gold.")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/16 01:45:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/16 01:45:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


SparkSession iniciando para a Camada Gold.


In [3]:
# Location of the Silver-layer dataset that feeds the Gold layer.
silver_path = "/app/data/silver/train_silver"

print(f"Carregando dados da camada Silver de:{silver_path}")
# Read the Silver data as Parquet.
df_silver = spark.read.load(silver_path, format="parquet")

# Print the schema of the loaded Silver table.
print("Schema da tabela Silver carregada:")
df_silver.printSchema()

Carregando dados da camada Silver de:/app/data/silver/train_silver


                                                                                

Schema da tabela Silver carregada:
root
 |-- label: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- review: string (nullable = true)



In [4]:
# Start the Gold working DataFrame from the Silver data and attach a
# "review_id" column so that every review can be traced.
# NOTE(review): monotonically_increasing_id() produces unique, increasing ids,
# but they are not guaranteed to be consecutive -- confirm this is acceptable
# for whoever consumes review_id.
df_gold = df_silver.withColumn("review_id", monotonically_increasing_id())


In [5]:
# Map the numeric label to a text description:
#   1 -> "Ruim" (bad), 2 -> "Bom" (good), any other value -> "Desconhecido".
sentiment_expr = (
    when(col("label") == 1, "Ruim")
    .when(col("label") == 2, "Bom")
    .otherwise("Desconhecido")
)
df_gold = df_gold.withColumn("sentiment_description", sentiment_expr)


In [6]:
# Add character counts for the title and the review text, so the typical
# length of good and bad comments can be compared.
df_gold = (
    df_gold
    .withColumn("title_char_count", length(col("title")))
    .withColumn("review_char_count", length(col("review")))
)


In [7]:
# Final Gold projection: pick the output columns and rename label/title/review
# to their Gold-layer names.
gold_columns = [
    col("review_id"),
    col("label").alias("sentiment_label_id"),
    col("sentiment_description"),
    col("title").alias("cleaned_title"),
    col("review").alias("cleaned_review"),
    col("title_char_count"),
    col("review_char_count"),
]
df_gold_final = df_gold.select(*gold_columns)

# Print the resulting schema, then preview the first five rows.
print("Schema da tabela Gold final:")
df_gold_final.printSchema()
print("Amostra da tabela Gold final (primeiras 5 linhas):")
df_gold_final.show(n=5, truncate=True)

Schema da tabela Gold final:
root
 |-- review_id: long (nullable = false)
 |-- sentiment_label_id: integer (nullable = true)
 |-- sentiment_description: string (nullable = false)
 |-- cleaned_title: string (nullable = true)
 |-- cleaned_review: string (nullable = true)
 |-- title_char_count: integer (nullable = true)
 |-- review_char_count: integer (nullable = true)

Amostra da tabela Gold final (primeiras 5 linhas):


                                                                                

+---------+------------------+---------------------+--------------------+--------------------+----------------+-----------------+
|review_id|sentiment_label_id|sentiment_description|       cleaned_title|      cleaned_review|title_char_count|review_char_count|
+---------+------------------+---------------------+--------------------+--------------------+----------------+-----------------+
|        0|                 2|                  Bom|Stuning even for ...|This sound track ...|              30|              394|
|        1|                 2|                  Bom|The best soundtra...|I'm reading a lot...|              37|              470|
|        2|                 2|                  Bom|            Amazing!|This soundtrack i...|               8|              353|
|        3|                 2|                  Bom|Excellent Soundtrack|I truly like this...|              20|              721|
|        4|                 2|                  Bom|Remember, Pull Yo...|If you've played 

In [8]:
# Persist the Gold table as Parquet; "overwrite" mode replaces whatever is
# already stored at the destination.
# NOTE(review): the directory name is spelled "ennriched" -- it looks like a
# typo, but it is kept as-is because other jobs may read from this exact path.
gold_output_path = "/app/data/gold/amazon_reviews_ennriched_features"

print(f"Salvando tabela gold em formato parquet em:{gold_output_path}")

df_gold_final.write.save(gold_output_path, format="parquet", mode="overwrite")
print(f"Tabela Gold salva com sucesso em {gold_output_path}")


Salvando tabela gold em formato parquet em:/app/data/gold/amazon_reviews_ennriched_features


                                                                                

Tabela Gold salva com sucesso em /app/data/gold/amazon_reviews_ennriched_features


In [9]:
# Stop the Spark session created at the top of the notebook.
spark.stop()