In [2]:
import sys
import os
import pandas as pd
from pathlib import Path
import pyarrow.parquet as pq

# Add project root to path to allow imports from src
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from src.data_preprocessing import load_parquet_to_df

## Obtaining the Dataframes ##

The dataframes used are: 
- Consum Total Agregat (CTA)
- Repte Consums Anòmals (RCA)
- Fuistes Experència Client (FEC)
- Incidències Comptadors Intelligents (ICI)

In [3]:
# Define the path to your raw data file
file_names = ["consum_total_agregat.parquet", "repte_consums_anomals.parquet", "fuites_experiencia_client.parquet","incidencies_comptadors_intelligents.parquet"]
files = ["../data/" + file for file in file_names]
sample_files = ["../data/sample_" + file for file in file_names]

# Run just with samples until being sure for time optimization 
df_CTA = load_parquet_to_df(files[0])
df_RCA = load_parquet_to_df(files[1])
df_FEC = load_parquet_to_df(files[2])
df_ICI = load_parquet_to_df(files[3])

df_list = [df_CTA,df_RCA, df_FEC, df_ICI]

['POLIZA_SUMINISTRO', 'FECHA', 'CONSUMO_REAL', 'SECCIO_CENSAL', 'US_AIGUA_GEST', 'NUM_MUN_SGAB', 'NUM_DTE_MUNI', 'NUM_COMPLET', 'DATA_INST_COMP', 'MARCA_COMP', 'CODI_MODEL', 'DIAM_COMP']
['POLISSA_SUBM', 'CODI_ANOMALIA', 'START_DATE', 'END_DATE', 'US_AIGUA_SUBM', 'SECCIO_CENSAL', 'NUMEROSERIECONTADOR', 'CONSUMO_REAL', 'FECHA_HORA']
['POLISSA_SUBM', 'DATA_INI_FACT', 'DATA_FIN_FACT', 'CREATED_MENSAJE', 'CODIGO_MENSAJE', 'TIPO_MENSAJE', 'US_AIGUA_SUBM', 'SECCIO_CENSAL', 'NUMEROSERIECONTADOR', 'CONSUMO_REAL', 'FECHA_HORA']
['POLIZA_SUMINISTRO', 'FECHA', 'CONSUMO_REAL', 'SECCIO_CENSAL', 'US_AIGUA_GEST', 'NUM_MUN_SGAB', 'NUM_DTE_MUNI', 'NUM_COMPLET', 'DATA_INST_COMP', 'MARCA_COMP', 'CODI_MODEL', 'DIAM_COMP']


In [4]:
data_dir = "../data/"
for file_name in file_names:
    full_path = os.path.join(data_dir, file_name)
    sample_path = os.path.join(data_dir, f"sample_{file_name}")
    
    # Load full dataset
  #  df = pd.read_parquet(full_path)
    parquet_file = pq.ParquetFile(full_path)
    num_rows = parquet_file.metadata.num_rows
    sample_size = max(1, int(num_rows * 0.1))
    df_sample = parquet_file.read_row_group(0).to_pandas()
    df_sample = df_sample.iloc[:sample_size]
    
    # Compute 10% of the data (at least 1 row)
  #  sample_size = max(1, int(len(df) * 0.1))
    #df_sample = df.iloc[:sample_size]
    
    # Save sample parquet
    df_sample.to_parquet(sample_path, index=False)
    
    print(f"Sample saved: {sample_path} ({sample_size} rows)")


Sample saved: ../data/sample_consum_total_agregat.parquet (1711270 rows)
Sample saved: ../data/sample_repte_consums_anomals.parquet (2119597 rows)
Sample saved: ../data/sample_fuites_experiencia_client.parquet (7637224 rows)
Sample saved: ../data/sample_incidencies_comptadors_intelligents.parquet (1711270 rows)


# Cleaning and Verification Dataframes

In [None]:
file_name=[
    "consum_total_agregat.parquet",
    "repte_consums_anomals.parquet",
    "fuites_experiencia_client.parquet",
    "incidencies_comptadors_intelligents.parquet"
]

