# Data Cleaning and Aggregation Pipeline

## Overview
This Jupyter notebook contains a pipeline for cleaning, aggregating, and resampling telecommunications datasets from the city of Milan.
These datasets contain information about SMS, calls, and internet traffic across different geographical squares over time.
The objective is to prepare the data for further analysis by selecting relevant columns, handling missing values, aggregating data by time intervals, and resampling the traffic data to a consistent frequency.

## Dataset Structure
The datasets to be processed are expected to have the following columns:
- `square_id`: The ID of the geographical square.
- `time_interval`: The timestamp of the data collection interval, in epoch milliseconds.
- `country_code`: The country code for the data entry.
- `sms_in`, `sms_out`: The number of incoming and outgoing SMS messages.
- `call_in`, `call_out`: The number of incoming and outgoing calls.
- `internet_traffic`: The amount of internet traffic.

Files are in text format, with data entries separated by tabs, and are named according to the date of data collection (e.g., `sms-call-internet-mi-2013-11-01.txt`).
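
As a quick illustration of this layout, the sketch below parses two tab-separated rows from an in-memory string rather than from a real file. The values are made up for the example and are not taken from the Milan data; empty fields stand in for missing measurements.

```python
import io

import pandas as pd

COLUMNS = [
    'square_id', 'time_interval', 'country_code',
    'sms_in', 'sms_out', 'call_in', 'call_out', 'internet_traffic',
]

# Two made-up rows in the tab-separated layout described above.
sample = (
    "1\t1383260400000\t39\t0.5\t\t\t0.2\t11.0\n"
    "1\t1383261000000\t39\t\t\t\t\t9.5\n"
)

df = pd.read_csv(io.StringIO(sample), sep='\t', names=COLUMNS)
df = df.fillna(0)  # empty fields are read as NaN and replaced by 0
df['time_interval'] = pd.to_datetime(df['time_interval'], unit='ms')
print(df)
```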

## Prerequisites
Before running this notebook, make sure the following are available:
1. **Python Environment**: A Python environment with Python 3.6 or later.
2. **Pandas Library**: The Pandas library installed for data manipulation and analysis. Install it using `pip install pandas` if not already installed.
3. **Dataset**: Place your dataset files in a designated folder. This code assumes that the raw datasets are stored in `./txt_dataset/` and that the cleaned datasets will be saved to `./cleaned_dataset/`.
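
A small pre-flight check can catch a missing folder before any processing starts. The sketch below is one possible way to do this; `ensure_folders` is a hypothetical helper that is not part of the notebook, and the demonstration runs against a temporary directory so that it does not touch real data.

```python
import os
import tempfile


def ensure_folders(source_folder, destination_folder):
    """Hypothetical helper: check the source folder and create the destination folder.

    Returns the sorted list of .txt files found in the source folder.
    """
    if not os.path.isdir(source_folder):
        raise FileNotFoundError(f"Source folder not found: {source_folder}")
    os.makedirs(destination_folder, exist_ok=True)
    return sorted(f for f in os.listdir(source_folder) if f.endswith('.txt'))


# Demonstration on a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, 'txt_dataset')
    destination = os.path.join(tmp, 'cleaned_dataset')
    os.makedirs(source)
    open(os.path.join(source, 'example.txt'), 'w').close()
    found = ensure_folders(source, destination)
    print(found)
```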


In [1]:
# necessary imports
import pandas as pd
import os

## Function Definitions
Below are the modular functions used in this notebook for reading and preprocessing data, aggregating data, resampling data, cleaning individual files, and processing multiple files in a directory.
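
To make the aggregation and resampling steps concrete, here is a minimal sketch on a hypothetical toy DataFrame. It is independent of the functions in this notebook, and it uses the `'60min'` alias for hourly resampling, which is equivalent to an hourly frequency.

```python
import pandas as pd

# Hypothetical toy data: two squares, timestamps in epoch milliseconds.
toy = pd.DataFrame({
    'square_id': [1, 1, 1, 1, 2],
    'time_interval': [
        1383260400000, 1383260400000, 1383261000000,
        1383264000000, 1383260400000,
    ],
    'internet_traffic': [1.0, 2.0, 3.0, 4.0, 0.0],
})

# Step 1: sum rows that share the same square and timestamp.
agg = toy.groupby(['square_id', 'time_interval'], as_index=False)['internet_traffic'].sum()
agg['time_interval'] = pd.to_datetime(agg['time_interval'], unit='ms')
agg = agg.set_index('time_interval')

# Step 2: hourly totals per square, dropping hours with no traffic.
hourly = agg.groupby('square_id')['internet_traffic'].resample('60min').sum()
hourly = hourly[hourly > 0].reset_index()
print(hourly)
```

In this toy case square 2 only has zero traffic, so it disappears from the result, which mirrors the positive-traffic filter applied in the resampling step of the pipeline.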


In [2]:
def read_and_preprocess(file_path, selected_columns):
    """
    Reads and preprocesses the data from a given file.
    
    Parameters:
    - file_path: The path to the file to be read.
    - selected_columns: The columns to select from the dataset.
    
    Returns:
    A DataFrame restricted to the selected columns, with missing values replaced by 0.
    """
    # Column names are supplied explicitly; the files are read as having no header row.
    column_names = [
        'square_id', 'time_interval', 'country_code',
        'sms_in', 'sms_out', 'call_in', 'call_out', 'internet_traffic',
    ]
    df = pd.read_csv(file_path, sep='\t', names=column_names)
    df.fillna(0, inplace=True)
    return df[selected_columns]

def aggregate_data(df):
    """
    Aggregates data by square_id and time_interval, sums the values, and resets the index.
    
    Parameters:
    - df: The DataFrame to be aggregated.
    
    Returns:
    An aggregated DataFrame with time_interval converted to datetime and set as index.
    """
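    # Sum all rows that share the same square and timestamp.
    df = df.groupby(['square_id', 'time_interval']).sum().reset_index()
    # Timestamps are stored as epoch milliseconds; convert them to datetimes.
    df['time_interval'] = pd.to_datetime(df['time_interval'], unit="ms")
    # A datetime index is required for resampling later on.
    df.set_index('time_interval', inplace=True)
    return df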

def resample_data(df, frequency='H'):
    """
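    Resamples each square's data to the specified frequency, summing the internet traffic
    and dropping intervals whose total is zero.
    
    Parameters:
    - df: The DataFrame to be resampled, indexed by time_interval.
    - frequency: The frequency for resampling, default is 'H' (hourly). Newer pandas
      releases prefer the lowercase alias 'h'.
    
    Returns:
    A DataFrame with one row per square_id and interval, holding the summed internet traffic.
    """
    resampled = df.groupby('square_id').resample(frequency)['internet_traffic'].sum()
    # Keep only intervals with traffic; resampling creates zero-sum rows for empty intervals.
    return resampled[resampled > 0].reset_index()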

def clean_data(file_path, selected_columns, destination_path):
    """
    Cleans the data by reading, preprocessing, aggregating, and resampling,
    then saves the cleaned data to a CSV file.
    
    Parameters:
    - file_path: The path to the source file, excluding the extension.
    - selected_columns: The columns to be selected for processing.
    - destination_path: The path to save the cleaned data, excluding the extension.
    """
    df = read_and_preprocess(file_path + '.txt', selected_columns)
    df = aggregate_data(df)
    df = resample_data(df)
    df.to_csv(destination_path + '.csv', index=False)

def process_files(source_folder, destination_folder, selected_columns):
    """
    Processes all files in the source folder, cleans the data, and saves it to the destination folder.
    
    Parameters:
    - source_folder: The path to the folder containing source files.
    - destination_folder: The path to the folder where cleaned files will be saved.
    - selected_columns: The columns to be selected for processing.
    """
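    # Create the destination folder if it does not exist yet, so that to_csv does not fail.
    os.makedirs(destination_folder, exist_ok=True)
    file_list = [file for file in os.listdir(source_folder) if file.endswith('.txt')]
    for file_name in file_list:
        base_name = os.path.splitext(file_name)[0]
        destination_path = os.path.join(destination_folder, 'cleaned-' + base_name)
        clean_data(os.path.join(source_folder, base_name), selected_columns, destination_path)
        # Report the path actually written (clean_data appends the .csv extension).
        print(f"{destination_path}.csv finished\n")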


## Execution
This section executes the data cleaning pipeline on all files in the specified source folder, demonstrating the pipeline's application to real-world datasets.
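
Each raw file is written out under the same base name with a `cleaned-` prefix and a `.csv` extension. The sketch below shows that mapping in isolation; `planned_output_path` is a hypothetical helper introduced only for illustration and is not called by the pipeline.

```python
import os


def planned_output_path(file_name, destination_folder):
    """Hypothetical helper: map a raw .txt file name to its cleaned CSV path."""
    base_name = os.path.splitext(file_name)[0]
    return os.path.join(destination_folder, 'cleaned-' + base_name + '.csv')


print(planned_output_path('sms-call-internet-mi-2013-11-01.txt', './cleaned_dataset/'))
```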

In [None]:
# Define the folder paths and selected columns for cleaning the data
txt_dataset_folder = './txt_dataset/'
cleaned_dataset_folder = './cleaned_dataset/'
# cleaned_dataset_folder_30minutes_interval = './cleaned_dataset/'
selected_columns = ['square_id', 'time_interval', 'internet_traffic']

# Start processing files from the source folder and save cleaned data to the destination folder
process_files(txt_dataset_folder, cleaned_dataset_folder, selected_columns)

## Conclusion
This notebook walks through a data cleaning and aggregation pipeline for telecommunications data. With the columns selected above and the default hourly frequency, the modular functions turn the raw tab-separated files into per-square hourly internet traffic totals, ready for further analysis.
