<a href="https://colab.research.google.com/github/aleiarias/coursera-capstone-project/blob/main/Nubi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [134]:
!pip install -q pyspark

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, rank
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException

# Nueva sesión de Spark
spark = SparkSession.builder.appName("Nubi_Challenge").config("spark.sql.parquet.enableVectorizedReader", "false").getOrCreate()

# Ruta raiz de los archivos parquet
root_path = "compressedData"

# Ruta de los archivos diarios para el periodo analizado
day_path = f"{root_path}/year=2024/month=07/"

# Punto 1: Logging de los dias faltantes considerando los primeros 7
for dia in range(1,7):
    file_path = os.path.join(day_path, f"day=2024070{dia}")

    if not os.path.exists(file_path):
        # Si el archivo no existe, muestro mensaje en pantalla
        print(f"Error: El dia '2024070{dia}' no existe en la ruta '{day_path}'.")

#Lectura de los archivos parquet
try:
  df = spark.read.parquet(root_path,header=True)
  df.show()
  # Punto 2: Calculo de totales

  total = df.groupBy("sellerId").agg(sum("sales").alias("Total_SI"),sum("price").alias("Total_GMV"))
  total.show()

  # Punto 3: Ranking de vendendores

  #Defino ventana de partición y ordenación
  SI_spec = Window.orderBy(total["total_SI"].desc())
  GMV_spec = Window.orderBy(total["total_GMV"].desc())

  # Agrego la columna de ranking al dataFrame
  df_ranking = total.withColumn("rankSI", rank().over(SI_spec))
  df_ranking = df_ranking.withColumn("rankGMV", rank().over(GMV_spec))
  df_ranking.show()

  # Punto 4: Almaceno el dataframe en un archivo csv
  df_ranking.write.mode("overwrite").csv("desafionubi.csv", header=True)
  # Mostrar los resultados ordenados por ranking
  df_ranking.show()

  # Punto 5: Preparo y almaceno el dataframe en un archivo parquet particionado por sellerId
  df_aggregated = df.groupBy("id","day","sellerId").agg(sum("sales").alias("total_ventas"))
  df_aggregated.show()
  df_aggregated.write.mode("overwrite").partitionBy("sellerId").parquet("desafio_nubi.parquet")

  # Detener la sesión de Spark
  spark.stop()
except AnalysisException as e:
  # Log de un mensaje si el archivo parquet no existe o no puede ser leído
  if "Ruta no existe" in str(e):
      print(f"Error: El archivo parquet en la ruta '{file_path}' no existe.")
  else:
      print(f"Error al intentar leer el archivo Parquet: {str(e)}")

except Exception as e:
      print(f"Error al leer los archivos Parquet particionados")