for file in file_names:
    sample_path=os.path.join(data_dir,f"sample_{file}")
    if not os.path.exists(sample_path):
        print(f"The file doesn't exists in: {sample_path}")
        continue
    
    print("\n"+"-"*80)
    print(f"Processing sample: {sample_path}")

    #try to load file, if not show error
    try:
        df=pd.read_parquet(sample_path)
        print(f"Correctly loaded ({len(df):,} rows, {len(df.columns)} columns)")
    except Exception as e:
        print(f"Error in uploading {sample_path}: {e}")
        continue
    
    #cleaning of duplicated elements
    bef=len(df)
    df=df.drop_duplicates()
    print(f"Duplicated removed: {bef-len(df)}")

    #cleaning of nan rows
    bef=len(df)
    df=df.dropna(how='all')
    print(f"Nan rows removed: {bef-len(df)}")

    df=df.replace("None",pd.NA)

    #show null values
    nulls=df.isnull().sum()
    null_cols=nulls[nulls>0]
    if len(null_cols) > 0:
        print("Columns with null values:")
        print(null_cols)
    else:
        print("The file doesn't have any null values!")

    #modifies data type of the date column to be easier to use in future analysis
    date_cols=[c for c in df.columns if "DATA" in c or "FECHA" in c or "DATE" in c]
    for c in date_cols:
        df[c]=pd.to_datetime(df[c],errors="coerce")

    #strips whitespaces
    df=df.map(lambda x: x.strip() if isinstance(x,str) else x)

    #save clean version
    clean_path=sample_path.replace("sample_","clean_sample_")
    df.to_parquet(clean_path,index=False)
    print(f"File cleaned and saved in: {clean_path}")
    print("-"*80 + "\n")



--------------------------------------------------------------------------------
Processing sample: ../data/sample_consum_total_agregat.parquet
Correctly loaded (1,048,576 rows, 12 columns)
Duplicated removed: 13
Nan rows removed: 0
The file doesn't have any null values!
File cleaned and saved in: ../data/clean_sample_consum_total_agregat.parquet
--------------------------------------------------------------------------------


--------------------------------------------------------------------------------
Processing sample: ../data/sample_repte_consums_anomals.parquet
Correctly loaded (1,048,576 rows, 9 columns)
Duplicated removed: 0
Nan rows removed: 0
Columns with null values:
SECCIO_CENSAL     17128
CONSUMO_REAL     294112
dtype: int64
File cleaned and saved in: ../data/clean_sample_repte_consums_anomals.parquet
--------------------------------------------------------------------------------


--------------------------------------------------------------------------------
Proces

### Cleaning Consum Total Agregat (CTA)

In [5]:
#Show stats 
print("-"*10, file_names[0], "-" *10)
df_CTA.info(show_counts=True)

---------- consum_total_agregat.parquet ----------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17112709 entries, 0 to 17112708
Data columns (total 12 columns):
 #   Column             Non-Null Count     Dtype  
---  ------             --------------     -----  
 0   POLIZA_SUMINISTRO  17112709 non-null  object 
 1   FECHA              17112709 non-null  object 
 2   CONSUMO_REAL       17112709 non-null  int64  
 3   SECCIO_CENSAL      5800000 non-null   float64
 4   US_AIGUA_GEST      5800000 non-null   object 
 5   NUM_MUN_SGAB       5800000 non-null   float64
 6   NUM_DTE_MUNI       5800000 non-null   float64
 7   NUM_COMPLET        5800000 non-null   object 
 8   DATA_INST_COMP     5800000 non-null   object 
 9   MARCA_COMP         5800000 non-null   object 
 10  CODI_MODEL         5800000 non-null   float64
 11  DIAM_COMP          5800000 non-null   float64
dtypes: float64(5), int64(1), object(6)
memory usage: 1.5+ GB


In [22]:
#after cleaning
df_clean_CTA=pd.read_parquet("../data/clean_sample_consum_total_agregat.parquet")
df_clean_CTA.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048563 entries, 0 to 1048562
Data columns (total 12 columns):
 #   Column             Non-Null Count    Dtype         
