#### Importar Bibliotecas 

In [8]:
from datetime import datetime

from pyspark.sql import SparkSession
# NOTE: this `round` is Spark's column-level round and intentionally shadows the builtin.
from pyspark.sql.functions import col, regexp_replace, round, when

#### Criar uma Sessão Spark

In [2]:
# Connect to the standalone cluster master; getOrCreate() reuses an already
# active session with this configuration instead of starting a second one.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("ReadCSV_web")
    .getOrCreate()
)

#### Definir o Caminho do Arquivo CSV

In [3]:
# Input files are partitioned by run date (YYYYMMDD, local time of the driver).
today_date = f"{datetime.now():%Y%m%d}"
file_path = f"/data/not-process/{today_date}/public-sales.csv"

#### Ler o Arquivo CSV

In [4]:
# Read every column as a string. Each numeric column is cleaned and cast
# explicitly in the processing cells below, so schema inference would only add
# an extra full pass over the file (and for this data it still yields strings,
# as the printed schema shows).
df = spark.read.csv(file_path, header=True, inferSchema=False)

In [5]:
# Peek at the first rows of the raw data (long values are truncated).
df.show(n=5)

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|          â¹399|    â¹1,099|                64%|   4.2|      24,2

#### Descrever os Dados

In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column of the raw data.
df.describe().show(truncate=True)

+-------+----------+--------------------+--------------------+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|product_id|        product_name|            category| discounted_price|       actual_price|discount_percentage|              rating|        rating_count|       about_product|          user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+-------+----------+--------------------+--------------------+-----------------+-------------------+-------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|      2

#### Exibir o Esquema dos Dados

In [9]:
# Print the DataFrame's column names and Spark data types as a tree.
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: string (nullable = true)
 |-- actual_price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: string (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)



#### Processar os Dados

Remover Duplicatas

In [None]:
# Remove rows that are duplicated across every column (full-row comparison);
# `df` itself is left untouched.
df_copy = df.dropDuplicates()

Limpar e Converter Colunas: remover o símbolo de moeda, os separadores de milhar e o `%`, e converter os valores para float.

# Keep only digits and the decimal point before casting. This drops both the
# thousands separators and the currency prefix (shown as "â¹" in the output
# above — presumably a mis-decoded rupee sign) regardless of which exact byte
# sequence the file uses, instead of matching one specific mojibake string.
df_copy = df_copy.withColumn(
    "discounted_price",
    regexp_replace(col("discounted_price"), r"[^0-9.]", "").cast("float"),
)

In [None]:
# Same cleaning as discounted_price: keep only digits and the decimal point so
# the (mis-encoded) currency prefix and thousands separators are removed
# whatever their exact bytes, then cast to float.
df_copy = df_copy.withColumn(
    "actual_price",
    regexp_replace(col("actual_price"), r"[^0-9.]", "").cast("float"),
)

In [None]:
# "64%" -> 64.0: strip the percent sign, then cast to float (kept on a 0-100 scale).
df_copy = df_copy.withColumn(
    "discount_percentage",
    regexp_replace("discount_percentage", "%", "").cast("float"),
)

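Ajustar a Coluna de Avaliação: converter para float, arredondar para 2 casas decimais e aplicar a correção manual da avaliação do produto `B08L12N5H1`.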

In [None]:
# Inspect the distinct raw rating strings before casting (presumably to spot
# values that will not parse as numbers). This collects to the driver, which is
# fine only because the number of distinct ratings is small.
unique_ratings = df_copy.select(col("rating")).distinct()
unique_ratings_list = [row["rating"] for row in unique_ratings.collect()]
# Display the result; previously it was collected but never shown or used.
unique_ratings_list

In [None]:
# Manual per-product rating corrections, applied after the cast below.
# NOTE(review): the origin of these values is not recorded in this notebook —
# presumably these products have a raw rating that does not parse as a number
# and the value was checked by hand; confirm and document the source.
RATING_OVERRIDES = {"B08L12N5H1": 3.9}

# Cast the raw string to float and round to 2 decimals.
# NOTE(review): relies on non-ANSI cast semantics (unparseable strings become
# null); with spark.sql.ansi.enabled=true this cast raises instead — confirm
# the cluster configuration.
df_copy = df_copy.withColumn("rating", round(col("rating").cast("float"), 2))

for override_id, override_rating in RATING_OVERRIDES.items():
    df_copy = df_copy.withColumn(
        "rating",
        when(col("product_id") == override_id, override_rating).otherwise(col("rating")),
    )

Limpar a Coluna de Contagem de Avaliações

In [None]:
# Rating counts are whole numbers written with thousands separators
# (e.g. "1,234"): drop the commas and store them as an integral type instead
# of float, so counts stay exact and display without a fractional part.
df_copy = df_copy.withColumn(
    "rating_count",
    regexp_replace(col("rating_count"), ",", "").cast("long"),
)

#### Exibir o DataFrame Final

In [None]:
# Peek at the first rows after cleaning and type conversion.
df_copy.show(n=5)

#### Descrever o DataFrame Processado

In [None]:
# Summary statistics for the processed data; numeric columns now get real mean/stddev values.
df_copy.describe().show(truncate=True)

#### Fechar a Sessão Spark

In [None]:
# Stop the Spark session (and its underlying SparkContext) created at the top of
# the notebook; run this cell last, since `spark`, `df` and `df_copy` are unusable afterwards.
spark.stop()