In [None]:
# ETL australian_user_reviews.csv
import json
import ast
import pandas as pd
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk
import math

nltk.download('vader_lexicon')

# Define la función para realizar el análisis de sentimiento
def analyze_sentiment(text):
	analyzer = SentimentIntensityAnalyzer()
	sentiment = analyzer.polarity_scores(text)
	if sentiment['compound'] >= 0.05:
		return 2
	elif sentiment['compound'] <= -0.05:
		return 0
	else:
		return 1

user_reviews = []

# Abre el archivo y recorrerlo para agregar las reseñas a la lista
with open('../data_sources/json/australian_user_reviews.json', encoding='utf-8') as f:
	for line in f:
		object = json.loads(json.dumps(ast.literal_eval(line)))
		user_reviews.append(object)

# Crea el dataframe a partir de la lista
df_user_reviews = pd.DataFrame(user_reviews)

# Normaliza la columna reviews
normalized = pd.json_normalize(user_reviews, record_path=['reviews'], meta=['user_id'] )

# Elimina las filas vacias
normalized = normalized.dropna()

# Eliminar duplicados
clean_items = normalized.drop_duplicates(keep='first')

clean_items['review'] = clean_items['review'].apply(analyze_sentiment)
clean_items['review'].fillna(1, inplace=True)

# Dividir el DataFrame en bloques de 1000 filas
rows_per_file = 10000
total_rows = len(clean_items)
total_files = math.ceil(total_rows / rows_per_file)

# Recorrer y crear archivos Parquet
for i in range(total_files):
    s = i * rows_per_file
    e = min((i + 1) * rows_per_file, total_rows)
    block = clean_items.iloc[s:e]

    # Crear el nombre del archivo Parquet (puedes modificar el nombre como desees)
    file_name = f"../data_sources/parquet/user_reviews/dataset_{i + 1}.parquet"

    # Guardar el bloque actual en un archivo Parquet
    block.to_parquet(file_name)

    print(f"Archivo {file_name} guardado con éxito. Filas {s + 1} a {e} del DataFrame.")