In [62]:
import pandas as pd
import hashlib
import duckdb as dk
from slugify import slugify
from datetime import datetime, timedelta
import csv
import os
from typing import Union
from pprint import pprint as pp
import chardet
import json

In [33]:
def read_csv(file_path: str, change_columns: bool = True, modify_file: bool = False) -> pd.DataFrame:

    """convierte un csv a un DataFrame, sin immportar su encoding o sep"""

    # error, file does not exist
    if not os.path.isfile(file_path):
        raise ValueError(f"El archivo ingresado no existe o esta mal escrito.")


    # detectamos el encoding del csv
    with open(file_path, 'rb') as archivo:
            resultado = chardet.detect(archivo.read())
            encoding = resultado['encoding']


    # abrimos el archivo como lectura y cargamos sus valore en una variable
    with open(file_path, 'r', encoding=encoding) as file:

        dialect = csv.Sniffer().sniff(file.read(4096))

        # reiniciamos la posicion en la que se esta apuntando en el archivo
        file.seek(0)

        delimiter =  dialect.delimiter
        csv_reader = csv.reader(file, delimiter=delimiter)
        data = list(csv_reader)

    columns = data[0]
    new_columns = []

    # acomodamos los nombre de las columnas y cambiamos las columnas que salgan repetidas
    if change_columns:

        for column in columns:
            name = slugify(column, separator="_")

            if name not in new_columns:
                new_columns.append(name)
            else:
                i = 0
                while True:
                    new_name = f"{name}_{i}"
                    if new_name not in new_columns:
                        new_columns.append(new_name)
                        break
                    i+=1


    df = pd.DataFrame(data[1:], columns= new_columns)

    if modify_file:
        df.to_csv(file_path, sep=",", encoding="utf8")

    return df


In [34]:
def generar_hash(texto):
    # Crear un objeto hash SHA256
    sha256_hash = hashlib.sha256()

    # Convertir el texto en bytes y actualizar el hash
    sha256_hash.update(texto.encode('utf-8'))

    # Obtener el hash en formato hexadecimal
    hash_resultado = sha256_hash.hexdigest()

    return hash_resultado

In [66]:
def to_json(report, path):
    with open(path, 'w', encoding="utf8") as file:
        json.dump(report, file, indent=4)

In [55]:
def check_changes(*args: str, get_report: bool = False) -> dict:
    
    legend: dict = {str(i):args[i] for i in range(len(args))}
    report = {}   
    
    # lista con los skus en los cuales se detecto un cambio
    changes_detected: dict = {}
    
    df = read_csv(args[0])
    
    for i in range(1,len(args)):
    
        # df = df_list[i-1]
        df2 = read_csv(args[i])
        
        before = str(i-1)
        after = str(i)
        
        # columns del df
        df_columns = df.columns
        
        # lista con los sku de la data
        sku_df = [sku[0] for sku in dk.sql("select distinct sku from df").fetchall()]

        # sku que no aparecen en la data
        # sku_not_in_df = [sku[0] for sku in dk.sql("select distinct sku from df2 where sku not in (select distinct sku from df)").fetchall()]

        for sku in sku_df:
            
            data = dk.sql(f"select * from df where sku = '{sku}'").df().iloc[0].to_dict()
            previous_date = dk.sql(f"select * from df2 where sku = '{sku}'").df().iloc[0].to_dict()
            
            for col in df_columns:
                
                # hasheamos los strings para comparar si son diferentes
                data_hash = generar_hash(data[col])
                previous_date_hash = generar_hash(previous_date[col])
                
                if data_hash != previous_date_hash:
                    
                    if sku not in changes_detected:
                        changes_detected[sku] = {}
                    
                    if col not in changes_detected[sku]:
                        
                        changes_detected[sku][col] = {
                                    before: data[col] ,
                                    after: previous_date[col]
                                }
                    else:
                        changes_detected[sku][col][before] = data[col] 
                        changes_detected[sku][col][after] = previous_date[col]
        
        df = df2
    
    report = {
        "changes detected" : changes_detected if len(changes_detected) else None,
        # "sku are not there" : sku_not_in_df if len(sku_not_in_df) else None,
        "legend": legend if len(changes_detected) else None,
    } 
    
    return report

In [60]:
data = check_changes("data/Amazon_Contents_17_10_2023.csv", "data/Amazon_Contents_18_10_2023.csv", "data/Amazon_Contents_19_10_2023.csv", "data/Amazon_Contents_20_10_2023.csv", "data/Amazon_Contents_21_10_2023.csv")
path_file = "test.json"

In [67]:
to_json(data, path_file)