---  ------             --------------    -----         
 0   POLIZA_SUMINISTRO  1048563 non-null  object        
 1   FECHA              1048563 non-null  datetime64[ns]
 2   CONSUMO_REAL       1048563 non-null  int64         
 3   SECCIO_CENSAL      1048563 non-null  float64       
 4   US_AIGUA_GEST      1048563 non-null  object        
 5   NUM_MUN_SGAB       1048563 non-null  float64       
 6   NUM_DTE_MUNI       1048563 non-null  float64       
 7   NUM_COMPLET        1048563 non-null  object        
 8   DATA_INST_COMP     1048563 non-null  datetime64[ns]
 9   MARCA_COMP         1048563 non-null  object        
 10  CODI_MODEL         1048563 non-null  float64       
 11  DIAM_COMP          1048563 non-null  float64       
dtypes: datetime64[ns](2), float64(5), int64(1), object(4)
memory usage: 96.0+ MB


In [23]:
#to visualise the dataset
df_clean_RCA.head()

Unnamed: 0,POLISSA_SUBM,CODI_ANOMALIA,START_DATE,END_DATE,US_AIGUA_SUBM,SECCIO_CENSAL,NUMEROSERIECONTADOR,CONSUMO_REAL,FECHA_HORA
0,TZSHLTAPLXX4OYI3,163840,2024-07-08,2024-09-05,DOMÈSTIC,805601006,P22FA037836K,,2024-01-01
1,FJ2I5K246X6SG3T4,163840,2023-01-26,2023-03-27,DOMÈSTIC,805602004,P21VA155772I,,2024-01-01
2,MPIXKKMZKJXANKKB,163840,2024-05-07,2024-07-05,DOMÈSTIC,801507024,I20LA206734D,,2024-01-01
3,LV6FI7TE7BX7NKKE,163840,2023-01-11,2023-03-13,DOMÈSTIC,820002005,I19LA121835K,,2024-01-01
4,RSSOFEQOC53RL6OD,2,2023-01-16,2023-03-16,DOMÈSTIC,801906059,P15VA076725J,,2024-01-01


### Cleaning Repte Consums Anòmals (RCA)

In [None]:
#Detecció de consums anòmals dataframe cleaned 
print("-"*10, file_names[1], "-" *10)
df_RCA.info(show_counts=True)

