#### Pipeline ETL para ingesta en base de datos relacional

In [1]:
# importamos librerias
import os
import pandas as pd

In [2]:
# Cambiar el directorio de trabajo al directorio donde se encuentran los archivos JSON
%cd .\florida_data

# Obtener una lista de archivos en el directorio actual
archivos = os.listdir()

# Crear un DataFrame vacío para almacenar los datos
df_florida = pd.DataFrame()

# Iterar sobre cada archivo en la lista de archivos
for archivo in archivos:
    # Verificar si el archivo tiene la extensión .json
    if archivo.endswith('.json'):
        # Leer el archivo JSON y cargarlo en un DataFrame
        data = pd.read_json(archivo, lines=True)
        # Concatenar los datos del archivo al DataFrame principal
        df_florida = pd.concat([df_florida, data], axis=0)
print("finalizo cargan de datos")



c:\Users\sebas\OneDrive\Escritorio\Proyecto-Grupal-Google-yelp\Sprint 2 Ingenieria de Datos\pipeline ETL\google\florida_data
finalizo cargan de datos


In [3]:
path = r"C:\Users\sebas\OneDrive\Escritorio\Proyecto-Grupal-Google-yelp\Sprint 2 Ingenieria de Datos\pipeline ETL\google\florida_data\1.json"

# Cargar el archivo JSON en un DataFrame de Pandas
df_florida = pd.read_json(path, orient='records', lines=True)

In [4]:
# Chequeamos tamaño del dataset
df_florida.shape

(150000, 8)

In [5]:
# Elimino la columna pics y resp 
df_florida = df_florida.drop(['pics','resp','name'],axis=1)

In [6]:
# Elimino duplicados del df 
df_florida = df_florida.drop_duplicates()

# Elimino nulos 
df_florida = df_florida.dropna()

In [7]:
# Verifico nulos
print(df_florida.isnull().sum())

# Verifico duplicados
print("\nDuplicados: ",df_florida.duplicated().sum())

user_id    0
time       0
rating     0
text       0
gmap_id    0
dtype: int64

Duplicados:  0


In [8]:
import datetime

# Normalizamos la columna time creando la columna time_normalizado con la fecha en el formato %d/%m/%y %H:%M:%S 
times = [] 
for data in df_florida['time']/1000 :
    times.append(datetime.datetime.utcfromtimestamp(data).strftime('%d/%m/%y %H:%M:%S'))
df_florida['time_normalizado']=times

# Cambiamos el valor de la columna time_normalizado de object a datetime 
df_florida['time_normalizado'] = pd.to_datetime(df_florida['time_normalizado'])

  df_florida['time_normalizado'] = pd.to_datetime(df_florida['time_normalizado'])


In [9]:
# Verificamos el cambio 
df_florida.dtypes

user_id                    float64
time                         int64
rating                       int64
text                        object
gmap_id                     object
time_normalizado    datetime64[ns]
dtype: object

Filtramos las reseñas de 2014 en adelante

In [10]:
# Extraer el año de la columna "time_normalizado" y crear una nueva columna "year"
df_florida["year"] = df_florida["time_normalizado"].dt.year

# Valores que deseas conservar en la columna "year"
valores_a_mantener = [2014, 2015, 2016 , 2017, 2018 , 2019 , 2020 , 2021]

# Eliminar los valores de la columna "year" que no están en la lista de valores_a_mantener
df_florida = df_florida[df_florida["year"].isin(valores_a_mantener)]

# Eliminamos la columna year
df_florida = df_florida.drop(['year'],axis=1)

# Eliminamos la columna time
df_florida = df_florida.drop(['time'],axis=1)

In [11]:
# cambio el nombre de la columna gmap_id a business_id y time_normalizado a date
df_florida.rename(columns={'gmap_id': 'business_id', 'time_normalizado': 'timestamp'}, inplace=True)


In [12]:
# Importamos las bibliotecas de procesamiento de texto y análisis de sentimiento
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Inicializar el analizador de sentimientos
sia = SentimentIntensityAnalyzer()

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\sebas\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [13]:
# Función para asignar valores según la escala de sentimiento
def obtener_puntuacion_sentimiento(texto):
    if pd.isnull(texto) or texto == '':
        return 1  # Devolver neutral si está vacío o es NaN
    elif isinstance(texto, str):
        sentimiento = sia.polarity_scores(texto)
        puntuacion_compuesta = sentimiento['compound']
        if puntuacion_compuesta >= -0.05:
            return 2  # Buena puntuación
        elif puntuacion_compuesta <= -0.05:
            return 0  # Mala puntuación
        else:
            return 1
    else: 
        return 1  # Devolver neutral para valores que no son str

In [14]:
# Convertir la columna 'text' a cadena de texto
df_florida['text'] = df_florida['text'].astype(str)

In [15]:
# Aplicar la función obtener_puntuacion_sentimiento a la columna 'text'
df_florida['sentiment_analysis'] = df_florida['text'].apply(obtener_puntuacion_sentimiento)

In [16]:
# Eliminar la columna 'text' ya que no la necesitamos con 'sentiment_analysis'
df_florida = df_florida.drop(columns=['text'], axis=1)

In [17]:
# chequeamos los resultados
df_florida.head(2)

Unnamed: 0,user_id,rating,business_id,timestamp,sentiment_analysis
0,1.014719e+20,1,0x8893863ea87bd5dd:0x9383ebf973e74abb,2021-03-08 15:07:30,0
1,1.154772e+20,5,0x8893863ea87bd5dd:0x9383ebf973e74abb,2020-07-18 00:13:37,0


In [18]:
# Chequeamos tamaño del dataset
df_florida.shape

(91766, 5)

In [19]:
# Exportamos el df en formato csv
df_florida.to_csv('review.csv', index=False)