---------- repte_consums_anomals.parquet ----------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2119597 entries, 0 to 2119596
Data columns (total 9 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   POLISSA_SUBM         2119597 non-null  object        
 1   CODI_ANOMALIA        2119597 non-null  int64         
 2   START_DATE           2119597 non-null  datetime64[ns]
 3   END_DATE             2119597 non-null  datetime64[ns]
 4   US_AIGUA_SUBM        2119597 non-null  object        
 5   SECCIO_CENSAL        2119597 non-null  object        
 6   NUMEROSERIECONTADOR  2119597 non-null  object        
 7   CONSUMO_REAL         2119597 non-null  float64       
 8   FECHA_HORA           2119597 non-null  datetime64[us]
dtypes: datetime64[ns](2), datetime64[us](1), float64(1), int64(1), object(4)
memory usage: 145.5+ MB


In [20]:
#after cleaning
df_clean_RCA=pd.read_parquet("../data/clean_sample_repte_consums_anomals.parquet")
df_clean_RCA.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048576 entries, 0 to 1048575
Data columns (total 9 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   POLISSA_SUBM         1048576 non-null  object        
 1   CODI_ANOMALIA        1048576 non-null  int64         
 2   START_DATE           1048576 non-null  datetime64[ns]
 3   END_DATE             1048576 non-null  datetime64[ns]
 4   US_AIGUA_SUBM        1048576 non-null  object        
 5   SECCIO_CENSAL        1031448 non-null  object        
 6   NUMEROSERIECONTADOR  1048576 non-null  object        
 7   CONSUMO_REAL         754464 non-null   float64       
 8   FECHA_HORA           1048576 non-null  datetime64[ns]
dtypes: datetime64[ns](3), float64(1), int64(1), object(4)
memory usage: 72.0+ MB


In [21]:
#to visualise the dataset
df_clean_RCA.head()

Unnamed: 0,POLISSA_SUBM,CODI_ANOMALIA,START_DATE,END_DATE,US_AIGUA_SUBM,SECCIO_CENSAL,NUMEROSERIECONTADOR,CONSUMO_REAL,FECHA_HORA
0,TZSHLTAPLXX4OYI3,163840,2024-07-08,2024-09-05,DOMÈSTIC,805601006,P22FA037836K,,2024-01-01
1,FJ2I5K246X6SG3T4,163840,2023-01-26,2023-03-27,DOMÈSTIC,805602004,P21VA155772I,,2024-01-01
2,MPIXKKMZKJXANKKB,163840,2024-05-07,2024-07-05,DOMÈSTIC,801507024,I20LA206734D,,2024-01-01
3,LV6FI7TE7BX7NKKE,163840,2023-01-11,2023-03-13,DOMÈSTIC,820002005,I19LA121835K,,2024-01-01
4,RSSOFEQOC53RL6OD,2,2023-01-16,2023-03-16,DOMÈSTIC,801906059,P15VA076725J,,2024-01-01


### Cleaning Fuistes Experència Client (FEC)


In [None]:
#before cleaning
print("-"*10, file_names[2], "-" *10)
df_FEC.info(show_counts=True)

---------- fuites_experiencia_client.parquet ----------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7637224 entries, 0 to 7637223
Data columns (total 11 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   POLISSA_SUBM         7637224 non-null  object        
 1   DATA_INI_FACT        7637224 non-null  object        
 2   DATA_FIN_FACT        7637224 non-null  object        
 3   CREATED_MENSAJE      5620178 non-null  datetime64[us]
 4   CODIGO_MENSAJE       5620178 non-null  object        
 5   TIPO_MENSAJE         5620178 non-null  object        
 6   US_AIGUA_SUBM        7637224 non-null  object        
 7   SECCIO_CENSAL        7420659 non-null  object        
 8   NUMEROSERIECONTADOR  7637224 non-null  object        
 9   CONSUMO_REAL         6156763 non-null  float64       
 10  FECHA_HORA           7637224 non-null  datetime64[us]
dtypes: datetime64[us](2), float64(1), object(8)
memory usage: 6

In [None]:
#after cleaning
df_clean_FEC=pd.read_parquet("../data/clean_sample_fuites_experiencia_client.parquet")
df_clean_FEC.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048576 entries, 0 to 1048575
Data columns (total 11 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   POLISSA_SUBM         1048576 non-null  object        
 1   DATA_INI_FACT        1048576 non-null  datetime64[ns]
 2   DATA_FIN_FACT        1048576 non-null  datetime64[ns]
 3   CREATED_MENSAJE      775129 non-null   datetime64[ns]
 4   CODIGO_MENSAJE       775129 non-null   object        
 5   TIPO_MENSAJE         775129 non-null   object        
 6   US_AIGUA_SUBM        1048576 non-null  object        
 7   SECCIO_CENSAL        1018942 non-null  object        
 8   NUMEROSERIECONTADOR  1048576 non-null  object        
 9   CONSUMO_REAL         898110 non-null   float64       
 10  FECHA_HORA           1048576 non-null  datetime64[ns]
dtypes: datetime64[ns](4), float64(1), object(6)
memory usage: 88.0+ MB


In [17]:
#to visualise the dataset
df_clean_FEC.head()

Unnamed: 0,POLISSA_SUBM,DATA_INI_FACT,DATA_FIN_FACT,CREATED_MENSAJE,CODIGO_MENSAJE,TIPO_MENSAJE,US_AIGUA_SUBM,SECCIO_CENSAL,NUMEROSERIECONTADOR,CONSUMO_REAL,FECHA_HORA
0,RGYFWIZ4ZRRZKX2K,2023-09-13,2023-11-14,NaT,,,DOMÈSTIC,801907090,IBAJ44VHSIRRTASA,,2024-01-01
1,HHB4U5HUQKW7IOGD,2023-08-13,2023-10-16,NaT,,,DOMÈSTIC,801909040,L2CLPPJRIPAEESV7,,2024-01-01
2,EU6AT3IKPUKCZTBU,2024-01-24,2024-03-26,NaT,,,DOMÈSTIC,801902046,45TBDJQN4LA37ZIN,,2024-01-01
3,EU6AT3IKPUKCZTBU,2023-11-27,2024-01-24,NaT,,,DOMÈSTIC,801902046,45TBDJQN4LA37ZIN,,2024-01-01
4,EWNDTPECBVEGW6AU,2023-09-29,2023-11-27,NaT,,,DOMÈSTIC,801902046,VTRAI3L24SWKVC5H,,2024-01-01


### Cleaning Incidències Comptadors Intelligents (ICI)

In [None]:
print("-"*10, file_names[2], "-" *10)
df_ICI.info(show_counts=True)

---------- fuites_experiencia_client.parquet ----------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1711270 entries, 0 to 1711269
Data columns (total 12 columns):
 #   Column             Non-Null Count    Dtype  
---  ------             --------------    -----  
 0   POLIZA_SUMINISTRO  1711270 non-null  object 
 1   FECHA              1711270 non-null  object 
 2   CONSUMO_REAL       1711270 non-null  int64  
 3   SECCIO_CENSAL      1711270 non-null  float64
 4   US_AIGUA_GEST      1711270 non-null  object 
 5   NUM_MUN_SGAB       1711270 non-null  float64
 6   NUM_DTE_MUNI       1711270 non-null  float64
 7   NUM_COMPLET        1711270 non-null  object 
 8   DATA_INST_COMP     1711270 non-null  object 
 9   MARCA_COMP         1711270 non-null  object 
 10  CODI_MODEL         1711270 non-null  float64
 11  DIAM_COMP          1711270 non-null  float64
dtypes: float64(5), int64(1), object(6)
memory usage: 156.7+ MB


In [24]:
#after cleaning
df_clean_ICI=pd.read_parquet("../data/clean_sample_incidencies_comptadors_intelligents.parquet")
df_clean_ICI.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048563 entries, 0 to 1048562
Data columns (total 12 columns):
 #   Column             Non-Null Count    Dtype         
---  ------             --------------    -----         
 0   POLIZA_SUMINISTRO  1048563 non-null  object        
 1   FECHA              1048563 non-null  datetime64[ns]
 2   CONSUMO_REAL       1048563 non-null  int64         
 3   SECCIO_CENSAL      1048563 non-null  float64       
 4   US_AIGUA_GEST      1048563 non-null  object        
 5   NUM_MUN_SGAB       1048563 non-null  float64       
 6   NUM_DTE_MUNI       1048563 non-null  float64       
 7   NUM_COMPLET        1048563 non-null  object        
 8   DATA_INST_COMP     1048563 non-null  datetime64[ns]
 9   MARCA_COMP         1048563 non-null  object        
 10  CODI_MODEL         1048563 non-null  float64       
 11  DIAM_COMP          1048563 non-null  float64       
dtypes: datetime64[ns](2), float64(5), int64(1), object(4)
memory usage: 96.0+ MB


In [25]:
#to visualise the dataset
df_clean_ICI.head()

Unnamed: 0,POLIZA_SUMINISTRO,FECHA,CONSUMO_REAL,SECCIO_CENSAL,US_AIGUA_GEST,NUM_MUN_SGAB,NUM_DTE_MUNI,NUM_COMPLET,DATA_INST_COMP,MARCA_COMP,CODI_MODEL,DIAM_COMP
0,VECWAVDUULZDSBOP,2021-01-01,1758,801903025.0,C,0.0,3.0,N5ER4KUNPNXOQQCE,2016-04-25,5557SZ47QZAZ56EQ,23.0,30.0
1,VECWAVDUULZDSBOP,2021-01-02,1854,801903025.0,C,0.0,3.0,N5ER4KUNPNXOQQCE,2016-04-25,5557SZ47QZAZ56EQ,23.0,30.0
2,VECWAVDUULZDSBOP,2021-01-03,1885,801903025.0,C,0.0,3.0,N5ER4KUNPNXOQQCE,2016-04-25,5557SZ47QZAZ56EQ,23.0,30.0
3,VECWAVDUULZDSBOP,2021-01-04,5676,801903025.0,C,0.0,3.0,N5ER4KUNPNXOQQCE,2016-04-25,5557SZ47QZAZ56EQ,23.0,30.0
4,VECWAVDUULZDSBOP,2021-01-05,4456,801903025.0,C,0.0,3.0,N5ER4KUNPNXOQQCE,2016-04-25,5557SZ47QZAZ56EQ,23.0,30.0
